
    h(                         d Z ddlZddlmZ ddlmZmZmZmZm	Z	m
Z
 ddlmZ ddlmZ ddlmZ ddlmZ  G d	 d
e      Zy)zb
Base class across routing strategies to abstract commmon functions like batch incrementing redis
    N)ABC)DictListOptionalSetTupleUnion)verbose_router_logger)	DualCache)RedisPipelineIncrementOperation)DEFAULT_REDIS_SYNC_INTERVALc            	       
   e Zd Zdededeeeef      fdZ	deeeef      fdZ
d Zdeeeef      ded	ee   fd
Zdedeeef   defdZdeeeef      fdZd ZdefdZd	ee   fdZd	ee   fdZd	ee   fdZd Zd Zy)BaseRoutingStrategy
dual_cacheshould_batch_redis_writesdefault_sync_intervalc                 r    || _         g | _        d | _        |r| j                  |       t	               | _        y N)r   redis_increment_operation_queue
_sync_tasksetup_sync_tasksetin_memory_keys_to_update)selfr   r   r   s       i/var/www/Befach/backend/env/lib/python3.12/site-packages/litellm/router_strategy/base_routing_strategy.py__init__zBaseRoutingStrategy.__init__   s;     %VX,8<$  !67 E 	%    c                     	 t        j                         }|j                  | j                  |            | _        y# t        $ r, t        j                         }t        j                  |       Y [w xY w)z;Setup the sync task in a way that's compatible with FastAPI)r   N)asyncioget_running_loopRuntimeErrornew_event_loopset_event_loopcreate_task(periodic_sync_in_memory_spend_with_redisr   )r   r   loops      r   r   z#BaseRoutingStrategy.setup_sync_task    sl    	)++-D
 **99&; : 
	  	)))+D""4(	)s   = 2A21A2c                    K   | j                   0| j                   j                          	 | j                    d{    yy7 # t        j                  $ r Y yw xY ww)z.Cleanup method to be called when shutting downN)r   cancelr   CancelledErrorr   s    r   cleanupzBaseRoutingStrategy.cleanup.   sS     ??&OO""$oo%% ' &)) s7   'AA A A A A AAAAincrement_listttlreturnc                    K   g }|D ]2  \  }}| j                  |||       d{   }|j                  |       4 |S 7 w)zB
        Increment a list of values in the current window
        keyvaluer-   N)"_increment_value_in_current_windowappend)r   r,   r-   resultsr1   r2   results          r   '_increment_value_list_in_current_windowz;BaseRoutingStrategy._increment_value_list_in_current_window7   sW      (JCBBu# C  F NN6"	 )
 	s   "A >A r1   r2   c                    K   | j                   j                  j                  |||       d{   }t        |||      }| j                  j                  |       | j                  |       |S 7 Aw)a  
        Increment spend within existing budget window

        Runs once the budget start time exists in Redis Cache (on the 2nd and subsequent requests to the same provider)

        - Increments the spend in memory cache (so spend instantly updated in memory)
        - Queues the increment operation to Redis Pipeline (using batched pipeline to optimize performance. Using Redis for multi instance environment of LiteLLM)
        r0   N)r1   increment_valuer-   r1   )r   in_memory_cacheasync_incrementr   r   r4   add_to_in_memory_keys_to_update)r   r1   r2   r-   r6   increment_ops         r   r3   z6BaseRoutingStrategy._increment_value_in_current_windowE   s|      66FF G 
 

 7!
 	,,33LA,,,5
