Simple 3-node model
This model demonstrates how to create different types of component and link them together. We use a built-in Plugboard component to load timeseries data from a CSV file. A second node computes a running total of these values. Finally, another built-in component saves the output to a different CSV file.
import typing as _t
from plugboard.connector import AsyncioConnector
from plugboard.component import Component
from plugboard.component import IOController as IO
from plugboard.schemas import ComponentArgsDict, ConnectorSpec
from plugboard.process import LocalProcess
from plugboard.library import FileReader, FileWriter
The FileReader component is provided by Plugboard. It takes the content of a CSV and emits the values row-by-row. Our CSV contains a single column named value, so we configure the field_names argument to expect that.
input_data = FileReader(name="input_data", path="input.csv", field_names=["value"])
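If you do not already have an input file, a small one can be generated with the standard library. This is a minimal sketch: the sample values are hypothetical, and it assumes the reader expects a header row naming the single value column.

```python
import csv
from pathlib import Path


def write_sample_csv(path: str, values: list[float]) -> None:
    """Write a one-column CSV with a `value` header, one number per row."""
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["value"])  # header chosen to match field_names=["value"]
        writer.writerows([v] for v in values)


# Hypothetical sample data; replace with your own series.
write_sample_csv("input.csv", [1, 2, 3, 4])
print(Path("input.csv").read_text())
```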
Next, we implement a component to compute a running total using its internal state.
class RunningTotal(Component):
    # Define the inputs and outputs of the component
    io = IO(inputs=["value"], outputs=["total_value"])

    def __init__(self, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
        super().__init__(**kwargs)
        # Set the initial running total to 0
        self._running_total = 0

    async def step(self) -> None:
        # The main logic of the component
        self._running_total += self.value
        self.total_value = self._running_total
        await super().step()

total = RunningTotal(name="total")
At this point, it is possible to test the component in the notebook by initialising it, setting its input value, then calling step().
await total.init()
total.value = 10 # Set the value input to 10
await total.step() # Run the component
print(total.total_value) # Print the total value output
total.value = 20 # Set the value input to 20
await total.step() # Run the component
print(total.total_value) # Print the total value output
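Given the step logic above, the two prints should show 10 and then 30, because the total is kept between calls. The plain-Python stand-in below mirrors only that state handling so the arithmetic can be checked without Plugboard. RunningTotalSketch is a hypothetical name, not a Plugboard class.

```python
class RunningTotalSketch:
    """Plain-Python stand-in for the component's state handling."""

    def __init__(self) -> None:
        self._running_total = 0  # internal state, starts at zero
        self.value = 0           # plays the role of the input field
        self.total_value = 0     # plays the role of the output field

    def step(self) -> None:
        # Same arithmetic as RunningTotal.step, without any I/O handling
        self._running_total += self.value
        self.total_value = self._running_total


sketch = RunningTotalSketch()
outputs = []
for v in (10, 20):
    sketch.value = v
    sketch.step()
    outputs.append(sketch.total_value)
print(outputs)  # [10, 30]

# A fresh instance starts again from zero; the old one keeps its total.
fresh = RunningTotalSketch()
fresh.value = 5
fresh.step()
print(fresh.total_value)  # 5
```

Because the total lives on the instance, any values fed in during testing would carry over into a later run unless a new instance is created.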
Now re-instantiate total to reset its state.
total = RunningTotal(name="total")
For the output we can use the built-in FileWriter component, configured to expect an input called value_to_save.
output_data = FileWriter(name="output_data", path="output.csv", field_names=["value_to_save"])
Now connect the components together in a LocalProcess.
process = LocalProcess(
    components=[input_data, total, output_data],
    connectors=[
        # Connect input_data to total
        AsyncioConnector(
            spec=ConnectorSpec(source="input_data.value", target="total.value"),
        ),
        # Connect total to output_data
        AsyncioConnector(
            spec=ConnectorSpec(source="total.total_value", target="output_data.value_to_save"),
        ),
    ],
)
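To picture the data flow this wiring describes, the sketch below passes values through three coroutines joined by asyncio queues: a source, a running-total stage, and a sink. It is a conceptual illustration only and makes no claim about how Plugboard implements connectors. All function names and the sample values are hypothetical.

```python
import asyncio

_DONE = object()  # sentinel marking the end of the stream


async def reader(values, out_q):
    # Emit each value in order, then signal completion
    for v in values:
        await out_q.put(v)
    await out_q.put(_DONE)


async def running_total(in_q, out_q):
    # Accumulate incoming values and forward the total after each one
    total = 0
    while (v := await in_q.get()) is not _DONE:
        total += v
        await out_q.put(total)
    await out_q.put(_DONE)


async def writer(in_q, sink):
    # Collect results; a real sink would write rows to a file
    while (v := await in_q.get()) is not _DONE:
        sink.append(v)


async def run_pipeline(values):
    q1, q2 = asyncio.Queue(), asyncio.Queue()
    sink = []
    await asyncio.gather(reader(values, q1), running_total(q1, q2), writer(q2, sink))
    return sink


result = asyncio.run(run_pipeline([1, 2, 3, 4]))
print(result)  # [1, 3, 6, 10]
```

In a notebook, where an event loop is already running, use `await run_pipeline([1, 2, 3, 4])` in place of `asyncio.run(...)`.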
Now we can initialise and run the simulation.
async with process:
    await process.run()
Finally, check that the output data has been saved in output.csv.
!cat output.csv
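Beyond inspecting the file by eye, the saved totals can be compared with a cumulative sum of the inputs. The sketch below does this on in-memory CSV text. It assumes both files carry a header row (value for the input, value_to_save for the output), which may not match the exact format FileWriter produces. The sample strings are hypothetical.

```python
import csv
import io
import math
from itertools import accumulate


def read_column(text: str, column: str) -> list[float]:
    """Parse CSV text and return one named column as floats."""
    reader = csv.DictReader(io.StringIO(text))
    return [float(row[column]) for row in reader]


def output_matches(input_text: str, output_text: str) -> bool:
    """Check that the saved column equals the running total of the input column."""
    expected = list(accumulate(read_column(input_text, "value")))
    saved = read_column(output_text, "value_to_save")
    return len(saved) == len(expected) and all(
        math.isclose(s, e) for s, e in zip(saved, expected)
    )


# Hypothetical file contents standing in for input.csv and output.csv
sample_input = "value\n1\n2\n3\n4\n"
sample_output = "value_to_save\n1\n3\n6\n10\n"
print(output_matches(sample_input, sample_output))  # True
```

To check the real files, pass `open("input.csv").read()` and `open("output.csv").read()` instead of the sample strings.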