Orchestrator
Submodules
mischbares.orchestrator.orchestrator module
Orchestrator, the highest level of the framework.
- class mischbares.orchestrator.orchestrator.Experiment(*, soe: list, params: dict, meta: Optional[dict])[source]
Bases: pydantic.main.BaseModel
validation class
- meta: Optional[dict]
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[dict[str, FieldInfo]] = {'meta': FieldInfo(annotation=Union[dict, NoneType], required=True), 'params': FieldInfo(annotation=dict, required=True), 'soe': FieldInfo(annotation=list, required=True)}
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].
This replaces Model.__fields__ from Pydantic V1.
- classmethod native_command_ordering(experiments)[source]
check if the soe is in the right order
- Parameters
experiments (list) – list of experiments
- Returns
return class with the parameters and the data
- Return type
retc (ReturnClass)
- classmethod parameter_correspondence(experiment, values)[source]
check if the parameters are in the right order
- Parameters
experiment (dict) – dict of experiments
values (list) – list of experiments
- Returns
dict of experiments
- Return type
experiment (dict)
- params: dict
- soe: list
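The field listing above marks all three fields as required, including meta, which may be None but cannot be omitted. The following is a minimal sketch of that shape check using only the standard library. It is a stand-in, not the real Experiment class; the helper name check_payload and the example soe entries and params keys are hypothetical and chosen only for illustration.

```python
# Stand-in for the three fields listed in model_fields; not the real Experiment class.
REQUIRED_FIELDS = {"soe": list, "params": dict, "meta": (dict, type(None))}


def check_payload(payload: dict) -> list:
    """Return a list of problems; an empty list means the payload has the expected shape."""
    problems = []
    for name, expected in REQUIRED_FIELDS.items():
        if name not in payload:
            # All three fields are required, including meta.
            problems.append(f"missing field: {name}")
        elif not isinstance(payload[name], expected):
            problems.append(f"wrong type for field: {name}")
    return problems


# Hypothetical payload: the soe entries and params keys are invented for illustration.
payload = {
    "soe": ["orchestrator/start", "orchestrator/finish"],
    "params": {"start": {"collectionkey": "demo"}, "finish": {}},
    "meta": None,  # allowed to be None, but the key must be present
}

print(check_payload(payload))
print(check_payload({"soe": [], "params": {}}))
```

The second call reports the missing meta key, which mirrors how a Pydantic V2 field annotated Optional[dict] without a default is still required.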
- mischbares.orchestrator.orchestrator.clear(thread: Optional[int] = None)[source]
Empty queue for thread, or for all threads if no thread specified.
- Parameters
thread (Optional[int], optional) – thread to clear. Defaults to None.
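The "one thread, or all threads if none is specified" convention is shared by clear, kill, pause and resume. A minimal sketch of that convention follows, assuming a hypothetical dictionary of per-thread queues; the real module's internal state is not documented on this page, and clear_sketch is an illustrative name.

```python
from collections import deque
from typing import Optional

# Hypothetical per-thread queues; the real module's internal state is not documented here.
queues = {0: deque(["exp_a", "exp_b"]), 1: deque(["exp_c"])}


def clear_sketch(thread: Optional[int] = None) -> None:
    """Empty one thread's queue, or every queue when thread is None."""
    targets = list(queues) if thread is None else [thread]
    for key in targets:
        queues[key].clear()


clear_sketch(thread=0)  # only thread 0 is emptied
print({k: list(v) for k, v in queues.items()})
clear_sketch()          # no thread given: every queue is emptied
print({k: list(v) for k, v in queues.items()})
```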
- async mischbares.orchestrator.orchestrator.do_measurement(experiment: dict, thread: int)[source]
Executes a single experiment
- async mischbares.orchestrator.orchestrator.finish(experiment: dict)[source]
Ensure tracking variables are appropriately reset at the end of a run, and upload the finished session.
- Parameters
experiment (dict) – dictionary containing experiment metadata
- async mischbares.orchestrator.orchestrator.get_data(thread: int, addresses: str, mode: str, wait_time: float = 0.01)[source]
Get data from the specified addresses
- Parameters
thread (int) – thread to get data from
addresses (str) – addresses to get data from
mode (str) – mode to get data in
wait_time (float, optional) – time to wait between requests. Defaults to 0.01.
- Raises
ValueError – _description_
- Returns
data from the specified addresses
- Return type
dict
- mischbares.orchestrator.orchestrator.get_status()[source]
Get status of all threads
- Returns
status of all threads
- Return type
dict
- mischbares.orchestrator.orchestrator.health_check()[source]
Health check to see if the server is up and running.
- Returns
status
- Return type
dict
- async mischbares.orchestrator.orchestrator.infl(thread: int)[source]
Executes a single thread of experiments
- mischbares.orchestrator.orchestrator.kill(thread: Optional[int] = None)[source]
Empty queue and cancel current experiment for thread, or for all threads if no thread specified.
- Parameters
thread (Optional[int], optional) – thread to kill. Defaults to None.
- async mischbares.orchestrator.orchestrator.modify(experiment: dict, addresses: Union[str, list], pointers: Union[str, list])[source]
Set undefined values under the experiment parameter dict.
Values must come from currently running threads.
- Parameters
experiment (dict) – dictionary containing experiment metadata
addresses (Union[str,list]) – within a run, address(es) of the value(s) that should be transmitted to parameter(s)
pointers (Union[str,list]) – within the param dict of the experiment, address(es) to transmit the value(s) to. Each target parameter must have previously been initialized as “?”
- Returns
dictionary containing experiment
- Return type
dict
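The placeholder mechanism described above can be sketched as follows. This is an illustration under stated assumptions, not the module's implementation: slash-separated paths, the flat values dictionary standing in for data from running threads, the function name fill_placeholders, and the example keys are all hypothetical.

```python
from typing import Union


def fill_placeholders(params: dict, values: dict,
                      addresses: Union[str, list],
                      pointers: Union[str, list]) -> dict:
    """Copy values[address] into params at each slash-separated pointer holding "?"."""
    addresses = [addresses] if isinstance(addresses, str) else addresses
    pointers = [pointers] if isinstance(pointers, str) else pointers
    for address, pointer in zip(addresses, pointers):
        *parents, leaf = pointer.split("/")
        target = params
        for key in parents:
            target = target[key]  # walk down to the dict that holds the leaf
        if target[leaf] != "?":
            # Only parameters initialized as "?" may be overwritten.
            raise ValueError(f"{pointer} was not initialized as '?'")
        target[leaf] = values[address]
    return params


# Hypothetical data: one undefined parameter and one value produced elsewhere.
params = {"measure_0": {"voltage": "?"}}
values = {"calibrate_0/offset": 0.25}
fill_placeholders(params, values, "calibrate_0/offset", "measure_0/voltage")
print(params)
```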
- mischbares.orchestrator.orchestrator.pause(thread: Optional[int] = None)[source]
Pause current experiment for thread, or for all threads if no thread specified
- Parameters
thread (Optional[int], optional) – thread to pause. Defaults to None.
- async mischbares.orchestrator.orchestrator.process_native_command(command: str, experiment: dict, **params)[source]
process a command that is native to the orchestrator
- async mischbares.orchestrator.orchestrator.raise_exceptions()[source]
Error handling within the infinite loop. Check for exceptions; if any are found, print the stack trace and cancel the infinite loop.
- async mischbares.orchestrator.orchestrator.repeat(experiment: dict, number_of_repeat: int = 0, priority: int = 5)[source]
Submit an experiment identical to the current one to the orchestrator thread.
- Parameters
experiment (dict) – dictionary containing experiment metadata
number_of_repeat (int, optional) – number of times to repeat after the first experiment, or 0 to repeat until forced to stop. Defaults to 0.
priority (int, optional) – priority of experiment. Defaults to 5.
- Returns
_description_
- Return type
_type_
- mischbares.orchestrator.orchestrator.resume(thread: Optional[int] = None)[source]
Resume current experiment for thread, or for all threads if no thread specified
- Parameters
thread (Optional[int], optional) – thread to resume. Defaults to None.
- async mischbares.orchestrator.orchestrator.scheduler()[source]
Receives all experiments, creates new threads, and sends experiments to the appropriate thread
- async mischbares.orchestrator.orchestrator.send_measurement(experiment: str, thread: int = 0, priority: int = 10)[source]
Add sequence of experiment to the queue of the orchestrator
- Parameters
experiment (str) – experiment to be added to the queue
thread (int) – thread to which the experiment is added
priority (int) – priority of the experiment in the queue
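How a priority value can order queued experiments is sketched below with asyncio.PriorityQueue from the standard library. This page does not state which queue type the orchestrator uses, or whether a lower number runs earlier, so both are assumptions here; the experiment names and the tie-breaking counter are also hypothetical.

```python
import asyncio


async def demo() -> list:
    queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
    # Each entry is (priority, submission counter, experiment).
    # The counter keeps equal-priority experiments in submission order.
    submissions = [(10, "exp_default"), (5, "exp_urgent"), (10, "exp_later")]
    for counter, (priority, experiment) in enumerate(submissions):
        await queue.put((priority, counter, experiment))

    order = []
    while not queue.empty():
        _, _, experiment = await queue.get()  # lowest priority number comes out first
        order.append(experiment)
    return order


order = asyncio.run(demo())
print(order)
```

With asyncio.PriorityQueue the entry with the smallest tuple is retrieved first, so the experiment submitted with priority 5 is dequeued ahead of the two submitted with priority 10.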
- async mischbares.orchestrator.orchestrator.start(experiment: dict, collectionkey: str, meta: dict = {})[source]
Ensure the appropriate folder, file, and all keys and tracking variables are initialized at the beginning of a run.
- Parameters
experiment (dict) – experiment dictionary
collectionkey (str) – determines folder and file names for the session. May correspond to a key of experiment['meta'], in which case the name will be indexed by that value.
meta (dict, optional) – will be placed as metadata under the header of the run set up by this command. Defaults to {}.
- async mischbares.orchestrator.orchestrator.wait(experiment: dict, addresses: Union[str, list])[source]
Pause experiment until given thread(s) complete(s) given action(s)
- Parameters
experiment (dict) – dictionary containing experiment metadata
addresses (Union[str,list]) – path(s) below run to awaited address(es), e.g. “experiment/action”
- Returns
dictionary containing experiment metadata
- Return type
experiment (dict)
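Waiting on one or more addresses can be sketched with asyncio.Event objects, one per awaited address. This only illustrates the blocking behaviour described for wait; the registry of events, the worker coroutines, and the address strings are hypothetical and do not reflect how the module tracks completed actions.

```python
import asyncio


async def demo() -> list:
    # Hypothetical registry: one Event per awaited "experiment/action" address.
    done = {"exp_0/measure_0": asyncio.Event(), "exp_0/move_0": asyncio.Event()}
    log = []

    async def worker(address: str, delay: float) -> None:
        await asyncio.sleep(delay)  # stands in for the action running on another thread
        log.append(f"finished {address}")
        done[address].set()

    async def wait_sketch(addresses) -> None:
        addresses = [addresses] if isinstance(addresses, str) else addresses
        # Block until every awaited address has been marked complete.
        await asyncio.gather(*(done[a].wait() for a in addresses))
        log.append("resumed")

    await asyncio.gather(
        wait_sketch(["exp_0/measure_0", "exp_0/move_0"]),
        worker("exp_0/measure_0", 0.01),
        worker("exp_0/move_0", 0.02),
    )
    return log


log = asyncio.run(demo())
print(log)
```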