U
    hC*                     @   s  d dl Z d dlZd dlmZ d dlmZ d dlmZ d dlmZ zd dlmZ W n  e	k
rp   d dl
mZ Y nX z<d dlZd dlmZ d dlmZ d dlmZ d dlmZ W n  e	k
r   d Z ZZY nX d dlmZ d d	lmZ e d
ZG dd deZG dd deZG dd deZG dd deZe Z!e Z"e Z#e Z$G dd deZ%G dd deZ&G dd deZ'G dd de'Z(dS )    N)localEvent)Lock)Thread)Queue)Greenlet)__deprecated__)SqliteExtDatabasezpeewee.sqliteqc                   @   s   e Zd ZdS )ResultTimeoutN__name__
__module____qualname__ r   r   5/tmp/pip-unpacked-wheel-5j60pwdk/playhouse/sqliteq.pyr      s   r   c                   @   s   e Zd ZdS )WriterPausedNr   r   r   r   r   r      s   r   c                   @   s   e Zd ZdS )ShutdownExceptionNr   r   r   r   r   r   "   s   r   c                   @   s|   e Zd ZdZdd ZdddZdddZd	d
 Zdd ZeZ	e
dd Ze
dd Ze
dd Zdd Zdd Zdd ZdS )AsyncCursor)	sqlparamstimeout_event_cursor_exc_idx_rows_readyc                 C   s:   || _ || _|| _|| _d  | _ | _ | _| _d| _d S NF)	r   r   r   r   r   r   r   r   r   )selfeventr   r   r   r   r   r   __init__*   s    zAsyncCursor.__init__Nc                 C   s6   || _ || _d| _|d kr"| ng | _| j  | S )Nr   )r   r   r   fetchallr   r   set)r   cursorexcr   r   r   
set_result2   s    
zAsyncCursor.set_resultc                 C   sF   |d k	r|n| j }| jj|ds,|r,td| jd k	r<| jd| _d S )N)r   zresults not ready, timed out.T)r   r   waitr   r   r   )r   r   r   r   r   _wait:   s    
zAsyncCursor._waitc                 C   s"   | j s|   | jd k	r| j| S N)r   r(   r   r   r   r   r   __iter__B   s
    
zAsyncCursor.__iter__c                 C   sN   | j s|   z| j| j }W n tk
r6   tY nX |  jd7  _|S d S )N   )r   r(   r   r   
IndexErrorStopIteration)r   objr   r   r   nextI   s    
zAsyncCursor.nextc                 C   s   | j s|   | jjS r)   )r   r(   r   	lastrowidr*   r   r   r   r1   U   s    zAsyncCursor.lastrowidc                 C   s   | j s|   | jjS r)   )r   r(   r   rowcountr*   r   r   r   r2   [   s    zAsyncCursor.rowcountc                 C   s   | j jS r)   )r   descriptionr*   r   r   r   r3   a   s    zAsyncCursor.descriptionc                 C   s   | j   d S r)   )r   closer*   r   r   r   r4   e   s    zAsyncCursor.closec                 C   s   t | S r)   )listr*   r   r   r   r"   h   s    zAsyncCursor.fetchallc                 C   s4   | j s|   z
t| W S  tk
r.   Y d S X d S r)   )r   r(   r0   r.   r*   r   r   r   fetchonek   s    
zAsyncCursor.fetchone)N)N)r   r   r   	__slots__r!   r&   r(   r+   r0   __next__propertyr1   r2   r3   r4   r"   r6   r   r   r   r   r   &   s    





r   c                   @   s8   e Zd ZdZdd Zdd Zdd Zdd	 Zd
d ZdS )Writerdatabasequeuec                 C   s   || _ || _d S r)   r;   )r   r<   r=   r   r   r   r!   |   s    zWriter.__init__c              	   C   s   | j  }zVz*|d kr*|  r4| j  }n
| |}W q tk
rZ   t	d Y W d S X qW 5 |d k	r| j | | j j  X d S )Nz*writer received shutdown request, exiting.)
r<   
connection_close_stateresetwait_unpauseloopr   loggerinfo)r   connr   r   r   run   s    

