Yohohohohohooho | Sanrei Aya
Current File : //usr/local/lib/python3.9/site-packages/celery/concurrency/__pycache__/asynpool.cpython-39.pyc
a

��Shn��@s�dZddlZddlZddlZddlZddlZddlZddlmZm	Z	m
Z
ddlmZddl
mZddlmZddlmZmZmZddlmZdd	lmZmZdd
lmZddlmZmZddlm Z m!Z!m"Z"m#Z#m$Z$dd
l%m&Z&ddl'm(Z(m)Z)ddl*mZ+ddl,m-Z-ddl.m/Z/ddl0m1Z1ddl2m3Z3ddl4m5Z5ddl6m7Z7ddl8m9Z:zddl;m<Z=dZ>Wn2e?�y�ej<fdd�Z=dZ>efdd�ZYn0dZ@e7eA�ZBeBjCeBjDZCZDeEejFejGh�ZHdZId ZJd!ZKd"ZLeLeLeKeKeLd#�ZMd$d%�eM�N�D�ZOe
d&d'�ZPd(d)�ZQd*d+�ZRd,d-�ZSeTed.��rFddddejUejVejWejXfd/d0�ZYn
d<d1d0�ZYddddeYfd2d3�ZZd4d5�Z[Gd6d7�d7ej\�Z\Gd8d9�d9ej]�Z]Gd:d;�d;ej^�Z_dS)=a�Version of multiprocessing.Pool using Async I/O.

.. note::

    This module will be moved soon, so don't use it directly.

This is a non-blocking version of :class:`multiprocessing.Pool`.

This code deals with three major challenges:

#. Starting up child processes and keeping them running.
#. Sending jobs to the processes and receiving results back.
#. Safely shutting down this system.
�N)�Counter�deque�
namedtuple)�BytesIO)�Integral)�HIGHEST_PROTOCOL)�pack�unpack�unpack_from)�sleep)�WeakValueDictionary�ref)�pool)�
isblocking�setblocking)�ACK�NACK�RUN�	TERMINATE�
WorkersJoined)�_SimpleQueue)�ERR�WRITE)�pickle)�
SELECT_BAD_FD)�fxrange)�promise)�worker_before_create_process)�noop)�
get_logger)�state)�readTcCs(|||�}t|�}|dkr$|�|�|S�Nr)�len�write)�fd�buf�sizer!�chunk�n�r*�E/usr/local/lib/python3.9/site-packages/celery/concurrency/asynpool.py�__read__5s


r,FcCs|||���S�N)�getvalue)�fmtZiobufr	r*r*r+r
=sr
)�AsynPool�g@��)N�default�fastZfcfsZfaircCsi|]\}}||�qSr*r*)�.0�k�vr*r*r+�
<dictcomp>W�r9�Ack)�idr%�payloadcCst�|�dkS)z(Return true if generator is not started.�GEN_CREATED)�inspect�getgeneratorstate)�genr*r*r+�gen_not_started\srBcCs(z
|j}WntyYn0|�SdSr-)�_writer�AttributeError)�job�writerr*r*r+�_get_job_writeras

rGcCst|t�r|S|��Sr-)�
isinstancer�fileno�r%r*r*r+�_ensure_integral_fdjsrK�pollcCs(|�}|j}	i}
|r8tt|�D]}|
�|d�|B|
|<q|r`tt|�D]}|
�|d�|B|
|<qF|r�tt|�D]}|
�|d�|B|
|<qn|
��D]\}}|	||�q�t�t�}
}|r�|dkr�dn
t|d�}|�|�}|D]@\}}||@r�|
�|�||@�r
|�|�||@r�|
�|�q�|
|dfS)Nrg@�@)	�register�maprK�get�items�set�roundrL�add)�readers�writers�err�timeoutrL�POLLIN�POLLOUT�POLLERRZpollerrMZ
fd_to_maskr%Z
event_mask�R�W�events�eventr*r*r+�_select_impos2



r_cCs8t�||||�\}}}|r.tt|�t|�B�}||dfSr")�select�listrQ)rTrUrVrW�r�w�er*r*r+r_�scCs@|durt�n|}|dur t�n|}|dur2t�n|}z|||||�WSt�y:}z�|j}|tjkr�t�t�dfWYd}~S|tv�r$||B|BD]p}zt�|gggd�Wq�t�y}z:|j}|tvrԂ|�|�|�|�|�|�WYd}~q�d}~00q�t�t�dfWYd}~S�WYd}~n
d}~00dS)a<Simple wrapper to :class:`~select.select`, using :func:`~select.poll`.

    Arguments:
        readers (Set[Fd]): Set of reader fds to test if readable.
        writers (Set[Fd]): Set of writer fds to test if writable.
        err (Set[Fd]): Set of fds to test for error condition.

    All fd sets passed must be mutable as this function
    will remove non-working fds from them, this also means
    the caller must make sure there are still fds in the sets
    before calling us again.

    Returns:
        Tuple[Set, Set, Set]: of ``(readable, writable, again)``, where
        ``readable`` is a set of fds that have data available for read,
        ``writable`` is a set of fds that's ready to be written to
        and ``again`` is a flag that if set means the caller must
        throw away the result and call us again.
    Nr2r)rQ�OSError�errno�EINTRrr`�discard)rTrUrVrWrL�exc�_errnor%r*r*r+�_select�s,



