A more complex process
In the last example our Process consisted of just two components. Real models usually use many more, which lets you break your model down into separate parts that you can build and test individually. Plugboard supports branching and looping connections between your components.
In this tutorial we'll also demonstrate how to make components reusable between different processes.
Define some components
First of all, let's define five different components:
- Random will generate random numbers to feed into our model;
- Offset adds a fixed offset to its input;
- Scale will multiply its input by a fixed scale factor;
- Sum will take in two inputs and add them together;
- Save will write its input to a text file.
We can put the code for each of these in components.py.
import random
import typing as _t

from plugboard.component import Component, IOController as IO
from plugboard.schemas import ComponentArgsDict


class Random(Component):
    io = IO(outputs=["x"])

    def __init__(
        self,
        iters: int,  # (1)!
        low: float = 0,
        high: float = 10,
        **kwargs: _t.Unpack[ComponentArgsDict]
    ) -> None:
        super().__init__(**kwargs)
        self._iters = 0
        self._low = low
        self._high = high
        self._max_iters = iters

    async def step(self) -> None:
        if self._iters < self._max_iters:
            self.x = random.uniform(self._low, self._high)
            self._iters += 1
            return
        await self.io.close()


class Offset(Component):
    """Implements `x = a + offset`."""

    io = IO(inputs=["a"], outputs=["x"])  # (2)!

    def __init__(self, offset: float = 0, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
        super().__init__(**kwargs)
        self._offset = offset

    async def step(self) -> None:
        self.x = self.a + self._offset


class Scale(Component):
    """Implements `x = a * scale`."""

    io = IO(inputs=["a"], outputs=["x"])

    def __init__(self, scale: float = 1, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
        super().__init__(**kwargs)
        self._scale = scale

    async def step(self) -> None:
        self.x = self.a * self._scale


class Sum(Component):
    """Implements `x = a + b`."""

    io = IO(inputs=["a", "b"], outputs=["x"])  # (3)!

    async def step(self) -> None:
        self.x = self.a + self.b


class Save(Component):
    io = IO(inputs=["value_to_save"])

    def __init__(self, path: str, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
        super().__init__(**kwargs)
        self._path = path

    async def init(self) -> None:
        self._f = open(self._path, "w")

    async def step(self) -> None:
        self._f.write(f"{self.value_to_save}\n")

    async def destroy(self) -> None:
        self._f.close()
1. The Component needs three different parameters: iters to control how many iterations the model runs for, and low and high to control the range of the random number generator.
2. A Component with both inputs and outputs.
3. Here we have multiple inputs.
Create branching connections in a Process
Next, we'll connect the components together to form this model:
graph LR;
Random(random)---->SaveIn(save-input);
Random(random)-->Offset(offset);
Random(random)-->Scale(scale);
Offset(offset)-->Sum(sum);
Scale(scale)-->Sum(sum);
Sum(sum)-->SaveOut(save-output);
Note
See how we branch the output of the Random component into three different places: a Save component (so that we can record our model input), Offset and Scale.
Now in branching.py we import these components, connect them together and run the model.
import asyncio

from plugboard.connector import AsyncioConnector
from plugboard.process import LocalProcess
from plugboard.schemas import ConnectorSpec

from components import Offset, Random, Save, Scale, Sum


async def main() -> None:
    connect = lambda in_, out_: AsyncioConnector(  # (1)!
        spec=ConnectorSpec(source=in_, target=out_)
    )
    process = LocalProcess(
        components=[  # (2)!
            Random(name="random", iters=5, low=0, high=10),
            Offset(name="offset", offset=10),
            Scale(name="scale", scale=2),
            Sum(name="sum"),
            Save(name="save-input", path="input.txt"),
            Save(name="save-output", path="output.txt"),
        ],
        connectors=[  # (3)!
            connect("random.x", "save-input.value_to_save"),
            connect("random.x", "offset.a"),
            connect("random.x", "scale.a"),
            connect("offset.x", "sum.a"),
            connect("scale.x", "sum.b"),
            connect("sum.x", "save-output.value_to_save"),
        ],
    )
    async with process:  # (4)!
        await process.run()


if __name__ == "__main__":
    asyncio.run(main())
1. We'll use this lambda to abbreviate the connectors below.
2. Instantiate each of the components here with any necessary parameters.
3. Here is where we define all of the connections in our model: source and target are of the form component_name.io_name. So the first item connects the x output on the random component to the value_to_save input on save-input.
4. As in the previous tutorial, this is equivalent to calling await process.init(), followed by await process.run() and then await process.destroy().
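To make the branching concrete, the sketch below groups the six connections from branching.py by their source, using only plain Python. The helpers split_endpoint and fan_out are hypothetical names for illustration and are not part of Plugboard; the sketch assumes only the component_name.io_name format described above.

```python
from collections import defaultdict

# The (source, target) pairs used in branching.py.
CONNECTIONS = [
    ("random.x", "save-input.value_to_save"),
    ("random.x", "offset.a"),
    ("random.x", "scale.a"),
    ("offset.x", "sum.a"),
    ("scale.x", "sum.b"),
    ("sum.x", "save-output.value_to_save"),
]


def split_endpoint(endpoint: str) -> tuple[str, str]:
    """Split "component_name.io_name" at the last dot (hypothetical helper)."""
    component, _, field = endpoint.rpartition(".")
    return component, field


def fan_out(connections: list[tuple[str, str]]) -> dict[str, list[str]]:
    """Map each source endpoint to the list of targets it feeds (hypothetical helper)."""
    targets: dict[str, list[str]] = defaultdict(list)
    for source, target in connections:
        targets[source].append(target)
    return dict(targets)


branches = fan_out(CONNECTIONS)
print(branches["random.x"])  # the three places that random.x branches to
print(split_endpoint("save-input.value_to_save"))
```

Only random.x has more than one target here, which is the branch shown in the diagram.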
Now run python branching.py and you will see the input.txt and output.txt files generated, showing that the model has run.
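As a sanity check, each value in output.txt should equal (x + 10) + (x * 2) for the corresponding x in input.txt, given the offset and scale used above. The sketch below is plain Python with hypothetical helper names (expected_output, check_files); it assumes both files hold one number per line in matching order, and it compares with a tolerance because the values pass through text.

```python
import math
from pathlib import Path


def expected_output(x: float, offset: float = 10.0, scale: float = 2.0) -> float:
    """Value the Sum component should emit for a single random input x."""
    return (x + offset) + (x * scale)


def check_files(input_path: str = "input.txt", output_path: str = "output.txt") -> bool:
    """Compare the two files line by line (assumes one number per line, same order)."""
    inputs = [float(v) for v in Path(input_path).read_text().split()]
    outputs = [float(v) for v in Path(output_path).read_text().split()]
    return len(inputs) == len(outputs) and all(
        math.isclose(expected_output(x), y) for x, y in zip(inputs, outputs)
    )


print(expected_output(1.0))  # (1 + 10) + (1 * 2) = 13.0
```

After running branching.py, calling check_files() from the same directory should return True if the assumptions above hold.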
Create a loop connection
In some cases you might want to create a loop in your model. This is commonly the case where you need to include feedback: for example you might have a component modelling the temperature of a heated hot-water tank and another one representing an automatic controller that turns the heating element on and off.
To make models like this work in Plugboard you will need to specify initial_values somewhere in the loop: this ensures that each of the Component objects can get all their inputs at the first call to process.step(), allowing the model to start running.
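One way to see why a loop needs a starting value is to look at the dependency graph: in a feedback loop no component can go first, because each one waits on another. The sketch below uses Python's standard graphlib module to detect such a cycle. It illustrates the idea only and says nothing about how Plugboard itself schedules components; the helper has_feedback_loop and the tank and controller names are hypothetical, taken from the example in the text.

```python
from graphlib import CycleError, TopologicalSorter


def has_feedback_loop(dependencies: dict[str, set[str]]) -> bool:
    """Return True if the graph (node -> nodes it depends on) contains a cycle."""
    try:
        TopologicalSorter(dependencies).prepare()
    except CycleError:
        return True
    return False


# The controller reads the tank temperature and the tank reads the controller output.
feedback = {"tank": {"controller"}, "controller": {"tank"}}

# The branching model from earlier: every input can be traced back to "random".
feed_forward = {"offset": {"random"}, "scale": {"random"}, "sum": {"offset", "scale"}}

print(has_feedback_loop(feedback))
print(has_feedback_loop(feed_forward))
```

Supplying an initial value for one input in the cycle breaks the deadlock, since that component no longer has to wait for its upstream neighbour on the first step.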
Consider this model in which the Sum component will accumulate a scaled part of its value at every iteration:
graph LR;
Random(random)-->Sum(sum);
Sum(sum)-->|"<i>a<sub>t=0</sub>=0</i>"|Scale(scale);
Scale(scale)-->Sum(sum);
Sum(sum)---->SaveOut(save-output);
We'll provide an initial input value of a = 0 to the Scale component, allowing the model to run. Implementing this in loop.py we have:
import asyncio

from plugboard.connector import AsyncioConnector
from plugboard.process import LocalProcess
from plugboard.schemas import ConnectorSpec

from components import Random, Save, Scale, Sum


async def main() -> None:
    connect = lambda in_, out_: AsyncioConnector(
        spec=ConnectorSpec(source=in_, target=out_)
    )
    process = LocalProcess(
        components=[
            Random(name="random", iters=5, low=0, high=10),
            Sum(name="sum"),
            Scale(name="scale", initial_values={"a": [0]}, scale=0.5),  # (1)!
            Save(name="save-output", path="cumulative-sum.txt"),
        ],
        connectors=[
            connect("random.x", "sum.a"),
            connect("sum.x", "scale.a"),
            connect("scale.x", "sum.b"),
            connect("sum.x", "save-output.value_to_save"),
        ],
    )
    async with process:
        await process.run()


if __name__ == "__main__":
    asyncio.run(main())
1. We specify our initial value for the Scale input here.
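The arithmetic of this loop can be traced by hand without Plugboard. The sketch below is a plain-Python approximation, not the library's execution model: it assumes that on each step Scale reads a (the initial value first, then the previous Sum output) and that Sum adds the new random input to the scaled value. The function name simulate_loop is hypothetical, and fixed inputs stand in for the random numbers.

```python
def simulate_loop(
    inputs: list[float], scale: float = 0.5, initial_a: float = 0.0
) -> list[float]:
    """Trace sum.x over time, assuming sum_t = input_t + scale * sum_(t-1)."""
    a = initial_a  # value on scale.a; starts at the initial value
    outputs = []
    for value in inputs:
        b = a * scale      # Scale: x = a * scale, fed to sum.b
        total = value + b  # Sum: x = a + b
        outputs.append(total)
        a = total          # sum.x feeds back into scale.a for the next step
    return outputs


print(simulate_loop([1.0, 2.0, 3.0]))  # [1.0, 2.5, 4.25]
```

With scale=0.5, each saved value is the new input plus half of the previously saved value, which is the accumulation described above.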
Note
Initial values are specified as lists in Plugboard, allowing you to specify a sequence of them. The component reads the first element at the first call to step(), the second element at the next call, and so on until there are no more initial values left to consume. Usually you won't need more than one initial value, so the list typically contains a single element as in this example.
Setting initial_values = {"a": [0]} means that we want the a input to be set to 0 on the first step and then revert to reading its input as usual.
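The "initial values first, then the normal input" behaviour described in the note can be pictured as two sequences joined end to end. This is a conceptual sketch in plain Python, assuming exactly that ordering; input_sequence is a hypothetical name and this is not how Plugboard implements it internally.

```python
from itertools import chain
from typing import Iterable, Iterator


def input_sequence(initial_values: Iterable[float], upstream: Iterable[float]) -> Iterator[float]:
    """Yield every initial value in order, then fall through to the upstream values."""
    return chain(initial_values, upstream)


# One initial value, as in loop.py, followed by two values arriving from upstream.
values = list(input_sequence([0], [4.0, 7.5]))
print(values)  # [0, 4.0, 7.5]
```

A longer list of initial values would simply delay the point at which the component starts reading from its connected input.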
Next steps
You've now learned how to build up complex model layouts in Plugboard. In the next tutorial we'll show how powerful a Plugboard model can be as we start to include different types of Component.