library

Provides implementations of Plugboard objects for use in user models.

DataReader

DataReader(
    field_names: list[str],
    chunk_size: Optional[int] = None,
    **kwargs: Unpack[ComponentArgsDict],
)

Bases: Component, ABC

Abstract base class for reading data.

Instantiates the DataReader.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `field_names` | `list[str]` | The names of the fields to read from the data source. | required |
| `chunk_size` | `Optional[int]` | The size of the data chunk to read from the data source. | `None` |
| `**kwargs` | `Unpack[ComponentArgsDict]` | Additional keyword arguments for Component. | `{}` |

init async

init() -> None

Initialises the DataReader.

step async

step() -> None

Reads data from the source and updates outputs.
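
As a rough illustration of how chunk_size and step could interact, the sketch below buffers one chunk at a time and emits a single record per step. It is not Plugboard's implementation: ChunkedReader, fetch_chunk, and the in-memory rows source are hypothetical names, and the one-record-per-step behaviour is an assumption.

```python
from collections import deque
from typing import Any, Optional


class ChunkedReader:
    """Hypothetical stand-in for a DataReader; not Plugboard's code."""

    def __init__(
        self,
        rows: list[dict[str, Any]],
        field_names: list[str],
        chunk_size: Optional[int] = None,
    ) -> None:
        self._rows = rows
        self._field_names = field_names
        self._chunk_size = chunk_size
        self._offset = 0
        self._buffer: deque[dict[str, Any]] = deque()

    def fetch_chunk(self) -> list[dict[str, Any]]:
        """Read the next chunk; a chunk_size of None reads everything left."""
        if self._chunk_size is None:
            end = len(self._rows)
        else:
            end = self._offset + self._chunk_size
        chunk = self._rows[self._offset:end]
        self._offset += len(chunk)
        return chunk

    def step(self) -> Optional[dict[str, Any]]:
        """Emit one record restricted to field_names, or None when exhausted."""
        if not self._buffer:
            self._buffer.extend(self.fetch_chunk())
        if not self._buffer:
            return None
        row = self._buffer.popleft()
        return {name: row[name] for name in self._field_names}


rows = [{"x": i, "y": i * i, "z": -i} for i in range(5)]
reader = ChunkedReader(rows, field_names=["x", "y"], chunk_size=2)
outputs = []
while (record := reader.step()) is not None:
    outputs.append(record)
```

Here the source is read three times (two full chunks and one partial chunk), while downstream consumers still see one record per step containing only the requested fields.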

DataWriter

DataWriter(
    field_names: list[str],
    chunk_size: Optional[int] = None,
    **kwargs: Unpack[ComponentArgsDict],
)

Bases: Component, ABC

Abstract base class for writing data.

Instantiates the DataWriter.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `field_names` | `list[str]` | The names of the fields to write to the data source. | required |
| `chunk_size` | `Optional[int]` | The size of the data chunk to read from the DataFrame. | `None` |
| `**kwargs` | `Unpack[ComponentArgsDict]` | Additional keyword arguments for Component. | `{}` |

run async

run() -> None

Runs the DataWriter.

step async

step() -> None

Triggers a save when the buffer reaches the target size.
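
The buffering behaviour described for step can be sketched as follows. This is an illustrative model only: BufferedWriter, saved_chunks, and close are hypothetical names, and flushing the remainder at the end of a run is an assumption rather than documented behaviour.

```python
from typing import Any, Optional


class BufferedWriter:
    """Hypothetical stand-in for a DataWriter; not Plugboard's code."""

    def __init__(self, chunk_size: Optional[int] = None) -> None:
        self._chunk_size = chunk_size
        self._buffer: list[dict[str, Any]] = []
        self.saved_chunks: list[list[dict[str, Any]]] = []

    def _save(self) -> None:
        """Persist the buffered records (here: append to a list) and reset."""
        if self._buffer:
            self.saved_chunks.append(self._buffer)
            self._buffer = []

    def step(self, record: dict[str, Any]) -> None:
        """Buffer one record and save once the buffer reaches chunk_size."""
        self._buffer.append(record)
        if self._chunk_size is not None and len(self._buffer) >= self._chunk_size:
            self._save()

    def close(self) -> None:
        """Flush whatever is left when the run ends (assumed behaviour)."""
        self._save()


writer = BufferedWriter(chunk_size=2)
for i in range(5):
    writer.step({"x": i})
writer.close()
chunk_lengths = [len(chunk) for chunk in writer.saved_chunks]
```

With five records and a chunk size of two, the sketch saves two full chunks during the run and one partial chunk on close.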

FileReader

FileReader(
    path: str | Path,
    storage_options: Optional[dict[str, Any]] = None,
    **kwargs: Unpack[DataReaderArgsSpec],
)

Bases: DataReader

Reads data from a file.

Supported formats: CSV, GZIP-compressed CSV, Parquet. The file can be stored locally or on an fsspec-compatible cloud storage service.

Instantiates the FileReader.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `str \| Path` | The path to the file to read. | required |
| `storage_options` | `Optional[dict[str, Any]]` | Optional; Additional options for the fsspec-compatible filesystem. | `None` |
| `**kwargs` | `Unpack[DataReaderArgsSpec]` | Additional keyword arguments for DataReader. | `{}` |
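
To make the supported CSV formats concrete, here is a standard-library sketch that reads selected fields from a plain or GZIP-compressed CSV file. It is not how FileReader is implemented: read_fields is a hypothetical helper, inferring compression from a ".gz" suffix is an assumption, and Parquet and remote storage are left out because they need third-party packages.

```python
import csv
import gzip
import tempfile
from pathlib import Path
from typing import Union


def read_fields(path: Union[str, Path], field_names: list[str]) -> list[dict[str, str]]:
    """Read selected columns from a CSV or GZIP-compressed CSV file."""
    path = Path(path)
    # Assumption: compression is inferred from the ".gz" suffix.
    opener = gzip.open if path.suffix == ".gz" else open
    with opener(path, mode="rt", newline="", encoding="utf-8") as handle:
        return [
            {name: row[name] for name in field_names}
            for row in csv.DictReader(handle)
        ]


content = "x,y,z\n1,2,3\n4,5,6\n"
with tempfile.TemporaryDirectory() as tmp:
    plain = Path(tmp) / "data.csv"
    plain.write_text(content, encoding="utf-8")
    packed = Path(tmp) / "data.csv.gz"
    with gzip.open(packed, mode="wt", newline="", encoding="utf-8") as handle:
        handle.write(content)
    plain_rows = read_fields(plain, ["x", "z"])
    packed_rows = read_fields(packed, ["x", "z"])
```

Both files yield the same records, and the unrequested column y is dropped.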

FileWriter

FileWriter(
    path: str | Path,
    storage_options: Optional[dict[str, Any]] = None,
    **kwargs: Unpack[DataWriterArgsSpec],
)

Bases: DataWriter

Writes data to a file. If the file already exists, it will be overwritten.

Supported formats: CSV, GZIP-compressed CSV, Parquet. The file can be stored locally or on an fsspec-compatible cloud storage service.

Instantiates the FileWriter.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `str \| Path` | The path to the file to write. | required |
| `storage_options` | `Optional[dict[str, Any]]` | Optional; Additional options for the fsspec-compatible filesystem. | `None` |
| `**kwargs` | `Unpack[DataWriterArgsSpec]` | Additional keyword arguments for DataWriter. | `{}` |
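
The overwrite behaviour noted above can be illustrated with the standard library. This is a sketch under stated assumptions, not FileWriter's code: write_records is a hypothetical helper, and only local CSV output is covered.

```python
import csv
import tempfile
from pathlib import Path
from typing import Any


def write_records(path: Path, field_names: list[str], records: list[dict[str, Any]]) -> None:
    """Write records as CSV, replacing any existing file at the same path."""
    # Mode "w" truncates an existing file, which gives the overwrite behaviour.
    with open(path, mode="w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=field_names)
        writer.writeheader()
        writer.writerows(records)


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "out.csv"
    write_records(target, ["x"], [{"x": 1}, {"x": 2}, {"x": 3}])
    write_records(target, ["x"], [{"x": 9}])  # second run replaces the first
    with open(target, newline="", encoding="utf-8") as handle:
        final_rows = list(csv.DictReader(handle))
```

After the second call only the most recent records remain, so output from an earlier run should be copied elsewhere if it needs to be kept.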

LLMChat

LLMChat(
    llm: str = "llama_index.llms.openai.OpenAI",
    system_prompt: Optional[str] = None,
    context_window: int = 0,
    response_model: Optional[Type[BaseModel] | str] = None,
    expand_response: bool = False,
    llm_kwargs: Optional[dict[str, Any]] = None,
    **kwargs: Unpack[ComponentArgsDict],
)

Bases: Component

LLMChat is a component for interacting with large language models (LLMs).

Requires the optional plugboard[llm] installation. The default LLM is OpenAI, which requires the OPENAI_API_KEY environment variable to be set. Other LLMs supported by llama-index can be used; see the llama-index documentation for available models. Additional llama-index dependencies may be required for specific models.

Structured output is supported by providing a Pydantic model as the response_model argument. The structured response can optionally be unpacked into individual output fields by setting expand_response=True; otherwise, the LLM response is stored in the response output field.
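
The difference between the two output modes can be sketched without calling an LLM. The example is illustrative only: Weather and to_outputs are hypothetical, and a standard-library dataclass stands in for the Pydantic model that LLMChat expects.

```python
from dataclasses import asdict, dataclass
from typing import Any


@dataclass
class Weather:
    """Hypothetical structured response; LLMChat itself expects a Pydantic model."""

    city: str
    temperature_c: float


def to_outputs(response: Any, expand_response: bool) -> dict[str, Any]:
    """Map a structured response onto component output fields."""
    if expand_response:
        # One output field per attribute of the response model.
        return asdict(response)
    # Otherwise the whole response goes into a single "response" field.
    return {"response": response}


reply = Weather(city="Oslo", temperature_c=4.5)
expanded = to_outputs(reply, expand_response=True)
single = to_outputs(reply, expand_response=False)
```

Expanding is convenient when downstream components consume individual attributes; the single-field form keeps the model instance intact.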

Instantiates LLMChat.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `llm` | `str` | The LLM class to use from llama-index. | `'llama_index.llms.openai.OpenAI'` |
| `system_prompt` | `Optional[str]` | Optional; System prompt to prepend to the context window. | `None` |
| `context_window` | `int` | The number of previous messages to include in the context window. | `0` |
| `response_model` | `Optional[Type[BaseModel] \| str]` | Optional; A Pydantic model to structure the response. Can be specified as a string identifying the namespaced class to use. | `None` |
| `expand_response` | `bool` | Setting this to True when using a structured response model will cause the individual attributes of the response model to be added as output fields. | `False` |
| `llm_kwargs` | `Optional[dict[str, Any]]` | Additional keyword arguments for the LLM. | `None` |
| `**kwargs` | `Unpack[ComponentArgsDict]` | Additional keyword arguments for Component. | `{}` |
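
Two of these parameters lend themselves to a short sketch: a class given as a namespaced string, and a context window of previous messages. Neither helper is taken from Plugboard. resolve_class and ChatHistory are hypothetical, and the sketch assumes that only previously submitted messages count towards context_window.

```python
import importlib
from collections import deque
from typing import Optional


def resolve_class(dotted_path: str) -> type:
    """Import the class named by a string such as 'package.module.ClassName'."""
    module_name, _, class_name = dotted_path.rpartition(".")
    return getattr(importlib.import_module(module_name), class_name)


class ChatHistory:
    """Hypothetical helper keeping the last `context_window` messages."""

    def __init__(self, system_prompt: Optional[str] = None, context_window: int = 0) -> None:
        self._system_prompt = system_prompt
        # deque(maxlen=0) stores nothing, so context_window=0 keeps no history.
        self._history: deque[str] = deque(maxlen=context_window)

    def build_prompt(self, message: str) -> list[str]:
        """Return the system prompt, remembered messages, then the new message."""
        prefix = [self._system_prompt] if self._system_prompt is not None else []
        prompt = prefix + list(self._history) + [message]
        self._history.append(message)
        return prompt


# A standard-library class is used here so the example runs without llama-index.
resolved = resolve_class("collections.OrderedDict")

history = ChatHistory(system_prompt="Be brief.", context_window=2)
prompts = [history.build_prompt(text) for text in ["a", "b", "c", "d"]]

stateless = ChatHistory(context_window=0)
stateless_prompts = [stateless.build_prompt(text) for text in ["a", "b"]]
```

With a window of two, the fourth prompt carries only the two preceding messages; with the default of zero, each prompt contains just the current message.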

SQLReader

SQLReader(
    connection_string: str,
    query: str,
    params: Optional[dict[str, Any]] = None,
    connect_args: Optional[dict[str, Any]] = None,
    **kwargs: Unpack[DataReaderArgsSpec],
)

Bases: DataReader

Reads data from an SQL database using a supplied query and optional parameters.

The underlying database connection is managed by SQLAlchemy: both synchronous and asynchronous drivers are supported.

Instantiates the SQLReader.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `connection_string` | `str` | The connection string for the database. | required |
| `query` | `str` | The SQL query to run on the database. | required |
| `params` | `Optional[dict[str, Any]]` | Optional; Parameters to pass to the query. | `None` |
| `connect_args` | `Optional[dict[str, Any]]` | Optional; Additional options for the database connection. | `None` |
| `**kwargs` | `Unpack[DataReaderArgsSpec]` | Additional keyword arguments for DataReader. | `{}` |
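
The role of query and params can be illustrated with a parameterised query. The sketch swaps in the standard-library sqlite3 module in place of SQLAlchemy so that it runs without extra dependencies; run_query and the readings table are hypothetical.

```python
import sqlite3
from typing import Any, Optional


def run_query(
    connection: sqlite3.Connection,
    query: str,
    params: Optional[dict[str, Any]] = None,
) -> list[dict[str, Any]]:
    """Run a query with optional named parameters and return rows as dicts."""
    cursor = connection.execute(query, params or {})
    columns = [description[0] for description in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]


connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE readings (sensor TEXT, value REAL)")
connection.executemany(
    "INSERT INTO readings VALUES (?, ?)",
    [("a", 1.0), ("b", 2.5), ("a", 3.0)],
)
rows = run_query(
    connection,
    "SELECT sensor, value FROM readings WHERE sensor = :sensor ORDER BY value",
    {"sensor": "a"},
)
connection.close()
```

Passing values through params rather than formatting them into the query string lets the driver handle quoting and avoids SQL injection.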

SQLWriter

SQLWriter(
    connection_string: str,
    table: str,
    connect_args: Optional[dict[str, Any]] = None,
    **kwargs: Unpack[DataWriterArgsSpec],
)

Bases: DataWriter

Writes data to an SQL database. The specified table must already exist.

The underlying database connection is managed by SQLAlchemy: both synchronous and asynchronous drivers are supported.

Instantiates the SQLWriter.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `connection_string` | `str` | The connection string for the database. | required |
| `table` | `str` | The name of the table to write to, which must already exist. | required |
| `connect_args` | `Optional[dict[str, Any]]` | Optional; Additional options for the database connection. | `None` |
| `**kwargs` | `Unpack[DataWriterArgsSpec]` | Additional keyword arguments for DataWriter. | `{}` |
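
The requirement that the table already exists can be demonstrated with a small sketch. It uses the standard-library sqlite3 module in place of SQLAlchemy, and write_rows and the results table are hypothetical; the exact error SQLWriter raises for a missing table is not shown in this reference.

```python
import sqlite3
from typing import Any


def write_rows(
    connection: sqlite3.Connection,
    table: str,
    field_names: list[str],
    records: list[dict[str, Any]],
) -> None:
    """Insert records into an existing table; raises if the table is missing."""
    columns = ", ".join(field_names)
    placeholders = ", ".join(f":{name}" for name in field_names)
    # The table name is interpolated, so it must come from trusted configuration.
    connection.executemany(
        f"INSERT INTO {table} ({columns}) VALUES ({placeholders})", records
    )
    connection.commit()


connection = sqlite3.connect(":memory:")

# Writing before the table exists fails.
try:
    write_rows(connection, "results", ["x", "y"], [{"x": 1, "y": 2}])
    missing_table_error = None
except sqlite3.OperationalError as exc:
    missing_table_error = str(exc)

# Once the table has been created, the same call succeeds.
connection.execute("CREATE TABLE results (x INTEGER, y INTEGER)")
write_rows(connection, "results", ["x", "y"], [{"x": 1, "y": 2}, {"x": 3, "y": 4}])
stored = connection.execute("SELECT x, y FROM results ORDER BY x").fetchall()
connection.close()
```

Creating the table ahead of time, for example in a migration or setup script, keeps schema decisions such as column types and indexes out of the writer.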

WebsocketBase

WebsocketBase(
    uri: str,
    connect_args: dict[str, Any] | None = None,
    **kwargs: Unpack[ComponentArgsDict],
)

Bases: Component, ABC

Base Component for websocket connections.

See the websockets documentation for more information on the underlying websocket library.

Instantiates the Component.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `uri` | `str` | The URI of the WebSocket server. | required |
| `connect_args` | `dict[str, Any] \| None` | Optional; Additional arguments to pass to the WebSocket connection. | `None` |
| `**kwargs` | `Unpack[ComponentArgsDict]` | Additional keyword arguments for Component. | `{}` |

destroy async

destroy() -> None

Closes the websocket connection.

init async

init() -> None

Initializes the websocket connection.
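
The init and destroy pair suggests a simple lifecycle: open the connection once, use it across steps, and close it at the end. The sketch below models this with an in-memory stand-in so that it runs without a server. FakeConnection, ConnectionLifecycle, the URI, and the example connect_args value are all hypothetical.

```python
import asyncio
from typing import Any, Optional


class FakeConnection:
    """In-memory stand-in for a websocket connection (hypothetical)."""

    def __init__(self, uri: str, **connect_args: Any) -> None:
        self.uri = uri
        self.connect_args = connect_args
        self.open = True

    async def close(self) -> None:
        self.open = False


class ConnectionLifecycle:
    """Hypothetical component that opens in init() and closes in destroy()."""

    def __init__(self, uri: str, connect_args: Optional[dict[str, Any]] = None) -> None:
        self._uri = uri
        self._connect_args = connect_args or {}
        self.connection: Optional[FakeConnection] = None

    async def init(self) -> None:
        # connect_args are forwarded unchanged to the connection.
        self.connection = FakeConnection(self._uri, **self._connect_args)

    async def destroy(self) -> None:
        if self.connection is not None:
            await self.connection.close()


async def main() -> tuple[bool, bool]:
    component = ConnectionLifecycle("ws://localhost:8765", {"open_timeout": 5})
    await component.init()
    was_open = component.connection.open
    await component.destroy()
    return was_open, component.connection.open


was_open, is_open = asyncio.run(main())
```

Keeping connection setup out of the constructor means a component can be created and configured before any network activity takes place.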

WebsocketReader

WebsocketReader(
    initial_message: Any | None = None,
    skip_messages: int = 0,
    parse_json: bool = False,
    **kwargs: Unpack[WebsocketArgsDict],
)

Bases: WebsocketBase

Reads data from a websocket connection.

Instantiates the WebsocketReader.

See the websockets documentation for possible connection arguments that can be passed using connect_args. The WebsocketReader runs until interrupted and automatically reconnects if the server connection is lost.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `initial_message` | `Any \| None` | Optional; The initial message to send to the WebSocket server on connection. Can be used to subscribe to a specific topic. | `None` |
| `skip_messages` | `int` | The number of messages to ignore before starting to read messages. | `0` |
| `parse_json` | `bool` | Whether to parse the received data as JSON. | `False` |
| `**kwargs` | `Unpack[WebsocketArgsDict]` | Additional keyword arguments for WebsocketBase. | `{}` |

step async

step() -> None

Reads a message from the websocket connection.
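
The effect of skip_messages and parse_json can be sketched over a list of raw messages, without a live connection. read_messages and the sample payloads are hypothetical and only model the documented parameters; reconnection is not covered.

```python
import json
from typing import Any, Iterable, Iterator


def read_messages(
    raw_messages: Iterable[str],
    skip_messages: int = 0,
    parse_json: bool = False,
) -> Iterator[Any]:
    """Yield incoming messages, ignoring the first few and optionally decoding JSON."""
    for index, raw in enumerate(raw_messages):
        if index < skip_messages:
            continue  # e.g. a subscription acknowledgement
        yield json.loads(raw) if parse_json else raw


incoming = ['{"status": "subscribed"}', '{"price": 10}', '{"price": 11}']
parsed = list(read_messages(incoming, skip_messages=1, parse_json=True))
unparsed = list(read_messages(incoming))
```

Skipping is useful when a server replies to the initial subscription message with an acknowledgement that should not be treated as data.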

WebsocketWriter

WebsocketWriter(
    parse_json: bool = False,
    **kwargs: Unpack[WebsocketArgsDict],
)

Bases: WebsocketBase

Writes data to a websocket connection.

Instantiates the WebsocketWriter.

See the websockets documentation for possible connection arguments that can be passed using connect_args.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `parse_json` | `bool` | Whether to convert the data to JSON before sending. | `False` |
| `**kwargs` | `Unpack[WebsocketArgsDict]` | Additional keyword arguments for WebsocketBase. | `{}` |

step async

step() -> None

Writes a message to the websocket connection.
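
On the writing side, parse_json controls whether data is serialised before it is sent. A minimal sketch of that choice follows; encode_message is a hypothetical helper, not part of Plugboard.

```python
import json
from typing import Any


def encode_message(data: Any, parse_json: bool = False) -> Any:
    """Serialise data to a JSON string before sending when parse_json is set."""
    return json.dumps(data) if parse_json else data


encoded = encode_message({"x": 1, "tags": ["a", "b"]}, parse_json=True)
passthrough = encode_message("already a string")
```

Leaving parse_json at its default is appropriate when upstream components already produce strings or bytes in the format the server expects.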