"rkc	
s���fdd�}g}|D]\�|�|}}z|�g|�Ri|��Wqttfyptjd�dd�|���Yq0q|r�|D]L�z&t|d�r�|���n|��d�Wq|ty�t�d�|�Yq|0q|dS)	a�Apply hub method to fds in iter, remove from list if failure.

    Some file descriptors may become stale through OS reasons
    or possibly other reasons, so safely manage our lists of FDs.
    :param fds_iter: the file descriptors to iterate and apply hub_method
    :param source_data: data source to remove FD if it renders OSError
    :param hub_method: the method to call with each fd and kwargs
    :*args to pass through to the hub_method;
    with a special syntax string '*fd*' represents a substitution
    for the current fd object in the iteration (for some callers).
    :**kwargs to pass through to the hub method (no substitutions needed)
    cs"�}d|vr�fdd��D�}|S)N�*fd*csg|]}|dkr�n|�qS)rlr*)r6�argrJr*r+�
<listcomp>�r:zTiterate_file_descriptors_safely.<locals>._meta_fd_argument_maker.<locals>.<listcomp>r*)Z	call_args��argsr%r*r+�_meta_fd_argument_maker�sz@iterate_file_descriptors_safely.<locals>._meta_fd_argument_makerz)Encountered OSError when accessing fd %s T��exc_info�removeNz*ValueError trying to invalidate %s from %s)	re�FileNotFoundError�logger�warning�append�hasattrrt�pop�
ValueError)	Zfds_iterZsource_dataZ
hub_methodrp�kwargsrqZ	stale_fdsZhub_argsZ
hub_kwargsr*ror+�iterate_file_descriptors_safely�s,�
�r}c@seZdZdZdd�ZdS)�WorkerzPool worker process.cCs|j�t|ff�dSr-)�outq�put�	WORKER_UP)�self�pidr*r*r+�
on_loop_start�szWorker.on_loop_startN)�__name__�
__module__�__qualname__�__doc__r�r*r*r*r+r~�sr~cs^eZdZdZ�fdd�Zeeeee	j
fdd�Zdd�Zdd	�Z
d
d�Zdd
�Zdd�Z�ZS)�
ResultHandlerz)Handles messages from the pool processes.cs:|�d�|_|�d�|_t�j|i|��|j|jt<dS)N�fileno_to_outq�on_process_alive)rzr�r��super�__init__Zstate_handlersr�)r�rpr|��	__class__r*r+r�szResultHandler.__init__c	
cs�d}	}
|rtd�}t|�}n
|�}}|	dkr�z$|||rF||	d�n|d|	�}
Wn6ty�}z|jtvrp�dVWYd}~q(d}~00|
dkr�|	r�td�nt��|	|
7}	q(|d|�\}|r�t|�}t|�}n
|�}}|
|k�rnz$|||r�||
d�n|||
�}
Wn:t�yD}z |jtv�r*�dVWYd}~q�d}~00|
dk�rd|
�r^td�nt��|
|
7}
q�|||j|�|�r�|||��}n|�d�||�}|�r�||�dS)Nrr3zEnd of file during messagez>i)�	bytearray�
memoryviewrerf�UNAVAIL�EOFError�handle_event�seek)r��
add_readerr%�callbackr,�
readcanbufrr
�loadZHrZBrr&Zbufvr)ri�	body_size�messager*r*r+�
_recv_messagesZ

