
    h                         d Z ddlmZ ddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZmZ dd	lmZ dd
lmZ ddlmZ ddlmZ dZ ee      Zej6                  ej8                  cZZ G d dej:                        Zy)z)Worker <-> Worker communication Bootstep.    )defaultdict)partial)heappush)
itemgetter)Consumer)	DummyLock)ContentDisallowedDecodeError)	bootsteps)
get_logger)Bunch   )Mingle)Gossipc            	            e Zd ZdZd ZefZ eddddddd      Zd	d
hZ		 	 d fd	Z
d ZddZd Zd Z fdZd Zd Zd Zd Zd Zd Zd Zd Zd Z xZS )r   zfBootstep consuming events from other workers.

    This keeps the logical clock value up to date.
    idclockhostnamepidtopicactioncveramqpredisc                    | xr | j                  |j                        | _        |j                  | _        | |_        |j                  j                  j
                  | _        |j                  | _        dj                  | j                  t        |j                        g      | _
        t        t               t               t                     | _        |j                  | _        | j                  rw|j                  j                  j                  | j                   | j"                  d      | _        |j&                  rt)               |_        | j$                  j,                  | _        || _        || _        d | _        t7        t8              | _        i | _        | j>                  | j@                  d| _!        |j                  jD                  | _"        d| jF                  i| _$        tK        |   |fi | y )N.)	node_join
node_leave	node_lostr   )on_node_joinon_node_leavemax_tasks_in_memory)zworker.electzworker.elect.acktask)'compatible_transportappenabledgossipeventsReceiverr   joinstrr   full_hostnamer   setontimerStater    r!   statehubr   _mutexeventupdate_stateintervalheartbeat_interval_trefr   listconsensus_requestsconsensus_replieson_electon_elect_ackevent_handlersr   	call_taskelection_handlerssuper__init__)selfcwithout_gossipr6   r7   kwargs	__class__s         Y/var/www/Befach/backend/env/lib/python3.12/site-packages/celery/worker/consumer/gossip.pyrB   zGossip.__init__$   s{   ))Nd.G.G.N55--

 XXt}}c!%%j&ABeue
 WW
<<++!.."00$% , DJ
 uu$; $

 0 0D "4
"-d"3!# MM $ 1 1
 UU[[
 DNN"
 	%f%    c                     |j                         5 }|j                  j                  | j                  v cd d d        S # 1 sw Y   y xY wN)connection_for_read	transportdriver_typecompatible_transports)rC   r%   conns      rH   r$   zGossip.compatible_transportM   sA    $$& 	L$>>--1K1KK	L 	L 	Ls	   "=Ac                 b    g | j                   |<   | j                  j                  d|||d       y )Nzworker-electr   )r   r   r   r   )r;   
dispatchersend)rC   r   r   r   s       rH   electionzGossip.electionQ   s5    %'r"vA 	 	
rI   c                     	 | j                   j                  |      j                          y # t        $ r }t        j                  d|       Y d }~y d }~ww xY w)NzCould not call task: %r)r%   	signatureapply_async	Exceptionlogger	exception)rC   r#   excs      rH   r?   zGossip.call_taskX   sF    	=HHt$002 	=6<<	=s   ), 	AAAc                    	 | j                  |      \  }}}}}}}t	        | j
                  |   || d| ||f       | j                  j                  d|       y # t        $ r }	t        j                  d|	      cY d }	~	S d }	~	ww xY w)Nz!election request missing field %sr   zworker-elect-ack)r   )_cons_stamp_fieldsKeyErrorrY   rZ   r   r:   rR   rS   )
