
    h#                         d Z ddlZddlZddlZddlmZmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZ dd	lmZmZmZ d
Z G d d      Zy)zEvent dispatcher sends events.    N)defaultdictdeque)Producer)app_or_default)anon_nodename)	utcoffset   )Eventget_exchange
group_from)EventDispatcherc                       e Zd ZdZdhZdZdZdZ	 	 	 	 ddZd Z	d Z
d Zd Zd	efd
Zd	defdZd	ed	defdZddZd Zd Zd Zd Z eee      Zy)r   a0  Dispatches event messages.

    Arguments:
        connection (kombu.Connection): Connection to the broker.

        hostname (str): Hostname to identify ourselves as,
            by default uses the hostname returned by
            :func:`~celery.utils.anon_nodename`.

        groups (Sequence[str]): List of groups to send events for.
            :meth:`send` will ignore send requests to groups not in this list.
            If this is :const:`None`, all events will be sent.
            Example groups include ``"task"`` and ``"worker"``.

        enabled (bool): Set to :const:`False` to not actually publish any
            events, making :meth:`send` a no-op.

        channel (kombu.Channel): Can be used instead of `connection` to specify
            an exact channel to use when sending events.

        buffer_while_offline (bool): If enabled events will be buffered
            while the connection is down. :meth:`flush` must be called
            as soon as the connection is re-established.

    Note:
        You need to :meth:`close` this after use.
    sqlNc                 ~   t        |xs | j                        | _        || _        || _        |xs
 t	               | _        || _        |
xs
 t               | _        || _	        || _
        t        t              | _        t        j                         | _        d | _        t%               | _        |xs  | j                  j(                  j*                  | _        t/               | _        t/               | _        t/        |xs g       | _        t6        j8                   t6        j:                   g| _        | j                  j>                  | _        |	| _         |s|r|j                  jB                  | _        || _"        | j                  xs | j                  jG                         }tI        || j                  j(                  jJ                        | _&        |jN                  jP                  | jR                  v rd| _"        | jD                  r| jU                          d| j
                  i| _+        tY        jZ                         | _.        y )N)nameFhostname)/r   app
connectionchannelr   r   buffer_while_offline	frozensetbuffer_groupbuffer_limiton_send_bufferedr   list_group_buffer	threadingLockmutexproducerr   _outbound_bufferconfevent_serializer
serializerset
on_enabledon_disabledgroupstimetimezonealtzonetzoffsetclockdelivery_modeclientenabledconnection_for_writer   event_exchangeexchange	transportdriver_typeDISABLED_TRANSPORTSenableheadersosgetpidpid)selfr   r   r0   r   r   r   r$   r(   r.   r   r   r   conninfos                 T/var/www/Befach/backend/env/lib/python3.12/site-packages/celery/events/dispatcher.py__init__zEventDispatcher.__init__:   s    "#/2$ 3MO$8!(7IK( 0(.^^%
 %$F(F(F%5&,B'--$,,7XX^^
*g%0077DO??Edhh&C&C&E$X*.((--*F*FH))T-E-EE DL<<KKM"DMM299;    c                     | S N r<   s    r>   	__enter__zEventDispatcher.__enter__^   s    r@   c                 $    | j                          y rB   )close)r<   exc_infos     r>   __exit__zEventDispatcher.__exit__a   s    

r@   c                     t        | j                  xs | j                  | j                  | j                  d      | _        d| _        | j                  D ]	  } |         y )NF)r3   r$   auto_declareT)r   r   r   r3   r$   r    r0   r&   r<   callbacks     r>   r7   zEventDispatcher.enabled   sJ     !@*.--,0OO.35 HJ (r@   c                 |    | j                   r0d| _         | j                          | j                  D ]	  } |         y y )NF)r0   rG   r'   rL   s     r>   disablezEventDispatcher.disablem   s3    << DLJJL ,,
 - r@   Fc           	      "   |rdn| j                   j                         } ||f| j                  t               | j                  |d|}| j
                  5   | j                  ||fd|j                  dd      i|cddd       S # 1 sw Y   yxY w)au  Publish event using custom :class:`~kombu.Producer`.

        Arguments:
            type (str): Event type name, with group separated by dash (`-`).
                fields: Dictionary of event fields, must be json serializable.
            producer (kombu.Producer): Producer instance to use:
                only the ``publish`` method will be called.
            retry (bool): Retry in the event of connection failure.
            retry_policy (Mapping): Map of custom retry policy options.
                See :meth:`~kombu.Connection.ensure`.
            blind (bool): Don't set logical clock value (also don't forward
                the internal logical clock).
            Event (Callable): Event type used to create event.
                Defaults to :func:`Event`.
            utcoffset (Callable): Function returning the current
                utc offset in hours.
        Nr   r   r;   r-   routing_key-.)r-   forwardr   r   r;   r   _publishreplace)	r<   typefieldsr    blindr
   kwargsr-   events	            r>   publishzEventDispatcher.publisht   s    & 4::#5#5#7d ;T]]ik((%;39;ZZ 	O 4== O-1\\#s-COGMO	O 	O 	Os   &BBc                 &   | j                   }	 |j                  |||j                  |||g| j                  | j                  | j
                  	       y # t        $ r5}| j                  s | j                  j                  |||f       Y d }~y d }~ww xY w)N)rR   r3   retryretry_policydeclarer$   r8   r.   )
