Source code for openclsim.model.while_activity

"""While activity for the simulation."""

import openclsim.core as core

from .base_activities import GenericActivity, RegisterSubProcesses
from .helpers import register_processes


class ConditionProcessMixin:
    """Mixin for the condition process."""

    def main_process_function(self, activity_log, env):
        condition_event = self.parse_expression(self.condition_event)

        start_time = env.now
        args_data = {
            "env": env,
            "activity_log": activity_log,
            "activity": self,
        }
        yield from self.pre_process(args_data)

        start_while = env.now

        activity_log.log_entry(
            t=env.now,
            activity_id=activity_log.id,
            activity_state=core.LogState.START,
        )

        repetitions = 1
        while True:
            self.start_sequence.succeed()
            for sub_process in self.sub_processes:
                activity_log.log_entry(
                    t=env.now,
                    activity_id=activity_log.id,
                    activity_state=core.LogState.START,
                    activity_label={
                        "type": "subprocess",
                        "ref": sub_process.id,
                    },
                )

                stop_event = self.parse_expression(
                    [
                        {
                            "type": "activity",
                            "state": "done",
                            "name": sub_process.name,
                        }
                    ]
                )
                yield stop_event

                activity_log.log_entry(
                    t=env.now,
                    activity_id=activity_log.id,
                    activity_state=core.LogState.STOP,
                    activity_label={
                        "type": "subprocess",
                        "ref": sub_process.id,
                    },
                )

            if repetitions >= self.max_iterations or condition_event.processed is True:
                break
            else:
                repetitions += 1

                # Reset the sequential start events of the subprocesses
                self.register_subprocesses()

                # Re-add the activities to the simpy environment
                register_processes(self.sub_processes)

        activity_log.log_entry(
            t=env.now,
            activity_id=activity_log.id,
            activity_state=core.LogState.STOP,
        )

        args_data["start_preprocessing"] = start_time
        args_data["start_activity"] = start_while
        yield from self.post_process(**args_data)


[docs]class WhileActivity(GenericActivity, ConditionProcessMixin, RegisterSubProcesses): """ WhileActivity Class forms a specific class for executing multiple activities in a dedicated order within a simulation. The while activity is a structural activity, which does not require specific resources. sub_processes the sub_processes which is executed in sequence in every iteration condition_event a condition event provided in the expression language which will stop the iteration as soon as the event is fulfilled. start_event the activity will start as soon as this event is triggered by default will be to start immediately """ # activity_log, env, stop_event, sub_processes, requested_resources, keep_resources def __init__(self, sub_processes, condition_event, show=False, *args, **kwargs): super().__init__(*args, **kwargs) """Initialization""" self.print = show self.sub_processes = sub_processes self.condition_event = condition_event self.max_iterations = 1_000_000 self.register_subprocesses = self.register_sequential_subprocesses self.register_subprocesses()
[docs]class RepeatActivity(GenericActivity, ConditionProcessMixin, RegisterSubProcesses): """ RepeatActivity Class forms a specific class for executing multiple activities in a dedicated order within a simulation. Parameters ---------- sub_processes the sub_processes which is executed in sequence in every iteration repetitions Number of times the subprocess is repeated start_event the activity will start as soon as this event is triggered by default will be to start immediately """ def __init__(self, sub_processes, repetitions: int, show=False, *args, **kwargs): super().__init__(*args, **kwargs) """Initialization""" self.print = show self.sub_processes = sub_processes self.max_iterations = repetitions self.condition_event = [ {"type": "activity", "state": "done", "name": self.name} ] self.register_subprocesses = self.register_sequential_subprocesses self.register_subprocesses()