Source code for profit.config

from os import path
import yaml
from collections import OrderedDict
from profit import defaults
from profit.util.base_class import CustomABC
import warnings

VALID_FORMATS = (".yaml", ".py")

"""
yaml has to be configured to represent OrderedDict
see https://stackoverflow.com/questions/16782112/can-pyyaml-dump-dict-items-in-non-alphabetical-order
and https://stackoverflow.com/questions/5121931/in-python-how-can-you-load-yaml-mappings-as-ordereddicts
"""


def represent_ordereddict(dumper, data):
    """Represent an ``OrderedDict`` as a plain YAML mapping node.

    The items are emitted in the iteration order of ``data``.

    Parameters:
        dumper: YAML dumper whose ``represent_data`` converts keys and values
            into nodes.
        data (OrderedDict): Mapping to represent.

    Returns:
        yaml.nodes.MappingNode: Node tagged as a standard YAML map.
    """
    node_pairs = [
        (dumper.represent_data(key), dumper.represent_data(val))
        for key, val in data.items()
    ]
    return yaml.nodes.MappingNode("tag:yaml.org,2002:map", node_pairs)
def dict_constructor(loader, node):
    """Construct an ``OrderedDict`` from a YAML mapping node.

    Parameters:
        loader: YAML loader providing ``construct_pairs``.
        node: Mapping node to convert.

    Returns:
        OrderedDict: Key/value pairs of the node in document order.
    """
    pairs = loader.construct_pairs(node)
    return OrderedDict(pairs)
# Register the two helpers above with PyYAML at import time:
# - OrderedDict instances are dumped via represent_ordereddict,
# - nodes carrying the default mapping tag are loaded via dict_constructor.
# NOTE(review): no Loader/Dumper is passed here, so this presumably does not
# affect yaml.safe_load / yaml.safe_dump - confirm against the PyYAML docs.
_mapping_tag = yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG
yaml.add_representer(OrderedDict, represent_ordereddict)
yaml.add_constructor(_mapping_tag, dict_constructor)

""" now yaml is configured to handle OrderedDict input and output """
def load_config_from_py(filename):
    """Execute a python file and collect its public module-level names.

    Parameters:
        filename (str): Path of the python file to execute.

    Returns:
        dict: All names defined in the executed module which do not start
        with an underscore, mapped to their values.
    """
    import importlib.util

    spec = importlib.util.spec_from_file_location("f", filename)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)

    config = {}
    for name, value in vars(module).items():
        if name.startswith("_"):
            # Skip private names and module dunders such as __name__.
            continue
        config[name] = value
    return config
