import numpy as np
from profit.sur import Surrogate
from profit.sur.gp import GaussianProcess
from profit.defaults import fit_gaussian_process as defaults, fit as base_defaults
@Surrogate.register("Custom")
class GPSurrogate(GaussianProcess):
    """Custom GP model made from scratch.

    Supports custom Python and Fortran kernels with analytic derivatives and
    advanced Active Learning.

    Attributes:
        hess_inv (ndarray): Inverse Hessian matrix which is required for active learning.
            It is calculated during the hyperparameter optimization.
    """

    # Numeric backend (negative log likelihood, optimizer, matrix inversion, ...).
    from .backend import gp_functions

    def __init__(self):
        super().__init__()
        # Set by optimize(return_hess_inv=True); consumed by get_marginal_variance.
        self.hess_inv = None

    @property
    def Ky(self):
        """Full training covariance matrix as defined in the kernel
        including data noise as specified in the hyperparameters.
        """
        return self.kernel(self.Xtrain, self.Xtrain, **self.hyperparameters)

    @property
    def alpha(self):
        r"""Convenient matrix-vector product of the inverse training matrix and the training output data.

        The equation is solved either exactly or with a least squares approximation.

        $$
        \begin{equation}
        \alpha = K_y^{-1} y_{train}
        \end{equation}
        $$
        """
        try:
            return np.linalg.solve(self.Ky, self.ytrain)
        except np.linalg.LinAlgError:
            # Ky is singular (e.g. vanishing noise or duplicate training points):
            # fall back to a least squares solution.
            return np.linalg.lstsq(self.Ky, self.ytrain, rcond=1e-15)[0]

    def train(
        self,
        X,
        y,
        kernel=defaults["kernel"],
        hyperparameters=defaults["hyperparameters"],
        fixed_sigma_n=base_defaults["fixed_sigma_n"],
        eval_gradient=True,
        return_hess_inv=False,
    ):
        """After initializing the model with a kernel function and initial hyperparameters,
        it can be trained on input data X and observed output data y by optimizing the model's hyperparameters.
        This is done by minimizing the negative log likelihood.

        Parameters:
            X (ndarray): (n, D) array of input training data.
            y (ndarray): (n, 1) array of training output.
                Currently, only scalar output is supported.
            kernel (str/object): Identifier of kernel like 'RBF' or directly the kernel object of the
                specific surrogate.
            hyperparameters (dict): Hyperparameters such as length_scale, variance and noise.
                Taken either from given parameter, config file or inferred from the training data.
                The hyperparameters can be different depending on the kernel. E.g. The length_scale can be a scalar,
                a vector of the size of the training data, or for the custom LinearEmbedding kernel a matrix.
            fixed_sigma_n (bool): Indicates if the data noise should be optimized or not.
            eval_gradient (bool): Whether the gradients of the kernel and negative log likelihood are
                explicitly used in the scipy optimization or numerically calculated inside scipy.
            return_hess_inv (bool): Whether to set the attribute hess_inv after optimization. This is important
                for active learning.
        """
        self.pre_train(X, y, kernel, hyperparameters, fixed_sigma_n)
        if self.input_encoders or self.output_encoders:
            print(
                "For now, encoding is not supported for this surrogate. The model is created without encoding."
            )
            # Undo any encoding applied in pre_train so the model sees raw data.
            self.decode_training_data()
        # Find best hyperparameters
        self.optimize(
            fixed_sigma_n=self.fixed_sigma_n,
            eval_gradient=eval_gradient,
            return_hess_inv=return_hess_inv,
        )
        self.post_train()

    def post_train(self):
        """Mark the model as trained."""
        self.trained = True

    def predict(self, Xpred, add_data_variance=True):
        """Predict the mean and variance at the points Xpred.

        Parameters:
            Xpred (ndarray): Input points for prediction.
            add_data_variance (bool): Whether to add the data noise sigma_n**2 to the
                predictive variance.

        Returns:
            tuple: Predictive mean and variance, the variance as an (n, 1) column.
        """
        Xpred = super().pre_predict(Xpred)
        # Encoding is not supported yet for this surrogate: undo any input encoding.
        for enc in self.input_encoders[::-1]:
            Xpred = enc.decode(Xpred)
        # Skip data noise sigma_n in hyperparameters
        prediction_hyperparameters = {
            key: value
            for key, value in self.hyperparameters.items()
            if key != "sigma_n"
        }
        # Calculate conditional mean and covariance functions
        Kstar = self.kernel(self.Xtrain, Xpred, **prediction_hyperparameters)
        Kstarstar_diag = np.diag(
            self.kernel(Xpred, Xpred, **prediction_hyperparameters)
        )
        fstar = Kstar.T @ self.alpha
        vstar = Kstarstar_diag - np.diag(
            (Kstar.T @ (self.gp_functions.invert(self.Ky) @ Kstar))
        )
        vstar = np.maximum(vstar, 1e-10)  # Assure a positive variance
        if add_data_variance:
            vstar = vstar + self.hyperparameters["sigma_n"] ** 2
        return fstar, vstar.reshape(-1, 1)  # Return predictive mean and variance

    def add_training_data(self, X, y):
        """Add training points to existing data. This is important for active learning.

        Only the training dataset is updated, but the hyperparameters are not optimized yet.

        Parameters:
            X (ndarray): Input points to add.
            y (ndarray): Observed output to add.
        """
        # TODO: Update Ky by applying the Sherman-Morrison-Woodbury formula?
        self.Xtrain = np.concatenate([self.Xtrain, X], axis=0)
        self.ytrain = np.concatenate([self.ytrain, y], axis=0)

    def set_ytrain(self, ydata):
        """Set the observed training outputs. This is important for active learning.

        Parameters:
            ydata (np.array): Full training output data.
        """
        self.ytrain = np.atleast_2d(ydata.copy())

    def get_marginal_variance(self, Xpred):
        """Calculate the marginal variance at Xpred using the BBQ method.

        Re-optimizes the hyperparameters with the inverse Hessian enabled,
        since the marginal variance requires hess_inv.

        Parameters:
            Xpred (ndarray): Input points at which to evaluate the marginal variance.

        Returns:
            ndarray: Marginal variance at Xpred.
        """
        from .backend.gp_functions import marginal_variance_BBQ

        self.optimize(
            fixed_sigma_n=self.fixed_sigma_n, eval_gradient=True, return_hess_inv=True
        )
        assert self.hess_inv is not None
        _, fvar = self.predict(Xpred)
        return marginal_variance_BBQ(
            self.Xtrain,
            self.ytrain,
            Xpred,
            self.kernel,
            self.hyperparameters,
            self.hess_inv,
            self.fixed_sigma_n,
            alpha=self.alpha,
            predictive_variance=fvar,
        )

    def save_model(self, path):
        """Saves the model as dict to a .hdf5 file.

        Parameters:
            path (str): Path including the file name, where the model should be saved.
        """
        from profit.util.file_handler import FileHandler

        saved_attrs = (
            "trained",
            "fixed_sigma_n",
            "Xtrain",
            "ytrain",
            "ndim",
            "kernel",
            "hyperparameters",
        )
        save_dict = {attr: getattr(self, attr) for attr in saved_attrs}
        # Convert the kernel class object to a string, to be able to save it in the .hdf5 file
        if not isinstance(save_dict["kernel"], str):
            save_dict["kernel"] = self.kernel.__name__
        FileHandler.save(path, save_dict)

    @classmethod
    def load_model(cls, path):
        """Loads a saved model from a .hdf5 file and updates its attributes.

        Parameters:
            path (str): Path including the file name, from where the model should be loaded.

        Returns:
            profit.sur.gaussian_process.GPSurrogate: Instantiated surrogate model.
        """
        from profit.util.file_handler import FileHandler

        sur_dict = FileHandler.load(path, as_type="dict")
        self = cls()
        for attr, value in sur_dict.items():
            setattr(self, attr, value)
        # Convert the kernel string back to the class object
        self.kernel = self.select_kernel(self.kernel)
        self.print_hyperparameters("Loaded")
        return self

    def select_kernel(self, kernel):
        """Convert the name of the kernel as string to the kernel class object of the surrogate.

        First search the kernels implemented in python, then the Fortran kernels.

        Parameters:
            kernel (str): Kernel string such as 'RBF'. Only single kernels are supported currently.

        Returns:
            object: Kernel object of the class. This is the function which builds the kernel and not
                the calculated covariance matrix.

        Raises:
            RuntimeError: If the kernel is found neither in the Python nor the Fortran backend.
        """
        # TODO: Rewrite fortran kernels and rename them to be explicit, like 'fRBF'
        from .backend import python_kernels

        try:
            return getattr(python_kernels, kernel)
        except AttributeError:
            pass
        # Fall back to the compiled Fortran backend; it may not be built,
        # in which case the import itself fails and the kernel is unavailable.
        try:
            from .backend import kernels as fortran_kernels

            return getattr(fortran_kernels, kernel)
        except (ImportError, AttributeError):
            raise RuntimeError(f"Kernel {kernel} not implemented.")

    def optimize(
        self,
        fixed_sigma_n=base_defaults["fixed_sigma_n"],
        eval_gradient=False,
        return_hess_inv=False,
    ):
        r"""Optimize the hyperparameters length_scale $l$, scale $\sigma_f$ and noise $\sigma_n$.

        As a backend, the scipy minimize optimizer is used.

        Parameters:
            fixed_sigma_n (bool): Indication if the data noise should also be optimized or not.
            eval_gradient (bool): Flag if the gradients of the kernel and negative log likelihood should be
                used explicitly or numerically calculated inside the optimizer.
            return_hess_inv (bool): Whether to set the inverse Hessian attribute hess_inv which is used to calculate the
                marginal variance in active learning.
        """
        # Flatten the hyperparameters into a single start vector in a fixed order.
        ordered_hyp_keys = ("length_scale", "sigma_f", "sigma_n")
        a0 = np.concatenate([self.hyperparameters[key] for key in ordered_hyp_keys])
        opt_hyperparameters = self.gp_functions.optimize(
            self.Xtrain,
            self.ytrain,
            a0,
            self.kernel,
            fixed_sigma_n=self.fixed_sigma_n or fixed_sigma_n,
            eval_gradient=eval_gradient,
            return_hess_inv=return_hess_inv,
        )
        if return_hess_inv:
            # The backend returns (hyperparameters, hess_inv) in this case.
            self.hess_inv = opt_hyperparameters[1]
            opt_hyperparameters = opt_hyperparameters[0]
        self._set_hyperparameters_from_model(opt_hyperparameters)
        self.print_hyperparameters("Optimized")

    def _set_hyperparameters_from_model(self, model_hyperparameters):
        """Unpack the flat optimizer result vector back into the hyperparameter dict."""
        # Set optimized hyperparameters; the trailing entries are sigma_f (and sigma_n
        # unless the data noise was held fixed).
        last_idx = -1 if self.fixed_sigma_n else -2
        self.hyperparameters["length_scale"] = np.atleast_1d(
            model_hyperparameters[:last_idx]
        )
        self.hyperparameters["sigma_f"] = np.atleast_1d(model_hyperparameters[last_idx])
        if not self.fixed_sigma_n:
            self.hyperparameters["sigma_n"] = np.atleast_1d(model_hyperparameters[-1])
