
    h21                         d dl Z d dlZd dlZd dlZd dlm Z ddlmZ ddlm	Z	m
Z
mZmZmZ  ej                  e      Zd Z G d d      Z G d	 d
      Z G d d      Zy)    N)asyncio   )registry)_close_redis_consistent_hash_wrap_closecreate_pooldecode_hostsc                 b   K   | j                         } t        ||      |i | d {   S 7 wN)
_get_layergetattr)objnameargskwargslayers        Q/var/www/Befach/backend/env/lib/python3.12/site-packages/channels_redis/pubsub.py_async_proxyr      s3      NNE%%t6v6666s   &/-/c                   6    e Zd Zddd	 d	dZd Zd Zd Zd Zy)
RedisPubSubChannelLayerNmsgpack)symmetric_encryption_keysserializer_formatc                f    || _         || _        i | _        t        j                  ||      | _        y )N)r   )_args_kwargs_layersr   get_serializer_serializer)selfr   r   r   r   s        r   __init__z RedisPubSubChannelLayer.__init__   s2     
#22&?
    c                 t    |dv rt        j                  t        | |      S t        | j	                         |      S )N)new_channelsendreceive	group_addgroup_discard
group_sendflush)	functoolspartialr   r   r   )r!   r   s     r   __getattr__z#RedisPubSubChannelLayer.__getattr__,   s:     
 
 $$\4>>4??,d33r#   c                 8    | j                   j                  |      S )z6
        Serializes message to a byte string.
        )r    	serializer!   messages     r   r0   z!RedisPubSubChannelLayer.serialize:   s     ))'22r#   c                 8    | j                   j                  |      S )z2
        Deserializes from a byte string.
        )r    deserializer1   s     r   r4   z#RedisPubSubChannelLayer.deserialize@   s     ++G44r#   c                     t        j                         }	 | j                  |   }|S # t        $ rB t	        | j
                  i | j                  d| i}|| j                  |<   t        | |       Y |S w xY w)Nchannel_layer)r   get_running_loopr   KeyErrorRedisPubSubLoopLayerr   r   r   )r!   loopr   s      r   r   z"RedisPubSubChannelLayer._get_layerF   s    '')		$LL&E   	$(,, #E
 "'DLLd#	$s   ' AA21A2)returnN)__name__
__module____qualname__r"   r.   r0   r4   r    r#   r   r   r      s-     #'#	
 

 435r#   r   c                   h    e Zd ZdZ	 	 	 	 	 ddZd Zd Zd ZddgZd	 Z	dd
Z
d Zd Zd Zd Zd Zy)r9   z@
    Channel Layer that uses Redis's pub/sub functionality.
    Nc                     || _         || _        || _        || _        i | _        i | _        t        |      D cg c]  }t        ||        c}| _        y c c}w r   )	prefixon_disconnecton_reconnectr6   channelsgroupsr
   RedisSingleShardConnection_shards)r!   hostsrB   rC   rD   r6   r   hosts           r   r"   zRedisPubSubLoopLayer.__init__\   sb     *(*   @LE?R
7;&tT2
 
s   Ac                 Z    | j                   t        |t        | j                                  S )zV
        Return the shard that is used exclusively for this channel or group.
        )rH   r   len)r!   channel_or_group_names     r   
_get_shardzRedisPubSubLoopLayer._get_shardx   s%     ||,-BCDUVWWr#   c                 $    | j                    d| S )a\  
        Return the channel name used by a group.
        Includes '__group__' in the returned
        string so that these names are distinguished
        from those returned by `new_channel()`.
        Technically collisions are possible, but it
        takes what I believe is intentional abuse in
        order to have colliding names.
        	__group__)rB   )r!   groups     r   _get_group_channel_namez,RedisPubSubLoopLayer._get_group_channel_name~   s     ++iw//r#   c                    K   t        j                         | j                  |<   | j                  |      }|j	                  |       d {    y 7 wr   )r   QueuerE   rN   	subscribe)r!   channelshards      r   _subscribe_to_channelz*RedisPubSubLoopLayer._subscribe_to_channel   s:     !(g(oog&&&s   AA	A
