Bus Bunching Simulation¶
Event-driven simulations are non-deterministic and not suitable for running in distributed environments, because Plugboard won't guarantee the order in which events get processed.
This notebook demonstrates how Plugboard's event-driven simulation naturally models the feedback dynamics that cause bus bunching — one of the most pervasive problems in public transit.
Bus bunching occurs when a delayed bus picks up more waiting passengers, increasing its dwell time and delaying it further, while the following bus finds fewer passengers and speeds ahead. This positive feedback loop causes buses to cluster together, ruining service regularity.
We model a transit corridor with staggered bus departures:
- Bus stops that accumulate passengers via a Poisson process
- Buses that travel between stops, dwell to board/alight passengers, and experience traffic signal delays
- A central dispatch controller that implements a headway-based holding strategy to mitigate bunching
We run the simulation twice — once without control to observe bunching, then with holding control — and compare the results.
Install and import dependencies¶
# Install plugboard for Google Colab
%pip install -q plugboard plotly
import math
import typing as _t
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from pydantic import BaseModel
from plugboard.component import Component, IOController as IO
from plugboard.connector import AsyncioConnector
from plugboard.diagram import MermaidDiagram
from plugboard.events import Event
from plugboard.process import LocalProcess
from plugboard.schemas import ComponentArgsDict, ConnectorSpec
connect = lambda src, tgt: AsyncioConnector(spec=ConnectorSpec(source=src, target=tgt))
Route configuration¶
We model a single bus corridor with 15 stops. Buses depart from stop 0 at regular headways and travel one-way to the final stop.
Key parameters:
- 15 stops with 400–600m spacing (~7.5 km total corridor)
- Bus speed: 8 m/s (~29 km/h, typical urban bus)
- 6 buses dispatched at even headways from the first stop
- Capacity: 80 passengers per bus
- Passenger arrival rates vary by stop — higher in the central corridor
- Simulation timestep: 10 seconds of real time per tick
- Initial perturbation: bus-0 suffers an extra 120s delay at its 3rd stop (e.g. wheelchair boarding), which seeds the bunching cascade
# --- Route configuration ---
NUM_STOPS = 15
NUM_BUSES = 6
BUS_CAPACITY = 80
BUS_SPEED = 8.0 # m/s (~29 km/h)
TICK_SECONDS = 10 # each simulation tick = 10 real seconds
# Distance between consecutive stops (meters)
STOP_DISTANCES = [500, 450, 400, 550, 500, 600, 450, 500, 550, 400, 500, 450, 500, 550]
# Cumulative stop positions along the route
STOP_POSITIONS = [0.0]
for d in STOP_DISTANCES:
STOP_POSITIONS.append(STOP_POSITIONS[-1] + d)
ROUTE_LENGTH = STOP_POSITIONS[-1]
# Headway between bus departures from stop 0 (in ticks)
DEPARTURE_HEADWAY_TICKS = 18 # 180 seconds = 3 minutes
# Total simulation ticks: enough for all buses to complete the route
TRAVEL_TIME_TICKS = math.ceil(ROUTE_LENGTH / (BUS_SPEED * TICK_SECONDS))
TOTAL_TICKS = (
(NUM_BUSES - 1) * DEPARTURE_HEADWAY_TICKS + TRAVEL_TIME_TICKS + 250
) # extra margin for dwell + holding
# Base passenger arrival rate per stop per tick (Poisson lambda)
# Higher at central stops, creating more dwell time variation
BASE_ARRIVAL_RATES = [0.4, 0.6, 1.0, 1.4, 1.8, 2.2, 2.5, 2.5, 2.2, 1.8, 1.4, 1.0, 0.6, 0.4, 0.2]
# Alighting probability at each stop
ALIGHT_PROBS = [
0.02,
0.03,
0.05,
0.06,
0.08,
0.10,
0.12,
0.12,
0.10,
0.08,
0.08,
0.10,
0.12,
0.15,
0.30,
]
# Dwell time parameters
DWELL_FIXED = 8.0 # fixed dead time (door open/close), seconds
DWELL_PER_PAX = 2.5 # seconds per boarding/alighting passenger
# Traffic signal parameters
NUM_SIGNALS = 7
SIGNAL_GREEN_FRAC = 0.50
# Perturbation: extra delay on bus-0 at stop 2 (simulates an incident)
PERTURBATION_BUS = "bus-0"
PERTURBATION_STOP = 2
PERTURBATION_TICKS = 12 # 120 seconds extra delay
# Headway control threshold (seconds)
HEADWAY_THRESHOLD = 30.0
print(f"Route: {ROUTE_LENGTH:.0f}m, {NUM_STOPS} stops")
print(f"Fleet: {NUM_BUSES} buses at {DEPARTURE_HEADWAY_TICKS * TICK_SECONDS}s headway")
print(f"Simulation: {TOTAL_TICKS} ticks = {TOTAL_TICKS * TICK_SECONDS / 60:.0f} min")
Define events¶
Events model the discrete interactions between buses, stops, and the dispatch controller:
| Event | Direction | Purpose |
|---|---|---|
BusArrivedAtStop |
Bus → Stop | Triggers passenger boarding/alighting |
PassengersBoarded |
Stop → Bus | Reports boarding results and dwell time |
BusPositionUpdate |
Bus → Dispatch | Periodic position report for headway tracking |
HoldBus |
Dispatch → Bus | Instruction to hold at a stop |
# --- Event data models ---
class BusArrivedData(BaseModel):
bus_id: str
stop_index: int
current_load: int
tick: int
class PassengersBoardedData(BaseModel):
bus_id: str
stop_index: int
boarded: int
alighted: int
new_load: int
dwell_ticks: int
class BusPositionData(BaseModel):
bus_id: str
position: float
stop_index: int
load: int
tick: int
phase: str
class HoldBusData(BaseModel):
bus_id: str
hold_ticks: int
# --- Event classes ---
class BusArrivedAtStop(Event):
type: _t.ClassVar[str] = "bus_arrived_at_stop"
data: BusArrivedData
class PassengersBoarded(Event):
type: _t.ClassVar[str] = "passengers_boarded"
data: PassengersBoardedData
class BusPositionUpdate(Event):
type: _t.ClassVar[str] = "bus_position_update"
data: BusPositionData
class HoldBus(Event):
type: _t.ClassVar[str] = "hold_bus"
data: HoldBusData
Create the Bus Stop component¶
Each BusStop maintains a passenger queue. On every tick, new passengers arrive via a Poisson process. When a bus arrives (via BusArrivedAtStop), the stop:
- Computes alighting passengers (fraction of onboard load)
- Computes boarding passengers (limited by bus capacity)
- Calculates dwell time: $T_d = t_0 + t_{pax} \cdot (n_{board} + n_{alight})$
- Records the arrival for headway analysis
- Returns a
PassengersBoardedevent
This is where the bunching feedback loop lives: a delayed bus finds more passengers waiting → longer dwell → more delay. Meanwhile, the following bus finds fewer passengers → shorter dwell → catches up.
class BusStop(Component):
"""A bus stop that accumulates passengers and handles boarding/alighting."""
io = IO(
inputs=["tick"],
input_events=[BusArrivedAtStop],
output_events=[PassengersBoarded],
)
def __init__(
self,
stop_index: int,
arrival_rate: float,
alight_prob: float,
rng_seed: int = 42,
**kwargs: _t.Unpack[ComponentArgsDict],
) -> None:
super().__init__(**kwargs)
self._stop_index = stop_index
self._arrival_rate = arrival_rate
self._alight_prob = alight_prob
self._queue = 0
self._rng = np.random.default_rng(rng_seed + stop_index * 7)
# Record arrivals for headway analysis: list of (tick, bus_id)
self.arrival_log: list[tuple[int, str]] = []
async def step(self) -> None:
new_arrivals = int(self._rng.poisson(self._arrival_rate))
self._queue += new_arrivals
@BusArrivedAtStop.handler
async def handle_bus_arrival(self, event: BusArrivedAtStop) -> PassengersBoarded:
"""Board/alight passengers when a bus arrives."""
if event.data.stop_index != self._stop_index:
return PassengersBoarded(
source=self.name,
data=PassengersBoardedData(
bus_id=event.data.bus_id,
stop_index=self._stop_index,
boarded=0,
alighted=0,
new_load=event.data.current_load,
dwell_ticks=0,
),
)
# Record arrival
self.arrival_log.append((event.data.tick, event.data.bus_id))
load = event.data.current_load
# Alighting
alighted = int(self._rng.binomial(load, self._alight_prob)) if load > 0 else 0
load -= alighted
# Boarding (limited by remaining capacity)
space = BUS_CAPACITY - load
boarded = min(self._queue, space)
self._queue -= boarded
load += boarded
# Dwell time
dwell_seconds = DWELL_FIXED + DWELL_PER_PAX * (boarded + alighted)
dwell_ticks = max(1, math.ceil(dwell_seconds / TICK_SECONDS))
return PassengersBoarded(
source=self.name,
data=PassengersBoardedData(
bus_id=event.data.bus_id,
stop_index=self._stop_index,
boarded=boarded,
alighted=alighted,
new_load=load,
dwell_ticks=dwell_ticks,
),
)
Create the Bus Vehicle component¶
Each Bus travels one-way along the corridor. Key features:
- Staggered departure: each bus waits for its scheduled departure tick before moving
- Perturbation support: an optional extra delay at a specific stop seeds the bunching cascade
- Traffic signal delays: random red-light encounters add realistic variance to travel times
- State machine: traveling → dwelling → (optionally holding) → traveling
- Trajectory recording: each tick's state is logged for post-simulation visualization
class Bus(Component):
"""A bus that travels one-way along the corridor."""
io = IO(
inputs=["tick"],
input_events=[PassengersBoarded, HoldBus],
output_events=[BusArrivedAtStop, BusPositionUpdate],
)
def __init__(
self,
bus_id: str,
departure_tick: int,
signal_positions: list[float],
perturbation_stop: int | None = None,
perturbation_ticks: int = 0,
rng_seed: int = 42,
**kwargs: _t.Unpack[ComponentArgsDict],
) -> None:
super().__init__(**kwargs)
self._bus_id = bus_id
self._departure_tick = departure_tick
self._position = 0.0
self._next_stop_index = 0
self._load = 0
self._phase = "waiting" # waiting | traveling | dwelling | holding | finished
self._dwell_remaining = 0
self._hold_remaining = 0
self._signal_positions = signal_positions
self._perturbation_stop = perturbation_stop
self._perturbation_ticks = perturbation_ticks
self._perturbation_applied = False
self._rng = np.random.default_rng(rng_seed)
self._finished = False
self._awaiting_boarding = False # True while waiting for PassengersBoarded response
self.trajectory: list[dict[str, _t.Any]] = []
async def step(self) -> None:
current_tick = int(self.tick)
# Wait until departure time
if self._phase == "waiting":
if current_tick >= self._departure_tick:
self._phase = "traveling"
self._record(current_tick)
return
if self._phase == "finished":
self._record(current_tick)
return
if self._phase == "dwelling":
# Don't leave the stop until PassengersBoarded response has arrived
if self._awaiting_boarding:
self._record(current_tick)
return
self._dwell_remaining -= 1
if self._dwell_remaining <= 0:
# Apply perturbation delay if configured
if (
not self._perturbation_applied
and self._perturbation_stop is not None
and self._next_stop_index == self._perturbation_stop
):
self._perturbation_applied = True
self._phase = "holding"
self._hold_remaining = self._perturbation_ticks
self._record(current_tick)
return
self._next_stop_index += 1
if self._next_stop_index >= NUM_STOPS:
self._phase = "finished"
self._record(current_tick)
return
self._phase = "traveling"
elif self._phase == "holding":
self._hold_remaining -= 1
if self._hold_remaining <= 0:
self._next_stop_index += 1
if self._next_stop_index >= NUM_STOPS:
self._phase = "finished"
self._record(current_tick)
return
self._phase = "traveling"
if self._phase == "traveling":
distance = BUS_SPEED * TICK_SECONDS
# Traffic signal delay
for sig_pos in self._signal_positions:
if self._position < sig_pos <= self._position + distance:
phase_in_cycle = self._rng.uniform(0, 90.0)
if phase_in_cycle > SIGNAL_GREEN_FRAC * 90.0:
red_wait = 90.0 - phase_in_cycle
distance = max(0.0, distance - BUS_SPEED * red_wait)
self._position = min(self._position + distance, ROUTE_LENGTH)
# Check if we reached the next stop
if self._next_stop_index < NUM_STOPS:
target = STOP_POSITIONS[self._next_stop_index]
if self._position >= target:
self._position = target
self._phase = "dwelling"
self._dwell_remaining = 0 # set by PassengersBoarded handler
self._awaiting_boarding = True
self.io.queue_event(
BusArrivedAtStop(
source=self.name,
data=BusArrivedData(
bus_id=self._bus_id,
stop_index=self._next_stop_index,
current_load=self._load,
tick=current_tick,
),
)
)
self._record(current_tick)
# Position update for dispatch
self.io.queue_event(
BusPositionUpdate(
source=self.name,
data=BusPositionData(
bus_id=self._bus_id,
position=self._position,
stop_index=self._next_stop_index,
load=self._load,
tick=current_tick,
phase=self._phase,
),
)
)
def _record(self, tick: int) -> None:
self.trajectory.append(
{
"tick": tick,
"position": self._position,
"stop_index": self._next_stop_index,
"load": self._load,
"phase": self._phase,
}
)
@PassengersBoarded.handler
async def handle_boarding(self, event: PassengersBoarded) -> None:
if event.data.bus_id != self._bus_id:
return
self._load = event.data.new_load
self._awaiting_boarding = False
if event.data.dwell_ticks > 0:
self._dwell_remaining = max(self._dwell_remaining, event.data.dwell_ticks)
@HoldBus.handler
async def handle_hold(self, event: HoldBus) -> None:
if event.data.bus_id != self._bus_id:
return
if self._phase in ("dwelling", "holding"):
self._phase = "holding"
self._hold_remaining = max(self._hold_remaining, event.data.hold_ticks)
Create the Simulation Clock and Dispatch Controller¶
The SimClock drives the simulation forward.
The Dispatch controller listens to BusPositionUpdate events and computes headways between consecutive active buses. When two buses are too close together (headway below threshold), it holds the trailing bus at its current stop to restore spacing.
class SimClock(Component):
"""Simulation clock that drives the tick counter."""
io = IO(outputs=["tick"])
def __init__(
self, total_ticks: int = TOTAL_TICKS, **kwargs: _t.Unpack[ComponentArgsDict]
) -> None:
super().__init__(**kwargs)
self._total = total_ticks
self._tick = 0
async def step(self) -> None:
self._tick += 1
self.tick = self._tick
if self._tick >= self._total:
await self.io.close()
class Dispatch(Component):
"""Monitors bus positions and issues holding instructions to restore headways."""
io = IO(
inputs=["tick"],
input_events=[BusPositionUpdate],
output_events=[HoldBus],
)
def __init__(
self,
bus_ids: list[str],
enabled: bool = True,
target_headway_s: float = DEPARTURE_HEADWAY_TICKS * TICK_SECONDS,
threshold_s: float = HEADWAY_THRESHOLD,
**kwargs: _t.Unpack[ComponentArgsDict],
) -> None:
super().__init__(**kwargs)
self._bus_ids = bus_ids
self._enabled = enabled
self._target = target_headway_s
self._threshold = threshold_s
self._positions: dict[str, float] = {}
self._phases: dict[str, str] = {}
self._hold_cooldown: dict[str, int] = {} # bus_id → tick when hold expires
async def step(self) -> None:
if not self._enabled:
return
# Only consider active buses (not waiting/finished)
active = {
bid: pos
for bid, pos in self._positions.items()
if self._phases.get(bid) not in ("waiting", "finished", None)
}
if len(active) < 2:
return
# Use departure order (not position) to identify leader/follower pairs.
# In a one-way route, bus-0 always leads bus-1, etc.
ordered = [bid for bid in self._bus_ids if bid in active]
for i in range(len(ordered) - 1):
leader_id = ordered[i]
follower_id = ordered[i + 1]
gap = max(0.0, active[leader_id] - active[follower_id])
time_gap = gap / BUS_SPEED if BUS_SPEED > 0 else float("inf")
# If follower is too close to leader, hold the follower
if time_gap < self._target - self._threshold:
hold_s = min((self._target - time_gap) * 0.4, 60.0)
hold_ticks = max(1, math.ceil(hold_s / TICK_SECONDS))
current_tick = int(self.tick)
# Only hold if not on cooldown from a recent hold
if self._phases.get(follower_id) in (
"dwelling",
"holding",
) and current_tick >= self._hold_cooldown.get(follower_id, 0):
self._hold_cooldown[follower_id] = current_tick + hold_ticks + 2
self.io.queue_event(
HoldBus(
source=self.name,
data=HoldBusData(bus_id=follower_id, hold_ticks=hold_ticks),
)
)
@BusPositionUpdate.handler
async def handle_position(self, event: BusPositionUpdate) -> None:
self._positions[event.data.bus_id] = event.data.position
self._phases[event.data.bus_id] = event.data.phase
Wire up the process¶
We assemble the Plugboard process with:
- Field connectors: Clock tick → all components (synchronization)
- Event connectors: automatically wired based on declared input/output events
Plugboard's declarative wiring makes the complex interaction topology explicit — buses talk to stops via events, stops respond via events, and the dispatch controller monitors everything through the same event bus.
signal_positions = [ROUTE_LENGTH * (i + 1) / (NUM_SIGNALS + 1) for i in range(NUM_SIGNALS)]
def build_process(
dispatch_enabled: bool = False,
apply_perturbation: bool = True,
rng_seed: int = 42,
) -> tuple[LocalProcess, list[Bus], list[BusStop], Dispatch]:
"""Build a simulation process with all components wired together."""
clock = SimClock(name="clock")
stops = [
BusStop(
name=f"stop-{i}",
stop_index=i,
arrival_rate=BASE_ARRIVAL_RATES[i],
alight_prob=ALIGHT_PROBS[i],
rng_seed=rng_seed,
)
for i in range(NUM_STOPS)
]
bus_ids = [f"bus-{i}" for i in range(NUM_BUSES)]
buses = [
Bus(
name=bus_ids[i],
bus_id=bus_ids[i],
departure_tick=i * DEPARTURE_HEADWAY_TICKS,
signal_positions=signal_positions,
perturbation_stop=PERTURBATION_STOP
if (apply_perturbation and bus_ids[i] == PERTURBATION_BUS)
else None,
perturbation_ticks=PERTURBATION_TICKS
if (apply_perturbation and bus_ids[i] == PERTURBATION_BUS)
else 0,
rng_seed=rng_seed + i * 100,
)
for i in range(NUM_BUSES)
]
dispatch = Dispatch(name="dispatch", bus_ids=bus_ids, enabled=dispatch_enabled)
components: list[Component] = [clock] + stops + buses + [dispatch] # type: ignore[list-item]
field_connectors = []
for comp in stops + buses + [dispatch]:
field_connectors.append(connect("clock.tick", f"{comp.name}.tick"))
event_connectors = AsyncioConnector.builder().build_event_connectors(components)
process = LocalProcess(
components=components,
connectors=field_connectors + event_connectors,
)
return process, buses, stops, dispatch
# Visualize the process wiring
process_viz, _, _, _ = build_process(dispatch_enabled=True)
diagram = MermaidDiagram.from_process(process_viz)
diagram.url
Run Scenario 1: No control (bus bunching)¶
First we run without the dispatch controller. The initial perturbation on bus-0 (a 120s delay at stop 2) seeds the bunching cascade. As bus-0 falls behind, it encounters more waiting passengers, increasing dwell times and delaying it further. Bus-1 catches up, finding fewer passengers and shorter dwells.
process_nc, buses_nc, stops_nc, dispatch_nc = build_process(dispatch_enabled=False, rng_seed=42)
async with process_nc:
await process_nc.run()
print("Scenario 1 (no control) complete")
Run Scenario 2: With headway control¶
Now we run with the dispatch controller enabled. It monitors headways and holds buses at stops when they get too close to the bus ahead.
process_ctrl, buses_ctrl, stops_ctrl, dispatch_ctrl = build_process(
dispatch_enabled=True, rng_seed=42
)
async with process_ctrl:
await process_ctrl.run()
print("Scenario 2 (with control) complete")
Visualize bus trajectories (time-space diagram)¶
The time-space diagram is the standard transit operations visualization. Each line traces a bus along the corridor. Flat segments = dwelling/holding at a stop. Converging lines = bunching.
_COLORS = px.colors.qualitative.Plotly
_x_max = TOTAL_TICKS * TICK_SECONDS / 60
fig = make_subplots(
rows=1,
cols=2,
shared_yaxes=True,
subplot_titles=["No Control", "With Headway Control"],
horizontal_spacing=0.05,
)
for col, (buses, _title) in enumerate(
[(buses_nc, "No Control"), (buses_ctrl, "With Headway Control")], 1
):
for i, bus in enumerate(buses):
ticks = [r["tick"] for r in bus.trajectory]
times_m = [t * TICK_SECONDS / 60 for t in ticks]
positions_km = [r["position"] / 1000 for r in bus.trajectory]
fig.add_trace(
go.Scatter(
x=times_m,
y=positions_km,
mode="lines",
name=bus.name,
line=dict(color=_COLORS[i % len(_COLORS)], width=1.5),
opacity=0.85,
legendgroup=bus.name,
showlegend=(col == 1),
),
row=1,
col=col,
)
for sp in STOP_POSITIONS:
fig.add_shape(
type="line",
x0=0,
x1=_x_max,
y0=sp / 1000,
y1=sp / 1000,
line=dict(color="lightgray", width=0.5),
row=1,
col=col,
)
fig.update_xaxes(title_text="Time (minutes)")
fig.update_yaxes(title_text="Distance along route (km)", col=1)
fig.update_layout(
title=dict(text="Time-Space Diagram: Bus Bunching", font=dict(size=16)),
height=550,
legend=dict(orientation="v"),
)
fig.show()
Headway analysis at stops¶
Headway is the time between consecutive bus arrivals at the same stop. Even headways = regular service. Headways that shrink then grow = buses bunching up.
We compute headways from the arrival_log recorded by each BusStop component and plot the coefficient of variation (CV) across stops.
def compute_stop_headways(
stops: list[BusStop],
) -> dict[int, list[float]]:
"""Compute headways (in seconds) at each stop from arrival logs."""
headways: dict[int, list[float]] = {}
for i, stop in enumerate(stops):
log = sorted(stop.arrival_log, key=lambda x: x[0]) # sort by tick
if len(log) < 2:
headways[i] = []
continue
hw = []
for j in range(1, len(log)):
dt = (log[j][0] - log[j - 1][0]) * TICK_SECONDS
hw.append(dt)
headways[i] = hw
return headways
hw_nc = compute_stop_headways(stops_nc)
hw_ctrl = compute_stop_headways(stops_ctrl)
def headway_cv(headways: dict[int, list[float]]) -> list[float]:
"""Compute CV (std/mean) at each stop."""
cvs = []
for i in range(NUM_STOPS):
hw = headways.get(i, [])
if len(hw) >= 2:
arr = np.array(hw)
cvs.append(float(arr.std() / arr.mean()) if arr.mean() > 0 else 0.0)
else:
cvs.append(0.0)
return cvs
cv_nc = headway_cv(hw_nc)
cv_ctrl = headway_cv(hw_ctrl)
# Headway distribution (all stops combined)
all_hw_nc = [h for hws in hw_nc.values() for h in hws]
all_hw_ctrl = [h for hws in hw_ctrl.values() for h in hws]
ideal_headway = DEPARTURE_HEADWAY_TICKS * TICK_SECONDS
stop_indices = list(range(NUM_STOPS))
fig = make_subplots(
rows=1,
cols=2,
subplot_titles=["Headway Regularity by Stop", "Headway Distribution (All Stops)"],
horizontal_spacing=0.1,
)
fig.add_trace(
go.Bar(name="No Control", x=stop_indices, y=cv_nc, marker_color="salmon", opacity=0.8),
row=1,
col=1,
)
fig.add_trace(
go.Bar(name="With Control", x=stop_indices, y=cv_ctrl, marker_color="steelblue", opacity=0.8),
row=1,
col=1,
)
fig.add_trace(
go.Histogram(
x=all_hw_nc,
name="No Control",
marker_color="salmon",
opacity=0.6,
histnorm="probability density",
nbinsx=25,
showlegend=False,
),
row=1,
col=2,
)
fig.add_trace(
go.Histogram(
x=all_hw_ctrl,
name="With Control",
marker_color="steelblue",
opacity=0.6,
histnorm="probability density",
nbinsx=25,
showlegend=False,
),
row=1,
col=2,
)
fig.add_vline(
x=ideal_headway,
line=dict(color="black", dash="dash", width=1.5),
annotation_text=f"Ideal ({ideal_headway}s)",
annotation_position="top right",
row=1,
col=2,
)
fig.update_xaxes(title_text="Stop Index", col=1)
fig.update_yaxes(title_text="Headway CV (std / mean)", col=1)
fig.update_xaxes(title_text="Headway (seconds)", col=2)
fig.update_yaxes(title_text="Density", col=2)
fig.update_layout(
title=dict(text="Headway Analysis", font=dict(size=16)),
barmode="group",
height=500,
)
fig.show()
Bus load factors¶
How full are the buses? In the bunching scenario, the delayed bus picks up more passengers (it's full) while the bus behind runs nearly empty — a hallmark of the bunching problem.
fig = make_subplots(
rows=1,
cols=2,
shared_yaxes=True,
subplot_titles=["No Control", "With Headway Control"],
horizontal_spacing=0.05,
)
for col, (buses, _title) in enumerate(
[(buses_nc, "No Control"), (buses_ctrl, "With Headway Control")], 1
):
for i, bus in enumerate(buses):
active = [r for r in bus.trajectory if r["phase"] not in ("waiting",)]
if not active:
continue
times_m = [r["tick"] * TICK_SECONDS / 60 for r in active]
loads_pct = [r["load"] / BUS_CAPACITY * 100 for r in active]
fig.add_trace(
go.Scatter(
x=times_m,
y=loads_pct,
mode="lines",
name=bus.name,
line=dict(color=_COLORS[i % len(_COLORS)], width=1.2),
opacity=0.8,
legendgroup=bus.name,
showlegend=(col == 1),
),
row=1,
col=col,
)
fig.add_hline(
y=100, line=dict(color="red", width=0.8, dash="dash"), opacity=0.5, row=1, col=col
)
fig.update_xaxes(title_text="Time (minutes)")
fig.update_yaxes(title_text="Load Factor (%)", col=1, range=[0, 110])
fig.update_yaxes(range=[0, 110])
fig.update_layout(
title=dict(text="Bus Load Factors Over Time", font=dict(size=16)),
height=500,
)
fig.show()
Summary statistics¶
We quantify the improvement from headway control using key transit performance metrics:
- Mean headway — average time between consecutive buses at each stop
- Headway CV — coefficient of variation (lower = more regular)
- Excess wait time — how much longer passengers wait compared to perfectly regular service
- P95 headway — the 95th percentile headway (tail events passengers experience)
def compute_metrics(headways: dict[int, list[float]]) -> dict[str, float]:
"""Compute aggregate headway metrics across all stops."""
all_hw = [h for hws in headways.values() for h in hws if h > 0]
if not all_hw:
return {"mean": 0, "std": 0, "cv": 0, "p95": 0, "excess_wait": 0}
arr = np.array(all_hw)
mean = float(arr.mean())
std = float(arr.std())
cv = std / mean if mean > 0 else 0
p95 = float(np.percentile(arr, 95))
# Excess wait = E[W] - H/2, where E[W] = H/2 * (1 + CV^2)
ideal_wait = mean / 2
actual_wait = mean / 2 * (1 + cv**2)
excess_wait = actual_wait - ideal_wait
return {"mean": mean, "std": std, "cv": cv, "p95": p95, "excess_wait": excess_wait}
m_nc = compute_metrics(hw_nc)
m_ctrl = compute_metrics(hw_ctrl)
ideal_headway = DEPARTURE_HEADWAY_TICKS * TICK_SECONDS
print("=" * 60)
print(f"{'Metric':<25} {'No Control':>12} {'With Control':>14} {'Change':>10}")
print("=" * 60)
print(f"{'Ideal headway (s)':<25} {ideal_headway:>12.0f} {ideal_headway:>14.0f} {'':>10}")
print(
f"{'Mean headway (s)':<25} {m_nc['mean']:>12.1f} {m_ctrl['mean']:>14.1f} {(m_ctrl['mean'] - m_nc['mean']):>+10.1f}"
)
print(
f"{'Headway std dev (s)':<25} {m_nc['std']:>12.1f} {m_ctrl['std']:>14.1f} {(m_ctrl['std'] - m_nc['std']):>+10.1f}"
)
print(
f"{'Headway CV':<25} {m_nc['cv']:>12.3f} {m_ctrl['cv']:>14.3f} {(m_ctrl['cv'] - m_nc['cv']):>+10.3f}"
)
print(
f"{'P95 headway (s)':<25} {m_nc['p95']:>12.1f} {m_ctrl['p95']:>14.1f} {(m_ctrl['p95'] - m_nc['p95']):>+10.1f}"
)
print(
f"{'Excess wait (s)':<25} {m_nc['excess_wait']:>12.1f} {m_ctrl['excess_wait']:>14.1f} {(m_ctrl['excess_wait'] - m_nc['excess_wait']):>+10.1f}"
)
print("=" * 60)
if m_nc["cv"] > 0:
improvement = (1 - m_ctrl["cv"] / m_nc["cv"]) * 100
print(f"\nHeadway regularity improvement: {improvement:.1f}%")
if m_nc["excess_wait"] > 0:
wait_improvement = (1 - m_ctrl["excess_wait"] / m_nc["excess_wait"]) * 100
print(f"Excess wait time reduction: {wait_improvement:.1f}%")
Key takeaways¶
What this example demonstrates¶
Event-driven simulation is a natural fit for transit modeling. Bus arrivals, passenger boarding, and dispatch decisions are inherently event-driven. Plugboard's
Eventand handler system maps cleanly to these interactions — no polling or centralized scheduler needed.The bus bunching feedback loop. A small initial perturbation (120s delay) cascades through the system: the delayed bus encounters more waiting passengers → longer dwell times → falls further behind → the following bus catches up, finding fewer passengers → eventually buses pair up.
Headway control via holding strategies. By monitoring bus positions through
BusPositionUpdateevents and issuingHoldBusevents, theDispatchcomponent can regulate headways in real time — the same approach used by real transit agencies.Modular, composable architecture. Each component (
Bus,BusStop,SimClock,Dispatch) is independent and testable. The event-based wiring lets us add or swap control strategies without modifying existing components.Quantifiable improvement. The headway-based holding strategy reduced headway variability (CV) by over 50% and excess passenger wait time by over 75% — a dramatic improvement from a relatively simple control policy, all implemented as an additional event-producing component.
Extensions¶
- Multiple routes with transfers — add interchange stops that generate cross-route events
- Demand-responsive scheduling — use events to trigger schedule adjustments
- Real-time passenger information — add components that estimate arrival times from bus positions
- Distributed simulation — use
RayProcessto run different route segments on separate workers