class AbstractConfig(CustomABC):
    """General class with methods which are useful for all Config classes.

    Attributes:
        labels (dict): Sub config classes, keyed by their label. It is indexed
            with the sub config label in ``create_subconfig``.
            NOTE(review): presumably filled by ``CustomABC.register`` - confirm.
        defaults (str): Name of the attribute in ``profit.defaults`` holding
            the default values of this config, or None if there are none.
    """

    labels = {}
    defaults = None

    def __init__(self, **entries):
        if self.defaults:
            self.set_defaults(getattr(defaults, self.defaults))
        self.update(**entries)

    def update(self, **entries):
        """Updates the attributes with user inputs.

        If the current attribute is a dictionary, the user input is merged
        into it instead of replacing it. A warning is issued if the attribute
        set by the user is unknown.

        Parameters:
            entries (dict): User input of the config parameters.
        """
        lowered_labels = {label.lower() for label in self.labels}
        for name, value in entries.items():
            if not (hasattr(self, name) or name in lowered_labels):
                message = f"Config parameter '{name}' for {self.__class__.__name__} configuration may be unused."
                warnings.warn(message)
            current = getattr(self, name, None)
            if isinstance(current, dict):
                # Merge into the existing dictionary so keys the user did not
                # mention keep their current values.
                current.update(value)
                value = current
            setattr(self, name, value)

    def process_entries(self, base_config):
        """After the attributes are set, they are formatted and edited to
        standardize the user inputs.

        Parameters:
            base_config (BaseConfig): In sub configs, the data from the base
                config is needed.
        """
        pass

    def set_defaults(self, default_dict):
        """Default values are set from a default dictionary, which is usually
        located in the global profit.defaults file.

        String defaults of sub configs are expanded to ``{"class": value}``.
        All other values are deep-copied, because ``update`` merges user input
        into dictionary attributes in place; without the copy this would
        modify the shared default dictionary for every later config.

        Parameters:
            default_dict (dict): Attribute names and their default values.
        """
        from copy import deepcopy

        for name, value in default_dict.items():
            if name in self.labels and isinstance(value, str):
                value = {"class": value}
            else:
                value = deepcopy(value)
            setattr(self, name, value)

    def create_subconfig(self, sub_config_label, **entries):
        """Instances of sub configs are created from a string or a dictionary.

        The created sub config is stored as attribute ``sub_config_label``.

        Parameters:
            sub_config_label (str): Dict key of registered sub config.
            entries (dict): User input parameters.
        """
        registry = self.labels[sub_config_label]
        if "class" in entries:
            # Load specific sub config or default config, if missing.
            # Only the lookup is guarded, so that a KeyError raised inside a
            # constructor is not mistaken for an unregistered class.
            try:
                sub_class = registry[entries["class"]]
            except KeyError:
                sub = registry["default"](**entries)
            else:
                sub = sub_class()
        else:
            # Load general sub config.
            sub = registry()

        # Split entries into entries for this config and further sub configs.
        base_entries = {k: v for k, v in entries.items() if k.lower() not in sub.labels}
        sub_entries = {
            k: {"class": v} if isinstance(v, str) else v
            for k, v in entries.items()
            if k.lower() in sub.labels
        }

        # Update defaults with user entries
        sub.update(**base_entries)

        # Create second level sub configs.
        for subsub_label in sub.labels:
            subsub_entries = sub[subsub_label]
            subsub_entries.update(sub_entries.get(subsub_label, {}))
            sub.create_subconfig(subsub_label, **subsub_entries)

        setattr(self, sub_config_label, sub)

    def __getitem__(self, item):
        """Implements the dictionary like get method with brackets.

        Parameters:
            item (str): Label of the attribute to return.

        Returns:
            Attribute or if the attribute is a sub config, a dictionary of the
            sub config items. A list is wrapped as ``{"list": attr}``.

        Raises:
            AttributeError: If there is no attribute named ``item``.
        """
        attr = getattr(self, item)
        if item in self.labels:
            if isinstance(attr, list):
                return {"list": attr}
            # items() already yields (key, attr[key]) pairs; building the
            # dictionary from them avoids evaluating every item twice.
            return dict(attr.items())
        return attr

    def items(self):
        """Implements the dictionary like self.items() method.

        Returns:
            list: List of (key, value) tuples of the class attributes.
        """
        return [(key, self[key]) for key in vars(self)]

    def get(self, item, default=None):
        """Implements the dictionary like get method with a default value.

        Parameters:
            item (str): Label of the attribute to return.
            default: Default value, if the attribute is not found.

        Returns:
            Attribute or the default value.
        """
        try:
            return self[item]
        except AttributeError:
            return default