ArF   r+   c                    K   | j                  |      }|j                  || j                  j                  |             d{    y7 w)zF
        Send a message onto a (general or specific) channel.
        N)rN   publishr6   r0   )r!   rV   r2   rW   s       r   r&   zRedisPubSubLoopLayer.send   s;      (mmGT%7%7%A%A'%JKKKs   A A
AA
c                    K   | j                    | t        j                         j                   }| j	                  |       d{    |S 7 w)zy
        Returns a new channel name that can be used by a consumer in our
        process as a specific channel.
        N)rB   uuiduuid4hexrX   )r!   rB   rV   s      r   r%   z RedisPubSubLoopLayer.new_channel   sI     
 [[M&$**,*:*:);<((111 	2s   AAAAc                   K   || j                   vr| j                  |       d{    | j                   |   }	 |j                          d{   }| j                  j                  |      S 7 G7 !# t        j                  t        j
                  t        f$ rn || j                   v r^| j                   |= 	 | j                  |      }|j                  |       d{  7    # t        $ r t        j                  d       Y  w xY w w xY ww)z
        Receive the first message that arrives on the channel.
        If more than one coroutine waits on the same channel, a random one
        of the waiting coroutines will get the result.
        Nz/Unexpected exception while cleaning-up channel:)rE   rX   getr   CancelledErrorTimeoutErrorGeneratorExitrN   unsubscribeBaseExceptionlogger	exceptionr6   r4   )r!   rV   qr2   rW   s        r   r'   zRedisPubSubLoopLayer.receive   s      $--',,W555MM'"	EEGmG( !!--g661 6 $&&(<(<mL 	 $--'MM'*X OOG4E++G444  % X$$%VWX #	si   #D
A-D
A1 A/A1 D
/A1 1AD5%C!CC! D!D?DDDD
c                 d  K   || j                   vrt        dt        |       d      | j                  |      }|| j                  vrt               | j                  |<   | j                  |   }||vr|j                  |       | j                  |      }|j                  |       d{    y7 w)z3
        Adds the channel name to a group.
        zYou can only call group_add() on channels that exist in-process.
