QueueLink

Contents

  • Using link()
    • Getting Started
    • Supported Endpoint Types
    • Common Patterns
      • Queue to queue
      • Subprocess pipe to queue
      • Queue to file
      • File path to queue
    • Fan-out
    • Result Interface
    • Advanced Parameters
    • Error Handling
    • When to Use link() vs Direct Classes
  • Metrics
  • QueueLink Module API Documentation
  • Publishing QueueLink
QueueLink
  • Using link()
  • View page source

Using link()

link() is the recommended entry point for most QueueLink use cases. It inspects the types of your source and destination and wires up the correct combination of QueueLink, QueueHandleAdapterReader, and/or QueueHandleAdapterWriter automatically.

from queuelink import link

result = link(source, destination)
# ... use your queues ...
result.stop()

Getting Started

The simplest case: route messages from one queue to another.

import queue
from queuelink import link

src = queue.Queue()
dst = queue.Queue()

result = link(src, dst)

src.put("hello")
print(dst.get())   # "hello"

result.stop()

link() detects whether your queues are thread-based or process-based and creates the appropriate publisher (thread or process) automatically.

Supported Endpoint Types

Sources — anything you want to read from:

  • Any queue: queue.Queue, queue.LifoQueue, queue.PriorityQueue, queue.SimpleQueue, multiprocessing.Queue, multiprocessing.JoinableQueue, multiprocessing.SimpleQueue, or a multiprocessing.Manager queue

  • Open file handles with readline() (subprocess pipes, open(), etc.)

  • File paths (str or os.PathLike) — opened automatically

  • multiprocessing.connection.Connection (to queue destinations only)

Destinations — anything you want to write to:

  • Any queue (same types as above)

  • Open file handles with write()

  • File paths (str or os.PathLike) — opened automatically

  • A list of any of the above for fan-out

Common Patterns

Queue to queue

import queue
from queuelink import link

src = queue.Queue()
dst = queue.Queue()
result = link(src, dst)

Subprocess pipe to queue

import queue
from subprocess import Popen, PIPE
from queuelink import link

dest_q = queue.Queue()
proc = Popen(['myprogram'], stdout=PIPE, universal_newlines=True)

result = link(proc.stdout, dest_q)

line = dest_q.get()
result.stop()

Queue to file

import queue
from queuelink import link

src_q = queue.Queue()

with open("output.txt", "w") as f:
    result = link(src_q, f)
    src_q.put("line one\n")
    result.stop()

File path to queue

Pass a path string directly — link() opens the file for you:

import queue
from queuelink import link

dest_q = queue.Queue()
result = link("input.txt", dest_q)

Fan-out

Pass a list as the destination to broadcast each message to multiple endpoints:

import queue
from queuelink import link

src = queue.Queue()
dst1 = queue.Queue()
dst2 = queue.Queue()

result = link(src, [dst1, dst2])

src.put("broadcast")
print(dst1.get())   # "broadcast"
print(dst2.get())   # "broadcast"

result.stop()

Mixed fan-out (queues and files) is also supported:

result = link(src, [dst_queue, "log.txt"])

Destinations must be in a list. Tuples and sets are rejected — use a list to keep behaviour explicit.

Result Interface

link() returns a result object with the following interface:

stop()

Shut down all managed components in the correct order: reader first, then wait for internal queues to drain, then publishers, then writers.

close()

Alias for stop(), consistent with adapter usage patterns.

is_alive()

Returns True if any managed worker thread or process is still running.

queue_link

For queue-to-queue links, direct access to the underlying QueueLink instance. Useful for drain checking, runtime queue registration, or accessing metrics via get_metrics() (see Metrics). None for adapter-only paths.

reader

The QueueHandleAdapterReader instance, if one was created.

writers

List of QueueHandleAdapterWriter instances, if any were created.

Advanced Parameters

start_method

Multiprocessing start method: 'fork', 'forkserver', or 'spawn'. Defaults to the system preference.

result = link(src, dst, start_method='spawn')
thread_only

Force threading instead of spawning separate processes, regardless of queue types.

result = link(src, dst, thread_only=True)
name

Optional name passed to created components for log identification.

result = link(src, dst, name="pipeline-stage-1")
wrap_when

When to wrap large messages in a ContentWrapper for disk buffering (avoids pipe size limits). Only applies when a reader adapter is created. See WRAP_WHEN for values.

from queuelink import WRAP_WHEN
result = link("large_input.txt", dest_q, wrap_when=WRAP_WHEN.AUTO)
wrap_threshold

Byte size above which wrapping is triggered, when wrap_when is WRAP_WHEN.AUTO.

link_timeout

Queue get() timeout (seconds) for internal publishers. Default 0.01. Increase under heavy load to reduce thrashing; note that higher values slow response to stop().

trusted

For Connection sources — if True, use .recv()/.send(); if False (default), use .recv_bytes().

Error Handling

link() raises TypeError for unsupported or incompatible inputs:

import multiprocessing
from queuelink import link

conn_recv, conn_send = multiprocessing.Pipe()

# TypeError: Connection as destination is not supported
result = link(src, conn_recv)

# TypeError: tuples/sets are not accepted — use a list
result = link(src, (dst1, dst2))

# TypeError: QueueLink instances cannot be passed as endpoints
from queuelink import QueueLink
ql = QueueLink(source=src, destination=dst)
result = link(ql, dst2)

ValueError is raised for empty or duplicate destination lists:

# ValueError: empty destination list
result = link(src, [])

# ValueError: duplicate destination object
result = link(src, [dst, dst])

When to Use link() vs Direct Classes

link() covers the majority of use cases. Use the lower-level classes directly when you need capabilities that link() does not expose:

Use QueueLink directly when you need:

  • Dynamic queue registration or unregistration at runtime (read() / write() / unregister_queue())

  • Access to per-link metrics (get_metrics() — see Metrics)

  • Drain checking (is_drained(), is_empty())

  • Destructive audit (destructive_audit())

Use QueueHandleAdapterReader or QueueHandleAdapterWriter directly when you need:

  • Custom error handling during read or write

  • Non-standard content wrapping configuration

QueueLink is not deprecated. link() is an additive convenience layer on top of the same underlying classes.

Previous Next

© Copyright 2025, Andy Robb.

Built with Sphinx using a theme provided by Read the Docs.