�
�



�
�

zResultHandler._recv_messagecs6|j�|j�|j�|j�|j������fdd�}|S)z3Coroutine reading messages from the pool processes.c	s~z�|Wnty&�|�YS0��|��}zt|�Wn.tyRYn(ttfyn�|�Yn0�||�dSr-)�KeyError�next�
StopIterationrer�)rI�it�r�r��on_state_changeZrecv_message�
remove_readerr*r+�on_result_readableHsz>ResultHandler._make_process_result.<locals>.on_result_readable)r�r�r�r�r�)r��hubr�r*r�r+�_make_process_result@sz"ResultHandler._make_process_resultcCs|�|�|_dSr-)r�r�)r�r�r*r*r+�register_with_event_loopXsz&ResultHandler.register_with_event_loopcGstd��dS)NzNot registered with event loop)�RuntimeError)r�rpr*r*r+r�[szResultHandler.handle_eventc		Cs�|j}|j}|j}|j}|j}t|�}|r�|r�|jtkr�|durF|�t�}|D]L}t|g|j|j	|j
||�z|dd�WqPty�td�YdS0qP|�
|�q&dS)NT)�shutdownz&result handler: all workers terminated)�cache�check_timeoutsr�r��join_exited_workersrQ�_staterr}�_flush_outqueuerSr�debug�difference_update)	r�r�r�r�r�r�Z	outqueuesZpending_remove_fdr%r*r*r+�on_stop_not_started`s*�z!ResultHandler.on_stop_not_startedcCsJz||}Wnty&||�YS0|jj}zt|d�WntyX||�YS0z�z$|�d�rr|��}nd}td�WnNttfy�||�YWzt|d�WSty�||�YS0S0|r�||�Wzt|d�Wnt�y||�YS0n:zt|d�Wn(t�yB||�YYS00dS)Nr2r��?)	r�r�_readerrrerL�recvrr�)r�r%rtZ
process_indexr��proc�reader�taskr*r*r+r�|s>

�
�zResultHandler._flush_outqueue)r�r�r�r�r�r,r�rr
�_pickler�r�r�r�r�r�r��
__classcell__r*r*r�r+r��s�
9r�cs~eZdZdZeZeZdZ�fdd�ZdN�fdd�	Z�fdd	�Z	d
d�Z
dd
�Zdd�Zdd�Z
dd�Zdd�Zdd�Zdd�Zdd�Zeejefdd�Zdd�Zd d!�Zd"d#�Zd$d%�Zd&d'�Zd(d)�Zd*d+�Zd,d-�Zd.d/�Z d0d1�Z!d2d3�Z"e#d4d5��Z$�fd6d7�Z%d8d9�Z&d:d;�Z'd<d=�Z(d>d?�Z)d@dA�Z*dBdC�Z+ejeefdDdE�Z,e-dFdG��Z.dHdI�Z/e-dJdK��Z0e1dLdM��Z2�Z3S)Or0zAsyncIO Pool (no threads).Fcst��|�}d|_|S)NF)r��
WorkerProcess�dead)r�Zworkerr�r*r+r��szAsynPool.WorkerProcessNcst�||��_|dur���n|}|�_�fdd�t|�D��_i�_i�_i�_	|dur^t
n|�_t��_
t��_t��_t��_t��_�jj�_t��_t��_t�j|g|�Ri|���jD]}|�j|j<|�j	|j<q�t�jdt��_ t�jdt��_!dS)Ncsi|]}���d�qSr-��create_process_queues�r6�_�r�r*r+r9�sz%AsynPool.__init__.<locals>.<dictcomp>�on_soft_timeout�on_hard_timeout)"�SCHED_STRATEGIESrO�sched_strategy�	cpu_count�synack�range�_queues�_fileno_to_inq�_fileno_to_outq�_fileno_to_synq�PROC_ALIVE_TIMEOUT�_proc_alive_timeoutrQ�_waiting_to_start�
_all_inqueues�_active_writes�_active_writers�
_busy_workersrh�_mark_worker_as_availabler�outbound_bufferr�write_statsr�r��_pool�outqR_fd�synqW_fd�getattrZ_timeout_handlerrr�r�)r�Z	processesr�r�Zproc_alive_timeoutrpr|r�r�r�r+r��s@�
��

��zAsynPool.__init__cs tj|d�t��t��|�S)N)Zsender)r�send�gcZcollectr��_create_worker_process)r��ir�r*r+r��szAsynPool._create_worker_processcCs|�||�|��dSr-)�_untrack_child_process�
maintain_pool)r�r�r�r*r*r+�_event_process_exit�szAsynPool._event_process_exitcCsNz
|j}Wn&ty0t�|jj�}|_Yn0t|gd|j|j||�dS)z4Helper method determines appropriate fd for process.N)	�_sentinel_pollrD�os�dupZ_popen�sentinelr}r�r��r�r�r�r%r*r*r+�_track_child_process�s

�zAsynPool._track_child_processcCs0|jdur,|jd}|_|�|�t�|�dSr-)r�rtr��closer�r*r*r+r�s

zAsynPool._untrack_child_processcs��j����jj�_��������������fdd��jD�t�j	�j	�j
�jd��j��D]\}}��
||�ql�js��j��j�d�_dS)z4Register the async pool with the current event loop.csg|]}��|���qSr*)r��r6rc�r�r�r*r+rnr:z5AsynPool.register_with_event_loop.<locals>.<listcomp>rlTN)�_result_handlerr�r��handle_result_event�_create_timelimit_handlers�_create_process_handlers�_create_write_handlersr�r}r�r��timersrPZcall_repeatedly�_registered_with_event_loopZon_tickrS�
on_poll_start)r�r��handler�intervalr*r�r+r�s



�z!AsynPool.register_with_event_loopcsR�j�t���_����fdd�}|�_�fdd����_�fdd�}|�_dS)z.Create handlers used to implement time limits.cs@|r"�|�j|j||���|j<n|r<�|�j|j��|j<dSr-)�_on_soft_timeout�_job�_on_hard_timeout)r[�soft�hard)�
call_laterr�r��trefsr*r+�on_timeout_set's�
�z;AsynPool._create_timelimit_handlers.<locals>.on_timeout_setc	s4z��|�}|��~Wnttfy.Yn0dSr-)rz�cancelr�rD)rEZtref)r�r*r+�
_discard_tref2s
z:AsynPool._create_timelimit_handlers.<locals>._discard_trefcs�|j�dSr-)r�)r[)r�r*r+�on_timeout_cancel;sz>AsynPool._create_timelimit_handlers.<locals>.on_timeout_cancelN)r�r�_tref_for_idr�r�r�)r�r�r�r�r*)r�r�r�r�r�r+r�"s	z#AsynPool._create_timelimit_handlersc	Csr|r|�|||j|�|j|<z>z|j|}Wnty@Yn0|�|�W|sn|�|�n|sl|�|�0dSr-)r�r�r��_cacher�r�r�)r�rEr�r�r��resultr*r*r+r�?s�
�zAsynPool._on_soft_timeoutc	CsLz:z|j|}Wnty"Yn0|�|�W|�|�n|�|�0dSr-)rr�r�r�)r�rErr*r*r+r�PszAsynPool._on_hard_timeoutcCs|�|�dSr-)r�)r�rEr��obj�inqW_fdr*r*r+�on_job_ready\szAsynPool.on_job_readycs��	j�	j�	j����
j��
j��
j��
j��
j��
j��
j	��
j
�
�
j���	�fdd�������	�
��fdd�}|�
_d
dd���������	�
���
�fdd	�}|�
_
dS)z/Create handlers called on process up/down, etc.csj|�}|durf|��rf|�vrf|j�vs,J��|j|us>J�|j�jvsNJ�td|�t�|jd�dS)Nz(Timed out waiting for UP message from %r�	)�	_is_aliver�rT�errorr��killr��r�)r�r��waiting_to_startr*r+�verify_process_alivens�
z?AsynPool._create_process_handlers.<locals>.verify_process_alivecs�|j}���D]4}|jr*|jj|kr*||_|jr|jj|kr||_q|�|j<��|��t|jj�rjJ��|j�|j���	|���
�j�t|��dS)z"Called when a process has started.N)
r�values�	_write_to�_scheduled_forr�r�rrr�rSr�r�r
)r��infdrE)r�r�r�r�r�r�rr
r*r+�
on_process_upxs

�z8AsynPool._create_process_handlers.<locals>.on_process_upNcSsnz|��}Wnty YdS0z|||ur<|�|d�WntyPYn0||�|durj||�|Sr-)rIrerzr�)rr��indexZ
remove_funr�r%r*r*r+�_remove_from_index�sz=AsynPool._create_process_handlers.<locals>._remove_from_indexcs�t|dd�rdS�|��|jj|���|jrB�|jj|��	��|jj|��	�jd�}|rh��|��
�|����|��
j�|j	��	|jj��|jj�|j
r��|jj�|jrֈ
j�|j��|jj�dS)z#Called when a worker process exits.r�N�r�)r�rr��synqrC�inqrhr�r�rZsynqR_fdr�)r�r)r�all_inqueues�busy_workers�
fileno_to_inqr��fileno_to_synqr��process_flush_queuesr��
remove_writerr�r
r*r+�on_process_down�s4���

z:AsynPool._create_process_handlers.<locals>.on_process_down)N)r�r�rrr�r�r�r�r�r�rr�rr)r�r�rrr*)rr�rrr�rr�rr�r�rr�rr�rr
r+r�_s"�


"z!AsynPool._create_process_handlersc
s��j��j�	�j��j��j��j��j��j}�j��j	��j
��j�j�
}�j�|j��j�|j
��jj�
�j��jtk�tj�tj�t��td�t��td�i�tjf���fdd�	}|�_������
��fdd�}|�_����fdd�}	|	�_|�_d��������������fd	d
�	}
|
�_��
���fdd�}|�_ ��fd
d�������fdd��������fdd�}|�_!d��	fdd�	�dS)z6Create handlers used to write data to child processes.)rcsR|jdus|j�vr<|js.|�d|���d�|�|j�n|�vrN��|�dSr-)Z_terminatedZcorrelation_id�	_acceptedZ_ackZ_set_terminated�
appendleft)rE�_time)�getpid�outbound�
revoked_tasksr*r+�	_put_back�s
�z2AsynPool._create_write_handlers.<locals>._put_backcsV���}�r"�ot��t��k}n�}|rDt|��dttBdd�nt|��j�dS)NT)Zconsolidate)r#r}rrr)ZinactiveZadd_cond)�
active_writesrr�diffr��hub_add�is_fair_strategyr!r*r+r��s
��z6AsynPool._create_write_handlers.<locals>.on_poll_startcsR��|�z0�||ur8��|d���|���|�WntyLYn0dSr-)rhrzr�)r%r�)r$rrrr*r+�on_inqueue_closes

z9AsynPool._create_write_handlers.<locals>.on_inqueue_closeNc
sv|s
dg}t|�}t|�D�]T}||d|}|dd7<|�vrJq�rX|�vrXq|�vrl��|�qz
��}Wn2ty����D]}��|�q�Y�qrYq0|jsz�|}|_Wnty��
|�YqYn0�|||�}t|�|_�|��
|��	|�zt	|�WnHt
�y0Yqt�yd}	z|	jtj
k�rP�WYd}	~	qd}	~	00�||�qdS)Nrr2)r#r�r�
IndexErrorrrr�r
rCr�r�rerf�EBADF)
Z	ready_fdsZtotal_write_countZ	num_readyr�Zready_fdrEZinqfdr��corri)�
_write_jobr$�
add_writerrrr%rr�r'�mark_worker_as_busy�mark_write_fd_as_active�mark_write_gen_as_active�pop_message�put_messager*r+�schedule_writes!sL



z8AsynPool._create_write_handlers.<locals>.schedule_writescsN�|�d�}t|�}�d|�}�|dd�}t|�t|�|f|_�|�dS)N��protocol�>Ir2r)r#r��_payload)�tup�bodyr��headerrE)�dumps�get_jobrr5r2r*r+�send_jobjs
z1AsynPool._create_write_handlers.<locals>.send_jobcs:t�d||j|�|��r"|����|���|�dS)Nz"Process inqueue damaged: %r %r: %r)rv�	exception�exitcoder�	terminatertr#)r�r%rErir�r*r+�on_not_recoveringvs
�
z:AsynPool._create_write_handlers.<locals>.on_not_recoveringc
3s�|j\}}}d}�zT||_|j}d}}	|dkr�z||||�7}Wn`ty�}
zHt|
dd�tvrd�|d7}|dkr��||||
�t��dVWYd}
~
q(d}
~
00d}q(|	|k�r2z|	|||	�7}	Wndt�y*}
zJt|
dd�tvr�|d7}|dk�r�||||
�t��dVWYd}
~
q�d}
~
00d}q�W��|��|jd7<��	|��|�
��n4��|��|jd7<��	|��|�
��0dS)Nrr3rfr2�d)r7r
Zsend_job_offset�	Exceptionr�r�r�rrrhrC)r�r%rEr:r9r��errorsr��Hw�Bwri)r$r�rA�write_generator_doner�r*r+r,~sL



