Source code for openclsim.model.base_activities

"""Base classes for the openclsim activities."""

from abc import ABC

import simpy

import openclsim.core as core


class AbstractPluginClass(ABC):
    """
    Common base for every class that implements a plugin for an Activity.

    Instance checks are performed against this class.
    """

    def __init__(self):
        """Create the plugin; the base class holds no state."""

    def pre_process(self, env, activity_log, activity, *args, **kwargs):
        """
        Hook for work that precedes the activity.

        The base implementation ignores all of its arguments.

        Returns
        -------
        dict
            Always a new, empty dictionary.
        """
        return dict()

    def post_process(
        self,
        env,
        activity_log,
        activity,
        start_preprocessing,
        start_activity,
        *args,
        **kwargs,
    ):
        """
        Hook for work that follows the activity.

        The base implementation ignores all of its arguments.

        Returns
        -------
        dict
            Always a new, empty dictionary.
        """
        return dict()

    def validate(self):
        """Validation hook; the base implementation does nothing."""
        return None
class RegisterSubProcesses:
    """Mixin that wires the start events of an activity's sub_processes."""

    def register_sequential_subprocesses(self):
        """
        Chain the sub_processes so that each one starts after its predecessor.

        A fresh event from ``self.env`` is stored as ``self.start_sequence``
        and becomes the ``start_event_parent`` of the first sub process.
        Every later sub process gets an activity expression that refers to
        the name of the sub process before it with state ``"done"``.
        Afterwards, each sub process that has a ``register_subprocesses``
        attribute gets it called.
        """
        self.start_sequence = self.env.event()

        # First pass: assign every start_event_parent.
        for position, child in enumerate(self.sub_processes):
            child.start_event_parent = (
                {
                    "type": "activity",
                    "state": "done",
                    "name": self.sub_processes[position - 1].name,
                }
                if position > 0
                else self.start_sequence
            )

        # Second pass: let nested activities register their own children.
        for child in self.sub_processes:
            if not hasattr(child, "register_subprocesses"):
                continue
            child.register_subprocesses()

    def register_parallel_subprocesses(self):
        """
        Let all sub_processes wait for one shared start event.

        A fresh event from ``self.env`` is stored as ``self.start_parallel``
        and set as ``start_event_parent`` on every sub process. A sub
        process that has a ``register_subprocesses`` attribute gets it
        called right after its own start event was assigned.
        """
        self.start_parallel = self.env.event()

        for child in self.sub_processes:
            child.start_event_parent = self.start_parallel
            if not hasattr(child, "register_subprocesses"):
                continue
            child.register_subprocesses()
class PluginActivity(core.Identifiable, core.Log):
    """
    Base class for activities that support plugins.

    Registered plugins have their ``pre_process`` invoked before the activity
    is executed and their ``post_process`` invoked after the activity has
    been executed.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to the parent classes and start without plugins."""
        super().__init__(*args, **kwargs)
        self.plugins = []

    def register_plugin(self, plugin, priority=0):
        """
        Add ``plugin`` and keep the plugin list ordered by ascending priority.

        The sort is stable, so plugins with equal priority keep their
        registration order.
        """
        entry = {"priority": priority, "plugin": plugin}
        self.plugins.append(entry)
        self.plugins = sorted(self.plugins, key=lambda record: record["priority"])

    def pre_process(self, args_data):
        """
        Delegate to ``pre_process`` of every registered plugin, in list order.

        ``args_data`` is a mapping that is expanded into keyword arguments
        for each plugin call. This is a generator: whatever a plugin call
        returns is iterated with ``yield from``.
        """
        for registered in self.plugins:
            hook = registered["plugin"].pre_process
            yield from hook(**args_data)

    def post_process(self, *args, **kwargs):
        """
        Delegate to ``post_process`` of every registered plugin, in list order.

        All positional and keyword arguments are passed through unchanged.
        This is a generator: whatever a plugin call returns is iterated with
        ``yield from``.
        """
        for registered in self.plugins:
            hook = registered["plugin"].post_process
            yield from hook(*args, **kwargs)

    def delay_processing(self, env, activity_label, activity_log, waiting):
        """
        Wait for ``waiting`` time units and log the start and stop of the wait.

        A WAIT_START entry is written to ``activity_log`` at the current
        simulation time, then ``env.timeout(waiting)`` is yielded, then a
        WAIT_STOP entry is written at the new simulation time. Both entries
        carry ``activity_label``.
        """

        def record(state):
            # Time and id are read at the moment the entry is written.
            activity_log.log_entry(
                t=env.now,
                activity_id=activity_log.id,
                activity_state=state,
                activity_label=activity_label,
            )

        record(core.LogState.WAIT_START)
        yield env.timeout(waiting)
        record(core.LogState.WAIT_STOP)
