Using link()
link() is the recommended entry point for most QueueLink use cases. It
inspects the types of your source and destination and wires up the correct
combination of QueueLink, QueueHandleAdapterReader, and/or
QueueHandleAdapterWriter automatically.
from queuelink import link
result = link(source, destination)
# ... use your queues ...
result.stop()
Getting Started
The simplest case: route messages from one queue to another.
import queue
from queuelink import link
src = queue.Queue()
dst = queue.Queue()
result = link(src, dst)
src.put("hello")
print(dst.get()) # "hello"
result.stop()
link() detects whether your queues are thread-based or process-based and
creates the appropriate publisher (thread or process) automatically.
Supported Endpoint Types
Sources — anything you want to read from:
Any queue: queue.Queue, queue.LifoQueue, queue.PriorityQueue, queue.SimpleQueue, multiprocessing.Queue, multiprocessing.JoinableQueue, multiprocessing.SimpleQueue, or a multiprocessing.Manager queue
Open file handles with readline() (subprocess pipes, open(), etc.)
File paths (str or os.PathLike), opened automatically
multiprocessing.connection.Connection (to queue destinations only)
Destinations — anything you want to write to:
Any queue (same types as above)
Open file handles with write()
File paths (str or os.PathLike), opened automatically
A list of any of the above for fan-out
Common Patterns
Queue to queue
import queue
from queuelink import link
src = queue.Queue()
dst = queue.Queue()
result = link(src, dst)
Subprocess pipe to queue
import queue
from subprocess import Popen, PIPE
from queuelink import link
dest_q = queue.Queue()
proc = Popen(['myprogram'], stdout=PIPE, universal_newlines=True)
result = link(proc.stdout, dest_q)
line = dest_q.get()
result.stop()
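For comparison, here is roughly the manual work that a pipe-to-queue link saves you from writing. It is a stdlib-only sketch, not QueueLink's implementation; pump_lines and run_and_collect are hypothetical helpers, and the child command is a stand-in for 'myprogram'.

```python
import queue
import sys
import threading
from subprocess import PIPE, Popen


def pump_lines(handle, dest_q):
    """Read lines from handle until EOF and put each one on dest_q."""
    for line in iter(handle.readline, ""):
        dest_q.put(line)
    handle.close()


def run_and_collect(argv):
    """Run argv, forward its stdout into a queue, and return the lines."""
    dest_q = queue.Queue()
    proc = Popen(argv, stdout=PIPE, text=True)
    reader = threading.Thread(target=pump_lines, args=(proc.stdout, dest_q))
    reader.start()
    proc.wait()      # safe here: the reader thread keeps the pipe drained
    reader.join()
    lines = []
    while not dest_q.empty():
        lines.append(dest_q.get())
    return lines


# Stand-in child process that prints two lines and exits.
child = [sys.executable, "-c", "print('one'); print('two')"]
lines = run_and_collect(child)
```

The reader thread is what keeps the child from blocking on a full pipe while the parent waits for it to exit.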
Queue to file
import queue
from queuelink import link
src_q = queue.Queue()
with open("output.txt", "w") as f:
    result = link(src_q, f)
    src_q.put("line one\n")
    # Stop inside the with block so the file is still open while the link drains.
    result.stop()
File path to queue
Pass a path string directly — link() opens the file for you:
import queue
from queuelink import link
dest_q = queue.Queue()
result = link("input.txt", dest_q)
Fan-out
Pass a list as the destination to broadcast each message to multiple endpoints:
import queue
from queuelink import link
src = queue.Queue()
dst1 = queue.Queue()
dst2 = queue.Queue()
result = link(src, [dst1, dst2])
src.put("broadcast")
print(dst1.get()) # "broadcast"
print(dst2.get()) # "broadcast"
result.stop()
Mixed fan-out (queues and files) is also supported:
result = link(src, [dst_queue, "log.txt"])
Destinations must be in a list. Tuples and sets are rejected — use a list
to keep behaviour explicit.
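Conceptually, fan-out means every message taken from the source is copied to each destination. The sketch below shows that idea with plain threads and queues; it is an illustration only, and fan_out and the _STOP sentinel are hypothetical names that QueueLink does not expose.

```python
import queue
import threading

_STOP = object()  # hypothetical sentinel, used only by this sketch


def fan_out(src, destinations):
    """Copy every message from src to each destination until _STOP arrives."""
    while True:
        item = src.get()
        if item is _STOP:
            break
        for dst in destinations:
            dst.put(item)


src = queue.Queue()
dst1, dst2 = queue.Queue(), queue.Queue()
worker = threading.Thread(target=fan_out, args=(src, [dst1, dst2]))
worker.start()

src.put("broadcast")
src.put(_STOP)
worker.join()

# Each destination received its own copy of the message.
received = (dst1.get_nowait(), dst2.get_nowait())
```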
Result Interface
link() returns a result object with the following interface:
stop()
  Shut down all managed components in the correct order: reader first, then wait for internal queues to drain, then publishers, then writers.
close()
  Alias for stop(), consistent with adapter usage patterns.
is_alive()
  Returns True if any managed worker thread or process is still running.
queue_link
  For queue-to-queue links, direct access to the underlying QueueLink instance. Useful for drain checking, runtime queue registration, or accessing metrics via get_metrics() (see Metrics). None for adapter-only paths.
reader
  The QueueHandleAdapterReader instance, if one was created.
writers
  List of QueueHandleAdapterWriter instances, if any were created.
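Because the result exposes close(), it should be usable with contextlib.closing so that shutdown happens even when the code in between raises. The sketch below demonstrates the pattern with a stand-in object. FakeLinkResult is hypothetical and only mimics the documented stop(), close(), and is_alive() methods; the pattern has not been checked against a real link() result here.

```python
from contextlib import closing


class FakeLinkResult:
    """Stand-in for the object returned by link(); for illustration only."""

    def __init__(self):
        self.stopped = False

    def stop(self):
        self.stopped = True

    def close(self):
        # Documented behaviour: close() is an alias for stop().
        self.stop()

    def is_alive(self):
        return not self.stopped


result = FakeLinkResult()
try:
    with closing(result):
        raise RuntimeError("simulated failure mid-pipeline")
except RuntimeError:
    pass  # closing() has already called result.close() at this point
```

With a real result you would replace FakeLinkResult() with the link(...) call and leave the with block unchanged.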
Advanced Parameters
start_method
  Multiprocessing start method: 'fork', 'forkserver', or 'spawn'. Defaults to the system preference.
  result = link(src, dst, start_method='spawn')
thread_only
  Force threading instead of spawning separate processes, regardless of queue types.
  result = link(src, dst, thread_only=True)
name
  Optional name passed to created components for log identification.
  result = link(src, dst, name="pipeline-stage-1")
wrap_when
  When to wrap large messages in a ContentWrapper for disk buffering (avoids pipe size limits). Only applies when a reader adapter is created. See WRAP_WHEN for values.
  from queuelink import WRAP_WHEN
  result = link("large_input.txt", dest_q, wrap_when=WRAP_WHEN.AUTO)
wrap_threshold
  Byte size above which wrapping is triggered, when wrap_when is WRAP_WHEN.AUTO.
link_timeout
  Queue get() timeout (seconds) for internal publishers. Default: 0.01. Increase under heavy load to reduce thrashing; note that higher values slow the response to stop().
trusted
  For Connection sources: if True, use .recv()/.send(); if False (default), use .recv_bytes().
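The link_timeout trade-off can be seen in a small stdlib sketch: a worker that blocks in get(timeout=...) only notices a stop request when that call returns. This assumes the internal publishers poll in a similar loop, which the text implies but does not spell out; publisher and stop_latency are hypothetical names.

```python
import queue
import threading
import time


def publisher(src, dst, stop_event, timeout):
    """Forward items from src to dst, re-checking stop_event after each get()."""
    while not stop_event.is_set():
        try:
            item = src.get(timeout=timeout)
        except queue.Empty:
            continue  # nothing arrived; loop back and re-check stop_event
        dst.put(item)


def stop_latency(timeout):
    """Measure how long an idle publisher takes to exit after a stop request."""
    src, dst, stop_event = queue.Queue(), queue.Queue(), threading.Event()
    worker = threading.Thread(target=publisher, args=(src, dst, stop_event, timeout))
    worker.start()
    time.sleep(0.05)              # let the worker block inside get()
    started = time.monotonic()
    stop_event.set()
    worker.join()
    return time.monotonic() - started


fast = stop_latency(0.01)  # short timeout: frequent wake-ups, quick shutdown
slow = stop_latency(0.5)   # long timeout: fewer wake-ups, slower shutdown
```

A shorter timeout means more wake-ups on an idle queue, while a longer one delays shutdown by up to one timeout interval.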
Error Handling
link() raises TypeError for unsupported or incompatible inputs:
import multiprocessing
from queuelink import link
conn_recv, conn_send = multiprocessing.Pipe()
# TypeError: Connection as destination is not supported
result = link(src, conn_recv)
# TypeError: tuples/sets are not accepted — use a list
result = link(src, (dst1, dst2))
# TypeError: QueueLink instances cannot be passed as endpoints
from queuelink import QueueLink
ql = QueueLink(source=src, destination=dst)
result = link(ql, dst2)
ValueError is raised for empty or duplicate destination lists:
# ValueError: empty destination list
result = link(src, [])
# ValueError: duplicate destination object
result = link(src, [dst, dst])
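The destination rules above can be summarised as a small validation function. This is a hypothetical re-implementation of the documented behaviour, not QueueLink's code. In particular, treating "duplicate" as "the same object appears twice" is an assumption.

```python
import queue


def normalize_destinations(destination):
    """Return destinations as a list, applying the documented checks."""
    if isinstance(destination, (tuple, set, frozenset)):
        raise TypeError("tuples/sets are not accepted; use a list")
    if not isinstance(destination, list):
        return [destination]      # a single endpoint is wrapped in a list
    if not destination:
        raise ValueError("empty destination list")
    seen = set()
    for dst in destination:
        if id(dst) in seen:       # assumption: duplicates are detected by identity
            raise ValueError("duplicate destination object")
        seen.add(id(dst))
    return destination


dst1, dst2 = queue.Queue(), queue.Queue()
single = normalize_destinations(dst1)
several = normalize_destinations([dst1, dst2])
```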
When to Use link() vs Direct Classes
link() covers the majority of use cases. Use the lower-level classes
directly when you need capabilities that link() does not expose:
Use QueueLink directly when you need:
Dynamic queue registration or unregistration at runtime (read() / write() / unregister_queue())
Access to per-link metrics (get_metrics(); see Metrics)
Drain checking (is_drained(), is_empty())
Destructive audit (destructive_audit())
Use QueueHandleAdapterReader or QueueHandleAdapterWriter
directly when you need:
Custom error handling during read or write
Non-standard content wrapping configuration
QueueLink is not deprecated. link() is an additive convenience layer
on top of the same underlying classes.