�

z3AsynPool._create_write_handlers.<locals>._write_jobcsLt||�|�}t��}�|||d�}�|��|�|f|_�||�dS)Nr)r;rrp)�responser�rEr%�msgr�r+)�
_write_ackr-r/r0�precalcrGr*r+�send_ack�sz1AsynPool._create_write_handlers.<locals>.send_ackc
3s0|d\}}}�zz�|}Wnty6t��Yn0|j}d}}	|dkr�z||||�7}WqFty�}
z$t|
dd�tvr��dVWYd}
~
qFd}
~
00qF|	|kr�z|	|||	�7}	Wq�ty�}
z$t|
dd�tvr܂dVWYd}
~
q�d}
~
00q�W|�r|���|�n|�r |���|�0dS)N�rr3rf)r�r�Zsend_syn_offsetrCr�r�rh)r%Zackr�r:r9r�r�r�rErFri)r$rr*r+rJ�s8 �z3AsynPool._create_write_handlers.<locals>._write_ack)N)N)"r�r�r��popleftrxr�r�r�r��
differencer-rSrtrhr�__getitem__r�r��SCHED_STRATEGY_FAIR�worker_stateZrevokedr�r r�_create_payloadr�timer#r�r(�
hub_removeZconsolidate_callback�
_quick_putrL)
r�r�rr;r5Zactive_writersrUr#r�r(r3r=rLr*)rJr,r$r-rrr%r;rrr<r r�r&r'r.r/r0rAr!rr1rKr5r2r"r�rGr�r+r��sP
�
(G
3
zAsynPool._create_write_handlersc	Cs�|jtkrdS|jr2|j��D]}|js|��q|jrB|j��|�	��zJ|jt
k�rltddddd�}i}|j��D]}t|�}|durx|||<qx|j
s�|j��n�|j
�rXt|j
�}|D]�}|jdk�rt|��rz||}Wnty�Yn
0|��|j
�|�q�z||}Wnt�y.Yq�0|j}|���rL|�||�|��q�q�|�	�tt|��W|j��|j
��|j��|j��n*|j��|j
��|j��|j��0dS)N�{�G�z�?g�������?T)Z
repeatlastr,)r�rr�rrrZ_cancelr��clearr�rrrGr�rar�rBr�rhr
r�
_flush_writerrr�r�r�)r�rEZ	intervalsZowned_byrFrUrAZjob_procr*r*r+�flush�sb




