Event-driven models
So far everything we have built in Plugboard has been a discrete-time model. This means that the whole model advances step-wise: step gets called on each Component, calculating all of their outputs, before the simulation moves on to the next step.
In this tutorial we're going to introduce an event-driven model, where data can be passed around between components based on triggers that you can define. Event-based models can be useful in a variety of scenarios, for example when modelling parts moving around a production line, or to trigger expensive computation only when certain conditions are met in the model.
Event-based model
Here's the model that we're going to build. Given a stream of random numbers, we'll trigger HighEvent whenever the value is 0.8 or above and LowEvent whenever the value is 0.2 or below. This allows us to funnel data into different parts of the model: in this case we'll just save the latest high/low values to a file at each step. In the diagram the dotted lines represent the flow of event data: FindHighLowValues will publish events, while CollectHigh and CollectLow will subscribe to receive high and low events respectively.
flowchart LR
collect-high@{ shape: rounded, label: CollectHigh<br>**collect-high** } --> save-high@{ shape: rounded, label: FileWriter<br>**save-high** }
collect-low@{ shape: rounded, label: CollectLow<br>**collect-low** } --> save-low@{ shape: rounded, label: FileWriter<br>**save-low** }
random-generator@{ shape: rounded, label: Random<br>**random-generator** } --> find-high-low@{ shape: rounded, label: FindHighLowValues<br>**find-high-low** }
low_event@{ shape: hex, label: LowEvent } -.-> collect-low@{ shape: rounded, label: CollectLow<br>**collect-low** }
high_event@{ shape: hex, label: HighEvent } -.-> collect-high@{ shape: rounded, label: CollectHigh<br>**collect-high** }
find-high-low@{ shape: rounded, label: FindHighLowValues<br>**find-high-low** } -.-> high_event@{ shape: hex, label: HighEvent }
find-high-low@{ shape: rounded, label: FindHighLowValues<br>**find-high-low** } -.-> low_event@{ shape: hex, label: LowEvent }
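The publish/subscribe relationship shown by the dotted lines can be sketched in plain Python, independent of Plugboard. The TinyEventBus class below is a hypothetical stand-in written for this illustration; it is not how Plugboard routes events internally, and the input values are made up.

```python
from collections import defaultdict
from typing import Callable


class TinyEventBus:
    """Hypothetical, minimal event bus: maps an event name to its subscribers."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Callable[[float], None]]] = defaultdict(list)

    def subscribe(self, event_type: str, handler: Callable[[float], None]) -> None:
        # Register a callable to receive every event published under this name.
        self._handlers[event_type].append(handler)

    def publish(self, event_type: str, value: float) -> None:
        # Deliver the value to each subscriber of this event name.
        for handler in self._handlers[event_type]:
            handler(value)


bus = TinyEventBus()
highs: list[float] = []
lows: list[float] = []
bus.subscribe("high_event", highs.append)  # plays the role of CollectHigh
bus.subscribe("low_event", lows.append)    # plays the role of CollectLow

# Plays the role of FindHighLowValues: publish only when a threshold is hit.
for value in [0.5, 0.93, 0.1, 0.85, 0.4]:
    if value >= 0.8:
        bus.publish("high_event", value)
    if value <= 0.2:
        bus.publish("low_event", value)

print(highs)  # [0.93, 0.85]
print(lows)   # [0.1]
```

The publisher never refers to its subscribers directly, which is the property that lets you add or remove event consumers without touching the component that raises the events.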
Defining events
First we need to define the events that are going to be used in the model. Each event needs a name (in this case "high_event" and "low_event") and an associated data type. Use a Pydantic model to define the format of this data field.
class ExtremeValue(BaseModel):
    """Data payload for high and low value events."""

    value: float
    extreme_type: _t.Literal["high", "low"]


class HighEvent(Event):
    """High value event type."""

    type: _t.ClassVar[str] = "high_event"
    data: ExtremeValue


class LowEvent(Event):
    """Low value event type."""

    type: _t.ClassVar[str] = "low_event"
    data: ExtremeValue
Building components to create and consume events
So far all of our process models have run step-by-step until completion. When a model contains event-driven components, we need a way to tell them to stop at the end of the simulation, otherwise they will stay running and listening for events forever.
In this example, our Random component will drive the process by generating random input values. When it has completed iters iterations, we call self.io.close() to stop the model, causing the other components in the model to shut down.
class Random(Component):
    """Generates random numbers."""

    io = IOController(outputs=["value"])

    def __init__(self, iters: int = 50, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
        super().__init__(**kwargs)
        self.max_iters = iters
        self.completed_iters = 0

    async def step(self) -> None:
        self.completed_iters += 1
        self.value = random.random()
        if self.completed_iters >= self.max_iters:
            await self.io.close()
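To see why an explicit close signal matters, here is a plain-asyncio sketch that does not use Plugboard at all. The _CLOSED sentinel, the function names and the timeout are assumptions made for this illustration; Plugboard's own shutdown mechanism behind self.io.close() is not shown here.

```python
import asyncio

_CLOSED = object()  # hypothetical sentinel meaning "no more data is coming"


async def producer(queue: asyncio.Queue, iters: int, send_close: bool) -> None:
    """Emit a fixed number of items, then optionally signal completion."""
    for i in range(iters):
        await queue.put(i)
    if send_close:
        await queue.put(_CLOSED)


async def listener(queue: asyncio.Queue, received: list) -> None:
    """Consume items until the close signal arrives."""
    while True:
        item = await queue.get()
        if item is _CLOSED:
            break
        received.append(item)


async def run_demo(iters: int = 5, send_close: bool = True) -> list:
    queue: asyncio.Queue = asyncio.Queue()
    received: list = []
    # The timeout guards against a listener that never stops.
    await asyncio.wait_for(
        asyncio.gather(producer(queue, iters, send_close), listener(queue, received)),
        timeout=0.5,
    )
    return received


print(asyncio.run(run_demo()))  # [0, 1, 2, 3, 4]
```

Calling run_demo(send_close=False) raises a timeout error instead of returning, because the listener keeps waiting for items that never arrive. That is the situation the close call in Random.step is there to prevent.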
Next, we will define FindHighLowValues to identify high and low values in the stream of random numbers and publish HighEvent and LowEvent respectively.
class FindHighLowValues(Component):
    """Publishes an event on high or low values."""

    io = IOController(inputs=["value"], output_events=[LowEvent, HighEvent])  # (1)!

    def __init__(
        self,
        low_limit: float = 0.2,
        high_limit: float = 0.8,
        **kwargs: _t.Unpack[ComponentArgsDict],
    ) -> None:
        super().__init__(**kwargs)
        self.low_limit = low_limit
        self.high_limit = high_limit

    async def step(self) -> None:
        if self.value >= self.high_limit:
            self.io.queue_event(  # (2)!
                HighEvent(
                    source=self.name, data=ExtremeValue(value=self.value, extreme_type="high")
                )
            )
        if self.value <= self.low_limit:
            self.io.queue_event(
                LowEvent(source=self.name, data=ExtremeValue(value=self.value, extreme_type="low"))
            )
1. See how we use the IOController to declare that this Component will publish events.
2. Use self.io.queue_event to send an event from a Component. Here we are sending the HighEvent or LowEvent depending on the input value.
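The two threshold checks in step are easy to exercise in isolation. The helper below is a hypothetical extraction of that logic into a pure function (events_for is not part of Plugboard); it returns event names rather than queuing real events, so you can check the boundary behaviour without running a process.

```python
def events_for(value: float, low_limit: float = 0.2, high_limit: float = 0.8) -> list[str]:
    """Return the names of the events the two checks would queue for a value."""
    events = []
    if value >= high_limit:  # inclusive: a value equal to the limit counts as high
        events.append("high_event")
    if value <= low_limit:  # inclusive: a value equal to the limit counts as low
        events.append("low_event")
    return events


print(events_for(0.5))  # []
print(events_for(0.8))  # ['high_event']
print(events_for(0.2))  # ['low_event']
# The checks are independent, so overlapping limits trigger both events.
print(events_for(0.5, low_limit=0.6, high_limit=0.4))  # ['high_event', 'low_event']
```

Because the component uses two separate if statements rather than if/elif, a configuration where low_limit is greater than or equal to high_limit can publish both events for a single value. If that is not what you want, validating the limits in __init__ would be one way to guard against it.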
Finally, we need components to subscribe to these events and process them. Use the Event.handler decorator to identify the method on each Component that will do this processing.
class CollectHigh(Component):
    """Collects values from high events."""

    io = IOController(input_events=[HighEvent], outputs=["value"])  # (1)!

    def __init__(self, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
        super().__init__(**kwargs)
        self.latest_event: _t.Optional[ExtremeValue] = None

    async def step(self) -> None:
        self.value = self.latest_event.value if self.latest_event else None

    @HighEvent.handler  # (2)!
    async def handle_event(self, event: HighEvent) -> None:
        self.latest_event = event.data


class CollectLow(Component):
    """Collects values from low events."""

    io = IOController(input_events=[LowEvent], outputs=["value"])

    def __init__(self, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
        super().__init__(**kwargs)
        self.latest_event: _t.Optional[ExtremeValue] = None

    async def step(self) -> None:
        self.value = self.latest_event.value if self.latest_event else None

    @LowEvent.handler  # (3)!
    async def handle_event(self, event: LowEvent) -> None:
        self.latest_event = event.data
1. Specify the events that this Component will subscribe to.
2. Use this decorator to indicate that we handle HighEvent here...
3. ...and we handle LowEvent here.
Note
In a real model you could define whatever logic you need inside your event handler, e.g. create a file, publish another event, etc. Here we just store the event on an attribute so that its value can be output via the step() method.
Putting it all together
Now we can create a Process from all these components. The outputs from CollectLow and CollectHigh are connected to separate FileWriter components so that we'll get a CSV file containing the latest high and low values at each step of the simulation.
Info
We need a few extra lines of code to create connectors for the event-based parts of the model. If you define your process in YAML this will be done automatically for you, but if you are defining the process in code then you will need to use the EventConnectorBuilder to do this.
components = [
    Random(name="random-generator"),
    FindHighLowValues(name="find-high-low", low_limit=0.2, high_limit=0.8),
    CollectHigh(name="collect-high"),
    CollectLow(name="collect-low"),
    FileWriter(name="save-high", path="high.csv", field_names=["value"]),
    FileWriter(name="save-low", path="low.csv", field_names=["value"]),
]
connect = lambda in_, out_: AsyncioConnector(spec=ConnectorSpec(source=in_, target=out_))
connectors = [  # (1)!
    connect("random-generator.value", "find-high-low.value"),
    connect("collect-high.value", "save-high.value"),
    connect("collect-low.value", "save-low.value"),
]
connector_builder = ConnectorBuilder(connector_cls=AsyncioConnector)  # (2)!
event_connector_builder = EventConnectorBuilder(connector_builder=connector_builder)
event_connectors = list(event_connector_builder.build(components).values())

process = LocalProcess(
    components=components,
    connectors=connectors + event_connectors,
)

async with process:
    await process.run()
1. These connectors are for the normal, non-event-driven parts of the model and connect Component inputs and outputs.
2. These lines will set up connectors for the events in the model.
Take a look at the high.csv and low.csv files: the first few rows will usually be empty, and then as soon as high or low values are identified they will start to appear in the CSVs. As usual, you can run this model from the CLI using plugboard process run model.yaml.
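If you want a feel for why the first rows are empty before running the model, the "latest value so far" behaviour can be approximated in plain Python. This is a simplified, synchronous sketch: latest_high_column and its seed parameter are hypothetical, and in the real model events are delivered asynchronously, so the row-by-row alignment in high.csv may not match this exactly.

```python
import random
from typing import Optional


def latest_high_column(
    iters: int = 50, high_limit: float = 0.8, seed: int = 0
) -> list[Optional[float]]:
    """Approximate the high-value column: the latest high seen so far, else None."""
    rng = random.Random(seed)  # seeded so that repeated runs give the same rows
    latest: Optional[float] = None
    rows: list[Optional[float]] = []
    for _ in range(iters):
        value = rng.random()
        if value >= high_limit:
            latest = value  # mirrors the handler storing the newest event
        rows.append(latest)  # mirrors step() emitting the stored value
    return rows


rows = latest_high_column()
# Index of the first row that holds a value, or len(rows) if there is none.
leading_empty = next((i for i, v in enumerate(rows) if v is not None), len(rows))
print(f"{leading_empty} empty rows before the first high value")
```

Once the first high value has been seen, every later row holds a value, because the collector keeps repeating its most recent event until a newer one replaces it.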