class BaseConfig(AbstractConfig):
    """Base configuration for fundamental parameters.

    This class and its modular subclasses provide all possible configuration
    parameters.

    Parts of the Config:
        - base_dir
        - run_dir
        - config_file
        - include
        - ntrain
        - variables
        - files
            - input
            - output
        - run
            - runner
            - interface
            - pre
            - post
        - fit
            - surrogate
            - save / load
            - fixed_sigma_n
        - active_learning
        - ui

    Parameters:
        base_dir (str): Base directory.
        run_dir (str): Run directory.
        config_path (str): Path to configuration file.
        include (list): Paths to custom files which are loaded in the beginning.
        files (dict): Paths for input and output files.
        ntrain (int): Number of training samples.
        variables (dict): All variables.
        input (dict): Input variables.
        output (dict): Output variables.
        independent (dict): Independent variables, if the result of the
            simulation is a vector.
    """

    labels = {}

    def __init__(self, base_dir=defaults.base_dir, **entries):
        # Set defaults
        self.base_dir = path.abspath(base_dir)
        self.run_dir = self.base_dir
        self.config_path = path.join(self.base_dir, defaults.config_file)
        self.include = defaults.include
        self.ntrain = defaults.ntrain
        self.variables = defaults.variables.copy()
        self.input = {}
        self.output = {}
        self.independent = {}
        self.files = defaults.files.copy()

        # Split user entries in entries for base_config and for sub_configs
        base_entries = {
            k: v for k, v in entries.items() if k.lower() not in self.labels
        }
        sub_entries = {
            k: {"class": v} if isinstance(v, str) else v
            for k, v in entries.items()
            if k.lower() in self.labels
        }

        self.update(**base_entries)  # Update the attributes with given entries.
        self.load_includes()  # Load external files.

        for sub_config_label in self.labels:
            single_sub_entries = sub_entries.get(sub_config_label, {})
            self.create_subconfig(sub_config_label, **single_sub_entries)

        # Postprocess the attributes to standardize different user entries.
        self.process_entries()

    def process_entries(self):
        """Sets absolute paths, creates variables and delegates to the sub configs."""
        from profit.util.variable import Variable, VariableGroup

        # Set absolute paths
        self.files["input"] = path.join(
            self.base_dir, self.files.get("input", defaults.files["input"])
        )
        self.files["output"] = path.join(
            self.base_dir, self.files.get("output", defaults.files["output"])
        )

        # Variable configuration as dict
        self.variable_group = VariableGroup(self.ntrain)
        variables = []
        for name, spec in self.variables.items():
            if isinstance(spec, (int, float)):
                # A bare number is shorthand for a constant variable.
                spec = f"Constant({spec})"
            if isinstance(spec, str):
                variables.append(
                    Variable.create_from_str(name, (self.ntrain, 1), spec)
                )
            else:
                variables.append(
                    Variable.create(name=name, size=(self.ntrain, 1), **spec)
                )
        self.variable_group.add(variables)

        self.variables = self.variable_group.as_dict
        # Everything which is neither output nor independent counts as input.
        self.input = {
            k: v
            for k, v in self.variables.items()
            if not any(
                kind in v["kind"].lower() for kind in ("output", "independent")
            )
        }
        self.output = {
            k: v for k, v in self.variables.items() if "output" in v["kind"].lower()
        }
        self.independent = {
            k: v
            for k, v in self.variables.items()
            if "independent" in v["kind"].lower() and v["size"] != (1, 1)
        }

        for sub_config_label in self.labels:
            getattr(self, sub_config_label).process_entries(self)

    @classmethod
    def from_file(cls, filename=defaults.config_file):
        """Creates a configuration class from a .yaml or .py file.

        The directory of the file is used as base directory.

        Parameters:
            filename (str): Path to the configuration file.

        Returns:
            Instance of ``cls`` with ``config_path`` set to the absolute path
            of ``filename``.

        Raises:
            TypeError: If the file extension is neither .yaml nor .py.
        """
        if filename.endswith(".yaml"):
            with open(filename) as f:
                # An empty YAML document loads as None; treat it as no entries.
                entries = yaml.safe_load(f) or {}
        elif filename.endswith(".py"):
            entries = load_config_from_py(filename)
        else:
            raise TypeError(
                f"Not supported file extension .{filename.split('.')[-1]} for config file.\n"
                f"Valid file formats: {VALID_FORMATS}"
            )
        config = cls(base_dir=path.split(filename)[0], **entries)
        # base_dir already contains the directory part of filename. Joining
        # both would duplicate that directory for relative paths such as
        # 'study/profit.yaml', so the path is resolved directly.
        config.config_path = path.abspath(filename)
        return config

    def load_includes(self):
        """Loads the custom include files and publishes their paths.

        ``include`` is normalized to a list of absolute paths relative to the
        base directory, passed to ``profit.util.load_includes`` and stored as
        JSON in the environment variable ``PROFIT_INCLUDES``.
        """
        from profit.util import load_includes
        import os
        import json

        if isinstance(self.include, str):
            self.include = [self.include]

        self.include = [path.abspath(path.join(self.base_dir, p)) for p in self.include]
        load_includes(self.include)
        os.environ["PROFIT_INCLUDES"] = json.dumps(self.include)