�



�


zAsynPool.flushc
Cs�|jjh}zj|rf|��sqft||dd�\}}}|s|s:|rzt|�WqtttfybYqfYq0qW|j�	|�n|j�	|�0dS)Nr�)rUrVrW)
rrCrrkr�r�rer�r�rh)r�r�rF�fds�readable�writable�againr*r*r+rY,s
�zAsynPool._flush_writercCstdd�|j��D��S)z�Get queues for a new process.

        Here we'll find an unused slot, as there should always
        be one available when we start a new process.
        css|]\}}|dur|VqdSr-r*�r6�q�ownerr*r*r+�	<genexpr>Cs
�z.AsynPool.get_process_queues.<locals>.<genexpr>)r�r�rPr�r*r*r+�get_process_queues=szAsynPool.get_process_queuescs<t�jt�j�d�}|r8�j��fdd�t|�D��dS)z!Grow the pool by ``n`` processes.rcsi|]}���d�qSr-r�r�r�r*r+r9Jsz$AsynPool.on_grow.<locals>.<dictcomp>N)�maxZ
_processesr#r��updater�)r�r)r%r*r�r+�on_growFs
�zAsynPool.on_growcCsdS)z#Shrink the pool by ``n`` processes.Nr*)r�r)r*r*r+�	on_shrinkNszAsynPool.on_shrinkcCs�tdd�}tdd�}d}t|j�s&J�t|j�r4J�t|j�rBJ�t|j�sPJ�|jr|tdd�}t|j�snJ�t|j�r|J�|||fS)z5Create new in, out, etc. queues, returned as a tuple.T)Z	wnonblock)Z	rnonblockN)rrr�rCr�)r�rrrr*r*r+r�Qs


