Streaming data: processing a websocket feedĀ¶
This model will run on a continuous stream of data provided by BlueSky's firehose websocket. We'll subscribe to posts from some business news feeds and then use an LLM to carry out sentiment analysis on each message.
To run this model you will need to set the OPENAI_API_KEY
environment variable.
import asyncio
import os
import typing as _t
from getpass import getpass
import httpx
from pydantic import BaseModel, Field
from plugboard.component import Component, IOController
from plugboard.connector import AsyncioConnector
from plugboard.schemas import ConnectorSpec
from plugboard.process import LocalProcess
from plugboard.library import FileWriter, LLMChat, WebsocketReader
if "OPENAI_API_KEY" not in os.environ:
os.environ["OPENAI_API_KEY"] = getpass("Enter your OpenAI API key: ")
We'll subscribe to BlueSky posts from the following news outlets.
The BlueSky API filters according to DIDs - a unique identifier for each user that we'll need to lookup.
async def fetch_bluesky_did(client: httpx.AsyncClient, user_name: str) -> str:
response = await client.get(
"https://bsky.social/xrpc/com.atproto.identity.resolveHandle", params={"handle": user_name}
)
return response.json()["did"]
user_handles = [
"reuters.com",
"bloomberg.com",
"cnbc.com",
"financialtimes.com",
"wsj.com",
"yahoofinance.com",
]
async with httpx.AsyncClient() as client:
bluesky_dids = await asyncio.gather(
*[fetch_bluesky_did(client, handle) for handle in user_handles]
)
# Bluesky uses the "wantedDids" parameter to specify the DIDs of the users we want to filter
filter_spec = "&".join([f"wantedDids={did}" for did in bluesky_dids])
Now we have the DIDs for BlueSky, setup a WebsocketReader
to stream posts into a Plugboard process. Using the Jetstream instructions we'll filter on posts from the users we are interested in.
websocket = WebsocketReader(
name="bluesky-feed",
uri=f"wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.feed.post&{filter_spec}",
parse_json=True,
)
Next we need a Component
to extract the post text and timestamp each message received from BlueSky.
class ExtractMessage(Component):
"""Extracts text and timestamp from a BlueSky message dictionary."""
io = IOController(inputs=["message"], outputs=["text", "time_stamp"])
async def step(self) -> None:
try:
# Surround text with quotes so that is is correctly formatted in CSV output
self.text = f'"{websocket.message["commit"]["record"]["text"].replace("\n", " ")}"'
self.time_stamp = websocket.message["commit"]["record"]["createdAt"]
except KeyError:
# Skip messages that aren't correctly formatted
pass
extract = ExtractMessage(name="extract-message")
Next, let's setup an LLM component to analyse the messages as they arrive from BlueSky and carry out sentiment analysis. We'll use the LLM in structured-output mode, so that we have known outputs from the component.
class MessageInformation(BaseModel):
category: _t.Literal["markets", "companies", "economics", "other"]
market_relevance: float = Field(..., ge=0, le=100)
sentiment: _t.Literal["positive", "negative", "neutral"]
system_prompt = """
You are going to be shown headlines from business news services. For each headline, please provide the following:
- The category of the headline (markets, companies, economics, other)
- The market relevance of the headline to financial markets on a scale of 0 (least relevant) to 100 (most relevant)
- The sentiment of the headline (positive, negative, neutral).
"""
llm = LLMChat(
name="llm",
system_prompt=system_prompt,
llm_kwargs={"model": "gpt-4o"},
response_model=MessageInformation,
# Expand the response into separate fields
expand_response=True,
)
Finally, we'll use the FileWriter
component to save the output to CSV.
# Set chunk size to 1 so that data is saved to disk as each message arrives
save = FileWriter(
name="save",
path="bluesky-messages.csv",
chunk_size=1,
field_names=["text", "time_stamp", "category", "market_relevance", "sentiment"],
)
Now build the LocalProcess
and connect all of the components together.
connect = lambda in_, out_: AsyncioConnector(spec=ConnectorSpec(source=in_, target=out_))
process = LocalProcess(
components=[websocket, extract, llm, save],
connectors=[
# Connect websocket to extract
connect("bluesky-feed.message", "extract-message.message"),
# Save the time_stamp and text from the extract component
connect("extract-message.time_stamp", "save.time_stamp"),
connect("extract-message.text", "save.text"),
# Connect the extracted message to the LLM
connect("extract-message.text", "llm.prompt"),
# Connect the LLM outputs to the save component
connect("llm.category", "save.category"),
connect("llm.market_relevance", "save.market_relevance"),
connect("llm.sentiment", "save.sentiment"),
],
)
Now run the model. The websocket input will run forever, continuing to stream new data, so when you are ready to stop the process you will need to manually interrupt it. Open the output CSV file to see the data that has been captured. Keep in mind that some of the news sources publish infrequently outside of their business hours, so depending on when you run the code you might need to leave it for a while to collect some data.
async with process:
await process.run()