Traffic intersection simulation¶
Event-driven simulations are non-deterministic and not suitable for running in distributed environments, because Plugboard won't guarantee the order in which events get processed.
This demo simulates a dual-intersection traffic corridor using Plugboard's event-driven architecture. Vehicles arrive at Junction A, wait for a green signal, cross the intersection, travel along a connecting road with stochastic travel time, then repeat the process at Junction B before exiting the system.
Components¶
| Component | Role |
|---|---|
| SimulationClock | Emits integer time steps (1 step = 1 second) |
| VehicleSource | Generates vehicles via a Poisson process |
| TrafficLight ×2 | FSM controllers cycling GREEN → YELLOW → RED |
| Junction ×2 | FIFO queues that release vehicles when the signal is green |
| ConnectingRoad | Adds triangular-distributed travel time between junctions |
| MetricsCollector | Subscribes to all event types for post-simulation analysis |
Event types¶
| Event | Trigger |
|---|---|
VehicleArrivedEvent |
A vehicle enters a junction queue |
VehicleDepartedEvent |
A vehicle passes through a junction |
SignalChangedEvent |
A traffic light transitions between states |
CongestionAlertEvent |
A junction's queue exceeds its congestion threshold |
Regular connectors carry continuous data (clock time, signal state, queue length), while events capture discrete occurrences — the two paradigms work together in a single process.
We first run a synchronised baseline (both lights share the same phase), then show how offsetting Junction B's green phase to create a green wave improves throughput.
# Install plugboard and dependencies for Google Colab
!pip install -q plugboard plotly
import random
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from plugboard.connector import AsyncioConnector
from plugboard.process import LocalProcess
from plugboard.schemas import ConnectorSpec
connect = lambda src, tgt: AsyncioConnector(spec=ConnectorSpec(source=src, target=tgt))
Component definitions¶
The simulation is built from six Plugboard components. Each one declares its I/O via the class-level
io attribute, which Plugboard uses to wire up both regular field connectors and event routes.
Four lightweight event types carry discrete signals around the system:
| Event | Payload |
|---|---|
VehicleArrivedEvent |
vehicle_id, target_junction, time |
VehicleDepartedEvent |
vehicle_id, from_junction, wait_time, time |
SignalChangedEvent |
junction, new_state, old_state, time |
CongestionAlertEvent |
junction, queue_length, threshold, time |
from collections import deque
from typing import Any, ClassVar, Unpack
from pydantic import BaseModel
from plugboard.component import Component, IOController as IO
from plugboard.events import Event
from plugboard.schemas import ComponentArgsDict
# ── Event data models ────────────────────────────────────────────────────────
class VehicleArrivalData(BaseModel):
"""Payload for a vehicle arrival event."""
vehicle_id: int
target_junction: str
time: float
class VehicleDepartureData(BaseModel):
"""Payload for a vehicle departure event."""
vehicle_id: int
from_junction: str
wait_time: float
time: float
class SignalChangeData(BaseModel):
"""Payload for a traffic signal change event."""
junction: str
new_state: str
old_state: str
time: float
class CongestionData(BaseModel):
"""Payload for a congestion alert event."""
junction: str
queue_length: int
threshold: int
time: float
# ── Events ───────────────────────────────────────────────────────────────────
class VehicleArrivedEvent(Event):
"""Emitted when a vehicle arrives at a junction queue."""
type: ClassVar[str] = "vehicle_arrived"
data: VehicleArrivalData
class VehicleDepartedEvent(Event):
"""Emitted when a vehicle passes through a junction."""
type: ClassVar[str] = "vehicle_departed"
data: VehicleDepartureData
class SignalChangedEvent(Event):
"""Emitted when a traffic light changes state."""
type: ClassVar[str] = "signal_changed"
data: SignalChangeData
class CongestionAlertEvent(Event):
"""Emitted when a junction queue exceeds the congestion threshold."""
type: ClassVar[str] = "congestion_alert"
data: CongestionData
# ── Components ───────────────────────────────────────────────────────────────
class SimulationClock(Component):
"""Drives the simulation by emitting integer time steps (1 step = 1 second)."""
io = IO(outputs=["time"])
def __init__(
self,
duration: int = 600,
**kwargs: Unpack[ComponentArgsDict],
) -> None:
super().__init__(**kwargs)
self._duration: int = duration
self._time: int = 0
async def step(self) -> None:
self.time = self._time
self._time += 1
if self._time > self._duration:
self._logger.info("Simulation complete", duration=self._duration)
await self.io.close()
class VehicleSource(Component):
"""Generates vehicles with Poisson arrivals.
At each step a vehicle arrives with probability ``arrival_rate``.
Each vehicle is assigned a unique ID and a ``VehicleArrivedEvent``
is emitted targeting the specified junction.
"""
io = IO(inputs=["time"], output_events=[VehicleArrivedEvent])
def __init__(
self,
arrival_rate: float = 0.4,
target_junction: str = "A",
**kwargs: Unpack[ComponentArgsDict],
) -> None:
super().__init__(**kwargs)
self._arrival_rate: float = arrival_rate
self._target_junction: str = target_junction
self._next_id: int = 1
async def step(self) -> None:
if random.random() < self._arrival_rate: # noqa: S311
vid = self._next_id
self._next_id += 1
self.io.queue_event(
VehicleArrivedEvent(
source=self.name,
data=VehicleArrivalData(
vehicle_id=vid,
target_junction=self._target_junction,
time=float(self.time),
),
)
)
class TrafficLight(Component):
"""FSM-based traffic signal controller.
Cycles through GREEN → YELLOW → RED with configurable phase durations.
The ``offset`` parameter shifts the starting phase so that two lights
can be synchronised or staggered to create green-wave effects.
A ``SignalChangedEvent`` is emitted on every state transition.
"""
io = IO(
inputs=["time"],
outputs=["signal"],
output_events=[SignalChangedEvent],
)
def __init__(
self,
junction: str = "A",
green_duration: int = 30,
yellow_duration: int = 5,
red_duration: int = 30,
offset: int = 0,
**kwargs: Unpack[ComponentArgsDict],
) -> None:
super().__init__(**kwargs)
self._junction: str = junction
self._green: int = green_duration
self._yellow: int = yellow_duration
self._red: int = red_duration
self._offset: int = offset
self._cycle: int = green_duration + yellow_duration + red_duration
self._signal_state: str | None = None
def _compute_state(self, t: int) -> str:
phase = (t + self._offset) % self._cycle
if phase < self._green:
return "green"
if phase < self._green + self._yellow:
return "yellow"
return "red"
async def step(self) -> None:
t = int(self.time)
new = self._compute_state(t)
if new != self._signal_state:
old = self._signal_state or "init"
self.io.queue_event(
SignalChangedEvent(
source=self.name,
data=SignalChangeData(
junction=self._junction,
new_state=new,
old_state=old,
time=float(t),
),
)
)
self._signal_state = new
self.signal = self._signal_state
class Junction(Component):
"""FIFO vehicle queue at a signalised intersection.
Vehicles arrive via ``VehicleArrivedEvent`` and are filtered by
``junction_name``. When the signal is green, vehicles are released
at ``service_rate`` (probability per step). Each release produces a
``VehicleDepartedEvent``. If the queue exceeds
``congestion_threshold`` a ``CongestionAlertEvent`` is emitted.
"""
io = IO(
inputs=["time", "signal"],
outputs=["queue_length", "total_passed"],
input_events=[VehicleArrivedEvent],
output_events=[VehicleDepartedEvent, CongestionAlertEvent],
)
def __init__(
self,
junction_name: str = "A",
service_rate: float = 0.8,
capacity: int = 100,
congestion_threshold: int = 20,
**kwargs: Unpack[ComponentArgsDict],
) -> None:
super().__init__(**kwargs)
self._junction_name: str = junction_name
self._service_rate: float = service_rate
self._capacity: int = capacity
self._congestion_threshold: int = congestion_threshold
self._queue: deque[tuple[int, float]] = deque()
self._total_passed: int = 0
self._congestion_active: bool = False
@VehicleArrivedEvent.handler
async def on_vehicle_arrived(self, event: VehicleArrivedEvent) -> None:
"""Enqueue vehicles targeted at this junction."""
if event.data.target_junction == self._junction_name:
if len(self._queue) < self._capacity:
self._queue.append((event.data.vehicle_id, event.data.time))
async def step(self) -> None:
t = float(self.time)
if self.signal == "green" and self._queue:
if random.random() < self._service_rate: # noqa: S311
vehicle_id, arrival_time = self._queue.popleft()
self._total_passed += 1
self.io.queue_event(
VehicleDepartedEvent(
source=self.name,
data=VehicleDepartureData(
vehicle_id=vehicle_id,
from_junction=self._junction_name,
wait_time=t - arrival_time,
time=t,
),
)
)
qlen = len(self._queue)
if qlen > self._congestion_threshold and not self._congestion_active:
self._congestion_active = True
self._logger.warning(
"Congestion detected",
junction=self._junction_name,
queue_length=qlen,
)
self.io.queue_event(
CongestionAlertEvent(
source=self.name,
data=CongestionData(
junction=self._junction_name,
queue_length=qlen,
threshold=self._congestion_threshold,
time=t,
),
)
)
elif qlen <= self._congestion_threshold:
self._congestion_active = False
self.queue_length = qlen
self.total_passed = self._total_passed
class ConnectingRoad(Component):
"""Road segment between two junctions with stochastic travel time.
Handles ``VehicleDepartedEvent`` from the upstream junction, buffers
the vehicle for a random travel time drawn from a triangular
distribution, then emits a ``VehicleArrivedEvent`` at the downstream
junction.
"""
io = IO(
inputs=["time"],
input_events=[VehicleDepartedEvent],
output_events=[VehicleArrivedEvent],
)
def __init__(
self,
from_junction: str = "A",
to_junction: str = "B",
min_travel: float = 10.0,
mode_travel: float = 15.0,
max_travel: float = 25.0,
**kwargs: Unpack[ComponentArgsDict],
) -> None:
super().__init__(**kwargs)
self._from: str = from_junction
self._to: str = to_junction
self._min: float = min_travel
self._mode: float = mode_travel
self._max: float = max_travel
self._in_transit: list[tuple[int, float]] = []
@VehicleDepartedEvent.handler
async def on_departed(self, event: VehicleDepartedEvent) -> None:
"""Buffer a departing vehicle with a random travel delay."""
if event.data.from_junction == self._from:
travel = random.triangular(self._min, self._max, self._mode) # noqa: S311
self._in_transit.append((event.data.vehicle_id, event.data.time + travel))
async def step(self) -> None:
t = float(self.time)
remaining: list[tuple[int, float]] = []
for vid, release in self._in_transit:
if t >= release:
self.io.queue_event(
VehicleArrivedEvent(
source=self.name,
data=VehicleArrivalData(
vehicle_id=vid,
target_junction=self._to,
time=t,
),
)
)
else:
remaining.append((vid, release))
self._in_transit = remaining
class MetricsCollector(Component):
"""Subscribes to all simulation events and records them for analysis.
After the simulation finishes, access the ``*_log`` lists and
``queue_history`` to build plots and compute KPIs.
"""
io = IO(
inputs=["time", "queue_a", "queue_b", "passed_a", "passed_b"],
input_events=[
VehicleArrivedEvent,
VehicleDepartedEvent,
CongestionAlertEvent,
SignalChangedEvent,
],
)
def __init__(self, **kwargs: Unpack[ComponentArgsDict]) -> None:
super().__init__(**kwargs)
self.arrival_log: list[dict[str, Any]] = []
self.departure_log: list[dict[str, Any]] = []
self.congestion_log: list[dict[str, Any]] = []
self.signal_log: list[dict[str, Any]] = []
self.queue_history: list[dict[str, Any]] = []
@VehicleArrivedEvent.handler
async def on_arrival(self, event: VehicleArrivedEvent) -> None:
"""Record vehicle arrival."""
self.arrival_log.append(event.data.model_dump())
@VehicleDepartedEvent.handler
async def on_departure(self, event: VehicleDepartedEvent) -> None:
"""Record vehicle departure."""
self.departure_log.append(event.data.model_dump())
@CongestionAlertEvent.handler
async def on_congestion(self, event: CongestionAlertEvent) -> None:
"""Record congestion alert."""
self.congestion_log.append(event.data.model_dump())
@SignalChangedEvent.handler
async def on_signal(self, event: SignalChangedEvent) -> None:
"""Record signal change."""
self.signal_log.append(event.data.model_dump())
async def step(self) -> None:
self.queue_history.append(
{
"time": float(self.time),
"queue_a": int(self.queue_a),
"queue_b": int(self.queue_b),
"passed_a": int(self.passed_a),
"passed_b": int(self.passed_b),
}
)
Assembling the process¶
We create 8 components and wire them together:
- Field connectors carry the clock time to every component, signal states from lights to junctions, and queue metrics into the collector.
- Event connectors are built automatically from the
input_events/output_eventsdeclared on each component'sIOController. Plugboard routes every emitted event to all components that subscribe to its type.
The baseline scenario uses synchronised signals — both junctions share the same GREEN/YELLOW/RED phase.
random.seed(42) # reproducible results
DURATION = 600 # seconds (10 minutes)
ARRIVAL_RATE = 0.4 # vehicles per second (~24 vehicles/min)
GREEN = 30 # seconds
YELLOW = 5
RED = 30
components = [
SimulationClock(name="clock", duration=DURATION),
VehicleSource(name="source", arrival_rate=ARRIVAL_RATE, target_junction="A"),
TrafficLight(
name="light-a", junction="A", green_duration=GREEN, yellow_duration=YELLOW, red_duration=RED
),
TrafficLight(
name="light-b", junction="B", green_duration=GREEN, yellow_duration=YELLOW, red_duration=RED
),
Junction(name="junction-a", junction_name="A", congestion_threshold=15),
Junction(name="junction-b", junction_name="B", congestion_threshold=15),
ConnectingRoad(name="road-ab", from_junction="A", to_junction="B"),
MetricsCollector(name="metrics"),
]
connectors = [
# Clock drives every component
connect("clock.time", "source.time"),
connect("clock.time", "light-a.time"),
connect("clock.time", "light-b.time"),
connect("clock.time", "junction-a.time"),
connect("clock.time", "junction-b.time"),
connect("clock.time", "road-ab.time"),
connect("clock.time", "metrics.time"),
# Signal state into junctions
connect("light-a.signal", "junction-a.signal"),
connect("light-b.signal", "junction-b.signal"),
# Queue metrics into collector
connect("junction-a.queue_length", "metrics.queue_a"),
connect("junction-b.queue_length", "metrics.queue_b"),
connect("junction-a.total_passed", "metrics.passed_a"),
connect("junction-b.total_passed", "metrics.passed_b"),
]
# Event connectors are built automatically from IO declarations
event_connectors = AsyncioConnector.builder().build_event_connectors(components)
process = LocalProcess(components=components, connectors=connectors + event_connectors)
print(f"{len(process.components)} components, {len(process.connectors)} connectors")
from plugboard.diagram import MermaidDiagram
diagram_url = MermaidDiagram.from_process(process).url
print(diagram_url)
In the process diagram dotted lines represent the event-driven portions of the model.
Run the baseline simulation¶
With both signals synchronised, vehicles released from Junction A arrive at Junction B during its RED phase — they queue up and wait.
async with process:
await process.run()
mc = process.components["metrics"]
df_queue = pd.DataFrame(mc.queue_history)
df_arrivals = pd.DataFrame(mc.arrival_log)
df_departures = pd.DataFrame(mc.departure_log)
df_signals = pd.DataFrame(mc.signal_log)
df_congestion = pd.DataFrame(mc.congestion_log)
total_arrived = len(df_arrivals[df_arrivals["target_junction"] == "A"])
passed_a = df_queue["passed_a"].iloc[-1]
passed_b = df_queue["passed_b"].iloc[-1]
print(f"Vehicles generated : {total_arrived}")
print(f"Through Junction A : {passed_a}")
print(f"Through Junction B : {passed_b} (exited system)")
print(f"System throughput : {passed_b / total_arrived:.1%}")
print(f"Congestion alerts : {len(df_congestion)}")
if len(df_departures):
print(
f"Mean wait time (A) : {df_departures[df_departures['from_junction'] == 'A']['wait_time'].mean():.1f}s"
)
print(
f"Mean wait time (B) : {df_departures[df_departures['from_junction'] == 'B']['wait_time'].mean():.1f}s"
)
Queue dynamics¶
The chart below shows how queue lengths at each junction oscillate with the signal cycle. Green and red background bands indicate the signal phase at Junction A.
CYCLE = GREEN + YELLOW + RED
fig = make_subplots(
rows=2,
cols=1,
shared_xaxes=True,
vertical_spacing=0.08,
subplot_titles=("Junction A queue", "Junction B queue"),
)
# Signal-state bands for Junction A
for start in range(0, DURATION, CYCLE):
fig.add_vrect(
x0=start, x1=start + GREEN, fillcolor="green", opacity=0.08, line_width=0, row=1, col=1
)
fig.add_vrect(
x0=start + GREEN + YELLOW,
x1=start + CYCLE,
fillcolor="red",
opacity=0.08,
line_width=0,
row=1,
col=1,
)
# Signal-state bands for Junction B (no offset in baseline)
for start in range(0, DURATION, CYCLE):
fig.add_vrect(
x0=start, x1=start + GREEN, fillcolor="green", opacity=0.08, line_width=0, row=2, col=1
)
fig.add_vrect(
x0=start + GREEN + YELLOW,
x1=start + CYCLE,
fillcolor="red",
opacity=0.08,
line_width=0,
row=2,
col=1,
)
fig.add_trace(
go.Scatter(
x=df_queue["time"], y=df_queue["queue_a"], name="Queue A", line=dict(color="royalblue")
),
row=1,
col=1,
)
fig.add_trace(
go.Scatter(x=df_queue["time"], y=df_queue["queue_b"], name="Queue B", line=dict(color="coral")),
row=2,
col=1,
)
# Mark congestion alerts
if len(df_congestion):
for _, row in df_congestion.iterrows():
r = 1 if row["junction"] == "A" else 2
fig.add_vline(x=row["time"], line_dash="dot", line_color="red", opacity=0.6, row=r, col=1)
fig.update_layout(
height=500,
template="plotly_white",
showlegend=False,
title_text="Baseline: synchronised signals",
)
fig.update_xaxes(title_text="Time (s)", row=2, col=1)
fig.update_yaxes(title_text="Vehicles", row=1, col=1)
fig.update_yaxes(title_text="Vehicles", row=2, col=1)
fig.show()
Wait-time distributions¶
Vehicles arriving during a red phase wait longer. The distribution at Junction B is typically wider because it includes both signal delays and the connecting-road variance.
fig = go.Figure()
for junc, colour in [("A", "royalblue"), ("B", "coral")]:
waits = df_departures[df_departures["from_junction"] == junc]["wait_time"]
if len(waits):
fig.add_trace(
go.Histogram(
x=waits, name=f"Junction {junc}", opacity=0.7, marker_color=colour, nbinsx=30
)
)
fig.update_layout(
barmode="overlay",
template="plotly_white",
title="Vehicle wait-time distribution",
xaxis_title="Wait time (s)",
yaxis_title="Count",
)
fig.show()
Experiment: green-wave offset¶
When both signals share the same phase, vehicles released from Junction A during its green phase arrive at Junction B roughly 10–25 seconds later — often hitting B's red phase.
A green wave staggers Junction B's phase so that its green window aligns with the arrival of vehicles from A. The optimal offset is approximately:
$$\text{offset}_B = \text{cycle length} - \text{avg. travel time} \approx 65 - 15 = 50\text{s}$$
Let's re-run the simulation with this offset and compare.
def build_process(light_b_offset: int = 0, seed: int = 42) -> LocalProcess:
"""Create a fresh traffic-corridor process with a given offset for Junction B."""
random.seed(seed)
comps = [
SimulationClock(name="clock", duration=DURATION),
VehicleSource(name="source", arrival_rate=ARRIVAL_RATE, target_junction="A"),
TrafficLight(
name="light-a",
junction="A",
green_duration=GREEN,
yellow_duration=YELLOW,
red_duration=RED,
),
TrafficLight(
name="light-b",
junction="B",
green_duration=GREEN,
yellow_duration=YELLOW,
red_duration=RED,
offset=light_b_offset,
),
Junction(name="junction-a", junction_name="A", congestion_threshold=15),
Junction(name="junction-b", junction_name="B", congestion_threshold=15),
ConnectingRoad(name="road-ab", from_junction="A", to_junction="B"),
MetricsCollector(name="metrics"),
]
conns = [
connect("clock.time", "source.time"),
connect("clock.time", "light-a.time"),
connect("clock.time", "light-b.time"),
connect("clock.time", "junction-a.time"),
connect("clock.time", "junction-b.time"),
connect("clock.time", "road-ab.time"),
connect("clock.time", "metrics.time"),
connect("light-a.signal", "junction-a.signal"),
connect("light-b.signal", "junction-b.signal"),
connect("junction-a.queue_length", "metrics.queue_a"),
connect("junction-b.queue_length", "metrics.queue_b"),
connect("junction-a.total_passed", "metrics.passed_a"),
connect("junction-b.total_passed", "metrics.passed_b"),
]
evt = AsyncioConnector.builder().build_event_connectors(comps)
return LocalProcess(components=comps, connectors=conns + evt)
OFFSET = CYCLE - 15 # ≈ mode travel time
proc_wave = build_process(light_b_offset=OFFSET)
async with proc_wave:
await proc_wave.run()
mc_wave = proc_wave.components["metrics"]
df_queue_wave = pd.DataFrame(mc_wave.queue_history)
df_dep_wave = pd.DataFrame(mc_wave.departure_log)
df_cong_wave = pd.DataFrame(mc_wave.congestion_log)
print(f"Green-wave offset: {OFFSET}s")
print(f"Through Junction B: {df_queue_wave['passed_b'].iloc[-1]} (was {passed_b})")
print(f"Congestion alerts : {len(df_cong_wave)} (was {len(df_congestion)})")
if len(df_dep_wave):
print(
f"Mean wait (B) : {df_dep_wave[df_dep_wave['from_junction'] == 'B']['wait_time'].mean():.1f}s"
)
Side-by-side comparison¶
The top row shows the synchronised baseline; the bottom row shows the green-wave scenario. Notice how Junction B's queue is dramatically reduced when its green phase is aligned with arriving traffic from Junction A.
fig = make_subplots(
rows=2,
cols=2,
shared_xaxes=True,
vertical_spacing=0.12,
horizontal_spacing=0.08,
subplot_titles=(
"Baseline — Junction A",
"Baseline — Junction B",
"Green wave — Junction A",
"Green wave — Junction B",
),
)
# Baseline
fig.add_trace(
go.Scatter(
x=df_queue["time"], y=df_queue["queue_a"], line=dict(color="royalblue"), showlegend=False
),
row=1,
col=1,
)
fig.add_trace(
go.Scatter(
x=df_queue["time"], y=df_queue["queue_b"], line=dict(color="coral"), showlegend=False
),
row=1,
col=2,
)
# Green wave
fig.add_trace(
go.Scatter(
x=df_queue_wave["time"],
y=df_queue_wave["queue_a"],
line=dict(color="royalblue"),
showlegend=False,
),
row=2,
col=1,
)
fig.add_trace(
go.Scatter(
x=df_queue_wave["time"],
y=df_queue_wave["queue_b"],
line=dict(color="coral"),
showlegend=False,
),
row=2,
col=2,
)
for r in [1, 2]:
for c in [1, 2]:
fig.update_yaxes(title_text="Queue", row=r, col=c)
fig.update_xaxes(title_text="Time (s)", row=2, col=1)
fig.update_xaxes(title_text="Time (s)", row=2, col=2)
fig.update_layout(
height=500,
template="plotly_white",
title_text="Queue length comparison: synchronised vs green wave",
)
fig.show()
fig = go.Figure()
fig.add_trace(
go.Scatter(
x=df_queue["time"],
y=df_queue["passed_b"],
name="Baseline",
line=dict(color="coral", dash="dash"),
)
)
fig.add_trace(
go.Scatter(
x=df_queue_wave["time"],
y=df_queue_wave["passed_b"],
name="Green wave",
line=dict(color="seagreen"),
)
)
fig.update_layout(
template="plotly_white",
title="Cumulative vehicles through Junction B",
xaxis_title="Time (s)",
yaxis_title="Vehicles exited",
)
fig.show()
Summary¶
This demo illustrated several strengths of Plugboard's event-driven architecture applied to transport simulation:
- Hybrid communication — regular connectors carry continuous state (time, signal phase, queue lengths) while discrete events (arrivals, departures, congestion alerts) propagate through the system.
- FSM logic — traffic lights use a finite state machine that emits
SignalChangedEventonly on transitions, avoiding unnecessary work when the state is unchanged. - Stochastic processes — vehicle arrivals follow a Poisson process and road travel times use a triangular distribution, giving the simulation realistic variability.
- Event monitoring — the
MetricsCollectorsubscribes to every event type, enabling rich post-simulation analysis without coupling the analytics to the simulation logic. - What-if analysis — by wrapping process creation in a helper function, we can easily sweep parameters (like signal offset) and compare scenarios.
Try experimenting with different arrival rates, signal timings, or road travel times to explore how the system responds under stress.