zAsynPool.create_process_queuescs�zt�fdd�|jD��}Wnty:t�d��YS0|j|jvsLJ�|j|jvs\J�|j�	|�||j|j<||j
|j<|j�|j�dS)zsCalled when receiving the :const:`WORKER_UP` message.

        Marks the process as ready to receive work.
        c3s|]}|j�kr|VqdSr-�r�r�rhr*r+rbir:z,AsynPool.on_process_alive.<locals>.<genexpr>z"process with pid=%s already exitedN)
r�r�r�rvrwrr�r�r�rhr�r�rS)r�r�r�r*rhr+r�cszAsynPool.on_process_alivecCs>|jr |j��s |�||j�n|jr:|j��s:|�|�dS)z:Called for each job when the process assigned to it exits.N)r
r�on_partial_readrr#)r�rEZpid_goner*r*r+�on_job_process_downsszAsynPool.on_job_process_downcCs|�||�dS)z�Called when the process executing job' exits.

        This happens when the process the job was assigned to
        exited by mysterious means (error exit codes and
        signals).
        N)Zmark_as_worker_lost)r�rEr�r?r*r*r+�on_job_process_lost}szAsynPool.on_job_process_lostcs�|jdurdSt|j���}t|��dd�����rB�t|j�nd��d���fdd�|D��d�tt|��t�	|j
