connector
Connector submodule providing functionality related to component connectors and data exchange.
Connector
Bases: ABC, ExportMixin
Connector provides Channels for communication between a specified source and target.
builder
classmethod
Returns a ConnectorBuilder for this Connector class.
connect_recv
abstractmethod
async
Returns a Channel for receiving messages.
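The abstract-method pattern described above can be illustrated with a small sketch. The class names `SketchChannel`, `SketchConnector` and `InMemoryConnector`, and the `send`/`recv` methods on the toy channel, are hypothetical and exist only for illustration; this is not the library's implementation.

```python
import asyncio
from abc import ABC, abstractmethod


class SketchChannel:
    """Hypothetical stand-in for a Channel; wraps an asyncio.Queue."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    async def send(self, msg: object) -> None:
        await self._queue.put(msg)

    async def recv(self) -> object:
        return await self._queue.get()


class SketchConnector(ABC):
    """Hypothetical abstract connector with one abstract async method."""

    @abstractmethod
    async def connect_recv(self) -> SketchChannel:
        """Returns a channel for receiving messages."""


class InMemoryConnector(SketchConnector):
    """Concrete subclass that hands out a single in-memory channel."""

    def __init__(self) -> None:
        self._channel = SketchChannel()

    async def connect_recv(self) -> SketchChannel:
        return self._channel


async def demo() -> object:
    connector = InMemoryConnector()
    channel = await connector.connect_recv()
    await channel.send("hello")
    return await channel.recv()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because `connect_recv` is abstract, Python refuses to instantiate `SketchConnector` directly; only subclasses that implement it can be created.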
Channel
Bases: ABC
Channel defines an interface for data communication.
Initialises the Channel.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `maxsize` | `int` | Optional; The message capacity of the Channel. | `CHAN_MAXSIZE` |
is_closed
property
Returns True if the Channel is closed, False otherwise.
When a Channel is closed, it can no longer be used to send messages,
though there may still be some messages waiting to be read.
AsyncioConnector
Bases: Connector
AsyncioConnector connects components using AsyncioChannel.
AsyncioChannel
AsyncioChannel(
*args: Any,
maxsize: int = CHAN_MAXSIZE,
queue: Optional[Queue] = None,
subscribers: Optional[set[Queue]] = None,
**kwargs: Any,
)
Bases: Channel
AsyncioChannel enables async data exchange between coroutines on the same host.
Instantiates AsyncioChannel.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `maxsize` | `int` | Optional; Queue maximum item capacity. | `CHAN_MAXSIZE` |
| `queue` | `Optional[Queue]` | Optional; asyncio.Queue to use for data exchange. | `None` |
| `subscribers` | `Optional[set[Queue]]` | Optional; Set of output asyncio.Queues in pubsub mode. | `None` |
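One way to picture how a `queue` and a set of `subscribers` could work together is the sketch below. It is a minimal illustration under assumptions: `FanOutChannel` and its `send`/`recv` methods are hypothetical, and the real AsyncioChannel may route messages differently.

```python
import asyncio
from typing import Optional


class FanOutChannel:
    """Hypothetical sketch; the names here are not the library's API."""

    def __init__(
        self,
        maxsize: int = 0,
        queue: Optional[asyncio.Queue] = None,
        subscribers: Optional[set] = None,
    ) -> None:
        self._queue = queue if queue is not None else asyncio.Queue(maxsize=maxsize)
        # None means single-queue mode; a set means pubsub mode.
        self._subscribers = subscribers

    async def send(self, msg: object) -> None:
        if self._subscribers is None:
            await self._queue.put(msg)
            return
        # In pubsub mode every subscriber queue receives its own copy.
        for q in self._subscribers:
            await q.put(msg)

    async def recv(self) -> object:
        return await self._queue.get()


async def demo() -> tuple:
    out_a: asyncio.Queue = asyncio.Queue()
    out_b: asyncio.Queue = asyncio.Queue()
    publisher = FanOutChannel(subscribers={out_a, out_b})
    reader_a = FanOutChannel(queue=out_a)
    reader_b = FanOutChannel(queue=out_b)
    await publisher.send(1)
    await publisher.send(2)
    got_a = [await reader_a.recv(), await reader_a.recv()]
    got_b = [await reader_b.recv(), await reader_b.recv()]
    return got_a, got_b


if __name__ == "__main__":
    print(asyncio.run(demo()))
```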
SerdeChannel
Bases: Channel, ABC
SerdeChannel is the base class for channels that use serialised messages.
recv
abstractmethod
async
Receives a serialised message from the Channel and returns it.
Note: Receiving data involves an unpickling deserialisation step. There are security implications to consider when unpickling data. It is assumed that data received through a channel is trusted.
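The serialisation round trip mentioned in the note can be sketched with the standard-library `pickle` module. The helper names `serialise` and `deserialise` are hypothetical and are not taken from the library; the point is only to show where the trust assumption applies.

```python
import pickle
from typing import Any


def serialise(msg: Any) -> bytes:
    """Turns a Python object into bytes suitable for sending."""
    return pickle.dumps(msg)


def deserialise(payload: bytes) -> Any:
    """Rebuilds the object from bytes.

    Unpickling can execute arbitrary code, so this should only be
    called on payloads that come from a trusted sender.
    """
    return pickle.loads(payload)


if __name__ == "__main__":
    message = {"step": 3, "values": [1.5, 2.5]}
    wire_bytes = serialise(message)
    assert isinstance(wire_bytes, bytes)
    assert deserialise(wire_bytes) == message
```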
RabbitMQConnector
Bases: Connector
RabbitMQConnector connects components via a RabbitMQ AMQP broker.
Uses exclusive queues for pub-sub mode to ensure that each subscriber receives its own copy of each message. In direct mode, uses a single queue for all subscribers, allowing them to share the same messages.
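The difference between the two modes can be simulated without a broker. The sketch below uses plain `asyncio.Queue` objects in place of RabbitMQ queues; the function names are hypothetical and nothing here touches a real AMQP connection.

```python
import asyncio


async def publish_pubsub(subscriber_queues: list, messages: list) -> None:
    """Pub-sub: each subscriber's own queue gets a copy of every message."""
    for msg in messages:
        for q in subscriber_queues:
            await q.put(msg)


async def publish_direct(shared_queue: asyncio.Queue, messages: list) -> None:
    """Direct: all messages go to one queue shared by the subscribers."""
    for msg in messages:
        await shared_queue.put(msg)


async def drain(q: asyncio.Queue, out: list) -> None:
    """Consumes until the queue is empty, yielding so consumers interleave."""
    while not q.empty():
        out.append(q.get_nowait())
        await asyncio.sleep(0)


async def demo() -> dict:
    messages = ["m1", "m2", "m3", "m4"]

    # Pub-sub mode: one queue per subscriber.
    own_a, own_b = asyncio.Queue(), asyncio.Queue()
    await publish_pubsub([own_a, own_b], messages)
    pub_a, pub_b = [], []
    await asyncio.gather(drain(own_a, pub_a), drain(own_b, pub_b))

    # Direct mode: both subscribers read from the same queue.
    shared = asyncio.Queue()
    await publish_direct(shared, messages)
    dir_a, dir_b = [], []
    await asyncio.gather(drain(shared, dir_a), drain(shared, dir_b))

    return {"pubsub": (pub_a, pub_b), "direct": (dir_a, dir_b)}


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In the pub-sub half each subscriber ends up with all four messages; in the direct half the four messages are split between the two subscribers, with each message consumed exactly once.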
RabbitMQChannel
RabbitMQChannel(
*args: Any,
send_exchange: Optional[AbstractExchange] = None,
recv_queue: Optional[AbstractQueue] = None,
topic: str = "",
**kwargs: Any,
)
Bases: SerdeChannel
RabbitMQChannel sends and receives messages via a RabbitMQ AMQP broker.
Instantiates a RabbitMQChannel.
Uses a RabbitMQ AMQP message broker to provide communication between components
in different processes. Requires a RabbitMQ broker to be running, with its URL
(and credentials, if required) set in the RABBITMQ_URL environment variable.
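A small check that the environment variable is present before connecting might look like the sketch below. The helper `rabbitmq_url_from_env` is hypothetical, the scheme check is an added assumption, and the URL in the usage example is a placeholder rather than a value taken from the library.

```python
import os
from typing import Mapping, Optional
from urllib.parse import urlsplit


def rabbitmq_url_from_env(env: Optional[Mapping[str, str]] = None) -> str:
    """Reads RABBITMQ_URL and fails early if it is missing or malformed."""
    env = os.environ if env is None else env
    url = env.get("RABBITMQ_URL")
    if not url:
        raise RuntimeError("RABBITMQ_URL is not set")
    # AMQP URLs use the amqp:// or amqps:// scheme.
    scheme = urlsplit(url).scheme
    if scheme not in ("amqp", "amqps"):
        raise ValueError(f"unexpected URL scheme: {scheme!r}")
    return url


if __name__ == "__main__":
    # Placeholder URL for illustration only.
    example_env = {"RABBITMQ_URL": "amqp://user:password@localhost:5672/"}
    print(rabbitmq_url_from_env(example_env))
```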
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `send_exchange` | `Optional[AbstractExchange]` | Optional; The RabbitMQ exchange for sending messages. | `None` |
| `recv_queue` | `Optional[AbstractQueue]` | Optional; The RabbitMQ queue for receiving messages. | `None` |
| `topic` | `str` | Optional; The topic for the channel. | `''` |
RayConnector
Bases: Connector
RayConnector connects components using RayChannel.
RayChannel
Bases: Channel
RayChannel enables async data exchange between coroutines on a Ray cluster.
Instantiates RayChannel.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `actor_options` | `Optional[dict]` | Optional; Options to pass to the Ray actor. Defaults to {"num_cpus": 0}. | `None` |
| `**kwargs` | `Any` | Additional keyword arguments to pass to the underlying channel. | `{}` |
is_closed
property
Returns True if the RayChannel is closed, False otherwise.
When a RayChannel is closed, it can no longer be used to send messages,
though there may still be some messages waiting to be read.
ZMQConnector
Bases: _ZMQConnector
ZMQConnector connects components using ZMQChannel.
ZMQChannel
ZMQChannel(
*args: Any,
send_socket: Optional[Socket] = None,
recv_socket: Optional[Socket] = None,
topic: str = "",
maxsize: int = 2000,
**kwargs: Any,
)
Bases: SerdeChannel
ZMQChannel enables data exchange between processes using ZeroMQ.
Instantiates ZMQChannel.
Uses ZeroMQ to provide communication between components in different
processes. Note that maxsize is not a hard limit, because the operating
system buffers TCP messages before they reach the channel. ZMQChannel
provides better performance than RayChannel, but is only suitable for use
on a single host. For multi-host communication, use RayChannel.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `send_socket` | `Optional[Socket]` | Optional; The ZeroMQ socket for sending messages. | `None` |
| `recv_socket` | `Optional[Socket]` | Optional; The ZeroMQ socket for receiving messages. | `None` |
| `topic` | `str` | Optional; The topic for the channel. | `''` |
| `maxsize` | `int` | Optional; Queue maximum item capacity. | `2000` |