QueueLink


QueueLink Module API Documentation

This page covers the QueueLink module interfaces.

Factory Function

queuelink.link(source, destination, *, name: str = None, start_method: str = None, thread_only: bool = False, trusted: bool = False, wrap_when: WRAP_WHEN = WRAP_WHEN.NEVER, wrap_threshold: int = None, link_timeout: float = 0.01)

Wire a source and destination together automatically.

Inspects the types of source and destination and creates the appropriate combination of QueueLink, QueueHandleAdapterReader, and/or QueueHandleAdapterWriter.

Parameters:
  • source – A queue, open file/pipe handle, file path (str/PathLike), or multiprocessing.connection.Connection to read from.

  • destination – A queue, open file/pipe handle, or file path to write to. May also be a list of such objects for fan-out. Tuples and sets are not accepted (use a list).

  • name – Optional name passed to created components for logging.

  • start_method – Multiprocessing start method ('fork', 'forkserver', 'spawn'). Defaults to system preference.

  • thread_only – Force threading instead of separate processes.

  • trusted – For Connection sources — if True, use .recv()/.send(); if False, use .recv_bytes().

  • wrap_when – When to wrap large messages in ContentWrapper for disk buffering. Only applies when a reader adapter is created.

  • wrap_threshold – Byte size limit before wrapping. Only relevant when wrap_when is WRAP_WHEN.AUTO.

  • link_timeout – queue.get() timeout for QueueLink publishers.
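The trusted flag above distinguishes two ways of reading a multiprocessing Connection. The following is a minimal standard-library sketch of that difference; it does not call queuelink, and how link() applies the flag internally is not shown here.

```python
from multiprocessing import Pipe

# One-way pipe: `reader` can only receive, `writer` can only send.
reader, writer = Pipe(duplex=False)

# Trusted style: send()/recv() pickle and unpickle arbitrary Python objects.
# Unpickling data from a peer you do not trust can execute arbitrary code.
writer.send({"level": "info", "msg": "hello"})
obj = reader.recv()

# Untrusted style: send_bytes()/recv_bytes() move raw bytes, no unpickling.
writer.send_bytes(b"hello")
raw = reader.recv_bytes()

reader.close()
writer.close()
```

With recv_bytes() the receiver gets a bytes object and must decode it itself, which is why it is the safer default when the peer is not trusted.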

Returns:

A _LinkResult instance that exposes stop(), close(), and is_alive(). For queue→queue links, result.queue_link exposes the underlying QueueLink.

Raises:
  • TypeError – If either endpoint is an unsupported type, a QueueLink/_LinkResult instance, a Connection as destination, or if destination is a non-list iterable.

  • ValueError – If destination is an empty list or contains duplicate objects.
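The destination rules above (list only, not empty, no duplicates) can be illustrated with a small validator. This is a sketch under stated assumptions, not the library's code: normalize_destinations is a hypothetical helper, only tuples and sets are treated as rejected iterables, and duplicates are detected by object identity.

```python
from typing import Any, List


def normalize_destinations(destination: Any) -> List[Any]:
    """Hypothetical helper mirroring the documented destination rules."""
    # Tuples and sets are rejected; callers are expected to pass a list.
    if isinstance(destination, (tuple, set, frozenset)):
        raise TypeError("destination must be a single endpoint or a list")
    # A single endpoint is treated as a one-element fan-out.
    items = destination if isinstance(destination, list) else [destination]
    if not items:
        raise ValueError("destination list is empty")
    # Assumption: "duplicate" means the same object listed twice.
    if len({id(item) for item in items}) != len(items):
        raise ValueError("destination list contains duplicate objects")
    return items
```

Checking by identity rather than equality keeps two distinct but equal-looking endpoints (for example, two separate empty queues) from being flagged as duplicates.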

QueueLink Class

class queuelink.QueueLink(source: Queue | LifoQueue | PriorityQueue | SimpleQueue | Queue | JoinableQueue | SimpleQueue | BaseProxy = None, destination: Queue | LifoQueue | PriorityQueue | SimpleQueue | Queue | JoinableQueue | SimpleQueue | BaseProxy | List[Queue | LifoQueue | PriorityQueue | SimpleQueue | Queue | JoinableQueue | SimpleQueue | BaseProxy] = None, *, name: str = None, log_name: str = None, start_method: str = None, thread_only: bool = False, link_timeout: float = 0.01)

Manages publishing from source queues to destination queues

Event(*args, **kwargs) → Event | Event | BaseProxy

Create a context-appropriate Event

Lock(*args, **kwargs) → lock | Lock | BaseProxy

Create a context-appropriate Lock

Process(*args, **kwargs) → Process

Create a context-appropriate Process

Queue(*args, **kwargs) → Queue | JoinableQueue | SimpleQueue

Create a context-appropriate Queue

close() → None

Remove managed objects started within QueueLink

destructive_audit(direction: DIRECTION)

Print a line from each client Queue from the provided direction

Parameters:

direction (DIRECTION) – source or destination

This is a destructive operation, as it removes a line from each Queue.

Raises:

Empty – If a queue has nothing to remove.

get_metrics() → dict

Retrieve accumulated metrics from all publishers.

Drains both the thread and process metrics queues and merges all snapshots by element_id. Because each snapshot is keyed by element_id, later snapshots for the same element overwrite earlier ones, so the result reflects the most recent data for each metric element. Returns an empty dict if no metrics have been emitted yet.

Returns:

A dict mapping element_id to a metric data dict.
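The drain-and-merge behavior described for get_metrics() can be sketched with plain queue.Queue objects. The names drain_and_merge, thread_metrics, and process_metrics are illustrative assumptions, and the metric payloads are made up for the example.

```python
import queue
from typing import Dict


def drain_and_merge(*metric_queues: queue.Queue) -> Dict[str, dict]:
    """Empty each metrics queue and keep the latest snapshot per element_id."""
    merged: Dict[str, dict] = {}
    for metric_queue in metric_queues:
        while True:
            try:
                snapshot = metric_queue.get_nowait()
            except queue.Empty:
                break
            # Later snapshots overwrite earlier ones for the same element_id.
            merged.update(snapshot)
    return merged


thread_metrics: queue.Queue = queue.Queue()
process_metrics: queue.Queue = queue.Queue()
thread_metrics.put({"pub-1": {"moved": 10}})
thread_metrics.put({"pub-1": {"moved": 25}})
process_metrics.put({"pub-2": {"moved": 3}})

latest = drain_and_merge(thread_metrics, process_metrics)
nothing_left = drain_and_merge(thread_metrics, process_metrics)
```

Because the queues are drained, a second call with no new snapshots returns an empty dict, matching the documented behavior when nothing has been emitted.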

get_queue(queue_id: str | int) → Queue | LifoQueue | PriorityQueue | SimpleQueue | Queue | JoinableQueue | SimpleQueue | BaseProxy

Retrieve a client’s queue reference

Parameters:

queue_id – ID of the queue reference

Returns:

Reference to a queue instance

is_alive() → bool

Whether all of the publishers are alive

Returns:

False if any publisher has stopped

Raises:

ProcessNotStarted – If a publisher has never been started.

is_drained(queue_id: str = None)

Check alive and empty.

Attempts to give a clean semantic answer to “is there, or will there be, data to read?”

Returns:

bool
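One plausible reading of "drained" is: nothing is queued now, and nothing is still running that could add more. The sketch below encodes that reading with a thread and a queue.Queue. It is an assumption about the semantics, not the library's implementation, and the standalone is_drained function here is hypothetical.

```python
import queue
import threading


def is_drained(worker: threading.Thread, q: queue.Queue) -> bool:
    """True only if the producer has finished and no data is left to read."""
    return (not worker.is_alive()) and q.empty()


def produce(q: queue.Queue) -> None:
    for i in range(3):
        q.put(i)


q: queue.Queue = queue.Queue()
worker = threading.Thread(target=produce, args=(q,))
worker.start()
worker.join()

# The producer has stopped, but its items are still waiting to be read.
drained_before_reading = is_drained(worker, q)

while not q.empty():
    q.get()

# Producer stopped and queue empty: there is no data now and none will arrive.
drained_after_reading = is_drained(worker, q)
```

A consumer loop can poll a check like this instead of combining is_alive() and is_empty() by hand.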

is_empty(queue_id: str | None = None) → bool

Checks whether the source or destination queues are empty

Parameters:

queue_id – (optional) ID of the client

Returns:

If queue_id is None, returns True ONLY if ALL queues are empty. If queue_id is provided, True ONLY if both main queue and specified client queue are empty.

read(q: Queue | LifoQueue | PriorityQueue | SimpleQueue | Queue | JoinableQueue | SimpleQueue | BaseProxy) → str

Register a source queue

Parameters:

q – Queue or proxy object to a queue

Returns:

The client’s ID for access to this queue

register_queue(q: Queue | LifoQueue | PriorityQueue | SimpleQueue | Queue | JoinableQueue | SimpleQueue | BaseProxy, direction: DIRECTION) → str

Register a queue to this link

For a new FROM (source) queue, a publishing process will be created to send all additions down to destination queues.

For a new TO (destination) queue, all new additions to source queues will be added to this queue.

Returns the numeric ID for the queue reference, which must be used in all future interactions.

Parameters:
  • q – Queue object to register

  • direction – DIRECTION.FROM or DIRECTION.TO

Returns:

The client’s ID for access to this queue
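The fan-out described above, where every addition to a source queue is sent to each destination queue, can be sketched with a single publisher thread. This is a simplified stand-in using only the standard library; the publish function is hypothetical, and the 0.01 second timeout simply mirrors the documented link_timeout default.

```python
import queue
import threading
from typing import List


def publish(source: queue.Queue, destinations: List[queue.Queue],
            stop_event: threading.Event, timeout: float = 0.01) -> None:
    """Copy every item from source to each destination until told to stop."""
    # Keep going until a stop is requested AND the source has been emptied.
    while not (stop_event.is_set() and source.empty()):
        try:
            item = source.get(timeout=timeout)
        except queue.Empty:
            continue  # nothing yet; re-check the stop condition
        for dest in destinations:
            dest.put(item)


source, dest_a, dest_b = queue.Queue(), queue.Queue(), queue.Queue()
stop_event = threading.Event()
publisher = threading.Thread(
    target=publish, args=(source, [dest_a, dest_b], stop_event))
publisher.start()

for line in ("one", "two", "three"):
    source.put(line)

stop_event.set()
publisher.join()

received_a = [dest_a.get_nowait() for _ in range(3)]
received_b = [dest_b.get_nowait() for _ in range(3)]
```

The short get() timeout lets the publisher notice a stop request promptly without busy-waiting on an empty source.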

stop() → None

Stop the link somewhat gracefully

unregister_queue(queue_id: str | int, direction: DIRECTION) → str

Detach a queue from this link

Returns the clientId that was removed

Parameters:
  • queue_id – ID of the client

  • direction – source or destination

Returns:

ID of the client queue

write(q: Queue | LifoQueue | PriorityQueue | SimpleQueue | Queue | JoinableQueue | SimpleQueue | BaseProxy) → str

Register a destination queue

Parameters:

q – Queue or proxy object to a queue

Returns:

The client’s ID for access to this queue

Handle-to-Queue Adapter (Reader)

class queuelink.QueueHandleAdapterReader(queue: Queue | LifoQueue | PriorityQueue | SimpleQueue | Queue | JoinableQueue | SimpleQueue | BaseProxy, *, handle: IO | str | PathLike = None, name: str = None, log_name: str = None, start_method: str = None, thread_only: bool = None, trusted: bool = False, wrap_when: WRAP_WHEN = WRAP_WHEN.NEVER, wrap_threshold: int = None)

Custom manager to capture the output of processes and store it in one or more dedicated thread-safe or process-safe queues.

add_logging_handler(handler: Handler) → None

Add a logging handler to the logger

Parameters:

handler – A logging handler

close() → None

Stop the adapter and queue link and clean up.

Does not force a drain of the queues.

get_messages_processed() → int

Return the number of messages moved by this adapter

Returns:

The number of messages processed by this adapter.

is_alive() → bool

Check whether the thread/process managing the movement is still active

Returns:

True if the adapter thread/process is still running, False otherwise.

is_drained() → bool

Check alive and empty.

Attempts to give a clean semantic answer to “is there, or will there be, data to read?”

Returns:

True if fully drained, False if not

static queue_handle_adapter(*, name: str, handle: IO | str | PathLike, queue: Queue | LifoQueue | PriorityQueue | SimpleQueue | Queue | JoinableQueue | SimpleQueue | BaseProxy, queue_lock: lock | Lock | BaseProxy, stop_event: Event | Event | BaseProxy, messages_processed: MessageCounter, trusted: bool, wrap_when: WRAP_WHEN, wrap_threshold: int)

Copy lines from a given pipe handle into the given queue

Runs in a separate process, started by __init__. Closes pipe when done reading.

Parameters:
  • name – Name to use in logging

  • handle – Handle/pipe/path to read from

  • queue – Queue to write to

  • queue_lock – Lock used to indicate a write in progress

  • stop_event – Used to determine whether to stop the process

  • messages_processed – Number of elements moved from the handle to the queue

  • trusted – Whether to trust Connection objects

  • wrap_when – When to use a ContentWrapper

  • wrap_threshold – Size limit for a line before it is wrapped in a ContentWrapper
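The reader loop described above (copy lines from a handle into a queue, honor a stop event, close the handle when done) can be sketched as follows. This is a thread-based illustration, not the library's code: handle_to_queue is a hypothetical name, a one-element list stands in for MessageCounter, and wrapping via ContentWrapper is omitted.

```python
import io
import queue
import threading
from typing import IO, List


def handle_to_queue(handle: IO[str], q: queue.Queue, queue_lock: threading.Lock,
                    stop_event: threading.Event, processed: List[int]) -> None:
    """Copy lines from handle into q, then close the handle."""
    try:
        for line in handle:
            if stop_event.is_set():
                break
            with queue_lock:  # held while a write to the queue is in progress
                q.put(line)
                processed[0] += 1
    finally:
        handle.close()  # the reader side closes the handle when done


source_handle = io.StringIO("a\nb\nc\n")
lines: queue.Queue = queue.Queue()
processed = [0]  # stand-in for a shared message counter
reader = threading.Thread(
    target=handle_to_queue,
    args=(source_handle, lines, threading.Lock(), threading.Event(), processed))
reader.start()
reader.join()

captured = [lines.get_nowait() for _ in range(processed[0])]
```

In a real pipeline the handle would typically be a subprocess pipe such as stdout rather than an in-memory StringIO.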

set_handle(handle: IOBase | str) → None

Set the handle to read from or write to.

Parameters:

handle – An open handle (an io.IOBase subclass, such as an open file)

Raises:

HandleAlreadySet – If a handle has already been set.

Queue-to-Handle Adapter (Writer)

class queuelink.QueueHandleAdapterWriter(queue: Queue | LifoQueue | PriorityQueue | SimpleQueue | Queue | JoinableQueue | SimpleQueue | BaseProxy, *, handle: IO | str | PathLike = None, name: str = None, log_name: str = None, start_method: str = None, thread_only: bool = None, trusted: bool = False)

Custom manager to read messages from a queue and write them to a file or pipe

add_logging_handler(handler: Handler) → None

Add a logging handler to the logger

Parameters:

handler – A logging handler

close() → None

Stop the adapter and queue link and clean up.

Does not force a drain of the queues.

get_messages_processed() → int

Return the number of messages moved by this adapter

Returns:

The number of messages processed by this adapter.

is_alive() → bool

Check whether the thread/process managing the movement is still active

Returns:

True if the adapter thread/process is still running, False otherwise.

is_drained() → bool

Check alive and empty.

Attempts to give a clean semantic answer to “is there, or will there be, data to read?”

Returns:

True if fully drained, False if not

static queue_handle_adapter(*, name: str, handle: IO | str | PathLike, queue: Queue | LifoQueue | PriorityQueue | SimpleQueue | Queue | JoinableQueue | SimpleQueue | BaseProxy, queue_lock: lock | Lock | BaseProxy, stop_event: Event | Event | BaseProxy, messages_processed: MessageCounter, trusted: bool, **kwargs)

Copy lines from a local multiprocessing.JoinableQueue into a pipe

Runs in a separate process, started by __init__. Does not close an open pipe or handle when done writing.

Parameters:
  • name – Name to use in logging

  • handle – Handle/pipe/path to write to

  • queue – Queue to read from

  • queue_lock – Lock used to indicate a write is in progress

  • stop_event – Used to determine whether to stop the process

  • messages_processed – Number of elements moved from the queue to handle

  • trusted – Whether to trust Connection objects
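The writer loop is the mirror image of the reader: take items from a queue and write them to a handle, leaving the handle open afterwards. The sketch below is a thread-based illustration under assumptions: queue_to_handle is a hypothetical name, a one-element list stands in for MessageCounter, and the 0.01 second timeout is an illustrative choice.

```python
import io
import queue
import threading
from typing import IO, List


def queue_to_handle(q: queue.Queue, handle: IO[str], queue_lock: threading.Lock,
                    stop_event: threading.Event, processed: List[int],
                    timeout: float = 0.01) -> None:
    """Write every queued line to handle; do not close the handle."""
    # Keep going until a stop is requested AND the queue has been emptied.
    while not (stop_event.is_set() and q.empty()):
        try:
            line = q.get(timeout=timeout)
        except queue.Empty:
            continue
        with queue_lock:  # held while a write to the handle is in progress
            handle.write(line)
            processed[0] += 1
    handle.flush()  # flush, but leave the handle open for its owner


outbound: queue.Queue = queue.Queue()
sink = io.StringIO()
stop_event = threading.Event()
processed = [0]  # stand-in for a shared message counter
writer = threading.Thread(
    target=queue_to_handle,
    args=(outbound, sink, threading.Lock(), stop_event, processed))
writer.start()

outbound.put("a\n")
outbound.put("b\n")
stop_event.set()
writer.join()

written = sink.getvalue()
```

Leaving the handle open lets the caller keep using it, for example to append more output or to close it at a time of its choosing.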

set_handle(handle: IOBase | str) → None

Set the handle to read from or write to.

Parameters:

handle – An open handle (an io.IOBase subclass, such as an open file)

Raises:

HandleAlreadySet – If a handle has already been set.

© Copyright 2025, Andy Robb.