|j
�t|j�t|j�d�d	�S)
NzN/AcSs|rt|�|ndd�S)Nrz.2f)�float)r8�totalr*r*r+�per�sz'AsynPool.human_write_stats.<locals>.perrz, c3s|]}�|��VqdSr-r*)r6r8�rnrmr*r+rb�r:z-AsynPool.human_write_stats.<locals>.<genexpr>)rm�active)rm�avg�all�rawZstrategyZinqueues)
r�rar�sumr#�joinrN�str�SCHED_STRATEGY_TO_NAMErOr�r�r�)r��valsr*ror+�human_write_stats�s 
���zAsynPool.human_write_statsc	Cs6|js2zd|j|�|�<Wnttfy0Yn0dS)z-Called to clean up queues after process exit.N)r�r��_find_worker_queuesr�r{�r�r�r*r*r+�_process_cleanup_queues�s
z AsynPool._process_cleanup_queuescCsz|jD]n}zt|jjd�Wnty.Yq0z|j�d�Wqtyr}z|jtjkr^�WYd}~qd}~00qdS)z>Called at shutdown to tell processes that we're shutting down.r2N)rrrrCrer�rfr*)Ztask_handlerr�rir*r*r+�_stop_task_handler�s
zAsynPool._stop_task_handlercst�j|j|jd�S)N)r�r�)r��create_result_handlerr�r�r�r�r*r+r~�s�zAsynPool.create_result_handlercCs8||jvsJ�t|j�}||j|<|t|j�ks4J�dS)z;Mark new ownership for ``queues`` to update fileno indices.N)r�r#)r�r��queues�br*r*r+�_process_register_queues�s