@Surrogate.register("CustomMultiOutputGP")
class MultiOutputGPSurrogate(GaussianProcess):
    """Multi-output GP surrogate built from independent single-output child models.

    One child surrogate is trained per output dimension on the shared inputs.

    Attributes:
        child (type): Surrogate class instantiated for each output dimension.
        models (list): One trained child surrogate per output dimension.
    """

    def __init__(self, child=GPSurrogate):
        super().__init__()
        self.child = child
        self.models = []

    def train(
        self,
        X,
        y,
        kernel=defaults["kernel"],
        hyperparameters=defaults["hyperparameters"],
        fixed_sigma_n=base_defaults["fixed_sigma_n"],
        return_hess_inv=False,
    ):
        """Train one child model per output dimension.

        Parameters:
            X (ndarray): (n, D) array of input training data.
            y (ndarray): (n, output_ndim) array of training output.
            kernel (str/object): Kernel identifier or kernel object (see child surrogate).
            hyperparameters (dict): Initial hyperparameters shared by all child models.
            fixed_sigma_n (bool): Indicates if the data noise should be optimized or not.
            return_hess_inv (bool): Whether each child sets its hess_inv after optimization.
        """
        self.pre_train(X, y)
        self.models = [self.child() for _ in range(self.output_ndim)]
        for dim, m in enumerate(self.models):
            # Each child is trained on the scalar output of its own dimension.
            m.train(
                self.Xtrain,
                self.ytrain[:, [dim]],
                self.kernel,
                self.hyperparameters,
                self.fixed_sigma_n,
                eval_gradient=False,
                return_hess_inv=return_hess_inv,
            )
        self._set_hyperparameters_from_model()
        self.post_train()

    def _set_hyperparameters_from_model(self):
        """Aggregate the child models' optimized hyperparameters into one summary set."""
        # Collect all child hyperparameters, then reduce: norm for length_scale,
        # maximum for sigma_f and sigma_n.
        for m in self.models:
            for k, v in m.hyperparameters.items():
                self.hyperparameters[k] = np.concatenate([self.hyperparameters[k], v])
        self.hyperparameters["length_scale"] = np.atleast_1d(
            np.linalg.norm(self.hyperparameters["length_scale"])
        )
        self.hyperparameters["sigma_f"] = np.atleast_1d(
            np.max(self.hyperparameters["sigma_f"])
        )
        self.hyperparameters["sigma_n"] = np.atleast_1d(
            np.max(self.hyperparameters["sigma_n"])
        )
        self.decode_hyperparameters()

    def predict(self, Xpred, add_data_variance=True):
        """Predict mean and variance per output dimension by delegating to the child models.

        Parameters:
            Xpred (ndarray): Input points for prediction.
            add_data_variance (bool): Whether each child adds its data noise to the variance.

        Returns:
            tuple: (n, output_ndim) arrays of predictive mean and variance.
        """
        Xpred = self.pre_predict(Xpred)
        ypred = np.empty((Xpred.shape[0], self.output_ndim))
        yvar = np.empty_like(ypred)
        for dim, m in enumerate(self.models):
            ypred[:, [dim]], yvar[:, [dim]] = m.predict(Xpred, add_data_variance)
        ypred, yvar = self.decode_predict_data(ypred, yvar)
        return ypred, yvar

    def add_training_data(self, X, y):
        """Add training points to this surrogate and to every child model.

        Parameters:
            X (ndarray): Input points to add.
            y (ndarray): Observed output to add.
        """
        start = self.Xtrain.shape[0]
        self.Xtrain, self.ytrain = np.concatenate(
            [self.Xtrain, X], axis=0
        ), np.concatenate([self.ytrain, y], axis=0)
        # Hand the children the encoded rows, then restore the decoded state here.
        self.encode_training_data()
        for dim, m in enumerate(self.models):
            m.add_training_data(self.Xtrain[start:], self.ytrain[start:, [dim]])
        self.decode_training_data()

    def set_ytrain(self, y):
        """Set the observed training outputs here and in every child model.

        Parameters:
            y (np.array): Full training output data.
        """
        for dim, m in enumerate(self.models):
            m.set_ytrain(y[:, [dim]])
        self.ytrain = np.atleast_2d(y.copy())

    def save_model(self, path):
        """Saves the model as dict to a .hdf5 file.

        Parameters:
            path (str): Path including the file name, where the model should be saved.
        """
        from profit.util.file_handler import FileHandler

        save_dict = {
            attr: getattr(self, attr)
            for attr in (
                "Xtrain",
                "ytrain",
                "trained",
                "output_ndim",
                "hyperparameters",
            )
        }
        # Encoders are serialized as their string representation (see load_model).
        save_dict["input_encoders"] = str([enc.repr for enc in self.input_encoders])
        save_dict["output_encoders"] = str([enc.repr for enc in self.output_encoders])
        for i, m in enumerate(self.models):
            save_dict[i] = {
                attr: getattr(m, attr)
                for attr in (
                    "trained",
                    "fixed_sigma_n",
                    "Xtrain",
                    "ytrain",
                    "ndim",
                    "output_ndim",
                    "kernel",
                    "hyperparameters",
                )
            }
            # Convert the kernel class object to a string, to be able to save it in the .hdf5 file
            if not isinstance(save_dict[i]["kernel"], str):
                save_dict[i]["kernel"] = m.kernel.__name__
        FileHandler.save(path, save_dict)

    @classmethod
    def load_model(cls, path):
        """Loads a saved model from a .hdf5 file and updates its attributes.

        Parameters:
            path (str): Path including the file name, from where the model should be loaded.

        Returns:
            MultiOutputGPSurrogate: Instantiated surrogate model.
        """
        from profit.util.file_handler import FileHandler
        from profit.sur.encoders import Encoder
        from numpy import array  # needed for eval of arrays

        load_dict = FileHandler.load(path, as_type="dict")
        self = cls()
        self.trained = load_dict["trained"]
        self.output_ndim = load_dict["output_ndim"]
        self.Xtrain, self.ytrain = load_dict["Xtrain"], load_dict["ytrain"]
        self.hyperparameters = load_dict["hyperparameters"]
        # NOTE(security): eval executes arbitrary code from the model file;
        # only load model files from trusted sources.
        for enc in eval(load_dict["input_encoders"]):
            self.add_input_encoder(
                Encoder[enc["class"]](enc["columns"], enc["parameters"])
            )
        for enc in eval(load_dict["output_encoders"]):
            self.add_output_encoder(
                Encoder[enc["class"]](enc["columns"], enc["parameters"])
            )
        # Initialize the encoder by encoding and decoding the training data once.
        self.encode_training_data()
        self.decode_training_data()
        self.models = [self.child() for _ in range(self.output_ndim)]
        for i, m in enumerate(self.models):
            for attr, value in load_dict[str(i)].items():
                setattr(m, attr, value)
            # Convert the kernel string back to the class object
            m.kernel = m.select_kernel(m.kernel)
            m.print_hyperparameters("Loaded")
        return self

    def optimize(self, **opt_kwargs):
        """Optimize the hyperparameters of every child model."""
        for m in self.models:
            m.optimize(**opt_kwargs)

    def special_hyperparameter_decoding(self, key, value):
        """Reduce a per-dimension hyperparameter vector to a single summary value.

        Parameters:
            key (str): Hyperparameter name.
            value (ndarray): Possibly per-dimension hyperparameter values.

        Returns:
            ndarray: Norm for length_scale, maximum otherwise; unchanged if scalar.
        """
        if len(value) > 1:
            return (
                np.atleast_1d(np.linalg.norm(value))
                if key == "length_scale"
                else np.atleast_1d(np.max(value))
            )
        return value

    def get_marginal_variance(self, Xpred):
        """Sum the marginal variances of all child models at Xpred.

        Parameters:
            Xpred (ndarray): Input points at which to evaluate the marginal variance.

        Returns:
            ndarray: (n, 1) array of summed marginal variances.
        """
        Xpred = self.encode_predict_data(Xpred)
        v = np.zeros((Xpred.shape[0], 1))
        for m in self.models:
            v += m.get_marginal_variance(Xpred)
        return v