# Source code for pymoo.termination.ftol

import numpy as np

from pymoo.indicators.hv import Hypervolume
from pymoo.indicators.igd import IGD
from pymoo.util.normalization import normalize
from pymoo.termination.delta import DeltaToleranceTermination


def calc_delta(a, b):
    """Return the largest absolute element-wise difference between ``a`` and ``b``.

    Both arguments may be scalars or NumPy arrays that support subtraction
    with each other; the maximum is taken over all elements of the result.
    """
    gap = np.abs(a - b)
    return np.max(gap)


def calc_delta_norm(a, b, norm):
    """Return the largest absolute difference between ``a`` and ``b`` after scaling.

    The difference ``a - b`` is divided element-wise by ``norm`` before the
    absolute value and the maximum are taken. No protection against zero
    entries in ``norm`` is applied here; callers are expected to pass a
    usable scale.
    """
    scaled = (a - b) / norm
    return np.max(np.abs(scaled))


class SingleObjectiveSpaceTermination(DeltaToleranceTermination):
    """Delta-tolerance termination driven by the best single-objective value.

    The tracked quantity is the minimum of the ``"f"`` values of
    ``algorithm.opt``; the reported delta is how much that minimum decreased
    from the previous observation to the current one.

    Parameters
    ----------
    tol : float
        Tolerance forwarded to ``DeltaToleranceTermination``.
    only_feas : bool
        If true, only entries selected by the ``"feas"`` values of
        ``algorithm.opt`` are considered when taking the minimum.
    **kwargs
        Passed through unchanged to ``DeltaToleranceTermination``.
    """

    def __init__(self, tol=1e-6, only_feas=True, **kwargs) -> None:
        super().__init__(tol, **kwargs)
        self.only_feas = only_feas

    def _delta(self, prev, current):
        """Return the decrease from ``prev`` to ``current``, never below zero."""
        # ``np.inf`` is what ``_data`` reports when nothing usable was found,
        # so no meaningful difference can be computed in that case.
        if prev == np.inf or current == np.inf:
            return np.inf

        # an increase (worsening) is reported as zero change
        return max(0, prev - current)

    def _data(self, algorithm):
        """Return the smallest ``"f"`` value of ``algorithm.opt`` or ``np.inf``."""
        best = algorithm.opt
        values = best.get("f")

        if self.only_feas:
            # presumably a boolean mask aligned with ``values`` - verify
            # against the population implementation
            values = values[best.get("feas")]

        return values.min() if len(values) > 0 else np.inf
class MultiObjectiveSpaceTermination(DeltaToleranceTermination):
    """Delta-tolerance termination based on movement in the objective space.

    For two consecutive observations the delta is the maximum of

    * the normalized change of the ideal point (column-wise minimum of ``F``),
    * the normalized change of the nadir point (column-wise maximum of ``F``),
    * the IGD of the previous set measured against the current set, both
      normalized with the current ideal and nadir points.

    Parameters
    ----------
    tol : float
        Tolerance forwarded to ``DeltaToleranceTermination``.
    only_feas : bool
        If true, ``F`` is filtered by the ``"feas"`` values of
        ``algorithm.opt`` before any statistic is computed.
    **kwargs
        Passed through unchanged to ``DeltaToleranceTermination``.
    """

    def __init__(self, tol=0.0025, only_feas=True, **kwargs):
        super().__init__(tol, **kwargs)

        # the three components of the most recent delta, kept for inspection
        self.delta_ideal = None
        self.delta_nadir = None
        self.delta_f = None

        self.only_feas = only_feas

    def _data(self, algorithm):
        """Collect ideal point, nadir point and ``F`` from ``algorithm.opt``.

        Returns a dict with keys ``ideal``, ``nadir``, ``F`` and ``feas``;
        ``feas`` is ``False`` (and both points are ``None``) when no row of
        ``F`` is left after the optional feasibility filter.
        """
        feas, F = algorithm.opt.get("feas", "F")

        if self.only_feas:
            F = F[feas]

        if len(F) == 0:
            return dict(ideal=None, nadir=None, F=F, feas=False)

        return dict(ideal=F.min(axis=0), nadir=F.max(axis=0), F=F, feas=True)

    def _delta(self, prev, current):
        """Return the largest of the ideal, nadir and IGD changes.

        ``np.inf`` is returned when either observation carries no usable
        points (``feas`` is false).
        """
        if not prev["feas"] or not current["feas"]:
            return np.inf

        # get necessary data from the current population
        c_F, c_ideal, c_nadir = current["F"], current["ideal"], current["nadir"]

        # this is the range between the nadir and the ideal point
        norm = c_nadir - c_ideal

        # if the range is degenerated (very close to zero) - disable
        # normalization by dividing by one
        norm[norm < 1e-32] = 1.0

        # calculate the change from last to current in ideal and nadir point
        delta_ideal = calc_delta_norm(c_ideal, prev["ideal"], norm)
        delta_nadir = calc_delta_norm(c_nadir, prev["nadir"], norm)

        # normalize last and current with respect to most recent ideal and nadir
        c_N = normalize(c_F, c_ideal, c_nadir)
        p_N = normalize(prev["F"], c_ideal, c_nadir)

        # calculate IGD from one to another
        delta_f = IGD(c_N).do(p_N)

        # store the delta values to the object
        self.delta_ideal, self.delta_nadir, self.delta_f = delta_ideal, delta_nadir, delta_f

        return max(delta_ideal, delta_nadir, delta_f)


