"""Definition of the optimisation domain and a sample."""
import ast
import copy
import os
import pickle
import random
from collections import namedtuple
from typing import Tuple
__all__ = [
"Domain",
"DomainNotIterableError",
"DomainSpecificationError",
"Sample"
]
class _RecursiveDict:
"""Helper base class for the :class:`Domain` and :class:`Sample` classes.
It implements common logic for creation, representation, type conversion
and serialisation.
"""
def __init__(self, dct):
if isinstance(dct, dict):
self._data = dct
elif isinstance(dct, str):
self._data = ast.literal_eval(dct)
else:
raise TypeError(
f"A {self.__class__.__name__} object can be created from a "
f"Python dict or str objects only. "
f"Unknown type {type(dct)} at initialisation."
)
self._ndim = 0
for _, val in _deepiter_dict(self._data):
self._ndim += 1
def __hash__(self):
return hash(str(self))
def __repr__(self):
"""Return the representation of the recursive dict using the
string method.
"""
return str(self)
def __str__(self):
"""Return the string representation of the recursive dict."""
return str(self._data)
def __eq__(self, other):
"""Compare all subdomains for equal bounds and sets. The order of the
subdomains is not important.
"""
return self.as_dict() == other.as_dict()
def __len__(self):
"""Compute the dimensionality of the recursive dict as the length of
the flattened dict.
"""
return self._ndim
def __getitem__(self, item):
"""Return the item (possibly a subdomain) for a given key.
Args:
item: str of tuple of str. If the latter it will access nested
structures with the next str in the tuple.
"""
if isinstance(item, str):
return self._data.__getitem__(item)
elif isinstance(item, tuple) and all(map(lambda x: isinstance(x, str), item)):
sub_dict = self._data
for it in item:
if not isinstance(sub_dict, dict):
raise KeyError(f"Unknown sub-key {it}.")
sub_dict = sub_dict[it]
return sub_dict
def __add__(self, other: '_RecursiveDict'):
"""Merge self with the `other` :class:`_RecursiveDict`.
Args:
other: :class:`_RecursiveDict`. The recursive dictionary that will
be merged into the current one.
Returns:
A new :class:`_RecursiveDict` object consisting of the subdomains
of both domains. If the keys overlap and the subdomains are discrete
or categorical, the values will be unified.
Raises:
:obj:`ValueError`: if identical keys point to different values.
"""
flattened_a = self.flatten()
flattened_b = other.flatten()
# validate that the two _RecursiveDicts are disjoint
if len(flattened_a.keys()) > len(flattened_a.keys() - flattened_b.keys()):
raise ValueError(
f"Ambiguous addition of {self.__class__.__name__} objects."
)
merged = list(flattened_a.items())
merged.extend(list(flattened_b.items()))
return self.__class__.from_list(merged)
def flatten(self):
"""Return the flattened version of the recursive dict, i.e. without
nested dicts.
The keys of the nested subdomains are collected in a tuple to create a
new unique key. For the sake of type consistency, the key of a
non-nested subdomain is converted to a tuple with a single element.
"""
return {keys: val for keys, val in _deepiter_dict(self._data)}
def as_dict(self):
"""Convert the recursive dict object from :class:`_RecursiveDict`
to :obj:`dict` type.
"""
return copy.deepcopy(self._data)
@classmethod
def from_list(cls, lst):
"""Create a :class:`_RecursiveDict` object from a list of tuples.
Args:
lst: :obj:`List[Tuple]`. Each element is a pair of the keys
(tuple of strings) and the value.
Returns:
A :class:`_RecursiveDict` object.
Raises:
:obj:`ValueError`: if the list contains duplicating keys with
different values.
Examples:
```python
>>> lst = [(("a", "b"), {2, 3, 4}), (("c",), [0, 0.1])]
>>> _RecursiveDict.from_list(lst)
{"a": {"b": {2, 3, 4}}, "c": [0, 0.1]}
```
"""
dct = {}
head = dct
for keys, vals in lst:
if not keys:
continue
for k in keys[:-1]:
if k not in dct:
dct[k] = {}
dct = dct[k]
if keys[-1] in dct and dct[keys[-1]] == vals:
raise ValueError(f"Duplicating entries for keys {keys}.")
dct[keys[-1]] = vals
dct = head
return cls(head)
def serialise(self, filepath=None):
"""Serialise the :class:`_RecursiveDict` object to a file or a string
if `filepath` is not supplied.
Args:
filepath: (optional) :obj:`str`. Filepath as to dump the serialised
:class:`_RecursiveDict` object.
Returns:
The bytes representing the serialised :class:`_RecursiveDict` object.
"""
serialised = pickle.dumps(self._data)
if filepath is not None:
with open(filepath, "wb") as fp:
pickle.dump(self._data, fp)
return serialised
@classmethod
def deserialise(cls, series):
"""Deserialise a serialised :class:`_RecursiveDict` object from a byte
stream or file.
Args:
series: :obj:`str`. The serialised :class:`_RecursiveDict` object or
a filepath to it.
Returns:
A :class:`_RecursiveDict` object.
"""
if not isinstance(series, (bytes, bytearray)) and os.path.isfile(series):
with open(series, "rb") as fp:
return cls(pickle.load(fp))
return cls(pickle.loads(series))
def as_namedtuple(self):
"""Convert a :class:`_RecursiveDict` to a namedtuple type.
Returns:
A Python namedtuple object with names the same as the keys of the
:class:`_RecursiveDict` dict. Nested dicts are accessed by
successive attribute getters.
Examples:
```python
>>> rd = _RecursiveDict({"a": {"b": [1, 2]}, "c": {1, 2, 3}, "d": 2.})
>>> nt = rd.as_namedtuple()
>>> nt.a.b
[1, 2]
>>> nt.c == {1, 2, 3} and nt.d == 2.
True
```
"""
def helper(dct):
keys, vals = [], []
for k, v in dct.items():
keys.append(k)
if isinstance(v, dict):
vals.append(helper(v))
else:
vals.append(v)
# The dict.keys() and dict.values() will iterate in the same order
# as long as dct is not modified.
return namedtuple("NT_" + self.__class__.__name__, keys)(*vals)
return helper(self._data)
[docs]class Domain(_RecursiveDict):
"""Defines the optimisation domain of the objective function. It can be a
continuous interval or a discrete set of numeric or non-numeric values.
The latter is also designated as a categorical domain. It is represented as
a Python dict object with the keys naming the variables and the values defining
the set of allowed values. A :class:`Domain` can also be recursively
specified. That is, a key can name a subdomain represented as a Python dict.
For continuous sets use Python list to define an interval in the form
[a, b], a < b. For discrete sets use Python sets, e.g. {1, 2, 5, -0.1}
or {"option_a", "option_b"}.
Examples:
>>> simple_domain = {"x": {0, 1},
>>> "y": [-1, 1],
>>> "z": {-1, 2, 4}}
>>> nested_domain = {"discrete": {"x": {1, 2, 3}, "y": {4, 5, 6}}
>>> "continuous": {"x": [-4, 4], "y": [0, 1]}
>>> "categorical": {"opt1", "opt2"}}
"""
# Domain types
Continuous = 1
Discrete = 2
Categorical = 3
Invalid = 4
[docs] def __init__(self, dct, seed=None):
"""Initialise the :class:`Domain`.
Args:
dct: :obj:`dict`. The mapping of variable names to sets of
allowed values.
seed: (optional) :obj:`int`. Seed for the randomised sampling.
"""
super(Domain, self).__init__(dct)
self._validate()
self._rng = random.Random(seed)
self._is_continuous = False
for _, val in _deepiter_dict(self._data):
if isinstance(val, list):
self._is_continuous = True
[docs] def __iter__(self):
"""Iterate over the domain if it is fully discrete.
The iterations are over the Cartesian product of all 1-dim discrete
subdomains.
Raises:
:class:`DomainNotIterableError`: if the domain has a at least one
continuous subdomain.
"""
if self._is_continuous:
raise DomainNotIterableError(
"The domain has a continuous subdomain and cannot be iterated."
)
def cartesian_walk(dct):
if dct:
key, vals = dct.popitem()
if isinstance(vals, set):
for v in vals:
yield from (
dict(**rem, **{key: v})
for rem in cartesian_walk(copy.deepcopy(dct))
)
elif isinstance(vals, dict):
for sub_v in cartesian_walk(copy.deepcopy(vals)):
yield from (
dict(**rem, **{key: sub_v})
for rem in cartesian_walk(copy.deepcopy(dct))
)
else:
raise TypeError(
f"Unexpected subdomain of type {type(vals)}."
)
else:
yield {}
yield from map(Sample, cartesian_walk(copy.deepcopy(self._data)))
def _validate(self):
"""Check for invalid domain specifications."""
for keys, values in _deepiter_dict(self._data):
if not (all(map(lambda x: isinstance(x, str), keys))
and isinstance(values, (set, list, dict))):
raise DomainSpecificationError(
"Keys must be of type string and values "
"must be either of type set, list or dict."
)
if (isinstance(values, list)
and (len(values) != 2 or values[0] >= values[1])):
raise DomainSpecificationError(
"Interval must be specified by two numbers: [a, b], a < b."
)
[docs] def sample(self):
"""Draw a sample from the domain. All subdomains are sampled uniformly.
Returns:
A :class:`Sample` object.
"""
def sample_dict(dct):
sample = {}
for key, vals in dct.items():
if isinstance(vals, set):
sample[key] = self._rng.choice(list(vals))
elif isinstance(vals, list):
sample[key] = self._rng.uniform(*vals)
else:
sample[key] = sample_dict(vals)
return sample
return Sample(sample_dict(self._data))
@property
def is_continuous(self):
"""Return `True` if at least one subdomain is continuous."""
return self._is_continuous
[docs] @classmethod
def get_type(cls, subdomain):
"""Return the type of the set of values in a subdomain.
Args:
subdomain: one of :obj:`dict`, :obj:`list` or :obj:`set`. The
subdomain to get the type for.
Returns:
One of `Domain.Continuous`, `Domain.Discrete`, `Domain.Categorical`
or `Domain.Invalid`.
"""
def is_numeric(x):
try:
float(x)
except ValueError:
return False
return True
if isinstance(subdomain, list):
return Domain.Continuous
if isinstance(subdomain, set):
if all(map(is_numeric, subdomain)):
return Domain.Discrete
return Domain.Categorical
return Domain.Invalid
[docs] def split_by_type(self) -> Tuple['Domain', 'Domain', 'Domain']:
"""Split the domain into discrete, categorical and continuous
subdomains respectively.
Returns:
A tuple of three :class:`Domain` objects for the discrete
numerical, categorical and continuous subdomains.
"""
discrete, categorical, continuous = [], [], []
for keys, vals in self.flatten().items():
if Domain.get_type(vals) == Domain.Continuous:
continuous.append((keys, vals))
elif Domain.get_type(vals) == Domain.Categorical:
categorical.append((keys, vals))
elif Domain.get_type(vals) == Domain.Discrete:
discrete.append((keys, vals))
else:
raise ValueError("Encountered an invalid subdomain.")
return (
Domain.from_list(discrete),
Domain.from_list(categorical),
Domain.from_list(continuous)
)
class DomainNotIterableError(TypeError):
"""Alias for the :obj:`TypeError` raised during iteration of (partially)
continuous :class:`Domain` object.
"""
pass
class DomainSpecificationError(ValueError):
"""Alias for the :obj:`ValueError` raised during :class:`Domain` object
creation from an invalid set of values.
"""
pass
[docs]class Sample(_RecursiveDict):
"""Defines a sample from the optimisation domain.
It has the same recursive structure a :class:`Domain` object, however each
dimension is represented by one value only. The keys are exactly as the
keys of the respective domain.
Examples:
>>> domain = Domain({"x": {"y": {0, 1, 2}}, "z": [3, 4]})
>>> domain.sample()
{'x': {'y': 0}, 'z': 3.1415926535897932}
"""
[docs] def __init__(self, dct):
"""Initialise the :class:`Sample` object from a dict."""
super(Sample, self).__init__(dct)
[docs] def __iter__(self):
"""Iterate over all values in the sample.
Yields:
A tuple of keys and a single value, where the keys are a tuple
of strings.
"""
yield from self.flatten().items()
def _deepiter_dict(dct):
"""Iterate over all key, value pairs of a (possibly nested) dictionary.
In this case, all keys of the nested dicts are summarised in a tuple.
Args:
dct: dict object to iterate.
Yields:
Tuple of keys (itself a tuple) and the corresponding value.
Examples:
>>> list(_deepiter_dict({"a": {"b": 1, "c": 2}, "d": 3}))
[(('a', 'b'), 1), (('a', 'c'), 2), (('d',), 3)]
"""
def chained_keys_iter(prefix_keys, dct_tmp):
for key, val in dct_tmp.items():
chained_keys = prefix_keys + (key,)
if isinstance(val, dict):
yield from chained_keys_iter(chained_keys, val)
else:
yield chained_keys, val
yield from chained_keys_iter((), dct)