Source code for profit.sur.gp.sklearn_surrogate

import numpy as np
from profit.sur import Surrogate
from profit.sur.gp import GaussianProcess
from profit.defaults import fit_gaussian_process as defaults, fit as base_defaults


@Surrogate.register("Sklearn")
class SklearnGPSurrogate(GaussianProcess):
    """Surrogate for https://github.com/scikit-learn/scikit-learn Gaussian process.

    Attributes:
        model (sklearn.gaussian_process.GaussianProcessRegressor): Model object of Sklearn.
    """

    def __init__(self):
        super().__init__()
        self.model = None
    def train(
        self,
        X,
        y,
        kernel=defaults["kernel"],
        hyperparameters=defaults["hyperparameters"],
        fixed_sigma_n=base_defaults["fixed_sigma_n"],
        **kwargs
    ):
        from sklearn.gaussian_process import GaussianProcessRegressor

        self.pre_train(X, y, kernel, hyperparameters, fixed_sigma_n)
        # Use the fixed data noise as alpha, or only a small jitter if sigma_n is
        # instead optimized as part of a WhiteKernel (see select_kernel).
        numeric_noise = (
            self.hyperparameters["sigma_n"].item() ** 2 if self.fixed_sigma_n else 1e-5
        )

        # Instantiate the model
        self.model = GaussianProcessRegressor(kernel=self.kernel, alpha=numeric_noise)

        # Train the model
        self.model.fit(self.Xtrain, self.ytrain)
        self.kernel = self.model.kernel_
        self.post_train()
    def post_train(self):
        self._set_hyperparameters_from_model()
        self.print_hyperparameters("Optimized")
        super().post_train()
    def add_training_data(self, X, y):
        """Add training points to existing data.

        Parameters:
            X (ndarray): Input points to add.
            y (ndarray): Observed output to add.
        """
        self.Xtrain = np.concatenate([self.Xtrain, X], axis=0)
        self.ytrain = np.concatenate([self.ytrain, y], axis=0)
    def set_ytrain(self, ydata):
        """Set the observed training outputs. This is important for active learning.

        Parameters:
            ydata (np.array): Full training output data.
        """
        self.ytrain = np.atleast_2d(ydata.copy())
    def predict(self, Xpred, add_data_variance=True):
        Xpred = self.pre_predict(Xpred)
        ymean, ystd = self.model.predict(Xpred, return_std=True)
        yvar = ystd.reshape(-1, 1) ** 2
        if add_data_variance:
            yvar = yvar + self.hyperparameters["sigma_n"] ** 2
        ymean, yvar = self.decode_predict_data(ymean, yvar)
        return ymean, yvar
    def save_model(self, path):
        """Save the SklearnGPSurrogate model to a pickle file. Only the Sklearn model is
        saved; all other attributes of the surrogate are restored from it on loading.

        Parameters:
            path (str): Path including the file name, where the model should be saved.
        """
        from pickle import dump

        with open(path, "wb") as file:
            dump(self.model, file)
    @classmethod
    def load_model(cls, path):
        """Load a saved SklearnGPSurrogate model from a pickle file and update its attributes.

        Parameters:
            path (str): Path including the file name, from where the model should be loaded.

        Returns:
            profit.sur.gp.sklearn_surrogate.SklearnGPSurrogate: Instantiated surrogate model.
        """
        from pickle import load

        self = cls()
        with open(path, "rb") as file:
            self.model = load(file)
        self.Xtrain = self.model.X_train_
        self.ytrain = self.model.y_train_
        self.kernel = self.model.kernel_
        self.ndim = self.Xtrain.shape[-1]
        # A fixed data noise was used if alpha is not the default jitter (see train).
        self.fixed_sigma_n = self.model.alpha != 1e-5
        self.trained = True
        self._set_hyperparameters_from_model()
        self.print_hyperparameters("Loaded")
        return self
    def optimize(self, **opt_kwargs):
        """For hyperparameter optimization the Sklearn base optimization is used. Currently,
        the inverse Hessian cannot be retrieved, which limits the effectiveness of active
        learning.

        Parameters:
            opt_kwargs: Keyword arguments used directly in the Sklearn base optimization.
        """
        self.encode_training_data()
        self.model.fit(self.Xtrain, self.ytrain, **opt_kwargs)
        self.decode_training_data()
        self._set_hyperparameters_from_model()
        self.print_hyperparameters("Optimized")
    def select_kernel(self, kernel):
        """Get the sklearn.gaussian_process.kernels kernel by matching the given kernel
        identifier.

        Parameters:
            kernel (str): Kernel string such as 'RBF', or, depending on the surrogate,
                a sum or product kernel such as 'RBF+Matern52'.

        Returns:
            sklearn.gaussian_process.kernels: Scikit-learn kernel object. Currently, for
            sum and product kernels, the initial hyperparameters are the same for all
            kernels.
        """
        from re import split
        from sklearn.gaussian_process import kernels as sklearn_kernels

        # Split the identifier on '+' and '*' while keeping the operators.
        full_str = split("([+*])", kernel)
        try:
            kernel = []
            for key in full_str:
                kernel += [
                    key
                    if key in ("+", "*")
                    else getattr(sklearn_kernels, key)(
                        length_scale=self.hyperparameters["length_scale"]
                    )
                ]
        except AttributeError:
            raise RuntimeError("Kernel {} is not implemented.".format(kernel))

        if len(kernel) == 1:
            kernel = kernel[0]
        else:
            kernel = [str(key) if not isinstance(key, str) else key for key in kernel]
            kernel = eval("".join(kernel))

        # Add scale and noise to kernel
        kernel *= sklearn_kernels.ConstantKernel(
            constant_value=1 / self.hyperparameters["sigma_f"].item() ** 2
        )
        if not self.fixed_sigma_n:
            kernel += sklearn_kernels.WhiteKernel(
                noise_level=self.hyperparameters["sigma_n"].item() ** 2
            )

        return kernel
    def _set_hyperparameters_from_model(self):
        r"""Helper function to set the hyperparameter dict from the model.

        It depends on whether $\sigma_n$ is fixed. Currently this is only stable for
        single kernels and not for Sum and Prod kernels.
        """
        if self.fixed_sigma_n:
            self.hyperparameters["length_scale"] = np.atleast_1d(
                self.model.kernel_.k1.length_scale
            )
            self.hyperparameters["sigma_f"] = np.sqrt(
                np.atleast_1d(1 / self.model.kernel_.k2.constant_value)
            )
            self.hyperparameters["sigma_n"] = np.sqrt(np.atleast_1d(self.model.alpha))
        else:
            self.hyperparameters["length_scale"] = np.atleast_1d(
                self.model.kernel_.k1.k1.length_scale
            )
            self.hyperparameters["sigma_f"] = np.sqrt(
                np.atleast_1d(1 / self.model.kernel_.k1.k2.constant_value)
            )
            self.hyperparameters["sigma_n"] = np.sqrt(
                np.atleast_1d(self.model.kernel_.k2.noise_level)
            )
        self.decode_hyperparameters()
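
# Usage sketch (kept in comments so it is not executed on import; it assumes 'RBF' is
# a valid kernel identifier for select_kernel and uses a hypothetical 1D toy problem):
#
#     import numpy as np
#     sur = SklearnGPSurrogate()
#     X = np.linspace(0, 1, 20).reshape(-1, 1)
#     y = np.sin(2 * np.pi * X)
#     sur.train(X, y, kernel="RBF")
#     ymean, yvar = sur.predict(np.array([[0.5]]))
#     sur.save_model("model.pkl")
#     sur = SklearnGPSurrogate.load_model("model.pkl")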
# Draft for Scikit-learn implementation of LinearEmbedding kernel of Garnett (2014)
from sklearn.gaussian_process.kernels import (
    Kernel,
    Hyperparameter,
    StationaryKernelMixin,
    NormalizedKernelMixin,
)
class LinearEmbedding(StationaryKernelMixin, NormalizedKernelMixin, Kernel):
    def __init__(
        self, dims, length_scale=np.array([1.0]), length_scale_bounds=(1e-5, 1e5)
    ):
        # dims is the shape of the embedding matrix R, which projects the input as
        # X @ R.T before a squared-exponential kernel is applied in the embedded space.
        self.length_scale = length_scale
        self.dims = dims
        self.length_scale_bounds = length_scale_bounds

    @property
    def hyperparameter_length_scale(self):
        return Hyperparameter(
            "length_scale",
            "numeric",
            self.length_scale_bounds,
            len(self.length_scale),
        )
    def __call__(self, X, Y=None, eval_gradient=False):
        X = np.atleast_2d(X)
        # Embedding matrix R built from the flat length_scale vector
        R = self.length_scale.reshape(self.dims)
        X1 = X @ R.T
        X2 = X @ R.T if Y is None else Y @ R.T
        # Pairwise squared distances in the embedded space
        dX = X1[:, np.newaxis, :] - X2[np.newaxis, :, :]
        dX_sq = np.linalg.norm(dX, axis=-1) ** 2
        K = np.exp(-0.5 * dX_sq)
        if eval_gradient:
            K_gradient = np.einsum("ijk,kl", dX**2, R) * K[..., np.newaxis]
            return K, K_gradient
        return K
    def __repr__(self):
        return "{0}(length_scale=[{1}])".format(
            self.__class__.__name__,
            ", ".join(map("{0:.3g}".format, self.length_scale)),
        )
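
# Usage sketch for the draft kernel (kept in comments; assumptions: a 2D input
# embedded into 1D, so dims=(1, 2) with 2 length_scale entries, and optimizer=None
# so the draft gradient is not used during fitting):
#
#     from sklearn.gaussian_process import GaussianProcessRegressor
#     rng = np.random.default_rng(0)
#     X = rng.random((50, 2))
#     y = np.sin(3 * (X @ np.array([0.8, 0.6])))
#     kernel = LinearEmbedding(dims=(1, 2), length_scale=np.ones(2))
#     gp = GaussianProcessRegressor(kernel=kernel, optimizer=None).fit(X, y)
#     ymean, ystd = gp.predict(X[:5], return_std=True)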