Running in parallel
Up until now we have been running all our models in a single computational process. This is perfectly sufficient for simple models, or when your components can make use of Python's asyncio to avoid blocking.
As your models get larger and more computationally intensive you may benefit from running parts of the model in parallel. Plugboard integrates with the Ray framework, allowing you to split your computation across multiple CPU cores, or even across nodes in a Ray cluster.
Tip
Keep in mind that parallelising a model has a cost associated with it: the communication between the different components will be slower on Ray than it is locally.
For small models, or when a single component is the computational bottleneck, this overhead may not be worth it. However, when you have multiple computationally intensive components in different branches of your Process, moving to Ray can give you a performance boost.
Before running this tutorial be sure to install Ray with pip, or install plugboard with its optional ray extra.
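For example, either of these commands installs the dependency (assuming you are installing from PyPI into your current environment):

```shell
# Install Ray directly...
pip install ray

# ...or install Plugboard together with its optional ray extra
pip install "plugboard[ray]"
```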
Parallelising a model
For demonstration purposes we're going to use a model with two branches containing Sleep components to simulate computationally intensive activity. In real scenarios these might instead be calls to simulation software or machine-learning model inference.
flowchart LR
input@{ shape: rounded, label: Iterator<br>**input** } --> slow-sleep@{ shape: rounded, label: Sleep<br>**slow-sleep** }
input@{ shape: rounded, label: Iterator<br>**input** } --> very-slow-sleep@{ shape: rounded, label: Sleep<br>**very-slow-sleep** }
slow-sleep@{ shape: rounded, label: Sleep<br>**slow-sleep** } --> timestamper@{ shape: rounded, label: Timestamper<br>**timestamper** }
timestamper@{ shape: rounded, label: Timestamper<br>**timestamper** } --> save-results@{ shape: rounded, label: FileWriter<br>**save-results** }
very-slow-sleep@{ shape: rounded, label: Sleep<br>**very-slow-sleep** } --> timestamper@{ shape: rounded, label: Timestamper<br>**timestamper** }
Defining the components
Let's define the various components that we need. The Timestamper component simply emits the current time in ISO format so that our output file will contain a record of how long each step of the model took. We can again use FileWriter to save the output to CSV.
class Iterator(Component):
    """Creates a sequence of numbers."""

    io = IO(outputs=["x"])

    def __init__(self, iters: int, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
        super().__init__(**kwargs)
        self._iters = iters

    async def init(self) -> None:
        self._seq = iter(range(self._iters))

    async def step(self) -> None:
        try:
            self.x = next(self._seq)
        except StopIteration:
            await self.io.close()
class Sleep(Component):
"""Passes through input to output after a delay."""
io = IO(inputs=["x"], outputs=["y"])
def __init__(self, sleep_seconds: float, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
super().__init__(**kwargs)
self._duration = sleep_seconds
async def step(self) -> None:
time.sleep(self._duration) # (1)!
self.y = self.x
class Timestamper(Component):
"""Emits the current time when all inputs are ready."""
io = IO(inputs=["x", "y"], outputs=["timestamp"])
async def step(self) -> None:
self.timestamp = datetime.datetime.now().isoformat()
- We're using `time.sleep` here and not `asyncio.sleep` because we're deliberately blocking execution to simulate a computationally intensive component.
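To see why this distinction matters, here is a small standalone sketch (plain Python, independent of Plugboard) showing that `time.sleep` blocks the event loop while `asyncio.sleep` lets concurrent coroutines overlap:

```python
import asyncio
import time


async def blocking_task() -> None:
    # time.sleep blocks the whole event loop: nothing else runs meanwhile.
    time.sleep(0.2)


async def yielding_task() -> None:
    # asyncio.sleep suspends only this coroutine, letting others proceed.
    await asyncio.sleep(0.2)


async def main() -> tuple[float, float]:
    start = time.perf_counter()
    await asyncio.gather(blocking_task(), blocking_task())
    blocked = time.perf_counter() - start  # ~0.4s: the sleeps run back to back

    start = time.perf_counter()
    await asyncio.gather(yielding_task(), yielding_task())
    yielded = time.perf_counter() - start  # ~0.2s: the sleeps overlap
    return blocked, yielded


blocked, yielded = asyncio.run(main())
print(f"blocking: {blocked:.2f}s, yielding: {yielded:.2f}s")
```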
Running normally in a LocalProcess
First we can set up the LocalProcess and run it as we have in previous tutorials.
process = LocalProcess(
components=[
Iterator(name="input", iters=20),
Sleep(name="slow-sleep", sleep_seconds=0.5),
Sleep(name="very-slow-sleep", sleep_seconds=1),
Timestamper(name="timestamper"),
FileWriter(name="save-results", path="ray.csv", field_names=["timestamp"]),
],
connectors=[
AsyncioConnector(spec=ConnectorSpec(source="input.x", target="slow-sleep.x")),
AsyncioConnector(spec=ConnectorSpec(source="input.x", target="very-slow-sleep.x")),
AsyncioConnector(spec=ConnectorSpec(source="slow-sleep.y", target="timestamper.x")),
AsyncioConnector(
spec=ConnectorSpec(source="very-slow-sleep.y", target="timestamper.y")
),
AsyncioConnector(
spec=ConnectorSpec(source="timestamper.timestamp", target="save-results.timestamp")
),
],
)
async with process:
await process.run()
Running 20 iterations takes around 30 seconds, because each step of the model contains 1.5s of computation.
Running in parallel using RayProcess
With some small changes we can make the same model run in parallel on Ray. First we change the Process class to RayProcess. Then, when creating the connectors, we use RayConnector instead of AsyncioConnector so that components communicate over RayChannel objects.
Info
Channel objects are used by Plugboard to handle the communication between components. So far we have used AsyncioChannel, which is the best option for simple models that don't require parallelisation.
Plugboard provides different channel classes for use in parallel environments: RayChannel is suitable for single and multi-host Ray environments. ZMQChannel is faster, but currently only works on a single host.
process = RayProcess(
components=[
Iterator(name="input", iters=20),
Sleep(name="slow-sleep", sleep_seconds=0.5),
Sleep(name="very-slow-sleep", sleep_seconds=1),
Timestamper(name="timestamper"),
FileWriter(name="save-results", path="ray.csv", field_names=["timestamp"]),
],
connectors=[
RayConnector(spec=ConnectorSpec(source="input.x", target="slow-sleep.x")),
RayConnector(spec=ConnectorSpec(source="input.x", target="very-slow-sleep.x")),
RayConnector(spec=ConnectorSpec(source="slow-sleep.y", target="timestamper.x")),
RayConnector(spec=ConnectorSpec(source="very-slow-sleep.y", target="timestamper.y")),
RayConnector(
spec=ConnectorSpec(source="timestamper.timestamp", target="save-results.timestamp")
),
],
)
async with process:
await process.run()
Now the 20-iteration model takes around 23s, because the two Sleep components execute in parallel (20s of compute time plus a little overhead).
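These numbers line up with a back-of-the-envelope calculation (plain Python, not Plugboard code):

```python
iters = 20
slow, very_slow = 0.5, 1.0

# LocalProcess: the blocking sleeps in the two branches run back to back,
# so each step costs the sum of both delays.
sequential_seconds = iters * (slow + very_slow)  # 30.0

# RayProcess: the branches overlap, so each step costs only as much as
# the slower branch; the extra ~3s observed in practice is Ray overhead.
parallel_seconds = iters * max(slow, very_slow)  # 20.0

print(sequential_seconds, parallel_seconds)
```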
Using YAML config
Defining your model as a YAML config file is particularly useful when you want to use more computational resources: the config file is portable and lets you easily move the model to different compute environments.
Specifying the process type and channel builder type in the YAML is the only change needed to get the example above to run on Ray.
plugboard:
process:
type: "plugboard.process.RayProcess" # (1)!
connector_builder:
type: "plugboard.connector.RayConnector" # (2)!
args:
components:
- type: hello_ray.Iterator
args:
name: "input"
iters: 10
- type: hello_ray.Sleep
args:
name: "slow-sleep"
sleep_seconds: 0.5
- type: hello_ray.Sleep
args:
name: "very-slow-sleep"
sleep_seconds: 1
- type: hello_ray.Timestamper
args:
name: "timestamper"
- type: plugboard.library.file_io.FileWriter
args:
name: "save-results"
path: "ray.csv"
field_names:
- timestamp
connectors:
- source: "input.x"
target: "slow-sleep.x"
- source: "input.x"
target: "very-slow-sleep.x"
- source: "slow-sleep.y"
target: "timestamper.x"
- source: "very-slow-sleep.y"
target: "timestamper.y"
- source: "timestamper.timestamp"
target: "save-results.timestamp"
- Tell Plugboard to use a `RayProcess` instead of the default `LocalProcess`.
- Also change the connector builder to `RayConnector`, which will build `RayChannel` objects when creating the `Process`.
Specifying resource requirements
When running components on Ray, you can specify resource requirements for each component to control how Ray allocates computational resources. This is particularly useful when you have components with different resource needs (e.g., CPU-intensive vs GPU-intensive tasks) and you are running on a Ray cluster.
Tip
Normally Ray will start automatically when you are using Plugboard locally. If you want to start a separate Ray instance, for example so that you can control the configuration options, you can launch it from the CLI. For example, this command will start a Ray instance with enough resources to run the example below.
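One possible invocation is shown below; the exact resource counts here are illustrative assumptions, so adjust them to your machine:

```shell
# Start a local Ray head node with explicit resource limits, including a
# custom resource so the custom_hardware example below can be scheduled.
ray start --head --num-cpus=4 --num-gpus=1 --resources='{"custom_hardware": 2}'
```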
Declaring resources at component definition
The recommended way to specify resource requirements is to declare them as a class attribute when defining your component. This makes the resource requirements explicit and part of the component's definition:
class CPUIntensiveTask(Component):
"""Component that requires more CPU resources.
Resource requirements are declared as a class attribute.
"""
io = IO(inputs=["x"], outputs=["y"])
resources = Resource(cpu=2.0) # (1)!
async def step(self) -> None:
"""Execute CPU-intensive computation."""
# Simulate CPU-intensive work
result = sum(i**2 for i in range(int(self.x * 10000)))
self.y = result
- Declare resources when defining the class.
Overriding resources at instantiation
You can also override resource requirements when creating component instances. This is useful when you want to use the same component class with different resource requirements:
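As a sketch, reusing the CPUIntensiveTask class defined above together with Plugboard's Resource class, an override at instantiation might look like this:

```python
# CPUIntensiveTask declares cpu=2.0 at the class level; passing a Resource
# here overrides that requirement for this particular instance.
task = CPUIntensiveTask(name="cpu-task-small", resources=Resource(cpu=1.0))
```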
- Pass a `Resource` object to override the CPU requirements for this component.
Example
For example, you can specify resource requirements like this when instantiating components:
# Resources can be declared at the class level (see CPUIntensiveTask and GPUTask above)
# or overridden when instantiating components
process = RayProcess(
components=[
CPUIntensiveTask(name="cpu-task", resources=Resource(cpu=1.0)), # (1)!
GPUTask(name="gpu-task"), # (2)!
DataProducer(name="producer", iters=5), # (3)!
],
connectors=[
RayConnector(spec=ConnectorSpec(source="producer.output", target="cpu-task.x")),
RayConnector(spec=ConnectorSpec(source="cpu-task.y", target="gpu-task.data")),
],
)
- Override the resource requirement on this instance.
- Use resources specified in class definition.
- Use default resources.
Or override them in YAML configuration:
plugboard:
process:
type: plugboard.process.RayProcess
connector_builder:
type: plugboard.connector.RayConnector
args:
name: resource-example-process
components:
- type: examples.tutorials.004_using_ray.resources_example.DataProducer
args:
name: producer
iters: 10
resources:
cpu: 1.0 # (1)!
- type: examples.tutorials.004_using_ray.resources_example.CPUIntensiveTask
args:
name: cpu-task
# CPUIntensiveTask has class-level resources (cpu: 2.0)
# Override to use more memory
resources:
cpu: 2.0 # (2)!
memory: "512Mi" # (3)!
- type: examples.tutorials.004_using_ray.resources_example.GPUTask
args:
name: gpu-task
# GPUTask has class-level resources (cpu: "500m", gpu: 1)
# Can override or extend with custom resources
            resources:
              cpu: "500m" # (4)!
              gpu: 1 # (5)!
              resources:
                custom_hardware: 2 # (6)!
connectors:
- source: producer.output
target: cpu-task.x
- source: cpu-task.y
target: gpu-task.data
- Override DataProducer to require 1 CPU (instead of the default 0.001).
- CPUIntensiveTask already declares cpu: 2.0 at the class level, so this matches the class definition.
- Add 512MB memory requirement to CPUIntensiveTask (extending the class-level resources).
- Requires 0.5 CPU, specified in Kubernetes-style format (500 milli CPUs). This matches the class-level declaration.
- Requires 1 GPU, matching the class-level declaration.
- Add a custom resource called `custom_hardware`. This needs to be specified in the configuration of your Ray cluster to make it available.
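The Kubernetes-style quantities above follow a simple convention; this hypothetical helper (not part of Plugboard) shows how "500m" maps to fractional CPUs:

```python
def parse_cpu(value):
    """Parse a Kubernetes-style CPU quantity: '500m' means 500 milli-CPUs."""
    if isinstance(value, str) and value.endswith("m"):
        return int(value[:-1]) / 1000.0  # '500m' -> 0.5
    return float(value)


print(parse_cpu("500m"), parse_cpu(2), parse_cpu("1.5"))  # 0.5 2.0 1.5
```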
See the Ray documentation for more information about specifying resource requirements.