Welcome to QueueLink's documentation! ===================================== .. image:: https://badge.fury.io/py/queuelink.svg :target: https://pypi.org/project/queuelink :alt: Pypi Version .. image:: https://readthedocs.org/projects/queuelink/badge/?version=latest :target: http://queuelink.readthedocs.io/en/latest/?badge=latest :alt: Documentation Status This documentation includes an introduction to the purpose of QueueLink, example uses, and API docs. The project source is available on `Github `_. .. toctree:: :maxdepth: 1 :caption: Contents link_guide metrics api publishing Quick Start =========== Use ``link()`` to connect a source and destination without choosing which underlying class to use: .. code-block:: python import queue from queuelink import link src = queue.Queue() dst = queue.Queue() result = link(src, dst) src.put("hello") print(dst.get()) # "hello" result.stop() ``link()`` accepts queues, file handles, file paths, and ``multiprocessing.connection.Connection`` objects, and handles fan-out to multiple destinations. See :doc:`link_guide` for the full guide. Introduction and Background =========================== The QueueLink library simplifies linking queues together with one-to-many or many-to-one relationships. ``link()`` is the recommended entry point for most use cases — it inspects your source and destination types and wires up the correct combination of components automatically. For cases requiring fine-grained control (runtime queue registration, metrics, drain checking), use ``QueueLink``, ``QueueHandleAdapterReader``, and ``QueueHandleAdapterWriter`` directly. A ``QueueLink`` instance is a one-way router that connects queues together. When two or more queues are linked, a separate process (or thread) is started to read from each "source" queue and write into the "destination" queues. (One process per source queue.) Circular references are not allowed. Adapters permit linkage between file handles, pipes, and queues. Examples ======== Some implementations connecting queues together directly with ``QueueLink``. These examples are validated during testing in `this test file `_. Basic use --------- One-to-one, the "hello world". These examples both use the threaded Queue library for both queues (first example) or one of the queues (second example). Because of that, the link itself lives in the main code process and might be more likely to experience Global Interpreter Lock contention than if both queues were from the multiprocessing library. However, inter-process communication is quite slow, so you'll need to determine which works better for your use case. If both queues are created from the multiprocessing library the link will be a separate process. If you don't need to set the start method directly, you can simply ``from multiprocessing import Queue`` and replace ``q = multiprocessing.get_context(start_method).Queue()`` with ``q = Queue()``. .. code-block:: python # Threading only from queue import Queue from queuelink import QueueLink def test_example_threaded(self): # Source and destination queues source_q = Queue() dest_q = Queue() # Create the QueueLink queue_link = QueueLink(name="my link") # Connect queues to the QueueLink source_id = queue_link.read(q=source_q) dest_id = queue_link.write(q=dest_q) # Text to send text_in = "a😂" * 10 # Add text to the source queue source_q.put(text_in) # Retrieve the text from the destination queue! text_out = dest_q.get(timeout=1) print(text_out) .. code-block:: python # Multiprocessing import multiprocessing from queue import Queue from queuelink import QueueLink # Selecting a start method for this example start_method = "spawn" # macOS default def test_cross_thread_multiprocess(self): # Source and destination source_q = Queue() # Thread-based dest_q = multiprocessing.get_context(start_method).Queue() # Process-based # Create the QueueLink queue_link = QueueLink(name="my link", start_method=self.start_method) # Connect queues to the QueueLink source_id = queue_link.read(q=source_q) dest_id = queue_link.write(q=dest_q) # Text to send text_in = "a😂" * 10 # Add text to the source queue source_q.put(text_in) # Retrieve the text from the destination queue! text_out = dest_q.get(timeout=1) print(text_out) Using a process manager for the queues -------------------------------------- A `process manager `_ works across processes or whole machines, and can help mitigate `some kinds of issues `_ related to empty/get_nowait behavior and ordering. .. code-block:: python from multiprocessing import Manager from queuelink import QueueLink # Create the multiprocessing.Manager manager = Manager() # Source and destination queues source_q = manager.JoinableQueue() dest_q = manager.JoinableQueue() # Create the QueueLink queue_link = QueueLink(name="my link") # Connect queues to the QueueLink source_id = queue_link.read(q=source_q) dest_id = queue_link.write(q=dest_q) # Text to send text_in = "a😂" * 10 # Add text to the source queue source_q.put(text_in) # Retrieve the text from the destination queue! text_out = dest_q.get() print(text_out) Reading from an open subprocess PIPE ------------------------------------ This illustrates how to use a QueueLink adapter to read directly from a subprocess pipe into a queue. The pipe reader adapter only accepts one queue to write into, so if you need to read the pipe output from multiple processes/threads, you need to use a QueueLink to copy the pipe output into a set of additional queues. .. code-block:: python # Multiprocessing import multiprocessing from queue import Queue from queuelink import QueueLink, QueueHandleAdapterReader from subprocess import Popen, PIPE # Selecting a start method for this example start_method = "spawn" # macOS default def test_reader(self): # Text to send text_in = "a😂" * 10 # Destination queue dest_q = multiprocessing.get_context(start_method).Queue() # Process-based # Subprocess, simple example sending some text to stdout # from subprocess import Popen, PIPE proc = Popen(['echo', '-n', text_in], # -n prevents echo from adding a newline character stdout=PIPE, universal_newlines=True) # Connect the reader read_adapter = QueueHandleAdapterReader(queue=dest_q, handle=proc.stdout, start_method=self.start_method) # Get the text from the queue text_out = dest_q.get() print(text_out) Start Method ============ The "start method" is how a separate process is started by Python, applicable only to "multiprocessing", not multi-threading. You can read more about this in the `Python documentation `_. It is passed as a string to the QueueLink(start_method=) parameter, where it is sent unmodified to the multiprocessing.get_context() method. This is helpful to set if you need to specify the start method because you or a downstream user may chose one other than the default. Indices and tables ================== * :ref:`genindex`