@BaseConfig.register("run")
class RunConfig(AbstractConfig):
    """Run configuration with the following sub classes:

        - runner
            - local
            - slurm
        - interface
            - memmap
            - zeromq
        - pre
            - template
        - post
            - json
            - numpytxt
            - hdf5

    A default sub class which just updates the entries from a user input is
    also implemented and used if the class from the user input is not found.

    Custom config classes can also be registered, e.g. as a custom runner:

    .. code-block:: python

        @RunnerConfig.register("custom")
        class CustomRunner(LocalRunnerConfig):
            def process_entries(self, base_config):
                # do something else than the usual LocalRunnerConfig
                pass

    Default values from the global profit.defaults.py file are loaded.
    """

    labels = {}
    defaults = "run"

    def update(self, **entries):
        """Updates the attributes with user inputs.

        Dictionary attributes are merged with the user input, everything else
        is replaced. No warning is issued if the attribute set by the user is
        unknown.

        Parameters:
            entries (dict): User input of the config parameters.
        """
        for name, value in entries.items():
            # Unknown names yield None here and are therefore simply set.
            existing = getattr(self, name, None)
            if isinstance(existing, dict):
                existing.update(value)
                value = existing
            setattr(self, name, value)
@BaseConfig.register("fit")
class FitConfig(AbstractConfig):
    """Configuration for the surrogate and encoder.

    Currently, the only sub config is for the GaussianProcess classes.
    """

    labels = {}
    defaults = "fit"

    def __init__(self, **entries):
        from copy import deepcopy

        # process_entries appends to list attributes which are never assigned
        # in this class, so they presumably come from these defaults. Copy the
        # defaults so that one instance cannot modify them for the next one.
        self.set_defaults(deepcopy(defaults.fit))

        if entries:
            warnings.warn(
                f"FitConfig should be initialized with empty entries and not with {entries}"
            )

    def update(self, **entries):
        """Updates the attributes with user inputs.

        If a surrogate is given, the defaults belonging to its family
        (Gaussian process or linear regression) are loaded before the user
        entries are applied.

        Parameters:
            entries (dict): User input of the config parameters.

        Raises:
            RuntimeError: If the surrogate class is neither a GaussianProcess
                nor a LinearRegression.
        """
        from copy import deepcopy

        from profit.sur import Surrogate
        from profit.sur.gp.gaussian_process import GaussianProcess
        from profit.sur.linreg import LinearRegression

        if "surrogate" in entries:
            self.surrogate = entries["surrogate"]
            surrogate_class = Surrogate.labels[self.surrogate]
            if issubclass(surrogate_class, GaussianProcess):
                self.set_defaults(deepcopy(defaults.fit_gaussian_process))
            elif issubclass(surrogate_class, LinearRegression):
                self.set_defaults(deepcopy(defaults.fit_linear_regression))
            else:
                raise RuntimeError(f"unknown surrogate {self.surrogate}")

        super().update(**entries)

    @staticmethod
    def _parse_encoder_config(config):
        """Returns the encoder class name and its lower-cased variable specification.

        Handles the shorthand notation, e.g.
        Name(a,b) -> {class: Name, variables: [a, b]}.
        """
        from re import match

        if isinstance(config, str):
            parsed = match(r"(\w+)\((.*)\)", config)
            if parsed is None:
                raise ValueError(f"unable to parse encoder shortcut <{config}>")
            name, var_spec = parsed.groups()
            return name, [v.strip().lower() for v in var_spec.split(",")]
        if isinstance(config, dict):
            return config["class"], [v.strip().lower() for v in config["variables"]]
        raise ValueError(f"unable to parse encoder <{config}>")

    @staticmethod
    def _select_columns(variables, columns, var_spec, select_all_keys):
        """Returns the variables matching var_spec and their column indices (None if nothing matches)."""
        import numpy as np

        if any(key in var_spec for key in select_all_keys):
            return variables, np.arange(columns.size)

        # Select based on variable names or kinds.
        selected = [
            var
            for var in variables
            if var.name.lower() in var_spec or var.kind.lower() in var_spec
        ]
        if not selected:
            return selected, None
        indices = np.arange(columns.size)
        return selected, np.hstack([indices[columns == var.name] for var in selected])

    def process_entries(self, base_config):
        """Set 'load' and 'save' as well as the encoder.

        Parameters:
            base_config (BaseConfig): Provides the base directory and the
                variable group.

        Raises:
            ValueError: If an encoder entry cannot be parsed.
            NotImplementedError: If a dimensionality reducing encoder is not
                the last encoder.
        """
        import numpy as np

        for mode_str in ("save", "load"):
            filepath = getattr(self, mode_str)
            if filepath:
                if self.surrogate not in filepath:
                    # Insert the surrogate label before the file extension.
                    # splitext also copes with paths without an extension and
                    # with dots in directory names.
                    root, ext = path.splitext(filepath)
                    filepath = f"{root}_{self.surrogate}{ext}"
                setattr(
                    self,
                    mode_str,
                    path.abspath(path.join(base_config.base_dir, filepath)),
                )

        if self.load:
            self.save = False

        # Encoders
        input_list = base_config.variable_group.input_list
        output_list = base_config.variable_group.output_list

        # array: which columns belong to which variables
        input_columns = np.array(
            sum(([var.name] * var.size[1] for var in input_list), [])
        )
        output_columns = np.array(
            sum(([var.name] * var.size[1] for var in output_list), [])
        )

        for config in self.encoder:
            name, var_spec = self._parse_encoder_config(config)
            # ToDo: check if var_spec is valid -> warn otherwise

            input_vars, input_select = self._select_columns(
                input_list, input_columns, var_spec, ("all", "in", "input", "inputs")
            )
            output_vars, output_select = self._select_columns(
                output_list,
                output_columns,
                var_spec,
                ("all", "out", "output", "outputs"),
            )

            # handle special cases
            if name == "Exclude":
                # remove excluded columns from column lists
                excluded_inputs = [v.name for v in input_vars]
                excluded_outputs = [v.name for v in output_vars]
                input_columns = np.array(
                    [c for c in input_columns if c not in excluded_inputs]
                )
                output_columns = np.array(
                    [c for c in output_columns if c not in excluded_outputs]
                )
            elif name in ("PCA", "KarhunenLoeve"):
                # ToDo: can't handle dimensionality reduction yet
                if config is not self.encoder[-1]:
                    raise NotImplementedError(
                        "reduced dimensions cannot be encoded further"
                    )

            # The shorthand string notation cannot carry parameters.
            if isinstance(config, str):
                parameters = {}
            else:
                parameters = {
                    k: float(v) for k, v in config.get("parameters", {}).items()
                }

            # add processed config to _input_encoders & _output_encoders
            for encoders, select in (
                (self._input_encoders, input_select),
                (self._output_encoders, output_select),
            ):
                if select is not None:
                    encoders.append(
                        {
                            "class": name,
                            "columns": select,
                            # separate dict per entry, so they stay independent
                            "parameters": dict(parameters),
                        }
                    )
