QueueLink Module API Documentation
This page covers the QueueLink module interfaces.
Factory Function
- queuelink.link(source, destination, *, name: str = None, start_method: str = None, thread_only: bool = False, trusted: bool = False, wrap_when: WRAP_WHEN = WRAP_WHEN.NEVER, wrap_threshold: int = None, link_timeout: float = 0.01)
Wire a source and destination together automatically.
Inspects the types of source and destination and creates the appropriate combination of QueueLink, QueueHandleAdapterReader, and/or QueueHandleAdapterWriter.
- Parameters:
source – A queue, open file/pipe handle, file path (str/PathLike), or multiprocessing.connection.Connection to read from.
destination – A queue, open file/pipe handle, or file path to write to. May also be a list of such objects for fan-out. Tuples and sets are not accepted (use a list).
name – Optional name passed to created components for logging.
start_method – Multiprocessing start method ('fork', 'forkserver', 'spawn'). Defaults to system preference.
thread_only – Force threading instead of separate processes.
trusted – For Connection sources: if True, use .recv()/.send(); if False, use .recv_bytes().
wrap_when – When to wrap large messages in ContentWrapper for disk buffering. Only applies when a reader adapter is created.
wrap_threshold – Byte size limit before wrapping. Only relevant when wrap_when is WRAP_WHEN.AUTO.
link_timeout – queue.get() timeout for QueueLink publishers.
- Returns:
A _LinkResult instance with stop(), close(), and is_alive() interface. For queue→queue links, result.queue_link exposes the underlying QueueLink.
- Raises:
TypeError – If either endpoint is an unsupported type, a QueueLink/_LinkResult instance, a Connection as destination, or if destination is a non-list iterable.
ValueError – If destination is an empty list or contains duplicate objects.
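The destination rules for the factory function (a single endpoint or a list, tuples and sets rejected, empty lists and duplicates rejected) can be illustrated with a small standalone sketch. It does not call queuelink: normalize_destinations is a hypothetical helper written only to mirror the documented TypeError/ValueError conditions, and the library's real checks may differ in detail.

```python
import queue


def normalize_destinations(destination):
    """Hypothetical helper mirroring the documented destination rules."""
    if isinstance(destination, (tuple, set, frozenset)):
        # Documented: non-list iterables are rejected with TypeError.
        raise TypeError("destination must be a single endpoint or a list")
    if not isinstance(destination, list):
        # A single queue, handle, or path becomes a one-element list.
        return [destination]
    if not destination:
        raise ValueError("destination list is empty")
    # Assumption: "duplicate objects" means the same object listed twice,
    # so the comparison is by identity.
    if len({id(d) for d in destination}) != len(destination):
        raise ValueError("destination list contains duplicate objects")
    return list(destination)


a, b = queue.Queue(), queue.Queue()
fan_out = normalize_destinations([a, b])  # two distinct queues: accepted
single = normalize_destinations(a)        # single endpoint: wrapped in a list
```

Passing (a, b), an empty list, or [a, a] to this sketch raises TypeError, ValueError, and ValueError respectively.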
QueueLink Class
- class queuelink.QueueLink(source: Queue | LifoQueue | PriorityQueue | SimpleQueue | Queue | JoinableQueue | SimpleQueue | BaseProxy = None, destination: Queue | LifoQueue | PriorityQueue | SimpleQueue | Queue | JoinableQueue | SimpleQueue | BaseProxy | List[Queue | LifoQueue | PriorityQueue | SimpleQueue | Queue | JoinableQueue | SimpleQueue | BaseProxy] = None, *, name: str = None, log_name: str = None, start_method: str = None, thread_only: bool = False, link_timeout: float = 0.01)
Manages publishing from source queues to client (destination) queues
- Event(*args, **kwargs) → Event | Event | BaseProxy
Create a context-appropriate Event
- Lock(*args, **kwargs) → lock | Lock | BaseProxy
Create a context-appropriate Lock
- Process(*args, **kwargs) → Process
Create a context-appropriate Process
- Queue(*args, **kwargs) → Queue | JoinableQueue | SimpleQueue
Create a context-appropriate Queue
- close() → None
Remove managed objects started within QueueLink
- destructive_audit(direction: DIRECTION)
Print a line from each client Queue in the provided direction.
This is a destructive operation, as it removes a line from each Queue.
- Parameters:
direction (DIRECTION) – source or destination
- Raises:
Empty –
- get_metrics() → dict
Retrieve accumulated metrics from all publishers.
Drains both the thread and process metrics queues and merges all snapshots by element_id. Because each snapshot is keyed by element_id, later snapshots for the same element overwrite earlier ones, so the result reflects the most recent data for each metric element. Returns an empty dict if no metrics have been emitted yet.
- Returns:
A dict mapping element_id -> metric data dict
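The drain-and-merge behavior described for get_metrics() can be sketched with standard-library queues. This is an illustration under assumptions, not the library's implementation: drain_and_merge is a hypothetical name, and each snapshot is assumed to be a dict keyed by element_id.

```python
import queue


def drain_and_merge(*metric_queues):
    """Drain each queue and merge snapshots keyed by element_id."""
    merged = {}
    for q in metric_queues:
        while True:
            try:
                snapshot = q.get_nowait()
            except queue.Empty:
                break  # this queue is fully drained
            # Later snapshots overwrite earlier ones for the same element_id.
            merged.update(snapshot)
    return merged


thread_metrics = queue.Queue()
process_metrics = queue.Queue()  # stands in for a multiprocessing queue
thread_metrics.put({"elem-1": {"count": 1}})
thread_metrics.put({"elem-1": {"count": 5}})
process_metrics.put({"elem-2": {"count": 2}})

metrics = drain_and_merge(thread_metrics, process_metrics)
# A second call finds both queues already drained and returns {}.
empty = drain_and_merge(thread_metrics, process_metrics)
```

Because draining removes the snapshots, a caller that needs a running view would have to keep the previous result and merge new results into it.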
- get_queue(queue_id: str | int) → Queue | LifoQueue | PriorityQueue | SimpleQueue | Queue | JoinableQueue | SimpleQueue | BaseProxy
Retrieve a client’s queue reference
- Parameters:
queue_id – ID of the queue reference
- Returns:
Reference to a queue instance
- is_alive() → bool
Whether all of the publishers are alive
- Returns:
False if any publisher has stopped
- Raises:
ProcessNotStarted – If no publisher has ever been started
- is_drained(queue_id: str = None)
Check both liveness and emptiness.
Attempts to give a clean semantic answer to “is there, or will there be, data to read?”
- Returns:
bool
- is_empty(queue_id: str | None = None) → bool
Checks whether the source or destination queues are empty
- Parameters:
queue_id – (optional) ID of the client
- Returns:
If queue_id is None, returns True ONLY if ALL queues are empty. If queue_id is provided, True ONLY if both main queue and specified client queue are empty.
- read(q: Queue | LifoQueue | PriorityQueue | SimpleQueue | Queue | JoinableQueue | SimpleQueue | BaseProxy) → str
Register a source queue
- Parameters:
q – Queue or proxy object to a queue
- Returns:
The client’s ID for access to this queue
- register_queue(q: Queue | LifoQueue | PriorityQueue | SimpleQueue | Queue | JoinableQueue | SimpleQueue | BaseProxy, direction: DIRECTION) → str
Register a queue to this link
For a new FROM (source) queue, a publishing process will be created to send all additions down to destination queues.
For a new TO (destination) queue, all new additions to source queues will be added to this queue.
Returns the numeric ID for the queue reference, which must be used in all future interactions.
- Parameters:
q – Queue object to register
direction – DIRECTION.FROM or DIRECTION.TO
- Returns:
The client’s ID for access to this queue
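The FROM/TO behavior described for register_queue() amounts to a publisher that copies every item from a source queue to each destination queue. The sketch below shows that pattern with a thread and standard-library queues only. The publish function and its stop_event argument are hypothetical names, and the polling loop with a short queue.get() timeout is an assumption based on the link_timeout parameter, not the library's actual publisher.

```python
import queue
import threading


def publish(source, destinations, stop_event, link_timeout=0.01):
    """Move every item from source to each destination until stopped."""
    while not stop_event.is_set():
        try:
            item = source.get(timeout=link_timeout)
        except queue.Empty:
            continue  # nothing yet; loop around and re-check the stop flag
        for dest in destinations:
            dest.put(item)


source = queue.Queue()
dest_a, dest_b = queue.Queue(), queue.Queue()
stop_event = threading.Event()
worker = threading.Thread(
    target=publish, args=(source, [dest_a, dest_b], stop_event), daemon=True
)
worker.start()

for line in ("first", "second"):
    source.put(line)

# Blocking reads with a timeout wait for the publisher to deliver.
received_a = [dest_a.get(timeout=2) for _ in range(2)]
received_b = [dest_b.get(timeout=2) for _ in range(2)]

stop_event.set()
worker.join(timeout=2)
```

A short timeout keeps the publisher responsive to the stop flag at the cost of more frequent wake-ups while the source is idle.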
- stop() → None
Use to stop somewhat gracefully
- unregister_queue(queue_id: str | int, direction: DIRECTION) → str
Detach a queue from this link
Returns the clientId that was removed
- Parameters:
queue_id – ID of the client
direction – source or destination
- Returns:
ID of the client queue
- write(q: Queue | LifoQueue | PriorityQueue | SimpleQueue | Queue | JoinableQueue | SimpleQueue | BaseProxy) → str
Register a destination queue
- Parameters:
q – Queue or proxy object to a queue
- Returns:
The client’s ID for access to this queue
Handle-to-Queue Adapter (Reader)
- class queuelink.QueueHandleAdapterReader(queue: Queue | LifoQueue | PriorityQueue | SimpleQueue | Queue | JoinableQueue | SimpleQueue | BaseProxy, *, handle: IO | str | PathLike = None, name: str = None, log_name: str = None, start_method: str = None, thread_only: bool = None, trusted: bool = False, wrap_when: WRAP_WHEN = WRAP_WHEN.NEVER, wrap_threshold: int = None)
Custom manager to capture the output of processes and store it in one or more dedicated thread-safe or process-safe queues.
- add_logging_handler(handler: Handler) → None
Add a logging handler to the logger
- Parameters:
handler – A logging handler
- close() → None
Stop the adapter and queue link and clean up.
Does not force a drain of the queues.
- get_messages_processed() → int
Return the number of messages moved by this adapter
- Returns:
The number of messages processed by this adapter.
- is_alive() → bool
Check whether the thread/process managing the movement is still active
- Returns:
True if the adapter thread/process is still running, False otherwise.
- is_drained() → bool
Check both liveness and emptiness.
Attempts to give a clean semantic answer to “is there, or will there be, data to read?”
- Returns:
True if fully drained, False if not
- static queue_handle_adapter(*, name: str, handle: IO | str | PathLike, queue: Queue | LifoQueue | PriorityQueue | SimpleQueue | Queue | JoinableQueue | SimpleQueue | BaseProxy, queue_lock: lock | Lock | BaseProxy, stop_event: Event | Event | BaseProxy, messages_processed: MessageCounter, trusted: bool, wrap_when: WRAP_WHEN, wrap_threshold: int)
Copy lines from the given pipe handle into the given queue
Runs in a separate process, started by __init__. Closes pipe when done reading.
- Parameters:
name – Name to use in logging
handle – Handle/pipe/path to read from
queue – Queue to write to
queue_lock – Lock used to indicate a write in progress
stop_event – Used to determine whether to stop the process
messages_processed – Number of elements moved from the handle to the queue
trusted – Whether to trust Connection objects
wrap_when – When to use a ContentWrapper
wrap_threshold – Size limit for a line before it is wrapped in a ContentWrapper
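The reader's core loop (copy lines from a handle into a queue, then close the handle) can be sketched with the standard library alone. This is a simplified illustration, not the adapter's code: copy_handle_to_queue is a hypothetical name, an io.StringIO stands in for a pipe, and the locking, counting, and ContentWrapper handling described in the parameters are left out.

```python
import io
import queue
import threading


def copy_handle_to_queue(handle, out_queue, stop_event):
    """Read lines from handle into out_queue, then close the handle."""
    try:
        for line in handle:
            if stop_event.is_set():
                break
            out_queue.put(line)
    finally:
        handle.close()  # documented: the pipe is closed when reading is done


handle = io.StringIO("alpha\nbeta\n")  # stands in for an open pipe
lines = queue.Queue()
stop_event = threading.Event()
reader = threading.Thread(
    target=copy_handle_to_queue, args=(handle, lines, stop_event)
)
reader.start()
reader.join(timeout=2)

# The reader has finished, so the queue can be emptied without blocking.
collected = []
while not lines.empty():
    collected.append(lines.get_nowait())
```

Line endings are preserved in this sketch; whether the adapter strips them is not stated in this page.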
- set_handle(handle: IOBase | str) → None
Set the handle to read from or write to.
- Parameters:
handle – An open handle (subclasses of file, io.IOBase)
- Raises:
HandleAlreadySet –
Queue-to-Handle Adapter (Writer)
- class queuelink.QueueHandleAdapterWriter(queue: Queue | LifoQueue | PriorityQueue | SimpleQueue | Queue | JoinableQueue | SimpleQueue | BaseProxy, *, handle: IO | str | PathLike = None, name: str = None, log_name: str = None, start_method: str = None, thread_only: bool = None, trusted: bool = False)
Custom manager to read messages from a queue and write them to a file or pipe
- add_logging_handler(handler: Handler) → None
Add a logging handler to the logger
- Parameters:
handler – A logging handler
- close() → None
Stop the adapter and queue link and clean up.
Does not force a drain of the queues.
- get_messages_processed() → int
Return the number of messages moved by this adapter
- Returns:
The number of messages processed by this adapter.
- is_alive() → bool
Check whether the thread/process managing the movement is still active
- Returns:
True if the adapter thread/process is still running, False otherwise.
- is_drained() → bool
Check both liveness and emptiness.
Attempts to give a clean semantic answer to “is there, or will there be, data to read?”
- Returns:
True if fully drained, False if not
- static queue_handle_adapter(*, name: str, handle: IO | str | PathLike, queue: Queue | LifoQueue | PriorityQueue | SimpleQueue | Queue | JoinableQueue | SimpleQueue | BaseProxy, queue_lock: lock | Lock | BaseProxy, stop_event: Event | Event | BaseProxy, messages_processed: MessageCounter, trusted: bool, **kwargs)
Copy lines from a local multiprocessing.JoinableQueue into a pipe
Runs in a separate process, started by __init__. Does not close an open pipe or handle when done writing.
- Parameters:
name – Name to use in logging
handle – Handle/pipe/path to write to
queue – Queue to read from
queue_lock – Lock used to indicate a write is in progress
stop_event – Used to determine whether to stop the process
messages_processed – Number of elements moved from the queue to handle
trusted – Whether to trust Connection objects
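The writer's core loop (take items from a queue, write them to a handle, leave the handle open) can be sketched in the same spirit. The function name copy_queue_to_handle and its timeout argument are hypothetical. This sketch also finishes whatever is still queued before it exits, which is a simplification for the sake of a deterministic example and not necessarily what the adapter does on stop.

```python
import io
import queue
import threading


def copy_queue_to_handle(in_queue, handle, stop_event, timeout=0.01):
    """Write queued lines to handle; return how many were moved."""
    moved = 0
    # Keep going until a stop is requested AND nothing is left to write.
    while not (stop_event.is_set() and in_queue.empty()):
        try:
            line = in_queue.get(timeout=timeout)
        except queue.Empty:
            continue  # nothing yet; re-check the stop flag
        handle.write(line)
        moved += 1
    handle.flush()  # the handle is deliberately left open
    return moved


out = io.StringIO()  # stands in for an open file or pipe
pending = queue.Queue()
for line in ("one\n", "two\n"):
    pending.put(line)

stop_event = threading.Event()
stop_event.set()  # request a stop up front; queued items are still written
moved = copy_queue_to_handle(pending, out, stop_event)
written = out.getvalue()
```

Leaving the handle open matches the documented behavior, so the caller stays responsible for closing it.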
- set_handle(handle: IOBase | str) → None
Set the handle to read from or write to.
- Parameters:
handle – An open handle (subclasses of file, io.IOBase)
- Raises:
HandleAlreadySet –