s   ,A2A0AA2c                 @  K   |xs t         }	 	 | j                          d{    t        j                  |       d{    77 "7 # t        $ rI}t        j                  dt        |              t        j                  |       d{  7   Y d}~Rd}~ww xY ww)z
        Handler that triggers sync_in_memory_spend_with_redis every DEFAULT_REDIS_SYNC_INTERVAL seconds

        Required for multi-instance environment usage of provider budgets
        NzError in periodic sync task: )r    _sync_in_memory_spend_with_redisr   sleep	Exceptionr
   errorstr)r   r   es      r   r%   z<BaseRoutingStrategy.periodic_sync_in_memory_spend_with_redis_   s      !6 T9T	;;===mm)   =  %++.KCPQF8,TUmm)  s[   BA	 AA	 A A	 BA	 A	 		B9BBBBBBc                   K   	 | j                   j                  syt        | j                        dkD  ri }g }t	        | j                        D ]<  \  }}|d   |v r||d      dxx   |d   z  cc<   n|||d   <   |j                  |       > t        |j                               }| j                   j                  j                  |       d{   }t	        | j                        D cg c]  \  }}||vr| c}}| _        |$t        ||      D ci c]  \  }}|d   | }}}|S i }|S y7 `c c}}w c c}}w # t        $ r2}	t        j                  dt        |	              g | _        Y d}	~	yd}	~	ww xY ww)av  
        How this works:
        - async_log_success_event collects all provider spend increments in `redis_increment_operation_queue`
        - This function compresses multiple increments for the same key into a single operation
        - Then pushes all increments to Redis in a batched pipeline to optimize performance

        Only runs if Redis is initialized
        Nr   r1   r9   )r,   *Error syncing in-memory cache with Redis: )r   redis_cachelenr   	enumerater4   listvaluesasync_increment_pipelineziprB   r
   rC   rD   )
r   compressed_opsops_to_removeidxopcompressed_queueincrement_resultr1   return_resultrE   s
             r   #_push_in_memory_increments_to_redisz7BaseRoutingStrategy._push_in_memory_increments_to_redist   s    /	6??..47781<MO "()M)MNGC%yN2&r%y12CD-I D 57r%y1!((-  O $((=(=(?#@  //55NN'7 O   ! $-T-Q-Q#R8R-/ 84 $/ (++;=M'N%#C E
B%M % %$ %'M$$K =&
8%  	6!''<SVHE 46D00		6su   E:D< E:B2D< D.D< *D0:D< D6$D< (E:)D< ,E:.D< 0D< <	E7(E2-E:2E77E:c                 :    | j                   j                  |       y r   )r   add)r   r1   s     r   r=   z3BaseRoutingStrategy.add_to_in_memory_keys_to_update   s    %%))#.r   c                      y)z-
        Get the key pattern to sync
        N r*   s    r   get_key_pattern_to_syncz+BaseRoutingStrategy.get_key_pattern_to_sync   s     r   c                     | j                   S r   )r   r*   s    r   get_in_memory_keys_to_updatez0BaseRoutingStrategy.get_in_memory_keys_to_update   s    ,,,r   c                 <    | j                   }t               | _         |S )z-Atomic get and reset in-memory keys to update)r   r   )r   keyss     r   &get_and_reset_in_memory_keys_to_updatez:BaseRoutingStrategy.get_and_reset_in_memory_keys_to_update   s    ,,(+%r   c                 "    t               | _        y r   )r   r   r*   s    r   reset_in_memory_keys_to_updatez2BaseRoutingStrategy.reset_in_memory_keys_to_update   s    (+%r   c                 T  K   	 | j                   j                  y| j                         }t        |      }i }| j                   j                  j                  |       d{   }t        ||      D ]  \  }}t        |xs d      ||<    | j                          d{   }|y|D ]  }t        |j                  |d      xs d      }	t        |j                  |d      xs d      }
t        | j                   j                  j                  |       d{   xs d      }||
z
  }||	k  r|	|z   }n| j                   j                  j                  ||       d{     y7 7 7 T7 # t        $ r+}t        j                  dt        |              Y d}~yd}~ww xY ww)a  
        Ensures in-memory cache is updated with latest Redis values for all provider spends.

        Why Do we need this?
        - Optimization to hit sub 100ms latency. Performance was impacted when redis was used for read/write per request
        - Use provider budgets in multi-instance environment, we use Redis to sync spend across all instances

        What this does:
        1. Push all provider spend increments to Redis
        2. Fetch all current provider spend from Redis to update in-memory cache
        N)r_   r   r:   )r1   r2   rG   )r   rH   r]   rK   r;   async_batch_get_cacherN   floatrV   getasync_get_cacheasync_set_cacherB   r
   	exceptionrD   )r   
cache_keyscache_keys_listin_memory_before_dictin_memory_beforekvredis_valuesr1   	redis_valbeforeafterdeltamergedrE   s                  r   r@   z4BaseRoutingStrategy._sync_in_memory_spend_with_redis   s    5	**2 113  #:.O %'!oo55KK( L   
 O-=>1+0a=%a( ? "&!I!I!KKL# '!,"2"23":"?a@	488a@EAF//99IIcIRRWVW I%&.F oo55EE6 F   % ' L S  	!++<SVHE 	s   F(E1 F(AE1 !E("=E1 E+ E1 &F('A1E1 E-
AE1  E/!E1 'F((E1 +E1 -E1 /E1 1	F%:!F F( F%%F(N)__name__
__module____qualname__r   boolr   r	   intre   r   r   r+   r   r   rD   r7   r3   r%   rV   r=   r[   r   r]   r`   rb   r@   rZ   r   r   r   r      s    $(  (c5j(9:	 
XeCJ>O5P 
"5c?3:=	e$S%Z07:4%-eCJ.?%@*86t/3 /# -c#h -C .Br   r   )__doc__r   abcr   typingr   r   r   r   r   r	   litellm._loggingr
   litellm.caching.cachingr   litellm.caching.redis_cacher   litellm.constantsr   r   rZ   r   r   <module>r      s2      : : 2 - G 9v# vr   