
A more complex process

In the last example our Process consisted of just two components. Most models use many more, which lets you break a model down into separate parts that you can build and test individually. Plugboard also supports branching and looping connections between components.

In this tutorial we'll also demonstrate how to make components reusable between different processes.

Define some components

First of all, let's define five different components:

  • Random will generate random numbers to feed into our model;
  • Offset adds a fixed offset to its input;
  • Scale will multiply its input by a fixed scale factor;
  • Sum will take in two inputs and add them together;
  • Save will write its input to a text file.

We can put the code for each of these in components.py.

import random
import typing as _t

from plugboard.component import Component, IOController as IO
from plugboard.schemas import ComponentArgsDict

class Random(Component):
    """Outputs a uniform random `x` for `iters` steps, then closes its IO."""
    io = IO(outputs=["x"])

    def __init__(
            self,
            iters: int,  # (1)!
            low: float = 0,
            high: float = 10,
            **kwargs: _t.Unpack[ComponentArgsDict]
        ) -> None:
        super().__init__(**kwargs)
        self._iters = 0
        self._low = low
        self._high = high
        self._max_iters = iters

    async def step(self) -> None:
        if self._iters < self._max_iters:
            self.x = random.uniform(self._low, self._high)
            self._iters += 1
            return
        await self.io.close()

class Offset(Component):
    """Implements `x = a + offset`."""
    io = IO(inputs=["a"], outputs=["x"]) # (2)!

    def __init__(self, offset: float = 0, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
        super().__init__(**kwargs)
        self._offset = offset

    async def step(self) -> None:
        self.x = self.a + self._offset

class Scale(Component):
    """Implements `x = a * scale`."""
    io = IO(inputs=["a"], outputs=["x"])

    def __init__(self, scale: float = 1, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
        super().__init__(**kwargs)
        self._scale = scale

    async def step(self) -> None:
        self.x = self.a * self._scale

class Sum(Component):
    """Implements `x = a + b`."""
    io = IO(inputs=["a", "b"], outputs=["x"]) # (3)!

    async def step(self) -> None:
        self.x = self.a + self.b

class Save(Component):
    """Writes each `value_to_save` input to a text file, one value per line."""
    io = IO(inputs=["value_to_save"])

    def __init__(self, path: str, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
        super().__init__(**kwargs)
        self._path = path

    async def init(self) -> None:
        self._f = open(self._path, "w")

    async def step(self) -> None:
        self._f.write(f"{self.value_to_save}\n")

    async def destroy(self) -> None:
        self._f.close()

  1. The Random component takes three parameters: iters controls how many iterations the model runs for, while low and high set the range of the random number generator.
  2. A Component with both inputs and outputs.
  3. Here we have multiple inputs.

Create branching connections in a Process

Next, we'll connect the components together to form this model:

graph LR;
    Random(random)---->SaveIn(save-input);
    Random(random)-->Offset(offset);
    Random(random)-->Scale(scale);
    Offset(offset)-->Sum(sum);
    Scale(scale)-->Sum(sum);
    Sum(sum)-->SaveOut(save-output);

Note

See how we branch the output of the Random component into three different places: a Save component (so that we can record our model input), Offset and Scale.

Now in branching.py we import these components, connect them together and run the model.

import asyncio

from plugboard.connector import AsyncioConnector
from plugboard.process import LocalProcess
from plugboard.schemas import ConnectorSpec

from components import Offset, Random, Save, Scale, Sum


async def main() -> None:
    connect = lambda in_, out_: AsyncioConnector(  # (1)!
        spec=ConnectorSpec(source=in_, target=out_)
    )
    process = LocalProcess(
        components=[  # (2)!
            Random(name="random", iters=5, low=0, high=10),
            Offset(name="offset", offset=10),
            Scale(name="scale", scale=2),
            Sum(name="sum"),
            Save(name="save-input", path="input.txt"),
            Save(name="save-output", path="output.txt"),
        ],
        connectors=[  # (3)!
            connect("random.x", "save-input.value_to_save"),
            connect("random.x", "offset.a"),
            connect("random.x", "scale.a"),
            connect("offset.x", "sum.a"),
            connect("scale.x", "sum.b"),
            connect("sum.x", "save-output.value_to_save"),
        ],
    )
    async with process:  # (4)!
        await process.run()


if __name__ == "__main__":
    asyncio.run(main())

  1. We'll use this lambda to abbreviate the connectors below.
  2. Instantiate each of the components here with any necessary parameters.
  3. Here is where we define all of the connections in our model: source and target are of the form component_name.io_name. So the first item connects the x output on the random component to the value_to_save input on save-input.
  4. As in the previous tutorial, this is equivalent to calling await process.init(), followed by await process.run() and then await process.destroy().
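
The component_name.io_name convention can be illustrated with a few lines of plain Python. This sketch does not use Plugboard at all: split_ref and fan_out are hypothetical helper names introduced here only to show how the connector list above encodes the branching from random.x.

```python
from collections import defaultdict

# The (source, target) pairs from branching.py, written as plain strings.
CONNECTIONS = [
    ("random.x", "save-input.value_to_save"),
    ("random.x", "offset.a"),
    ("random.x", "scale.a"),
    ("offset.x", "sum.a"),
    ("scale.x", "sum.b"),
    ("sum.x", "save-output.value_to_save"),
]


def split_ref(ref: str) -> tuple[str, str]:
    """Split 'component_name.io_name' into (component_name, io_name)."""
    component, _, field = ref.partition(".")
    return component, field


def fan_out(connections: list[tuple[str, str]]) -> dict[str, list[str]]:
    """Group the target component names by the source reference feeding them."""
    targets: dict[str, list[str]] = defaultdict(list)
    for source, target in connections:
        targets[source].append(split_ref(target)[0])
    return dict(targets)


if __name__ == "__main__":
    # Hypothetical illustration only: print which components each output feeds.
    for source, targets in fan_out(CONNECTIONS).items():
        print(f"{source} -> {', '.join(targets)}")
```

Grouping by source makes the branch visible: random.x is the only output that appears with more than one target.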

Now run python branching.py. It generates input.txt and output.txt, showing that the model has run.
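
As a sanity check, each line of output.txt should equal (x + 10) + (x * 2) for the corresponding x in input.txt, given offset=10 and scale=2 above. The plain-Python sketch below computes that value. The names expected_output and check_files are hypothetical, and the check assumes both files hold one number per line in the same order.

```python
import math
from pathlib import Path


def expected_output(x: float, offset: float = 10, scale: float = 2) -> float:
    """Value the Sum component should emit for a single random input x."""
    return (x + offset) + (x * scale)


def check_files(input_path: str = "input.txt", output_path: str = "output.txt") -> bool:
    """Compare every saved output against the value expected from its input.

    Assumes one float per line, written in the same order in both files.
    """
    inputs = [float(v) for v in Path(input_path).read_text().split()]
    outputs = [float(v) for v in Path(output_path).read_text().split()]
    return len(inputs) == len(outputs) and all(
        math.isclose(expected_output(x), y) for x, y in zip(inputs, outputs)
    )
```

Calling check_files() from the directory where branching.py was run should return True if the two files line up as assumed.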

Create a loop connection

In some cases you might want to create a loop in your model. This is commonly the case where you need to include feedback: for example you might have a component modelling the temperature of a heated hot-water tank and another one representing an automatic controller that turns the heating element on and off.

To make models like this work in Plugboard you will need to specify initial_values somewhere in the loop: this ensures that each of the Component objects can get all their inputs at the first call to process.step(), allowing the model to start running.

Consider this model, in which the Sum component adds each new random number to a scaled copy of its own previous output:

graph LR;
    Random(random)-->Sum(sum);
    Sum(sum)-->|"<i>a<sub>t=0</sub>=0</i>"|Scale(scale);
    Scale(scale)-->Sum(sum);
    Sum(sum)---->SaveOut(save-output);

We'll provide an initial input value of a = 0 to the Scale component, allowing the model to run. Implementing this in loop.py we have:

import asyncio

from plugboard.connector import AsyncioConnector
from plugboard.process import LocalProcess
from plugboard.schemas import ConnectorSpec

from components import Random, Save, Scale, Sum


async def main() -> None:
    connect = lambda in_, out_: AsyncioConnector(
        spec=ConnectorSpec(source=in_, target=out_)
    )
    process = LocalProcess(
        components=[
            Random(name="random", iters=5, low=0, high=10),
            Sum(name="sum"),
            Scale(name="scale", initial_values={"a": [0]}, scale=0.5),  # (1)!
            Save(name="save-output", path="cumulative-sum.txt"),
        ],
        connectors=[
            connect("random.x", "sum.a"),
            connect("sum.x", "scale.a"),
            connect("scale.x", "sum.b"),
            connect("sum.x", "save-output.value_to_save"),
        ],
    )
    async with process:
        await process.run()


if __name__ == "__main__":
    asyncio.run(main())

  1. We specify our initial value for the Scale input here.
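
Assuming each step feeds the previous Sum output back through Scale, the loop computes the recurrence s_t = x_t + 0.5 * s_(t-1), starting from the initial value 0. The sketch below reproduces that arithmetic in plain Python without Plugboard. The name run_loop is hypothetical, and a fixed list of inputs stands in for the Random component.

```python
def run_loop(xs: list[float], scale: float = 0.5, initial_a: float = 0.0) -> list[float]:
    """Mimic the Sum/Scale feedback loop for a fixed sequence of inputs."""
    scale_input = initial_a  # plays the role of initial_values={"a": [0]}
    outputs = []
    for x in xs:
        scaled = scale_input * scale  # what Scale would emit on x
        total = x + scaled            # what Sum would emit on x
        outputs.append(total)         # what save-output would record
        scale_input = total           # fed back to Scale for the next step
    return outputs


if __name__ == "__main__":
    print(run_loop([1.0, 2.0, 3.0]))  # [1.0, 2.5, 4.25]
```

With scale=0.5 each earlier input is halved once per step, so older values contribute less and less to the running total.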

Note

Initial values are specified as lists in Plugboard, allowing you to specify a sequence of them. The component reads the first element at the first call to step(), the second at the next call, and so on until no initial values remain. Usually you won't need more than one initial value, so the list typically contains a single element as in this example.

Setting initial_values = {"a": [0]} means that we want the a input to be set to 0 on the first step and then revert to reading its input as usual.
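
One way to picture this behaviour is as a queue placed in front of the input: values are taken from the queue until it is empty, after which reads fall through to the connected source. The following is only an illustration in plain Python, not Plugboard's implementation, and read_input is a hypothetical name.

```python
from collections import deque
from typing import Iterable, Iterator


def read_input(initial_values: Iterable[float], upstream: Iterable[float]) -> Iterator[float]:
    """Yield the initial values first, then whatever arrives from upstream."""
    pending = deque(initial_values)
    while pending:
        yield pending.popleft()  # consumed once, in order
    yield from upstream          # then revert to reading the input as usual


if __name__ == "__main__":
    # A single initial value of 0 followed by two upstream values.
    print(list(read_input([0], [5, 6])))  # [0, 5, 6]
```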

Next steps

You've now learned how to build up complex model layouts in Plugboard. In the next tutorial we'll show how powerful a Plugboard model can be as we start to include different types of Component.