Event-driven models
So far everything we have built in Plugboard has been a discrete-time model. This means that the whole model advances step-wise, i.e. step gets called on each Component, calculating all of their outputs before advancing the simulation on.
In this tutorial we're going to introduce an event-driven model, where data can be passed around between components based on triggers that you can define. Event-based models can be useful in a variety of scenarios, for example when modelling parts moving around a production line, or to trigger expensive computation only when certain conditions are met in the model.
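To make the contrast concrete, here is a minimal plain-Python sketch. It does not use Plugboard, and the function names are hypothetical. The first function does its work on every step, while the second only runs its callback when a trigger condition fires.

```python
from typing import Callable


def run_discrete(values: list[float], work: Callable[[float], None]) -> int:
    """Discrete-time style: `work` is called on every step."""
    calls = 0
    for value in values:
        work(value)
        calls += 1
    return calls


def run_event_driven(
    values: list[float],
    trigger: Callable[[float], bool],
    work: Callable[[float], None],
) -> int:
    """Event-driven style: `work` is only called when `trigger` fires."""
    calls = 0
    for value in values:
        if trigger(value):
            work(value)
            calls += 1
    return calls


values = [0.10, 0.50, 0.95, 0.30, 0.85]
seen: list[float] = []

# Every value is processed in the discrete-time loop.
discrete_calls = run_discrete(values, lambda v: None)

# Only values at or above 0.8 reach `seen.append` in the event-driven loop.
event_calls = run_event_driven(values, lambda v: v >= 0.8, seen.append)
```

If `work` is expensive, the event-driven version pays that cost only for the values that matter.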
Event-based model
Here's the model that we're going to build. Given a stream of random numbers, we'll trigger HighEvent whenever the value is at or above 0.8 and LowEvent whenever the value is at or below 0.2. This allows us to funnel data into different parts of the model: in this case we'll just save the latest high/low values to a file at each step. In the diagram the dotted lines represent the flow of event data: FindHighLowValues will publish events, while CollectHigh and CollectLow will subscribe to receive high and low events respectively.
flowchart LR
collect-high@{ shape: rounded, label: CollectHigh<br>**collect-high** } --> save-high@{ shape: rounded, label: FileWriter<br>**save-high** }
collect-low@{ shape: rounded, label: CollectLow<br>**collect-low** } --> save-low@{ shape: rounded, label: FileWriter<br>**save-low** }
random-generator@{ shape: rounded, label: Random<br>**random-generator** } --> find-high-low@{ shape: rounded, label: FindHighLowValues<br>**find-high-low** }
low_event@{ shape: hex, label: LowEvent } -.-> collect-low@{ shape: rounded, label: CollectLow<br>**collect-low** }
high_event@{ shape: hex, label: HighEvent } -.-> collect-high@{ shape: rounded, label: CollectHigh<br>**collect-high** }
find-high-low@{ shape: rounded, label: FindHighLowValues<br>**find-high-low** } -.-> high_event@{ shape: hex, label: HighEvent }
find-high-low@{ shape: rounded, label: FindHighLowValues<br>**find-high-low** } -.-> low_event@{ shape: hex, label: LowEvent }
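The trigger rule itself can be sketched as a small pure function, independent of Plugboard. The name `classify` is hypothetical. The comparisons are inclusive, matching the FindHighLowValues component defined later in this tutorial.

```python
from typing import Literal, Optional


def classify(
    value: float, low_limit: float = 0.2, high_limit: float = 0.8
) -> Optional[Literal["high", "low"]]:
    """Return which event (if any) a value would trigger."""
    if value >= high_limit:
        return "high"
    if value <= low_limit:
        return "low"
    return None  # values strictly between the limits trigger nothing


stream = [0.05, 0.40, 0.91, 0.20, 0.79]
labels = [classify(v) for v in stream]
```

With the default limits, most values fall between the thresholds and trigger nothing, so the event-driven parts of the model only receive a fraction of the stream.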
Defining events
First we need to define the events that are going to get used in the model. Each event needs a name, in this case "high_event" and "low_event", and a data type associated with it. Use a Pydantic model to define the format of this data field.
import typing as _t

from pydantic import BaseModel

from plugboard.events import Event


class ExtremeValue(BaseModel):
    """Data payload shared by the high and low value events."""
    value: float
    extreme_type: _t.Literal["high", "low"]


class HighEvent(Event):
    """High value event type."""
    type: _t.ClassVar[str] = "high_event"
    data: ExtremeValue


class LowEvent(Event):
    """Low value event type."""
    type: _t.ClassVar[str] = "low_event"
    data: ExtremeValue
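To see why each event class carries a name, consider this standard-library sketch of name-based routing. It is an illustration only, not how Plugboard implements events. All of the `Sketch*` classes and the `routes` table are hypothetical.

```python
from dataclasses import dataclass
from typing import Callable, ClassVar


@dataclass
class SketchEvent:
    """Hypothetical stand-in for an event: a class-level name plus a payload."""

    type: ClassVar[str] = "sketch_event"
    value: float


@dataclass
class SketchHigh(SketchEvent):
    type: ClassVar[str] = "high_event"


@dataclass
class SketchLow(SketchEvent):
    type: ClassVar[str] = "low_event"


received: dict[str, list[float]] = {"high_event": [], "low_event": []}

# Map each event name to the callable that should receive it.
routes: dict[str, Callable[[SketchEvent], None]] = {
    SketchHigh.type: lambda e: received["high_event"].append(e.value),
    SketchLow.type: lambda e: received["low_event"].append(e.value),
}

# The name stored on the class decides where each event instance is delivered.
for event in [SketchHigh(value=0.93), SketchLow(value=0.07), SketchHigh(value=0.81)]:
    routes[event.type](event)
```

Because the name lives on the class rather than on each instance, every event of a given class is routed the same way.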
Building components to create and consume events
So far all of our process models have run step-by-step until completion. When a model contains event-driven components, we need a way to tell them to stop at the end of the simulation, otherwise they will stay running and listening for events forever.
In this example, our Random component will drive the process by generating input random values. When it has completed iters iterations, we call self.io.close() to stop the model, causing other components in the model to shut down.
import random

from plugboard.component import Component, IOController
from plugboard.schemas import ComponentArgsDict


class Random(Component):
    """Generates random numbers."""

    io = IOController(outputs=["value"])

    def __init__(self, iters: int = 50, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
        super().__init__(**kwargs)
        self.max_iters = iters
        self.completed_iters = 0

    async def step(self) -> None:
        self.completed_iters += 1
        self.value = random.random()
        # Close the IO once all iterations are done so the rest of the model can stop.
        if self.completed_iters >= self.max_iters:
            await self.io.close()
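The need for an explicit stop signal can be illustrated with plain asyncio. This is a sketch of the general pattern under simplified assumptions, not Plugboard's internal mechanism. The `_CLOSED` sentinel and the function names are hypothetical.

```python
import asyncio

_CLOSED = object()  # sentinel meaning "no more data will arrive"


async def producer(queue: asyncio.Queue, iters: int) -> None:
    """Emit `iters` values, then signal that the stream is closed."""
    for i in range(iters):
        await queue.put(i)
    await queue.put(_CLOSED)


async def listener(queue: asyncio.Queue) -> list[int]:
    """Consume values until the close signal arrives."""
    received = []
    while True:
        item = await queue.get()
        if item is _CLOSED:
            break  # without this, the loop would wait on `get()` forever
        received.append(item)
    return received


async def main() -> list[int]:
    queue: asyncio.Queue = asyncio.Queue()
    _, received = await asyncio.gather(producer(queue, 3), listener(queue))
    return received


result = asyncio.run(main())
```

In the tutorial model, the call to self.io.close() in Random plays the role of the sentinel: it tells the listening components that no more data is coming.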
Next, we will define FindHighLowValues to identify high and low values in the stream of random numbers and publish HighEvent and LowEvent respectively.
class FindHighLowValues(Component):
    """Publishes an event on high or low values."""

    io = IOController(inputs=["value"], output_events=[LowEvent, HighEvent])  # (1)!

    def __init__(
        self,
        low_limit: float = 0.2,
        high_limit: float = 0.8,
        **kwargs: _t.Unpack[ComponentArgsDict],
    ) -> None:
        super().__init__(**kwargs)
        self.low_limit = low_limit
        self.high_limit = high_limit

    async def step(self) -> None:
        if self.value >= self.high_limit:
            self.io.queue_event(  # (2)!
                HighEvent(
                    source=self.name, data=ExtremeValue(value=self.value, extreme_type="high")
                )
            )
        if self.value <= self.low_limit:
            self.io.queue_event(
                LowEvent(source=self.name, data=ExtremeValue(value=self.value, extreme_type="low"))
            )
- (1) See how we use the IOController to declare that this Component will publish events.
- (2) Use self.io.queue_event to send an event from a Component. Here we are sending the HighEvent or LowEvent depending on the input value.
Finally, we need components to subscribe to these events and process them. Use the Event.handler decorator to identify the method on each Component that will do this processing.
class CollectHigh(Component):
    """Collects values from high events."""

    io = IOController(input_events=[HighEvent], outputs=["value"])  # (1)!

    def __init__(self, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
        super().__init__(**kwargs)
        self.latest_event: _t.Optional[ExtremeValue] = None

    async def step(self) -> None:
        self.value = self.latest_event.value if self.latest_event else None

    @HighEvent.handler  # (2)!
    async def handle_event(self, event: HighEvent) -> None:
        self.latest_event = event.data


class CollectLow(Component):
    """Collects values from low events."""

    io = IOController(input_events=[LowEvent], outputs=["value"])

    def __init__(self, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
        super().__init__(**kwargs)
        self.latest_event: _t.Optional[ExtremeValue] = None

    async def step(self) -> None:
        self.value = self.latest_event.value if self.latest_event else None

    @LowEvent.handler  # (3)!
    async def handle_event(self, event: LowEvent) -> None:
        self.latest_event = event.data
- (1) Specify the events that this Component will subscribe to.
- (2) Use this decorator to indicate that we handle HighEvent here...
- (3) ...and we handle LowEvent here.
Note
In a real model you could define whatever logic you need inside your event handler, e.g. create a file, publish another event, etc. Here we just store the event on an attribute so that its value can be output via the step() method.
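As a rough mental model of what a handler decorator does, the sketch below tags a method with the event class it handles and then looks that tag up at dispatch time. This is an illustration under simplified assumptions and is not Plugboard's actual implementation. `SketchEvent`, `Collector` and `dispatch` are hypothetical names.

```python
import asyncio
from typing import Any, Callable, Optional


class SketchEvent:
    """Hypothetical event base class offering a `handler` decorator."""

    def __init__(self, value: float) -> None:
        self.value = value

    @classmethod
    def handler(cls, func: Callable) -> Callable:
        func._handles = cls  # remember which event class this method handles
        return func


class SketchHighEvent(SketchEvent):
    pass


class SketchLowEvent(SketchEvent):
    pass


class Collector:
    """Stores the value of the latest high event it has handled."""

    def __init__(self) -> None:
        self.latest: Optional[float] = None

    @SketchHighEvent.handler
    async def handle_event(self, event: SketchHighEvent) -> None:
        self.latest = event.value


async def dispatch(component: Any, event: SketchEvent) -> bool:
    """Call the method on `component` registered for this event's class, if any."""
    for name in dir(component):
        method = getattr(component, name)
        if getattr(method, "_handles", None) is type(event):
            await method(event)
            return True
    return False  # no handler registered for this event type


collector = Collector()
handled_high = asyncio.run(dispatch(collector, SketchHighEvent(0.9)))
handled_low = asyncio.run(dispatch(collector, SketchLowEvent(0.1)))
```

The collector reacts to the high event and ignores the low one, which mirrors how CollectHigh and CollectLow each receive only the event type they subscribe to.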
Putting it all together
Now we can create a Process from all these components. The outputs from CollectLow and CollectHigh are connected to separate FileWriter components so that we'll get a CSV file containing the latest high and low values at each step of the simulation.
Info
We need a few extra lines of code to create connectors for the event-based parts of the model. If you define your process in YAML this will be done automatically for you, but if you are defining the process in code then you will need to use the EventConnectorBuilder to do this.
components = [
    Random(name="random-generator"),
    FindHighLowValues(name="find-high-low", low_limit=0.2, high_limit=0.8),
    CollectHigh(name="collect-high"),
    CollectLow(name="collect-low"),
    FileWriter(name="save-high", path="high.csv", field_names=["value"]),
    FileWriter(name="save-low", path="low.csv", field_names=["value"]),
]
connect = lambda in_, out_: AsyncioConnector(spec=ConnectorSpec(source=in_, target=out_))
connectors = [  # (1)!
    connect("random-generator.value", "find-high-low.value"),
    connect("collect-high.value", "save-high.value"),
    connect("collect-low.value", "save-low.value"),
]
connector_builder = ConnectorBuilder(connector_cls=AsyncioConnector)  # (2)!
event_connector_builder = EventConnectorBuilder(connector_builder=connector_builder)
event_connectors = list(event_connector_builder.build(components).values())

process = LocalProcess(
    components=components,
    connectors=connectors + event_connectors,
)

# `async with` must run inside an async function (or a notebook with top-level await).
async with process:
    await process.run()
- (1) These connectors are for the normal, non-event-driven parts of the model and connect Component inputs and outputs.
- (2) These lines will set up connectors for the events in the model.
Take a look at the high.csv and low.csv files: the first few rows will usually be empty, and then as soon as high or low values are identified they will start to appear in the CSVs. As usual, you can run this model from the CLI using plugboard process run model.yaml.
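One quick way to check the output is to count how many rows in each file actually contain a value. The helper below is a hypothetical sketch using only the standard library. It assumes each file has a single header row followed by one value per line, with empty values written as blank or empty fields; adjust it if your files look different.

```python
import csv


def count_filled_rows(path: str) -> tuple[int, int]:
    """Return (filled, total) data-row counts for the first column of a CSV file."""
    with open(path, newline="") as f:
        reader = csv.reader(f)
        next(reader, None)  # skip the header row (assumed to be present)
        rows = list(reader)
    # A blank line is read as an empty list, so check the row before indexing it.
    filled = sum(1 for row in rows if row and row[0].strip() != "")
    return filled, len(rows)


# Example usage after running the model:
#   filled, total = count_filled_rows("high.csv")
#   print(f"{filled} of {total} rows contain a high value")
```

With the default thresholds you would expect the first filled row to appear within the first handful of steps, since each step has roughly a 20% chance of producing a high value and the same chance of producing a low one.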