"""Runner-Worker Interface
The Interface is responsible for the data transfer between the Runner and all Workers.
Each Interface consists of two components: a Runner-Interface and a Worker-Interface
"""
from abc import abstractmethod
import logging
import numpy as np
from typing import Mapping
from ..util.component import Component
class RunnerInterface(Component):
    """The Runner-side of the Interface.

    Holds three structured NumPy arrays with one record per run:

    * ``input``: one field per input variable
    * ``output``: one field per output variable
    * ``internal``: bookkeeping fields defined by ``internal_vars``

    ``poll`` and ``clean`` only emit a debug log message here.
    """

    # Fields of the ``internal`` array.
    # ``np.bool_`` is used instead of the alias ``np.bool8``, which was
    # deprecated in NumPy 1.24 and removed in NumPy 2.0 (both names refer to
    # the same type on older versions).
    internal_vars = [("DONE", np.bool_), ("TIME", np.uint32)]

    def __init__(
        self, size, input_config, output_config, *, logger_parent: logging.Logger = None
    ):
        """
        Parameters:
            size (int): number of runs for which space is allocated
            input_config (Mapping): {name(str): specification(Variable)}
            output_config (Mapping): {name(str): specification(Variable)}
            logger_parent (logging.Logger): if given, it is set as the parent
                of the "Interface" logger

        input_config can be generated by VariableGroup.input_dict
        output_config can be generated by VariableGroup.output_dict
        a Mapping containing "dtype" and "size" can be used instead of Variable
        """
        self.logger = logging.getLogger("Interface")
        if logger_parent is not None:
            self.logger.parent = logger_parent

        # structured dtype specification: (field name, dtype name)
        self.input_vars = [
            (variable, spec["dtype"].__name__)
            for variable, spec in input_config.items()
        ]
        # structured dtype specification: (field name, dtype name, sub-array shape)
        # a size of (1, 1) yields a scalar field, any other size yields a
        # one-dimensional sub-array with the length of the last entry of size
        self.output_vars = [
            (
                variable,
                spec["dtype"].__name__,
                () if spec["size"] == (1, 1) else (spec["size"][-1],),
            )
            for variable, spec in output_config.items()
        ]

        self.input = np.zeros(size, dtype=self.input_vars)
        self.output = np.zeros(size, dtype=self.output_vars)
        self.internal = np.zeros(size, dtype=self.internal_vars)

    @property
    def config(self):
        """All config parameters of this component as a dict."""
        # NOTE(review): `label` is not defined in this class, presumably it is
        # provided by Component - confirm
        return {"class": self.label}

    def resize(self, size):
        """Grow the input, output and internal arrays in-place to ``size`` runs.

        Requesting the current size is a no-op. Requesting a smaller size is
        not supported: a warning is logged and nothing is changed.

        Parameters:
            size (int): new number of runs for which space is allocated
        """
        current = self.size
        if size == current:
            # nothing to do; this is not a shrink, so no warning is issued
            return
        if size < current:
            self.logger.warning("shrinking RunnerInterface is not supported")
            return
        self.input.resize(size, refcheck=True)  # filled with 0 by default
        self.output.resize(size, refcheck=True)
        self.internal.resize(size, refcheck=True)

    @property
    def size(self):
        """Number of runs for which space is currently allocated."""
        # invariant: the three arrays always have the same length
        assert self.input.size == self.output.size == self.internal.size
        return self.input.size

    def poll(self):
        """Log a debug message ("polling"); no other action is taken here."""
        self.logger.debug("polling")

    def clean(self):
        """Log a debug message ("cleaning"); no other action is taken here."""
        self.logger.debug("cleaning")
class WorkerInterface(Component):
    """The Worker-Interface

    The Worker-side of the Interface performs two tasks:
    retrieving input data and transmitting output data.

    Only the Worker interacts directly with the Interface, following the scheme:
    ```
    self.interface.retrieve() -> self.interface.input
    timestamp = time.time()
    self.interface.output = simulate()
    self.interface.time = int(time.time() - timestamp)
    self.interface.transmit()
    ```
    """

    def __init__(self, run_id: int, *, logger_parent: logging.Logger = None):
        """
        Parameters:
            run_id (int): stored as `.run_id`
            logger_parent (logging.Logger): if given, it is set as the parent
                of the "Interface" logger
        """
        self.run_id = run_id
        self.logger = logging.getLogger("Interface")
        if logger_parent is not None:
            self.logger.parent = logger_parent

        # Defaults are only assigned if the name is not already listed by
        # `__dir__()`. The membership test does not evaluate the attribute
        # itself (unlike `hasattr`).
        # NOTE(review): presumably this allows subclasses to provide `time`,
        # `input` and `output` themselves (e.g. as properties) - confirm
        if "time" not in self.__dir__():
            self.time: int = 0
        if "input" not in self.__dir__():
            # a single record of an empty structured array (a `np.void` scalar)
            self.input: np.void = np.zeros(1, dtype=[])[0]
        if "output" not in self.__dir__():
            self.output: np.void = np.zeros(1, dtype=[])[0]

    @property
    def config(self):
        """All config parameters of this component as a dict."""
        # NOTE(review): `label` is not defined in this class, presumably it is
        # provided by Component - confirm
        return {"class": self.label}

    @abstractmethod
    def retrieve(self):
        """retrieve the input

        1) connect to the Runner-Interface
        2) retrieve the input data and store it in `.input`"""

    @abstractmethod
    def transmit(self):
        """transmit the output

        1) transmit the output and time data (`.output` and `.time`)
        2) signal the Worker has finished
        3) close the connection to the Runner-Interface"""