"""orcestrator, higherst level of the framework"""
#pylint: disable=global-variable-undefined
#pylint: disable=global-variable-not-assigned
#pylint: disable=no-self-argument
#pylint: disable=cell-var-from-loop
#pylint: disable=line-too-long
#pylint: disable=consider-using-set-comprehension
#pylint: disable=consider-iterating-dictionary
#pylint: disable=consider-using-dict-items
#pylint: disable=use-a-generator
#pylint: disable=no-else-break
#pylint: disable=too-many-branches
#pylint: disable=use-implicit-booleaness-not-comparison
#pylint: disable=consider-using-generator
#pylint: disable=dangerous-default-value
import os
import sys
import datetime
from copy import copy
import time
import json
from typing import Union,Optional
from pathlib import Path
import asyncio
import requests
import uvicorn
from fastapi import FastAPI
from pydantic import BaseModel, validator
import numpy
import h5py
from mischbares.logger import logger
from mischbares.utils import orchestrator_utils
from mischbares.config.main_config import config
log = logger.get_logger("orchestrator")
SERVERKEY = "orchestrator"
app = FastAPI(title="Orchestrator",description="Orchestrator for the mischbares framework",
version="2.1.0")
class Experiment(BaseModel):
    """ validation class for incoming experiments """
    soe: list
    params: dict
    meta: Optional[dict]
    @validator('soe')
def native_command_ordering(cls, experiments):
""" check if the soe is in the right order
Args:
experiments (list): list of experiments
Returns:
retc (ReturnClass): return class with the parameters and the data
"""
for i in experiments:
if i.count('_') > 1:
raise ValueError("too many underscores in function name")
for i in experiments:
if i.count('/') != 1 or i[0] == '/' or i[-1] == '/':
raise ValueError("action must consist of a server name and a function name separated by '/'")
# check inappropriate experiments
parsed_v = [i.split('_')[0] for i in experiments]
if parsed_v.count("orchestrator/start") > 1:
raise ValueError("cannot have multiple calls to orchestrator/start in single soe")
if "orchestrator/start" in parsed_v and parsed_v[0] != "orchestrator/start":
raise ValueError("orchestrator/start must be first action in soe")
if parsed_v.count("orchestrator/finish") > 1:
raise ValueError("cannot have multiple calls to orchestrator/finish in single soe")
if "orchestrator/finish" in parsed_v and parsed_v[-1] != "orchestrator/finish":
raise ValueError("orchestrator/start must be last action in soe")
if parsed_v.count("orchestrator/repeat") > 1:
raise ValueError("cannot have multiple calls to orchestrator/repeat in single soe")
if "orchestrator/repeat" in parsed_v and not (parsed_v[-1] == "orchestrator/repeat" or\
parsed_v[-2] == "orchestrator/repeat" and parsed_v[-1] == "orchestrator/finish"):
raise ValueError("orchestrator/repeat can only be followed \
by orchestrator/finish in soe")
return experiments
    @validator('params')
def parameter_correspondence(cls, experiment, values):
""" check if the parameters are in the right order
Args:
experiment (dict): dict of experiments
experiment_values (list): list of experiments
returns:
experiment (dict): dict of experiments
"""
action = set([i.split('/')[-1] for i in values['soe']])
if action != set(experiment.keys()):
raise ValueError("soe and params are not perfectly corresponding. \n \
must be params entry for every action in soe, and vice-versa")
if len(action) != len(values['soe']):
raise ValueError("duplicate entries in soe")
return experiment
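# Illustrative example of a payload that passes both validators above (server and
# function names are hypothetical, not part of this module):
# Experiment(**{"soe": ["orchestrator/start_0", "movement/moveToHome_0",
#                       "orchestrator/finish_0"],
#               "params": {"start_0": {"collectionkey": "demo"}, "moveToHome_0": {},
#                          "finish_0": {}},
#               "meta": {}})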
[docs]@app.get("/health")
def health_check():
""" health check to see if the server is up and running
Returns:
dict: status
"""
return {"status": "healthy"}
[docs]@app.post("/orchestrator/addExperiment")
async def send_measurement(experiment: str, thread: int = 0, priority: int = 10):
""" Add sequence of experiment to the queue of the orchestrator
Args:
experiment (str): experiment to be added to the queue
thread (int): thread to which the experiment is added
priority (int): priority of the experiment in the queue
"""
global INDEX
# load experiment and run it through pydantic validation
experiment = dict(Experiment(**json.loads(experiment)))
await SCHEDULER_QUEUE.put((priority,INDEX,experiment,thread))
INDEX += 1
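# A minimal client-side sketch for this endpoint (host and port are illustrative; the
# real values come from config['servers']['orchestrator']):
# requests.post("http://127.0.0.1:8000/orchestrator/addExperiment",
#               params={"experiment": json.dumps({"soe": [...], "params": {...},
#                                                 "meta": {}}),
#                       "thread": 0, "priority": 10}, timeout=None)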
async def scheduler():
""" Receives all experiments, creates new threads, and sends experiments
to the appropriate thread"""
global EXPERIMENT_QUEUES,EXPERIMENT_TASKS,LOOP,TRACKING
while True:
priority,index,experiment,thread = await SCHEDULER_QUEUE.get()
if thread not in EXPERIMENT_QUEUES.keys():
EXPERIMENT_QUEUES.update({thread:asyncio.PriorityQueue()})
            # several instances of the infinite loop, one for each thread
            EXPERIMENT_TASKS.update({thread:LOOP.create_task(infl(thread))})
            # initialize the tracking state for the new thread
TRACKING[thread] = {'path':None, 'run':None,'experiment':None,\
'current_action':None,'status':'uninitialized','history':[]}
await EXPERIMENT_QUEUES[thread].put((priority,index,experiment))
async def infl(thread: int):
""" Executes a single thread of experiments"""
while True:
*_,experiment = await EXPERIMENT_QUEUES[thread].get()
if TRACKING[thread]['status'] == 'clear':
#await update_tracking(thread,'running','status')
TRACKING[thread]['status'] = 'running'
await do_measurement(experiment, thread)
if EXPERIMENT_QUEUES[thread].empty() and TRACKING[thread]['status'] == 'running':
#await update_tracking(thread,'clear','status')
TRACKING[thread]['status'] = 'clear'
async def do_measurement(experiment: dict, thread: int):
""" Executes a single experiment"""
global TRACKING,LOOP,FILELOCKS,SERVERLOCKS,TASK
log.info(f'experiment: {experiment} on thread {thread}')
# add header for new experiment if you already have an initialized, nonempty experiment
if isinstance(TRACKING[thread]['experiment'],int):
async with FILELOCKS[TRACKING[thread]['path']]:
with h5py.File(TRACKING[thread]['path'], 'r') as session:
if len(list(session[f"/run_{TRACKING[thread]['run']}/experiment_{TRACKING[thread]['experiment']}:{thread}"].keys())) > 0:
TRACKING[thread]['experiment'] += 1
# put thread in experiment for native commands
experiment['meta']['thread'] = thread
    # handle threads that have not been set up: couple to the thread directly below if it
    # exists and is initialized; otherwise raise a flag and blank the experiment
if experiment['soe'] != [] and all([TRACKING[thread][i] is None \
for i in ['path','run','experiment','current_action']]) \
and experiment['soe'][0].split('_')[0] != 'orchestrator/start':
log.info(f"thread {thread} for experiment {experiment} has not been set up")
        if thread-1 in TRACKING.keys() and all([TRACKING[thread-1][i] is not None for i in \
            ['path','run','experiment','current_action']]):
TRACKING[thread]['path'] = TRACKING[thread-1]['path']
TRACKING[thread]['run'] = TRACKING[thread-1]['run']
TRACKING[thread]['experiment'] = 0
log.info(f"thread {thread} bound to thread {thread-1}")
else:
experiment = {'soe':[],'params':{},'meta':{}}
log.info("experiment has been blanked")
for action_str in experiment['soe']:
        # example: the server is 'movement' and the function is 'moveToHome_0'
server, fnc = action_str.split('/')
while TRACKING[thread]['status'] != 'running' and server != 'orchestrator':
await asyncio.sleep(.1)
TRACKING[thread]['current_action'] = fnc
action = fnc.split('_')[0]
params = experiment['params'][fnc]
servertype = server.split(':')[0]
if server not in SERVERLOCKS.keys() and servertype != "orchestrator":
SERVERLOCKS[server] = asyncio.Lock()
# a placeholder for the appropriate conditional.
if servertype != 'orchestrator':
while True:
async with SERVERLOCKS[server]:
res = await LOOP.run_in_executor(None,lambda x: requests.get(x, params=params, timeout=None),
f"http://{config['servers'][server]['host']}:{config['servers'][server]['port']}/{servertype}/{action}")
#ensure that action completed successfully
if 200 <= res.status_code < 300:
res = res.json()
break
else:
log.info(f"Orchestrator has received an invalid response attempting \
action {action_str} with parameters {params}.")
input('Fix the problem, and press Enter to try the action again.')
elif servertype == '':
pass
elif servertype == 'orchestrator':
if params is None:
params = {}
            # a native command may crash or fail here if the orchestrator is in an unsafe state
experiment = await process_native_command(action,experiment,**params)
continue
elif servertype == 'analysis':
add = list(filter(lambda s: s.split('_')[-1] == 'address',params.keys()))
if add != []:
analysis_time = int(params[add[0]].split('/')[0].split(':')[1])
if TRACKING[analysis_time]['path'] is not None:
async with FILELOCKS[TRACKING[analysis_time]['path']]:
await LOOP.run_in_executor(None,lambda x: \
requests.get(x,params=dict(path=TRACKING[analysis_time]['path'], \
run=TRACKING[analysis_time]['run'], \
addresses=json.dumps({a:params[a] for a in add})), \
timeout=None),\
f"http://{config['servers'][server]['host']}:{config['servers'][server]['port']}/{servertype}/receiveData")
else:
for history in TRACKING[thread]['history']:
async with FILELOCKS[history['path']]:
if orchestrator_utils.paths_in_hdf5(history['path'],[params[a] for a in add]):
await LOOP.run_in_executor(None,lambda x: \
requests.get(x,params=dict(path=history['path'],\
run=history['run'], \
addresses=json.dumps({a:params[a] for a in add})), \
timeout=None), \
f"http://{config['servers'][server]['host']}:{config['servers'][server]['port']}/{servertype}/receiveData")
break
async with SERVERLOCKS[server]:
res = await LOOP.run_in_executor(None,lambda x: requests.get(x,params=params, \
timeout=None).json(), \
f"http://{config['servers'][server]['host']}:{config['servers'][server]['port']}/{servertype}/{action}")
elif servertype == 'ml':
if "address" in params.keys():
analysis_time = int(params['address'].split('/')[0].split(':')[1])
if TRACKING[analysis_time]['path'] is not None:
async with SERVERLOCKS[server]:
async with FILELOCKS[TRACKING[analysis_time]['path']]:
if 'modelid' in params.keys():
await LOOP.run_in_executor(None,lambda x: \
requests.get(x,params=dict(path=TRACKING[analysis_time]['path'],\
run=TRACKING[analysis_time]['run'],\
address=params['address'], \
modelid=params['modelid']), timeout=None), \
f"http://{config['servers'][server]['host']}:{config['servers'][server]['port']}/{servertype}/receiveData")
else:
await LOOP.run_in_executor(None,lambda x: requests.get(x, \
params=dict(path=TRACKING[analysis_time]['path'], \
run=TRACKING[analysis_time]['run'],address=params['address']), \
timeout=None), \
f"http://{config['servers'][server]['host']}:{config['servers'][server]['port']}/{servertype}/receiveData")
else:
for history in TRACKING[thread]['history']:
async with SERVERLOCKS[server]:
async with FILELOCKS[history['path']]:
if orchestrator_utils.paths_in_hdf5(history['path'],params['address']):
if 'modelid' in params.keys():
await LOOP.run_in_executor(None,lambda x: requests.get(x, \
params=dict(path=history['path'], \
run=history['run'],address=params['address'], \
modelid=params['modelid']), timeout=None), \
f"http://{config['servers'][server]['host']}:{config['servers'][server]['port']}/{servertype}/receiveData")
else:
await LOOP.run_in_executor(None,lambda x: requests.get(x, \
params=dict(path=history['path'],run=history['run'], \
address=params['address']), timeout=None), \
f"http://{config['servers'][server]['host']}:{config['servers'][server]['port']}/{servertype}/receiveData")
break
async with SERVERLOCKS[server]:
res = await LOOP.run_in_executor(None,lambda x: requests.get(x,params=params, \
timeout=None).json(), \
f"http://{config['servers'][server]['host']}:{config['servers'][server]['port']}/{servertype}/{action}")
async with FILELOCKS[TRACKING[thread]['path']]:
res.update({'meta':{'measurement_time':datetime.datetime.now().strftime("%d/%m/%Y, %H:%M:%S")}})
orchestrator_utils.save_dict_to_hdf5({fnc:res},TRACKING[thread]['path'], \
path=f"/run_{TRACKING[thread]['run']}/experiment_{TRACKING[thread]['experiment']}:{thread}/",mode='a')
#add metadata to experiment
with h5py.File(TRACKING[thread]['path'], 'r') as session:
session_list = list(session[f"/run_{TRACKING[thread]['run']}/experiment_{TRACKING[thread]['experiment']}:{thread}"].keys())
if len(session_list) > 0 and 'meta' not in session_list:
session.close()
orchestrator_utils.save_dict_to_hdf5({'meta':experiment['meta']}, \
TRACKING[thread]['path'],\
path=f"/run_{TRACKING[thread]['run']}/experiment_{TRACKING[thread]['experiment']}:{thread}/",mode='a')
log.info(f"function {fnc} in thread {thread} completed at {time.time()}")
log.info(f"function operating in run {TRACKING[thread]['run']} \
in file {TRACKING[thread]['path']}")
async def process_native_command(command: str,experiment: dict,**params):
""" process a command that is native to the orchestrator """
if command in ['start','finish','modify','wait','repeat']:
return await getattr(sys.modules[__name__],command)(experiment,**params)
raise Exception("native command not recognized")
async def start(experiment: dict,collectionkey:str,meta:dict={}):
""" Ensure appropriate folder, file, and all keys and tracking variables are appropriately
initialized at the beginning of a run
Args:
experiment (dict): experiment dictionary
collectionkey (str): determines folder and file names for session, may correspond to key
of experiment['meta'], in which case name will be indexed by that value.
meta (dict, optional): will be placed as metadata under the header of the run set up by
this command. Defaults to {}.
"""
global TRACKING,FILELOCKS,SERVERLOCKS
thread = experiment['meta']['thread']
# give the directory an index if one is provided
if collectionkey in experiment['meta'].keys():
h5dir = os.path.join(config[SERVERKEY]['path'], \
f"{collectionkey}_{experiment['meta'][collectionkey]}")
# name the directory without an index if not.
else:
h5dir = os.path.join(config[SERVERKEY]['path'],f"{collectionkey}")
    # ensure that the directory in which this session should be saved exists
    os.makedirs(h5dir, exist_ok=True)
# create a session, if there is no current session to load
if list(filter(lambda s: s[-3:]=='.h5',os.listdir(h5dir))) == []:
TRACKING[thread]['path'] = \
os.path.join(h5dir,config['instrument']+'_'+os.path.basename(h5dir)+'_session_0.h5')
#add a lock to a file if it does not already exist
if TRACKING[thread]['path'] not in FILELOCKS.keys():
FILELOCKS[TRACKING[thread]['path']] = asyncio.Lock()
async with FILELOCKS[TRACKING[thread]['path']]:
orchestrator_utils.save_dict_to_hdf5(dict(meta=dict(date=datetime.date.today().strftime("%d/%m/%Y"))),TRACKING[thread]['path'])
# otherwise grab most recent session in dir
else:
TRACKING[thread]['path'] = os.path.join(h5dir, \
orchestrator_utils.highest_name(list(filter(lambda s: s[-3:]=='.h5',os.listdir(h5dir)))))
if TRACKING[thread]['path'] not in FILELOCKS.keys():
FILELOCKS[TRACKING[thread]['path']] = asyncio.Lock()
async with FILELOCKS[TRACKING[thread]['path']]:
with h5py.File(TRACKING[thread]['path'], 'r') as session:
# assigns date to this session if necessary, or replaces session if too old
if 'meta' not in session.keys():
orchestrator_utils.save_dict_to_hdf5(dict(meta=dict(date=datetime.date.today().strftime("%d/%m/%Y"))),TRACKING[thread]['path'], path='/',mode='a')
elif 'date' not in session['meta'].keys():
session.close()
orchestrator_utils.save_dict_to_hdf5(dict(date=datetime.date.today().strftime("%d/%m/%Y")),TRACKING[thread]['path'], path='/meta/',mode='a')
elif session['meta/date/'][()] != datetime.date.today().strftime("%d/%m/%Y"):
log.info('current session is old, saving current session and creating new session')
session.close()
if "orch_kadi" not in SERVERLOCKS.keys():
SERVERLOCKS["orch_kadi"] = asyncio.Lock()
try:
async with SERVERLOCKS["orch_kadi"]:
                            log.info(requests.get(f"{config[SERVERKEY]['kadiurl']}/kadi/uploadhdf5",
params=dict(filename=os.path.basename(TRACKING[thread]['path']), \
filepath=os.path.dirname(TRACKING[thread]['path'])), \
timeout=None).json())
                    except Exception:
log.info('not connected to kadi4mat and did not upload the session there')
TRACKING[thread]['path'] = os.path.join(h5dir, \
orchestrator_utils.increment_name(os.path.basename(TRACKING[thread]['path'])))
if TRACKING[thread]['path'] not in FILELOCKS.keys():
FILELOCKS[TRACKING[thread]['path']] = asyncio.Lock()
async with FILELOCKS[TRACKING[thread]['path']]:
orchestrator_utils.save_dict_to_hdf5(dict(meta= \
dict(date=datetime.date.today().strftime("%d/%m/%Y"))),\
TRACKING[thread]['path'])
async with FILELOCKS[TRACKING[thread]['path']]:
#adds a new run to session to receive incoming data
with h5py.File(TRACKING[thread]['path'], 'r') as session:
if "run_0" not in session.keys():
session.close()
orchestrator_utils.save_dict_to_hdf5({"run_0":{f"experiment_0:{thread}":None}, \
"meta":meta},TRACKING[thread]['path'],mode='a')
TRACKING[thread]['run'] = 0
else:
run = orchestrator_utils.increment_name(orchestrator_utils.highest_name(list(filter(lambda k: k[:4]=="run_",list(session.keys())))))
session.close()
orchestrator_utils.save_dict_to_hdf5({run:{f"experiment_0:{thread}":None,\
"meta":meta}},TRACKING[thread]['path'],mode='a')
TRACKING[thread]['run'] = int(run[4:])
TRACKING[thread]['experiment'] = 0
orchestrator_utils.save_dict_to_hdf5({'meta':None},\
TRACKING[thread]['path'],path=f'/run_{TRACKING[thread]["run"]}/experiment_0:{thread}/',\
mode='a')
TRACKING[thread]['status'] = 'running'
return experiment
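# Sketch of the session layout produced by start() (directory names are assumptions
# derived from the config):
# <config['orchestrator']['path']>/<collectionkey>[_<index>]/
#     <config['instrument']>_<dirname>_session_<n>.h5
# containing /meta/date and /run_<m>/experiment_0:<thread>/meta once start() returns.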
async def finish(experiment: dict):
""" Ensure tracking variables are appropriately reset at the end of a run,
and upload the finished session
Args:
experiment (dict): dictionary containing experiment metadata
"""
global TRACKING,FILELOCKS,SERVERLOCKS
thread = experiment['meta']['thread']
log.info(f'thread {thread} finishing')
    # count the threads currently working on this thread's session file
    working_threads = sum([1 if TRACKING[k]['path'] == TRACKING[thread]['path'] else 0 for k in TRACKING.keys()])
if working_threads == 1:
        log.info('attempting to upload session')
if "orch_kadi" not in SERVERLOCKS.keys():
SERVERLOCKS["orch_kadi"] = asyncio.Lock()
try:
async with SERVERLOCKS["orch_kadi"]:
log.info(requests.get(f"{config[SERVERKEY]['kadiurl']}/kadi/uploadhdf5",
params=dict(filename=os.path.basename(TRACKING[thread]['path']), \
filepath=os.path.dirname(TRACKING[thread]['path'])), timeout=None).json())
        except Exception:
            log.info('no kadi4mat connection, did not upload the session there')
# adds a new hdf5 file which will be used for the next incoming data
newpath = os.path.join(os.path.dirname(TRACKING[thread]['path']), \
orchestrator_utils.increment_name(os.path.basename(TRACKING[thread]['path'])))
FILELOCKS[newpath] = asyncio.Lock()
async with FILELOCKS[TRACKING[thread]['path']]:
orchestrator_utils.save_dict_to_hdf5(dict(meta=None),newpath)
        # clear history relating to this file from all threads
        for tracking in TRACKING.values():
            tracking['history'] = [h for h in tracking['history'] \
                if h['path'] != TRACKING[thread]['path']]
else:
log.info(f'{working_threads-1} threads still operating on {TRACKING[thread]["path"]}')
#free up the thread
TRACKING[thread] = {'path':None,'run':None,'experiment':None, \
'current_action':None,'status':'uninitialized', \
'history':[{'path':TRACKING[thread]['path'], \
'run':TRACKING[thread]['run']}]+TRACKING[thread]['history']}
return experiment
async def modify(experiment: dict, addresses:Union[str,list], pointers:Union[str,list]):
""" Set undefined values under experiment parameter dict.
Values must come from currently running threads
Args:
experiment (dict): dictionary containing experiment metadata
addresses (Union[str,list]): within a run, address(es) of the value(s)
that should be transmitted to parameter(s)
pointers (Union[str,list]): within param dict of experiment,
addresses to transmit values to.
parameter must have previously been initialized as "?"
Returns:
dict: dictionary containing experiment
"""
global TRACKING,FILELOCKS
mainthread = experiment['meta']['thread']
if not isinstance(addresses, list):
addresses = [addresses]
if not isinstance(pointers, list):
pointers = [pointers]
assert len(addresses) == len(pointers)
threads = [int(address.split('/')[0].split(':')[1]) for address in addresses]
for address, pointer, thread in zip(addresses, pointers, threads):
if orchestrator_utils.dict_address(pointer, experiment['params']) != "?":
raise Exception(f"pointer {pointer} is not intended to be written to")
if TRACKING[thread]['path'] is not None:
async with FILELOCKS[TRACKING[thread]['path']]:
with h5py.File(TRACKING[thread]['path'], 'r') as session:
val = session[f'run_{TRACKING[thread]["run"]}/'+address][()]
orchestrator_utils.dict_address_set(pointer, experiment['params'],val)
log.info(f'pointer {pointer} in params for experiment \
{TRACKING[mainthread]["experiment"]} in thread {mainthread} set to {val}')
        else:
            # the thread's current path is None here, so search its history entries
            for history_track in TRACKING[thread]['history']:
                if history_track['path'] is None:
                    continue
                async with FILELOCKS[history_track['path']]:
                    with h5py.File(history_track['path'], 'r') as session:
                        try:
                            val = session[f'run_{history_track["run"]}/'+address][()]
                        except KeyError:
                            continue
                        orchestrator_utils.dict_address_set(pointer, experiment['params'],val)
                        log.info(f'pointer {pointer} in params for experiment \
                            {TRACKING[mainthread]["experiment"]} in thread {mainthread} \
                            set to {val} from history')
                        break
if orchestrator_utils.dict_address(pointer,experiment['params']) == '?':
raise Exception('modify failed to find address in history')
return experiment
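# Usage sketch for modify (illustrative address and pointer): an address like
# "experiment_0:1/measure_0/data/value" reads /run_<n>/experiment_0:1/measure_0/data/value
# from thread 1's session file, and a pointer like "measure_0/setpoint" receives that
# value in experiment['params'], provided it was initialized as "?".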
async def wait(experiment: dict, addresses: Union[str,list]):
""" Pause experiment until given thread(s) complete(s) given action(s)
Args:
experiment (dict): dictionary containing experiment metadata
addresses (Union[str,list]): path(s) below run to awaited address(es),
i.e "experiment/action"
Returns:
experiment: dictionary containing experiment metadata
"""
global TRACKING,FILELOCKS
log.info(f"waiting on {addresses}")
if not isinstance(addresses, list):
addresses = [addresses]
threads = [int(address.split('/')[0].split(':')[1]) for address in addresses]
while addresses != []:
await asyncio.sleep(.1)
address_and_thread = list(zip(range(len(addresses)),copy(addresses), copy(threads)))
for i,address,thread in address_and_thread:
if TRACKING[thread]['path'] is not None:
async with FILELOCKS[TRACKING[thread]['path']]:
with h5py.File(TRACKING[thread]['path'], 'r') as session:
exp = address.split('/')[0]
action = address.split('/')[1]
if exp in session[f'run_{TRACKING[thread]["run"]}/'].keys():
if action in session[f'run_{TRACKING[thread]["run"]}/{exp}'].keys():
log.info(f"{addresses[i]} found")
del addresses[i]
del threads[i]
break
            # waiting on the results of a session that already finished,
            # check history for path & run
            else:
                found = False
                for exp_history in TRACKING[thread]['history']:
                    if exp_history['path'] is None:
                        continue
                    async with FILELOCKS[exp_history['path']]:
                        with h5py.File(exp_history['path'], 'r') as session:
                            exp = address.split('/')[0]
                            action = address.split('/')[1]
                            if exp in session[f'run_{exp_history["run"]}/'].keys() and \
                                action in session[f'run_{exp_history["run"]}/{exp}'].keys():
                                log.info(f"{addresses[i]} found in history")
                                del addresses[i]
                                del threads[i]
                                found = True
                                break
                if found:
                    break
return experiment
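# Usage sketch for wait (illustrative address): "experiment_0:1/measure_0" blocks the
# calling thread until /run_<n>/experiment_0:1/measure_0 exists in thread 1's session file.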
# submit an experiment identical to the current one to the orchestrator thread.
# will have higher-than default priority, and will go before any intervening experiments
# that may have been submitted.
# an experiment should only have one call of repeat, and it should only be at the end of
# the experiment (unless followed by a finish command)
async def repeat(experiment: dict, number_of_repeat: int = 0, priority: int = 5):
""" Submit an experiment identical to the current one to the orchestrator thread.
Args:
experiment (dict): dictionary containing experiment metadata
        number_of_repeat (int, optional): number of times to repeat after the 1st experiment,
            or 0 to repeat until forced to stop. Defaults to 0.
        priority (int, optional): priority of experiment. Defaults to 5.
    Returns:
        dict: dictionary containing experiment metadata
"""
global INDEX
    #copy current experiment (shallow copy; nested structures are shared)
    new_exp = copy(experiment)
    if number_of_repeat == 1: #if n=1, repeating is finished.
        del new_exp['params']['repeat']
        del new_exp['soe'][-1]
    elif number_of_repeat != 0: #else, decrement repeats left to go
        new_exp['params']['repeat']['number_of_repeat'] -= 1
#and of course, for n==0, we do nothing and it repeats forever
#then the new experiment is added to the appropriate queue with a higher-than-default priority
await EXPERIMENT_QUEUES[experiment['meta']['thread']].put((priority,INDEX,new_exp))
INDEX += 1
return experiment
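# Usage sketch for repeat: ending an soe with "orchestrator/repeat" and params
# {"repeat": {"number_of_repeat": 3}} re-queues the experiment three more times at
# priority 5 before the repeat entry is removed.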
[docs]@app.on_event("startup")
async def memory():
""" Initialize memory for orchestrator
"""
global TRACKING
# a dict of useful variables to keep track of
# every single experiment will have a tracking key here.
TRACKING = {}
global SCHEDULER_QUEUE
SCHEDULER_QUEUE = asyncio.PriorityQueue()
global TASK
TASK = asyncio.create_task(scheduler())
global EXPERIMENT_QUEUES
EXPERIMENT_QUEUES = {}
global EXPERIMENT_TASKS
EXPERIMENT_TASKS = {}
    # locks prevent two threads from accessing the same file or the same server at the same time
global FILELOCKS
FILELOCKS = {}
global SERVERLOCKS
SERVERLOCKS = {}
global LOOP
    # event loop reference, used to create tasks and run blocking requests in executors
LOOP = asyncio.get_event_loop()
global ERROR
ERROR = LOOP.create_task(raise_exceptions())
global INDEX #assign a number to each experiment to retain order within priority queues
INDEX = 0
[docs]@app.on_event("shutdown")
def disconnect():
""" Disconnect from orchestrator
"""
global TASK, ERROR
if not ERROR.cancelled():
ERROR.cancel()
if not TASK.cancelled():
TASK.cancel()
for exp_task in EXPERIMENT_TASKS.values():
if not exp_task.cancelled():
exp_task.cancel()
[docs]@app.post("/orchestrator/clear")
def clear(thread: Optional[int] = None):
""" Empty queue for thread, or for all threads if no thread specified.
Args:
thread (Optional[int], optional): thread to clear. Defaults to None.
"""
global EXPERIMENT_QUEUES
if thread is None:
for k in EXPERIMENT_QUEUES.keys():
while not EXPERIMENT_QUEUES[k].empty():
EXPERIMENT_QUEUES[k].get_nowait()
elif thread in EXPERIMENT_QUEUES.keys():
while not EXPERIMENT_QUEUES[thread].empty():
EXPERIMENT_QUEUES[thread].get_nowait()
else:
print(f"thread {thread} not found")
# TODO add this to the GUI, with different buttons for each thread
# cannot stop an action that is currently running; a kill only takes effect once the action is done
[docs]@app.post("/orchestrator/kill")
def kill(thread: Optional[int] = None):
""" Empty queue and cancel current experiment for thread,
or for all threads if no thread specified
Args:
thread (Optional[int], optional): thread to kill. Defaults to None.
"""
    global EXPERIMENT_TASKS,TRACKING
clear(thread)
if thread is None:
        for k in list(EXPERIMENT_TASKS.keys()):
EXPERIMENT_TASKS[k].cancel()
del EXPERIMENT_TASKS[k]
EXPERIMENT_TASKS.update({k:LOOP.create_task(infl(k))})
exp_history = {'path':TRACKING[k]['path'],'run':TRACKING[k]['run']}
TRACKING[k] = {'path':None,'run':None,'experiment':None,'current_action':None,'status':'uninitialized','history':[exp_history]+TRACKING[k]['history']}
elif thread in EXPERIMENT_TASKS.keys():
EXPERIMENT_TASKS[thread].cancel()
del EXPERIMENT_TASKS[thread]
EXPERIMENT_TASKS.update({thread:LOOP.create_task(infl(thread))})
exp_history = {'path':TRACKING[thread]['path'],'run':TRACKING[thread]['run']}
TRACKING[thread] = {'path':None,'run':None,'experiment':None,'current_action':None,'status':'uninitialized','history':[exp_history]+TRACKING[thread]['history']}
else:
print(f"thread {thread} not found")
[docs]@app.post("/orchestrator/pause")
def pause(thread: Optional[int] = None):
""" Pause current experiment for thread, or for all threads if no thread specified
Args:
thread (Optional[int], optional): thread to pause. Defaults to None.
"""
global TRACKING
if thread is None:
for k in TRACKING.keys():
if TRACKING[k]['status'] == 'running':
TRACKING[k]['status'] = 'paused'
log.info(f"thread {k} paused")
else:
log.info(f'attempted to pause thread {k}, but status was {TRACKING[k]["status"]}')
elif thread in TRACKING.keys():
if TRACKING[thread]['status'] == 'running':
TRACKING[thread]['status'] = 'paused'
log.info(f"thread {thread} paused")
else:
log.info(f'attempted to pause thread {thread}, but status was {TRACKING[thread]["status"]}')
else:
log.info(f"thread {thread} not found")
[docs]@app.post("/orchestrator/resume")
def resume(thread: Optional[int] = None):
""" Resume current experiment for thread, or for all threads if no thread specified
Args:
thread (Optional[int], optional): thread to resume. Defaults to None.
"""
global TRACKING
if thread is None:
for k in TRACKING.keys():
if TRACKING[k]['status'] == 'paused':
TRACKING[k]['status'] = 'running'
log.info(f"thread {k} resumed")
else:
log.info(f'attempted to resume thread {k}, but status was {TRACKING[k]["status"]}')
elif thread in TRACKING.keys():
if TRACKING[thread]['status'] == 'paused':
TRACKING[thread]['status'] = 'running'
log.info(f"thread {thread} resumed")
else:
log.info(f'attempted to resume thread {thread}, but status was {TRACKING[thread]["status"]}')
else:
log.info(f"thread {thread} not found")
[docs]@app.post("/orchestrator/getStatus")
def get_status():
""" Get status of all threads
Returns:
dict: status of all threads
"""
return TRACKING
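# Shape of the returned tracking dict (illustrative values):
# {0: {'path': '<...>/session_0.h5', 'run': 0, 'experiment': 2,
#      'current_action': 'measure_0', 'status': 'running', 'history': []}}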
async def raise_exceptions():
""" Check for exceptions. If found, print stack trace and cancel the infinite loop
Error handing within the infinite loop
"""
global TASK,EXPERIMENT_TASKS,ERROR
while True:
# check for errors every second (maybe this should be a different number?)
await asyncio.sleep(1)
try:
TASK.exception()
TASK.print_stack()
break
except:
pass
task_check = None
for task in EXPERIMENT_TASKS.values():
try:
task.exception()
task.print_stack()
task_check = task
break
except:
pass
try:
task_check.exception()
break
except:
pass
#if an error shows up anywhere, bring the whole house down.
# cancel all tasks
for task_value in EXPERIMENT_TASKS.values():
try:
task_value.cancel()
except:
pass
try:
TASK.cancel()
except:
pass
try:
ERROR.cancel()
except:
pass
[docs]@app.get("/orchestrator/getData")
async def get_data(thread:int, addresses:str ,mode:str, wait_time:float=.01):
""" Get data from the specified addresses
Args:
thread (int): thread to get data from
addresses (str): addresses to get data from
mode (str): mode to get data in
        wait_time (float, optional): time to sleep before reading. Defaults to .01.
    Raises:
        ValueError: if mode is not one of 'list', 'group' or 'next'
Returns:
dict: data from the specified addresses
"""
await asyncio.sleep(wait_time)
data = []
try:
addresses = json.loads(addresses)
    except json.JSONDecodeError:
addresses = [addresses]
if mode not in ['list','group','next']:
raise ValueError('invalid mode for getData')
path = TRACKING[thread]['path'] if \
TRACKING[thread]['path'] is not None else TRACKING[thread]['history'][0]['path']
run = TRACKING[thread]['run'] if \
TRACKING[thread]['path'] is not None else TRACKING[thread]['history'][0]['run']
if mode == 'list':
async with FILELOCKS[path]:
with h5py.File(path, 'r') as session:
experiments = list(session[f'run_{run}'].keys())
for experiment in experiments:
subdata = []
for address in addresses:
dpath = f'run_{run}/'+experiment+'/'+address
if orchestrator_utils.paths_in_hdf5(session,dpath):
datum = orchestrator_utils.hdf5_group_to_dict(session,dpath)
if isinstance(datum,numpy.ndarray):
datum = datum.tolist()
subdata.append(datum)
else:
log.info(f'address {address} not found in file')
if subdata != []:
data.append(subdata if len(subdata) != 1 else subdata[0])
elif mode == 'group':
async with FILELOCKS[path]:
with h5py.File(path, 'r') as session:
for address in addresses:
data.append(orchestrator_utils.hdf5_group_to_dict(session,f'run_{run}/{address}'))
elif mode == 'next':
new_exp = f'experiment_{TRACKING[thread]["experiment"]}:{thread}'
log.info(f'awaiting data at {new_exp}')
present = False
while not present:
async with FILELOCKS[path]:
with h5py.File(path, 'r') as session:
present = orchestrator_utils.paths_in_hdf5(path,[f'run_{run}/{new_exp}/{address}' for address in addresses])
await asyncio.sleep(.1)
for address in addresses:
async with FILELOCKS[path]:
with h5py.File(path, 'r') as session:
datum = orchestrator_utils.hdf5_group_to_dict(session,f'run_{run}/{new_exp}/{address}')
if isinstance(datum,numpy.ndarray):
datum = datum.tolist()
data.append(datum)
return data
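# Example request (illustrative values): GET /orchestrator/getData with thread=0,
# addresses='["experiment_0:0/measure_0"]' and mode='group' returns the matching
# HDF5 group(s) under run_<n> as JSON-serializable data.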
def main():
"""Main entry point of the application."""
    uvicorn.run(app, host=config['servers'][SERVERKEY]['host'], port=config['servers'][SERVERKEY]['port'])
if __name__ == "__main__":
main()