Running in parallel
Up until now we have been running all of our models in a single computational process. This is perfectly sufficient for simple models, or when your components can make use of Python's asyncio to avoid blocking.
As your models get larger and more computationally intensive, you may benefit from running parts of the model in parallel. Plugboard integrates with the Ray framework, allowing you to split your computation across multiple CPU cores, or even across nodes in a Ray cluster.
Tip
Keep in mind that parallelising a model has a cost: communication between components is slower over Ray than it is locally. For small models, or when a single component is the computational bottleneck, this overhead may not be worth it. However, when you have multiple computationally intensive components in different branches of your Process, moving to Ray can give you a performance boost.
Before running this tutorial, be sure to install Ray with pip, or install plugboard with its optional ray extra (e.g. pip install "plugboard[ray]").
Parallelising a model
For demonstration purposes we're going to use a model with two branches, each containing a Sleep component to simulate computationally intensive activity. In real scenarios these might instead be calls to simulation software or machine-learning model inference.
graph LR;
Iterator(input)-->Slow(slow-sleep);
Iterator(input)-->VerySlow(very-slow-sleep);
Slow(slow-sleep)-->Timestamper(timestamper);
VerySlow(very-slow-sleep)-->Timestamper(timestamper);
Timestamper(timestamper)-->FileWriter(save-results);
Defining the components
Let's define the various components that we need. The Timestamper component simply emits the current time in ISO format, so that our output file will contain a record of how long each step of the model took. We can again use FileWriter to save the output to CSV.
import datetime
import time
import typing as _t

# Component, IO and ComponentArgsDict come from plugboard, imported as in the earlier tutorials.


class Iterator(Component):
    """Creates a sequence of numbers."""

    io = IO(outputs=["x"])

    def __init__(self, iters: int, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
        super().__init__(**kwargs)
        self._iters = iters

    async def init(self) -> None:
        self._seq = iter(range(self._iters))

    async def step(self) -> None:
        try:
            self.x = next(self._seq)  # output name matches io outputs=["x"]
        except StopIteration:
            await self.io.close()


class Sleep(Component):
    """Passes through input to output after a delay."""

    io = IO(inputs=["x"], outputs=["y"])

    def __init__(self, sleep_seconds: float, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
        super().__init__(**kwargs)
        self._duration = sleep_seconds

    async def step(self) -> None:
        time.sleep(self._duration)  # (1)!
        self.y = self.x


class Timestamper(Component):
    """Emits the current time when all inputs are ready."""

    io = IO(inputs=["x", "y"], outputs=["timestamp"])

    async def step(self) -> None:
        self.timestamp = datetime.datetime.now().isoformat()
- We're using time.sleep here and not asyncio.sleep because we're deliberately blocking execution to simulate a computationally intensive component (a non-blocking variant is sketched below for contrast).
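For contrast, here is a minimal sketch of a non-blocking variant. This AsyncSleep class is hypothetical and not part of the tutorial model: because it awaits asyncio.sleep instead of blocking, a LocalProcess could already overlap it with other components without any help from Ray.

import asyncio


class AsyncSleep(Component):
    """Hypothetical non-blocking counterpart to Sleep: yields to the event loop while waiting."""

    io = IO(inputs=["x"], outputs=["y"])

    def __init__(self, sleep_seconds: float, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
        super().__init__(**kwargs)
        self._duration = sleep_seconds

    async def step(self) -> None:
        await asyncio.sleep(self._duration)  # suspends this component only; others keep running
        self.y = self.x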
Running normally in a LocalProcess
First we can set up the LocalProcess and run it as we have in previous tutorials.
process = LocalProcess(
    components=[
        Iterator(name="input", iters=20),
        Sleep(name="slow-sleep", sleep_seconds=0.5),
        Sleep(name="very-slow-sleep", sleep_seconds=1),
        Timestamper(name="timestamper"),
        FileWriter(name="save-results", path="ray.csv", field_names=["timestamp"]),
    ],
    connectors=[
        AsyncioConnector(spec=ConnectorSpec(source="input.x", target="slow-sleep.x")),
        AsyncioConnector(spec=ConnectorSpec(source="input.x", target="very-slow-sleep.x")),
        AsyncioConnector(spec=ConnectorSpec(source="slow-sleep.y", target="timestamper.x")),
        AsyncioConnector(
            spec=ConnectorSpec(source="very-slow-sleep.y", target="timestamper.y")
        ),
        AsyncioConnector(
            spec=ConnectorSpec(source="timestamper.timestamp", target="save-results.timestamp")
        ),
    ],
)

async with process:
    await process.run()
Running 20 iterations takes around 30 seconds, because each step of the model contains 1.5 s of computation: the two Sleep components block one after the other, so every step costs 0.5 s + 1 s.
Running in parallel using RayProcess
With some small changes we can make the same model run in parallel on Ray. First, we change the Process class to RayProcess. Then, when creating the connectors, we use RayConnector instead of AsyncioConnector, so that the components communicate over RayChannel objects.
Info
Channel objects are used by Plugboard to handle the communication between components. So far we have used AsyncioChannel, which is the best option for simple models that don't require parallelisation. Plugboard provides different channel classes for use in parallel environments: RayChannel is suitable for single- and multi-host Ray environments, while ZMQChannel is faster but currently only works on a single host.
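As an aside, if your whole model fits on one machine and channel overhead matters, you could swap the Ray-backed connector for the ZMQ-backed one. The sketch below assumes plugboard exposes a ZMQConnector alongside AsyncioConnector and RayConnector; check the API reference for the exact class name and import path.

# Assumed API: a ZMQConnector that builds ZMQChannel objects, used exactly like RayConnector.
connectors = [
    ZMQConnector(spec=ConnectorSpec(source="input.x", target="slow-sleep.x")),
    ZMQConnector(spec=ConnectorSpec(source="input.x", target="very-slow-sleep.x")),
]

With that aside out of the way, here is the same model wired up for Ray: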
process = RayProcess(
    components=[
        Iterator(name="input", iters=20),
        Sleep(name="slow-sleep", sleep_seconds=0.5),
        Sleep(name="very-slow-sleep", sleep_seconds=1),
        Timestamper(name="timestamper"),
        FileWriter(name="save-results", path="ray.csv", field_names=["timestamp"]),
    ],
    connectors=[
        RayConnector(spec=ConnectorSpec(source="input.x", target="slow-sleep.x")),
        RayConnector(spec=ConnectorSpec(source="input.x", target="very-slow-sleep.x")),
        RayConnector(spec=ConnectorSpec(source="slow-sleep.y", target="timestamper.x")),
        RayConnector(spec=ConnectorSpec(source="very-slow-sleep.y", target="timestamper.y")),
        RayConnector(
            spec=ConnectorSpec(source="timestamper.timestamp", target="save-results.timestamp")
        ),
    ],
)

async with process:
    await process.run()
Now the 20-iteration model takes around 23 s, because the two Sleep components execute in parallel: each step is dominated by the 1 s very-slow-sleep, giving roughly 20 s of compute time plus a little overhead.
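One practical note: by default Ray starts a local, single-node instance the first time it is used. To spread the same model across a multi-node Ray cluster you would connect to the cluster before building the RayProcess. This is standard Ray usage rather than anything plugboard-specific, and "auto" is the usual placeholder for a cluster you are already attached to.

import ray

# Connect to an existing Ray cluster before constructing the RayProcess.
# Omitting this lets Ray start a local single-node instance automatically.
ray.init(address="auto")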
Using YAML config
Defining your model as a YAML config file is particularly useful when you want to use more computational resources: the config file is portable and lets you easily move the model to different compute environments.
Specifying the process type and the connector builder type in the YAML is the only change needed to get the example above to run on Ray.
plugboard:
  process:
    type: "plugboard.process.RayProcess" # (1)!
    connector_builder:
      type: "plugboard.connector.RayConnector" # (2)!
    args:
      components:
        - type: hello_ray.Iterator
          args:
            name: "input"
            iters: 10
        - type: hello_ray.Sleep
          args:
            name: "slow-sleep"
            sleep_seconds: 0.5
        - type: hello_ray.Sleep
          args:
            name: "very-slow-sleep"
            sleep_seconds: 1
        - type: hello_ray.Timestamper
          args:
            name: "timestamper"
        - type: plugboard.library.file_io.FileWriter
          args:
            name: "save-results"
            path: "ray.csv"
            field_names:
              - timestamp
      connectors:
        - source: "input.x"
          target: "slow-sleep.x"
        - source: "input.x"
          target: "very-slow-sleep.x"
        - source: "slow-sleep.y"
          target: "timestamper.x"
        - source: "very-slow-sleep.y"
          target: "timestamper.y"
        - source: "timestamper.timestamp"
          target: "save-results.timestamp"
- Tell Plugboard to use a RayProcess instead of the default LocalProcess.
- Also change the connector builder to RayConnector, which will build RayChannel objects when creating the Process.