
    h!                         d Z ddlmZmZmZmZmZmZ ddlm	Z	m
Z
 ddlmZmZ ddlmZ ddlmZmZ ddlmZ ddlmZ dd	lmZmZ d
Z ee      ZdZdZddhZ G d dej@                        Z!y)zNative delayed delivery functionality for Celery workers.

This module provides the DelayedDelivery bootstep which handles setup and configuration
of native delayed delivery functionality when using quorum queues.
    )IteratorListOptionalSetUnion
ValuesView)
ConnectionQueue).bind_queue_to_native_delayed_delivery_exchange4declare_native_delayed_delivery_exchanges_and_queues)retry_over_time)Celery	bootsteps)
get_logger)detect_quorum_queues)ConsumerTasks)DelayedDeliveryg      ?   classicquorumc                       e Zd ZdZefZdedefdZdeddfdZ	dede
ddfdZd	ed
eddfdZdedee   deddfdZd	eddfdZdee
ee
   f   dee
   fdZdee
   ddfdZy)r   a  Bootstep that sets up native delayed delivery functionality.

    This component handles the setup and configuration of native delayed delivery
    for Celery workers. It is automatically included when quorum queues are
    detected in the application configuration.

    Responsibilities:
        - Declaring native delayed delivery exchanges and queues
        - Binding all application queues to the delayed delivery exchanges
        - Handling connection failures gracefully with retries
        - Validating configuration settings
    creturnc                     t        |j                  |j                  j                         j                  j                        d   S )zDetermine if this bootstep should be included.

        Args:
            c: The Celery consumer instance

        Returns:
            bool: True if quorum queues are detected, False otherwise
        r   )r   appconnection_for_write	transportdriver_type)selfr   s     c/var/www/Befach/backend/env/lib/python3.12/site-packages/celery/worker/consumer/delayed_delivery.py
