# Source code for deephyper.hpo._search

import abc
import copy
import csv
import json
import logging
import os
import pathlib
import time
from typing import Any, Dict, List, Literal, Optional
from inspect import iscoroutinefunction

import numpy as np
import pandas as pd

from deephyper.analysis.hpo import (
    get_mask_of_rows_without_failures,
    read_results_from_csv,
)
from deephyper.evaluator import Evaluator, HPOJob, MaximumJobsSpawnReached, JobStatus
from deephyper.evaluator.callback import TqdmCallback
from deephyper.hpo._problem import HpProblem
from deephyper.hpo._solution import (
    ArgMaxEstSelection,
    ArgMaxObsSelection,
    Solution,
    SolutionSelection,
)
from deephyper.stopper import Stopper
from deephyper.skopt.moo import non_dominated_set

# Names exported when this module is star-imported.
__all__ = ["Search", "SearchHistory"]

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


def get_init_params_as_json(obj):
    """Get the parameters of an object in a json format.

    The parameters are read from ``obj._init_params``. The ``"self"`` entry and every
    entry whose name contains ``"__"`` are skipped. ``obj._init_params`` itself is never
    modified.

    Each remaining value is converted as follows:

    * if it has a ``to_json`` attribute, the result of ``value.to_json()`` is used;
    * otherwise it is round-tripped through ``json`` (so tuples become lists);
    * if that round-trip fails, the placeholder string ``"NA"`` is used.

    Args:
        obj (any): The object of which we want to know the ``__init__`` arguments.

    Returns:
        params (dict): Parameter names mapped to their values. Empty when ``obj`` has no
        ``_init_params`` attribute (or when it is ``None``).
    """
    init_params = getattr(obj, "_init_params", None)
    if init_params is None:
        return {}

    params = {}
    for name, value in init_params.items():
        # "self" is skipped rather than popped so the caller's mapping stays intact.
        if name == "self" or "__" in name:
            continue

        if hasattr(value, "to_json"):
            params[name] = value.to_json()
            continue

        try:
            params[name] = json.loads(json.dumps(value))
        except Exception:
            # Deliberate best effort: anything that cannot be serialized is reported
            # with a placeholder instead of failing the whole export.
            params[name] = "NA"
    return params


