from __future__ import annotations
from abc import abstractmethod
import numpy as np
import pymoo.gradient.toolbox as anp
from pymoo.util.cache import Cache
from pymoo.util.misc import at_least_2d_array
class LoopedElementwiseEvaluation:
    """Sequential runner: apply the evaluation function to each solution in turn."""

    def __call__(self, f, X):
        """Call ``f`` on every element of ``X`` in order and return the results as a list."""
        return list(map(f, X))
class ElementwiseEvaluationFunction:
    """Callable that evaluates a single solution on a problem.

    It binds a problem together with the positional and keyword arguments
    that are forwarded to ``problem._evaluate`` on every call.
    """

    def __init__(self, problem, args, kwargs) -> None:
        super().__init__()
        self.problem, self.args, self.kwargs = problem, args, kwargs

    def __call__(self, x):
        """Evaluate ``x`` and return the dictionary filled in by ``problem._evaluate``."""
        result = {}
        evaluate = self.problem._evaluate
        evaluate(x, result, *self.args, **self.kwargs)
        return result
# [docs]  (link text left over from the rendered documentation page; not part of the module)
class Problem:
    """Base class describing an optimization problem.

    Subclasses implement :meth:`_evaluate`, which receives the design
    variables ``x`` and writes its results (e.g. ``"F"``, ``"G"``, ``"H"``)
    into the ``out`` dictionary. :meth:`evaluate` is the public entry point
    that prepares the input, dispatches to the vectorized or elementwise
    evaluation and brings the outputs into their expected shapes.
    """

    def __init__(self,
                 n_var=-1,
                 n_obj=1,
                 n_ieq_constr=0,
                 n_eq_constr=0,
                 xl=None,
                 xu=None,
                 vtype=None,
                 vars=None,
                 elementwise=False,
                 elementwise_func=ElementwiseEvaluationFunction,
                 elementwise_runner=LoopedElementwiseEvaluation(),
                 requires_kwargs=False,
                 replace_nan_values_by=None,
                 exclude_from_serialization=None,
                 callback=None,
                 strict=True,
                 **kwargs):
        """
        Parameters
        ----------
        n_var : int
            Number of Variables
        n_obj : int
            Number of Objectives
        n_ieq_constr : int
            Number of Inequality Constraints
        n_eq_constr : int
            Number of Equality Constraints
        xl : np.array, float, int
            Lower bounds for the variables. If a scalar, all lower bounds are equal.
        xu : np.array, float, int
            Upper bounds for the variables. If a scalar, all upper bounds are equal.
        vtype : type
            The variable type. So far, just used as a type hint.
        vars : dict
            Explicit variable definitions (name -> variable). If given, ``n_var``
            becomes ``len(vars)`` and missing bounds are collected from the
            ``lb`` / ``ub`` attributes of each variable (``None`` if absent).
        elementwise : bool
            Whether ``_evaluate`` is called once per solution instead of once
            for the whole input.
        elementwise_func : callable
            Factory called as ``elementwise_func(problem, args, kwargs)``; it
            returns the function that evaluates a single solution.
        elementwise_runner : callable
            Called as ``elementwise_runner(f, X)``; it returns one output
            dictionary per solution.
        requires_kwargs : bool
            If False, keyword arguments passed to ``evaluate`` are discarded
            before the evaluation.
        replace_nan_values_by : float
            If not None, NaN entries in the outputs are replaced by this value.
        exclude_from_serialization : list
            Attribute names that are set to None in ``__getstate__``.
        callback : callable
            If not None, called as ``callback(X, out)`` after every evaluation.
        strict : bool
            Stored as ``self.strict``; not used by the code in this class.
        **kwargs
            Stored as ``self.data``. If it contains ``n_constr``, the number of
            inequality constraints becomes ``max(n_ieq_constr, n_constr)``.
        """

        # number of variable
        self.n_var = n_var

        # number of objectives
        self.n_obj = n_obj

        # number of inequality constraints (a "n_constr" keyword may raise this number)
        self.n_ieq_constr = n_ieq_constr if "n_constr" not in kwargs else max(n_ieq_constr, kwargs["n_constr"])

        # number of equality constraints
        self.n_eq_constr = n_eq_constr

        # all additional keyword arguments are kept as problem data
        self.data = dict(**kwargs)

        # the lower and upper bounds as provided (converted to float arrays further below)
        self.xl, self.xu = xl, xu

        # a callback function to be called after every evaluation
        self.callback = callback

        # if the variables are provided in their explicit form
        if vars is not None:
            self.vars = vars
            self.n_var = len(vars)

            if self.xl is None:
                self.xl = {name: var.lb if hasattr(var, "lb") else None for name, var in vars.items()}
            if self.xu is None:
                self.xu = {name: var.ub if hasattr(var, "ub") else None for name, var in vars.items()}

        # the variable type (only as a type hint at this point)
        self.vtype = vtype

        # the functions used if elementwise is enabled
        self.elementwise = elementwise
        self.elementwise_func = elementwise_func
        self.elementwise_runner = elementwise_runner

        # whether evaluation requires kwargs (passing them can cause overhead in parallelization)
        self.requires_kwargs = requires_kwargs

        # whether the shapes are checked strictly
        self.strict = strict

        # if it is a problem with an actual number of variables - make sure xl and xu are numpy arrays
        # (this intentionally checks the n_var argument, not self.n_var, which may stem from `vars`)
        if n_var > 0:

            if self.xl is not None:
                if not isinstance(self.xl, np.ndarray):
                    self.xl = np.ones(n_var) * xl
                self.xl = self.xl.astype(float)

            if self.xu is not None:
                if not isinstance(self.xu, np.ndarray):
                    self.xu = np.ones(n_var) * xu
                self.xu = self.xu.astype(float)

        # this defines if NaN values should be replaced or not
        self.replace_nan_values_by = replace_nan_values_by

        # attribute which are excluded from being serialized
        self.exclude_from_serialization = exclude_from_serialization

    def evaluate(self,
                 X,
                 *args,
                 return_values_of=None,
                 return_as_dictionary=False,
                 **kwargs):
        """Evaluate the problem for the given design variables.

        Parameters
        ----------
        X : np.array, list or object
            The solution(s) to evaluate. A non-object numpy array is extended
            to two dimensions and must have ``n_var`` columns.
        return_values_of : list of str
            Names of the outputs to return. Defaults to ``["F"]`` plus ``"G"``
            and ``"H"`` if the problem has inequality / equality constraints.
        return_as_dictionary : bool
            If True, the whole output dictionary is returned.

        Returns
        -------
        dict, np.array or tuple
            The dictionary of outputs, a single output if exactly one value
            was requested, or otherwise a tuple in the order of
            ``return_values_of``.
        """

        # if the problem does not require any kwargs they are re-initialized
        if not self.requires_kwargs:
            kwargs = dict()

        # the default is built as a fresh list, so a list passed by the caller is never modified
        if return_values_of is None:
            return_values_of = ["F"]
            if self.n_ieq_constr > 0:
                return_values_of.append("G")
            if self.n_eq_constr > 0:
                return_values_of.append("H")

        # make sure the array is at least 2d. store if reshaping was necessary
        if isinstance(X, np.ndarray) and X.dtype != object:
            X, only_single_value = at_least_2d_array(X, extend_as="row", return_if_reshaped=True)
            assert X.shape[1] == self.n_var, f'Input dimension {X.shape[1]} are not equal to n_var {self.n_var}!'
        else:
            only_single_value = not isinstance(X, (list, np.ndarray))

        # this is where the actual evaluation takes place
        _out = self.do(X, return_values_of, *args, **kwargs)

        out = {}
        for k, v in _out.items():

            # copy it to a numpy array (it might be one of jax at this point)
            v = np.array(v)

            # in case the input had only one dimension, then remove always the first dimension from each output
            if only_single_value:
                v = v[0]

            # if the NaN values should be replaced
            if self.replace_nan_values_by is not None:
                v[np.isnan(v)] = self.replace_nan_values_by

            # best effort: values which can not be converted to float are kept as they are.
            # Only regular exceptions are caught, KeyboardInterrupt / SystemExit still propagate.
            try:
                out[k] = v.astype(np.float64)
            except Exception:
                out[k] = v

        if self.callback is not None:
            self.callback(X, out)

        # now depending on what should be returned prepare the output
        if return_as_dictionary:
            return out

        if len(return_values_of) == 1:
            return out[return_values_of[0]]
        return tuple(out[e] for e in return_values_of)

    def do(self, X, return_values_of, *args, **kwargs):
        """Run the evaluation and return the formatted output dictionary."""

        # create an empty dictionary
        out = {name: None for name in return_values_of}

        # do the function evaluation
        if self.elementwise:
            self._evaluate_elementwise(X, out, *args, **kwargs)
        else:
            self._evaluate_vectorized(X, out, *args, **kwargs)

        # finally format the output dictionary
        out = self._format_dict(out, len(X), return_values_of)

        return out

    def _evaluate_vectorized(self, X, out, *args, **kwargs):
        """Evaluate all solutions with a single call of ``_evaluate``."""
        self._evaluate(X, out, *args, **kwargs)

    def _evaluate_elementwise(self, X, out, *args, **kwargs):
        """Evaluate each solution separately and collect the results per key in ``out``."""

        # create the function that evaluates a single individual
        f = self.elementwise_func(self, args, kwargs)

        # execute the runner
        elems = self.elementwise_runner(f, X)

        # for each evaluation call
        for elem in elems:

            # for each key stored for this evaluation
            for k, v in elem.items():

                # if the element does not exist in out yet -> create it
                if out.get(k, None) is None:
                    out[k] = []

                out[k].append(v)

        # convert to arrays (the none check is important because otherwise an empty array is initialized)
        for k in out:
            if out[k] is not None:
                out[k] = anp.array(out[k])

    def _format_dict(self, out, N, return_values_of):
        """Bring the outputs into their default shapes and fill in missing values.

        Values with a known default shape (see ``default_shape``) are reshaped
        to it. Requested values which have not been set are filled with
        ``np.inf``.

        Raises
        ------
        Exception
            If a value can not be reshaped to its expected shape. The original
            error is the second argument and the ``__cause__`` of the exception.
        """

        # get the default output shape for the default values
        shape = default_shape(self, N)

        # finally the array to be returned
        ret = {}

        # for all values that have been set in the user implemented function
        for name, v in out.items():

            # only if they have truly been set
            if v is not None:

                # if there is a shape to be expected
                if name in shape:

                    if isinstance(v, list):
                        v = anp.column_stack(v)

                    try:
                        v = v.reshape(shape[name])
                    except Exception as e:
                        # np.shape also works for values without a shape attribute (e.g. plain
                        # scalars), so building the message can not fail and hide the actual error
                        raise Exception(
                            f"Problem Error: {name} can not be set, expected shape {shape[name]} but provided {np.shape(v)}",
                            e) from e

                ret[name] = v

        # if some values that are necessary have not been set
        for name in return_values_of:
            if name not in ret:
                s = shape.get(name, N)
                ret[name] = np.full(s, np.inf)

        return ret

    @Cache
    def nadir_point(self, *args, **kwargs):
        """Return the column-wise maximum of the pareto front, or None if no front is available."""
        pf = self.pareto_front(*args, **kwargs)
        if pf is not None:
            return np.max(pf, axis=0)
        return None

    @Cache
    def ideal_point(self, *args, **kwargs):
        """Return the column-wise minimum of the pareto front, or None if no front is available."""
        pf = self.pareto_front(*args, **kwargs)
        if pf is not None:
            return np.min(pf, axis=0)
        return None

    @Cache
    def pareto_front(self, *args, **kwargs):
        """Return the pareto front provided by ``_calc_pareto_front``.

        A front with exactly two columns is sorted by its first column.
        """
        pf = self._calc_pareto_front(*args, **kwargs)
        pf = at_least_2d_array(pf, extend_as='r')
        if pf is not None and pf.shape[1] == 2:
            pf = pf[np.argsort(pf[:, 0])]
        return pf

    @Cache
    def pareto_set(self, *args, **kwargs):
        """Return the pareto set provided by ``_calc_pareto_set``."""
        ps = self._calc_pareto_set(*args, **kwargs)
        ps = at_least_2d_array(ps, extend_as='r')
        return ps

    @property
    def n_constr(self):
        """Total number of constraints (inequality plus equality)."""
        return self.n_ieq_constr + self.n_eq_constr

    # NOTE: Problem does not derive from abc.ABC, so the decorator below is not enforced on instantiation
    @abstractmethod
    def _evaluate(self, x, out, *args, **kwargs):
        """Evaluation to be implemented by subclasses: write the results for ``x`` into ``out``."""
        pass

    def has_bounds(self):
        """Return True if both lower and upper bounds are set."""
        return self.xl is not None and self.xu is not None

    def has_constraints(self):
        """Return True if the problem has at least one constraint."""
        return self.n_constr > 0

    def bounds(self):
        """Return the lower and upper bounds as a tuple ``(xl, xu)``."""
        return self.xl, self.xu

    def name(self):
        """Return the class name of the problem."""
        return self.__class__.__name__

    def _calc_pareto_front(self, *args, **kwargs):
        """Hook for subclasses to provide the pareto front. Returns None by default."""
        pass

    def _calc_pareto_set(self, *args, **kwargs):
        """Hook for subclasses to provide the pareto set. Returns None by default."""
        pass

    def __str__(self):
        return (
            f"# name: {self.name()}\n"
            f"# n_var: {self.n_var}\n"
            f"# n_obj: {self.n_obj}\n"
            f"# n_ieq_constr: {self.n_ieq_constr}\n"
            f"# n_eq_constr: {self.n_eq_constr}\n"
        )

    def __getstate__(self):
        """Return the state used for serialization; excluded attributes are set to None."""
        if self.exclude_from_serialization is not None:
            state = self.__dict__.copy()

            # exclude objects which should not be stored
            for key in self.exclude_from_serialization:
                state[key] = None

            return state
        else:
            return self.__dict__
# [docs]  (link text left over from the rendered documentation page; not part of the module)
class ElementwiseProblem(Problem):
    """Problem whose ``elementwise`` flag is enabled by default."""

    def __init__(self, elementwise=True, **kwargs):
        # "elementwise" is a named parameter and can therefore never be part of kwargs already
        kwargs["elementwise"] = elementwise
        super().__init__(**kwargs)
def default_shape(problem, n):
    """Return the expected output shapes for ``n`` evaluated solutions.

    The dictionary maps ``F``, ``G`` and ``H`` to ``(n, count)`` and the
    entries ``dF``, ``dG`` and ``dH`` to ``(n, count, n_var)``, where
    ``count`` is the number of objectives, inequality constraints and
    equality constraints of ``problem``, respectively.
    """
    n_var = problem.n_var
    counts = {
        "F": problem.n_obj,
        "G": problem.n_ieq_constr,
        "H": problem.n_eq_constr,
    }

    # values first, then the "d"-prefixed entries, to keep the key order F, G, H, dF, dG, dH
    shapes = {key: (n, count) for key, count in counts.items()}
    for key, count in counts.items():
        shapes["d" + key] = (n, count, n_var)
    return shapes