Source code for deephyper.ensemble._bagging_ensemble

import os
import traceback

import tensorflow as tf
import numpy as np
import ray

from deephyper.nas.metrics import selectMetric
from deephyper.ensemble import BaseEnsemble
from deephyper.nas.run._util import set_memory_growth_for_visible_gpus


def mse(y_true, y_pred):
    return tf.square(y_true - y_pred)


@ray.remote(num_cpus=1)
def model_predict(model_path, X, batch_size=32, verbose=0):
    """Perform an inference of the model located at ``model_path``.

    :meta private:

    Args:
        model_path (str): Path to the ``h5`` file to load to perform the inferencec.
        X (array): array of input data for which we perform the inference.
        batch_size (int, optional): Batch size used to perform the inferencec. Defaults to 32.
        verbose (int, optional): Verbose option. Defaults to 0.

    Returns:
        array: The prediction based on the provided input data.
    """

    # GPU Configuration if available
    set_memory_growth_for_visible_gpus(True)
    tf.keras.backend.clear_session()
    model_file = model_path.split("/")[-1]

    try:
        if verbose:
            print(f"Loading model {model_file}", flush=True)
        model = tf.keras.models.load_model(model_path, compile=False)
    except:
        if verbose:
            print(f"Could not load model {model_file}", flush=True)
            traceback.print_exc()
        model = None

    if model:
        y = model.predict(X, batch_size=batch_size)
    else:
        y = None

    return y


class BaggingEnsemble(BaseEnsemble):
    """Ensemble based on uniform averaging of the predictions of each members.

    :meta private:

    Args:
        model_dir (str): Path to directory containing saved Keras models in .h5 format.
        loss (callable): a callable taking (y_true, y_pred) as input.
        size (int, optional): Number of unique models used in the ensemble. Defaults to 5.
        verbose (bool, optional): Verbose mode. Defaults to True.
        ray_address (str, optional): Address of the Ray cluster. If "auto" it will try to connect to an existing cluster. If "" it will start a local Ray cluster. Defaults to "".
        num_cpus (int, optional): Number of CPUs allocated to load one model and predict. Defaults to 1.
        num_gpus (int, optional): Number of GPUs allocated to load one model and predict. Defaults to None.
        batch_size (int, optional): Batch size used batchify the inference of loaded models. Defaults to 32.
        selection (str, optional): Selection strategy to build the ensemble. Value in ``["topk"]``. Default to ``topk``.
        mode (str, optional): Value in ``["regression", "classification"]``. Default to ``"regression"``.
    """

    def __init__(
        self,
        model_dir,
        loss=mse,
        size=5,
        verbose=True,
        ray_address="",
        num_cpus=1,
        num_gpus=None,
        batch_size=32,
        selection="topk",
        mode="regression",
    ):
        super().__init__(
            model_dir,
            loss,
            size,
            verbose,
            ray_address,
            num_cpus,
            num_gpus,
            batch_size,
        )
        assert selection in ["topk"]
        self.selection = selection
        assert mode in ["regression", "classification"]
        self.mode = mode

    def __repr__(self) -> str:
        out = super().__repr__()
        out += f"Mode: {self.mode}\n"
        out += f"Selection: {self.selection}\n"
        return out

    def fit(self, X, y):
        """Fit the current algorithm to the provided data.

        Args:
            X (array): The input data.
            y (array): The output data.

        Returns:
            BaseEnsemble: The current fitted instance.
        """
        X_id = ray.put(X)

        model_files = self._list_files_in_model_dir()
        model_path = lambda f: os.path.join(self.model_dir, f)

        y_pred = ray.get(
            [
                model_predict.options(
                    num_cpus=self.num_cpus, num_gpus=self.num_gpus
                ).remote(model_path(f), X_id, self.batch_size, self.verbose)
                for f in model_files
            ]
        )
        y_pred = np.array([arr for arr in y_pred if arr is not None])

        members_indexes = topk(self.loss, y_true=y, y_pred=y_pred, k=self.size)
        self.members_files = [model_files[i] for i in members_indexes]

        return self

    def predict(self, X) -> np.ndarray:
        """Execute an inference of the ensemble for the provided data.

        Args:
            X (array): An array of input data.

        Returns:
            array: The prediction.
        """
        # make predictions
        X_id = ray.put(X)
        model_path = lambda f: os.path.join(self.model_dir, f)

        y_pred = ray.get(
            [
                model_predict.options(
                    num_cpus=self.num_cpus, num_gpus=self.num_gpus
                ).remote(model_path(f), X_id, self.batch_size, self.verbose)
                for f in self.members_files
            ]
        )
        y_pred = np.array([arr for arr in y_pred if arr is not None])

        y = aggregate_predictions(y_pred, regression=(self.mode == "regression"))

        return y

    def evaluate(self, X, y, metrics=None):
        """Compute metrics based on the provided data.

        Args:
            X (array): An array of input data.
            y (array): An array of true output data.
            metrics (callable, optional): A metric. Defaults to None.
        """
        scores = {}

        y_pred = self.predict(X)

        scores["loss"] = tf.reduce_mean(self.loss(y, y_pred)).numpy()
        if metrics:
            for metric_name in metrics:
                scores[metric_name] = apply_metric(metric_name, y, y_pred)

        return scores


