"""Bayesian Optimisation using Gaussian Process regression."""

from multiprocessing import cpu_count
from typing import Any, Dict, List, Sequence, Tuple, TypeVar, Union

import GPy
import GPyOpt
import numpy as np
from GPyOpt.core import errors as gpyopt_err

from hypertunity import utils
from hypertunity.domain import Domain, Sample
from hypertunity.optimisation.base import (
    EvaluationScore,
    ExhaustedSearchSpaceError,
    Optimiser
)

__all__ = [
    "BayesianOptimisation",
    "BayesianOptimization"
]

GPyOptSample = TypeVar("GPyOptSample", List[List], np.ndarray)
GPyOptDomain = List[Dict[str, Any]]
GPyOptCategoricalValueMapper = Dict[str, Dict[Any, int]]
GPyOptDiscreteTypeMapper = Dict[str, Dict[Any, type]]
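
# Illustration (not part of the public API): a `GPyOptDomain` is a list of
# dimension specifications such as
#     {"name": "x", "type": "continuous", "domain": (0.0, 1.0)}
# and a `GPyOptSample` is the corresponding flat vector of (encoded) values,
# e.g. np.array([0.42, 1]).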


class BayesianOptimisation(Optimiser):
    """Bayesian Optimiser using `GPyOpt` as a backend."""

    CONTINUOUS_TYPE = "continuous"
    DISCRETE_TYPE = "discrete"
    CATEGORICAL_TYPE = "categorical"

    def __init__(self, domain, seed=None):
        """Initialise the optimiser's domain.

        Args:
            domain: :class:`Domain`. The domain of the objective function.
            seed: (optional) :obj:`int`. The seed of the optimiser. Used for
                reproducibility purposes.
        """
        np.random.seed(seed)
        domain = Domain(domain.as_dict(), seed=seed)
        super(BayesianOptimisation, self).__init__(domain)
        (
            self.gpyopt_domain,
            self._categorical_value_mapper,
            self._discrete_type_mapper
        ) = self._convert_to_gpyopt_domain(self.domain)
        self._inv_categorical_value_mapper = {
            name: {v: k for k, v in mapping.items()}
            for name, mapping in self._categorical_value_mapper.items()
        }
        self._data_x = np.array([[]])
        self._data_fx = np.array([[]])
        self.__is_empty_data = True

    @staticmethod
    def _convert_to_gpyopt_domain(
            orig_domain: Domain
    ) -> Tuple[GPyOptDomain, GPyOptCategoricalValueMapper, GPyOptDiscreteTypeMapper]:
        """Convert a :class:`Domain` type object to :obj:`GPyOptDomain`.

        Args:
            orig_domain: :class:`Domain` to convert.

        Returns:
            A tuple of the converted :obj:`GPyOptDomain` object, a value
            mapper which assigns each categorical value an integer
            (0, 1, 2, ...) and a type mapper which records the original
            Python type of every discrete value. The mappers abstract away
            the types of the categorical and discrete subdomains from the
            `GPyOpt` internals, so that arbitrary types are supported.

        Notes:
            The categorical options must be hashable. This behaviour may
            change in the future.
        """
        gpyopt_domain = []
        value_mapper = {}
        type_mapper = {}
        flat_domain = orig_domain.flatten()
        for names, vals in flat_domain.items():
            dim_name = utils.join_strings(names)
            domain_type = Domain.get_type(vals)
            if domain_type == Domain.Continuous:
                dim_type = BayesianOptimisation.CONTINUOUS_TYPE
            elif domain_type == Domain.Discrete:
                dim_type = BayesianOptimisation.DISCRETE_TYPE
                type_mapper[dim_name] = {v: type(v) for v in vals}
            elif domain_type == Domain.Categorical:
                dim_type = BayesianOptimisation.CATEGORICAL_TYPE
                value_mapper[dim_name] = {v: i for i, v in enumerate(vals)}
                vals = tuple(range(len(vals)))
            else:
                raise ValueError(
                    f"Badly specified subdomain {names} with values {vals}."
                )
            gpyopt_domain.append({
                "name": dim_name,
                "type": dim_type,
                "domain": tuple(vals)
            })
        assert len(gpyopt_domain) == len(orig_domain), \
            "Mismatching dimensionality after domain conversion."
        return gpyopt_domain, value_mapper, type_mapper

    def _convert_to_gpyopt_sample(self, orig_sample: Sample) -> GPyOptSample:
        """Convert a sample of type :class:`Sample` to type :obj:`GPyOptSample`.

        The reverse conversion is performed by the dedicated function
        :meth:`_convert_from_gpyopt_sample`.

        Args:
            orig_sample: :class:`Sample` type object to be converted.

        Returns:
            A :obj:`GPyOptSample` type object with the same values as
            `orig_sample`.
        """
        gpyopt_sample = []
        # iterate in the order of the GPyOpt domain names
        for dim in self.gpyopt_domain:
            keys = utils.split_string(dim["name"])
            val = orig_sample[keys]
            if dim["type"] == BayesianOptimisation.CATEGORICAL_TYPE:
                val = self._categorical_value_mapper[dim["name"]][val]
            gpyopt_sample.append(val)
        return np.asarray(gpyopt_sample)

    def _convert_from_gpyopt_sample(self, gpyopt_sample: GPyOptSample) -> Sample:
        """Convert a :obj:`GPyOptSample` type object to the corresponding
        :class:`Sample` type.

        Args:
            gpyopt_sample: :obj:`GPyOptSample` object to be converted.

        Returns:
            A :class:`Sample` type object with the same values as
            `gpyopt_sample`.
        """
        if len(self.gpyopt_domain) != len(gpyopt_sample):
            raise ValueError(
                f"Cannot convert sample with mismatching dimensionality. "
                f"The original space has {len(self.domain)} dimensions and "
                f"the sample {len(gpyopt_sample)} dimensions."
            )
        orig_sample = {}
        for dim, value in zip(self.gpyopt_domain, gpyopt_sample):
            names = utils.split_string(dim["name"])
            # rebuild the (possibly nested) dictionary structure of the sample
            sub_dim = orig_sample
            for name in names[:-1]:
                if name not in sub_dim:
                    sub_dim[name] = {}
                sub_dim = sub_dim[name]
            if dim["type"] == BayesianOptimisation.CATEGORICAL_TYPE:
                # decode the integer back into the original categorical value
                sub_dim[names[-1]] = \
                    self._inv_categorical_value_mapper[dim["name"]][value]
            elif dim["type"] == BayesianOptimisation.DISCRETE_TYPE:
                # restore the original Python type of the discrete value
                sub_dim[names[-1]] = \
                    self._discrete_type_mapper[dim["name"]][value](value)
            else:
                sub_dim[names[-1]] = value
        return Sample(orig_sample)

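    # For illustration (values are made up), a flattened subdomain
    # {("optimiser", "lr"): (0.1, 1.0), ("optimiser", "name"): ("adam", "sgd")}
    # with a continuous and a categorical dimension would convert to
    #     {"name": <joined "optimiser"/"lr">, "type": "continuous",
    #      "domain": (0.1, 1.0)}
    #     {"name": <joined "optimiser"/"name">, "type": "categorical",
    #      "domain": (0, 1)}
    # together with a value mapper {<joined name>: {"adam": 0, "sgd": 1}};
    # the exact joined names are produced by `utils.join_strings`.
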
    @utils.support_american_spelling
    def run_step(
            self,
            batch_size: int = 1,
            minimise: bool = False,
            **kwargs
    ) -> List[Sample]:
        """Run one step of Bayesian optimisation with a GP regression
        surrogate model.

        The first samples of the domain are chosen at random. Only after the
        optimiser has been updated with at least one (data point, evaluation
        score) pair is the GP surrogate built and the acquisition function
        computed and optimised.

        Args:
            batch_size: (optional) :obj:`int`. The number of samples to
                suggest at once. If larger than one, the batch is selected
                jointly and there is no guarantee for the optimality of the
                individual probes.
            minimise: (optional) :obj:`bool`. Whether the objective should be
                minimised.
            **kwargs: optional keyword arguments which will be passed to the
                backend `GPyOpt.methods.BayesianOptimization` optimiser.

        Keyword Args:
            model: :obj:`str` or :obj:`GPy.Model` object. The surrogate model
                used by the backend optimiser.
            kernel: :obj:`GPy.kern.Kern` object. The kernel used by the model.
            variance: :obj:`float`. The variance of the objective function.

        Returns:
            A list of `batch_size`-many :class:`Sample` instances at which the
            objective should be evaluated next.

        Raises:
            :class:`ExhaustedSearchSpaceError`: if the domain is discrete and
                gets exhausted.
        """
        if self.__is_empty_data:
            next_samples = [self.domain.sample() for _ in range(batch_size)]
        else:
            assert len(self._data_x) > 0 and len(self._data_fx) > 0, \
                "Cannot initialise BO from empty data."
            default_kwargs = {
                "num_cores": min(batch_size, cpu_count() - 1),
                "normalize_Y": True,
                "acquisition_type": "EI",
                "de_duplication": True,
                "model_type": "GP",
                "evaluator_type": ("local_penalization" if batch_size > 1
                                   else "sequential")
            }
            if "model" in kwargs:
                model = kwargs.pop("model")
                # NOTE: Remove this test for model type after the bug in
                # GPyOpt is fixed:
                # https://github.com/SheffieldML/GPyOpt/issues/183
                if (isinstance(model, str)
                        and model.lower() == "gp_mcmc"
                        and batch_size > 1):
                    raise ValueError(
                        "GP_MCMC model cannot be used with a batch size > 1 "
                        "due to a bug in GPyOpt: "
                        "https://github.com/SheffieldML/GPyOpt/issues/183"
                    )
                kernel = kwargs.pop("kernel", None)
                variance = kwargs.pop("variance", None)
                default_kwargs["model"] = self._build_model(
                    model, kernel, variance
                )
                if (variance is not None
                        and all(np.atleast_1d(np.isclose(variance, 0.0)))):
                    default_kwargs["exact_feval"] = True
            default_kwargs = _overwrite_dict(default_kwargs, kwargs)
            # NOTE: as of GPyOpt 1.2.5 adding new data to an existing model is
            # not yet possible, hence the object recreation. This behaviour
            # might be changed in future versions. In this case the code
            # should be refactored such that `bo` is initialised once and
            # `update` takes care of the extension of the (X, Y) samples.
            bo = GPyOpt.methods.BayesianOptimization(
                f=None,
                domain=self.gpyopt_domain,
                maximize=not minimise,
                X=self._data_x,
                # NOTE: the sign flip is a hack which is necessary due to a
                # bug in GPyOpt. The code should be updated once this gets
                # fixed: https://github.com/SheffieldML/GPyOpt/issues/180
                Y=(-1 + 2 * minimise) * self._data_fx,
                initial_design_numdata=len(self._data_x),
                batch_size=batch_size,
                **default_kwargs
            )
            try:
                gpyopt_samples = bo.suggest_next_locations()
            except gpyopt_err.FullyExploredOptimizationDomainError as err:
                raise ExhaustedSearchSpaceError from err
            next_samples = [
                self._convert_from_gpyopt_sample(s) for s in gpyopt_samples
            ]
        return next_samples

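    # Example (illustrative): requesting a batch of two samples from a GP
    # surrogate with a Matern-5/2 kernel and a noiseless objective. Passing
    # `variance=0.0` makes the backend treat evaluations as exact
    # (`exact_feval=True`); `input_dim` must equal the number of converted
    # dimensions.
    #
    #     samples = optimiser.run_step(
    #         batch_size=2,
    #         minimise=True,
    #         model="GP",
    #         kernel=GPy.kern.Matern52(
    #             input_dim=len(optimiser.gpyopt_domain)
    #         ),
    #         variance=0.0,
    #     )
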
    def _build_model(self,
                     model: Union[str, GPy.Model] = "GP",
                     kernel: GPy.kern.Kern = None,
                     variance: float = None):
        """Build the surrogate model for the GPyOpt BayesianOptimization.

        The default model is 'gp'. In case of a large number of already
        evaluated samples, a sparse GP is used to speed up computation.

        Args:
            model: :obj:`str` or :obj:`GPy.Model`, the GP regression model.
            kernel: :obj:`GPy.kern.Kern`, the kernel of the GP regression
                model.
            variance: :obj:`float`, the variance of the evaluations (used
                only if supported by the model).

        Returns:
            A :obj:`GPyOpt.models.GPModel` or :obj:`GPy.Model` instance.
        """
        if isinstance(model, GPy.Model):
            return model
        if isinstance(model, str):
            model = model.lower()
            if model == "gp":
                # switch to a sparse GP once enough evaluations have been
                # collected, to keep fitting the surrogate cheap
                return GPyOpt.models.GPModel(kernel=kernel,
                                             noise_var=variance,
                                             sparse=len(self._data_x) > 25)
            if model == "gp_mcmc":
                return GPyOpt.models.GPModel_MCMC(
                    kernel=kernel, noise_var=variance
                )
            raise ValueError(
                f"Unknown model {model}. When supplying a custom kernel or "
                f"the variance of the objective function, the model has to "
                f"be one from {{'GP', 'GP_MCMC'}}. Otherwise you should "
                f"supply a custom `GPy.Model` instance."
            )
        raise TypeError("Argument `model` must be of type str or `GPy.Model`.")

    def update(self, x, fx, **kwargs):
        """Update the surrogate model with the domain sample `x` and the
        function evaluation `fx`.

        Args:
            x: :class:`Sample` or a :obj:`list` of samples. One or more
                samples of the domain of the objective function.
            fx: a :obj:`float`, an :class:`EvaluationScore`, a :obj:`dict` or
                a :obj:`list` thereof. The evaluation scores of the objective
                evaluated at `x`. If given as :obj:`dict` then it must be a
                mapping from metric names to :class:`EvaluationScore` or
                :obj:`float` results.
            **kwargs: unused by this optimiser.
        """
        super(BayesianOptimisation, self).update(x, fx)
        # both `converted_x` and `array_fx` must be 2-dim arrays
        if isinstance(x, Sample):
            converted_x, array_fx = self._convert_evaluation_sample(x, fx)
        elif (isinstance(x, Sequence) and isinstance(fx, Sequence)
                and len(x) == len(fx)):
            # convert each (sample, evaluation) pair and stack the results
            # into two arrays
            converted_x, array_fx = map(
                np.concatenate,
                zip(*[self._convert_evaluation_sample(i, j)
                      for i, j in zip(x, fx)])
            )
        else:
            raise ValueError(
                "Update values for `x` and `f(x)` must be either a `Sample` "
                "and an evaluation or lists thereof."
            )
        if self._data_x.size == 0:
            self._data_x = converted_x
            self._data_fx = array_fx
        else:
            self._data_x = np.concatenate([self._data_x, converted_x])
            self._data_fx = np.concatenate([self._data_fx, array_fx])
        self.__is_empty_data = False

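    # Examples (illustrative) of the accepted update forms, assuming
    # `EvaluationScore` wraps a single float value:
    #
    #     optimiser.update(sample, 0.42)
    #     optimiser.update(sample, EvaluationScore(0.42))
    #     optimiser.update(sample, {"loss": EvaluationScore(0.42)})
    #     optimiser.update([sample_1, sample_2], [0.1, 0.2])
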
    def _convert_evaluation_sample(self, x, fx):
        """Convert a (sample, evaluation) pair into a pair of 2-dim arrays."""
        if isinstance(fx, (float, int)):
            array_fx = np.array([[fx]])
        elif isinstance(fx, EvaluationScore):
            array_fx = np.array([[fx.value]])
        elif isinstance(fx, dict):
            if len(fx) != 1:
                raise NotImplementedError(
                    "Currently only evaluations with a single metric are "
                    "supported."
                )
            score = next(iter(fx.values()))
            # the metric may be given either as an `EvaluationScore` or as a
            # plain float
            if isinstance(score, EvaluationScore):
                score = score.value
            array_fx = np.array([[score]])
        else:
            raise TypeError(
                "Cannot update history for one sample and multiple "
                "evaluations. Use batched update instead and provide a list "
                "of samples and a list of evaluation metrics."
            )
        converted_x = self._convert_to_gpyopt_sample(x).reshape(1, -1)
        return converted_x, array_fx

    def reset(self):
        """Reset the optimiser for a fresh start."""
        super(BayesianOptimisation, self).reset()
        self._data_x = np.array([])
        self._data_fx = np.array([])
        self.__is_empty_data = True


#: Alias of :class:`BayesianOptimisation` with American spelling.
BayesianOptimization = BayesianOptimisation


def _overwrite_dict(old_dict, new_dict):
    """Return a copy of `old_dict` whose entries are overwritten by, and
    extended with, the entries of `new_dict`."""
    return {**old_dict, **new_dict}
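

if __name__ == "__main__":
    # A minimal sketch of the intended optimisation loop. The domain
    # specification (an interval for the continuous variable "x" and a set of
    # options for the categorical "kind") and the key-based indexing of the
    # returned samples are assumptions about the `hypertunity` API, mirrored
    # from the conversion code above; adapt them if the actual API differs.
    domain = Domain({"x": [-5.0, 5.0], "kind": {"pos", "neg"}})
    optimiser = BayesianOptimisation(domain, seed=7)
    for _ in range(5):
        samples = optimiser.run_step(batch_size=1, minimise=True)
        for s in samples:
            # toy objective: a shifted parabola, negated for the "neg" option
            fx = (s["x"] - 1.0) ** 2
            optimiser.update(s, -fx if s["kind"] == "neg" else fx)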