class GenericActivity(PluginActivity):
    """
    Generic base class that sets up all activities.

    It stores the activity registry and start condition, turns start
    expressions into simpy events, registers the activity as a simpy process
    and offers helpers to request and release resources.

    ``self.env`` is not set here; it is presumably provided by a parent
    class or through ``kwargs`` -- confirm against ``openclsim.core``.
    """

    def __init__(
        self,
        registry,
        start_event=None,
        requested_resources=None,
        keep_resources=None,
        *args,
        **kwargs,
    ):
        """
        Store the configuration and create the initial ``done_event``.

        Parameters
        ----------
        registry
            Dictionary in which activities are registered by name and id
            (see ``register_process``).
        start_event
            Optional expression accepted by ``parse_expression``; the
            activity waits for it before it starts.
        requested_resources
            Dictionary of already requested resources. Defaults to a new
            empty dictionary per instance.
        keep_resources
            List of resources to keep. Defaults to a new empty list per
            instance.
        """
        super().__init__(*args, **kwargs)
        self.registry = registry
        self.start_event = start_event
        # Build the containers per instance: a mutable default in the
        # signature would be shared by every activity relying on it.
        self.requested_resources = (
            {} if requested_resources is None else requested_resources
        )
        self.keep_resources = [] if keep_resources is None else keep_resources
        self.done_event = self.env.event()

    def register_process(self):
        """
        Add the activity as a process to the environment and to the registry.

        ``done_event`` is replaced by a fresh event. ``start_sequence`` and
        ``start_parallel`` are replaced as well when they exist and have
        already been triggered. The activity is then added to the registry
        under both its name and its id.
        """
        # replace the events
        self.done_event = self.env.event()
        if hasattr(self, "start_sequence") and self.start_sequence.triggered:
            self.start_sequence = self.env.event()
        if hasattr(self, "start_parallel") and self.start_parallel.triggered:
            self.start_parallel = self.env.event()

        # add the activity with its start event to the simpy environment
        self.main_process = self.env.process(
            self.delayed_process(activity_log=self, env=self.env)
        )

        # add activity to the registry
        self.registry.setdefault("name", {}).setdefault(self.name, set()).add(self)
        self.registry.setdefault("id", {}).setdefault(self.id, set()).add(self)

    def parse_expression(self, expr):
        """
        Convert a start expression into a simpy event.

        Supported inputs:

        * ``simpy.Event``: returned unchanged.
        * ``list``: all items must occur (``all_of``).
        * ``dict`` with key ``"and"`` / ``"or"``: ``all_of`` / ``any_of``
          over the parsed items.
        * ``dict`` with ``"type": "container"``: the full or empty event of
          ``expr["concept"].container``, chosen by ``expr["state"]``.
        * ``dict`` with ``"type": "activity"`` and ``"state": "done"``: all
          main processes of the activities registered under the given
          ``"ID"`` or ``"name"``.

        Raises
        ------
        ValueError
            For an unsupported input type, expression type or state.
        Exception
            When no activity is registered under the requested id or name.
        """
        if isinstance(expr, simpy.Event):
            return expr

        if isinstance(expr, list):
            return self.env.all_of([self.parse_expression(item) for item in expr])

        if isinstance(expr, dict):
            if "and" in expr:
                return self.env.all_of(
                    [self.parse_expression(item) for item in expr["and"]]
                )
            if "or" in expr:
                return self.env.any_of(
                    [self.parse_expression(item) for item in expr["or"]]
                )

            if expr.get("type") == "container":
                id_ = expr.get("id_", "default")
                obj = expr["concept"]
                if expr["state"] == "full":
                    return obj.container.get_full_event(id_=id_)
                if expr["state"] == "empty":
                    return obj.container.get_empty_event(id_=id_)
                raise ValueError(
                    f"Unknown state {expr['state']} in ContainerExpression."
                )

            if expr.get("type") == "activity":
                if expr.get("state") != "done":
                    raise ValueError(
                        f"Unknown state {expr.get('state')} in ActivityExpression."
                    )
                key = expr.get("ID", expr.get("name"))
                # A match by id takes precedence over a match by name.
                found_by_name = self.registry.get("name", {}).get(key)
                activity_ = self.registry.get("id", {}).get(key, found_by_name)
                if activity_ is None:
                    raise Exception(
                        f"No activity found in ActivityExpression for id/name {key}"
                    )
                return self.env.all_of(
                    [activity_item.main_process for activity_item in activity_]
                )

            raise ValueError(
                f"Unknown expression type {expr.get('type')} in expression."
            )

        raise ValueError(
            f"{type(expr)} is not a valid input type. Valid input types are: simpy.Event, dict, and list"
        )

    def delayed_process(
        self,
        activity_log,
        env,
    ):
        """
        Return a generator which can be added as a process to a simpy environment.

        The generator first waits for ``start_event_parent`` (when that
        attribute is set) and then for ``start_event`` (when given). If
        waiting for ``start_event`` took simulation time, WAIT_START and
        WAIT_STOP entries are written to ``activity_log`` and to every log
        in ``self.additional_logs``. Finally it delegates to
        ``self.main_process_function``, which is not defined in this class
        and is presumably supplied by subclasses.
        """
        additional_logs = getattr(self, "additional_logs", [])

        # Parse the own start event before waiting for the parent, so the
        # underlying events are created up front.
        start_event = (
            None
            if self.start_event is None
            else self.parse_expression(self.start_event)
        )

        if hasattr(self, "start_event_parent"):
            yield self.parse_expression(self.start_event_parent)

        start_time = env.now
        if start_event is not None:
            yield start_event

        if env.now > start_time:
            # Log the start of the wait first, then the stop; each entry
            # goes to the activity log and then to the additional logs.
            for moment, state in (
                (start_time, core.LogState.WAIT_START),
                (env.now, core.LogState.WAIT_STOP),
            ):
                activity_log.log_entry(
                    t=moment,
                    activity_id=activity_log.id,
                    activity_state=state,
                )
                for log in additional_logs:
                    log.log_entry(
                        t=moment,
                        activity_id=activity_log.id,
                        activity_state=state,
                        activity_label={
                            "type": "additional log",
                            "ref": self.id,
                        },
                    )

        yield from self.main_process_function(activity_log=self, env=self.env)

    def _request_resource(self, requested_resources, resource):
        """
        Request the given resource and yield the request.

        A new request is created and stored in ``requested_resources`` only
        when the resource has no entry there yet; otherwise the stored
        request is yielded again.
        """
        if resource not in requested_resources:
            requested_resources[resource] = resource.request()
        yield requested_resources[resource]

    def _release_resource(self, requested_resources, resource, kept_resource=None):
        """
        Release the given resource, unless it has to be kept.

        The resource is kept when ``kept_resource`` is a list with an item
        whose ``resource`` attribute equals it, or when ``kept_resource``
        itself or its ``resource`` attribute equals it. A released resource
        is deleted from the ``requested_resources`` dictionary. Nothing
        happens for a resource that was never requested.
        """
        if kept_resource is not None:
            if isinstance(kept_resource, list):
                if resource in [item.resource for item in kept_resource]:
                    return
            elif resource == kept_resource or resource == getattr(
                kept_resource, "resource", None
            ):
                # Comparing with kept_resource itself first avoids an
                # AttributeError when it has no ``resource`` attribute.
                return

        if resource in requested_resources:
            resource.release(requested_resources[resource])
            del requested_resources[resource]