
    h
                        d Z ddlmZ ddlmZmZ ddlmZ ddlm	Z	 ddl
mZ ddlmZ d	Z e	e      Zej"                  Z G d
 dej$                        Zy)zWorker Task Consumer Bootstep.    )annotations)QoSignore_errors)	bootsteps)
get_logger)detect_quorum_queues   )Mingle)Tasksc                  H     e Zd ZdZefZ fdZd Zd Zd Z	d Z
ddZ xZS )	r   z,Bootstep starting the task message consumer.c                B    d x|_         |_        t        |   |fi | y )N)task_consumerqossuper__init__)selfckwargs	__class__s      X/var/www/Befach/backend/env/lib/python3.12/site-packages/celery/worker/consumer/tasks.pyr   zTasks.__init__   s#    "&&!%%f%    c                n   j                          | j                        j                  j                  j	                  dj
                         j                  j                  j                  j                  j                        _
        fd}t        |j
                        _        y)zStart task consumer.r   )on_decode_errorc                >    j                   j                  |       S )N)prefetch_countapply_global)r   r   )r   r   
qos_globals    r   set_prefetch_countz'Tasks.start.<locals>.set_prefetch_count,   s%    ??&&-' '  r   N)update_strategiesr   
connectiondefault_channel	basic_qosinitial_prefetch_countappamqpTaskConsumerr   r   r   r   )r   r   r   r   s    ` @r   startzTasks.start   s    	__Q'
 	
$$..q''	
 %%**11LL!*;*; 2 
	
 &(@(@Ar   c                t    |j                   r,t        d       t        ||j                   j                         yy)zStop task consumer.zCanceling task consumer...N)r   debugr   cancelr   r   s     r   stopz
Tasks.stop3   s+    ??./!Q__334 r   c                    |j                   rD| j                  |       t        d       t        ||j                   j                         d|_         yy)zShutdown task consumer.zClosing consumer channel...N)r   r,   r)   r   closer+   s     r   shutdownzTasks.shutdown9   s=    ??IIaL/0!Q__223"AO	 r   c                P    d|j                   r|j                   j                  iS diS )zReturn task consumer info.r   zN/A)r   valuer+   s     r   infoz
Tasks.infoA   s#     !%%++BBEBBr   c                   |j                   j                   }|j                  j                  j                  rPt        |j                  |j                   j                  j                        \  }}|rd}t        j                  d       |S )zDetermine if global QoS should be applied.

        Additional information:
            https://www.rabbitmq.com/docs/consumer-prefetch
            https://www.rabbitmq.com/docs/quorum-queues#global-qos
        Fz5Global QoS is disabled. Prefetch count in now static.)
r    qos_semantics_matches_specr$   confworker_detect_quorum_queuesr   	transportdriver_typeloggerr2   )r   r   r   using_quorum_queuesqnames        r   r   zTasks.qos_globalE   sh     @@@
55::11)=aeeQ\\E[E[EgEg)h&""
STr   )returnbool)__name__
__module____qualname____doc__r
   requiresr   r'   r,   r/   r2   r   __classcell__)r   s   @r   r   r      s-    6yH&B,5#Cr   r   N)rA   
__future__r   kombu.commonr   r   celeryr   celery.utils.logr   celery.utils.quorum_queuesr   mingler
   __all__r>   r9   r)   StartStopStepr    r   r   <module>rM      sH    $ " +  ' ; 
 
H	DI## Dr   