
    hp                         d Z ddlZddlZddlZddlZddlmZmZmZm	Z	m
Z
mZ ddlmZ erddlmZ neZddlZddlmZ ddlmZ ddlmZmZ  G d	 d
e      Zy)z
BETA

This is the PubSub logger for GCS PubSub, this sends LiteLLM SpendLogs Payloads to GCS PubSub.

Users can use this instead of sending their SpendLogs to their Postgres database.
    N)TYPE_CHECKINGAnyDictListOptionalUnion)StandardLoggingPayload)SpendLogsPayload)verbose_logger)CustomBatchLogger)get_async_httpx_clienthttpxSpecialProviderc                        e Zd Z	 	 	 ddee   dee   dee   f fdZdeeef   fdZd Zd Z	d	e
eef   deeeef      fd
Z xZS )GcsPubSubLogger
project_idtopic_idcredentials_pathc                    ddl m}  |        t        t        j                        | _        |xs t        j                  d      | _        |xs t        j                  d      | _	        |xs t        j                  d      | _
        | j                  r| j                  st        d      t        j                         | _        t        | @  d
i |d| j                  i t        j"                  | j%                                g | _        y	)a  
        Initialize Google Cloud Pub/Sub publisher

        Args:
            project_id (str): Google Cloud project ID
            topic_id (str): Pub/Sub topic ID
            credentials_path (str, optional): Path to Google Cloud credentials JSON file
        r   _premium_user_check)llm_providerGCS_PUBSUB_PROJECT_IDGCS_PUBSUB_TOPIC_IDGCS_PATH_SERVICE_ACCOUNTz-Both project_id and topic_id must be provided
flush_lockN )litellm.proxy.utilsr   r   r   LoggingCallbackasync_httpx_clientosgetenvr   r   path_service_account_json
ValueErrorasyncioLockr   super__init__create_taskperiodic_flush	log_queue)selfr   r   r   kwargsr   	__class__s         c/var/www/Befach/backend/env/lib/python3.12/site-packages/litellm/integrations/gcs_pubsub/pub_sub.pyr'   zGcsPubSubLogger.__init__    s     	<"8-==#
 %J		2I(J DBII.C$D)9 *
RYY&>
& dmmLMM!,,.>6>doo>D//12PR    returnc                    K   ddl m} |j                  | j                  | j                  d       d{   \  }}|j                  d|| j                  |ddddd	      \  }}d| d	d
}|S 7 8w)z4Construct authorization headers using Vertex AI authr   )vertex_chat_completion	vertex_ai)credentialsr   custom_llm_providerNzpub-sub)	modelauth_headervertex_credentialsvertex_projectvertex_locationgemini_api_keystreamr5   api_basezBearer zapplication/json)AuthorizationzContent-Type)litellmr2   _ensure_access_token_asyncr"   r   _get_token_and_url)r+   r2   _auth_headerr9   r7   _headerss          r.   construct_request_headersz)GcsPubSubLogger.construct_request_headersE   s     2
 )CC66 + D 
 
	
 0BB$#==)  + C 

Q  '{m4.
 -
s   2A/A-9A/c                 @  K   ddl m} ddlm}  |        	 t	        j
                  d|       |j                  dd      }t        j                  du r( |||||      }| j                  j                  |       n| j                  j                  |       t        | j                        | j                  k\  r| j                          d{    yy7 # t        $ r@}	t	        j                  d	t!        |	       d
t#        j$                                 Y d}	~	yd}	~	ww xY ww)a  
        Async Log success events to GCS PubSub Topic

        - Creates a SpendLogsPayload
        - Adds to batch queue
        - Flushes based on CustomBatchLogger settings

        Raises:
            Raises a NON Blocking verbose_logger.exception if an error occurs
        r   )get_logging_payloadr   z6PubSub: Logging - Enters logging function for model %sstandard_logging_objectNT)r,   response_obj
start_timeend_timezPubSub Layer Error - 
)1litellm.proxy.spend_tracking.spend_tracking_utilsrG   r   r   r   debuggetr?   gcs_pub_sub_use_v1r*   appendlen
batch_sizeasync_send_batch	Exception	exceptionstr	traceback
format_exc)
r+   r,   rI   rJ   rK   rG   r   standard_logging_payloadspend_logs_payloades
             r.   async_log_success_eventz'GcsPubSubLogger.async_log_success_eventd   s    	
 	<	  H& (.zz2KT'R$ ))T1%8!!-)%	&" %%&89 %%&>?4>>"doo5++--- 6- 	$$'Axr)2F2F2H1IJ 		sA   DB2C 	C
C DC 	D6DDDDc                   K   	 | j                   s	 | j                   j                          yt        j                  dt	        | j                          d       | j                   D ]  }| j                  |       d{     	 | j                   j                          y7 "# t        $ r@}t        j                  dt        |       dt        j                                 Y d}~ad}~ww xY w# | j                   j                          w xY ww)z8
        Sends the batch of messages to Pub/Sub
        NzPubSub - about to flush z eventszPubSub Error sending batch - rL   )r*   clearr   rN   rR   publish_messagerU   rV   rW   rX   rY   )r+   messager\   s      r.   rT   z GcsPubSubLogger.async_send_batch   s     	#>> NN  "   *3t~~+>*?wG  >>**7333 * NN  " 4 	$$/Axr):N:N:P9QR 	
 NN  "s]   DB  DAB  ;B<B  C, DB   	C))6C$C, $C))C, ,DDra   c                   K   	 | j                          d{   }t        |t              r|}nt        j                  |t              }ddl}|j                  |j                  d            j                  d      }dd|igi}d| j                   d| j                   d	}| j                  j                  |||
       d{   }|j                  dvrAt        j                  dt        |j                                t#        d|j                          t        j$                  d|j                          |j                         S 7 /7 # t"        $ r)}	t        j                  dt        |	             Y d}	~	yd}	~	ww xY ww)z
        Publish message to Google Cloud Pub/Sub using REST API

        Args:
            message: Message to publish (dict or string)

        Returns:
            dict: Published message response
        N)defaultr   zutf-8messagesdataz*https://pubsub.googleapis.com/v1/projects/z/topics/z:publish)urlrD   json)      zPub/Sub publish error: %szFailed to publish message: zPub/Sub response: %s)rE   
isinstancerW   rg   dumpsbase64	b64encodeencodedecoder   r   r   poststatus_coder   errortextrU   rN   )
r+   ra   rD   message_datarl   encoded_messagerequest_bodyrf   responser\   s
             r.   r`   zGcsPubSubLogger.publish_message   s^    "	 ::<<G '3'&#zz'3? $..|/B/B7/KLSSO
 '&/)B(CDL>t>OxX\XeXeWffnoC!4499| :  H ##:5$$%@#hmmBTU"=hmm_ MNN  !7G==?"; =(  	  !<c!fE	sR   F E EB*E E	BE F E 	E 	E=E83F 8E==F )NNN)__name__
__module____qualname__r   rW   r'   r   rE   r]   rT   r   r
   r	   r   r`   __classcell__)r-   s   @r.   r   r      s     %)"&*.	#SSM#S 3-#S #3-	#SJc3h >,\#,.-/EEF.	$sCx.	!.r/   r   )__doc__r$   rg   r    rX   typingr   r   r   r   r   r   litellm.types.utilsr	   litellm.proxy._typesr
   r?   litellm._loggingr   (litellm.integrations.custom_batch_loggerr   &litellm.llms.custom_httpx.http_handlerr   r   r   r   r/   r.   <module>r      sJ      	  B B 65  + Fw' wr/   