Welcome to QueueLink’s documentation!
This documentation includes an introduction to the purpose of QueueLink, example uses, and API docs.
The project source is available on Github.
Quick Start
Use link() to connect a source and destination without choosing which
underlying class to use:
import queue
from queuelink import link
src = queue.Queue()
dst = queue.Queue()
result = link(src, dst)
src.put("hello")
print(dst.get()) # "hello"
result.stop()
link() accepts queues, file handles, file paths, and
multiprocessing.connection.Connection objects, and handles fan-out to
multiple destinations. See Using link() for the full guide.
Introduction and Background
The QueueLink library simplifies linking queues together with one-to-many or many-to-one relationships. link() is the recommended entry point for most use cases — it inspects your source and destination types and wires up the correct combination of components automatically.
For cases requiring fine-grained control (runtime queue registration, metrics, drain checking), use QueueLink, QueueHandleAdapterReader, and QueueHandleAdapterWriter directly.
A QueueLink instance is a one-way router that connects queues together. When two or more queues are linked, a separate process (or thread) is started to read from each “source” queue and write into the “destination” queues. (One process per source queue.)
Circular references are not allowed.
Adapters permit linkage between file handles, pipes, and queues.
Examples
Some implementations connecting queues together directly with QueueLink.
These examples are validated during testing in this test file.
Basic use
One-to-one, the “hello world”.
These examples both use the threaded Queue library for both queues (first example) or one of the queues (second example). Because of that, the link itself lives in the main code process and might be more likely to experience Global Interpreter Lock contention than if both queues were from the multiprocessing library. However, inter-process communication is quite slow, so you’ll need to determine which works better for your use case.
If both queues are created from the multiprocessing library the link will be a separate process. If you don’t need to set the start method directly, you can simply from multiprocessing import Queue and replace q = multiprocessing.get_context(start_method).Queue() with q = Queue().
# Threading only
from queue import Queue
from queuelink import QueueLink
def test_example_threaded(self):
# Source and destination queues
source_q = Queue()
dest_q = Queue()
# Create the QueueLink
queue_link = QueueLink(name="my link")
# Connect queues to the QueueLink
source_id = queue_link.read(q=source_q)
dest_id = queue_link.write(q=dest_q)
# Text to send
text_in = "a😂" * 10
# Add text to the source queue
source_q.put(text_in)
# Retrieve the text from the destination queue!
text_out = dest_q.get(timeout=1)
print(text_out)
# Multiprocessing
import multiprocessing
from queue import Queue
from queuelink import QueueLink
# Selecting a start method for this example
start_method = "spawn" # macOS default
def test_cross_thread_multiprocess(self):
# Source and destination
source_q = Queue() # Thread-based
dest_q = multiprocessing.get_context(start_method).Queue() # Process-based
# Create the QueueLink
queue_link = QueueLink(name="my link", start_method=self.start_method)
# Connect queues to the QueueLink
source_id = queue_link.read(q=source_q)
dest_id = queue_link.write(q=dest_q)
# Text to send
text_in = "a😂" * 10
# Add text to the source queue
source_q.put(text_in)
# Retrieve the text from the destination queue!
text_out = dest_q.get(timeout=1)
print(text_out)
Using a process manager for the queues
A process manager works across processes or whole machines, and can help mitigate some kinds of issues related to empty/get_nowait behavior and ordering.
from multiprocessing import Manager
from queuelink import QueueLink
# Create the multiprocessing.Manager
manager = Manager()
# Source and destination queues
source_q = manager.JoinableQueue()
dest_q = manager.JoinableQueue()
# Create the QueueLink
queue_link = QueueLink(name="my link")
# Connect queues to the QueueLink
source_id = queue_link.read(q=source_q)
dest_id = queue_link.write(q=dest_q)
# Text to send
text_in = "a😂" * 10
# Add text to the source queue
source_q.put(text_in)
# Retrieve the text from the destination queue!
text_out = dest_q.get()
print(text_out)
Reading from an open subprocess PIPE
This illustrates how to use a QueueLink adapter to read directly from a subprocess pipe into a queue. The pipe reader adapter only accepts one queue to write into, so if you need to read the pipe output from multiple processes/threads, you need to use a QueueLink to copy the pipe output into a set of additional queues.
# Multiprocessing
import multiprocessing
from queue import Queue
from queuelink import QueueLink, QueueHandleAdapterReader
from subprocess import Popen, PIPE
# Selecting a start method for this example
start_method = "spawn" # macOS default
def test_reader(self):
# Text to send
text_in = "a😂" * 10
# Destination queue
dest_q = multiprocessing.get_context(start_method).Queue() # Process-based
# Subprocess, simple example sending some text to stdout
# from subprocess import Popen, PIPE
proc = Popen(['echo', '-n', text_in], # -n prevents echo from adding a newline character
stdout=PIPE,
universal_newlines=True)
# Connect the reader
read_adapter = QueueHandleAdapterReader(queue=dest_q,
handle=proc.stdout,
start_method=self.start_method)
# Get the text from the queue
text_out = dest_q.get()
print(text_out)
Start Method
The “start method” is how a separate process is started by Python, applicable only to “multiprocessing”, not multi-threading. You can read more about this in the Python documentation. It is passed as a string to the QueueLink(start_method=) parameter, where it is sent unmodified to the multiprocessing.get_context() method.
This is helpful to set if you need to specify the start method because you or a downstream user may chose one other than the default.