Consumers are encouraged to use the common pattern:
   self.channel_layer.group_add(z, self.channel_name)N)	rE   RuntimeErrorreprrR   rF   setaddrN   rU   r!   rQ   rV   group_channelgroup_channelsrW   s         r   r(   zRedisPubSubLoopLayer.group_add   s      $--'337;-?SU 
 44U;+),DKK&]3.(w'.oom,,,s   B&B0(B.)B0c                 2  K   | j                  |      }| j                  j                  |t                     }||vry|j	                  |       t        |      dk(  r8| j                  |= | j                  |      }|j                  |       d{    yy7 w)zy
        Removes the channel from a group if it is in the group;
        does nothing otherwise (does not error)
        Nr   )rR   rF   r`   rl   removerL   rN   rd   rn   s         r   r)   z"RedisPubSubLoopLayer.group_discard   s     
 44U;>.(g&~!#M*OOM2E##M222 $ 3s   BBBBc                    K   | j                  |      }| j                  |      }|j                  || j                  j	                  |             d{    y7 w)zC
        Send the message to all subscribers of the group.
        N)rR   rN   rZ   r6   r0   )r!   rQ   r2   ro   rW   s        r   r*   zRedisPubSubLoopLayer.group_send   sK      44U;.mmM4+=+=+G+G+PQQQs   AAAAc                 ~   K   i | _         i | _        | j                  D ]  }|j                          d{     y7 w)z
        Flush the layer, making it like new. It can continue to be used as if it
        was just created. This also closes connections, serving as a clean-up
        method; connections will be re-opened if you continue using this layer.
        N)rE   rF   rH   r+   )r!   rW   s     r   r+   zRedisPubSubLoopLayer.flush   s6      \\E++- "s   1=;=)NasgiNNN)z	specific.)r<   r=   r>   __doc__r"   rN   rR   rX   
extensionsr&   r%   r'   r(   r)   r*   r+   r?   r#   r   r9   r9   W   s_     
8X
0'
 G$JL7J-&3 R	 r#   r9   c                   B    e Zd Zd Zd Zd Zd Zd Zd Zd Z	d Z
d	 Zy
)rG   c                     || _         || _        t               | _        t	        j
                         | _        d | _        d | _        d | _	        y r   )
rJ   r6   rl   _subscribed_tor   Lock_lock_redis_pubsub_receive_task)r!   rJ   r6   s      r   r"   z#RedisSingleShardConnection.__init__  s>    	*!e\\^
!r#   c                    K   | j                   4 d {    | j                          | j                  j                  ||       d {    d d d       d {    y 7 I7 7 	# 1 d {  7  sw Y   y xY wwr   )r|   _ensure_redisr}   rZ   )r!   rV   r2   s      r   rZ   z"RedisSingleShardConnection.publish  sf     :: 	8 	8 ++%%gw777	8 	8 	87	8 	8 	8 	8sV   A8AA80A#AA#A8A!A8A#!A8#A5)A,*A51A8c                 d  K   | j                   4 d {    || j                  vr^| j                          | j                          | j                  j                  |       d {    | j                  j                  |       d d d       d {    y 7 7 27 	# 1 d {  7  sw Y   y xY wwr   )r|   rz   r   _ensure_receiverr~   rU   rm   r!   rV   s     r   rU   z$RedisSingleShardConnection.subscribe  s     :: 	1 	1d111""$%%'ll,,W555##''0	1 	1 	1 6		1 	1 	1 	1W   B0BB0AB$B%BB0BB0BB0B-!B$"B-)B0c                 d  K   | j                   4 d {    || j                  v r^| j                          | j                          | j                  j                  |       d {    | j                  j                  |       d d d       d {    y 7 7 27 	# 1 d {  7  sw Y   y xY wwr   )r|   rz   r   r   r~   rd   rr   r   s     r   rd   z&RedisSingleShardConnection.unsubscribe  s     :: 	4 	4$---""$%%'ll..w777##**73	4 	4 	4 8		4 	4 	4 	4r   c                   K   | j                   4 d {    | j                  6| j                  j                          	 | j                   d {    d | _        | j
                  +t        | j
                         d {    d | _        d | _        t               | _	        d d d       d {    y 7 7 d# t        j                  $ r Y ww xY w7 O7 $# 1 d {  7  sw Y   y xY wwr   )
r|   r   cancelr   ra   r}   r   r~   rl   rz   r!   s    r   r+   z RedisSingleShardConnection.flush$  s     :: 	( 	(!!-""))+,,,, &*"{{& #4;;///"#"%%D	( 	( 	( ---  0	( 	( 	( 	(s   C'B1C''CB5B3B5+C>C?!C C'+C,C'3B55CC
CCC'C$CC$ C'c                   K   	 	 | j                   rM| j                   j                  r7| j                   j                  dd       d {   }| j                  |       nt	        j
                  d       d {    x7 47 # t        j                  t        j                  t        f$ r  t        $ r6 t        j                  d       t	        j
                  d       d {  7   Y kw xY ww)NTg?)ignore_subscribe_messagestimeoutz$Unexpected exception in receive taskr   )r~   
subscribedget_message_receive_messager   sleepra   rb   rc   re   rf   rg   r1   s     r   _do_receivingz(RedisSingleShardConnection._do_receiving6  s     '<<DLL$;$;$(LL$<$<26 %= % G ))'2!--,,, 
 -&&$$ 
   '  !GHmmA&&&'sZ   C*AB  A<.B  6A>7B  ;C*<B  >B   AC'C!C'$C*&C''C*c                    ||d   }|d   }t        |t              r|j                         }|| j                  j                  v r)| j                  j                  |   j                  |       y || j                  j                  v r`| j                  j                  |   D ]C  }|| j                  j                  v s| j                  j                  |   j                  |       E y y y )NrV   data)
isinstancebytesdecoder6   rE   
put_nowaitrF   )r!   r2   r   r   channel_names        r   r   z+RedisSingleShardConnection._receive_messageJ  s    9%D6?D$&{{}t))222""++D1<<TB++222$($6$6$=$=d$CL#t'9'9'B'BB**33LALLTR %D 3 r#   c                     | j                   Pt        | j                        }t        j                  |      | _         | j                   j                         | _        y y )N)connection_pool)r}   r	   rJ   aioredisRedispubsubr~   )r!   pools     r   r   z(RedisSingleShardConnection._ensure_redisW  sC    ;;tyy)D"..>DK;;--/DL r#   c                 n    | j                   )t        j                  | j                               | _         y y r   )r   r   ensure_futurer   r   s    r   r   z+RedisSingleShardConnection._ensure_receiver]  s/    %!(!6!6t7I7I7K!LD &r#   N)r<   r=   r>   r"   rZ   rU   rd   r+   r   r   r   r   r?   r#   r   rG   rG     s1    "8
14($'(S0Mr#   rG   )r   r,   loggingr\   redisr   serializersr   utilsr   r   r   r	   r
   	getLoggerr<   rf   r   r   r9   rG   r?   r#   r   <module>r      s`        % !  
		8	$79 9xk  k \ZM ZMr#   