Source code for openclsim.model

from functools import partial
import itertools
import openclsim.core as core
import datetime
import simpy
import shapely
import shapely.geometry
import scipy.interpolate
import scipy.integrate
import pandas as pd
import numpy as np


class Activity(core.Identifiable, core.Log):
    """The Activity Class forms a specific class for a single activity within a simulation.
    It deals with a single origin container, destination container and a single combination
    of equipment to move substances from the origin to the destination. It will initiate and
    suspend processes according to a number of specified conditions. To run an activity after
    it has been initialized call env.run() on the Simpy environment with which it was initialized.

    To determine when a transportation of substances can take place, the Activity class uses
    two event arguments: start_event and stop_event, described in more detail below.

    origin: object inheriting from HasContainer, HasResource, Locatable, Identifiable and Log
    destination: object inheriting from HasContainer, HasResource, Locatable, Identifiable and Log
    loader: object which will get units from 'origin' Container and put them into 'mover' Container
            should inherit from Processor, HasResource, Identifiable and Log
            after the simulation is complete, its log will contain entries for each time it
            started loading and stopped loading
    mover: moves to 'origin' if it is not already there, is loaded, then moves to 'destination' and is unloaded
           should inherit from Movable, HasContainer, HasResource, Identifiable and Log
           after the simulation is complete, its log will contain entries for each time it
           started moving, stopped moving, started loading / unloading and stopped loading / unloading
    unloader: gets amount from 'mover' Container and puts it into 'destination' Container
              should inherit from Processor, HasResource, Identifiable and Log
              after the simulation is complete, its log will contain entries for each time it
              started unloading and stopped unloading
    start_event: the activity will start as soon as this event is triggered
                 by default will be to start immediately
    stop_event: the activity will stop (terminate) as soon as this event is triggered
                by default will be an event triggered when the destination container
                becomes full or the source container becomes empty
    """

    def __init__(
        self,
        origin,
        destination,
        loader,
        mover,
        unloader,
        start_event=None,
        stop_event=None,
        show=False,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        """Initialization"""

        self.origin = origin if type(origin) == list else [origin]
        self.destination = destination if type(destination) == list else [destination]

        self.start_event = (
            start_event
            if start_event is None or isinstance(start_event, simpy.Event)
            else self.env.all_of(events=start_event)
        )

        if type(stop_event) == list:
            stop_event = self.env.any_of(events=stop_event)

        if stop_event is not None:
            self.stop_event = stop_event
        elif start_event is None:
            stop_event = []
            stop_event.extend(orig.container.empty_event for orig in self.origin)
            stop_event.extend(dest.container.full_event for dest in self.destination)
            self.stop_event = self.env.any_of(stop_event)
        elif start_event:
            stop_event = []
            stop_event.extend(orig.container.get_empty_event for orig in self.origin)
            stop_event.extend(
                dest.container.get_full_event for dest in self.destination
            )
            self.stop_event = stop_event

        self.stop_reservation_waiting_event = (
            self.stop_event()
            if hasattr(self.stop_event, "__call__")
            else self.stop_event
        )

        self.loader = loader
        self.mover = mover
        self.unloader = unloader
        self.print = show

        single_run_proc = partial(
            single_run_process,
            origin=self.origin,
            destination=self.destination,
            loader=self.loader,
            mover=self.mover,
            unloader=self.unloader,
            stop_reservation_waiting_event=self.stop_reservation_waiting_event,
            verbose=self.print,
        )
        main_proc = partial(
            conditional_process,
            stop_event=self.stop_event,
            sub_processes=[single_run_proc],
        )
        if start_event is not None:
            main_proc = partial(
                delayed_process, start_event=self.start_event, sub_processes=[main_proc]
            )
        self.main_process = self.env.process(main_proc(activity_log=self, env=self.env))
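# Illustrative usage sketch, not part of the module: wiring up a single Activity and
# running it. The site and equipment objects (origin_site, destination_site, crane,
# barge) are assumed to have been built from the openclsim.core mixins listed in the
# docstring above; only env.run() and the Activity signature come from this module.
#
#   import simpy
#
#   env = simpy.Environment()
#   activity = Activity(
#       env=env,
#       name="transport sand",
#       origin=origin_site,
#       destination=destination_site,
#       loader=crane,
#       mover=barge,
#       unloader=crane,
#   )
#   env.run()  # by default stops when the origin is empty or the destination is full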
def delayed_process(activity_log, env, start_event, sub_processes):
    """Returns a generator which can be added as a process to a simpy.Environment. In the process the given
    sub_processes will be executed after the given start_event occurs.

    activity_log: the core.Log object in which log_entries about the activities progress will be added.
    env: the simpy.Environment in which the process will be run
    start_event: a simpy.Event object, when this event occurs the delayed process will start executing its sub_processes
    sub_processes: an Iterable of methods which will be called with the activity_log and env parameters and should
                   return a generator which could be added as a process to a simpy.Environment
                   the sub_processes will be executed sequentially, in the order in which they are given,
                   after the start_event occurs
    """
    if hasattr(start_event, "__call__"):
        start_event = start_event()
    yield start_event
    activity_log.log_entry(
        "delayed activity started", env.now, -1, None, activity_log.id
    )

    for sub_process in sub_processes:
        yield from sub_process(activity_log=activity_log, env=env)
def conditional_process(activity_log, env, stop_event, sub_processes):
    """Returns a generator which can be added as a process to a simpy.Environment. In the process the given
    sub_processes will be executed until the given stop_event occurs. If the stop_event occurs during the execution
    of the sub_processes, the conditional process will first complete all sub_processes (which are executed
    sequentially in the order in which they are given), before finishing its own process.

    activity_log: the core.Log object in which log_entries about the activities progress will be added.
    env: the simpy.Environment in which the process will be run
    stop_event: a simpy.Event object, when this event occurs, the conditional process will finish executing its
                current run of its sub_processes and then finish
    sub_processes: an Iterable of methods which will be called with the activity_log and env parameters and should
                   return a generator which could be added as a process to a simpy.Environment
                   the sub_processes will be executed sequentially, in the order in which they are given,
                   as long as the stop_event has not occurred.
    """
    if activity_log.log["Message"]:
        if activity_log.log["Message"][-1] == "delayed activity started" and hasattr(
            stop_event, "__call__"
        ):
            stop_event = stop_event()

    if hasattr(stop_event, "__call__"):
        stop_event = stop_event()
    elif type(stop_event) == list:
        stop_event = env.any_of(events=[event() for event in stop_event])

    while not stop_event.processed:
        for sub_process in sub_processes:
            yield from sub_process(activity_log=activity_log, env=env)

    activity_log.log_entry("stopped", env.now, -1, None, activity_log.id)
def sequential_process(activity_log, env, sub_processes):
    """Returns a generator which can be added as a process to a simpy.Environment. In the process the given
    sub_processes will be executed sequentially in the order in which they are given.

    activity_log: the core.Log object in which log_entries about the activities progress will be added.
    env: the simpy.Environment in which the process will be run
    sub_processes: an Iterable of methods which will be called with the activity_log and env parameters and should
                   return a generator which could be added as a process to a simpy.Environment
                   the sub_processes will be executed sequentially, in the order in which they are given
    """
    activity_log.log_entry("sequential start", env.now, -1, None, activity_log.id)
    for sub_process in sub_processes:
        yield from sub_process(activity_log=activity_log, env=env)
    activity_log.log_entry("sequential stop", env.now, -1, None, activity_log.id)
def move_process(activity_log, env, mover, destination, engine_order=1.0):
    """Returns a generator which can be added as a process to a simpy.Environment. In the process, a move will be
    made by the mover, moving it to the destination.

    activity_log: the core.Log object in which log_entries about the activities progress will be added.
    env: the simpy.Environment in which the process will be run
    mover: moves from its current position to the destination
           should inherit from core.Movable
    destination: the location the mover will move to
                 should inherit from core.Locatable
    engine_order: optional parameter specifying at what percentage of the maximum speed the mover should sail.
                  for example, engine_order=0.5 corresponds to sailing at 50% of max speed
    """
    activity_log.log_entry(
        "started move activity of {} to {}".format(mover.name, destination.name),
        env.now,
        -1,
        mover.geometry,
        activity_log.id,
    )

    with mover.resource.request() as my_mover_turn:
        yield my_mover_turn

        mover.ActivityID = activity_log.id
        yield from mover.move(destination=destination, engine_order=engine_order)

    activity_log.log_entry(
        "completed move activity of {} to {}".format(mover.name, destination.name),
        env.now,
        -1,
        mover.geometry,
        activity_log.id,
    )
def single_run_process(
    activity_log,
    env,
    origin,
    destination,
    loader,
    mover,
    unloader,
    engine_order=1.0,
    filling=1.0,
    stop_reservation_waiting_event=None,
    verbose=False,
):
    """Returns a generator which can be added as a process to a simpy.Environment. In the process, a single run will
    be made by the given mover, transporting content from the origin to the destination.

    activity_log: the core.Log object in which log_entries about the activities progress will be added.
    env: the simpy.Environment in which the process will be run
    origin: object inheriting from HasContainer, HasResource, Locatable, Identifiable and Log
    destination: object inheriting from HasContainer, HasResource, Locatable, Identifiable and Log
    loader: object which will get units from 'origin' Container and put them into 'mover' Container
            should inherit from Processor, HasResource, Identifiable and Log
            after the simulation is complete, its log will contain entries for each time it
            started loading and stopped loading
    mover: moves to 'origin' if it is not already there, is loaded, then moves to 'destination' and is unloaded
           should inherit from Movable, HasContainer, HasResource, Identifiable and Log
           after the simulation is complete, its log will contain entries for each time it
           started moving, stopped moving, started loading / unloading and stopped loading / unloading
    unloader: gets amount from 'mover' Container and puts it into 'destination' Container
              should inherit from Processor, HasResource, Identifiable and Log
              after the simulation is complete, its log will contain entries for each time it
              started unloading and stopped unloading
    engine_order: optional parameter specifying at what percentage of the maximum speed the mover should sail.
                  for example, engine_order=0.5 corresponds to sailing at 50% of max speed
    filling: optional parameter specifying at what percentage of the maximum capacity the mover should be loaded.
             for example, filling=0.5 corresponds to loading the mover up to 50% of its capacity
    stop_reservation_waiting_event: a simpy.Event, if there is no content available in the origin, or no space
                                    available in the destination, instead of performing a single run, the process
                                    will wait for new content or space to become available. If a
                                    stop_reservation_waiting_event is passed, this event will be combined through a
                                    simpy.AnyOf event with the event occurring when new content or space becomes
                                    available. This can be used to prevent waiting for new content or space
                                    indefinitely when we know it will not become available.
    verbose: optional boolean indicating whether additional debug prints should be given.
    """
    # Required for logging from json
    if not hasattr(activity_log, "loader"):
        activity_log.loader = loader
    if not hasattr(activity_log, "mover"):
        activity_log.mover = mover
    if not hasattr(activity_log, "unloader"):
        activity_log.unloader = unloader

    amount, all_amounts = mover.determine_amount(
        origin, destination, loader, unloader, filling
    )

    # Check if activity can start
    if hasattr(stop_reservation_waiting_event, "__call__"):
        stop_reservation_waiting_event = stop_reservation_waiting_event()
    elif type(stop_reservation_waiting_event) == list:
        stop_reservation_waiting_event = env.any_of(
            events=[event() for event in stop_reservation_waiting_event]
        )

    # If the transported amount is larger than zero, start activity
    if 0 < amount:
        resource_requests = {}

        vrachtbrief = mover.determine_schedule(amount, all_amounts, origin, destination)
        origins = vrachtbrief[vrachtbrief["Type"] == "Origin"]
        destinations = vrachtbrief[vrachtbrief["Type"] == "Destination"]

        if verbose:
            print("Using " + mover.name + " to process " + str(amount))
        activity_log.log_entry(
            "transporting start", env.now, amount, mover.geometry, activity_log.id
        )

        # request the mover's resource
        yield from _request_resource(resource_requests, mover.resource)

        for i in origins.index:
            origin = origins.loc[i, "ID"]
            amount = float(origins.loc[i, "Amount"])

            # move the mover to the origin (if necessary)
            if not mover.is_at(origin):
                yield from _move_mover(
                    mover,
                    origin,
                    ActivityID=activity_log.id,
                    engine_order=engine_order,
                    verbose=verbose,
                )

            yield from _request_resources_if_transfer_possible(
                env,
                resource_requests,
                origin,
                loader,
                mover,
                amount,
                mover.resource,
                activity_log.id,
                engine_order=engine_order,
                verbose=verbose,
            )

            # load the mover
            yield from _shift_amount(
                env,
                loader,
                mover,
                mover.container.level + amount,
                origin,
                ActivityID=activity_log.id,
                verbose=verbose,
            )

            # release the loader and origin resources (but always keep the mover requested)
            _release_resource(
                resource_requests, loader.resource, kept_resource=mover.resource
            )

            # If the loader is not the origin, release the origin as well
            if origin.resource in resource_requests.keys():
                _release_resource(
                    resource_requests, origin.resource, kept_resource=mover.resource
                )

        for i in destinations.index:
            destination = destinations.loc[i, "ID"]
            amount = float(destinations.loc[i, "Amount"])

            # move the mover to the destination
            if not mover.is_at(destination):
                yield from _move_mover(
                    mover,
                    destination,
                    ActivityID=activity_log.id,
                    engine_order=engine_order,
                    verbose=verbose,
                )

            yield from _request_resources_if_transfer_possible(
                env,
                resource_requests,
                mover,
                unloader,
                destination,
                amount,
                mover.resource,
                activity_log.id,
                engine_order=engine_order,
                verbose=verbose,
            )

            # unload the mover
            yield from _shift_amount(
                env,
                unloader,
                mover,
                mover.container.level - amount,
                destination,
                ActivityID=activity_log.id,
                verbose=verbose,
            )

            # release the unloader, destination and mover requests
            _release_resource(resource_requests, unloader.resource)
            if destination.resource in resource_requests:
                _release_resource(resource_requests, destination.resource)
            if mover.resource in resource_requests:
                _release_resource(resource_requests, mover.resource)

        activity_log.log_entry(
            "transporting stop", env.now, amount, mover.geometry, activity_log.id
        )
    else:
        origin_requested = 0
        origin_left = 0
        destination_requested = 0
        destination_left = 0

        for key in all_amounts.keys():
            if "origin" in key:
                origin_requested += all_amounts[key]
            elif "destination" in key:
                destination_requested += all_amounts[key]

        if origin_requested == 0:
            events = [orig.container.reserve_get_available for orig in origin]
            activity_log.log_entry(
                "waiting origin reservation start",
                env.now,
                mover.container.level,
                mover.geometry,
                activity_log.id,
            )
            yield _or_optional_event(
                env, env.any_of(events), stop_reservation_waiting_event
            )
            activity_log.log_entry(
                "waiting origin reservation stop",
                env.now,
                mover.container.level,
                mover.geometry,
                activity_log.id,
            )
        elif destination_requested == 0:
            events = [dest.container.reserve_put_available for dest in destination]
            activity_log.log_entry(
                "waiting destination reservation start",
                env.now,
                mover.container.level,
                mover.geometry,
                activity_log.id,
            )
            yield _or_optional_event(
                env, env.any_of(events), stop_reservation_waiting_event
            )
            activity_log.log_entry(
                "waiting destination reservation stop",
                env.now,
                mover.container.level,
                mover.geometry,
                activity_log.id,
            )
        elif mover.container.level == mover.container.capacity:
            activity_log.log_entry(
                "waiting mover to finish start",
                env.now,
                mover.container.capacity,
                mover.geometry,
                activity_log.id,
            )
            yield env.timeout(3600)
            activity_log.log_entry(
                "waiting mover to finish stop",
                env.now,
                mover.container.capacity,
                mover.geometry,
                activity_log.id,
            )
        else:
            raise RuntimeError("Attempting to move content with a full ship")
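# Illustrative sketch, not part of the module: the module-level process functions are
# composed with functools.partial and registered on the environment, mirroring what
# Activity.__init__ does above. The origin_site, destination_site, crane, barge and
# activity_log objects are assumed to exist already.
#
#   single_run = partial(
#       single_run_process,
#       origin=[origin_site],
#       destination=[destination_site],
#       loader=crane,
#       mover=barge,
#       unloader=crane,
#   )
#   run_until_done = partial(
#       conditional_process,
#       stop_event=origin_site.container.empty_event,
#       sub_processes=[single_run],
#   )
#   env.process(run_until_done(activity_log=activity_log, env=env))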
def _or_optional_event(env, event, optional_event):
    """If the optional_event is None, the event is returned. Otherwise the event and optional_event are combined
    through an any_of event, thus returning an event that will trigger if either of these events triggers.
    Used by single_run_process to combine an event used to wait for a reservation to become available with its
    optional stop_reservation_waiting_event."""
    if optional_event is None:
        return event
    return env.any_of(events=[event, optional_event])


def _request_resource(requested_resources, resource):
    """Requests the given resource and yields it."""
    if resource not in requested_resources:
        requested_resources[resource] = resource.request()
        yield requested_resources[resource]


def _release_resource(requested_resources, resource, kept_resource=None):
    """Releases the given resource, provided it does not equal the kept_resource parameter.

    Deletes the released resource from the requested_resources dictionary."""
    if resource != kept_resource:
        resource.release(requested_resources[resource])
        del requested_resources[resource]


def _shift_amount(
    env, processor, mover, desired_level, site, ActivityID, verbose=False
):
    """Calls the processor.process method, giving debug print statements when verbose is True."""
    amount = np.abs(mover.container.level - desired_level)

    # Set ActivityID to processor and mover
    processor.ActivityID = ActivityID
    mover.ActivityID = ActivityID

    # Check if loading or unloading
    yield from processor.process(mover, desired_level, site)

    if verbose:
        print("Processed {}:".format(amount))
        print("  by: " + processor.name)
        print("  mover " + mover.name + " contains: " + str(mover.container.level))
        print("  site: " + site.name + " contains: " + str(site.container.level))


def _request_resources_if_transfer_possible(
    env,
    resource_requests,
    origin,
    processor,
    destination,
    amount,
    kept_resource,
    ActivityID,
    engine_order=1.0,
    verbose=False,
):
    """
    Sets up everything needed for single_run_process to shift an amount from the origin to the destination using
    the processor.process method.
    After yielding from this method:
     - the origin and destination contain a valid amount of content/space for the transferring of an amount to
       be possible
     - resource requests will have been added for the origin, destination and processor if they were not present
       already
     - the processor will be located at the origin if it was not already
    If during the yield to wait for space, content or a resource, some other process has caused the available space
    or content to be removed, all resource requests granted up to that point will be released and the method will
    restart the process of requesting space, content and resources. The passed "kept_resource" will never be
    released.
    """
    all_available = False
    while not all_available:
        # yield until enough content and space available in origin and destination
        yield env.all_of(
            events=[
                origin.container.get_available(amount),
                destination.container.put_available(amount),
            ]
        )

        yield from _request_resource(resource_requests, processor.resource)
        if (
            origin.container.level < amount
            or destination.container.capacity - destination.container.level < amount
        ):
            # someone removed / added content while we were requesting the processor, so abort and wait for available
            # space/content again
            _release_resource(
                resource_requests, processor.resource, kept_resource=kept_resource
            )
            continue

        if not processor.is_at(origin):
            # todo have the processor move simultaneously with the mover by starting a different process for it?
            yield from _move_mover(
                processor,
                origin,
                ActivityID=ActivityID,
                engine_order=engine_order,
                verbose=verbose,
            )
            if (
                origin.container.level < amount
                or destination.container.capacity - destination.container.level
                < amount
            ):
                # someone messed us up again, so return to waiting for space/content
                _release_resource(
                    resource_requests, processor.resource, kept_resource=kept_resource
                )
                continue

        yield from _request_resource(resource_requests, origin.resource)
        if (
            origin.container.level < amount
            or destination.container.capacity - destination.container.level < amount
        ):
            _release_resource(
                resource_requests, processor.resource, kept_resource=kept_resource
            )
            _release_resource(
                resource_requests, origin.resource, kept_resource=kept_resource
            )
            continue

        yield from _request_resource(resource_requests, destination.resource)
        if (
            origin.container.level < amount
            or destination.container.capacity - destination.container.level < amount
        ):
            _release_resource(
                resource_requests, processor.resource, kept_resource=kept_resource
            )
            _release_resource(
                resource_requests, origin.resource, kept_resource=kept_resource
            )
            _release_resource(
                resource_requests, destination.resource, kept_resource=kept_resource
            )
            continue
        all_available = True


def _move_mover(mover, origin, ActivityID, engine_order=1.0, verbose=False):
    """Calls the mover.move method, giving debug print statements when verbose is True."""
    old_location = mover.geometry

    # Set ActivityID to mover
    mover.ActivityID = ActivityID
    yield from mover.move(origin, engine_order=engine_order)

    if verbose:
        print("Moved:")
        print("  object: " + mover.name + " contains: " + str(mover.container.level))
        print(
            "  from: "
            + format(old_location.x, "02.5f")
            + " "
            + format(old_location.y, "02.5f")
        )
        print(
            "  to: "
            + format(mover.geometry.x, "02.5f")
            + " "
            + format(mover.geometry.y, "02.5f")
        )
class Simulation(core.Identifiable, core.Log):
    """The Simulation Class can be used to set up a full simulation using configuration dictionaries (json).

    sites: a list of dictionaries specifying which site objects should be constructed
    equipment: a list of dictionaries specifying which equipment objects should be constructed
    activities: list of dictionaries specifying which activities should be performed during the simulation

    Each of the values of the sites and equipment lists is a dictionary specifying "id", "name", "type" and
    "properties". Here "id" can be used to refer to this site / equipment in other parts of the configuration,
    and "name" is used to initialize the object's name (required by core.Identifiable). The "type" must be a list
    of mixin class names which will be used to construct a dynamic class for the object. For example:
    ["HasStorage", "HasResource", "Locatable"]. The core.Identifiable and core.Log class will always be added
    automatically by the Simulation class.
    The "properties" must be a dictionary which is used to construct the arguments for initializing the object.
    For example, if "HasContainer" is included in the "type" list, the "properties" dictionary must include a
    "capacity" which has the value that will be passed to the constructor of HasContainer. In this case, the
    "properties" dictionary can also optionally specify the "level".

    Each of the values of the activities list is a dictionary specifying an "id", "type", and other fields
    depending on the type. The supported types are "move", "single_run", "sequential", "conditional", and "delayed".
    For a "move" type activity, the dictionary should also contain a "mover", "destination" and can optionally
    contain a "moverProperties" dictionary containing an "engineOrder".
    For a "single_run" type activity, the dictionary should also contain an "origin", "destination", "loader",
    "mover", "unloader" and can optionally contain a "moverProperties" dictionary containing an "engineOrder"
    and/or "load".
    For a "sequential" type activity, the dictionary should also contain "activities". This is a list of activities
    (dictionaries as before) which will be performed sequentially in the order in which they appear in the list.
    For a "conditional" type activity, the dictionary should also contain a "condition" and "activities", where the
    "activities" is another list of activities which will be performed until the event corresponding with the
    condition occurs.
    For a "delayed" type activity, the dictionary should also contain a "condition" and "activities", where the
    "activities" is another list of activities which will be performed after the event corresponding with the
    condition occurs.

    The "condition" of a "conditional" or "delayed" type activity is a dictionary containing an "operator" and one
    other field depending on the type. The operator can be "is_full", "is_empty", "is_done", "any_of" and "all_of".
    For the "is_full" operator, the dictionary should contain an "operand" which must be the id of the object
    (site or equipment) of which the container should be full for the event to occur.
    For the "is_empty" operator, the dictionary should contain an "operand" which must be the id of the object
    (site or equipment) of which the container should be empty for the event to occur.
    For the "is_done" operator, the dictionary should contain an "operand" which must be the id of an activity
    which should be finished for the event to occur. To instantiate such an event, the operand activity must
    already be instantiated. The Simulation class takes care of instantiating its activities in an order which
    ensures this is the case. However, if there is no such order because activities contain "is_done" conditions
    which circularly reference each other, a ValueError will be raised.
    For the "any_of" operator, the dictionary should contain "conditions", a list of (sub)conditions of which any
    must occur for the event to occur.
    For the "all_of" operator, the dictionary should contain "conditions", a list of (sub)conditions which all
    must occur for the event to occur.
    """

    def __init__(self, sites, equipment, activities, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.__init_sites(sites)
        self.__init_equipment(equipment)
        self.__init_activities(activities)

    def __init_sites(self, sites):
        self.sites = {}
        for site in sites:
            self.sites[site["id"]] = self.__init_object_from_json(site)

    def __init_equipment(self, equipment):
        self.equipment = {}
        for equipment_piece in equipment:
            self.equipment[equipment_piece["id"]] = self.__init_object_from_json(
                equipment_piece
            )

    def __init_activities(self, activities):
        self.activities = {}
        activity_log_class = type("ActivityLog", (core.Log, core.Identifiable), {})
        uninstantiated_activities = activities
        while len(uninstantiated_activities) > 0:
            still_uninstantiated = self.__try_to_init_activities(
                activities, activity_log_class
            )
            if len(still_uninstantiated) == len(uninstantiated_activities):
                raise ValueError(
                    "Unable to instantiate activities {}; their is_done conditions form a circle.".format(
                        ", ".join(
                            activity["id"] for activity in uninstantiated_activities
                        )
                    )
                )
            uninstantiated_activities = still_uninstantiated

    def __try_to_init_activities(self, activities, activity_log_class):
        failed_activities = []
        for activity in activities:
            successful = self.__try_to_init_activity(activity, activity_log_class)
            if not successful:
                failed_activities.append(activity)
        return failed_activities

    def __try_to_init_activity(self, activity, activity_log_class):
        try:
            process_control = self.get_process_control(activity)
        except KeyError:
            return False

        id = activity["id"]
        activity_log = activity_log_class(env=self.env, name=id)
        process = self.env.process(
            process_control(activity_log=activity_log, env=self.env)
        )
        self.activities[id] = {"activity_log": activity_log, "process": process}
        return True
    def get_process_control(self, activity, stop_reservation_waiting_event=None):
        activity_type = activity["type"]
        if activity_type == "move":
            mover = self.equipment[activity["mover"]]
            mover_properties = self.get_mover_properties_kwargs(activity)
            destination = self.sites[activity["destination"]]
            kwargs = {"mover": mover, "destination": destination}
            if "engine_order" in mover_properties:
                kwargs["engine_order"] = mover_properties["engine_order"]
            return partial(move_process, **kwargs)
        if activity_type == "single_run":
            kwargs = self.get_mover_properties_kwargs(activity)
            kwargs["mover"] = self.equipment[activity["mover"]]
            if type(self.sites[activity["origin"]]) == list:
                kwargs["origin"] = self.sites[activity["origin"]]
            else:
                kwargs["origin"] = [self.sites[activity["origin"]]]
            if type(self.sites[activity["destination"]]) == list:
                kwargs["destination"] = self.sites[activity["destination"]]
            else:
                kwargs["destination"] = [self.sites[activity["destination"]]]
            kwargs["loader"] = self.equipment[activity["loader"]]
            kwargs["unloader"] = self.equipment[activity["unloader"]]
            if stop_reservation_waiting_event is not None:
                kwargs[
                    "stop_reservation_waiting_event"
                ] = stop_reservation_waiting_event
            return partial(single_run_process, **kwargs)
        if activity_type == "conditional":
            stop_event = self.get_condition_event(activity["condition"])
            sub_processes = [
                self.get_process_control(act, stop_reservation_waiting_event=stop_event)
                for act in activity["activities"]
            ]
            return partial(
                conditional_process, stop_event=stop_event, sub_processes=sub_processes
            )
        if activity_type == "sequential":
            sub_processes = [
                self.get_process_control(act) for act in activity["activities"]
            ]
            return partial(sequential_process, sub_processes=sub_processes)
        if activity_type == "delayed":
            sub_processes = [
                self.get_process_control(act) for act in activity["activities"]
            ]
            start_event = self.get_condition_event(activity["condition"])
            return partial(
                delayed_process, start_event=start_event, sub_processes=sub_processes
            )

        raise ValueError("Unrecognized activity type: " + activity_type)
    @staticmethod
    def get_mover_properties_kwargs(activity):
        if "moverProperties" not in activity:
            return {}

        kwargs = {}
        mover_options = activity["moverProperties"]
        if "engineOrder" in mover_options:
            kwargs["engine_order"] = mover_options["engineOrder"]
        if "load" in mover_options:
            kwargs["filling"] = mover_options["load"]
        return kwargs
    def get_level_event_operand(self, condition):
        operand_key = condition["operand"]
        try:
            operand = (
                self.sites[operand_key]
                if operand_key in self.sites
                else self.equipment[operand_key]
            )
        except KeyError:
            # rethrow a KeyError as a ValueError to avoid assuming there is a circular dependency
            raise ValueError(
                'No object with id "{}" present in configuration'.format(operand_key)
            )

        return operand
    def get_sub_condition_events(self, condition):
        conditions = condition["conditions"]
        events = [self.get_condition_event(condition) for condition in conditions]
        return events
    def get_condition_event(self, condition):
        operator = condition["operator"]

        if operator == "is_full":
            operand = self.get_level_event_operand(condition)
            return operand.container.get_full_event
        elif operator == "is_empty":
            operand = self.get_level_event_operand(condition)
            return operand.container.get_empty_event
        elif operator == "is_done":
            operand_key = condition["operand"]
            return self.activities[operand_key][
                "process"
            ]  # potential KeyError is caught in try_to_init_activity
        elif operator == "any_of":
            sub_events = self.get_sub_condition_events(condition)
            return self.env.any_of(events=sub_events)
        elif operator == "all_of":
            sub_events = self.get_sub_condition_events(condition)
            return self.env.all_of(events=sub_events)
        else:
            raise ValueError("Unrecognized operator type: " + operator)
    def __init_object_from_json(self, object_json):
        class_name = object_json["id"]
        name = object_json["name"]
        type = object_json["type"]
        properties = object_json["properties"]

        klass = get_class_from_type_list(class_name, type)
        kwargs = get_kwargs_from_properties(self.env, name, properties, self.sites)

        try:
            new_object = klass(**kwargs)
        except TypeError as type_err:
            # create a useful error message
            message_template = "Unable to instantiate {} for {}: {} with arguments {}"
            message = message_template.format(klass, class_name, type_err, kwargs)
            raise ValueError(message)

        add_object_properties(new_object, properties)

        return new_object
    def get_logging(self):
        json = {}

        sites_logging = []
        for key, site in self.sites.items():
            sites_logging.append(
                self.get_as_feature_collection(key, site.get_log_as_json())
            )
        json["sites"] = sites_logging

        equipment_logging = []
        for key, equipment in self.equipment.items():
            equipment_logging.append(
                self.get_as_feature_collection(key, equipment.get_log_as_json())
            )
        json["equipment"] = equipment_logging

        activity_logging = []
        for key, activity in self.activities.items():
            activity_logging.append(
                self.get_as_feature_collection(
                    key, activity["activity_log"].get_log_as_json()
                )
            )
        json["activities"] = activity_logging

        return json
    @staticmethod
    def get_as_feature_collection(id, features):
        return dict(type="FeatureCollection", id=id, features=features)
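# Illustrative configuration sketch, not part of the module: the shape of the
# dictionaries accepted by Simulation, following the class docstring above. All ids,
# mixin lists and property values are made up for the example.
#
#   sites = [
#       {
#           "id": "stock",
#           "name": "Stock pile",
#           "type": ["HasContainer", "HasResource", "Locatable"],
#           "properties": {"capacity": 1000, "level": 1000, "geometry": stock_geojson},
#       },
#       # ... more sites ...
#   ]
#   activities = [
#       {
#           "id": "transport",
#           "type": "single_run",
#           "origin": "stock",
#           "destination": "placement",
#           "loader": "crane",
#           "mover": "barge",
#           "unloader": "crane",
#           "moverProperties": {"engineOrder": 0.8, "load": 1.0},
#       },
#   ]
#   simulation = Simulation(
#       env=env, name="demo", sites=sites, equipment=equipment, activities=activities
#   )
#   env.run()
#   logs = simulation.get_logging()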
def get_class_from_type_list(class_name, type_list):
    mixin_classes = (
        [core.Identifiable, core.Log]
        + [string_to_class(text) for text in type_list]
        + [core.DebugArgs]
    )
    return type(class_name, tuple(mixin_classes), {})
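# Illustrative sketch, not part of the module: get_class_from_type_list builds a
# dynamic class from core mixin names, for example
#
#   klass = get_class_from_type_list(
#       "barge_01", ["ContainerDependentMovable", "HasResource", "Processor"]
#   )
#   # klass is a new type with bases (Identifiable, Log, ContainerDependentMovable,
#   # HasResource, Processor, DebugArgs); it is then instantiated with the kwargs
#   # produced by get_kwargs_from_properties.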
def string_to_class(text):
    try:
        return getattr(core, text)
    except AttributeError:
        raise ValueError("Invalid core class name given: " + text)
def get_kwargs_from_properties(environment, name, properties, sites):
    kwargs = {"env": environment, "name": name}

    # some checks on the configuration could be added here,
    # for example, if both level and capacity are given, is level <= capacity, level >= 0, capacity >= 0 etc.
    # for compute functions:
    # - check if there are enough entries for interp1d / interp2d,
    # - check if functions of for example level have a range from 0 to max level (capacity)

    # Locatable
    if "geometry" in properties:
        kwargs["geometry"] = shapely.geometry.asShape(properties["geometry"]).centroid
    if "location" in properties:
        kwargs["geometry"] = sites[properties["location"]].geometry

    # HasContainer
    if "capacity" in properties:
        kwargs["capacity"] = properties["capacity"]
    if "level" in properties:
        kwargs["level"] = properties["level"]

    # HasCosts
    if "dayrate" in properties:
        kwargs["dayrate"] = properties["dayrate"]
    elif "weekrate" in properties:
        kwargs["weekrate"] = properties["weekrate"]

    # HasPlume
    if "sigma_d" in properties:
        kwargs["sigma_d"] = properties["sigma_d"]
    if "sigma_o" in properties:
        kwargs["sigma_o"] = properties["sigma_o"]
    if "sigma_p" in properties:
        kwargs["sigma_p"] = properties["sigma_p"]
    if "f_sett" in properties:
        kwargs["f_sett"] = properties["f_sett"]
    if "f_trap" in properties:
        kwargs["f_trap"] = properties["f_trap"]

    # HasSpillCondition
    if "conditions" in properties:
        condition_list = properties["conditions"]
        condition_objects = [
            core.SpillCondition(
                **get_spill_condition_kwargs(environment, condition_dict)
            )
            for condition_dict in condition_list
        ]
        kwargs["conditions"] = condition_objects

    # HasWeather
    if "weather" in properties:
        df = pd.DataFrame(properties["weather"])
        df.index = df["time"].apply(datetime.datetime.fromtimestamp)
        df = df.drop(columns=["time"])
        kwargs["dataframe"] = df.rename(columns={"tide": "Tide", "hs": "Hs"})
    if "bed" in properties:
        kwargs["bed"] = properties["bed"]

    # HasWorkabilityCriteria
    # todo Movable has the same parameter v, so this value might be overwritten by speed!
    if "v" in properties:
        kwargs["v"] = properties["v"]

    # HasDepthRestriction
    if "draught" in properties:
        df = pd.DataFrame(properties["draught"])
        df["filling_degree"] = df["level"] / kwargs["capacity"]
        kwargs["compute_draught"] = scipy.interpolate.interp1d(
            df["filling_degree"], df["draught"]
        )
    if "waves" in properties:
        kwargs["waves"] = properties["waves"]
    if "ukc" in properties:
        kwargs["ukc"] = properties["ukc"]

    # Routable arguments: route -> todo figure out how this would appear in properties and can be parsed into kwargs

    # ContainerDependentMovable & Movable
    if "speed" in properties:
        speed = properties["speed"]
        if isinstance(speed, list):
            df = pd.DataFrame(speed)
            df["filling_degree"] = df["level"] / kwargs["capacity"]
            compute_function = scipy.interpolate.interp1d(
                df["filling_degree"], df["speed"]
            )
            kwargs["compute_v"] = compute_function
            v_empty = compute_function(0)
            v_full = compute_function(1)
        else:
            kwargs["v"] = speed
            v_empty = speed
            v_full = speed
    else:
        # set v_empty and v_full to dummy values to ensure we can instantiate energyUseSailing
        v_empty = 1.0
        v_full = 1.0

    # EnergyUse
    if "energyUseSailing" in properties:
        energy_use_sailing_dict = properties["energyUseSailing"]
        max_propulsion = energy_use_sailing_dict["maxPropulsion"]
        boardnet = energy_use_sailing_dict["boardnet"]
        kwargs["energy_use_sailing"] = partial(
            energy_use_sailing,
            speed_max_full=v_full,
            speed_max_empty=v_empty,
            propulsion_power_max=max_propulsion,
            boardnet_power=boardnet,
        )

    # EnergyUse
    if "energyUseLoading" in properties:
        kwargs["energy_use_loading"] = partial(
            energy_use_processing, constant_hourly_use=properties["energyUseLoading"]
        )
    if "energyUseUnloading" in properties:
        kwargs["energy_use_unloading"] = partial(
            energy_use_processing, constant_hourly_use=properties["energyUseUnloading"]
        )

    # HasResource
    if "nr_resources" in properties:
        kwargs["nr_resources"] = properties["nr_resources"]

    # Processor
    if "loadingRate" in properties:
        kwargs["loading_rate"] = get_loading_func(properties["loadingRate"])
    if "unloadingRate" in properties:
        kwargs["unloading_rate"] = get_unloading_func(properties["unloadingRate"])

    return kwargs
def add_object_properties(new_object, properties):
    # HasSoil
    if "layers" in properties:
        layer_list = properties["layers"]
        layer_objects = [
            core.SoilLayer(i, **layer_dict) for i, layer_dict in enumerate(layer_list)
        ]
        new_object.add_layers(layer_objects)
def get_spill_condition_kwargs(environment, condition_dict):
    kwargs = {}
    kwargs["spill_limit"] = condition_dict["limit"]

    initial_time = datetime.datetime.fromtimestamp(environment.now)
    kwargs["start"] = initial_time + datetime.timedelta(days=condition_dict["start"])
    kwargs["end"] = initial_time + datetime.timedelta(days=condition_dict["end"])
    return kwargs
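# Illustrative sketch, not part of the module: a spill condition entry, with "start"
# and "end" given as offsets in days from the current simulation time (values made up):
#
#   get_spill_condition_kwargs(env, {"limit": 100, "start": 0, "end": 7})
#   # -> {"spill_limit": 100, "start": t0, "end": t0 + 7 days},
#   #    where t0 is the datetime corresponding to env.now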
def get_compute_function(table_entry_list, x_key, y_key):
    df = pd.DataFrame(table_entry_list)
    return scipy.interpolate.interp1d(df[x_key], df[y_key])
def get_loading_func(property):
    """Returns a loading_func based on the given input property.
    Input can be a flat rate or a table defining the rate depending on the level.
    In the second case, note that by definition the rate is the derivative of the level with respect to time.
    Therefore d level / dt = f(level), from which we can obtain that the time taken for loading can be calculated
    by integrating 1 / f(level) from current_level to desired_level."""
    if isinstance(property, list):
        # given property is a list of data points
        rate_function = get_compute_function(property, "level", "rate")
        inversed_rate_function = lambda x: 1 / rate_function(x)
        return lambda current_level, desired_level: scipy.integrate.quad(
            inversed_rate_function, current_level, desired_level
        )[0]
    else:
        # given property is a flat rate
        return property
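# Illustrative sketch, not part of the module: with a rate table, the returned
# function integrates 1 / rate(level) between the two levels. With a (made up)
# constant rate of 2 units per unit time, loading 100 units takes about 50 time units:
#
#   loading_func = get_loading_func([{"level": 0, "rate": 2}, {"level": 100, "rate": 2}])
#   loading_func(0, 100)  # quad of 1/2 from 0 to 100 -> approximately 50.0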
def get_unloading_func(property):
    """Returns an unloading_func based on the given input property.
    Input can be a flat rate or a table defining the rate depending on the level.
    In the second case, note that by definition the rate is -1 times the derivative of the level with respect to
    time. Therefore d level / dt = - f(level), from which we can obtain that the time taken for unloading can be
    calculated by integrating 1 / f(level) from desired_level to current_level."""
    if isinstance(property, list):
        # given property is a list of data points
        rate_function = get_compute_function(property, "level", "rate")
        inversed_rate_function = lambda x: 1 / rate_function(x)
        return lambda current_level, desired_level: scipy.integrate.quad(
            inversed_rate_function, desired_level, current_level
        )[0]
    else:
        # given property is a flat rate
        return property
def energy_use_sailing(
    distance,
    current_speed,
    filling_degree,
    speed_max_full,
    speed_max_empty,
    propulsion_power_max,
    boardnet_power,
):
    duration_seconds = distance / current_speed
    duration_hours = duration_seconds / 3600
    speed_factor_full = current_speed / speed_max_full
    speed_factor_empty = current_speed / speed_max_empty
    energy_use_sailing_full = duration_hours * (
        speed_factor_full ** 3 * propulsion_power_max + boardnet_power * 0.6
    )
    energy_use_sailing_empty = duration_hours * (
        speed_factor_empty ** 3 * propulsion_power_max + boardnet_power * 0.6
    )
    return (
        filling_degree * (energy_use_sailing_full - energy_use_sailing_empty)
        + energy_use_sailing_empty
    )
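# Illustrative sketch, not part of the module: the energy use is a linear
# interpolation between the empty-ship and full-ship estimates based on
# filling_degree, with a cubic speed factor applied to the maximum propulsion power
# and 60% of the boardnet power added. All numbers below are made up:
#
#   energy_use_sailing(
#       distance=3600,          # m
#       current_speed=1.0,      # m/s, so one hour of sailing
#       filling_degree=0.5,
#       speed_max_full=1.0,
#       speed_max_empty=2.0,
#       propulsion_power_max=1000.0,
#       boardnet_power=100.0,
#   )
#   # full:  1 h * (1.0**3 * 1000 + 60) = 1060
#   # empty: 1 h * (0.5**3 * 1000 + 60) =  185
#   # result: 0.5 * (1060 - 185) + 185  = 622.5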
def energy_use_processing(duration_seconds, constant_hourly_use):
    duration_hours = duration_seconds / 3600
    return duration_hours * constant_hourly_use