Source code for profit.sur.gp.gpy_surrogate

import numpy as np
from profit.sur import Surrogate
from profit.sur.gp import GaussianProcess
from profit.defaults import fit_gaussian_process as defaults, fit as base_defaults


[docs]@Surrogate.register("GPy") class GPySurrogate(GaussianProcess): """Surrogate for https://github.com/SheffieldML/GPy. Attributes: model (GPy.models): Model object of GPy. """ def __init__(self): import GPy self.GPy = GPy super().__init__() self.model = None
    def train(
        self,
        X,
        y,
        kernel=defaults["kernel"],
        hyperparameters=defaults["hyperparameters"],
        fixed_sigma_n=base_defaults["fixed_sigma_n"],
    ):
        self.pre_train(X, y, kernel, hyperparameters, fixed_sigma_n)
        self.model = self.GPy.models.GPRegression(
            self.Xtrain,
            self.ytrain,
            self.kernel,
            noise_var=self.hyperparameters["sigma_n"] ** 2,
        )
        self.optimize()
        self.post_train()
    def add_training_data(self, X, y):
        """Adds training points to the existing dataset.

        This is important for active learning. The data is added, but the
        hyperparameters are not optimized yet.

        Parameters:
            X (ndarray): Input points to add.
            y (ndarray): Observed output to add.
        """
        self.Xtrain = np.concatenate([self.Xtrain, X], axis=0)
        self.ytrain = np.concatenate([self.ytrain, y], axis=0)
        self.encode_training_data()
        self.model.set_XY(self.Xtrain, self.ytrain)
        self.decode_training_data()
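
    # A minimal active-learning sketch (hypothetical new points X_new, y_new):
    #
    #     sur.add_training_data(X_new, y_new)  # appends data, no hyperparameter fit
    #     sur.optimize()                       # re-optimize explicitly when desired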
    def set_ytrain(self, y):
        """Set the observed training outputs. This is important for active learning.

        Parameters:
            y (np.array): Full training output data.
        """
        self.ytrain = np.atleast_2d(y.copy())
        self.encode_training_data()
        self.model.set_XY(self.Xtrain, self.ytrain)
        self.decode_training_data()
    def predict(self, Xpred, add_data_variance=True):
        Xpred = self.pre_predict(Xpred)
        ymean, yvar = self.model.predict(Xpred, include_likelihood=add_data_variance)
        ymean, yvar = self.decode_predict_data(ymean, yvar)
        return ymean, yvar
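
    # A minimal train/predict round trip (a sketch with hypothetical data;
    # assumes GPy is installed and 'RBF' is a valid kernel identifier):
    #
    #     import numpy as np
    #     sur = GPySurrogate()
    #     X = np.linspace(0, 1, 20).reshape(-1, 1)
    #     y = np.sin(2 * np.pi * X)
    #     sur.train(X, y, kernel="RBF")
    #     ymean, yvar = sur.predict(np.array([[0.5]]))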
    def save_model(self, path):
        """Save the model as a dict to a .hdf5 file.

        Parameters:
            path (str): Path including the file name, where the model should be saved.
        """
        from profit.util.file_handler import FileHandler

        sur_dict = {attr: getattr(self, attr) for attr in ("Xtrain", "ytrain")}
        sur_dict["model"] = self.model.to_dict()
        sur_dict["input_encoders"] = str([enc.repr for enc in self.input_encoders])
        sur_dict["output_encoders"] = str([enc.repr for enc in self.output_encoders])
        FileHandler.save(path, sur_dict)
    @classmethod
    def load_model(cls, path):
        """Loads a saved model from a .hdf5 file and updates its attributes.
        In case of a multi-output model, the .pkl file is loaded,
        since .hdf5 is not supported for it yet.

        Parameters:
            path (str): Path including the file name, from where the model should be loaded.

        Returns:
            GPySurrogate: Instantiated surrogate model.
        """
        from profit.util.file_handler import FileHandler
        from profit.sur.encoders import Encoder
        from GPy import models
        from numpy import array  # needed for eval of arrays

        self = cls()
        sur_dict = FileHandler.load(path, as_type="dict")
        self.model = models.GPRegression.from_dict(sur_dict["model"])
        self.Xtrain, self.ytrain = sur_dict["Xtrain"], sur_dict["ytrain"]
        for enc in eval(sur_dict["input_encoders"]):
            self.add_input_encoder(
                Encoder[enc["class"]](enc["columns"], enc["parameters"])
            )
        for enc in eval(sur_dict["output_encoders"]):
            self.add_output_encoder(
                Encoder[enc["class"]](enc["columns"], enc["parameters"])
            )
        # Initialize the encoders by encoding and decoding the training data once.
        self.encode_training_data()
        self.ndim = self.Xtrain.shape[-1]
        self.decode_training_data()
        self.kernel = self.model.kern
        self._set_hyperparameters_from_model()
        self.trained = True
        self.print_hyperparameters("Loaded")
        return self
    def select_kernel(self, kernel):
        """Get the GPy.kern kernel by matching the given string kernel identifier.

        Parameters:
            kernel (str): Kernel string such as 'RBF' or, depending on the surrogate,
                also sum and product kernels such as 'RBF+Matern52'.

        Returns:
            GPy.kern: GPy kernel object. Currently, for sum and product kernels,
                the initial hyperparameters are the same for all kernels.
        """
        try:
            if not any(operator in kernel for operator in ("+", "*")):
                # Single kernel
                return getattr(self.GPy.kern, kernel)(
                    self.ndim,
                    lengthscale=self.hyperparameters.get("length_scale", [1]),
                    variance=self.hyperparameters.get("sigma_f", 1) ** 2,
                    ARD=len(self.hyperparameters.get("length_scale", [1])) > 1,
                )
            else:
                from re import split

                full_str = split("([+*])", kernel)
                kern = []
                for key in full_str:
                    kern += [
                        key
                        if key in ("+", "*")
                        else "self.GPy.kern.{}({}, lengthscale={}, variance={})".format(
                            key,
                            self.ndim,
                            self.hyperparameters.get("length_scale", [1]),
                            self.hyperparameters.get("sigma_f", 1) ** 2,
                        )
                    ]
                return eval("".join(kern))
        except AttributeError:
            raise RuntimeError("Kernel {} is not implemented.".format(kernel))
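
    # For example (a sketch; the names must exist in GPy.kern):
    #
    #     sur.select_kernel("RBF")           # -> GPy.kern.RBF(ndim, ...)
    #     sur.select_kernel("RBF+Matern52")  # -> GPy.kern.RBF(...) + GPy.kern.Matern52(...)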
    def optimize(self, return_hess_inv=False, **opt_kwargs):
        """For hyperparameter optimization, the GPy base optimization is used.
        Currently, the inverse Hessian cannot be retrieved, which limits the
        effectiveness of active learning.

        Parameters:
            return_hess_inv (bool): Not considered currently.
            opt_kwargs: Keyword arguments passed directly to the GPy base optimization.
        """
        self.model.optimize(**opt_kwargs)
        self._set_hyperparameters_from_model()
        self.print_hyperparameters("Optimized")
    def _set_hyperparameters_from_model(self):
        r"""Helper function to set the hyperparameter dict from the model.
        The procedure depends on whether the kernel is a single one or a combined one.
        """
        if hasattr(self.model.kern, "lengthscale"):
            self.hyperparameters["length_scale"] = self.model.kern.lengthscale.values
            self.hyperparameters["sigma_f"] = np.sqrt(self.model.kern.variance)
            self.hyperparameters["sigma_n"] = np.sqrt(self.model.likelihood.variance)
        elif hasattr(self.model.kern, "parts"):
            for part in self.model.kern.parts:
                for key, value in zip(part.parameter_names(), part.param_array):
                    value = np.atleast_1d(value)
                    if key == "lengthscale":
                        self.hyperparameters["length_scale"] = value
                    elif key == "variance":
                        self.hyperparameters["sigma_f"] = np.sqrt(value)
                    else:
                        self.hyperparameters[key] = value
            noise_var = self.model.likelihood.gaussian_variance(
                self.model.Y_metadata
            ).reshape(-1, 1, order="F")
            self.hyperparameters["sigma_n"] = np.sqrt(np.max(noise_var, axis=0))
        self.decode_hyperparameters()
    def special_hyperparameter_decoding(self, key, value):
        has_ard = (
            any(p.ARD for p in self.model.kern.parts)
            if hasattr(self.model.kern, "parts")
            else self.model.kern.ARD
        )
        if key == "length_scale" and self.ndim > 1 and not has_ard:
            return np.atleast_1d(np.linalg.norm(value))
        elif key in ("sigma_f", "sigma_n") and len(value) > 1:
            return np.atleast_1d(np.max(value))
        return value
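
# Persistence round trip for the single-output surrogate (a sketch with a
# hypothetical path; assumes a trained model):
#
#     sur.save_model("model.hdf5")
#     sur2 = GPySurrogate.load_model("model.hdf5")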
[docs]@Surrogate.register("CoregionalizedGPy") class CoregionalizedGPySurrogate(GPySurrogate):
    def pre_train(
        self,
        X,
        y,
        kernel=defaults["kernel"],
        hyperparameters=defaults["hyperparameters"],
        fixed_sigma_n=base_defaults["fixed_sigma_n"],
    ):
        super().pre_train(X, y, kernel, hyperparameters, fixed_sigma_n)
        self.output_ndim = self.ytrain.shape[-1]
    def train(
        self,
        X,
        y,
        kernel=defaults["kernel"],
        hyperparameters=defaults["hyperparameters"],
        fixed_sigma_n=base_defaults["fixed_sigma_n"],
    ):
        self.pre_train(X, y, kernel, hyperparameters, fixed_sigma_n)
        icm = self.GPy.util.multioutput.ICM(
            input_dim=self.ndim, num_outputs=self.output_ndim, kernel=self.kernel
        )
        _X = self.output_ndim * [self.Xtrain]
        _y = [self.ytrain[:, d].reshape(-1, 1) for d in range(self.output_ndim)]
        self.model = self.GPy.models.GPCoregionalizedRegression(_X, _y, kernel=icm)
        self.optimize()
        self.post_train()
    def add_training_data(self, X, y):
        self.Xtrain = np.concatenate([self.Xtrain, X], axis=0)
        self.ytrain = np.concatenate([self.ytrain, y], axis=0)
        self.encode_training_data()
        # Stack the inputs once per output dimension and append the output index
        # as an extra column, as expected by GPCoregionalizedRegression.
        new_Xtrain = np.empty(
            (self.Xtrain.shape[0] * self.output_ndim, self.Xtrain.shape[-1] + 1)
        )
        for d in range(self.output_ndim):
            start_idx = self.Xtrain.shape[0] * d
            end_idx = start_idx + self.Xtrain.shape[0]
            new_Xtrain[start_idx:end_idx] = np.hstack(
                [self.Xtrain, np.ones((self.Xtrain.shape[0], 1)) * d]
            )
        new_noise_dict = {"output_index": new_Xtrain[:, -1:].astype(int)}
        self.model.Y_metadata = new_noise_dict
        self.model.set_XY(new_Xtrain, self.ytrain.reshape(-1, 1, order="F"))
        self.decode_training_data()
    def set_ytrain(self, y):
        self.ytrain = np.atleast_2d(y.copy())
        self.encode_training_data()
        self.model.set_Y(self.ytrain.reshape(-1, 1, order="F"))
        self.decode_training_data()
    def predict(self, Xpred, add_data_variance=True):
        Xpred = super().pre_predict(Xpred)
        ymean = np.empty((Xpred.shape[0], self.output_ndim))
        yvar = ymean.copy()
        for d in range(self.output_ndim):
            # Append the output index as an extra column and predict each output
            # dimension separately.
            newX = np.hstack([Xpred, np.ones((Xpred.shape[0], 1)) * d])
            noise_dict = {"output_index": newX[:, -1:].astype(int)}
            ym, yv = self.model.predict(newX, Y_metadata=noise_dict)
            ymean[:, d], yvar[:, d] = ym.flatten(), yv.flatten()
        ymean, yvar = self.decode_predict_data(ymean, yvar)
        return ymean, yvar
    def save_model(self, path):
        # GPy does not support the to_dict method for the Coregionalization
        # kernel yet, so the model is pickled instead.
        from profit.util.file_handler import FileHandler
        from os.path import splitext

        filepath, ending = splitext(path)
        if ending != ".pkl":
            print(
                f"Saving to '{ending}' is not implemented yet for the"
                f" {self.__class__.__name__} surrogate."
                " Saving to a '.pkl' file instead."
            )
        save_list = [
            self.model,
            self.Xtrain,
            self.ytrain,
            str([enc.repr for enc in self.input_encoders]),
            str([enc.repr for enc in self.output_encoders]),
        ]
        FileHandler.save(filepath + ".pkl", save_list)
    @classmethod
    def load_model(cls, path):
        from profit.util.file_handler import FileHandler
        from profit.sur.encoders import Encoder
        from numpy import array  # needed for eval of arrays

        self = cls()
        try:
            (
                self.model,
                self.Xtrain,
                self.ytrain,
                input_encoder_str,
                output_encoder_str,
            ) = FileHandler.load(path, as_type="raw")
        except (OSError, FileNotFoundError):
            from os.path import splitext

            print("File not found. Trying again with the file ending '.pkl'.")
            path = splitext(path)[0] + ".pkl"
            # Load the multi-output model from the pickle file.
            (
                self.model,
                self.Xtrain,
                self.ytrain,
                input_encoder_str,
                output_encoder_str,
            ) = FileHandler.load(path, as_type="raw")
        for enc in eval(input_encoder_str):
            self.add_input_encoder(
                Encoder[enc["class"]](enc["columns"], enc["parameters"])
            )
        for enc in eval(output_encoder_str):
            self.add_output_encoder(
                Encoder[enc["class"]](enc["columns"], enc["parameters"])
            )
        self.output_ndim = int(max(self.model.X[:, -1])) + 1
        # Initialize the encoders by encoding and decoding the training data once.
        self.encode_training_data()
        self.decode_training_data()
        self.kernel = self.model.kern
        self.ndim = self.Xtrain.shape[-1]
        self._set_hyperparameters_from_model()
        self.trained = True
        self.print_hyperparameters("Loaded")
        return self
    def special_hyperparameter_decoding(self, key, value):
        if key == "length_scale" and self.ndim > 1 and not self.model.kern.parts[0].ARD:
            return np.atleast_1d(np.linalg.norm(value))
        elif key in ("sigma_f", "sigma_n") and len(value) > 1:
            return np.atleast_1d(np.max(value))
        elif key in ("W", "kappa"):
            new_value = value
            for enc in self.output_encoders:
                if enc.label == "Normalization":
                    new_value = enc.decode_hyperparameters(value)
            return np.atleast_1d(np.min(new_value))
        return value
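
# Multi-output usage sketch (hypothetical data; assumes GPy is installed):
#
#     import numpy as np
#     sur = CoregionalizedGPySurrogate()
#     X = np.random.rand(30, 2)
#     y = np.hstack([np.sin(X[:, :1]), np.cos(X[:, 1:])])  # two outputs
#     sur.train(X, y)
#     ymean, yvar = sur.predict(np.random.rand(5, 2))      # each of shape (5, 2)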