
    h6#                         d Z ddlZddlmZ ddlmZ ddlmZ ddlm	Z	 	 ddl
Z
ddlZ
ddlZ
ddlZ
dZ ee      Zd	Zd
ZdZdZdZdZdZdZd Z G d de	      Zy# e$ r dZ
Y 2w xY w)z@Apache Cassandra result store backend using the DataStax driver.    N)states)ImproperlyConfigured)
get_logger   )BaseBackend)CassandraBackendz
You need to install the cassandra-driver library to
use the Cassandra backend.  See https://github.com/datastax/python-driver
z
CASSANDRA_AUTH_PROVIDER you provided is not a valid auth_provider class.
See https://datastax.github.io/python-driver/api/cassandra/auth.html.
z(Cassandra backend improperly configured.z!Cassandra backend not configured.z
INSERT INTO {table} (
    task_id, status, result, date_done, traceback, children) VALUES (
        %s, %s, %s, %s, %s, %s) {expires};
z]
SELECT status, result, date_done, traceback, children
FROM {table}
WHERE task_id=%s
LIMIT 1
z
CREATE TABLE {table} (
    task_id text,
    status text,
    result blob,
    date_done timestamp,
    traceback blob,
    children blob,
    PRIMARY KEY ((task_id), date_done)
) WITH CLUSTERING ORDER BY (date_done DESC);
z
    USING TTL {0}
