Source code for deephyper.skopt.utils

from collections import OrderedDict
from copy import deepcopy
from functools import wraps

import numpy as np
from joblib import dump as dump_
from joblib import load as load_
from scipy.optimize import OptimizeResult
from scipy.optimize import minimize as sp_minimize
from sklearn.base import is_regressor
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import FunctionTransformer, MinMaxScaler, QuantileTransformer
from .learning import (
    ExtraTreesRegressor,
    GaussianProcessRegressor,
    GradientBoostingQuantileRegressor,
    RandomForestRegressor,
)
from .learning.gaussian_process.kernels import ConstantKernel, HammingKernel, Matern
from .sampler import Grid, Halton, Hammersly, InitialPointGenerator, Lhs, Sobol
from .space import Dimension, Space

# Try to import Mondrian Forest
MF_INSTALLED = False
try:
    from .learning import MondrianForestRegressor

    MF_INSTALLED = True
except ImportError:
    MF_INSTALLED = False

__all__ = (
    "load",
    "dump",
)


def create_result(Xi, yi, space=None, rng=None, specs=None, models=None):
    """Initialize an `OptimizeResult` object.

    Parameters
    ----------
    Xi : list of lists, shape (n_iters, n_features)
        Points at which the objective was evaluated, one per iteration.
    yi : array-like, shape (n_iters,)
        Objective value obtained at every iteration.
    space : Space instance, optional
        Search space.
    rng : RandomState instance, optional
        State of the random state.
    specs : dict, optional
        Call specifications.
    models : list, optional
        List of fit surrogate models.

    Returns
    -------
    res : `OptimizeResult`, scipy object
        OptimizeResult instance with the required information.
    """
    res = OptimizeResult()

    # Filter out failed evaluations, which are marked with the string "F".
    Xi = np.asarray([x for x, y in zip(Xi, yi) if y != "F"])
    yi = np.asarray([y for y in yi if y != "F"])

    if np.ndim(yi) == 2:
        res.log_time = np.ravel(yi[:, 1])
        yi = np.ravel(yi[:, 0])

    if len(yi) == 0:
        res.x = None
        res.fun = "F"
    else:
        best = np.argmin(yi)
        res.x = Xi[best]
        res.fun = yi[best]

    res.func_vals = yi
    res.x_iters = Xi
    res.models = models
    res.space = space
    res.random_state = rng
    res.specs = specs
    return res

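# Illustrative sketch (not part of the original module): how `create_result`
# drops failed evaluations, marked "F", before locating the minimum. The
# values below are hypothetical.
#
#     res = create_result(Xi=[[0.2], [0.8], [0.5]], yi=[4.0, "F", 1.0])
#     # The "F" entry (and its point [0.8]) is filtered out, so:
#     # res.x -> array([0.5]) and res.fun -> 1.0
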
def eval_callbacks(callbacks, result):
    """Evaluate list of callbacks on result.

    The return values of the `callbacks` are ORed together to give the
    overall decision on whether or not the optimization procedure should
    stop.

    Parameters
    ----------
    callbacks : list of callables
        Callbacks to evaluate.

    result : `OptimizeResult`, scipy object
        Optimization result object to be stored.

    Returns
    -------
    stop : bool
        True if any callback requested that the optimization stop.
    """
    stop = False
    if callbacks:
        for c in callbacks:
            decision = c(result)
            if decision is not None:
                stop = stop or decision

    return stop

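# Illustrative sketch: each callback receives the current `OptimizeResult`,
# and any callback returning a truthy value stops the run (decisions are
# ORed). `res` is assumed to be an existing result object.
#
#     def stop_after_10(result):
#         return len(result.func_vals) >= 10
#
#     should_stop = eval_callbacks([stop_after_10], res)
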
def dump(res, filename, store_objective=True, **kwargs):
    """Store a skopt optimization result into a file.

    Parameters
    ----------
    res : `OptimizeResult`, scipy object
        Optimization result object to be stored.

    filename : string or `pathlib.Path`
        The path of the file in which it is to be stored. The compression
        method corresponding to one of the supported filename extensions
        ('.z', '.gz', '.bz2', '.xz' or '.lzma') will be used automatically.

    store_objective : boolean, default=True
        Whether the objective function should be stored. Set `store_objective`
        to `False` if your objective function (`.specs['args']['func']`) is
        unserializable (i.e. if an exception is raised when trying to
        serialize the optimization result).

        Notice that if `store_objective` is set to `False`, a deep copy of
        the optimization result is created, potentially leading to
        performance problems if `res` is very large. If the objective
        function is not critical, one can delete it before calling
        `deephyper.skopt.dump()` and thus avoid deep copying of `res`.

    **kwargs : other keyword arguments
        All other keyword arguments will be passed to `joblib.dump`.
    """
    if store_objective:
        dump_(res, filename, **kwargs)

    elif "func" in res.specs["args"]:
        # If the user does not want to store the objective and it is indeed
        # present in the provided object, then create a deep copy of it and
        # remove the objective function before dumping it with joblib.dump.
        res_without_func = deepcopy(res)
        del res_without_func.specs["args"]["func"]
        dump_(res_without_func, filename, **kwargs)

    else:
        # If the user does not want to store the objective and it is already
        # missing in the provided object, dump it without copying.
        dump_(res, filename, **kwargs)

def load(filename, **kwargs):
    """Reconstruct a skopt optimization result from a file persisted with
    `deephyper.skopt.dump`.

    .. note::
        Notice that the loaded optimization result can be missing
        the objective function (`.specs['args']['func']`) if
        `deephyper.skopt.dump` was called with `store_objective=False`.

    Parameters
    ----------
    filename : string or `pathlib.Path`
        The path of the file from which to load the optimization result.

    **kwargs : other keyword arguments
        All other keyword arguments will be passed to `joblib.load`.

    Returns
    -------
    res : `OptimizeResult`, scipy object
        Reconstructed OptimizeResult instance.
    """
    return load_(filename, **kwargs)

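# Illustrative round-trip sketch: persist a result and reload it. Pass
# `store_objective=False` when the objective function cannot be pickled;
# this assumes `res` was produced by a minimizer run, so that `res.specs`
# is populated. The filename is hypothetical; the ".gz" extension selects
# gzip compression.
#
#     dump(res, "result.pkl.gz", store_objective=False)
#     res_loaded = load("result.pkl.gz")
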
def is_listlike(x):
    """Check whether `x` is a list or a tuple."""
    return isinstance(x, (list, tuple))

def is_2Dlistlike(x):
    """Check whether every element of `x` is itself list-like."""
    return np.all([is_listlike(xi) for xi in x])

def check_x_in_space(x, space):
    """Check that the point(s) `x` lie within `space` and match its
    dimensionality, raising a ValueError otherwise."""
    if is_2Dlistlike(x):
        if not np.all([p in space for p in x]):
            raise ValueError("Not all points are within the bounds of the space.")
        if any([len(p) != len(space.dimensions) for p in x]):
            raise ValueError("Not all points have the same dimensions as the space.")
    elif is_listlike(x):
        if x not in space:
            raise ValueError(
                "Point (%s) is not within the bounds of the space (%s)."
                % (x, space.bounds)
            )
        if len(x) != len(space.dimensions):
            raise ValueError(
                "Dimensions of point (%s) and space (%s) do not match"
                % (x, space.bounds)
            )

def expected_minimum(res, n_random_starts=20, random_state=None):
    """Compute the minimum over the predictions of the last surrogate model.

    Uses `expected_minimum_random_sampling` with `n_random_starts=100000`
    when the space contains any categorical values.

    .. note::
        The returned minimum may not necessarily be an accurate
        prediction of the minimum of the true objective function.

    Parameters
    ----------
    res : `OptimizeResult`, scipy object
        The optimization result returned by a `skopt` minimizer.

    n_random_starts : int, default=20
        The number of random starts for the minimization of the surrogate
        model.

    random_state : int, RandomState instance, or None (default)
        Set random state to something other than None for reproducible
        results.

    Returns
    -------
    x : list
        Location of the minimum.

    fun : float
        The surrogate function value at the minimum.
    """
    if res.space.is_partly_categorical:
        return expected_minimum_random_sampling(
            res, n_random_starts=100000, random_state=random_state
        )

    def func(x):
        reg = res.models[-1]
        x = res.space.transform(x.reshape(1, -1))
        return reg.predict(x.reshape(1, -1))[0]

    xs = [res.x]
    if n_random_starts > 0:
        xs.extend(res.space.rvs(n_random_starts, random_state=random_state))

    best_x = None
    best_fun = np.inf

    for x0 in xs:
        r = sp_minimize(func, x0=x0, bounds=res.space.bounds)

        if r.fun < best_fun:
            best_x = r.x
            best_fun = r.fun

    return [v for v in best_x], best_fun

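# Illustrative sketch: query the last surrogate model of a finished run for
# its predicted minimum (which is not necessarily the true objective
# minimum). `res` is assumed to be an existing `OptimizeResult` with fitted
# models.
#
#     x_min, fun_min = expected_minimum(res, n_random_starts=20, random_state=0)
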
def expected_minimum_random_sampling(res, n_random_starts=100000, random_state=None):
    """Minimum search by naive random sampling.

    Returns the parameters that gave the minimum function value. Can be used
    when the space contains any categorical values.

    .. note::
        The returned minimum may not necessarily be an accurate
        prediction of the minimum of the true objective function.

    Parameters
    ----------
    res : `OptimizeResult`, scipy object
        The optimization result returned by a `skopt` minimizer.

    n_random_starts : int, default=100000
        The number of random starts for the minimization of the surrogate
        model.

    random_state : int, RandomState instance, or None (default)
        Set random state to something other than None for reproducible
        results.

    Returns
    -------
    x : list
        Location of the minimum.

    fun : float
        The surrogate function value at the minimum.
    """
    # Sample points from the search space.
    random_samples = res.space.rvs(n_random_starts, random_state=random_state)

    # Make predictions with the last surrogate model.
    model = res.models[-1]
    y_random = model.predict(res.space.transform(random_samples))
    index_best_objective = np.argmin(y_random)
    min_x = random_samples[index_best_objective]

    return min_x, y_random[index_best_objective]

def has_gradients(estimator):
    """Check if an estimator's ``predict`` method provides gradients.

    Parameters
    ----------
    estimator : sklearn BaseEstimator instance.
    """
    tree_estimators = [
        ExtraTreesRegressor,
        RandomForestRegressor,
        GradientBoostingQuantileRegressor,
    ]
    if MF_INSTALLED:
        tree_estimators.append(MondrianForestRegressor)
    tree_estimators = tuple(tree_estimators)

    # cook_estimator() returns None for "dummy minimize" aka random values only
    if estimator is None:
        return False

    if isinstance(estimator, tree_estimators):
        return False

    categorical_gp = False
    if hasattr(estimator, "kernel"):
        params = estimator.get_params()
        categorical_gp = isinstance(estimator.kernel, HammingKernel) or any(
            [isinstance(params[p], HammingKernel) for p in params]
        )

    return not categorical_gp

def cook_estimator(base_estimator, space=None, **kwargs):
    """Cook a default estimator.

    For the special base_estimator called "DUMMY" the return value is None.
    This corresponds to sampling points at random, hence there is no need
    for an estimator.

    Parameters
    ----------
    base_estimator : "GP", "RF", "ET", "GBRT", "DUMMY" or sklearn regressor
        Should inherit from `sklearn.base.RegressorMixin`. In addition, the
        `predict` method should have an optional `return_std` argument, which
        returns `std(Y | x)` along with `E[Y | x]`. If base_estimator is one
        of ["GP", "RF", "ET", "GBRT", "DUMMY"], a surrogate model
        corresponding to the relevant `X_minimize` function is created.

    space : Space instance
        Has to be provided if the base_estimator is a gaussian process.
        Ignored otherwise.

    kwargs : dict
        Extra parameters provided to the base_estimator at init time.
    """
    if isinstance(base_estimator, str):
        base_estimator = base_estimator.upper()
        if base_estimator not in ["GP", "ET", "RF", "GBRT", "DUMMY"]:
            raise ValueError(
                "Valid strings for the base_estimator parameter are: "
                "'RF', 'ET', 'GP', 'GBRT' or 'DUMMY', not %s." % base_estimator
            )
    elif not is_regressor(base_estimator):
        raise ValueError("base_estimator has to be a regressor.")

    if base_estimator == "GP":
        if space is not None:
            space = Space(space)
            space = Space(normalize_dimensions(space.dimensions))
            n_dims = space.transformed_n_dims
            is_cat = space.is_categorical
        else:
            raise ValueError("Expected a Space instance, not None.")

        cov_amplitude = ConstantKernel(1.0, (0.01, 1000.0))
        # Only special if *all* dimensions are categorical.
        if is_cat:
            other_kernel = HammingKernel(length_scale=np.ones(n_dims))
        else:
            other_kernel = Matern(
                length_scale=np.ones(n_dims),
                length_scale_bounds=[(0.01, 100)] * n_dims,
                nu=2.5,
            )

        base_estimator = GaussianProcessRegressor(
            kernel=cov_amplitude * other_kernel,
            normalize_y=True,
            noise="gaussian",
            n_restarts_optimizer=2,
        )
    elif base_estimator == "RF":
        base_estimator = RandomForestRegressor(n_estimators=100, min_samples_leaf=3)
    elif base_estimator == "ET":
        base_estimator = ExtraTreesRegressor(n_estimators=100, min_samples_leaf=3)
    elif base_estimator == "GBRT":
        gbrt = GradientBoostingRegressor(n_estimators=30, loss="quantile")
        base_estimator = GradientBoostingQuantileRegressor(base_estimator=gbrt)
    elif base_estimator == "DUMMY":
        return None

    if ("n_jobs" in kwargs.keys()) and not hasattr(base_estimator, "n_jobs"):
        del kwargs["n_jobs"]
    base_estimator.set_params(**kwargs)

    return base_estimator

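# Illustrative sketch: cooking estimators by name. "GP" requires the search
# space so the kernel can be sized; extra kwargs are forwarded via
# `set_params`. The parameter values below are hypothetical.
#
#     from deephyper.skopt.space import Real
#
#     rf = cook_estimator("RF", n_estimators=200)
#     gp = cook_estimator("GP", space=[Real(0.0, 1.0)])
#     dummy = cook_estimator("DUMMY")  # returns None: purely random sampling
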
def cook_initial_point_generator(generator, **kwargs):
    """Cook a default initial point generator.

    For the special generator called "random" the return value is None.

    Parameters
    ----------
    generator : "lhs", "sobol", "halton", "hammersly", "grid", "random" \
            or InitialPointGenerator instance
        Should inherit from `deephyper.skopt.sampler.InitialPointGenerator`.

    kwargs : dict
        Extra parameters provided to the generator at init time.
    """
    if generator is None:
        generator = "random"
    elif isinstance(generator, str):
        generator = generator.lower()
        if generator not in ["sobol", "halton", "hammersly", "lhs", "random", "grid"]:
            raise ValueError(
                "Valid strings for the generator parameter are: 'sobol', "
                "'lhs', 'halton', 'hammersly', 'random', or 'grid', not %s."
                % generator
            )
    elif not isinstance(generator, InitialPointGenerator):
        raise ValueError(
            "generator has to be an InitialPointGenerator. Got %s"
            % (str(type(generator)))
        )

    if isinstance(generator, str):
        if generator == "sobol":
            generator = Sobol()
        elif generator == "halton":
            generator = Halton()
        elif generator == "hammersly":
            generator = Hammersly()
        elif generator == "lhs":
            generator = Lhs()
        elif generator == "grid":
            generator = Grid(border="include")
    elif generator == "random":
        return None
    generator.set_params(**kwargs)

    return generator

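# Illustrative sketch: cooking an initial point generator by name; kwargs are
# forwarded to the sampler via `set_params`. The `criterion` value below is
# an assumption about the Lhs sampler's parameters.
#
#     lhs = cook_initial_point_generator("lhs", criterion="maximin")
#     rand = cook_initial_point_generator("random")  # returns None
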
def dimensions_aslist(search_space):
    """Convert a dict representation of a search space into a list of
    dimensions, ordered by `sorted(search_space.keys())`.

    Parameters
    ----------
    search_space : dict
        Represents the search space. The keys are dimension names (strings)
        and the values are instances of classes that inherit from the class
        :class:`deephyper.skopt.space.Dimension` (Real, Integer or
        Categorical).

    Returns
    -------
    params_space_list : list
        List of deephyper.skopt.space.Dimension instances.

    Examples
    --------
    >>> from deephyper.skopt.space.space import Real, Integer
    >>> from deephyper.skopt.utils import dimensions_aslist
    >>> search_space = {'name1': Real(0,1),
    ...                 'name2': Integer(2,4), 'name3': Real(-1,1)}
    >>> dimensions_aslist(search_space)[0]
    Real(low=0, high=1, prior='uniform', transform='identity')
    >>> dimensions_aslist(search_space)[1]
    Integer(low=2, high=4, prior='uniform', transform='identity')
    >>> dimensions_aslist(search_space)[2]
    Real(low=-1, high=1, prior='uniform', transform='identity')
    """
    params_space_list = [search_space[k] for k in sorted(search_space.keys())]
    return params_space_list

def point_asdict(search_space, point_as_list):
    """Convert the list representation of a point from a search space to the
    dictionary representation, where keys are dimension names and values are
    the corresponding parameter values.

    .. seealso:: :class:`deephyper.skopt.utils.point_aslist`

    Parameters
    ----------
    search_space : dict
        Represents the search space. The keys are dimension names (strings)
        and the values are instances of classes that inherit from the class
        :class:`deephyper.skopt.space.Dimension` (Real, Integer or
        Categorical).

    point_as_list : list
        List with parameter values. The order of parameters in the list is
        given by `sorted(search_space.keys())`.

    Returns
    -------
    params_dict : OrderedDict
        Dictionary with parameter names as keys, to which the corresponding
        parameter values are assigned.

    Examples
    --------
    >>> from deephyper.skopt.space.space import Real, Integer
    >>> from deephyper.skopt.utils import point_asdict
    >>> search_space = {'name1': Real(0,1),
    ...                 'name2': Integer(2,4), 'name3': Real(-1,1)}
    >>> point_as_list = [0.66, 3, -0.15]
    >>> point_asdict(search_space, point_as_list)
    OrderedDict([('name1', 0.66), ('name2', 3), ('name3', -0.15)])
    """
    params_dict = OrderedDict()
    for k, v in zip(sorted(search_space.keys()), point_as_list):
        params_dict[k] = v
    return params_dict

def point_aslist(search_space, point_as_dict):
    """Convert a dictionary representation of a point from a search space to
    the list representation. The list of values is created from the values of
    the dictionary, sorted by the names of the dimensions used as keys.

    .. seealso:: :class:`deephyper.skopt.utils.point_asdict`

    Parameters
    ----------
    search_space : dict
        Represents the search space. The keys are dimension names (strings)
        and the values are instances of classes that inherit from the class
        :class:`deephyper.skopt.space.Dimension` (Real, Integer or
        Categorical).

    point_as_dict : dict
        Dict with parameter names as keys, to which the corresponding
        parameter values are assigned.

    Returns
    -------
    point_as_list : list
        List with point values. The order of parameters in the list is given
        by `sorted(search_space.keys())`.

    Examples
    --------
    >>> from deephyper.skopt.space.space import Real, Integer
    >>> from deephyper.skopt.utils import point_aslist
    >>> search_space = {'name1': Real(0,1),
    ...                 'name2': Integer(2,4), 'name3': Real(-1,1)}
    >>> point_as_dict = {'name1': 0.66, 'name2': 3, 'name3': -0.15}
    >>> point_aslist(search_space, point_as_dict)
    [0.66, 3, -0.15]
    """
    point_as_list = [point_as_dict[k] for k in sorted(search_space.keys())]
    return point_as_list

def normalize_dimensions(dimensions):
    """Set the "normalize" transformer on a list of dimensions so that each
    dimension is mapped to the unit range.

    This is particularly useful for Gaussian process based regressors and is
    used internally by ``gp_minimize``.

    Parameters
    ----------
    dimensions : list, shape (n_dims,)
        List of `Dimension` instances (`Real`, `Integer` or `Categorical`).
        Any other type raises a `RuntimeError`.

    Returns
    -------
    transformed_dimensions : list
        The same `Dimension` instances, with their transformer set to
        "normalize".

    NOTE: The upper and lower bounds are inclusive for `Integer` dimensions.
    """
    transformed_dimensions = []
    for dimension in dimensions:
        # Check that the dimension is a Dimension instance.
        if isinstance(dimension, Dimension):
            # Change the transformer to "normalize" and add the dimension to
            # the new list of transformed dimensions.
            dimension.set_transformer("normalize")
            transformed_dimensions.append(dimension)
        else:
            raise RuntimeError("Unknown dimension type (%s)" % type(dimension))

    return transformed_dimensions

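# Illustrative sketch: normalization mutates the transformer of each
# dimension in place and returns the same list.
#
#     from deephyper.skopt.space import Real
#
#     dims = normalize_dimensions([Real(0.0, 10.0), Real(-1.0, 1.0)])
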
def check_list_types(x, types):
    """Check whether all elements of a list `x` are of the correct type(s)
    and raise a ValueError if they are not.

    Note that `types` can be either a single object-type or a tuple
    of object-types.

    Raises `ValueError` if one or more elements of the list `x` are not of
    the correct type(s).

    Parameters
    ----------
    x : list
        List of objects.

    types : object or tuple(object)
        Either a single object-type or a tuple of object-types.
    """
    # List of the elements in the list that are incorrectly typed.
    err = list(filter(lambda a: not isinstance(a, types), x))

    # If the list is non-empty then raise an exception.
    if len(err) > 0:
        msg = "All elements in list must be instances of {}, but found: {}"
        msg = msg.format(types, err)
        raise ValueError(msg)

def check_dimension_names(dimensions):
    """Check whether all dimensions have names.

    Raises `ValueError` if one or more dimensions are unnamed.

    Parameters
    ----------
    dimensions : list(Dimension)
        List of Dimension-objects.
    """
    # List of the dimensions that have no names.
    err_dims = list(filter(lambda dim: dim.name is None, dimensions))

    # If the list is non-empty then raise an exception.
    if len(err_dims) > 0:
        msg = "All dimensions must have names, but found: {}"
        msg = msg.format(err_dims)
        raise ValueError(msg)

def use_named_args(dimensions):
    """Wrapper / decorator for an objective function that uses named
    arguments to make it compatible with optimizers that use a single list
    of parameters.

    Your objective function can be defined as being callable using named
    arguments: `func(foo=123, bar=3.0, baz='hello')` for a search-space
    with dimensions named `['foo', 'bar', 'baz']`. But the optimizer will
    only pass a single list `x` of unnamed arguments when calling the
    objective function: `func(x=[123, 3.0, 'hello'])`. This wrapper converts
    your objective function with named arguments into one that accepts a
    list as argument, while doing the conversion automatically.

    The advantage of this is that you don't have to unpack the list of
    arguments `x` yourself, which makes the code easier to read and also
    reduces the risk of bugs if you change the number of dimensions or their
    order in the search-space.

    Examples
    --------
    >>> # Define the search-space dimensions. They must all have names!
    >>> from deephyper.skopt.space import Real
    >>> from deephyper.skopt import forest_minimize
    >>> from deephyper.skopt.utils import use_named_args
    >>> dim1 = Real(name='foo', low=0.0, high=1.0)
    >>> dim2 = Real(name='bar', low=0.0, high=1.0)
    >>> dim3 = Real(name='baz', low=0.0, high=1.0)
    >>>
    >>> # Gather the search-space dimensions in a list.
    >>> dimensions = [dim1, dim2, dim3]
    >>>
    >>> # Define the objective function with named arguments
    >>> # and use this function-decorator to specify the
    >>> # search-space dimensions.
    >>> @use_named_args(dimensions=dimensions)
    ... def my_objective_function(foo, bar, baz):
    ...     return foo ** 2 + bar ** 4 + baz ** 8
    >>>
    >>> # Now the function is callable from the outside as
    >>> # `my_objective_function(x)` where `x` is a list of unnamed arguments,
    >>> # which then wraps your objective function that is callable as
    >>> # `my_objective_function(foo, bar, baz)`.
    >>> # The conversion from a list `x` to named parameters `foo`,
    >>> # `bar`, `baz` is done automatically.
    >>>
    >>> # Run the optimizer on the wrapped objective function which is called
    >>> # as `my_objective_function(x)` as expected by `forest_minimize()`.
    >>> result = forest_minimize(func=my_objective_function,
    ...                          dimensions=dimensions,
    ...                          n_calls=20, base_estimator="ET",
    ...                          random_state=4)
    >>>
    >>> # Print the best-found results in same format as the expected result.
    >>> print("Best fitness: " + str(result.fun))
    Best fitness: 0.1948080835239698
    >>> print("Best parameters: {}".format(result.x))
    Best parameters: [0.44134853091052617, 0.06570954323368307, 0.17586123323419825]

    Parameters
    ----------
    dimensions : list(Dimension)
        List of `Dimension`-objects for the search-space dimensions.

    Returns
    -------
    wrapped_func : callable
        Wrapped objective function.
    """

    def decorator(func):
        """This uses more advanced Python features to wrap `func` using a
        function-decorator, which are not explained so well in the official
        Python documentation.

        A good video tutorial explaining how this works is found here:
        https://www.youtube.com/watch?v=KlBPCzcQNU8

        Parameters
        ----------
        func : callable
            Function to minimize. Should take *named arguments* and return
            the objective value.
        """
        # Ensure all dimensions are correctly typed.
        check_list_types(dimensions, Dimension)

        # Ensure all dimensions have names.
        check_dimension_names(dimensions)

        @wraps(func)
        def wrapper(x):
            """This is the code that will be executed every time the
            wrapped / decorated `func` is being called.

            It takes `x` as a single list of parameters, converts them to
            named arguments, and calls `func` with them.

            Parameters
            ----------
            x : list
                A single list of parameters e.g. `[123, 3.0, 'linear']`
                which will be converted to named arguments and passed to
                `func`.

            Returns
            -------
            objective_value
                The objective value returned by `func`.
            """
            # Ensure the number of dimensions matches
            # the number of parameters in the list x.
            if len(x) != len(dimensions):
                msg = (
                    "Mismatch in number of search-space dimensions. "
                    "len(dimensions)=={} and len(x)=={}"
                )
                msg = msg.format(len(dimensions), len(x))
                raise ValueError(msg)

            # Create a dict where the keys are the names of the dimensions
            # and the values are taken from the list of parameters x.
            arg_dict = {dim.name: value for dim, value in zip(dimensions, x)}

            # Call the wrapped objective function with the named arguments.
            objective_value = func(**arg_dict)

            return objective_value

        return wrapper

    return decorator

def cook_objective_scaler(scaler, base_estimator):
    """Prepare a Scikit-Learn preprocessing pipeline that maps the output
    objective to a different space.

    Parameters
    ----------
    scaler : str or sklearn pipeline
        One of "auto", "identity", "minmax", "minmaxlog", "log",
        "quantile-uniform", or an already-built pipeline, which is then
        returned as-is.

    base_estimator : sklearn regressor
        Used when `scaler` is "auto" to select a default scaler.
    """
    scalers = {}

    # identity
    pipeline = FunctionTransformer(func=lambda x: x, inverse_func=lambda x: x)
    scalers["identity"] = pipeline

    # minmax
    pipeline = MinMaxScaler()
    scalers["minmax"] = pipeline

    # minmaxlog
    scaler_log = FunctionTransformer(
        func=lambda x: np.log(x + 1e-12),
        inverse_func=lambda x: np.exp(x) - 1e-12,
        check_inverse=True,
    )
    pipeline = make_pipeline(MinMaxScaler(), scaler_log)
    scalers["minmaxlog"] = pipeline

    # log
    pipeline = FunctionTransformer(
        func=lambda x: np.log(x - x.min(axis=0) + 1),
        inverse_func=None,
        check_inverse=False,
    )
    scalers["log"] = pipeline

    # quantile-uniform
    rounding = FunctionTransformer(
        func=lambda x: np.around(x, decimals=100),
        inverse_func=lambda x: x,
        check_inverse=False,
    )
    pipeline = make_pipeline(
        rounding, QuantileTransformer(output_distribution="uniform")
    )
    scalers["quantile-uniform"] = pipeline

    if scaler == "auto":
        if isinstance(base_estimator, RandomForestRegressor):
            scaler = "quantile-uniform"
        else:
            scaler = "identity"

    if type(scaler) is str:
        if scaler in scalers:
            return scalers[scaler]
        else:
            raise ValueError(
                f"Objective scaler should be a sklearn.pipeline.Pipeline or a "
                f"value in {list(scalers.keys())}"
            )
    else:
        return scaler

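# Illustrative sketch: scale raw objective values before fitting a surrogate.
# With "auto", random-forest surrogates get the "quantile-uniform" scaler and
# everything else the identity. The values below are hypothetical.
#
#     y = np.asarray([1.0, 10.0, 100.0]).reshape(-1, 1)
#     scaler = cook_objective_scaler("minmaxlog", base_estimator=None)
#     y_scaled = scaler.fit_transform(y)
#     y_back = scaler.inverse_transform(y_scaled)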