@BaseConfig.register("active_learning")
class ALConfig(AbstractConfig):
    """Active learning configuration."""

    labels = {}
    defaults = "active_learning"

    def process_entries(self, base_config):
        """Delegates the processing to all sub configs.

        Parameters:
            base_config (BaseConfig): Passed on to every sub config.
        """
        sub_configs = (getattr(self, key.lower()) for key in self.labels)
        for sub_config in sub_configs:
            sub_config.process_entries(base_config)
@ALConfig.register("algorithm")
class AlgorithmALConfig(AbstractConfig):
    """Active learning algorithm configuration, registered as 'algorithm'.

    No defaults are loaded for this class itself.
    """

    labels = {}
    defaults = None
@AlgorithmALConfig.register("simple")
class SimpleALConfig(AlgorithmALConfig):
    """Configuration of the active learning algorithm registered as 'simple'."""

    labels = {}
    defaults = "al_algorithm_simple"

    def process_entries(self, base_config):
        """Takes over the save path of the fit config and processes the sub configs.

        Parameters:
            base_config (BaseConfig): Provides the 'fit' entries and is passed
                on to the sub configs.
        """
        if self.save:
            fit_entries = base_config["fit"]
            self.save = fit_entries["save"]

        for label in self.labels:
            sub_config = getattr(self, label)
            sub_config.process_entries(base_config)