z
Writer.runc                 C   sv   | j  \}}|tkr,td |  dS |tkr<t n6|tkrXt	d |  n|
d t  td| d S )Nz+writer unpaused - reconnecting to database.Tz-writer received pause, but is already paused.zwriter paused, not handling %s)r=   getUNPAUSErD   rE   r#   SHUTDOWNr   PAUSEerrorr&   r   warningr   opr/   r   r   r   rB      s    


zWriter.wait_unpausec                 C   s   | j  \}}|tkr"| | nn|tkrXtd | j| | jj	
  |  d S |tkrttd |  n|tkrt ntd| |S )Nz,writer paused - closing database connection.z0writer received unpause, but is already running.z&writer received unsupported object: %s)r=   rH   QUERYexecuterK   rD   rE   r<   r?   r@   rA   r#   rI   rL   rJ   r   )r   rF   rO   r/   r   r   r   rC      s     


zWriter.loopc              
   C   s`   t d|j z| j|j|j}W n* tk
rN } zd }|}W 5 d }~X Y nX d }|||S )Nzreceived query %s)rD   debugr   r<   _executer   	Exceptionr&   )r   r/   r$   Zexecute_errr%   r   r   r   rQ      s    zWriter.executeN)	r   r   r   r7   r!   rG   rB   rC   rQ   r   r   r   r   r:   y   s   r:   c                       s   e Zd ZdZd fdd	Zdd Zdd	d
Zdd Zdd ZdddZ	dd Z
dd Zdd Zdd Zdd Zdd Ze Z ZZ  ZS ) SqliteQueueDatabasezSQLite must be configured to use the WAL journal mode when using this feature. WAL mode allows one or more readers to continue reading while another connection writes to the database.FTNc           
         s   d|d< t  | _| |dd }tt| }	|	j| _|	j|f|d|i| || _	|| _
d| _| ||| _|   | j	r|   d S )NFZcheck_same_threadpragmasT)r   _qlock_validate_journal_modepopsuperrU   execute_sqlrS   r!   Z
_autostart_results_timeout_is_stoppedget_thread_impl_thread_helper_create_write_queuestart)
r   r<   
use_geventZ	autostartqueue_max_sizeZresults_timeoutargskwargsrV   ZParent	__class__r   r   r!      s    
zSqliteQueueDatabase.__init__c                 C   s   |rt S tS r)   )GreenletHelperThreadHelper)r   rb   r   r   r   r^      s    z#SqliteQueueDatabase.get_thread_implc                 C   sR   |sddiS t |ts(tdd |D }|dd dkrFt| jd|d< |S )NZjournal_modeZwalc                 s   s   | ]\}}|  |fV  qd S r)   )lower).0kvr   r   r   	<genexpr>   s     z=SqliteQueueDatabase._validate_journal_mode.<locals>.<genexpr>)
isinstancedictrH   rj   
ValueErrorWAL_MODE_ERROR_MESSAGE)r   rV   r   r   r   rX      s    

z*SqliteQueueDatabase._validate_journal_modec                 C   s   | j  | _d S r)   )r_   r=   _write_queuer*   r   r   r   r`      s    z'SqliteQueueDatabase._create_write_queuec                 C   s
   | j  S r)   )rs   qsizer*   r   r   r   
queue_size   s    zSqliteQueueDatabase.queue_sizec                 C   sb   |d k	rt d | dr*| ||S t| j |||d krF| jn|d}| j	t
|f |S )Nz,"commit" has been deprecated and is a no-op.select)r    r   r   r   )r	   rj   
startswithrS   r   r_   r    r\   rs   putrP   )r   r   r   commitr   r$   r   r   r   r[      s    zSqliteQueueDatabase.execute_sqlc              	      s^    j N  jsW 5 Q R  dS  fdd} j| _ j  d _W 5 Q R  dS Q R X d S )NFc                     s   t   j} |   d S r)   )r:   rs   rG   )writerr*   r   r   rG     s    z&SqliteQueueDatabase.start.<locals>.runT)rW   r]   r_   thread_writerra   )r   rG   r   r*   r   ra   
  s    