class SearchHistory:
    """Represents the history of a search.

    The history keeps the list of collected jobs, the number of objectives inferred from
    the first job, the solution reported by ``solution_selection`` after each call to
    :meth:`extend`, and the state needed to write results incrementally to a CSV file.

    Args:
        problem (HpProblem): The problem of the search. This class only stores it.
        solution_selection (SolutionSelection, optional): Object updated with every new
            batch of jobs and queried for the current solution. When it is not a
            ``SolutionSelection`` instance no solution is tracked. Defaults to ``None``.
    """

    def __init__(
        self,
        problem: HpProblem,
        solution_selection: Optional[SolutionSelection] = None,
    ):
        self.problem = problem
        self.solution_selection = solution_selection

        # Number of objectives; ``None`` until a first job has been seen.
        self.num_objective = None
        self.jobs: list[HPOJob] = []
        self.pareto_efficient = []
        # Number of jobs already written by ``to_csv_partial``.
        self._csv_cursor = 0
        # Header of the partial CSV file; ``None`` until it has been decided.
        self._csv_columns = None
        # Maps a job id to the solution reported right after that job was added.
        self.solution_history = {}

    def __len__(self):
        """Return the number of jobs in the history."""
        return len(self.jobs)

    def __getitem__(self, idx) -> HPOJob:
        """Return the job(s) stored at ``idx`` in the list of jobs."""
        return self.jobs[idx]

    def set_num_objective(self, job):
        """Infer the number of objectives from ``job.objective``.

        A tuple or list objective counts one objective per element, anything else counts
        as a single objective. The value is forwarded to ``solution_selection`` when it is
        a ``SolutionSelection`` instance.

        Args:
            job (HPOJob): The job whose objective is inspected.
        """
        obj = job.objective
        if isinstance(obj, (tuple, list)):
            self.num_objective = len(obj)
        else:
            self.num_objective = 1
        if isinstance(self.solution_selection, SolutionSelection):
            self.solution_selection.num_objective = self.num_objective

    def extend(self, jobs: List[HPOJob]):
        """Add new jobs to the history and refresh the tracked solution.

        Args:
            jobs (List[HPOJob]): The jobs to add. An empty list is a no-op.
        """
        # Do nothing if input list is empty
        if len(jobs) == 0:
            return

        if self.num_objective is None:
            self.set_num_objective(jobs[0])
        self.jobs.extend(jobs)

        if isinstance(self.solution_selection, SolutionSelection):
            self.solution_selection.update(jobs)
            # Every job of the batch is associated with the solution obtained after
            # the whole batch has been processed.
            for job in jobs:
                self.solution_history[job.id] = self.solution

    @property
    def solution(self) -> Solution | None:
        """The current solution, or ``None`` when no solution selection is configured."""
        if isinstance(self.solution_selection, SolutionSelection):
            return self.solution_selection.solution
        else:
            return None

    def _to_dict(self, jobs: List[HPOJob]) -> List[Dict[str, Any]]:
        """Convert jobs to a list of flat dictionaries, one per job.

        Each dictionary contains:

        * ``p:<name>`` for every entry of ``job.args``;
        * ``objective`` (single objective) or ``objective_<i>`` (several objectives);
        * ``job_id`` and ``job_status``;
        * ``m:<name>`` for every metadata entry whose name does not start with ``_``;
        * ``pareto_efficient`` when the job carries that attribute;
        * ``sol.*`` entries describing the recorded solution, only for single-objective
          histories with a ``SolutionSelection``.

        Args:
            jobs (List[HPOJob]): The jobs to convert.

        Returns:
            List[Dict[str, Any]]: The converted jobs, in the same order.
        """
        tracks_solution = isinstance(self.solution_selection, SolutionSelection)
        results = []

        for job in jobs:
            # Prefix args with "p:"
            result = {f"p:{k}": v for k, v in job.args.items()}

            # Extract and process the objective
            obj = job.objective
            if self.num_objective is None:
                # Reuse the guarded setter so that a missing solution selection does
                # not raise an AttributeError here.
                self.set_num_objective(job)

            if isinstance(obj, (tuple, list)):
                for i, val in enumerate(obj):
                    result[f"objective_{i}"] = val
            elif self.num_objective > 1:
                # A scalar objective in a multi-objective history is replicated in
                # every objective column.
                for i in range(self.num_objective):
                    result[f"objective_{i}"] = obj
            else:
                result["objective"] = obj

            # Add job metadata
            # NOTE(review): job.id is expected to look like "<prefix>.<integer>".
            result["job_id"] = int(job.id.split(".")[1])
            result["job_status"] = job.status.name

            # Add filtered metadata with "m:" prefix
            result.update({f"m:{k}": v for k, v in job.metadata.items() if not k.startswith("_")})

            # Optional Pareto-efficient tag
            if hasattr(job, "pareto_efficient"):
                result["pareto_efficient"] = job.pareto_efficient

            # Solution
            if tracks_solution and self.num_objective == 1:
                sol = dict(self.solution_history[job.id])
                parameters = sol.pop("parameters")
                objective = sol.pop("objective")
                if parameters is not None and objective is not None:
                    result.update({f"sol.p:{k}": v for k, v in parameters.items()})
                    result.update({"sol.objective": objective})
                    result.update({f"sol.{k}": v for k, v in sol.items() if v is not None})

            results.append(result)

        return results

    def to_dataframe(self) -> pd.DataFrame:
        """Return all the jobs of the history as a dataframe with one row per job."""
        df = pd.DataFrame(self._to_dict(self.jobs))
        return df

    def to_csv_complete(self, path: str) -> pd.DataFrame:
        """Write all the jobs of the history to a CSV file.

        Args:
            path (str): Path of the CSV file.

        Returns:
            pd.DataFrame: The dataframe that was written.
        """
        df = self.to_dataframe()
        df.to_csv(path, index=False)
        return df

    @staticmethod
    def _has_successful_objective(result: Dict[str, Any]) -> bool:
        """Tell if a converted job has a first objective that is not a string."""
        return any(
            key in result and not isinstance(result[key], str)
            for key in ("objective", "objective_0")
        )

    def to_csv_partial(self, path: str, flush: bool = False):
        """Write the jobs that were not written yet to a CSV file.

        The first write creates the file with a header, later writes append rows to it.
        The header is taken from the first pending job that did not fail (a string
        objective marks a failure). Until such a job is received nothing is written,
        unless ``flush`` is ``True``. Keys that are not part of the header are ignored
        in later rows.

        Args:
            path (str):

                Path of the CSV file.

            flush (bool, optional):

                A boolean that indicates if the results should be written even though no
                successful job was received yet. In that case the header is taken from
                the first pending job. Otherwise if ``False`` it will not write to disk
                until there is a successful job. Defaults to ``False``.
        """
        pending = self._to_dict(self.jobs[self._csv_cursor :])
        if not pending:
            return

        started_dumping = self._csv_cursor > 0

        if not started_dumping:
            # Waiting to start receiving non-failed jobs before dumping results
            for result in pending:
                if flush or self._has_successful_objective(result):
                    self._csv_columns = list(result)
                    break

        if self._csv_columns is None:
            return

        # newline="" lets the csv module control line endings (see the csv docs).
        with open(path, "a" if started_dumping else "w", newline="") as fp:
            writer = csv.DictWriter(fp, self._csv_columns, extrasaction="ignore")
            if not started_dumping:
                writer.writeheader()
            writer.writerows(pending)
        self._csv_cursor += len(pending)

    def compute_pareto_efficiency(self):
        """Compute the Pareto-Front from the current history.

        When the history has more than one objective column, ``self.pareto_efficient``
        is replaced by a boolean array with one entry per job and each job receives a
        ``pareto_efficient`` attribute. Jobs excluded by the no-failure mask are marked
        ``False``. With a single objective column nothing is changed.
        """
        logger.info("Computing pareto efficient indicator...")
        df = self.to_dataframe()

        # Check if Multi-Objective Optimization was performed to save the pareto front
        objective_columns = [col for col in df.columns if col.startswith("objective")]

        if len(objective_columns) > 1:
            _, mask_no_failures = get_mask_of_rows_without_failures(df, objective_columns[0])
            # NOTE(review): objectives are negated before the call, presumably because
            # ``non_dominated_set`` minimizes while objectives are maximized - confirm.
            objectives = -df.loc[mask_no_failures, objective_columns].values.astype(float)
            mask_pareto_front = non_dominated_set(objectives)

            self.pareto_efficient = np.zeros((len(self.jobs),), dtype=bool)
            self.pareto_efficient[mask_no_failures] = mask_pareto_front

            for job, pf in zip(self.jobs, self.pareto_efficient):
                job.pareto_efficient = pf