@AlgorithmALConfig.register("mcmc")
class McmcConfig(AlgorithmALConfig):
    """Configuration of the active learning algorithm registered as 'mcmc'."""

    labels = {}
    defaults = "al_algorithm_mcmc"

    def process_entries(self, base_config):
        """Converts 'save' and 'reference_data' to absolute paths.

        Parameters:
            base_config (BaseConfig): Provides the base directory the paths
                are joined with.
        """
        for attr_name in ("save", "reference_data"):
            relative = getattr(self, attr_name)
            absolute = path.abspath(path.join(base_config.base_dir, relative))
            setattr(self, attr_name, absolute)
@SimpleALConfig.register("acquisition_function")
class AcquisitionFunctionConfig(AbstractConfig):
    """Acquisition function configuration."""

    labels = {}
    defaults = None

    def process_entries(self, base_config):
        """Converts string attributes to float wherever that is possible.

        Strings which do not represent a number are left unchanged.

        Parameters:
            base_config (BaseConfig): Not used here.
        """
        for key, value in self.items():
            if not isinstance(value, str):
                continue
            try:
                setattr(self, key, float(value))
            except ValueError:
                # Not a number: keep the original string.
                continue
@AcquisitionFunctionConfig.register("simple_exploration")
class SimpleExplorationConfig(AcquisitionFunctionConfig):
    """Acquisition function config registered as 'simple_exploration'."""

    labels = {}
    defaults = "al_acquisition_function_simple_exploration"
@AcquisitionFunctionConfig.register("exploration_with_distance_penalty")
class ExplorationWithDistancePenaltyConfig(AcquisitionFunctionConfig):
    """Acquisition function config registered as 'exploration_with_distance_penalty'."""

    labels = {}
    defaults = "al_acquisition_function_exploration_with_distance_penalty"
@AcquisitionFunctionConfig.register("weighted_exploration")
class WeightedExplorationConfig(AcquisitionFunctionConfig):
    """Acquisition function config registered as 'weighted_exploration'."""

    labels = {}
    defaults = "al_acquisition_function_weighted_exploration"
@AcquisitionFunctionConfig.register("probability_of_improvement")
class ProbabilityOfImprovementConfig(AcquisitionFunctionConfig):
    """Acquisition function config registered as 'probability_of_improvement'."""

    labels = {}
    defaults = "al_acquisition_function_probability_of_improvement"
@AcquisitionFunctionConfig.register("expected_improvement")
class ExpectedImprovementConfig(AcquisitionFunctionConfig):
    """Acquisition function config registered as 'expected_improvement'."""

    labels = {}
    defaults = "al_acquisition_function_expected_improvement"
@AcquisitionFunctionConfig.register("expected_improvement_2")
class ExpectedImprovement2Config(AcquisitionFunctionConfig):
    """Acquisition function config registered as 'expected_improvement_2'."""

    labels = {}
    defaults = "al_acquisition_function_expected_improvement_2"
@AcquisitionFunctionConfig.register("alternating_exploration")
class AlternatingExplorationConfig(AcquisitionFunctionConfig):
    """Acquisition function config registered as 'alternating_exploration'."""

    labels = {}
    defaults = "al_acquisition_function_alternating_exploration"
@BaseConfig.register("ui")
class UIConfig(AbstractConfig):
    """Configuration for the Graphical User Interface.

    The default values are read from the 'ui' entry of profit.defaults.
    """

    labels = {}
    defaults = "ui"
@AcquisitionFunctionConfig.register("default")
class DefaultConfig(AbstractConfig):
    """Default config for all run sub configs which just updates the
    attributes with user entries."""

    labels = {}
    defaults = None

    def __init__(self, **entries):
        # Report under the class name requested by the user, if there is one.
        if "class" in entries:
            name = entries["class"]
        else:
            name = self.__class__.__name__
        warnings.warn(f"Using default config for '{name}'.")
        self.update(**entries)

    def update(self, **entries):
        """Updates the attributes with user inputs without issuing warnings.

        Dictionary attributes are merged with the user input, everything else
        is replaced.

        Parameters:
            entries (dict): User input of the config parameters.
        """
        for name, value in entries.items():
            # Unknown names yield None here and are therefore simply set.
            existing = getattr(self, name, None)
            if isinstance(existing, dict):
                existing.update(value)
                value = existing
            setattr(self, name, value)