zSqliteQueueDatabase.startc              	   C   s   t d | j | jr&W 5 Q R  dS | jtd f | j  | j	 s| j
 \}}|tksh|tkrr|  q@|tkr@|d t  q@d| _W 5 Q R  dS Q R X d S )Nzenvironment stop requested.FT)rD   rR   rW   r]   rs   rx   rJ   r|   joinemptyrH   rK   rI   r#   rP   r&   r   rN   r   r   r   stop  s    



zSqliteQueueDatabase.stopc              
   C   s$   | j  | jW  5 Q R  S Q R X d S r)   )rW   r]   r*   r   r   r   
is_stopped+  s    zSqliteQueueDatabase.is_stoppedc              	   C   sL   | j 4 | jrW 5 Q R  dS | j }| jt|f W 5 Q R X |  d S r   )rW   r]   r_   r    rs   rx   rK   r'   r   Zevtr   r   r   pause/  s    
zSqliteQueueDatabase.pausec              	   C   sL   | j 4 | jrW 5 Q R  dS | j }| jt|f W 5 Q R X |  d S r   )rW   r]   r_   r    rs   rx   rI   r'   r   r   r   r   unpause9  s    
zSqliteQueueDatabase.unpausec                 O   s   t dt|  d S )Nz#This method is not supported by %r.)rq   type)r   rd   re   r   r   r   __unsupported__C  s    z#SqliteQueueDatabase.__unsupported__)FTNN)N)NNN)r   r   r   rr   r!   r^   rX   r`   ru   r[   ra   r   r   r   r   r   ZatomicZtransactionZ	savepoint__classcell__r   r   rf   r   rU      s"        



rU   c                   @   s4   e Zd ZdZdddZdd ZdddZd	d
 ZdS )ri   rc   Nc                 C   s
   || _ d S r)   r   )r   rc   r   r   r   r!   K  s    zThreadHelper.__init__c                 C   s   t  S r)   r   r*   r   r   r   r    N      zThreadHelper.eventc                 C   s    |d k	r|n| j }t|pddS Nr   )maxsize)rc   r   r   max_sizer   r   r   r=   P  s    zThreadHelper.queuec                 O   s   t |||d}d|_|S )N)targetrd   re   T)r   daemon)r   fnrd   re   r{   r   r   r   r{   T  s    zThreadHelper.thread)N)N)r   r   r   r7   r!   r    r=   r{   r   r   r   r   ri   H  s
   

ri   c                   @   s*   e Zd ZdZdd Zd	ddZdd ZdS )
rh   r   c                 C   s   t  S r)   )GEventr*   r   r   r   r    ]  r   zGreenletHelper.eventNc                 C   s    |d k	r|n| j }t|pddS r   )rc   GQueuer   r   r   r   r=   _  s    zGreenletHelper.queuec                    s    fdd}t |f||S )Nc                     s   t    | |S r)   )geventsleep)arl   r   r   r   wrapd  s    z#GreenletHelper.thread.<locals>.wrap)GThread)r   r   rd   re   r   r   r   r   r{   c  s    zGreenletHelper.thread)N)r   r   r   r7   r    r=   r{   r   r   r   r   rh   Z  s   
rh   ))loggingweakref	threadingr   Zthread_localr   r   r   r   ImportErrorr=   r   r   r   Zgevent.eventr   Zgevent.localZgreenlet_localZgevent.queuer   Zpeeweer	   Zplayhouse.sqlite_extr
   	getLoggerrD   rT   r   r   r   objectr   r.   rJ   rP   rK   rI   r:   rU   ri   rh   r   r   r   r   <module>   sB   
MG 	