c                     t        | d      S )Nutf8)bytes)xs    U/var/www/Befach/backend/env/lib/python3.12/site-packages/celery/backends/cassandra.pybuf_tr   C   s    F    c                   `     e Zd ZdZdZdZdZ	 	 d
 fd	ZddZ	 ddZ	ddZ
d Zd fd		Z xZS )r   aG  Cassandra/AstraDB backend utilizing DataStax driver.

    Raises:
        celery.exceptions.ImproperlyConfigured:
            if module :pypi:`cassandra-driver` is not available,
            or not-exactly-one of the :setting:`cassandra_servers` and
            the :setting:`cassandra_secure_bundle_path` settings is set.
    NTc                 h   t        |   di | t        st        t              | j
                  j                  }|xs |j                  dd       | _        |xs |j                  dd       | _	        |xs |j                  dd       xs d| _
        |xs |j                  dd       | _        |xs |j                  dd       | _        |j                  di       | _        | j                  xs | j                  }	|	r| j                  r| j                  st        t              | j                  r| j                  rt        t              |xs |j                  dd       }
|
t         j#                  |
      nd	| _        |j                  d
      xs d}|j                  d      xs d}t'        t        j(                  |t        j(                  j*                        | _        t'        t        j(                  |t        j(                  j*                        | _        d | _        |j                  dd       }|j                  dd       }|r;|r9t'        t        j2                  |d       }|st        t4               |di || _        d | _        d | _        d | _        d | _        t?        j@                         | _!        y )Ncassandra_serverscassandra_secure_bundle_pathcassandra_portiR#  cassandra_keyspacecassandra_tablecassandra_optionscassandra_entry_ttl cassandra_read_consistencyLOCAL_QUORUMcassandra_write_consistencycassandra_auth_providercassandra_auth_kwargs )"super__init__	cassandrar   E_NO_CASSANDRAappconfgetserversbundle_pathportkeyspacetabler   E_CASSANDRA_NOT_CONFIGUREDE_CASSANDRA_MISCONFIGURED	Q_EXPIRESformat
cqlexpiresgetattrConsistencyLevelr   read_consistencywrite_consistencyauth_providerauth!E_NO_SUCH_CASSANDRA_AUTH_PROVIDER_cluster_session_write_stmt
_read_stmt	threadingRLock_lock)selfr'   r*   r+   	entry_ttlr)   r(   kwargsr%   db_directionsexpires	read_cons
write_consr5   auth_kwargsauth_provider_class	__class__s                   r   r!   zCassandraBackend.__init__X   sL   "6"&~66xx}}E$((+>"E& 2$((*D+2DDHH%5t<D	 HDHH-A4$H?dhh'8$?
!%*=r!B 8(8(8DMM&'ABB<<D,,&'@AADtxx(=tD *1)<IW%" 	 HH9:Ln	XX;<N
 '&&	&&33!5 ")&&
&&33"5 "!:DAhh6=["))..-"N&*+LMM!4!C{!CD__&
r   c                    | j                   y| j                  j                          	 | j                   	 | j                  j                          y| j                  rQt        j                  j                  | j                  f| j                  | j                  d| j                  | _        nGt        j                  j                  dd| j                  i| j                  d| j                  | _        | j                  j                  | j                        | _         t
        j                  j!                  t"        j%                  | j&                  | j(                              | _        | j,                  | j*                  _        t
        j                  j!                  t0        j%                  | j&                              | _        | j4                  | j2                  _        |rjt
        j                  j!                  t6        j%                  | j&                              }| j,                  |_        	 | j                   j9                  |       | j                  j                          y# t
        j:                  $ r Y 0w xY w# t
        j<                  $ r6 | j                  | j                  j?                          d| _        d| _          w xY w# | j                  j                          w xY w)zjPrepare the connection for action.

        Arguments:
            write (bool): are we a writer?
        N)r)   r5   secure_connect_bundle)cloudr5   )r+   rC   )r+   r   ) r9   r>   acquirereleaser'   r"   clusterClusterr)   r5   r   r8   r(   connectr*   querySimpleStatementQ_INSERT_RESULTr/   r+   r0   r:   r4   consistency_levelQ_SELECT_RESULTr;   r3   Q_CREATE_RESULT_TABLEexecuteAlreadyExistsOperationTimedOutshutdown)r?   write	make_stmts      r   _get_connectionz CassandraBackend._get_connection   s_    ==$

=	!}}(v JJ s || ) 1 1 9 9LL!.'+yy"&"4"4!. ,,!. !* 1 1 9 9 !./1A1A #'"4"4	!.
 ,,!. !MM11$--@DM  )>>&&**doo ' ? D 261G1GD.'oo==&&TZZ&8DO 150E0EDOO- &OO;;)00tzz0B	 /3.D.D	+MM)))4 JJ  !..  ** 	 }}(&&( DM DM	 JJ s=   J G#J 5I+ +J>J  JJ A	KK K,c                 ^   | j                  d       | j                  j                  | j                  ||t	        | j                  |            | j                  j                         t	        | j                  |            t	        | j                  | j                  |                  f       y)z1Store return value and state of an executed task.T)r[   N)	r]   r9   rW   r:   r   encoder$   nowcurrent_task_children)r?   task_idresultstate	tracebackrequestrA   s          r   _store_resultzCassandraBackend._store_result   s     	4(d..$++f%&HHLLN$++i()$++d88ABC1
 	r   c                      y)Nzcassandra://r   )r?   include_passwords     r   as_urizCassandraBackend.as_uri   s    r   c           
      P   | j                          | j                  j                  | j                  |f      j	                         }|st
        j                  ddS |\  }}}}}| j                  ||| j                  |      || j                  |      | j                  |      d      S )z$Get task meta-data for a task by id.N)statusrc   )rb   rl   rc   	date_donere   children)	r]   r9   rW   r;   oner   PENDINGmeta_from_decodeddecode)r?   rb   resrl   rc   rm   re   rn   s           r   _get_task_meta_forz#CassandraBackend._get_task_meta_for   s    mm##DOOg[AEEG$nn==9<6	9h%%kk&)"Y/H-'
  	r   c                     |si n|}|j                  | j                  | j                  | j                  d       t        |   ||      S )N)r'   r*   r+   )updater'   r*   r+   r    
__reduce__)r?   argsrA   rH   s      r   rw   zCassandraBackend.__reduce__   sF    !vjj"	# w!$//r   )NNNNNN)F)NN)T)r   N)__name__
__module____qualname____doc__r'   r(   supports_autoexpirer!   r]   rg   rj   rt   rw   __classcell__)rH   s   @r   r   r   G   sK     GKJN(,4'lF!R /3&0 0r   r   )r|   r<   celeryr   celery.exceptionsr   celery.utils.logr   baser   r"   cassandra.authcassandra.clustercassandra.queryImportError__all__ry   loggerr#   r7   r-   r,   rS   rU   rV   r.   r   r   r   r   r   <module>r      s    F   2 ' 
  	H	
% !
 G @ 
 	
y0{ y0q  Is   A A#"A#