Skip to content

Conversation

@vuonghp92
Copy link

@vuonghp92 vuonghp92 commented Oct 28, 2025

Add a new Prometheus Gauge metric flower_unprocessed_tasks_in_window to track
tasks that have been processing (received but not completed) for longer than a
configurable time window.

Implementation details:

  • Sliding window algorithm with time-based buckets for O(1) add/remove operations
  • Configurable window size via --unprocessed_tasks_window_minutes option (any value > 0)
  • Set to 0 (default) to disable tracking
  • Buckets are timestamp-based integers for efficient lookups
  • Counts bucket 2 steps back to ensure tasks have been unprocessed for > window_minutes
  • Automatic cleanup of counted buckets to minimize memory usage

Data structures:

  • time_buckets: Hash map of buckets containing task IDs
  • task_to_bucket: Hash map for O(1) task-to-bucket lookup
  • Both structures use dict for constant-time operations

Performance:

  • add_task(): O(1)
  • remove_task(): O(1)
  • update_metrics(): O(n) where n = tasks in old bucket, runs periodically (once per window)

Use cases:

  • Detect unprocessed or stalled tasks: Identify tasks that have been started but remain incomplete for longer than the configured time window, indicating potential processing delays or scheduling bottlenecks.

  • Detect message loss in the broker or queue: Highlight scenarios where tasks disappear from the broker or queue system (e.g., RabbitMQ) but never reach the worker, suggesting possible message delivery or routing issues.

  • Detect worker termination during processing: Monitor for tasks that remain “in-progress” because their worker was unexpectedly killed or crashed mid-execution, helping isolate stability or resource exhaustion problems.

Example:
flower --unprocessed_tasks_window_minutes=60

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant