# Source code for deephyper.ensemble.selector._greedy
from typing import Callable, List, Sequence, Tuple

import numpy as np
from sklearn.utils import check_random_state

from deephyper.ensemble.aggregator._aggregator import Aggregator
from deephyper.ensemble.selector._selector import Selector
# [docs]
class GreedySelector(Selector):
    """Selection method implementing Greedy (a.k.a., Caruana) selection.

    This method iteratively and greedily selects the predictors that minimize
    the loss when aggregated together.

    Args:
        loss_func (Callable or Loss): a loss function that takes two arguments: the true target
            values and the predicted target values.
        aggregator (Aggregator): The aggregator to use to combine the predictions of the selected
            predictors.
        k (int, optional): The number of unique predictors to select for the ensemble. Defaults to
            ``5``.
        k_init (int, optional): Regularization parameter for greedy selection. It is the number of
            predictors to select in the initialization step. Must be at least ``1``. Defaults to
            ``1``.
        max_it (int, optional): Maximum number of iterations which also corresponds to the number
            of non-unique predictors added to the ensemble. A negative value disables the limit.
            Defaults to ``-1``.
        eps_tol (float, optional): Tolerance for the stopping criterion: with ``early_stopping``
            the selection stops when the best candidate does not lower the current loss by more
            than ``eps_tol``. Defaults to ``1e-3``.
        with_replacement (bool, optional): Performs greedy selection with replacement of models
            already selected. Defaults to ``True``.
        early_stopping (bool, optional): Stops the ensemble selection as soon as the loss stops
            improving. Defaults to ``True``.
        bagging (bool, optional): Performs bootstrap resampling of available predictors at each
            iteration. This can be particularly useful when the dataset used for selection is
            small. Defaults to ``False``.
        random_state (optional): Value forwarded to ``sklearn.utils.check_random_state``; the
            resulting generator draws the bootstrap samples used when ``bagging`` is enabled.
            Defaults to ``None``.
        verbose (bool, optional):
            Turns on the verbose mode. Defaults to ``False``.

    Raises:
        ValueError: If ``k_init`` is smaller than ``1``.
    """

    def __init__(
        self,
        loss_func: Callable,
        aggregator: Aggregator,
        k: int = 5,
        # Default is 1, as documented. A default equal to ``k`` would satisfy the loop's
        # ``unique < k`` condition right after initialization and skip every greedy step.
        k_init: int = 1,
        max_it: int = -1,
        eps_tol: float = 1e-3,
        with_replacement: bool = True,
        early_stopping: bool = True,
        bagging: bool = False,
        random_state=None,
        verbose: bool = False,
    ):
        if k_init < 1:
            raise ValueError(f"k_init must be greater or equal to 1 but got {k_init}")
        super().__init__(loss_func)
        self.aggregator = aggregator
        self.k = k
        self.k_init = k_init
        self.max_it = max_it
        self.eps_tol = eps_tol
        self.with_replacement = with_replacement
        self.early_stopping = early_stopping
        self.bagging = bagging
        self.random_state = check_random_state(random_state)
        self.verbose = verbose

    def _aggregate(self, y_predictors: np.ndarray, weights: List = None):
        """Combine the given predictions with the configured aggregator."""
        return self.aggregator.aggregate(y_predictors, weights)

    def select(self, y, y_predictors) -> Tuple[List[int], List[float]]:
        """Greedily select the predictors whose aggregation minimizes the loss.

        Args:
            y: The true target values, passed as first argument of the loss evaluation.
            y_predictors: A sequence with one entry of predictions per predictor.

        Returns:
            A tuple ``(indices, weights)``: the sorted unique indices of the selected predictors
            and, for each of them, the fraction of selection steps in which it was picked (the
            weights sum to one).
        """
        n_predictors = len(y_predictors)

        # Initialization: start from the ``k_init`` predictors with the lowest individual loss.
        losses = [self._evaluate(y, y_pred_i) for y_pred_i in y_predictors]
        selected_indices = np.argsort(losses)[: self.k_init].tolist()
        loss_min = self._evaluate(y, self._aggregate([y_predictors[i] for i in selected_indices]))

        if self.verbose:
            tmp = [losses[i] for i in selected_indices]
            print(f"Ensemble initialized with {selected_indices} with loss {tmp}")

        # Greedy steps
        it = 0
        while (self.max_it < 0 or it < self.max_it) and len(set(selected_indices)) < self.k:
            unique_selected = set(selected_indices)

            bagged_predictors = None
            if self.bagging:
                # Bootstrap sample of the candidates considered during this step.
                bagged_predictors = set(
                    self.random_state.randint(
                        low=0, high=n_predictors, size=n_predictors
                    ).tolist()
                )

            # One entry per predictor; NaN marks a predictor that is not a candidate.
            losses = []
            for i in range(n_predictors):
                ignored = (
                    # Re-adding the only member would reproduce the same one-member ensemble.
                    (len(selected_indices) == 1 and i in unique_selected)
                    or (not self.with_replacement and i in unique_selected)
                    or (self.bagging and i not in bagged_predictors)
                )
                if ignored:
                    losses.append(np.nan)
                    continue

                # Repeated picks of a predictor translate into a larger aggregation weight.
                indices_, counts_ = np.unique(selected_indices + [i], return_counts=True)
                weights_ = counts_ / np.sum(counts_)
                score = self._evaluate(
                    y,
                    self._aggregate([y_predictors[j] for j in indices_], weights_),
                )
                losses.append(score)

            it += 1

            # ``np.nanargmin`` raises ValueError on an all-NaN (or empty) input, which happens
            # when no predictor is eligible, e.g. a single predictor, selection without
            # replacement with fewer predictors than ``k``, or a bag without eligible member.
            if np.all(np.isnan(losses)):
                if self.verbose:
                    print(f"Step {it}, ensemble selection stopped")
                break

            i_min_ = int(np.nanargmin(losses))
            loss_min_ = losses[i_min_]

            # The second condition is related to numerical errors
            if (self.early_stopping and loss_min_ >= (loss_min - self.eps_tol)) or (
                len(unique_selected) == 1 and selected_indices[0] == i_min_
            ):
                if self.verbose:
                    print(f"Step {it}, ensemble selection stopped")
                break

            loss_min = loss_min_
            selected_indices.append(i_min_)

            if self.verbose:
                print(
                    f"Step {it}, ensemble is {selected_indices}, new member {i_min_} with"
                    f" loss {loss_min}"
                )

        # Collapse repeated picks into unique indices with normalized weights.
        selected_indices, selected_indices_weights = np.unique(selected_indices, return_counts=True)
        selected_indices_weights = selected_indices_weights / np.sum(selected_indices_weights)

        if self.verbose:
            print(
                f"After {it} steps, the final ensemble is {selected_indices} with "
                f"weights {selected_indices_weights}"
            )

        return selected_indices.tolist(), selected_indices_weights.tolist()