|
Server : LiteSpeed System : Linux barito.iixcp.rumahweb.net 5.14.0-611.49.1.el9_7.x86_64 #1 SMP PREEMPT_DYNAMIC Tue Apr 21 16:39:08 EDT 2026 x86_64 User : elvh3918 ( 1528) PHP Version : 8.2.31 Disable Function : mail Directory : /proc/thread-self/root/usr/local/lib/python3.9/site-packages/celery/utils/ |
from __future__ import annotations
def detect_quorum_queues(app, driver_type: str) -> tuple[bool, str]:
"""Detect if any of the queues are quorum queues.
Returns:
tuple[bool, str]: A tuple containing a boolean indicating if any of the queues are quorum queues
and the name of the first quorum queue found or an empty string if no quorum queues were found.
"""
is_rabbitmq_broker = driver_type == 'amqp'
if is_rabbitmq_broker:
queues = app.amqp.queues
for qname in queues:
qarguments = queues[qname].queue_arguments or {}
if qarguments.get("x-queue-type") == "quorum":
return True, qname
return False, ""