Skip to content

connector

Connector submodule providing functionality related to component connectors and data exchange.

Connector

Connector(spec: ConnectorSpec, *args: Any, **kwargs: Any)

Bases: ABC, ExportMixin

Connector provides Channels for communication between a specified source and target.

id property

id: str

Unique ID for Connector.

builder classmethod

builder(*args: Any, **kwargs: Any) -> ConnectorBuilder

Returns a ConnectorBuilder for this Connector class.

connect_recv abstractmethod async

connect_recv() -> Channel

Returns a Channel for receiving messages.

connect_send abstractmethod async

connect_send() -> Channel

Returns a Channel for sending messages.

Channel

Channel(
    *args: Any, maxsize: int = CHAN_MAXSIZE, **kwargs: Any
)

Bases: ABC

Channel defines an interface for data communication.

Initialises the Channel.

Parameters:

Name Type Description Default
maxsize int

Optional; The message capacity of the Channel.

CHAN_MAXSIZE

is_closed property

is_closed: bool

Returns True if the Channel is closed, False otherwise.

When a Channel is closed, it can no longer be used to send messages, though there may still be some messages waiting to be read.

maxsize property

maxsize: int

Returns the message capacity of the Channel.

close async

close() -> None

Closes the Channel.

recv abstractmethod async

recv() -> _t.Any

Receives an item from the Channel and returns it.

send abstractmethod async

send(msg: Any) -> None

Sends an item through the Channel.

Parameters:

Name Type Description Default
msg Any

The item to be sent through the Channel.

required

AsyncioConnector

AsyncioConnector(
    *args: Any, maxsize: int = CHAN_MAXSIZE, **kwargs: Any
)

Bases: Connector

AsyncioConnector connects components using AsyncioChannel.

connect_recv async

connect_recv() -> AsyncioChannel

Returns an AsyncioChannel for receiving messages.

connect_send async

connect_send() -> AsyncioChannel

Returns an AsyncioChannel for sending messages.

AsyncioChannel

AsyncioChannel(
    *args: Any,
    maxsize: int = CHAN_MAXSIZE,
    queue: Optional[Queue] = None,
    subscribers: Optional[set[Queue]] = None,
    **kwargs: Any,
)

Bases: Channel

AsyncioChannel enables async data exchange between coroutines on the same host.

Instantiates AsyncioChannel.

Parameters:

Name Type Description Default
maxsize int

Optional; Queue maximum item capacity.

CHAN_MAXSIZE
queue Optional[Queue]

Optional; asyncio.Queue to use for data exchange.

None
subscribers Optional[set[Queue]]

Optional; Set of output asyncio.Queues in pubsub mode.

None

recv async

recv() -> _t.Any

Returns an item received from the Channel.

send async

send(item: Any) -> None

Sends an item through the Channel.

SerdeChannel

SerdeChannel(
    *args: Any, maxsize: int = CHAN_MAXSIZE, **kwargs: Any
)

Bases: Channel, ABC

SerdeChannel base class for channels that use serialised messages.

recv abstractmethod async

recv() -> bytes

Receives a serialised message from the Channel and returns it.

Note: Receiving data involves an unpickling deserialisation step. There are security implications to consider when unpickling data. It is assumed that data received through a channel is trusted.

send abstractmethod async

send(msg: bytes) -> None

Sends an serialised message through the Channel.

Parameters:

Name Type Description Default
msg bytes

The message to be sent through the Channel.

required

RabbitMQConnector

RabbitMQConnector(*args: Any, **kwargs: Any)

Bases: Connector

RabbitMQConnector connects components via RabbitMQ AMQP broker.

Uses exclusive queues for pub-sub mode to ensure that each subscriber receives its own copy of each message. In direct mode, uses a single queue for all subscribers, allowing them to share the same messages.

connect_recv async

connect_recv(
    rabbitmq_conn: AbstractRobustConnection = Provide[
        DI.rabbitmq_conn
    ],
) -> RabbitMQChannel

Returns a RabbitMQ channel for receiving messages.

connect_send async

connect_send(
    rabbitmq_conn: AbstractRobustConnection = Provide[
        DI.rabbitmq_conn
    ],
) -> RabbitMQChannel

Returns a RabbitMQ channel for sending messages.

RabbitMQChannel

