Metrics
QueueLink tracks two kinds of data for each active link automatically —
message latency (how long each message took to move from source to destination)
and message count (how many messages have been forwarded). No configuration is
required; collection starts as soon as the first publisher starts and stops when
stop() is called.
Metrics are available through QueueLink.get_metrics(). The link()
factory exposes the underlying QueueLink instance via result.queue_link
for queue-to-queue links:
import queue
from queuelink import link
src = queue.Queue()
dst = queue.Queue()
result = link(src, dst)
src.put("hello")
dst.get()
result.stop()
metrics = result.queue_link.get_metrics()
Calling get_metrics()
get_metrics() returns a snapshot of all data emitted up to that point.
It can be called at any time — before, during, or after a stop() — but
calling it after stop() guarantees all in-flight data has been flushed:
ql = QueueLink(source=src, destination=dst)
# ... move messages ...
ql.stop()
metrics = ql.get_metrics() # complete snapshot
Return value — a dict keyed by element ID (an opaque string). Each
value is a dict describing one tracked element:
{
"<element_id>": {
"name": str or None,
"data_point_count": int, # number of latency samples
"mean": float, # mean latency in seconds
"median": float, # median latency in seconds
"stddev": float, # population std dev in seconds
},
"<element_id>": {
"name": str or None,
"count": int, # total messages forwarded
},
...
}
The element IDs are stable for the lifetime of the QueueLink instance but
are not meaningful outside of it. To tell the two element types apart, check
for the "mean" key (timing) or "count" key (counting):
for eid, data in metrics.items():
if "mean" in data:
print(f"Latency — mean: {data['mean']:.4f}s "
f"median: {data['median']:.4f}s "
f"stddev: {data['stddev']:.4f}s "
f"samples: {data['data_point_count']}")
elif "count" in data:
print(f"Messages forwarded: {data['count']}")
Timing metric details
The timing element records the elapsed time between when each message was placed on the source queue and when it was forwarded to every destination queue. Latency values are in seconds (float).
Rolling window
Only the most recent 100 samples are kept in memory at any time. The mean, median, and standard deviation are recalculated over that window on every new sample, so the statistics always reflect recent behaviour.
data_point_count reflects how many samples are in the current window
(up to 100), not the total number of messages ever forwarded. Use the
counting element for the lifetime total.
Standard deviation
stddev is the population standard deviation (statistics.pstdev),
not the sample standard deviation. It is zero when only one sample has been
collected.
Counting metric details
The counting element increments once for every message forwarded. Unlike the timing window, the count is cumulative for the entire lifetime of the publisher.
When get_metrics() returns multiple elements with a "count" key (which
can happen if the publisher was restarted), sum them to get the overall total.
Periodic vs. final emission
Metrics snapshots are emitted internally at two points:
Periodically — every 100 messages forwarded.
On stop — once when the publisher shuts down, capturing any remaining data.
get_metrics() drains the internal snapshot queue and merges the results,
so it always returns the most recent available snapshot per element, not a
running total of all snapshots. For most use cases, calling get_metrics()
once after stop() is sufficient.
Limitations
Metrics are per-
QueueLinkinstance. If you create multiple links in a pipeline, callget_metrics()on each one separately.Latency measurement begins when the publisher reads from the source queue, not when the caller puts a message onto it. Network or IPC overhead between the caller and the publisher is not included.
link()only exposes metrics when the result includes aQueueLinkcomponent. Adapter-only paths (e.g. file-to-file) have noqueue_linkattribute and no metrics.