o
    h~                     @  s   d Z ddlmZ ddlZddlmZmZ ddlmZ ddl	m
Z
mZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZmZ dZde e!eZ"e# Z$G dd dej%Z%G dd dej&Z&dS )aG  SQLAlchemy Transport module for kombu.

Kombu transport using SQL Database as the message store.

Features
========
* Type: Virtual
* Supports Direct: yes
* Supports Topic: yes
* Supports Fanout: no
* Supports Priority: no
* Supports TTL: no

Connection String
=================

.. code-block::

    sqla+SQL_ALCHEMY_CONNECTION_STRING
    sqlalchemy+SQL_ALCHEMY_CONNECTION_STRING

For details about ``SQL_ALCHEMY_CONNECTION_STRING`` see SQLAlchemy Engine Configuration documentation.

Examples
--------
.. code-block::

    # PostgreSQL with default driver
    sqla+postgresql://scott:tiger@localhost/mydatabase

    # PostgreSQL with psycopg2 driver
    sqla+postgresql+psycopg2://scott:tiger@localhost/mydatabase

    # PostgreSQL with pg8000 driver
    sqla+postgresql+pg8000://scott:tiger@localhost/mydatabase

    # MySQL with default driver
    sqla+mysql://scott:tiger@localhost/foo

    # MySQL with mysqlclient driver (a maintained fork of MySQL-Python)
    sqla+mysql+mysqldb://scott:tiger@localhost/foo

    # MySQL with PyMySQL driver
    sqla+mysql+pymysql://scott:tiger@localhost/foo

Transport Options
=================

* ``queue_tablename``: Name of table storing queues.
* ``message_tablename``: Name of table storing messages.

Moreover parameters of :func:`sqlalchemy.create_engine()` function can be passed as transport options.
    )annotationsN)dumpsloads)Empty)create_enginetext)OperationalError)sessionmaker)virtual)cached_property)bytes_to_str   )Message)	ModelBase)Queue)class_registrymetadata)r      r   .c                      s   e Zd ZdZdZi Z fddZdd Zdd Zd	d
 Z	e
dd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zedd Zedd  Z  ZS )!ChannelzThe channel class.Nc                   s&   |  |jj t j|fi | d S N)_configure_entity_tablenamesclienttransport_optionssuper__init__)self
connectionkwargs	__class__ `/var/www/Befach/backend/venv/lib/python3.10/site-packages/kombu/transport/sqlalchemy/__init__.pyr   \   s   zChannel.__init__c                 C  s2   | dd| _| dd| _| jo| j d S  d S )Nqueue_tablenamekombu_queuemessage_tablenamekombu_message)getr#   r%   	queue_clsmessage_cls)r   optsr!   r!   r"   r   `   s   z$Channel._configure_entity_tablenamesc                 C  s   | j j}|j }|dd  |dd  |dd  |dd  |dd  |dd  |dd  |dd  |d	d  t|jfi |S )
Nr#   r%   callbackerrbackmax_retriesinterval_startinterval_stepinterval_maxretry_errors)r   r   r   copypopr   hostname)r   conninfor   r!   r!   r"   _engine_from_configk   s   
zChannel._engine_from_configc                 C  s   | j j}|j| jvrEt1 |j| jv r | j|j W  d    S |  }t|d}t| ||f| j|j< W d    n1 s@w   Y  | j|j S )N)bind)	r   r   r4   _engines_MUTEXr6   r	   r   
create_all)r   r5   engineSessionr!   r!   r"   _openz   s   


zChannel._openc                 C  s$   | j d u r|  \}}| | _ | j S r   )_sessionr=   )r   _r<   r!   r!   r"   session   s   
zChannel.sessionc              	   C  s   | j | j| jj|k }|sitM | j | j| jj|k }|r0|W  d    S | |}| j | z| j   W n t	yP   | j 
  Y n	w W d    |S W d    |S 1 sdw   Y  |S r   )r@   queryr(   filternamefirstr9   addcommitr   rollbackr   queueobjr!   r!   r"   _get_or_create   s4   


zChannel._get_or_createc                 K  s   |  | d S r   )rK   )r   rI   r   r!   r!   r"   
_new_queue      zChannel._new_queuec                 K  sV   |  |}| t||}| j| z| j  W d S  ty*   | j  Y d S w r   )rK   r)   r   r@   rE   rF   r   rG   )r   rI   payloadr   rJ   messager!   r!   r"   _put   s   
zChannel._putc                 C  s   |  |}| jjjdkr| jtd z<| j| j 	| jj
|jk	| jjdk| jj| jjd }|rNd|_tt|jW | j  S t | j  w )NsqlitezBEGIN IMMEDIATE TRANSACTIONFr   )rK   r@   r7   rC   executer   rA   r)   with_for_updaterB   queue_ididvisibleorder_bysent_atlimitrD   r   r   rN   rF   r   )r   rI   rJ   msgr!   r!   r"   _get   s&   


zChannel._getc                 C  s(   |  |}| j| j| jj|jkS r   )rK   r@   rA   r)   rB   rT   rU   rH   r!   r!   r"   
_query_all   s   
zChannel._query_allc                 C  sB   |  |jdd}z| j  W |S  ty    | j  Y |S w )NF)synchronize_session)r\   deleter@   rF   r   rG   )r   rI   countr!   r!   r"   _purge   s   zChannel._purgec                 C  s   |  | S r   )r\   r_   )r   rI   r!   r!   r"   _size   rM   zChannel._sizec                 C  sf   |t vr/t! |t v rt | W  d    S tt||tf|W  d    S 1 s*w   Y  t | S r   )r   r9   typestrr   )r   rC   basensr!   r!   r"   _declarative_cls   s    zChannel._declarative_clsc                 C     |  dtd| jiS )Nr   __tablename__)rf   	QueueBaser#   r   r!   r!   r"   r(      
   zChannel.queue_clsc                 C  rg   )Nr   rh   )rf   MessageBaser%   rj   r!   r!   r"   r)      rk   zChannel.message_cls)__name__
__module____qualname____doc__r>   r8   r   r   r6   r=   propertyr@   rK   rL   rP   r[   r\   r`   ra   rf   r   r(   r)   __classcell__r!   r!   r   r"   r   V   s,    
	
r   c                   @  s2   e Zd ZdZeZdZdZdZdZe	fZ
dd ZdS )		TransportzThe transport class.Tr   sql
sqlalchemyc                 C  s   dd l }|jS )Nr   )ru   __version__)r   ru   r!   r!   r"   driver_version   s   zTransport.driver_versionN)rm   rn   ro   rp   r   can_parse_urldefault_portdriver_typedriver_namer   connection_errorsrw   r!   r!   r!   r"   rs      s    rs   )'rp   
__future__r   	threadingjsonr   r   rI   r   ru   r   r   sqlalchemy.excr   sqlalchemy.ormr	   kombu.transportr
   kombu.utilsr   kombu.utils.encodingr   modelsr   rl   r   r   ri   r   r   VERSIONjoinmaprc   rv   RLockr9   r   rs   r!   r!   r!   r"   <module>   s*    5	 