[docs]class BaggingEnsembleRegressor(BaggingEnsemble): """Ensemble for regression based on uniform averaging of the predictions of each members. Args: model_dir (str): Path to directory containing saved Keras models in .h5 format. loss (callable): a callable taking (y_true, y_pred) as input. size (int, optional): Number of unique models used in the ensemble. Defaults to 5. verbose (bool, optional): Verbose mode. Defaults to True. ray_address (str, optional): Address of the Ray cluster. If "auto" it will try to connect to an existing cluster. If "" it will start a local Ray cluster. Defaults to "". num_cpus (int, optional): Number of CPUs allocated to load one model and predict. Defaults to 1. num_gpus (int, optional): Number of GPUs allocated to load one model and predict. Defaults to None. batch_size (int, optional): Batch size used batchify the inference of loaded models. Defaults to 32. selection (str, optional): Selection strategy to build the ensemble. Value in ``["topk"]``. Default to ``topk``. """ def __init__( self, model_dir, loss=mse, size=5, verbose=True, ray_address="", num_cpus=1, num_gpus=None, selection="topk", ): super().__init__( model_dir, loss, size, verbose, ray_address, num_cpus, num_gpus, selection, mode="regression", )
[docs]class BaggingEnsembleClassifier(BaggingEnsemble): """Ensemble for classification based on uniform averaging of the predictions of each members. Args: model_dir (str): Path to directory containing saved Keras models in .h5 format. loss (callable): a callable taking (y_true, y_pred) as input. size (int, optional): Number of unique models used in the ensemble. Defaults to 5. verbose (bool, optional): Verbose mode. Defaults to True. ray_address (str, optional): Address of the Ray cluster. If "auto" it will try to connect to an existing cluster. If "" it will start a local Ray cluster. Defaults to "". num_cpus (int, optional): Number of CPUs allocated to load one model and predict. Defaults to 1. num_gpus (int, optional): Number of GPUs allocated to load one model and predict. Defaults to None. batch_size (int, optional): Batch size used batchify the inference of loaded models. Defaults to 32. selection (str, optional): Selection strategy to build the ensemble. Value in ``["topk"]``. Default to ``topk``. """ def __init__( self, model_dir, loss=mse, size=5, verbose=True, ray_address="", num_cpus=1, num_gpus=None, selection="topk", ): super().__init__( model_dir, loss, size, verbose, ray_address, num_cpus, num_gpus, selection, mode="classification", )
def apply_metric(metric_name, y_true, y_pred) -> float: """Perform the computation of provided metric. :meta private: Args: metric_name (str|callable): If ``str`` then it needs to be a metric available in ``deephyper.nas.metrics``. y_true (array): Array of true predictions. y_pred (array): Array of predicted predictions Returns: float: a scalar value of the computed metric. """ metric_func = selectMetric(metric_name) metric = tf.reduce_mean( metric_func( tf.convert_to_tensor(y_true, dtype=np.float32), tf.convert_to_tensor(y_pred, dtype=np.float32), ) ).numpy() return metric def aggregate_predictions(y_pred, regression=True): """Build an ensemble from predictions. :meta private: Args: ensemble_members (np.array): Indexes of selected members in the axis-0 of y_pred. y_pred (np.array): Predictions array of shape (n_models, n_samples, n_outputs). regression (bool): Boolean (True) if it is a regression (False) if it is a classification. Return: A TFP Normal Distribution in the case of regression and a np.array with average probabilities in the case of classification. """ n = np.shape(y_pred)[0] y_pred = np.sum(y_pred, axis=0) if regression: agg_y_pred = y_pred / n else: # classification agg_y_pred = np.argmax(y_pred, axis=1) return agg_y_pred def topk(loss_func, y_true, y_pred, k=2) -> list: """Select the Top-k models to be part of the ensemble. A model can appear only once in the ensemble for this strategy. :meta private: Args: loss_func (callable): loss function. y_true (array): Array of true predictions. y_pred (array): Array of predicted predictions k (int, optional): Number of models composing the ensemble. Defaults to 2. Returns: list: a list of model indexes composing the ensembles. """ # losses is of shape: (n_models, n_outputs) losses = tf.reduce_mean(loss_func(y_true, y_pred), axis=1).numpy() ensemble_members = np.argsort(losses, axis=0)[:k].reshape(-1).tolist() return ensemble_members