rC   r4   id_r   r   r   r   r   _r[   s
             rH   r<   zGossip.on_elect^   s    	N!%!8!8!?S%3FA 	##C(xj#'7	
 	/C8  	N##$GMM	Ns   A 	B$A?9B?Bc                 F    t         |   |       |j                  | _        y rK   )rA   startevent_dispatcherrR   )rC   rD   rG   s     rH   rb   zGossip.startj   s    a,,rI   c                    |d   }	 | j                   |   }t        | j                  j	                               }|j                  |d          t        |      t        |      k\  r| j                  j                  | j                  |         \  }}}}|| j                  k(  r%t        d|       	 | j                  |   }	 |	|       nt        d||       | j                  j                  |d        | j                   j                  |d        y y # t        $ r Y y w xY w# t        $ r t        j                  d|       Y jw xY w)Nr   r   zI won the election %rzUnknown election topic %rznode %s elected for %r)r;   r^   r-   r1   alive_workersappendlenr   	sort_heapr:   r,   infor@   rY   rZ   pop)
rC   r4   r   repliesre   r`   leaderr   r   handlers
             rH   r=   zGossip.on_elect_ackn   s4   4[	,,R0G DJJ4467uZ()w<3}--'+zz';';''+($Avuf +++,b1$"44U;G FO-vr:##''D1""&&r40 .  		   I$$%@%HIs#   D -D 	DDD=<D=c                 |    t        d|j                         | j                  | j                  j                  |       y )Nz%s joined the party)debugr   _call_handlersr.   r   rC   workers     rH   r    zGossip.on_node_join   s+    #V__5DGG--v6rI   c                 |    t        d|j                         | j                  | j                  j                  |       y )Nz%s left)ro   r   rp   r.   r   rq   s     rH   r!   zGossip.on_node_leave   s*    i)DGG..7rI   c                 |    t        d|j                         | j                  | j                  j                  |       y )Nzmissed heartbeat from %s)ri   r   rp   r.   r   rq   s     rH   on_node_lostzGossip.on_node_lost   s+    '9DGG--v6rI   c                 ~    |D ]  }	  ||i |  y # t         $ r!}t        j                  d||       Y d }~4d }~ww xY w)Nz!Ignored error from handler %r: %r)rX   rY   rZ   )rC   handlersargsrF   rm   r[   s         rH   rp   zGossip._call_handlers   sO    GG((    G  7#G GGs   	<7<c                     | j                   | j                   j                          | j                  j                  | j                  | j
                        | _         y rK   )r8   cancelr/   call_repeatedlyr6   periodic)rC   s    rH   register_timerzGossip.register_timer   s<    ::!JJZZ//t}}M
rI   c                    | j                   j                  }t               }|j                         D ]1  }|j                  r|j                  |       | j                  |       3 |D ]  }|j                  |j                  d          y rK   )	r1   workersr-   valuesaliveaddru   rj   r   )rC   r   dirtyrr   s       rH   r|   zGossip.periodic   sh    **$$nn&F<<		&!!!&) ' FKK. rI   c                     | j                          | j                  |d| j                        }t        ||j                  gt        | j                  |j                        |j                  d      gS )Nzworker.#)routing_key	queue_ttlT)queues
on_messageacceptno_ack)	r}   r)   r7   r   queuer   r   event_from_messager   )rC   channelevs      rH   get_consumerszGossip.get_consumers   sj    ]]7
%)%<%<  >HH:t0E0EF99
  	rI   c                    |j                   d   }|j                  dd      d   dk(  ry 	 | j                  |   } ||j                        S # t        $ r Y nw xY w|j
                  j                  d      xs |j                  d   }|| j                  k7  r^	  ||j                        \  }}| j                  |       y # t        t        t        f$ r}t        j                  |       Y d }~y d }~ww xY w| j                  j                          y )Nr   r   r   r   r#   r   )delivery_infosplitr>   payloadr^   headersgetr   r5   r
   r	   	TypeErrorrY   errorr   forward)	rC   preparemessage_typerm   r   r`   r4   r[   s	            rH   r   zGossip.on_message   s    %%m4 ;;sAq!V+	,))%0G 7??++  		 OO''
3 0OOJ/ 	t}}$""7??35!!%(!2I> "S!!" JJ s)   A 	AA&B= =C0C++C0)Fg      @g       @rK   )__name__
__module____qualname____doc__labelr   requiresr   r]   rO   rB   r$   rT   r?   r<   rb   r=   r    r!   ru   rp   r}   r|   r   r   __classcell__)rG   s   @rH   r   r      s    
 EyH#gz5'8V $W-).25'&RL
=
9-14787GN
/
!rI   r   N)r   collectionsr   	functoolsr   heapqr   operatorr   kombur   kombu.asynchronous.semaphorer   kombu.exceptionsr	   r
   celeryr   celery.utils.logr   celery.utils.objectsr   mingler   __all__r   rY   ro   ri   ConsumerStepr    rI   rH   <module>r      s_    / #     2 ;  ' & 
	H	llFKKtw!Y## w!rI   