Source code for profit.sur.sur

"""
Abstract base class for all surrogate models.

Class structure:
- Surrogate
    - GaussianProcess
        - GPSurrogate (Custom)
        - GPySurrogate (GPy)
        - SklearnGPSurrogate (Sklearn)
    - ANN
        - ANNSurrogate (Pytorch)
        - Autoencoder (Pytorch)
    - LinearRegression
        - Linear regression surrogates (Work in progress)
"""

from abc import abstractmethod
import numpy as np
from profit.util.base_class import CustomABC
from profit.defaults import fit as defaults


[docs]class Surrogate(CustomABC): """Base class for all surrogate models. Attributes: trained (bool): Flag that indicates if the model is already trained and ready to make predictions. fixed_sigma_n (bool): Indicates if the data noise should be optimized or not. Xtrain (ndarray): Input training points. ytrain (ndarray): Observed output data. Vector output is supported for independent variables only. ndim (int): Dimension of input data. output_ndim (int): Dimension of output data. input_encoders (list of profit.sur.encoders.Encoder): Encoding used on input data. output_encoders (list of profit.sur.encoders.Encoder): Encoding used on output data. Default parameters: surrogate: GPy save: ./model_{surrogate label}.hdf5 load: False fixed_sigma_n: False input_encoders: [{'class': 'exclude', 'columns': {constant columns} {'class': 'log10', 'columns': {log input columns}, 'parameters': {}}, {'class': 'normalization', 'columns': {input columns}, 'parameters': {}}] output_encoders: [{'class': 'normalization', 'columns': {output columns}, 'parameters': {}}] """ labels = {} # All surrogates are registered here def __init__(self): self.trained = False self.fixed_sigma_n = False self.Xtrain = None self.ytrain = None self.ndim = None # TODO: Consistency between len(base_config['input']) and self.Xtrain.shape[-1] self.output_ndim = 1 self.input_encoders = [] self.output_encoders = []
[docs] def encode_training_data(self): """Encodes the input and output training data.""" for enc in self.input_encoders: self.Xtrain = enc.encode(self.Xtrain) for enc in self.output_encoders: self.ytrain = enc.encode(self.ytrain)
[docs] def decode_training_data(self): """Applies the decoding function of the encoder in reverse order on the input and output training data.""" for enc in self.input_encoders[::-1]: self.Xtrain = enc.decode(self.Xtrain) for enc in self.output_encoders[::-1]: self.ytrain = enc.decode(self.ytrain)
[docs] def encode_predict_data(self, x): """Transforms the input prediction points according to the encoder used for training. Parameters: x (ndarray): Prediction input points. Returns: ndarray: Encoded and normalized prediction points. """ for enc in self.input_encoders: x = enc.encode(x) return x
[docs] def decode_predict_data(self, ym, yv): """Rescales and then back-transforms the predicted output. Parameters: ym (ndarray): Predictive output. yv (ndarray): Variance of predicted output. Returns: tuple: a tuple containing: - ym (ndarray) Rescaled and decoded output values at the test input points. - yv (ndarray): Rescaled predictive variance. """ for enc in self.output_encoders[::-1]: ym = enc.decode(ym) yv = enc.decode_variance(yv) return ym, yv
[docs] def add_input_encoder(self, encoder): """Add encoder on input data. Parameters: encoder (profit.sur.encoder.Encoder) """ self.input_encoders.append(encoder)
[docs] def add_output_encoder(self, encoder): """Add encoder on output data. Parameters: encoder (profit.sur.encoder.Encoder) """ self.output_encoders.append(encoder)
[docs] @abstractmethod def train(self, X, y, fixed_sigma_n=defaults["fixed_sigma_n"]): r"""Trains the surrogate on input points X and model outputs y. Depending on the surrogate, the signature can vary. Parameters: X (ndarray): Input training points. y (ndarray): Observed output data. fixed_sigma_n (bool): Whether the noise $\sigma_n$ is fixed during optimization. """ pass
[docs] def pre_train(self, X, y): """Check the training data Parameters: X: (n, d) or (n,) array of input training data. y: (n, D) or (n,) array of training output. """ X, y = np.asarray(X), np.asarray(y) # more verbose than check_ndim if X.ndim == 1: X = X.reshape(-1, 1) elif X.ndim != 2: raise ValueError( f"X should have shape (n,) or (n, d) but has shape {X.shape}" ) if y.ndim == 1: y = y.reshape(-1, 1) elif y.ndim != 2: raise ValueError( f"y should have shape (n,) or (n, D) but has shape {y.shape}" ) if X.shape[0] != y.shape[0]: raise ValueError( f"mismatched number of data points for X and y: {X.shape[0]} != {y.shape[0]}" ) self.Xtrain, self.ytrain = X, y self.encode_training_data() self.ndim = self.Xtrain.shape[-1]
[docs] def post_train(self): self.trained = True self.decode_training_data()
[docs] @abstractmethod def predict(self, Xpred, add_data_variance=True): r"""Predicts model output y for input Xpred based on surrogate. Parameters: Xpred (ndarray/list): Input points for prediction. add_data_variance (bool): Adds the data noise $\sigma_n^2$ to the prediction variance. This is especially useful for plotting. Returns: tuple: a tuple containing: - ymean (ndarray) Predicted output values at the test input points. - yvar (ndarray): Generally the uncertainty of the fit. For Gaussian Processes this is the diagonal of the posterior covariance matrix. """ pass
[docs] def pre_predict(self, Xpred): """Prepares the surrogate for prediction by checking if it is trained and validating the data. Parameters: Xpred (ndarray): (n, d) or (n,) array of input points for prediction Returns: ndarray: Checked input data or default values inferred from training data. """ if not self.trained: raise RuntimeError("Need to train() before predict()!") if Xpred is None: Xpred = self.default_Xpred() # more verbose thant check_ndim Xpred = np.asarray(Xpred) if Xpred.ndim == 1: Xpred = Xpred.reshape(-1, 1) elif Xpred.ndim != 2: if self.ndim == 1: raise ValueError( f"Xpred should have shape (n,) or (n, 1) but has shape {Xpred.shape}" ) raise ValueError( f"Xpred should have shape (n, {self.ndim}) but has shape {Xpred.shape}" ) Xpred = self.encode_predict_data(Xpred) if Xpred.shape[1] != self.ndim: raise ValueError( f"Xpred should have shape (n, {self.ndim}) but has shape {Xpred.shape}" ) return Xpred
[docs] @abstractmethod def save_model(self, path): """Saves the surrogate to a file. The file format can vary between surrogates. As default, the surrogate is saved to 'base_dir/model_{surrogate_label}.hdf5'. Parameters: path (str): Path including the file name, where the model should be saved. """ pass
[docs] @classmethod @abstractmethod def load_model(cls, path): """Loads a saved surrogate from a file. The file format can vary between surrogates. Identifies the surrogate by its class label in the file name. Parameters: path (str): Path including the file name, from where the model should be loaded. Returns: profit.sur.Surrogate: Instantiated surrogate model. """ label = defaults["surrogate"] for f in filter(lambda l: l in path, cls.labels): if len(f) > len(label): label = f return cls[label].load_model(path)
[docs] @classmethod @abstractmethod def from_config(cls, config, base_config): """Instantiates a surrogate based on the parameters given in the configuration file and delegates to child. Parameters: config (dict): Only the 'fit' part of the base_config. base_config (dict): The whole configuration parameters. """ from .encoders import Encoder child = cls[config["surrogate"]] if config.get("load"): child_instance = child.load_model(config["load"]) else: child_instance = child.from_config(config, base_config) # Set global attributes child_instance.ndim = len(base_config["input"]) child_instance.output_ndim = len(base_config["output"]) child_instance.fixed_sigma_n = config["fixed_sigma_n"] for enc in config["_input_encoders"]: child_instance.add_input_encoder( Encoder[enc["class"]](enc["columns"], enc["parameters"]) ) for enc in config["_output_encoders"]: child_instance.add_output_encoder( Encoder[enc["class"]](enc["columns"], enc["parameters"]) ) return child_instance
[docs] def plot( self, Xpred=None, independent=None, show=False, ref=None, add_data_variance=True, axes=None, ): r"""Simple plotting for dimensions <= 2. Fore more sophisticated plots use the command 'profit ui'. Parameters: Xpred (ndarray): Prediction points where the fit is plotted. If None, it is inferred from the training points. independent (dict): Dictionary of independent variables from config. show (bool): If the figure should be shown directly. ref (ndarray): Reference function which is fitted. add_data_variance (bool): Adds the data noise $\sigma_n^2$ to the prediction variance. axes (matplotlib.pyplot.axes): Axes object to insert the plot into. If None, a new figure is created. """ import matplotlib.pyplot as plt if Xpred is None: Xpred = self.default_Xpred() ypred, yvarpred = self.predict(Xpred, add_data_variance=add_data_variance) ystd_pred = np.sqrt(yvarpred) if independent: # 2D with one input parameter and one independent variable. if self.ndim == 1 and ypred.ndim == 2: ax = axes or plt.axes(projection="3d") xind = np.hstack([v["value"] for v in independent.values()]) xtgrid = np.meshgrid(*[xind, self.Xtrain]) xgrid = np.meshgrid(*[xind, Xpred]) for i in range(self.Xtrain.shape[0]): ax.plot( xtgrid[0][i], xtgrid[1][i], self.ytrain[i], color="blue", linewidth=2, ) ax.plot_surface(xgrid[0], xgrid[1], ypred, color="red", alpha=0.8) ax.plot_surface( xgrid[0], xgrid[1], ypred + 2 * ystd_pred, color="grey", alpha=0.6 ) ax.plot_surface( xgrid[0], xgrid[1], ypred - 2 * ystd_pred, color="grey", alpha=0.6 ) else: raise NotImplementedError( "Plotting is only implemented for dimensions <= 2. Use profit ui instead." ) else: if self.ndim == 1 and ypred.shape[-1] == 1: # Only one input parameter to plot. ax = axes or plt.axes() if ref: ax.plot(Xpred, ref(Xpred), color="red") ax.plot(Xpred, ypred) ax.scatter(self.Xtrain, self.ytrain, marker="x", s=50, c="k") ax.fill_between( Xpred.flatten(), ypred.flatten() + 2 * ystd_pred.flatten(), ypred.flatten() - 2 * ystd_pred.flatten(), color="grey", alpha=0.6, ) elif self.ndim == 2 and ypred.shape[-1] == 1: # Two fitted input variables. ax = axes or plt.axes(projection="3d") ypred = ypred.flatten() ystd_pred = ystd_pred.flatten() ax.scatter( self.Xtrain[:, 0], self.Xtrain[:, 1], self.ytrain, color="red", alpha=0.8, ) ax.plot_trisurf(Xpred[:, 0], Xpred[:, 1], ypred, color="red", alpha=0.8) ax.plot_trisurf( Xpred[:, 0], Xpred[:, 1], ypred + 2 * ystd_pred, color="grey", alpha=0.6, ) ax.plot_trisurf( Xpred[:, 0], Xpred[:, 1], ypred - 2 * ystd_pred, color="grey", alpha=0.6, ) elif self.ndim == 1 and self.output_ndim == 2: # One input variable and two outputs ax = axes or plt.axes() for d in range(self.output_ndim): yp = ypred[:, d] ystd_p = ystd_pred[:, d] ax.scatter(self.Xtrain, self.ytrain[:, d], alpha=0.8) ax.plot(Xpred, yp) ax.fill_between( Xpred.flatten(), yp + 2 * ystd_p, yp - 2 * ystd_p, alpha=0.6 ) else: raise NotImplementedError( "Plotting is only implemented for dimension <= 2. Use profit ui instead." ) if show: plt.show()
[docs] def default_Xpred(self): """Infer prediction values from training points in each dimension. Currently a dense grid is created. This becomes inefficient for > 3 dimensions. Returns: ndarray: Prediction points. """ if self.ndim <= 3: minval = self.Xtrain.min(axis=0) maxval = self.Xtrain.max(axis=0) npoints = [50] * len(minval) xpred = [ np.linspace(minv, maxv, n) for minv, maxv, n in zip(minval, maxval, npoints) ] return np.hstack( [xi.flatten().reshape(-1, 1) for xi in np.meshgrid(*xpred)] ) else: raise RuntimeError("Require x for prediction in > 3 dimensions!")