� j f�2�@s�dZddlZddlZddlZddlZddlZddlZddlZddlZddl Z ddl Z ddl Z ddl Z ddl Z ddl mZddlmZddlmZmZyddlZWnek rdZYnXddlmZddlmZdd lmZdd lmZdd lmZdd lmZdd lmZddlm Z e j!dkr�ddl"m#Z#nddlm#Z#dd�Z$dd�Z%ddd�Z&dd�Z'Gdd�de�Z(Gdd�de�Z)Gdd�d�Z*Gd d!�d!e*e)�Z+d"d#d$d%�Z,e-ed&�rGd'd(�d(ej.e�Z/Gd)d*�d*e/e�Z0Gd+d,�d,e0�Z1Gd-d.�d.e*e1�Z2d/d0�Z3ej4d1d2��Z5ej4d"d#d3d4��Z6nej4d5d6d7dd"d#d8d9��Z7d:d;�Z8Gd<d=�d=ej9�Z:Gd>d?�d?ej;�Z<d@dA�Z=GdBdC�dCe>�Z?dDdE�Z@GdFdG�dGe jA�ZAej4dHdI��ZBejCejDejEdJdK�ZFdLdM�ZGdS)NzUtilities shared by tests.�N)�mock)� HTTPServer)�WSGIRequestHandler� WSGIServer�)� base_events)�compat)�events)�futures)� selectors)�tasks)� coroutine)�logger�win32)� socketpaircCs$tdkrdStjtj�SdS)N)�sslZ SSLContextZPROTOCOL_SSLv23�rr�7/opt/alt/python34/lib64/python3.4/asyncio/test_utils.py�dummy_ssl_context*s rc CsVtdd��}|�}|j|�}d|_z|j|�Wd|j�XdS)NcSsdS)Nrrrrr�once2szrun_briefly.<locals>.onceF)r Z create_taskZ_log_destroy_pending�run_until_complete�close)�loopr�gen�trrr� run_briefly1s  r�cCsztj�|}xc|�su|dk rV|tj�}|dkrVtj��qVn|jtjdd|��qWdS)Nrg����MbP?r)�timer � TimeoutErrorrr Zsleep)rZpred�timeoutZdeadlinerrr� run_until@s   r cCs|j|j�|j�dS)z�Legacy API to run once through the event loop. This is the recommended pattern for test code. It will poll the selector once and run all callbacks scheduled in response to I/O events. N)Z call_soon�stopZ run_forever)rrrr�run_onceJsr"c@s(eZdZdd�Zdd�ZdS)�SilentWSGIRequestHandlercCs tj�S)N)�io�StringIO)�selfrrr� get_stderrWsz#SilentWSGIRequestHandler.get_stderrcGsdS)Nr)r&�format�argsrrr� log_messageZsz$SilentWSGIRequestHandler.log_messageN)�__name__� __module__� __qualname__r'r*rrrrr#Us  r#cs4eZdZdZ�fdd�Zdd�Z�S)�SilentWSGIServer�cs/t�j�\}}|j|j�||fS)N)�super� get_request� settimeout�request_timeout)r&�request� client_addr)� __class__rrr1bszSilentWSGIServer.get_requestcCsdS)Nr)r&r4�client_addressrrr� handle_errorgszSilentWSGIServer.handle_error)r+r,r-r3r1r8rr)r6rr.^s r.c@seZdZdd�ZdS)�SSLWSGIServerMixinc Cs�tjjtjjt�dd�}tjj|�s`tjjtjjtj�dd�}ntjj|d�}tjj|d�}tj|d|d|d d �}y!|j|||�|j �Wnt k r�YnXdS) Nz..ZtestsZtestZ test_asyncioz ssl_key.pemz ssl_cert.pem�keyfile�certfileZ server_sideT) �os�path�join�dirname�__file__�isdirrZ wrap_socketZRequestHandlerClassr�OSError)r&r4r7�herer:r;Zssockrrr�finish_requestms$   z!SSLWSGIServerMixin.finish_requestN)r+r,r-rDrrrrr9ks r9c@seZdZdS)� SSLWSGIServerN)r+r,r-rrrrrE�s rE�use_sslFc #s�dd�}|r|n|}||t���j|��j�_tjd�fdd��}|j�z �VWd�j��j�|j �XdS)NcSs#d}dg}|||�dgS)Nz200 OK� Content-type� text/plains Test message)rGrHr)�environZstart_responseZstatusZheadersrrr�app�s  z_run_test_server.<locals>.app�targetcs�jdd�S)NZ poll_intervalg�������?)Z serve_foreverr)�httpdrr�<lambda>�sz"_run_test_server.<locals>.<lambda>) r#Zset_appZserver_address�address� threadingZThread�startZshutdownZ server_closer>)rNrF� server_cls�server_ssl_clsrJZ server_classZ server_threadr)rLr�_run_test_server�s        rSZAF_UNIXc@seZdZdd�ZdS)�UnixHTTPServercCs&tjj|�d|_d|_dS)Nz 127.0.0.1�P)� socketserver�UnixStreamServer� server_bindZ server_nameZ server_port)r&rrrrX�s zUnixHTTPServer.server_bindN)r+r,r-rXrrrrrT�s rTcs4eZdZdZdd�Z�fdd�Z�S)�UnixWSGIServerr/cCstj|�|j�dS)N)rTrXZ setup_environ)r&rrrrX�s zUnixWSGIServer.server_bindcs/t�j�\}}|j|j�|dfS)N� 127.0.0.1�)rZr[)r0r1r2r3)r&r4r5)r6rrr1�szUnixWSGIServer.get_request)r+r,r-r3rXr1rr)r6rrY�s  rYc@seZdZdd�ZdS)�SilentUnixWSGIServercCsdS)Nr)r&r4r7rrrr8�sz!SilentUnixWSGIServer.handle_errorN)r+r,r-r8rrrrr\�s r\c@seZdZdS)�UnixSSLWSGIServerN)r+r,r-rrrrr]�s r]cCs tj��}|jSWdQXdS)N)�tempfileZNamedTemporaryFile�name)�filerrr�gen_unix_socket_path�sraccs@t�}z |VWdytj|�Wntk r:YnXXdS)N)rar<�unlinkrB)r=rrr�unix_socket_path�s   rcccs:t��+}td|d|dtdt�DdHWdQXdS)NrNrFrQrR)rcrSr\r])rFr=rrr�run_test_unix_server�s rd�hostz 127.0.0.1�portc cs.td||fd|dtdt�DdHdS)NrNrFrQrR)rSr.rE)rerfrFrrr�run_test_server�srgcCsqi}xKt|�D]=}|jd�r=|jd�r=qntdd�||<qWtd|f|j|��S)N�__� return_valueZ TestProtocol)�dir� startswith�endswith� MockCallback�type� __bases__)�baseZdctr_rrr�make_test_protocol�s rqc@sOeZdZdd�Zddd�Zdd�Zdd �Zd d �ZdS) � TestSelectorcCs i|_dS)N)�keys)r&rrr�__init__�szTestSelector.__init__NcCs)tj|d||�}||j|<|S)Nr)r Z SelectorKeyrs)r&�fileobjr �data�keyrrr�register�s zTestSelector.registercCs|jj|�S)N)rs�pop)r&rurrr� unregister�szTestSelector.unregistercCsgS)Nr)r&rrrr�selectszTestSelector.selectcCs|jS)N)rs)r&rrr�get_mapszTestSelector.get_map)r+r,r-rtrxrzr{r|rrrrrr�s    rrcs�eZdZdZd�fdd�Zdd�Zdd�Z�fd d �Zd d �Zd d�Z dd�Z dd�Z dd�Z dd�Z dd�Z�fdd�Z�fdd�Zdd�Zdd �Z�S)!�TestLoopa�Loop for unittests. It manages self time directly. If something scheduled to be executed later then on next loop iteration after all ready handlers done generator passed to __init__ is calling. Generator should be like this: def gen(): ... when = yield ... ... = yield time_advance Value returned by yield is absolute time of next scheduled handler. Value passed to yield is time advance to move loop's time forward. Ncs�t�j�|dkr1dd�}d|_n d|_|�|_t|j�d|_d|_g|_t�|_ i|_ i|_ |j �dS)Ncss dVdS)NrrrrrrszTestLoop.__init__.<locals>.genFTrg��&� .>) r0rt�_check_on_close�_gen�next�_timeZ_clock_resolution�_timersrrZ _selector�readers�writers�reset_counters)r&r)r6rrrts             zTestLoop.__init__cCs|jS)N)r�)r&rrrr/sz TestLoop.timecCs|r|j|7_ndS)zMove test time forward.N)r�)r&�advancerrr� advance_time2szTestLoop.advance_timec sRt�j�|jrNy|jjd�Wntk r>YqNXtd��ndS)NrzTime generator is not finished)r0rr~r�send� StopIteration�AssertionError)r&)r6rrr7s   zTestLoop.closecGs tj|||�|j|<dS)N)r �Handler�)r&�fd�callbackr)rrr� add_readerAszTestLoop.add_readercCs6|j|d7<||jkr.|j|=dSdSdS)NrTF)�remove_reader_countr�)r&r�rrr� remove_readerDs  zTestLoop.remove_readercGs�||jks$tdj|���|j|}|j|ks[tdj|j|���|j|ks�tdj|j|���dS)Nzfd {} is not registeredz {!r} != {!r})r�r�r(� _callback�_args)r&r�r�r)�handlerrr� assert_readerLs $ zTestLoop.assert_readercGs tj|||�|j|<dS)N)r r�r�)r&r�r�r)rrr� add_writerTszTestLoop.add_writercCs6|j|d7<||jkr.|j|=dSdSdS)NrTF)�remove_writer_countr�)r&r�rrr� remove_writerWs  zTestLoop.remove_writercGs�||jks$tdj|���|j|}|j|ks[tdj|j|���|j|ks�tdj|j|���dS)Nzfd {} is not registeredz {!r} != {!r})r�r�r(r�r�)r&r�r�r)r�rrr� assert_writer_s $ zTestLoop.assert_writercCs(tjt�|_tjt�|_dS)N)� collections� defaultdict�intr�r�)r&rrrr�gszTestLoop.reset_counterscsMt�j�x0|jD]%}|jj|�}|j|�qWg|_dS)N)r0� _run_oncer�rr�r�)r&�whenr�)r6rrr�ks  zTestLoop._run_oncecs&|jj|�t�j|||�S)N)r��appendr0�call_at)r&r�r�r))r6rrr�rszTestLoop.call_atcCsdS)Nr)r&Z event_listrrr�_process_eventsvszTestLoop._process_eventscCsdS)Nr)r&rrr�_write_to_selfyszTestLoop._write_to_self)r+r,r-�__doc__rtrr�rr�r�r�r�r�r�r�r�r�r�r�rr)r6rr}s           r}cKstjddg|�S)N�spec�__call__)rZMock)�kwargsrrrrm}srmc@s"eZdZdZdd�ZdS)� MockPatternz�A regex based str with a fuzzy __eq__. Use this helper with 'mock.assert_called_with', or anywhere where a regex comparison between strings is needed. For instance: mock_call.assert_called_with(MockPattern('spam.*ham')) cCs"ttjt|�|tj��S)N)�bool�re�search�str�S)r&�otherrrr�__eq__�szMockPattern.__eq__N)r+r,r-r�r�rrrrr��s r�cCs5tj|�}|dkr1td|f��n|S)Nzunable to get the source of %r)r Z_get_function_source� ValueError)�func�sourcerrr�get_function_source�s r�c@sUeZdZdddd�Zddd�Zdd �ZejsQd d �ZndS) �TestCase�cleanupTcCs<|dk st�tjd�|r8|j|j�ndS)N)r�r �set_event_loopZ addCleanupr)r&rr�rrrr��s zTestCase.set_event_loopNcCst|�}|j|�|S)N)r}r�)r&rrrrr� new_test_loop�s  zTestCase.new_test_loopcCs'tjd�|jtj�d�dS)N)NNN)r r�Z assertEqual�sys�exc_info)r&rrr�tearDown�s zTestCase.tearDowncOsGdd�d�}|�S)Nc@s(eZdZdd�Zdd�ZdS)z!TestCase.subTest.<locals>.EmptyCMcSsdS)Nr)r&rrr� __enter__�sz+TestCase.subTest.<locals>.EmptyCM.__enter__cWsdS)Nr)r&�excrrr�__exit__�sz*TestCase.subTest.<locals>.EmptyCM.__exit__N)r+r,r-r�r�rrrr�EmptyCM�s  r�r)r&r)r�r�rrr�subTest�szTestCase.subTest) r+r,r-r�r�r�rZPY34r�rrrrr��s   r�c cs;tj}ztjtjd�dVWdtj|�XdS)zrContext manager to disable asyncio logger. For example, it can be used to ignore warnings in debug mode. rN)r�levelZsetLevel�loggingZCRITICAL)Z old_levelrrr�disable_logger�s   r�cCs=tjtj�}||_||_||_d|j_|S)z'Create a mock of a non-blocking socket.g)rZ MagicMock�socket�protorn�familyZ gettimeoutri)r�rnr�Zsockrrr�mock_nonblocking_socket�s     r�cCstjddd�S)Nz'asyncio.sslproto._is_sslproto_availableriF)rZpatchrrrr�force_legacy_ssl_support�s r�)Hr�r�� contextlibr$r�r<r�r�rVr�r^rOrZunittestrZ http.serverrZwsgiref.simple_serverrrr� ImportErrorr[rrr r r r Z coroutinesr �logr�platformZ windows_utilsrrrr r"r#r.r9rErS�hasattrrWrTrYr\r]ra�contextmanagerrcrdrgrqZ BaseSelectorrrZ BaseEventLoopr}rmr�r�r�r�r�Z IPPROTO_TCPZ SOCK_STREAMZAF_INETr�r�rrrr�<module>s|                        v