
Event-driven models

So far everything we have built in Plugboard has been a discrete-time model. This means that the whole model advances step-wise: step is called on each Component, and all of their outputs are calculated before the simulation moves on to the next step.

In this tutorial we're going to introduce an event-driven model, where data can be passed around between components based on triggers that you can define. Event-based models can be useful in a variety of scenarios, for example when modelling parts moving around a production line, or to trigger expensive computation only when certain conditions are met in the model.

Event-based model

Here's the model that we're going to build. Given a stream of random numbers, we'll trigger HighEvent whenever the value is 0.8 or above and LowEvent whenever the value is 0.2 or below. This allows us to funnel data into different parts of the model: in this case we'll just save the latest high/low values to a file at each step. In the diagram the dotted lines represent the flow of event data: FindHighLowValues will publish events, while CollectHigh and CollectLow will subscribe to receive high and low events respectively.

flowchart LR
    collect-high@{ shape: rounded, label: CollectHigh<br>**collect-high** } --> save-high@{ shape: rounded, label: FileWriter<br>**save-high** }
    collect-low@{ shape: rounded, label: CollectLow<br>**collect-low** } --> save-low@{ shape: rounded, label: FileWriter<br>**save-low** }
    random-generator@{ shape: rounded, label: Random<br>**random-generator** } --> find-high-low@{ shape: rounded, label: FindHighLowValues<br>**find-high-low** }
    low_event@{ shape: hex, label: LowEvent } -.-> collect-low@{ shape: rounded, label: CollectLow<br>**collect-low** }
    high_event@{ shape: hex, label: HighEvent } -.-> collect-high@{ shape: rounded, label: CollectHigh<br>**collect-high** }
    find-high-low@{ shape: rounded, label: FindHighLowValues<br>**find-high-low** } -.-> high_event@{ shape: hex, label: HighEvent }
    find-high-low@{ shape: rounded, label: FindHighLowValues<br>**find-high-low** } -.-> low_event@{ shape: hex, label: LowEvent }
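
Before bringing Plugboard into it, the triggering rule can be sketched in plain Python. This is only an illustration: the `classify` helper is a hypothetical name that does not exist in Plugboard, and the inclusive comparisons are chosen to mirror the ones FindHighLowValues uses later in this tutorial.

```python
import random
from typing import Optional


def classify(value: float, low_limit: float = 0.2, high_limit: float = 0.8) -> Optional[str]:
    """Return "high", "low", or None for a value, using inclusive limits."""
    if value >= high_limit:
        return "high"
    if value <= low_limit:
        return "low"
    return None  # values in between trigger no event


# Seeded only so that repeated runs of this sketch give the same counts.
rng = random.Random(0)
labels = [classify(rng.random()) for _ in range(50)]
counts = {label: labels.count(label) for label in ("high", "low", None)}
print(counts)
```

Most values fall between the two limits and produce no event at all, which is why the event-driven parts of the model only do work on a fraction of the steps.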

Defining events

First we need to define the events that are going to be used in the model. Each event needs a name (in this case "high_event" and "low_event") and a data type associated with it. Use a Pydantic model to define the format of this data field.

class ExtremeValue(BaseModel):
    """Data carried by high and low value events."""

    value: float
    extreme_type: _t.Literal["high", "low"]


class HighEvent(Event):
    """High value event type."""

    type: _t.ClassVar[str] = "high_event"
    data: ExtremeValue


class LowEvent(Event):
    """Low value event type."""

    type: _t.ClassVar[str] = "low_event"
    data: ExtremeValue

Building components to create and consume events

So far all of our process models have run step-by-step until completion. When a model contains event-driven components, we need a way to tell them to stop at the end of the simulation, otherwise they will stay running and listening for events forever.

In this example, our Random component will drive the process by generating random input values. When it has completed iters iterations, we call self.io.close() to stop the model, causing the other components in the model to shut down.

class Random(Component):
    """Generates random numbers."""

    io = IOController(outputs=["value"])

    def __init__(self, iters: int = 50, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
        super().__init__(**kwargs)
        self.max_iters = iters
        self.completed_iters = 0

    async def step(self) -> None:
        self.completed_iters += 1
        self.value = random.random()
        if self.completed_iters >= self.max_iters:
            await self.io.close()
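
The shutdown problem is not specific to Plugboard, and a plain asyncio sketch may help to show why an explicit close signal is needed. Everything below is an illustrative stand-in rather than Plugboard's implementation: the `_CLOSED` sentinel, `producer` and `listener` are hypothetical names, and a queue takes the place of a connector.

```python
import asyncio

_CLOSED = object()  # hypothetical sentinel meaning "no more data will arrive"


async def producer(queue: asyncio.Queue, iters: int) -> None:
    """Emit `iters` values, then signal that the stream is closed."""
    for i in range(iters):
        await queue.put(i)
    await queue.put(_CLOSED)


async def listener(queue: asyncio.Queue, received: list) -> None:
    """Consume values until the close signal arrives."""
    while True:
        item = await queue.get()
        if item is _CLOSED:
            break  # without this check the loop would wait forever
        received.append(item)


async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    received: list = []
    await asyncio.gather(producer(queue, iters=5), listener(queue, received))
    return received


result = asyncio.run(main())
print(result)
```

The listener has no way of knowing how many values to expect, so it can only finish once the producer tells it to. In the Random component, the call to self.io.close() plays the role of that signal.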

Next, we will define FindHighLowValues to identify high and low values in the stream of random numbers and publish HighEvent and LowEvent respectively.

class FindHighLowValues(Component):
    """Publishes an event on high or low values."""

    io = IOController(inputs=["value"], output_events=[LowEvent, HighEvent])  # (1)!

    def __init__(
        self,
        low_limit: float = 0.2,
        high_limit: float = 0.8,
        **kwargs: _t.Unpack[ComponentArgsDict],
    ) -> None:
        super().__init__(**kwargs)
        self.low_limit = low_limit
        self.high_limit = high_limit

    async def step(self) -> None:
        if self.value >= self.high_limit:
            self.io.queue_event(  # (2)!
                HighEvent(
                    source=self.name, data=ExtremeValue(value=self.value, extreme_type="high")
                )
            )
        if self.value <= self.low_limit:
            self.io.queue_event(
                LowEvent(source=self.name, data=ExtremeValue(value=self.value, extreme_type="low"))
            )
  1. See how we use the IOController to declare that this Component will publish events.
  2. Use self.io.queue_event to send an event from a Component. Here we are sending a HighEvent or a LowEvent depending on the input value.

Finally, we need components to subscribe to these events and process them. Use the Event.handler decorator to identify the method on each Component that will do this processing.

class CollectHigh(Component):
    """Collects values from high events."""

    io = IOController(input_events=[HighEvent], outputs=["value"])  # (1)!

    def __init__(self, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
        super().__init__(**kwargs)
        self.latest_event: _t.Optional[ExtremeValue] = None

    async def step(self) -> None:
        self.value = self.latest_event.value if self.latest_event else None

    @HighEvent.handler  # (2)!
    async def handle_event(self, event: HighEvent) -> None:
        self.latest_event = event.data


class CollectLow(Component):
    """Collects values from low events."""

    io = IOController(input_events=[LowEvent], outputs=["value"])

    def __init__(self, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
        super().__init__(**kwargs)
        self.latest_event: _t.Optional[ExtremeValue] = None

    async def step(self) -> None:
        self.value = self.latest_event.value if self.latest_event else None

    @LowEvent.handler  # (3)!
    async def handle_event(self, event: LowEvent) -> None:
        self.latest_event = event.data
  1. Specify the events that this Component will subscribe to.
  2. Use this decorator to indicate that we handle HighEvent here...
  3. ...and we handle LowEvent here.

Note

In a real model you could define whatever logic you need inside your event handler, e.g. create a file, publish another event, etc. Here we just store the event on an attribute so that its value can be output via the step() method.
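
To make the publish/subscribe idea concrete, here is a toy dispatcher written with the standard library only. It is a conceptual sketch, not how Plugboard routes events internally: `SimpleEvent`, `SimpleBus` and `Collector` are hypothetical names invented for this illustration.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Dict, List, Optional, Tuple


@dataclass
class SimpleEvent:
    """Hypothetical stand-in for an event: a type name plus a payload."""

    type: str
    value: float


Handler = Callable[[SimpleEvent], Awaitable[None]]


class SimpleBus:
    """Toy publish/subscribe bus that routes events by their type name."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Handler]] = {}

    def subscribe(self, event_type: str, handler: Handler) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    async def publish(self, event: SimpleEvent) -> None:
        # Only handlers registered for this event type are called.
        for handler in self._handlers.get(event.type, []):
            await handler(event)


class Collector:
    """Keeps the value of the most recent event it was given."""

    def __init__(self) -> None:
        self.latest: Optional[float] = None

    async def handle(self, event: SimpleEvent) -> None:
        self.latest = event.value


async def demo() -> Tuple[Optional[float], Optional[float]]:
    bus = SimpleBus()
    high, low = Collector(), Collector()
    bus.subscribe("high_event", high.handle)
    bus.subscribe("low_event", low.handle)
    await bus.publish(SimpleEvent("high_event", 0.93))
    await bus.publish(SimpleEvent("low_event", 0.07))
    await bus.publish(SimpleEvent("high_event", 0.85))
    return high.latest, low.latest


latest_high, latest_low = asyncio.run(demo())
print(latest_high, latest_low)
```

Each collector only ever sees the event type it subscribed to, and each keeps just the most recent value, which is the same behaviour CollectHigh and CollectLow implement with their handler methods.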

Putting it all together

Now we can create a Process from all these components. The outputs from CollectLow and CollectHigh are connected to separate FileWriter components so that we'll get a CSV file containing the latest high and low values at each step of the simulation.

Info

We need a few extra lines of code to create connectors for the event-based parts of the model. If you define your process in YAML this will be done automatically for you, but if you are defining the process in code then you will need to use the EventConnectorBuilder to do this.

components = [
    Random(name="random-generator"),
    FindHighLowValues(name="find-high-low", low_limit=0.2, high_limit=0.8),
    CollectHigh(name="collect-high"),
    CollectLow(name="collect-low"),
    FileWriter(name="save-high", path="high.csv", field_names=["value"]),
    FileWriter(name="save-low", path="low.csv", field_names=["value"]),
]
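def connect(source: str, target: str) -> AsyncioConnector:
    """Connect a component output (source) to a component input (target)."""
    return AsyncioConnector(spec=ConnectorSpec(source=source, target=target))

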
connectors = [  # (1)!
    connect("random-generator.value", "find-high-low.value"),
    connect("collect-high.value", "save-high.value"),
    connect("collect-low.value", "save-low.value"),
]
connector_builder = ConnectorBuilder(connector_cls=AsyncioConnector)  # (2)!
event_connector_builder = EventConnectorBuilder(connector_builder=connector_builder)
event_connectors = list(event_connector_builder.build(components).values())

process = LocalProcess(
    components=components,
    connectors=connectors + event_connectors,
)

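# Top-level `async with` works in a notebook or an asyncio REPL. In a script,
# place these lines inside an async function and run it with asyncio.run().
async with process:
    await process.run()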
  1. These connectors are for the normal, non-event-driven parts of the model and connect Component inputs and outputs.
  2. These lines will set up connectors for the events in the model.

Take a look at the high.csv and low.csv files: the first few rows will usually be empty, and then as soon as high or low values are identified they will start to appear in the CSVs. As usual, you can run this model from the CLI using plugboard process run model.yaml.
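
A quick calculation suggests how long that initial run of empty values might be. The sketch below assumes the inputs are independent and uniform on [0, 1), as random.random() provides, so each draw is a high value with probability 0.2 (and likewise for low). The helper names are hypothetical, and how draws line up with CSV rows depends on how Plugboard schedules event handling, so treat the figures as a rough guide.

```python
def prob_no_event(steps: int, p_event: float = 0.2) -> float:
    """Probability that none of the first `steps` draws triggers the event."""
    return (1.0 - p_event) ** steps


def expected_draws_until_first_event(p_event: float = 0.2) -> float:
    """Mean number of draws up to and including the first event (geometric)."""
    return 1.0 / p_event


for k in (1, 3, 5, 10):
    print(k, round(prob_no_event(k), 3))
print(expected_draws_until_first_event())
```

Under these assumptions the first high value takes five draws to arrive on average, so a handful of empty rows at the top of each file is to be expected.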