include_ifzDelayedDelivery.include_if0   s4     $AEE155+E+E+G+Q+Q+]+]^_`aa    Nc           	      ^   |j                   }	 | j                  |       | j                  |j                  j                        }g }|D ]<  }	 t        | j                  ||ft        t        f| j                  t        t               > t'        |      t'        |      k(  rt        j	                  d       yy# t        $ r%}t        j	                  dt        |              d}~ww xY w# t         $ r=}t        j#                  d|t        |             |j%                  ||f       Y d}~d}~ww xY w)a_  Initialize delayed delivery for all broker URLs.

        Attempts to set up delayed delivery for each broker URL in the configuration.
        Failures are logged but don't prevent attempting remaining URLs.

        Args:
            c: The Celery consumer instance

        Raises:
            ValueError: If configuration validation fails
        z#Configuration validation failed: %sN)argscatcherrbackinterval_startmax_retriesz+Failed to setup delayed delivery for %r: %szdFailed to setup delayed delivery for all broker URLs. Native delayed delivery will not be available.)r   _validate_configuration
ValueErrorloggercriticalstr_validate_broker_urlsconf
broker_urlr   _setup_delayed_deliveryConnectionRefusedErrorOSError	_on_retryRETRY_INTERVALMAX_RETRIES	Exceptionwarningappendlen)r    r   r   ebroker_urlssetup_errorsr1   s          r!   startzDelayedDelivery.start;   s    ee	((-
 001D1DE%J500Z17; NN#1 + &" |K 00OOA 11  	OOA3q6J	"  5AA ##ZO445s/   B5 9C&5	C#> CC#&	D,/3D''D,r1   c                    |j                   j                  |      }|j                   j                  j                  }t        j                  d||       	 t        ||       	 | j                  |j                   |       y# t        $ r&}t        j                  d|t        |              d}~ww xY w# t        $ r&}t        j                  d|t        |              d}~ww xY w)az  Set up delayed delivery for a specific broker URL.

        Args:
            c: The Celery consumer instance
            broker_url: The broker URL to configure

        Raises:
            ConnectionRefusedError: If connection to the broker fails
            OSError: If there are network-related issues
            Exception: For other unexpected errors during setup
        )urlz<Setting up delayed delivery for broker %r with queue type %rz1Failed to declare exchanges and queues for %r: %sNz Failed to bind queues for %r: %s)r   r   r0   )broker_native_delayed_delivery_queue_typer,   debugr   r8   r9   r.   _bind_queues)r    r   r1   
connection
queue_typer<   s         r!   r2   z'DelayedDelivery._setup_delayed_deliveryi   s     "#!;!;
!;!K
UUZZII
J
	


	@	aeeZ0  	NNCCF 	  	NN2CF 	s0   A? "B1 ?	B.!B))B.1	C :!CC r   rE   c           	      \   |j                   j                  j                         }|st        j	                  d       y|D ]/  }	 t        j                  d|j                         t        ||       1 y# t        $ r0}t        j                  d|j                  t        |              d}~ww xY w)zBind all application queues to delayed delivery exchanges.

        Args:
            app: The Celery application instance
            connection: The broker connection to use

        Raises:
            Exception: If queue binding fails
        z,No queues found to bind for delayed deliveryNz-Binding queue %r to delayed delivery exchangezFailed to bind queue %r: %s)amqpqueuesvaluesr,   r9   rC   namer   r8   errorr.   )r    r   rE   rI   queuer<   s         r!   rD   zDelayedDelivery._bind_queues   s     %(HHOO$:$:$<NNIJELejjY>z5Q   1JJA s   ,A22	B+;+B&&B+excinterval_rangeintervals_countc                 T    t         j                  d|dz   t        t        |             y)a  Callback for retry attempts.

        Args:
            exc: The exception that triggered the retry
            interval_range: An iterator which returns the time in seconds to sleep next
            intervals_count: Number of retry attempts so far
        z?Retrying delayed delivery setup (attempt %d/%d) after error: %s   N)r,   r9   r7   r.   )r    rN   rO   rP   s       r!   r5   zDelayedDelivery._on_retry   s#     	Mac#h	
r#   c                     | j                  |j                  j                         | j                  |j                  j                         y)zValidate all required configuration settings.

        Args:
            app: The Celery application instance

        Raises:
            ValueError: If any configuration is invalid
        N)r/   r0   r1   _validate_queue_typerB   )r    r   s     r!   r*   z'DelayedDelivery._validate_configuration   s6     	""388#6#67 	!!#(("T"TUr#   r=   c                 "   |st        d      t        |t              r|j                  d      }n>t        |t              r t        d |D              st        d      |}nt        d|      |D ch c]  }| }}|st        d      |S c c}w )aP  Validate and split broker URLs.

        Args:
            broker_urls: Broker URLs, either as a semicolon-separated string
                  or as a list of strings

        Returns:
            Set of valid broker URLs

        Raises:
            ValueError: If no valid broker URLs are found or if invalid URLs are provided
        z!broker_url configuration is empty;c              3   <   K   | ]  }t        |t                y w)N)
isinstancer.   ).0rA   s     r!   	<genexpr>z8DelayedDelivery._validate_broker_urls.<locals>.<genexpr>   s     Cz#s+Cs   zAll broker URLs must be stringsz)broker_url must be a string or list, got z+No valid broker URLs found in configuration)r+   rX   r.   splitlistall)r    r=   brokersrA   
valid_urlss        r!   r/   z%DelayedDelivery._validate_broker_urls   s     @AAk3'!'',GT*C{CC !BCC!GHXYY%,-cc-
-JKK .s   2	BrF   c                     |st        d      |t        vr/t        t              }t        d|ddj                  |             y)zValidate the queue type configuration.

        Args:
            queue_type: The configured queue type

        Raises:
            ValueError: If queue type is invalid
        z;broker_native_delayed_delivery_queue_type is not configuredzInvalid queue type z. Must be one of: z, N)r+   VALID_QUEUE_TYPESsortedjoin)r    rF   sorted_typess      r!   rT   z$DelayedDelivery._validate_queue_type   sU     Z[[..!"34L%j^3EdiiP\F]E^_  /r#   )__name__
__module____qualname____doc__r   requiresr   boolr"   r?   r.   r2   r   r	   rD   r8   r   floatintr5   r*   r   r   r   r/   r   rT    r#   r!   r   r       s     xH	bH 	b 	b,x ,D ,\& &s &t &P J 4 4
Y 
 
Z] 
bf 
V6 Vd VsDI~1F 3s8 @x}  r#   r   N)"rh   typingr   r   r   r   r   r   kombur	   r
   'kombu.transport.native_delayed_deliveryr   r   kombu.utils.functionalr   celeryr   r   celery.utils.logr   celery.utils.quorum_queuesr   celery.worker.consumerr   r   __all__re   r,   r6   r7   ra   StartStopStepr   rm   r#   r!   <module>rx      sg   
 D C #k 2 $ ' ; 2
	H	  ) Wi-- Wr#   