z!AsynPool._process_register_queuescs>zt�fdd�|j��D��WSty8t���Yn0dS)z"Find the queues owned by ``proc``.c3s|]\}}|�kr|VqdSr-r*r_r	r*r+rb�s
�z/AsynPool._find_worker_queues.<locals>.<genexpr>N)r�r�rPr�r{r{r*r	r+rz�szAsynPool._find_worker_queuescCs"d|_d|_|_|_|_dSr-)rVZ_inqueue�	_outqueueZ
_quick_getZ_poll_resultr�r*r*r+�
_setup_queues�s
��zAsynPool._setup_queuesc

Cs|jj}|jj}|h}|r�|js�|jtkr�t|d|dd�\}}}|r�z|��}Wn�t	t
fy�}zlt|dd�}	|	tj
kr�WYd}~qn0|	tjkr�WYd}~q�n|	tvr�td||dd�WYd}~q�WYd}~q�d}~00|dur�td|�q�q�||�qq�qdS)	aFlush all queues.

        Including the outbound buffer, so that
        all tasks that haven't been started will be discarded.

        In Celery this is called whenever the transport connection is lost
        (consumer restart), and when a process is terminated.
        NrW�rWrfz got %r while flushing process %rr2rrz&got sentinel while flushing process %r)rr�r�r��closedr�rrkr�rer�r�rfrg�EAGAINr�r�)
r�r�Zresqr�r[r\r�r�rirjr*r*r+r�s0	

�"

zAsynPool.process_flush_queuescCs�|js|�|�t|�}|r*|j�|�~|js�d|_t|j�}z(|�|�}|�	||�rfd|j|�
�<WntyzYn0t|j�|ks�J�dS)z8Called when a job was partially written to exited child.TN)rr#rGr�rhr�r#r�rz�destroy_queuesr�r{)r�rEr�rF�beforerr*r*r+ri�s 


zAsynPool.on_partial_readc
Cs�|��rJ�|j�|�d}z|j�|�WntyBd}Yn0z|�|dj��|�Wnt	ypYn0|D]J}|rv|j
|jfD]4}|js�|�|�z|�
�Wq�t	y�Yq�0q�qv|S)zqDestroy queues that can no longer be used.

        This way they can be replaced by new usable sockets.
        r2r)rr�rhr�rzr�r(rCrIrer�r�rUr�)r�rr��removed�queue�sockr*r*r+r�s*


zAsynPool.destroy_queuesc	Cs,|||f|d�}t|�}|d|�}|||fS)Nr4r6)r#)	r��type_rpr;rr5r9r'r:r*r*r+rS,s
zAsynPool._create_payloadcCsdSr-r*)�clsr�r�r*r*r+�_set_result_sentinel4szAsynPool._set_result_sentinelcCs|jfSr-)r�r�r*r*r+�_help_stuff_finish_args9sz AsynPool._help_stuff_finish_argsc		Cs�td�i}t�}|D]:}z"|jj��}|�|�|||<WqtyNYq0q|r�t|dd�\}}}|rnqR|stq�|D]}||jj��qxt	d�qRdS)Nz7removing tasks from inqueue until task handler finishedr�r�r)
r�rQrr�rIrSrerkr�r)	r�rZfileno_to_procZinqRrcr%r\r�r^r*r*r+�_help_stuff_finish>s*�
zAsynPool._help_stuff_finishcCs
|jdiS)Ng@)r�r�r*r*r+r�WszAsynPool.timers)NFNN)4r�r�r�r�r�r~r�r�r�r�r�r�r�r�r�r�r�rr�rr�r;rr�rZrYrcrfrgr�r�rjrkryr|�staticmethodr}r~r�rzr�rrir�rS�classmethodr�r�r��propertyr�r�r*r*r�r+r0�sj�>k�
H	
	

$�


r0)NNNr)`r�rfr�r?r�r`rT�collectionsrrr�iorZnumbersrrr�structrr	r
r�weakrefrr
Zbilliardrr�Zbilliard.compatrrZ
billiard.poolrrrrrZbilliard.queuesrZkombu.asynchronousrrZkombu.serializationr�Zkombu.utils.eventiorZkombu.utils.functionalrZvinerZcelery.signalsrZcelery.utils.functionalrZcelery.utils.logrZ
celery.workerr rRZ	_billiardr!r,r��ImportError�__all__r�rvrr��	frozensetr�rgr�r�r�ZSCHED_STRATEGY_FCFSrQr�rPrwr;rBrGrKryrLrXrYrZr_rkr}r~r�ZPoolr0r*r*r*r+�<module>s��
	� 
�
0-
 
