Skip to content

feat: add extra observability metrics for PrometheusMiddleware and Receiver#604

Open
Mohammed0tarek wants to merge 3 commits intotaskiq-python:masterfrom
Mohammed0tarek:observability/add_full_observability
Open

feat: add extra observability metrics for PrometheusMiddleware and Receiver#604
Mohammed0tarek wants to merge 3 commits intotaskiq-python:masterfrom
Mohammed0tarek:observability/add_full_observability

Conversation

@Mohammed0tarek
Copy link

Summary

  • PrometheusMiddleware: add queue_wait_seconds histogram (end-to-end queue latency via _taskiq_enqueue_timestamp label injected in pre_send), task_errors_by_type counter
    (error count broken down by exception class name), and idempotent timestamp injection so multiple middlewares can coexist
  • ReceiverObserver protocol: a new typing.Protocol (taskiq/receiver/observer.py) that lets external implementations observe receiver internals — prefetch queue depth, semaphore
    availability, active task count, unknown task lookups, and deserialization errors — with zero overhead when no observer is set (if self.observer is not None guards)
  • PrometheusReceiverObserver: concrete implementation using Prometheus Gauges/Counters, wired automatically via PrometheusMiddleware.set_broker()broker._receiver_observer
    Receiver(observer=...)

New metrics

Metric Type Source
queue_wait_seconds Histogram PrometheusMiddleware
task_errors_by_type Counter (task_name, error_type) PrometheusMiddleware
prefetch_queue_size Gauge PrometheusReceiverObserver
semaphore_available Gauge PrometheusReceiverObserver
worker_active_tasks_count Gauge PrometheusReceiverObserver
task_not_found_total Counter (task_name) PrometheusReceiverObserver
deserialize_error_count Counter PrometheusReceiverObserver

Some decisions decisions

  • ReceiverObserver is a Protocol (not ABC) so there's no coupling — any class with matching methods satisfies it
  • queue_wait_seconds gracefully degrades: if the sender doesn't use PrometheusMiddleware, the timestamp label is absent and the metric is simply not observed
  • Negative queue wait values (from clock skew) are clamped to 0. If needed we can change this behavior.

Testing

Honestly still no idea how to test this via py test. But I'll be working on it.

Add production observability for the Receiver via an observer protocol
that tracks prefetch queue depth, semaphore availability, active task
count, unknown task lookups, and deserialization errors.

- Add ReceiverObserver protocol (taskiq/receiver/observer.py)
- Instrument Receiver with guarded observer callbacks at 5 sites
- Add PrometheusReceiverObserver implementation with Gauges/Counters
- Wire observer from middleware to receiver via broker attribute
- Remove redundant in_flight_tasks gauge (replaced by active_tasks_count)
- Fix typos in observer docstring and metric descriptions
- Add missing docstrings to observer protocol and implementation methods
- Remove unused Gauge import from PrometheusMiddleware.__init__
- Remove unused ReceiverObserver import from run.py
- Fix import ordering (ruff I001)
- Add noqa for expected complexity in runner method
- Run black formatting
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant