
    hn                        d Z ddlZddlZddlZddlZddlZddlZddlmZm	Z	m
Z
 ddlmZ ddlmZ ddlmZ ddlmZmZmZ ddlmZ dd	lmZmZ dd
lmZ ddlmZmZ ddlm Z m!Z!m"Z"m#Z#m$Z$ ddl%m&Z& ddl'm(Z(m)Z) ddl*mZ+ ddl,m-Z- ddl.m/Z/ ddl0m1Z1 ddl2m3Z3 ddl4m5Z5 ddl6m7Z7 ddl8m9Z: 	 ddl;m<Z= dZ>dZ@ e7eA      ZBeBj                  eBj                  cZCZD eEej                  ej                  h      ZHdZIdZJdZKd ZLeLeLeKeKeLd!ZMeMj                         D  ci c]  \  } }|| 
 c}} ZO e
d"d#      ZPd$ ZQd% ZRd& ZS eTed'      r5ddddej                  ej                  ej                  ej                  fd(ZYnd2d)ZYddddeYfd*ZZd+ Z[ G d, d-ej                        Z\ G d. d/ej                        Z] G d0 d1ej                        Z_y# e?$ r ejx                  fdZ=dZ>efdZY 3w xY wc c}} w )3a  Version of multiprocessing.Pool using Async I/O.

.. note::

    This module will be moved soon, so don't use it directly.

This is a non-blocking version of :class:`multiprocessing.Pool`.

This code deals with three major challenges:

#. Starting up child processes and keeping them running.
#. Sending jobs to the processes and receiving results back.
#. Safely shutting down this system.
    N)Counterdeque
namedtuple)BytesIO)Integral)HIGHEST_PROTOCOL)packunpackunpack_from)sleep)WeakValueDictionaryref)pool)
isblockingsetblocking)ACKNACKRUN	TERMINATEWorkersJoined)_SimpleQueue)ERRWRITE)pickle)SELECT_BAD_FD)fxrange)promise)worker_before_create_process)noop)
get_logger)state)readTc                 Z     || |      }t        |      }|dk7  r|j                  |       |S Nr   )lenwrite)fdbufsizer"   chunkns         W/var/www/Befach/backend/env/lib/python3.12/site-packages/celery/concurrency/asynpool.py__read__r-   5   s.    RJ6IIe    Fc                 0     || |j                               S N)getvalue)fmtiobufr
   s      r,   r   r   =   s    c5>>+,,r.   )AsynPool   g      @      )NdefaultfastfcfsfairAck)idr'   payloadc                 2    t        j                  |       dk(  S )z(Return true if generator is not started.GEN_CREATED)inspectgetgeneratorstate)gens    r,   gen_not_startedrD   \   s    $$S)]::r.   c                 H    	 | j                   } |       S # t        $ r Y y w xY wr0   )_writerAttributeError)jobwriters     r,   _get_job_writerrJ   a   s-     x  s    	!!c                 F    t        | t              r| S | j                         S r0   )
isinstancer   fileno)r'   s    r,   _ensure_integral_fdrN   j   s    B)2:ryy{:r.   pollc                     |       }|j                   }	i }
| r-t        t        |       D ]  }|
j                  |d      |z  |
|<    |r-t        t        |      D ]  }|
j                  |d      |z  |
|<    |r-t        t        |      D ]  }|
j                  |d      |z  |
|<    |
j	                         D ]  \  }} |	||        t               t               }}|r|dk  rdnt        |dz        }|j                  |      }|D ]H  \  }}||z  r|j                  |       ||z  r|j                  |       ||z  s8|j                  |       J ||dfS )Nr   g     @@)	registermaprN   getitemssetroundrO   add)readerswriterserrtimeoutrO   POLLINPOLLOUTPOLLERRpollerrQ   
fd_to_maskr'   
event_maskRWeventsevents                    r,   _select_imprf   o   sL    ??
-w7!+A!6!?
2 8-w7!+A!6!@
2 8-s3!+A!6!@
2 4 )..0NB
R$ 1 uce17Q;!E'C-4HW%IBv~b	wb	wb	   !Qwr.   c                     t        j                   | |||      \  }}}|r t        t        |      t        |      z        }||dfS r$   )selectlistrU   )rX   rY   rZ   r[   rwes          r,   rf   rf      s@    --#w?1aSVc!f_%A!Qwr.   c                 R   | 
t               n| } |
t               n|}|
t               n|}	  || |||      S # t        $ r}|j                  }|t        j                  k(  rt               t               dfcY d}~S |t        v r| |z  |z  D ]z  }	 t        j
                  |gg g d       # t        $ rR}|j                  }|t        vr | j                  |       |j                  |       |j                  |       Y d}~td}~ww xY w t               t               dfcY d}~S  d}~ww xY w)a<  Simple wrapper to :class:`~select.select`, using :`~select.poll`.

    Arguments:
        readers (Set[Fd]): Set of reader fds to test if readable.
        writers (Set[Fd]): Set of writer fds to test if writable.
        err (Set[Fd]): Set of fds to test for error condition.

    All fd sets passed must be mutable as this function
    will remove non-working fds from them, this also means
    the caller must make sure there are still fds in the sets
    before calling us again.

    Returns:
        Tuple[Set, Set, Set]: of ``(readable, writable, again)``, where
        ``readable`` is a set of fds that have data available for read,
        ``writable`` is a set of fds that's ready to be written to
        and ``again`` is a flag that if set means the caller must
        throw away the result and call us again.
    Nr6   r   )rU   OSErrorerrnoEINTRr   rh   discard)rX   rY   rZ   r[   rO   exc_errnor'   s           r,   _selectrt      s	   * ceGGceGG;#%CCGWc733 U[[ 5#%?"}$'#-	$MM2$B2 $ YYF]2OOB'OOB'KKOO$ . 5#%?"'sX   
7 	D& 3D!3D&9D!B'&D!'	D0AC=8D!=DD!D& D!!D&c                   	 	fd}g }| D ]  	 |       |}}	  |	g|i |  |r9|D ]3  		 t        |d      r|j                  	       n|j                  	d       5 yy# t         t        f$ r, t        j                  d	d       |j	                  	       Y w xY w# t        $ r t        j                  d	|       Y w xY w)a  Apply hub method to fds in iter, remove from list if failure.

    Some file descriptors may become stale through OS reasons
    or possibly other reasons, so safely manage our lists of FDs.
    :param fds_iter: the file descriptors to iterate and apply hub_method
    :param source_data: data source to remove FD if it renders OSError
    :param hub_method: the method to call with each fd and kwargs
    :*args to pass through to the hub_method;
    with a special syntax string '*fd*' represents a substitution
    for the current fd object in the iteration (for some callers).
    :**kwargs to pass through to the hub method (no substitutions needed)
    c                  J    } d| v rD cg c]  }|dk(  rn| } }| S c c}w )N*fd* )	call_argsargargsr'   s     r,   _meta_fd_argument_makerz@iterate_file_descriptors_safely.<locals>._meta_fd_argument_maker   s:    	YAEF#sf}#5FIF Gs    z)Encountered OSError when accessing fd %s Texc_inforemoveNz*ValueError trying to invalidate %s from %s)	rn   FileNotFoundErrorloggerwarningappendhasattrr   pop
ValueError)
fds_itersource_data
hub_methodr{   kwargsr|   	stale_fdshub_args
hub_kwargsr'   s
      `     @r,   iterate_file_descriptors_safelyr      s     I68&*	!r3H3
3	  B0;1&&r*OOB-   *+ 	!NN;T  # R 		!  0K!;00s"   A$0B"$8BB" CCc                       e Zd ZdZd Zy)WorkerzPool worker process.c                 H    | j                   j                  t        |ff       y r0   )outqput	WORKER_UP)selfpids     r,   on_loop_startzWorker.on_loop_start   s     			y3&)*r.   N)__name__
__module____qualname____doc__r   rx   r.   r,   r   r      s
    +r.   r   c                   f     e Zd ZdZ fdZeeeee	j                  fdZd Zd Zd Zd Zd Z xZS )	ResultHandlerz)Handles messages from the pool processes.c                     |j                  d      | _        |j                  d      | _        t        |   |i | | j                  | j
                  t        <   y )Nfileno_to_outqon_process_alive)r   r   r   super__init__state_handlersr   )r   r{   r   	__class__s      r,   r   zResultHandler.__init__   sO    $jj)9: &

+= >$)&))-)>)>I&r.   c	              #     K   dx}	}
|rt        d      }t        |      }n	 |       x}}|	dk  r<	  |||r||	d  n|d|	z
        }|dk(  r|	rt        d      t               |	|z  }	|	dk  r< |d|      \  }|rt        |      }t        |      }n	 |       x}}|
|k  r<	  |||r||
d  n|||
z
        }|dk(  r|
rt        d      t               |
|z  }
|
|k  r< ||| j                  |       |r | ||            }n|j                  d        ||      }|r	 ||       y y # t        $ r!}|j                  t
        vr d  Y d }~d }~ww xY w# t        $ r!}|j                  t
        vr d  Y d }~d }~ww xY ww)Nr   r7   zEnd of file during messagez>i)	bytearray
memoryviewrn   EOFErrorro   UNAVAILhandle_eventseek)r   
add_readerr'   callbackr-   
readcanbufr   r   loadHrBrr(   bufvr+   rr   	body_sizemessages                    r,   _recv_messagezResultHandler._recv_message  s     RA,Cc?D "C$ 1fZRS	T1r6 6DF7#?@ , (
,a 1f !t,
	I&Cc?D "C$9nZRS	T9r> 6DF7#?@ , (
,a 9n 	2t(("-74=)GIIaL4jGW K  99G+,  99G+se   ,E<D" &E<*2E<E 1&E<A
E<"	E+EE<EE<	E9E4/E<4E99E<c                     | j                   | j                  |j                  |j                  | j                  fd}|S )z3Coroutine reading messages from the pool processes.c                     	 |      |       }	 t        |        | |       y # t         $ r  |       cY S w xY w# t        $ r Y y t        t        f$ r  |        Y y w xY wr0   )KeyErrornextStopIterationrn   r   )rM   itr   r   on_state_changerecv_messageremove_readers     r,   on_result_readablez>ResultHandler._make_process_result.<locals>.on_result_readableH  s|    -v& j&/BB'R 62&  -$V,,-
 ! X& &f%&s!   ( ? <<	A#
A#"A#)r   r   r   r   r   )r   hubr   r   r   r   r   r   s      @@@@@r,   _make_process_resultz"ResultHandler._make_process_result@  sJ    ,,..^^
))))	' 	' "!r.   c                 0    | j                  |      | _        y r0   )r   r   )r   r   s     r,   register_with_event_loopz&ResultHandler.register_with_event_loopX  s     55c:r.   c                     t        d      )NzNot registered with event loop)RuntimeError)r   r{   s     r,   r   zResultHandler.handle_event[  s     ;<<r.   c           	         | j                   }| j                  }| j                  }| j                  }| j                  }t        |      }|r|r| j                  t        k7  r| |        t               }|D ];  }t        |g| j                  | j                  |j                  ||       	  |d       = |j                  |       |r|r| j                  t        k7  r|y y y y y y # t        $ r t        d       Y  y w xY w)NT)shutdownz&result handler: all workers terminated)cachecheck_timeoutsr   r   join_exited_workersrU   _stater   r   _flush_outqueuerW   r   debugdifference_update)	r   r   r   r   r   r   	outqueuespending_remove_fdr'   s	            r,   on_stop_not_startedz!ResultHandler.on_stop_not_started`  s    

,,,,.."66 '		dkkY&>)  #/D$--t/C/C%))>?'6   ''(9:! 	dkkY&>	e&>	e % BCs   '	C!!C98C9c                 P   	 ||   }|j                  j                  }	 t        |d       	 |j                  d      r|j                         }nd }t        d       |r	 ||       	 	 t        |d       y # t         $ r  ||      cY S w xY w# t        $ r  ||      cY S w xY w# t        t        f$ r1  ||      cY 	 t        |d       S # t        $ r  ||      cY c S w xY ww xY w# t        $ r  ||      cY S w xY w# 	 t        |d       w # t        $ r  ||      cY c cY S w xY wxY w)Nr6   r         ?)	r   r   _readerr   rn   rO   recvr   r   )r   r'   r   process_indexr   procreadertasks           r,   r   zResultHandler._flush_outqueue|  s4   	 $D ""	"	"{{1~{{}c
 %"FA&1  	 ":		  	":	 " 	":
"FA& "bz!"	  "bz!""FA& "bz!"s   A3 B
 /B! C; &C$ 3BB
BB!C!8C; :CCC C!!C; $C87C8;D%=D
	D%
D"D%!D""D%)r   r   r   r   r   r-   r   r   r   _pickler   r   r   r   r   r   r   __classcell__r   s   @r,   r   r      s=    3?  (J%;"<<7r"0;=
;8"r.   r   c                   x    e Zd ZdZeZeZdZ fdZ	 	 d( fd	Z fdZ	d Z
d Zd Zd	 Zd
 Zd Zd Zd Zd Zeej*                  efdZd Zd Zd Zd Zd Zd Zd Zd Zd Z d Z!d Z"e#d        Z$ fdZ%d Z&d Z'd Z(d  Z)d! Z*d" Z+ej*                  eefd#Z,e-d$        Z.d% Z/e-d&        Z0e1d'        Z2 xZ3S ))r4   zAsyncIO Pool (no threads).Fc                 4    t         |   |      }d|_        |S )NF)r   WorkerProcessdead)r   workerr   s     r,   r   zAsynPool.WorkerProcess  s    &v.r.   c                 R   t         j                  ||      | _        || j                         n|}|| _        t        |      D ci c]  }| j                         d  c}| _        i | _        i | _	        i | _
        |t        n|| _        t               | _        t               | _        t               | _        t               | _        t               | _        | j$                  j&                  | _        t+               | _        t/               | _        t3        	| h  |g|i | | j6                  D ]4  }|| j                  |j8                  <   || j                  |j:                  <   6 t=        | j>                  dt@              | _!        t=        | j>                  dt@              | _"        y c c}w )Non_soft_timeouton_hard_timeout)#SCHED_STRATEGIESrS   sched_strategy	cpu_countsynackrangecreate_process_queues_queues_fileno_to_inq_fileno_to_outq_fileno_to_synqPROC_ALIVE_TIMEOUT_proc_alive_timeoutrU   _waiting_to_start_all_inqueues_active_writes_active_writers_busy_workersrq   _mark_worker_as_availabler   outbound_bufferr   write_statsr   r   _pooloutqR_fdsynqW_fdgetattr_timeout_handlerr   r   r   )
r   	processesr   r   proc_alive_timeoutr{   r   _r   r   s
            r,   r   zAsynPool.__init__  s    /22>3AC(1(9DNN$y	 9>i8H
34D&&($.

 !!! #5"<# 	  "% !U "e  #u !U)-););)C)C&  %w"94T4V4JJD 37D  /26D  /	   '!!#4d 
  '!!#4d 
e
s   F$c                 v    t        j                  |        t        j                          t        |   |      S )N)sender)r   sendgccollectr   _create_worker_process)r   ir   s     r,   r  zAsynPool._create_worker_process  s*    $))6


w-a00r.   c                 H    | j                  ||       | j                          y r0   )_untrack_child_processmaintain_pool)r   r   r   s      r,   _event_process_exitzAsynPool._event_process_exit  s    ##D#.r.   c                     	 |j                   }t        |gd|j                  | j                  ||       y# t        $ r3 t        j                  |j                  j
                        x}|_         Y aw xY w)z4Helper method determines appropriate fd for process.N)	_sentinel_pollrG   osdup_popensentinelr   r   r  r   r   r   r'   s       r,   _track_child_processzAsynPool._track_child_process  sl    	D$$B 	(D$$$c4	1  	D
 (*vvdkk.B.B'CCB$	Ds   4 9A0/A0c                     |j                   ;|j                   d c}|_         |j                  |       t        j                  |       y y r0   )r  r   r  closer  s       r,   r	  zAsynPool._untrack_child_process  s>    *&*&9&94#B#JJrNHHRL +r.   c                 |   | j                   j                  |       | j                   j                  | _        | j	                  |       | j                  |       | j                  |       | j                  D cg c]  }| j                  ||       c} t        | j                  | j                  |j                  | j                  d       | j                  j                         D ]  \  }}|j                  ||        | j                  s-|j                   j#                  | j$                         d| _        yyc c}w )z4Register the async pool with the current event loop.rw   TN)_result_handlerr   r   handle_result_event_create_timelimit_handlers_create_process_handlers_create_write_handlersr   r  r   r   r   timersrT   call_repeatedly_registered_with_event_loopon_tickrW   on_poll_start)r   r   rk   handlerintervals        r,   r   z!AsynPool.register_with_event_loop  s   55c:#'#7#7#D#D '',%%c*##C( 59JJ?q	"	"1c	*? 	(  $"6"6$$f	. "&!2!2!4GX'2 "5
 //KKOOD..//3D, 0 	@s   8D9c                      j                   t               x _         fd}| _        fd _        fd}| _        y)z.Create handlers used to implement time limits.c                     |r/ |j                   | j                  ||      | j                  <   y |r, |j                  | j                        | j                  <   y y r0   )_on_soft_timeout_job_on_hard_timeout)rb   softhard
call_laterr   r   trefss      r,   on_timeout_setz;AsynPool._create_timelimit_handlers.<locals>.on_timeout_set'  s\     *$//tS!aff  *$//!aff r.   c                 v    	 j                  |       }|j                          ~y # t        t        f$ r Y y w xY wr0   )r   cancelr   rG   )rH   trefr+  s     r,   _discard_trefz:AsynPool._create_timelimit_handlers.<locals>._discard_tref2  s8    yy~n- s   "& 88c                 *     | j                          y r0   )r&  )rb   r0  s    r,   on_timeout_cancelz>AsynPool._create_timelimit_handlers.<locals>.on_timeout_cancel;  s    !&&!r.   N)r*  r   _tref_for_idr,  r0  r2  )r   r   r,  r2  r0  r*  r+  s   ``  @@@r,   r  z#AsynPool._create_timelimit_handlers"  sG    ^^
$7$99!	 -	 +	"!2r.   c                    |r-|j                  ||z
  | j                  |      | j                  |<   	 | j                  |   }| j	                  |       |s| j                  |       y y # t
        $ r Y  w xY w# |s| j                  |       w w xY wr0   )r*  r'  r3  _cacher   r   r0  )r   rH   r(  r)  r   results         r,   r%  zAsynPool._on_soft_timeout?  s    %(^^tT22C&Dc"		([[%F   (""3'   		
 ""3' s)   A&  A5 &	A2/A5 1A22A5 5Bc                     	 | j                   |   }| j                  |       | j                  |       y # t        $ r Y w xY w# | j                  |       w xY wr0   )r5  r   r   r0  )r   rH   r6  s      r,   r'  zAsynPool._on_hard_timeoutP  sZ    	$[[%F   ( s#  		 s#s$   4 A 	A A A  A Ac                 &    | j                  |       y r0   )r   )r   rH   r  objinqW_fds        r,   on_job_readyzAsynPool.on_job_ready\  s    &&w/r.   c                    	
 j                   j                  j                  c j                   j                   j
                  	 j                  
 j                   j                   j                   j                   j                  
fd
 fd}| _        dd	
 fd}| _        y)z/Create handlers called on process up/down, etc.c                     |        } | | j                         ro| v rj| j                  v sJ | j                     | u sJ | j                  j                  v sJ t        d|        t	        j
                  | j                  d       y y y y )Nz(Timed out waiting for UP message from %r	   )	_is_aliver   rX   errorr  killr   )r   r   r   waiting_to_starts    r,   verify_process_alivez?AsynPool._create_process_handlers.<locals>.verify_process_aliven  s    6D T^^%5,,}}666%dmm4<<<}}333@$G!$ - &6 r.   c                 *   | j                   }j                         D ]\  }|j                  r |j                  j                   |k(  r| |_        |j                  s<|j                  j                   |k(  sV| |_        ^ | | j                  <   j                  |        t        | j                  j                        rJ  | j                  | j                         
j                  |        j                  j                  	t        |              y)z"Called when a process has started.N)r:  values	_write_to_scheduled_forr   r  r   r   r   rW   r*  r   r   )r   infdrH   r   r   r   r  r   r   rC  rB  s      r,   on_process_upz8AsynPool._create_process_handlers.<locals>.on_process_upx  s     <<D||~==S]]%:%:d%B$(CM%%#*<*<*D*D*L)-C&	 &
 -1N4==) %%dC0!$))"3"3444 t}}&94==I  &NN((*>D	r.   Nc                     	 | j                         }	 ||   |u r|j                  |d         ||       | ||       |S # t        $ r Y y w xY w# t        $ r Y |S w xY wr0   )rM   rn   r   r   )r9  r   index
remove_funr   r'   s         r,   _remove_from_indexz=AsynPool._create_process_handlers.<locals>._remove_from_index  sz    ZZ\	!9$IIb$' 2'RLI    
 Is"   A  A  	AA	AAc                 .   t        | dd      ry 	|         | j                  j                  | 
       | j                  r | j                  j                  |         | j
                  j                  | j                        }|rj                  |       j                  |        j                  |        j                  j                  | j                          | j
                  j                          
| j                  j                         | j                  r 
| j                  j                         | j                  rBj                  j                  | j                          
| j                  j                         yy)z#Called when a worker process exits.r   Nr   )r   r   r   synqrF   inqrq   r	  r   r:  synqR_fdr   )r   rQ  rM  all_inqueuesbusy_workersfileno_to_inqr   fileno_to_synqr   process_flush_queuesr   remove_writerr   rB  s     r,   on_process_downz:AsynPool._create_process_handlers.<locals>.on_process_down  s=   tVT* &		!!4 yy"II%%t^] %  $}%--C $$S)''c2$$T*''5$((**+$))++,}}dii//0}}##++DMM:dii//0 r.   r0   )r   r   rX  r5  r   r   r   r   r   r  rW  r   rI  rY  )r   r   rI  rY  rM  r   rS  rT  r   rU  r   rV  r  rW  r   rX  rC  rB  s   ``  @@@@@@@@@@@@@@r,   r  z!AsynPool._create_process_handlers_  s     NNC--s/@/@ 	1
M= ))++----))"66#8811	%	 	8 +	*	1 	18  /r.   c                     !"#$  j                    j                   j                  j                  j                  ! j
                   j                   j                  } j                  j                  j                  j                  j                  c}j                  |j                  j                  |j                  # j                  j                   j                   $ j"                  t$        k(  t&        j(                  "t*        j,                  t.         j1                  t.        d      t2         j1                  t2        d      i t4        j4                  f"fd	}| _        fd}| _        fd}	|	 _        | _        d!fd	}
|
_        !fd}| _          fd#$fd	 #fd
}| _!        dfd	y)z6Create handlers used to write data to child processes.)r   c                     | j                   | j                  v rF| j                  s| j                  d  |               d        | j	                  | j                          y | vrj                  |        y y r0   )_terminatedcorrelation_id	_accepted_ack_set_terminated
appendleft)rH   _timegetpidoutboundrevoked_taskss     r,   	_put_backz2AsynPool._create_write_handlers.<locals>._put_back  sg    *&&-7}}HHT57FHd;##COO4 h&'', 'r.   c                             } r	xr t              t              k  }n	}|rt        | d t        t        z  d       y t        | j                         y )NT)consolidate)r%   r   r   r   rX  )
inactiveadd_condactive_writesrS  rT  diffr   hub_addis_fair_strategyrd  s
     r,   r   z6AsynPool._create_write_handlers.<locals>.on_poll_start  sd    M*H  #ML(9C<M(M#/lG%#+49 0lC,=,=?r.   c                     j                  |        	 |    |u r5j                  | d        j                  |        j                  |        y y # t        $ r Y y w xY wr0   )rq   r   r   )r'   r   rk  rS  rT  rU  s     r,   on_inqueue_closez9AsynPool._create_write_handlers.<locals>.on_inqueue_close  sj       $ $,!%%b$/!))"- ((, -  s   ;A 	AANc                    |sdg}t        |       }t        |      D ]  }| |d   |z     }|dxx   dz  cc<   |v r r|v r'|vrj                  |       =	         }|j                  rR	 |   x}|_         
|||      }t        |      |_         |        |        |       	 t        |        ||        y # t
        $ r  |       Y w xY w# t        $ r Y t        $ r(}|j                  t        j                  k7  r Y d }~d }~ww xY w# t        $ r"        D ]  }	j                  |	        Y  y w xY w)Nr   r6   )r%   r   rX  r^  rG  r   r   rF   r   r   rn   ro   EBADF
IndexError)	ready_fdstotal_write_count	num_readyr   ready_fdrH   r   corrr   inqfd
_write_jobrk  
add_writerrS  rT  rl  rU  r   rn  mark_worker_as_busymark_write_fd_as_activemark_write_gen_as_activepop_messageput_messages             r,   schedule_writesz8AsynPool._create_write_handlers.<locals>.schedule_writes!  s~   $%&C! II9%$%6q%9I%EF!!$)$},#L(@</%%h/'6%-C ==	% 9Fh8OOD3#5 )x=&)#h05/9+H56 I 'x5g &<  ( % (,$%  - ! & &"yyEKK7 %  8&C "  "&m!4))%0 "5sB   D0C/CCC	D(D0DD'EEc                      |       }t        |      } d|      } | d   d         }t        |      t        |      |f|_         	|       y )Nprotocol>Ir6   r   )r%   r   _payload)
tupbodyr   headerrH   dumpsget_jobr	   r  r  s
        r,   send_jobz1AsynPool._create_write_handlers.<locals>.send_jobj  sX     x0DD	I$	*F#a&)$C%f-z$/?JCLr.   c                     t         j                  d| | j                  |       | j                         r| j	                          j                  |       j                  |       y )Nz"Process inqueue damaged: %r %r: %r)r   	exceptionexitcoder?  	terminater   rf  )r   r'   rH   rr   r   r   s       r,   on_not_recoveringz:AsynPool._create_write_handlers.<locals>.on_not_recoveringv  sJ    4dDMM3P~~ JJrNNN3r.   c              3   D  K   |j                   \  }}}d}	 | |_        | j                  }dx}}	|dk  r	 | |||      z  }d}|dk  r|	|k  r	 |	 |||	      z  }	d}|	|k  rj                  |       | j                  xx   dz  cc<   j                  |        |j                                y # t        $ rA}
t	        |
dd       t
        vr |dz  }|dkD  r | |||
       t               d  Y d }
~
d }
~
ww xY w# t        $ rA}
t	        |
dd       t
        vr |dz  }|dkD  r | |||
       t               d  Y d }
~
d }
~
ww xY w# j                  |       | j                  xx   dz  cc<   j                  |        |j                                w xY ww)Nr   r7   ro   r6   d   )r  rF  send_job_offset	Exceptionr   r   r   rX  rK  rq   rF   )r   r'   rH   r  r  r   errorsr  HwBwrr   rk  r   r  write_generator_doner   s              r,   rz  z3AsynPool._create_write_handlers.<locals>._write_job~  s    
 '*ll#FD)F*4 $++R1f#d62.. "# 1f  9n#d4n, "# 9n !!"%DJJ'1,'%%b)$S[[]3A % "36gE!!!C<-dBSA"//1  % "36gE!!!C<-dBSA"//1 !!"%DJJ'1,'%%b)$S[[]3sw   F E B2  E E C? E "AF 2	C<;7C72E 7C<<E ?	E	7E?E E		E AFF c                     t        |||          }t              } |||      } 
|        	|       |f|_         ||       y )NrO  )r<   r   r{   )responser   rH   r'   msgr   rx  
_write_ackr{  r}  r~  precalcr  s          r,   send_ackz1AsynPool._create_write_handlers.<locals>.send_ack  sT     c2wx01C34HRx8C$S)#B' FHMr3r.   c              3     K   |d   \  }}}	 	 |    }|j                  }dx}}	|dk  r	 | |||      z  }|dk  r|	|k  r	 |	 |||	      z  }	|	|k  r|r |        j                  |        y # t         $ r t               w xY w# t        $ r"}
t	        |
dd       t
        vr d  Y d }
~
vd }
~
ww xY w# t        $ r"}
t	        |
dd       t
        vr d  Y d }
~
d }
~
ww xY w# |r |        j                  |        w xY ww)N   r   r7   ro   )r   r   send_syn_offsetr  r   r   rq   )r'   ackr   r  r  r   r   r  r  r  rr   rk  rV  s              r,   r  z3AsynPool._create_write_handlers.<locals>._write_ack  s/     '*!f#FD) **)"-D
 ++R1fd62.. 1f 9nd4n, 9n J%%b);   * (/)* % "36gE! % "36gE!	 J%%b)s   
DA/ C" B C" C" B4 C" D/BC" 	B1B,'C" ,B11C" 4	C=CC" CC" "C>>Dr0   )"r   r   r   popleftr   r   r   r   r   
differencer{  rW   r   rq   r5  __getitem__r   r   SCHED_STRATEGY_FAIRworker_staterevokedr  rc  r   _create_payloadr   timerf  r   rp  
hub_removeconsolidate_callback
_quick_putr  )%r   r   r	   r  r  active_writersr  rf  r   rp  r  r  r  r  rz  rk  r{  rS  rT  rl  rU  rV  r  rc  rm  rn  r|  r}  r~  r  rd  r  r  r  re  r  r   s%   `````        @@@@@@@@@@@@@@@@@@@@@@@@r,   r  zAsynPool._create_write_handlers  s    ++--''&&oo))++--))&&^^
!ggszz"/"3"3#1#5#5 *..-55++))&&..2EE$,,,,S$7--dD9; "& 	- #	? 	?$ +
	 !1$F	6 F	6 F	6N $3 		 		 #	 1	4 1	4f		  		  !%	*r.   c                    | j                   t        k(  ry | j                  r<| j                  j	                         D ]  }|j
                  r|j                          ! | j                  r| j                  j                          | j                          	 | j                   t        k(  rSt        dddd      }i }| j                  j	                         D ]  }t        |      }||||<    | j                  s| j                  j                          n| j                  rt        | j                        }|D ]  }|j                  dk(  r=t!        |      r2	 ||   }|j#                          | j                  j#                  |       O	 ||   }|j&                  }|j)                         r| j+                  ||       |j#                           | j                  r| j                          t-        t/        |             | j                  j                          | j                  j                          | j0                  j                          | j2                  j                          y # t$        $ r Y w xY w# t$        $ r Y Kw xY w# | j                  j                          | j                  j                          | j0                  j                          | j2                  j                          w xY w)N{Gz?g?T)
repeatlastrz  )r   r   r   r5  rE  r^  _cancelr   clearr
  r   r   rJ   r   ri   r   rD   rq   r   rF  r?  _flush_writerr   r   r   r   )r   rH   	intervalsowned_byrI   rY   rC   job_procs           r,   flushzAsynPool.flush  sa   ;;)# ;;{{))+}}KKM ,   &&(5	' {{c!#D#tE	 ;;--/C,S1F)+.( 0
 ++KK%%'.."&t';';"<#*C # <$3C$8!2*23-C
 %(KKM $ 4 4 < <S A	!2*23-C 03}}H'/'9'9';(,(:(:8S(I$'KKM1 $+ ..8 &&($y/*  &&(  &&(%%'$$&1 (0 !)$(!) (0 !)$(!)   &&(  &&(%%'$$&sd   AJ $A.J I$,J I4
AJ $J $	I1-J 0I11J 4	J=J  JJ A*K.c                 R   |j                   j                  h}	 |r8|j                         sn't        ||d      \  }}}|s|s|r	 t	        |       |r8| j                  j                  |       y # t
        t        t        f$ r Y 2w xY w# | j                  j                  |       w xY w)Nr   )rY   rZ   r[   )
rQ  rF   r?  rt   r   r   rn   r   r   rq   )r   r   rI   fdsreadablewritableagains          r,   r  zAsynPool._flush_writer,  s    xx 	1~~',3S#-)(E (hV    ((0 *7H=    ((0s/   +B	 A/ B	 /BB	 BB	 	B&c                 V    t        d | j                  j                         D              S )zGet queues for a new process.

        Here we'll find an unused slot, as there should always
        be one available when we start a new process.
        c              3   *   K   | ]  \  }}||  y wr0   rx   ).0qowners      r,   	<genexpr>z.AsynPool.get_process_queues.<locals>.<genexpr>C  s       &(!U}  &s   )r   r   rT   r   s    r,   get_process_queueszAsynPool.get_process_queues=  s*      &dll&8&8&: & & 	&r.   c                     t        | j                  t        | j                        z
  d      }|rB| j                  j	                  t        |      D ci c]  }| j                         d c}       yyc c}w )z!Grow the pool by ``n`` processes.r   N)max
_processesr%   r   updater   r   )r   r+   rl  r   s       r,   on_growzAsynPool.on_growF  sd    4??S%66:LL<A$K!78**,d2!  !s   A2c                      y)z#Shrink the pool by ``n`` processes.Nrx   )r   r+   s     r,   	on_shrinkzAsynPool.on_shrinkN  s    r.   c                    t        d      }t        d      }d}t        |j                        sJ t        |j                        rJ t        |j                        rJ t        |j                        sJ | j                  r:t        d      }t        |j                        sJ t        |j                        rJ |||fS )z5Create new in, out, etc. queues, returned as a tuple.T)	wnonblock)	rnonblockN)r   r   r   rF   r   )r   rQ  r   rP  s       r,   r   zAsynPool.create_process_queuesQ  s    
 T*d+#++&&&ckk***dll+++$,,''';;$/Ddll+++!$,,///D$r.   c                    	 t        fd| j                  D              }|j
                  | j                  vsJ |j
                  | j                  vsJ | j                  j                  |       || j                  |j
                  <   || j                  |j                  <   | j                  j                  |j
                         y# t        $ r t        j	                  d      cY S w xY w)zsCalled when receiving the :const:`WORKER_UP` message.

        Marks the process as ready to receive work.
        c              3   B   K   | ]  }|j                   k(  s|  y wr0   )r   )r  rk   r   s     r,   r  z,AsynPool.on_process_alive.<locals>.<genexpr>i  s     >a#>s   z"process with pid=%s already exitedN)r   r   r   r   r   r:  r   r   r   rq   r   r   rW   )r   r   r   s    ` r,   r   zAsynPool.on_process_alivec  s    
	M>4::>>D ||4#6#6666||4#5#5555&&t,,0DLL).2T]]+t||,  	M>>"FLL	Ms   C C*)C*c                     |j                   r7|j                   j                         s| j                  ||j                          y|j                  r-|j                  j                         s| j	                  |       yyy)z:Called for each job when the process assigned to it exits.N)rF  r?  on_partial_readrG  rf  )r   rH   pid_gones      r,   on_job_process_downzAsynPool.on_job_process_downs  s]    ==!8!8!:  cmm4(:(:(D(D(F NN3 )Gr.   c                 (    | j                  ||       y)zCalled when the process executing job' exits.

        This happens when the process job'
        was assigned to exited by mysterious means (error exitcodes and
        signals).
        N)mark_as_worker_lost)r   rH   r   r  s       r,   on_job_process_lostzAsynPool.on_job_process_lost}  s     	  h/r.   c           	         | j                   yt        | j                   j                               }t        |      d  rt	        | j                         z  nd      dj                  fd|D              dj                  t        t        |            t        j                  | j                  | j                        t	        | j                        t	        | j                        ddS )NzN/Ac                 .    | rt        |       |z  dS ddS )Nr   z.2f)float)vtotals     r,   perz'AsynPool.human_write_stats.<locals>.per  s"    ,-uQx%'S9:1S9:r.   r   z, c              3   0   K   | ]  } |        y wr0   rx   )r  r  r  r  s     r,   r  z-AsynPool.human_write_stats.<locals>.<genexpr>  s     9qSE]9s   )r  active)r  avgallrawstrategyinqueues)r   ri   rE  sumr%   joinrR   strSCHED_STRATEGY_TO_NAMErS   r   r   r   )r   valsr  r  s     @@r,   human_write_statszAsynPool.human_write_stats  s    #D$$++-.D		; us4#3#3441eL999D9999Sd^,.22##T%8%8 T//0d112
 	
r.   c                     |j                   s 	 d| j                  | j                  |      <   yy# t        t        f$ r Y yw xY w)z-Called to clean up queues after process exit.N)r   r   _find_worker_queuesr   r   r   r   s     r,   _process_cleanup_queuesz AsynPool._process_cleanup_queues  sE    yy?CT55d;<  j) s   . A A c                 &   | j                   D ]?  }	 t        |j                  j                  d       	 |j                  j	                  d       A y# t
        $ r(}|j                  t        j                  k7  r Y d}~od}~ww xY w# t
        $ r Y w xY w)z>Called at shutdown to tell processes that we're shutting down.r6   N)r   r   rQ  rF   r   rn   ro   rr  )task_handlerr   rr   s      r,   _stop_task_handlerzAsynPool._stop_task_handler  s     !%%D	DHH,,a0HHLL& &  yyEKK/ 0  s(    BA	BA<<B	BBc                 N    t         |   | j                  | j                        S )N)r   r   )r   create_result_handlerr   r   )r   r   s    r,   r  zAsynPool.create_result_handler  s,    w,//!22 - 
 	
r.   c                     || j                   v sJ t        | j                         }|| j                   |<   |t        | j                         k(  sJ y)z;Mark new ownership for ``queues`` to update fileno indices.N)r   r%   )r   r   queuesbs       r,   _process_register_queuesz!AsynPool._process_register_queues  sG    %%%#VC%%%%r.   c                     	 t        fd| j                  j                         D              S # t        $ r t	              w xY w)z"Find the queues owned by ``proc``.c              3   2   K   | ]  \  }}|k(  r|  y wr0   rx   )r  r  r  r   s      r,   r  z/AsynPool._find_worker_queues.<locals>.<genexpr>  s#      *ha D=  *s   )r   r   rT   r   r   r  s    `r,   r  zAsynPool._find_worker_queues  sH    	# *$,,*<*<*> * * * 	#T""	#s	   +/ Ac                 J    d | _         d x| _        x| _        x| _        | _        y r0   )r  _inqueue	_outqueue
_quick_get_poll_resultr  s    r,   _setup_queueszAsynPool._setup_queues  s1     
 37	7 	7 	7Od/r.   c                 P   |j                   j                  }| j                  j                  }|h}|r|j                  sx| j
                  t        k7  rdt        |d|d      \  }}}|r)	 |j                         }|t        d|       y ||       ny|r"|j                  s| j
                  t        k7  rayyyyyy# t        t        f$ r^}t        |dd      }	|	t        j                  k(  rY d}~|	t        j                  k(  rY d}~y|	t         vrt        d||d       Y d}~yd}~ww xY w)	a  Flush all queues.

        Including the outbound buffer, so that
        all tasks that haven't been started will be discarded.

        In Celery this is called whenever the transport connection is lost
        (consumer restart), and when a process is terminated.
        Nr  r[   z&got sentinel while flushing process %rro   z got %r while flushing process %rr6   r}   )r   r   r  r   closedr   r   rt   r   r   rn   r   r   ro   rp   EAGAINr   )
r   r   resqr   r  r  r   r   rr   rs   s
             r,   rW  zAsynPool.process_flush_queues  s	    yy  ..>>f$++$++*B$S$TBNHa.99;D |FM'-- $++$++*B+c*B+c
  * 	$S'48F, 5<</w.@!4!5	s$   'B8 8D% D ,D D  D%c                    |j                   s| j                  |       t        |      }|r| j                  j	                  |       ~|j
                  sxd|_        t        | j                        }	 | j                  |      }| j                  ||      rd| j                  | j                         <   t        | j                        |k(  sJ yy# t        $ r Y 'w xY w)z8Called when a job was partially written to exited child.TN)r^  rf  rJ   r   rq   r   r%   r   r  destroy_queuesr   r   )r   rH   r   rI   beforer  s         r,   r  zAsynPool.on_partial_read  s     }}NN3 %  ((0yyDI&F11$7&&vt4AEDLL!;!;!=> t||$...   s   0A C 	CCc                    |j                         rJ | j                  j                  |       d}	 | j                  j	                  |       	 | j                  |d   j                  j                         |       |D ]Q  }|s|j                  |j                  fD ]1  }|j                  r| j                  |       	 |j                          3 S |S # t
        $ r d}Y w xY w# t        $ r Y tw xY w# t        $ r Y cw xY w)zqDestroy queues that can no longer be used.

        This way they can be replaced by new usable sockets.
        r6   r   )r?  r   rq   r   r   r   rp  rF   rM   rn   r   r  r  r  )r   r  r   removedqueuesocks         r,   r  zAsynPool.destroy_queues  s    
 >>###&&t,	LLV$	!!&)"3"3":":"<dC E"]]EMM:D;;-! JJL	 ;    	G	  		  ' ! !s5   C -C# <C2C C #	C/.C/2	C>=C>c                 L     |||f|      }t        |      } |d|      }|||fS )Nr  r  )r%   )	r   type_r{   r  r	   r  r  r)   r  s	            r,   r  zAsynPool._create_payload,  s6     eT]X64ydD!tT!!r.   c                      y r0   rx   )clsr  r   s      r,   _set_result_sentinelzAsynPool._set_result_sentinel4  s     	r.   c                     | j                   fS r0   )r   r  s    r,   _help_stuff_finish_argsz AsynPool._help_stuff_finish_args9  s     

}r.   c                    t        d       i }t               }|D ]=  }	 |j                  j                  j	                         }|j                  |       |||<   ? |rTt        |d      \  }}}|r|sy |D ])  }||   j                  j                  j                          + t        d       |rSy y # t        $ r Y w xY w)Nz7removing tasks from inqueue until task handler finishedr   r  r   )
r   rU   rQ  r   rM   rW   rn   rt   r   r   )	r  r   fileno_to_procinqRrk   r'   r  r   r  s	            r,   _help_stuff_finishzAsynPool._help_stuff_finish>  s     	E	
 uAUU]]))+%&r"	  !(s!;Har"&&..335 !H   s   :B11	B=<B=c                     | j                   diS )Ng      @)r
  r  s    r,   r  zAsynPool.timersW  s    ""C((r.   )NFNN)4r   r   r   r   r   r   r  r   r   r  r  r  r	  r   r  r%  r'  r;  r  r	   r   r  r   r  r  r  r  r  r  r   r   r  r  r  r  staticmethodr  r  r  r  r  rW  r  r  r  classmethodr  r  r  propertyr  r   r   s   @r,   r4   r4     s9   $!MF #(
 /49=<
|1

1463:("
$0h/V %)(8Y*vF'P1"&2$-  0
.  
&#7"H/48 &mm$!1"  
  0 ) )r.   r4   )NNNr   )`r   ro   r  rA   r  rh   r  collectionsr   r   r   ior   numbersr   r   r   structr	   r
   r   r   weakrefr   r   billiardr   r   billiard.compatr   r   billiard.poolr   r   r   r   r   billiard.queuesr   kombu.asynchronousr   r   kombu.serializationr   kombu.utils.eventior   kombu.utils.functionalr   viner   celery.signalsr   celery.utils.functionalr   celery.utils.logr    celery.workerr!   r  	_billiardr"   r-   r   ImportError__all__r   r   r@  r   	frozensetr  rp   r   r   r   SCHED_STRATEGY_FCFSr  r   rT   r  r<   rD   rJ   rN   r   rO   r\   r]   r^   rf   rt   r   r   r   Poolr4   )kr  s   00r,   <module>r6     s    	  	   2 2   # , ,  , " 3 B B ( ) 1 - *  7 ( ' /
-*J 	H	||V\\u
U\\5;;/
0 	     "  ,<+A+A+CD41a!Q$D /0;
; 66 $D!V]]"NNFNN@ $D!-`*0Z+U\\ +\"E'' \"~})uzz })S  -%'WW  J'- --H Es   *G G)G&%G&