o
    ȕhN                     @   s   d dl Z d dlZd dlZd dlZd dlZd dl mZ d dlmZ d dlm	Z	 d dl
mZ d dlmZ d dlmZmZ d dlmZ d dlmZ eeZdad	d
 ZG dd dZG dd deZG dd dejZdS )    N)Counter)partial)EventReceiver)State)Gauge	Histogram)PeriodicCallback)optionsc                   C   s   t d u rt a t S N)PROMETHEUS_METRICSPrometheusMetrics r   r   J/var/www/Befach/backend/venv/lib/python3.10/site-packages/flower/events.pyget_prometheus_metrics   s   r   c                   @   s   e Zd Zdd ZdS )r   c                 C   sr   t ddg d| _tddddgtjd| _td	d
ddg| _tddddg| _tdddg| _	tdddg| _
d S )Nflower_events_totalzNumber of events)workertypetaskflower_task_runtime_secondszTask runtimer   r   )buckets!flower_task_prefetch_time_secondszDThe time the task spent waiting at the celery worker to be executed.flower_worker_prefetched_tasksz4Number of tasks of given type prefetched at a workerflower_worker_onlinezWorker online status1flower_worker_number_of_currently_executing_tasksz/Number of tasks currently executing at a worker)PrometheusCountereventsr   r	   task_runtime_metric_bucketsruntimer   prefetch_timenumber_of_prefetched_tasksworker_online*worker_number_of_currently_executing_tasksselfr   r   r   __init__   s.   
zPrometheusMetrics.__init__N)__name__
__module____qualname__r$   r   r   r   r   r      s    r   c                       s(   e Zd Z fddZ fddZ  ZS )EventsStatec                    s*   t  j|i | tt| _t | _d S r
   )superr$   collectionsdefaultdictr   counterr   metrics)r#   argskwargs	__class__r   r   r$   <   s   zEventsState.__init__c                    s  t  | |d }|d }| j| |  d7  < |dr|d }| j|}|dd}|s:|| jv r:|jp9d}| jj	|||
  |dd	}|rX| jj	||| |j}|j}	|d
krq|jsq|	rq| jj	||
  |dkr|js|r|	r| jj	||||	  | jj	||  |dv r|js|r|	r| jj	||d	 |dkr| jj	|d |dkr| jj	|d |d}
|
d ur| jj	||
 |dkr| jj	|d	 d S d S )Nhostnamer      ztask-uuidname r   r   ztask-receivedztask-started)ztask-succeededztask-failedzworker-onlinezworker-heartbeatactivezworker-offline)r)   eventr,   
startswithtasksgetr5   r-   r   labelsincr   observestartedreceivedetar   r   setdecr    r!   )r#   r8   worker_name
event_typetask_idr   	task_namer   task_startedtask_receivednum_executing_tasksr0   r   r   r8   A   sB   


zEventsState.event)r%   r&   r'   r$   r8   __classcell__r   r   r0   r   r(   9   s    r(   c                   @   sN   e Zd ZdZ		dddZdd	 Zd
d Zdd Zdd Zdd Z	dd Z
dS )Eventsi  NFTr   c           	      K   s   t j|  d| _|| _|| _|| _|| _|| _d | _	d | _
| jrBtd| j t| j}|r5|d | _	|  |rBt| j|| _
| j	sMtdi || _	t| j| j| _d S )NTzLoading state from '%s'...r   r   )	threadingThreadr$   daemonio_loopcappdb
persistentenable_eventsstatestate_save_timerloggerdebugshelveopencloser   
save_stater(   on_enable_eventsevents_enable_intervaltimer)	r#   rQ   rP   rR   rS   rT   state_save_intervalr/   rU   r   r   r   r$   u   s0   

zEvents.__init__c                 C   sH   t j|  | jrtd | j  | jr"td | j  d S d S )NzStarting enable events timer...zStarting state save timer...)rM   rN   startrT   rW   rX   r_   rV   r"   r   r   r   ra      s   


zEvents.startc                 C   sJ   | j rtd | j  | jrtd | j  | jr#|   d S d S )NzStopping enable events timer...zStopping state save timer...)rT   rW   rX   r_   stoprV   rS   r\   r"   r   r   r   rb      s   



zEvents.stopc                 C   s  d}	 z5|d9 }| j  "}t|d| ji| j d}d}td |jd d dd W d    n1 s2w   Y  W nG ttfy[   zdd l	}W n t
yT   dd l}Y nw |  Y n% ty } ztd	|| tj|dd
 t| W Y d }~nd }~ww q)Nr3   T   *)handlersappzCapturing events...)limittimeoutwakeupr   z;Failed to capture events: '%s', trying again in %s seconds.)exc_info)rQ   
connectionr   on_eventrW   rX   captureKeyboardInterrupt
SystemExit_threadImportErrorthreadinterrupt_main	Exceptionerrortimesleep)r#   try_intervalconnrecvrr   er   r   r   run   s<   
z
Events.runc                 C   s4   t d| j tj| jdd}| j|d< |  d S )NzSaving state to '%s'...n)flagr   )rW   rX   rR   rY   rZ   rU   r[   )r#   rU   r   r   r   r\      s   
zEvents.save_statec                 C   s   | j d | jjj d S r
   )rP   run_in_executorrQ   controlrT   r"   r   r   r   r]      s   zEvents.on_enable_eventsc                 C   s   | j t| jj| d S r
   )rP   add_callbackr   rU   r8   )r#   r8   r   r   r   rl      s   zEvents.on_event)NFTr   )r%   r&   r'   r^   r$   ra   rb   r|   r\   r]   rl   r   r   r   r   rL   q   s    
 
rL   )r*   loggingrY   rM   rv   r   	functoolsr   celery.eventsr   celery.events.stater   prometheus_clientr   r   r   tornado.ioloopr   tornado.optionsr	   	getLoggerr%   rW   r   r   r   r(   rN   rL   r   r   r   r   <module>   s&    
8