Source code for profit.run.interface

"""Runner-Worker Interface

The Interface is responsible for the data transfer between the Runner and all Workers.
Each Interface consists of two components: a Runner-Interface and a Worker-Interface
"""

from abc import abstractmethod
import logging
import numpy as np
from typing import Mapping

from ..util.component import Component


[docs]class RunnerInterface(Component): internal_vars = [("DONE", np.bool8), ("TIME", np.uint32)] def __init__( self, size, input_config, output_config, *, logger_parent: logging.Logger = None ): """ Parameters: size (int): number of runs for which space is allocated input_config (Mapping): {name(str): specification(Variable)} output_config (Mapping): {name(str): specification(Variable)} input_config can be generated by VariableGroup.input_dict output_config can be generated by VariableGroup.output_dict a Mapping containing "dtype" and "size" can be used instead of Variable """ self.logger = logging.getLogger("Interface") if logger_parent is not None: self.logger.parent = logger_parent self.input_vars = [ (variable, spec["dtype"].__name__) for variable, spec in input_config.items() ] self.output_vars = [ ( variable, spec["dtype"].__name__, () if spec["size"] == (1, 1) else (spec["size"][-1],), ) for variable, spec in output_config.items() ] self.input = np.zeros(size, dtype=self.input_vars) self.output = np.zeros(size, dtype=self.output_vars) self.internal = np.zeros(size, dtype=self.internal_vars) @property def config(self): # return all config parameters return {"class": self.label}
[docs] def resize(self, size): if size <= self.size: self.logger.warning("shrinking RunnerInterface is not supported") return self.input.resize(size, refcheck=True) # filled with 0 by default self.output.resize(size, refcheck=True) self.internal.resize(size, refcheck=True)
@property def size(self): assert self.input.size == self.output.size == self.internal.size return self.input.size
[docs] def poll(self): self.logger.debug("polling")
[docs] def clean(self): self.logger.debug("cleaning")
[docs]class WorkerInterface(Component): """The Worker-Interface The Worker-side of the Interface performs two tasks: retrieving input data and transmitting output data. Only the Worker interacts directly with the Interface, following the scheme: ``` self.interface.retrieve() -> self.interface.input timestamp = time.time() self.interface.output = simulate() self.interface.time = int(time.time() - timestamp) self.interface.transmit() ``` """ def __init__(self, run_id: int, *, logger_parent: logging.Logger = None): self.run_id = run_id self.logger = logging.getLogger("Interface") if logger_parent is not None: self.logger.parent = logger_parent if "time" not in self.__dir__(): self.time: int = 0 if "input" not in self.__dir__(): self.input: void = np.zeros(1, dtype=[])[0] if "output" not in self.__dir__(): self.output: void = np.zeros(1, dtype=[])[0] @property def config(self): return {"class": self.label}
[docs] @abstractmethod def retrieve(self): """retrieve the input 1) connect to the Runner-Interface 2) retrieve the input data and store it in `.input`""" pass
[docs] @abstractmethod def transmit(self): """transmit the output 1) transmit the output and time data (`.output` and `.time`) 2) signal the Worker has finished 3) close the connection to the Runner-Interface""" pass
[docs] def clean(self): pass