
Running in parallel

Up until now we have been running all our models in a single computational process. This is perfectly sufficient for simple models, or when your components can make use of Python's asyncio to avoid blocking.

As your models get larger and more computationally intensive you may benefit from running parts of the model in parallel. Plugboard integrates with the Ray framework, allowing you to split your computation across multiple CPU cores, or even across nodes in a Ray cluster.

Tip

Keep in mind that parallelising a model has a cost associated with it: the communication between the different components will be slower on Ray than it is locally.

For small models, or when a single component is the computational bottleneck, this overhead may not be worth it. However, when you have multiple computationally intensive components in different branches of your Process, moving to Ray can give you a performance boost.

Before running this tutorial be sure to install Ray with pip, or install plugboard with its optional ray extra.
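
For example, assuming you are using pip (the extra is named ray, as mentioned above), either of the following should work:

pip install ray
# or, to install plugboard together with Ray
pip install "plugboard[ray]"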

Parallelising a model

For demonstration purposes we're going to use a model with two branches containing Sleep components to simulate computationally intensive activity. In real scenarios these might instead be calls to simulation software or machine-learning model inference.

flowchart LR
    input@{ shape: rounded, label: Iterator<br>**input** } --> slow-sleep@{ shape: rounded, label: Sleep<br>**slow-sleep** }
    input@{ shape: rounded, label: Iterator<br>**input** } --> very-slow-sleep@{ shape: rounded, label: Sleep<br>**very-slow-sleep** }
    slow-sleep@{ shape: rounded, label: Sleep<br>**slow-sleep** } --> timestamper@{ shape: rounded, label: Timestamper<br>**timestamper** }
    timestamper@{ shape: rounded, label: Timestamper<br>**timestamper** } --> save-results@{ shape: rounded, label: FileWriter<br>**save-results** }
    very-slow-sleep@{ shape: rounded, label: Sleep<br>**very-slow-sleep** } --> timestamper@{ shape: rounded, label: Timestamper<br>**timestamper** }

Defining the components

Let's define the various components that we need. The Timestamper component simply emits the current time in ISO format so that our output file will contain a record of how long each step of the model took. We can again use FileWriter to save the output to CSV.

class Iterator(Component):
    """Creates a sequence of numbers."""

    io = IO(outputs=["x"])

    def __init__(self, iters: int, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
        super().__init__(**kwargs)
        self._iters = iters

    async def init(self) -> None:
        self._seq = iter(range(self._iters))

    async def step(self) -> None:
        try:
            self.x = next(self._seq)
        except StopIteration:
            await self.io.close()


class Sleep(Component):
    """Passes through input to output after a delay."""

    io = IO(inputs=["x"], outputs=["y"])

    def __init__(self, sleep_seconds: float, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
        super().__init__(**kwargs)
        self._duration = sleep_seconds

    async def step(self) -> None:
        time.sleep(self._duration)  # (1)!
        self.y = self.x


class Timestamper(Component):
    """Emits the current time when all inputs are ready."""

    io = IO(inputs=["x", "y"], outputs=["timestamp"])

    async def step(self) -> None:
        self.timestamp = datetime.datetime.now().isoformat()

  1. We're using time.sleep here and not asyncio.sleep because we're deliberately blocking execution to simulate a computationally intensive component.

Running normally in a LocalProcess

First we can set up the LocalProcess and run it as we have in previous tutorials.

process = LocalProcess(
    components=[
        Iterator(name="input", iters=20),
        Sleep(name="slow-sleep", sleep_seconds=0.5),
        Sleep(name="very-slow-sleep", sleep_seconds=1),
        Timestamper(name="timestamper"),
        FileWriter(name="save-results", path="ray.csv", field_names=["timestamp"]),
    ],
    connectors=[
        AsyncioConnector(spec=ConnectorSpec(source="input.x", target="slow-sleep.x")),
        AsyncioConnector(spec=ConnectorSpec(source="input.x", target="very-slow-sleep.x")),
        AsyncioConnector(spec=ConnectorSpec(source="slow-sleep.y", target="timestamper.x")),
        AsyncioConnector(
            spec=ConnectorSpec(source="very-slow-sleep.y", target="timestamper.y")
        ),
        AsyncioConnector(
            spec=ConnectorSpec(source="timestamper.timestamp", target="save-results.timestamp")
        ),
    ],
)
async with process:
    await process.run()

Running 20 iterations takes around 30 seconds, because each step of the model contains 1.5s of computation.
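
If you want to check the timing yourself, a minimal sketch using Python's standard time module (wrapping the same run as above) is:

import time

start = time.perf_counter()
async with process:
    await process.run()
print(f"Run took {time.perf_counter() - start:.1f}s")  # roughly 30s for 20 iterations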

Running in parallel using RayProcess

With some small changes we can make the same model run in parallel on Ray. First we change the Process class to RayProcess. Then, when creating the Connector objects, we use RayConnector instead of AsyncioConnector so that components communicate over RayChannel objects.

Info

Channel objects are used by Plugboard to handle the communication between components. So far we have used AsyncioChannel, which is the best option for simple models that don't require parallelisation.

Plugboard provides different channel classes for use in parallel environments: RayChannel is suitable for single and multi-host Ray environments. ZMQChannel is faster, but currently only works on a single host.
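
If your whole model runs on a single host and channel overhead matters, the ZMQ-backed channels mentioned above may be worth trying. The line below is a sketch only: it assumes a ZMQConnector class that builds ZMQChannel objects, following the same naming pattern as AsyncioConnector and RayConnector, so check the API reference for the actual class name.

ZMQConnector(spec=ConnectorSpec(source="input.x", target="slow-sleep.x"))  # assumed class name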

process = RayProcess(
    components=[
        Iterator(name="input", iters=20),
        Sleep(name="slow-sleep", sleep_seconds=0.5),
        Sleep(name="very-slow-sleep", sleep_seconds=1),
        Timestamper(name="timestamper"),
        FileWriter(name="save-results", path="ray.csv", field_names=["timestamp"]),
    ],
    connectors=[
        RayConnector(spec=ConnectorSpec(source="input.x", target="slow-sleep.x")),
        RayConnector(spec=ConnectorSpec(source="input.x", target="very-slow-sleep.x")),
        RayConnector(spec=ConnectorSpec(source="slow-sleep.y", target="timestamper.x")),
        RayConnector(spec=ConnectorSpec(source="very-slow-sleep.y", target="timestamper.y")),
        RayConnector(
            spec=ConnectorSpec(source="timestamper.timestamp", target="save-results.timestamp")
        ),
    ],
)
async with process:
    await process.run()

Now the 20-iteration model takes around 23s, because the two Sleep components are executed in parallel (20s of compute time plus a little overhead).

Using YAML config

Defining your model as a YAML config file is particularly useful when you want to use more computational resources: the config file is portable and lets you easily move the model to different compute environments.

Specifying the process type and connector builder type in the YAML is the only change needed to get the example above to run on Ray.

plugboard:
  process:
    type: "plugboard.process.RayProcess"  # (1)!
    connector_builder:
      type: "plugboard.connector.RayConnector"  # (2)!
    args:
      components:
      - type: hello_ray.Iterator
        args:
          name: "input"
          iters: 10
      - type: hello_ray.Sleep
        args:
          name: "slow-sleep"
          sleep_seconds: 0.5
      - type: hello_ray.Sleep
        args:
          name: "very-slow-sleep"
          sleep_seconds: 1
      - type: hello_ray.Timestamper
        args:
          name: "timestamper"
      - type: plugboard.library.file_io.FileWriter
        args:
          name: "save-results"
          path: "ray.csv"
          field_names:
          - timestamp
      connectors:
      - source: "input.x"
        target: "slow-sleep.x"
      - source: "input.x"
        target: "very-slow-sleep.x"
      - source: "slow-sleep.y"
        target: "timestamper.x"
      - source: "very-slow-sleep.y"
        target: "timestamper.y"
      - source: "timestamper.timestamp"
        target: "save-results.timestamp"

  1. Tell Plugboard to use a RayProcess instead of the default LocalProcess.
  2. Also change the connector builder to RayConnector, which will build RayChannel objects when creating the Process.
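
If you save the config to a file (the filename below is illustrative) alongside a hello_ray module containing the component definitions, you can run it from the command line. The command shown assumes a plugboard process run subcommand; check the CLI documentation for the exact invocation.

plugboard process run hello-ray.yaml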

Specifying resource requirements

When running components on Ray, you can specify resource requirements for each component to control how Ray allocates computational resources. This is particularly useful when you have components with different resource needs (e.g., CPU-intensive vs GPU-intensive tasks) and you are running on a Ray cluster.

Tip

Normally Ray will start automatically when you are using Plugboard locally. If you want to start a separate Ray instance, for example so that you can control the configuration options, you can launch it from the CLI. The following command starts a Ray instance with enough resources to run the example below.

uv run ray start --head --num-cpus=4 --num-gpus=1 --resources='{"custom_hardware": 5}'
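
If you start Ray separately like this, you can attach to the running instance from Python before creating the RayProcess. This is a minimal sketch using Ray's own API; as noted above, Plugboard will otherwise start Ray for you automatically.

import ray

# Connect to the already-running Ray instance instead of starting a new one
ray.init(address="auto")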

Declaring resources at component definition

The recommended way to specify resource requirements is to declare them as a class attribute when defining your component. This makes the Resource requirements explicit and part of the component's definition:

class CPUIntensiveTask(Component):
    """Component that requires more CPU resources.

    Resource requirements are declared as a class attribute.
    """

    io = IO(inputs=["x"], outputs=["y"])
    resources = Resource(cpu=2.0)   # (1)!

    async def step(self) -> None:
        """Execute CPU-intensive computation."""
        # Simulate CPU-intensive work
        result = sum(i**2 for i in range(int(self.x * 10000)))
        self.y = result
  1. Declare resources when defining the class.

Overriding resources at instantiation

You can also override resource requirements when creating component instances. This is useful when you want to use the same component class with different resource requirements:

CPUIntensiveTask(name="cpu-task", resources=Resource(cpu=1.0)),  # (1)!
  1. Pass a Resource object to override the CPU requirements for this component.
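
The example in the next section also uses DataProducer and GPUTask components that aren't defined on this page. The sketches below are illustrative only: the class names, port names, and class-level resource values come from the example and its annotations, but the method bodies (and the GPUTask output name) are assumptions.

class DataProducer(Component):
    """Produces a sequence of numbers (illustrative sketch)."""

    io = IO(outputs=["output"])

    def __init__(self, iters: int, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
        super().__init__(**kwargs)
        self._iters = iters

    async def init(self) -> None:
        self._seq = iter(range(self._iters))

    async def step(self) -> None:
        try:
            self.output = next(self._seq)
        except StopIteration:
            await self.io.close()


class GPUTask(Component):
    """Component that requires a GPU (illustrative sketch)."""

    io = IO(inputs=["data"], outputs=["result"])  # output name is an assumption
    resources = Resource(cpu="500m", gpu=1)  # class-level values referenced in the YAML annotations

    async def step(self) -> None:
        # Placeholder for GPU-bound work such as model inference
        self.result = self.data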

Example

For example, you can combine class-level declarations and per-instance overrides like this when building a Process:

# Resources can be declared at the class level (see CPUIntensiveTask and GPUTask above)
# or overridden when instantiating components
process = RayProcess(
    components=[
        CPUIntensiveTask(name="cpu-task", resources=Resource(cpu=1.0)),  # (1)!
        GPUTask(name="gpu-task"),  # (2)!
        DataProducer(name="producer", iters=5),  # (3)!
    ],
    connectors=[
        RayConnector(spec=ConnectorSpec(source="producer.output", target="cpu-task.x")),
        RayConnector(spec=ConnectorSpec(source="cpu-task.y", target="gpu-task.data")),
    ],
)
  1. Override the resource requirement on this instance.
  2. Use resources specified in class definition.
  3. Use default resources.

Or override them in YAML configuration:

plugboard:
  process:
    type: plugboard.process.RayProcess
    connector_builder:
      type: plugboard.connector.RayConnector
    args:
      name: resource-example-process
      components:
        - type: examples.tutorials.004_using_ray.resources_example.DataProducer
          args:
            name: producer
            iters: 10
            resources:
              cpu: 1.0   # (1)!
        - type: examples.tutorials.004_using_ray.resources_example.CPUIntensiveTask
          args:
            name: cpu-task
            # CPUIntensiveTask has class-level resources (cpu: 2.0)
            # Override to use more memory
            resources:
              cpu: 2.0  # (2)!
              memory: "512Mi"  # (3)!
        - type: examples.tutorials.004_using_ray.resources_example.GPUTask
          args:
            name: gpu-task
            # GPUTask has class-level resources (cpu: "500m", gpu: 1)
            # Can override or extend with custom resources
            resources:
              cpu: "500m"  # (4)!
              gpu: 1  # (5)!
              resources:
                custom_hardware: 2  # (6)!
      connectors:
        - source: producer.output
          target: cpu-task.x
        - source: cpu-task.y
          target: gpu-task.data
  1. Override DataProducer to require 1 CPU (instead of the default 0.001).
  2. CPUIntensiveTask already declares cpu: 2.0 at the class level, so this matches the class definition.
  3. Add 512MB memory requirement to CPUIntensiveTask (extending the class-level resources).
  4. Requires 0.5 CPU, specified in Kubernetes-style format (500 milli CPUs). This matches the class-level declaration.
  5. Requires 1 GPU, matching the class-level declaration.
  6. Add a custom resource called custom_hardware. This needs to be specified in the configuration of your Ray cluster to make it available.

See the Ray documentation for more information about specifying resource requirements.