r3   r]   r   r$   r8   r.   	Exceptionr   r!   append)	r<   r\   r    rR   r_   r`   r   r3   excs	            r>   rV   zEventDispatcher._publish   s    ==	D'!)!
??"00  
  	D,,!!((%c)BCC	Ds   AA 	B+BBc           	         | j                   r| j                  t        |      }	}|r|	|vry|	| j                  v r| j                  j                         }
 ||f| j                   |       | j                  |
d|}| j                  |	   }|j                  |       t        |      | j                  k\  r| j                          y| j                  r| j                          yy| j                  ||| j                  ||||      S y)a  Send event.

        Arguments:
            type (str): Event type name, with group separated by dash (`-`).
            retry (bool): Retry in the event of connection failure.
            retry_policy (Mapping): Map of custom retry policy options.
                See :meth:`~kombu.Connection.ensure`.
            blind (bool): Don't set logical clock value (also don't forward
                the internal logical clock).
            Event (Callable): Event type used to create event,
                defaults to :func:`Event`.
            utcoffset (Callable): unction returning the current utc offset
                in hours.
            **fields (Any): Event fields -- must be json serializable.
        NrQ   )rZ   r
   r_   r`   )r0   r(   r   r   r-   rU   r   r;   r   rc   lenr   flushr   r]   r    )r<   rX   rZ   r   r_   r`   r
   rY   r(   groupr-   r\   bufs                r>   sendzEventDispatcher.send   s    " << KKD)9EF%v-)))

**,d CT]](1"&((%C;AC ((/

5!s8t000JJL**))+ + ||D&$--u*/u1= $ ? ?! r@   c                    |rnt        | j                        }	 | j                  5  |D ]#  \  }}}| j                  || j                  |       % 	 ddd       | j                  j                          |r^| j                  5  | j                  j                         D ]*  \  }}| j                  || j                  d|z         g |dd , 	 ddd       yy# 1 sw Y   xY w# | j                  j                          w xY w# 1 sw Y   yxY w)zFlush the outbound buffer.Nz%s.multi)r   r!   r   rV   r    clearr   items)	r<   errorsr(   ri   r\   rR   _rh   eventss	            r>   rg   zEventDispatcher.flush   s    t,,-C.ZZ I14-{AeT]]KH 25I %%++- #%)%7%7%=%=%?ME6MM&$--e9KL "F1I &@# # I I %%++-# #s/   C )CC ?AC=CC C:=Dc                 N    | j                   j                  |j                          y)z-Copy the outbound buffer of another instance.N)r!   extend)r<   others     r>   extend_bufferzEventDispatcher.extend_buffer   s    $$U%;%;<r@   c                 ~    | j                   j                         xr | j                   j                          d| _        y)zClose the event dispatcher.N)r   lockedreleaser    rD   s    r>   rG   zEventDispatcher.close   s,    

4

 2 2 4r@   c                     | j                   S rB   r    rD   s    r>   _get_publisherzEventDispatcher._get_publisher   s    }}r@   c                     || _         y rB   ry   )r<   r    s     r>   _set_publisherzEventDispatcher._set_publisher   s	     r@   )NNTNTNNNr	   N   N)TT)__name__
__module____qualname____doc__r6   r   r&   r'   r?   rE   rI   r7   rO   r
   r]   r   rV   rj   rg   rt   rG   rz   r|   property	publisherrC   r@   r>   r   r      s    8 !'
C J K?C>B=>FJ"H 5O4 <A"iD(  %	e#?J# =
!8Ir@   r   )r   r9   r   r)   collectionsr   r   kombur   
celery.appr   celery.utils.nodenamesr   celery.utils.timer   r\   r
   r   r   __all__r   rC   r@   r>   <module>r      s7    $ 	   *  % 0 ' 2 2
R9 R9r@   