"""proFit worker class & components"""
import os
import shutil
import logging
from abc import abstractmethod
import time
import subprocess
from typing import Mapping, MutableMapping, Sequence
from numpy import zeros, void
from warnings import warn
import functools
import json
from ..util.component import Component
from .interface import WorkerInterface as Interface
# === Worker === #
[docs]class Worker(Component):
def __init__(
self,
run_id: int,
*,
interface: Interface = "memmap",
debug=False,
log_path="log",
logger=None,
):
self.run_id = run_id
self.debug = debug
if logger is None:
self.logger = logging.getLogger("Worker")
self.logger.setLevel(logging.DEBUG)
try:
os.mkdir(log_path)
except FileExistsError:
pass
log_handler = logging.FileHandler(
os.path.join(log_path, f"run_{run_id:03d}.log"), mode="w"
)
if self.debug:
log_handler.setLevel(logging.DEBUG)
else:
log_handler.setLevel(logging.INFO)
log_formatter = logging.Formatter(
"{asctime} {levelname:8s} {name}: {message}", style="{"
)
log_handler.setFormatter(log_formatter)
self.logger.addHandler(log_handler)
else:
self.logger = logger
if isinstance(interface, str):
self.interface = Interface[interface](
self.run_id, logger_parent=self.logger
)
elif isinstance(interface, Mapping):
self.interface = Interface[interface["class"]](
self.run_id,
**{key: value for key, value in interface.items() if key != "class"},
logger_parent=self.logger,
)
else:
self.interface = interface
[docs] @abstractmethod
def work(self):
# self.interface.retrieve() -> self.interface.input
# timestamp = time.time()
# self.interface.output = simulate()
# self.interface.time = int(time.time() - timestamp)
# self.interface.transmit()
pass
[docs] def clean(self):
self.interface.clean()
[docs] @classmethod
def from_config(cls, config, interface, run_id):
return cls[config["class"]](
run_id=run_id,
interface=interface,
**{key: value for key, value in config.items() if key != "class"},
)
[docs] @classmethod
def from_env(cls, env=None):
from ..util import load_includes
if env is None:
env = os.environ.copy()
if env.get("PROFIT_INCLUDES"):
load_includes(json.loads(env["PROFIT_INCLUDES"]))
return cls.from_config(
config=json.loads(env["PROFIT_WORKER"]),
interface=json.loads(env["PROFIT_INTERFACE"]),
run_id=int(env["PROFIT_RUN_ID"]) + int(env.get("PROFIT_ARRAY_ID", 0)),
)
[docs] @classmethod
def wrap(cls, label, outputs=None, inputs=None):
"""
```
@Worker.wrap('label', ['f', 'g'], ['x', 'y'])
def func(x, y):
...
@Worker.wrap('label', ['f', 'g'])
def func(x, y):
...
@Worker.wrap('label')
def func(x, y) -> ['f', 'g']:
...
@Worker.wrap('name', 'f', 'x')
def func(x):
...
@Worker.wrap('name')
def func(x) -> 'f':
...
@Worker.wrap('name')
def f(x):
...
```
"""
def decorator(func):
nonlocal inputs, outputs
if isinstance(inputs, str):
inputs = [inputs]
elif inputs is None:
inputs = func.__code__.co_varnames[: func.__code__.co_argcount]
if outputs is None:
if "return" in func.__annotations__:
outputs = func.__annotations__["return"]
else:
outputs = func.__code__.co_name
if isinstance(outputs, str):
outputs = [outputs]
@functools.wraps(func, updated={})
class WrappedWorker(cls, label=label):
def work(self):
self.interface.retrieve()
self.logger.info(f"start {func.__name__}")
timestamp = time.time()
values = func(*[self.interface.input[key] for key in inputs])
duration = time.time() - timestamp
self.logger.info(
f"returned values: {values} after {duration:.1f} s"
)
self.interface.time = int(duration)
if len(outputs) == 1 and not (
isinstance(values, Sequence) and not isinstance(values, str)
):
values = [values]
for value, key in zip(values, outputs):
self.interface.output[key] = value
self.interface.transmit()
return WrappedWorker
return decorator
# === Entry Point === #
[docs]def main():
"""
entry point to run a worker
the run id and the path to the proFit configuration is provided via environment variables
"""
worker = Worker.from_env()
worker.work()
worker.clean()