class MultiObjectiveSpaceTerminationWithRenormalization(MultiObjectiveSpaceTermination):
    """Variant that re-normalizes a window of observations to the newest one.

    Parameters
    ----------
    n : int
        Stored on the instance; not read anywhere in this class.
    all_to_current : bool
        If true, every observation in the window is compared against the
        most recent one using ``indicator``. Otherwise each observation is
        compared with its successor using IGD.
    sliding_window : bool
        If false, the data handed to ``_metric`` is replaced by the last
        ``self.metric_window_size`` entries of ``self.data``.
    indicator : str
        ``"igd"`` or ``"hv"``; only consulted when ``all_to_current`` is true.
    **kwargs
        Passed through unchanged to ``MultiObjectiveSpaceTermination``.
    """

    def __init__(self, n=30, all_to_current=False, sliding_window=True, indicator="igd", **kwargs) -> None:
        super().__init__(**kwargs)
        self.n = n
        self.all_to_current = all_to_current
        self.sliding_window = sliding_window
        self.indicator = indicator
        self.data = []

    def _metric(self, data):
        """Extend the parent metric dict with a list of ``delta_f`` values.

        Raises
        ------
        ValueError
            If ``all_to_current`` is true and ``indicator`` is neither
            ``"igd"`` nor ``"hv"``.
        """
        # NOTE(review): ``_metric`` and ``metric_window_size`` are expected to
        # come from a base class that is not visible in this file - confirm.
        ret = super()._metric(data)

        if not self.sliding_window:
            data = self.data[-self.metric_window_size:]

        # get necessary data from the current population
        current = data[-1]
        c_F, c_ideal, c_nadir = current["F"], current["ideal"], current["nadir"]

        # normalize all previous generations with respect to current ideal and nadir
        N = [normalize(e["F"], c_ideal, c_nadir) for e in data]

        # check if the movement of all points is significant
        if self.all_to_current:
            c_N = normalize(c_F, c_ideal, c_nadir)

            if self.indicator == "igd":
                delta_f = [IGD(c_N).do(points) for points in N]
            elif self.indicator == "hv":
                hv = Hypervolume(ref_point=np.ones(c_F.shape[1]))
                delta_f = [hv.do(points) for points in N]
            else:
                # previously this fell through and failed later with an
                # UnboundLocalError on ``delta_f``; fail with a clear message
                raise ValueError(
                    f"Unknown indicator {self.indicator!r}; expected 'igd' or 'hv'."
                )
        else:
            # compare each observation with the one that follows it
            delta_f = [IGD(nxt).do(cur) for cur, nxt in zip(N, N[1:])]

        ret["delta_f"] = delta_f

        return ret

    def _decide(self, metrics):
        """Return ``True`` while the largest recorded change exceeds ``tol``."""
        delta_ideal = [e["delta_ideal"] for e in metrics]
        delta_nadir = [e["delta_nadir"] for e in metrics]
        delta_f = [max(e["delta_f"]) for e in metrics]
        return max(max(delta_ideal), max(delta_nadir), max(delta_f)) > self.tol