Local and Remote Image Processing¶
This example demonstrates a hybrid AI pipeline that combines a lightweight local model with a powerful remote LLM.
The Goal:
We want to process a stream of images, identify which ones contain clocks, and then read the time from those clocks.
The Problem:
Sending every image to a large multimodal model (like OpenAI's GPT-4o) is expensive and slow.
The Solution:
- Use a fast, local computer vision model (MobileNetV3) to classify images.
- Filter the stream:
    - If it's a clock -> send it to GPT-4o-mini to read the time.
    - If it's not a clock -> log it and skip the expensive call.
This architecture demonstrates event-driven routing in Plugboard, where data flows to different downstream components based on analysis results.
The overall process looks like this: `FileReader` -> `LoadImage` -> `LocalModel` -> `CheckClassification`, which then routes matches on to the LLM and non-matches to a log file.
!pip install torchvision pillow httpx
import os
from getpass import getpass
from io import BytesIO
import typing as _t
import pandas as pd
from pydantic import BaseModel, ConfigDict, AnyUrl
from PIL import Image
import httpx
from torchvision.models import mobilenet_v3_small, MobileNet_V3_Small_Weights
from torchvision import transforms
from plugboard.component import Component
from plugboard.connector import AsyncioConnector, ConnectorBuilder
from plugboard.component import IOController as IO
from plugboard.schemas import ComponentArgsDict, ConnectorSpec
from plugboard.process import LocalProcess
from plugboard.library import FileReader, FileWriter, LLMImageProcessor
from plugboard.events import Event, EventConnectorBuilder
if "OPENAI_API_KEY" not in os.environ:
os.environ["OPENAI_API_KEY"] = getpass("Enter your OpenAI API key: ")
1. Define Local Components¶
First, we define the components that run locally.
- `LoadImage`: Downloads an image from a URL and converts it to a format suitable for processing.
- `LocalModel`: Uses `torchvision`'s pre-trained MobileNetV3 to classify the image. This runs entirely on your machine (CPU or GPU) and is very fast.
class LoadImage(Component):
"""Loads an image from a URL"""
io = IO(inputs=["url"], outputs=["image"])
async def step(self) -> None:
headers = {
"User-Agent": "PlugboardExample/1.0 (https://docs.plugboard.dev, hello@plugboard.dev)"
}
async with httpx.AsyncClient() as client:
r = await client.get(self.url, headers=headers, follow_redirects=True)
r.raise_for_status()
self.image = Image.open(BytesIO(r.content)).convert("RGB")
class LocalModel(Component):
"""Passes an image into a MobileNetV3 model"""
io = IO(inputs=["image"], outputs=["classification"])
async def init(self) -> None:
self._model = mobilenet_v3_small(weights=MobileNet_V3_Small_Weights.IMAGENET1K_V1)
self._categories = MobileNet_V3_Small_Weights.IMAGENET1K_V1.meta["categories"]
self._model.eval()
self._transform = transforms.Compose(
[
transforms.Resize((224, 224)), # MobileNetV3 expects 224x224 input
transforms.ToTensor(), # Convert PIL Image to torch.Tensor (CHW format, [0.0,1.0] range)
transforms.Normalize(
mean=[0.485, 0.456, 0.406], # Normalize to ImageNet means
std=[0.229, 0.224, 0.225], # Normalize to ImageNet stds
),
]
)
async def step(self) -> None:
tensor_image = self._transform(self.image)
batch_tensor = tensor_image.unsqueeze(0) # Add batch dimension
outputs = self._model(batch_tensor).squeeze(0).softmax(0)
self.classification = dict(zip(self._categories, outputs.detach().numpy()))
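As an optional standalone sanity check (not part of the pipeline), you can run the same pre-trained weights directly through torchvision on one of the clock images used later in this example. This is a minimal sketch assuming network access; it uses the canonical preprocessing bundled with the weights rather than the manual transform above.
weights = MobileNet_V3_Small_Weights.IMAGENET1K_V1
model = mobilenet_v3_small(weights=weights).eval()
preprocess = weights.transforms()  # canonical preprocessing bundled with the weights
resp = httpx.get(
    "https://images.unsplash.com/photo-1541480601022-2308c0f02487?ixlib=rb-4.1.0&q=85&fm=jpg&crop=entropy&cs=srgb",
    headers={"User-Agent": "PlugboardExample/1.0 (https://docs.plugboard.dev, hello@plugboard.dev)"},
    follow_redirects=True,
)
img = Image.open(BytesIO(resp.content)).convert("RGB")
probs = model(preprocess(img).unsqueeze(0)).squeeze(0).softmax(0)
top = probs.topk(3)  # three highest-probability ImageNet classes
print([(weights.meta["categories"][int(i)], round(float(p), 3)) for p, i in zip(top.values, top.indices)])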
2. Define Events¶
We define two types of events to handle the routing logic:
- `MatchEvent`: Fired when the image matches our criteria (it is a clock). It carries the image URL.
- `NonMatchEvent`: Fired when the image does not match. It carries the classification results for logging.
We use Pydantic models (MatchData, NonMatchData) to strictly type the event payloads.
class MatchData(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)
url: AnyUrl
class MatchEvent(Event):
type: _t.ClassVar[str] = "match"
data: MatchData
class NonMatchData(BaseModel):
classification: dict[str, float]
url: AnyUrl
class NonMatchEvent(Event):
type: _t.ClassVar[str] = "non_match"
data: NonMatchData
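The event payloads are plain Pydantic models, so you can construct an event by hand to see its shape. This is just an illustration with a placeholder URL; the real events are fired by `CheckClassification` in the next section.
# Placeholder URL, for illustration only
evt = MatchEvent(source="checker", data=MatchData(url="https://example.com/clock.jpg"))
print(evt.type, evt.data.url)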
3. Logic and Adapters¶
Now we define the components that use these events.
- `CheckClassification`: Takes the classification result from `LocalModel`. If the probability of any target class (e.g., "analog clock") is above a threshold, it fires a `MatchEvent`. Otherwise, it fires a `NonMatchEvent`.
- `MatchAdapter` & `NonMatchAdapter`: These components subscribe to the events and convert the event payload back into a standard output stream. This allows us to connect them to downstream components like `LLMImageProcessor` or `FileWriter`.
Note: `MatchAdapter` also passes on the URL of the image to the `LLMImageProcessor`.
class CheckClassification(Component):
"""Checks for a classification match."""
io = IO(inputs=["url", "classification"], output_events=[MatchEvent, NonMatchEvent])
def __init__(
self, classes: list[str], threshold: float = 0.5, **kwargs: _t.Unpack[ComponentArgsDict]
) -> None:
super().__init__(**kwargs)
self._class_names = classes
self._threshold = threshold
async def step(self) -> None:
match = False
for c in self._class_names:
if self.classification.get(c, 0) >= self._threshold:
match = True
break
if match:
self.io.queue_event(MatchEvent(source=self.name, data=MatchData(url=self.url)))
else:
self.io.queue_event(
NonMatchEvent(
source=self.name,
data=NonMatchData(classification=self.classification, url=self.url),
)
)
class MatchAdapter(Component):
"""Listens for match events and passes them on for further processing."""
io = IO(outputs=["url"], input_events=[MatchEvent])
def __init__(self, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
super().__init__(**kwargs)
self._buffer = []
@MatchEvent.handler
async def handle_match(self, event: MatchEvent):
self._buffer.append(event.data)
async def step(self) -> None:
if self._buffer:
data = self._buffer.pop(0)
self.url = str(data.url)
class NonMatchAdapter(Component):
"""Listens for non-match events and passes them on for further processing."""
io = IO(outputs=["classification", "url"], input_events=[NonMatchEvent])
def __init__(self, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
super().__init__(**kwargs)
self._buffer = []
@NonMatchEvent.handler
async def handle_non_match(self, event: NonMatchEvent):
self._buffer.append(event.data)
async def step(self) -> None:
if self._buffer:
data = self._buffer.pop(0)
self.classification = data.classification
self.url = str(data.url)
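The routing rule in `CheckClassification` is easy to verify in isolation. With made-up scores, a match fires whenever any target class reaches the threshold:
scores = {"analog clock": 0.72, "tabby": 0.05}  # made-up scores for illustration
target_classes = ["analog clock", "wall clock", "digital clock"]
print(any(scores.get(c, 0) >= 0.5 for c in target_classes))  # True -> would fire a MatchEvent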
4. LLM Configuration¶
We define a Pydantic model for the LLM's response. This ensures we get structured data (JSON) back from the model, rather than just free text.
class TimeReading(BaseModel):
time: str
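Setting `expand_response=True` on the `LLMImageProcessor` below exposes the fields of the response model as individual outputs, which is why we can connect `llm_processor.time` to the writer later on. The parsing itself is plain Pydantic; for example, with an illustrative value:
reading = TimeReading.model_validate({"time": "10:09"})  # illustrative value only
print(reading.time)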
5. Define and Run Process¶
Finally, we wire everything together in a LocalProcess.
- Input: We create a CSV with URLs of images (some clocks, some not).
- Pipeline:
    - `FileReader` -> `LoadImage` -> `LocalModel` -> `CheckClassification`
    - `CheckClassification` fires events:
        - `MatchEvent` -> `MatchAdapter` -> `LLMImageProcessor` -> `FileWriter` (matches.csv)
        - `NonMatchEvent` -> `NonMatchAdapter` -> `FileWriter` (non_matches.csv)
connect = lambda src, dst: AsyncioConnector(spec=ConnectorSpec(source=src, target=dst))
# Create CSV with some image URLs
images_df = pd.DataFrame(
{
"url": [
# This is a clock
"https://images.unsplash.com/photo-1541480601022-2308c0f02487?ixlib=rb-4.1.0&q=85&fm=jpg&crop=entropy&cs=srgb",
# Not a clock (Train)
"https://images.unsplash.com/photo-1535535112387-56ffe8db21ff?ixlib=rb-4.1.0&q=85&fm=jpg&crop=entropy&cs=srgb",
# This is a clock
"https://images.unsplash.com/photo-1563861826100-9cb868fdbe1c?ixlib=rb-4.1.0&q=85&fm=jpg&crop=entropy&cs=srgb",
# Not a clock (Dog)
"https://images.unsplash.com/photo-1530281700549-e82e7bf110d6?ixlib=rb-4.1.0&q=85&fm=jpg&crop=entropy&cs=srgb",
]
}
)
images_df.to_csv("images.csv", index=False)
# Define Components
components = [
FileReader(name="reader", path="images.csv", field_names=["url"]),
LoadImage(name="loader"),
LocalModel(name="classifier"),
CheckClassification(name="checker", classes=["analog clock", "wall clock", "digital clock"]),
MatchAdapter(name="match_adapter"),
LLMImageProcessor(
name="llm_processor",
prompt="Read the time on the clock",
response_model=TimeReading,
expand_response=True,
llm_kwargs={"model": "gpt-4o-mini"},
),
FileWriter(name="match_writer", path="matches.csv", field_names=["time", "url"]),
NonMatchAdapter(name="non_match_adapter"),
FileWriter(
name="non_match_writer", path="non_matches.csv", field_names=["classification", "url"]
),
]
# Define Connectors
connectors = [
connect("reader.url", "loader.url"),
connect("loader.image", "classifier.image"),
connect("reader.url", "checker.url"),
connect("classifier.classification", "checker.classification"),
# Adapter to Processor/Writer
connect("match_adapter.url", "llm_processor.image"),
connect("llm_processor.time", "match_writer.time"),
connect("match_adapter.url", "match_writer.url"),
connect("non_match_adapter.classification", "non_match_writer.classification"),
connect("non_match_adapter.url", "non_match_writer.url"),
]
# Event connectors: built automatically from each component's declared input/output events
builder = ConnectorBuilder(connector_cls=AsyncioConnector)
event_builder = EventConnectorBuilder(connector_builder=builder)
event_connectors = list(event_builder.build(components).values())
# Define Process
process = LocalProcess(components=components, connectors=connectors + event_connectors)
async with process:
await process.run()
Now open the output CSVs to see the time read from the clock images.
!cat matches.csv
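Or load both output files with pandas for a tabular view:
print(pd.read_csv("matches.csv"))
print(pd.read_csv("non_matches.csv"))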