RabbitMQChannel(
    *args: Any,
    send_exchange: Optional[AbstractExchange] = None,
    recv_queue: Optional[AbstractQueue] = None,
    topic: str = "",
    **kwargs: Any,
)

Bases: SerdeChannel

RabbitMQChannel for sending and receiving messages via RabbitMQ AMQP broker.

Instantiates a RabbitMQChannel.

Uses RabbitMQ AMQP message broker to provide communication between components on different processes. Requires a RabbitMQ broker to be running with the url (and credentials if required) set in the RABBITMQ_URL environment variable.

Parameters:

Name Type Description Default
send_exchange Optional[AbstractExchange]

Optional; The RabbitMQ exchange for sending messages.

None
recv_queue Optional[AbstractQueue]

Optional; The RabbitMQ queue for receiving messages.

None
topic str

Optional; The topic for the RabbitMQChannel, defaults to an empty string. Only relevant in the case of pub-sub mode channels.

''

close async

close() -> None

Closes the RabbitMQChannel.

recv async

recv() -> bytes

Receive a message from the RabbitMQ channel.

send async

send(msg: bytes) -> None

Send a message to the RabbitMQ channel.

RayConnector

RayConnector(*args: Any, **kwargs: Any)

Bases: Connector

RayConnector connects components using RayChannel.

connect_recv async

connect_recv() -> RayChannel

Returns a RayChannel for receiving messages.

connect_send async

connect_send() -> RayChannel

Returns a RayChannel for sending messages.

RayChannel

RayChannel(
    actor_options: Optional[dict] = None, **kwargs: Any
)

Bases: Channel

RayChannel enables async data exchange between coroutines on a Ray cluster.

Instantiates RayChannel.

Parameters:

Name Type Description Default
actor_options Optional[dict]

Optional; Options to pass to the Ray actor. Defaults to {"num_cpus": 0}.

None
**kwargs Any

Additional keyword arguments to pass to the the underlying Channel.

{}

is_closed property

is_closed: bool

Returns True if the RayChannel is closed, False otherwise.

When a RayChannel is closed, it can no longer be used to send messages, though there may still be some messages waiting to be read.

maxsize property

maxsize: int

Returns the message capacity of the RayChannel.

close async

close() -> None

Closes the RayChannel and terminates the underlying actor.

recv async

recv() -> _t.Any

Returns an item received from the RayChannel.

send async

send(item: Any) -> None

Sends an item through the RayChannel.

ZMQConnector

ZMQConnector(
    *args: Any,
    settings: Settings = Provide[DI.settings],
    **kwargs: Any,
)

Bases: _ZMQConnector

ZMQConnector connects components using ZMQChannel.

zmq_address property

zmq_address: str

The ZMQ address used for communication.

connect_recv async

connect_recv() -> ZMQChannel

Returns a ZMQChannel for receiving messages.

connect_send async

connect_send() -> ZMQChannel

Returns a ZMQChannel for sending messages.

ZMQChannel

ZMQChannel(
    *args: Any,
    send_socket: Optional[Socket] = None,
    recv_socket: Optional[Socket] = None,
    topic: str = "",
    maxsize: int = 2000,
    **kwargs: Any,
)

Bases: SerdeChannel

ZMQChannel enables data exchange between processes using ZeroMQ.

Instantiates ZMQChannel.

Uses ZeroMQ to provide communication between components on different processes. Note that maxsize is not a hard limit because the operating system will buffer TCP messages before they reach the channel. ZMQChannel provides better performance than RayChannel, but is only suitable for use on a single host. For multi-host communication, use RayChannel.

Parameters:

Name Type Description Default
send_socket Optional[Socket]

Optional; The ZeroMQ socket for sending messages.

None
recv_socket Optional[Socket]

Optional; The ZeroMQ socket for receiving messages.

None
topic str

Optional; The topic for the ZMQChannel, defaults to an empty string. Only relevant in the case of pub-sub mode channels.

''
maxsize int

Optional; Queue maximum item capacity, defaults to 2000.

2000

close async

close() -> None

Closes the ZMQChannel.

recv async

recv() -> bytes

Receives a message from the ZMQChannel and returns it.

send async

send(msg: bytes) -> None

Sends a message through the ZMQChannel.

Parameters:

Name Type Description Default
msg bytes

The message to be sent through the ZMQChannel.

required