Yfd@sddgZddlZddlZddlZddlZddlZddlZddlZddlm Z ddlm Z m Z dZ dZ dZejZdd Zd d ZGd d d eZGdddZddZGdddeZdfddddZddZGdddeZGdddeZeZGdddeZGdddeZGd d!d!eZ Gd"ddeZ!dS)#Pool ThreadPoolN)util) get_context TimeoutErrorcCstt|S)N)listmap)argsr 9/opt/alt/python35/lib64/python3.5/multiprocessing/pool.pymapstar+srcCsttj|d|dS)Nrr)r itertoolsstarmap)r r r r starmapstar.src@s(eZdZddZddZdS)RemoteTracebackcCs ||_dS)N)tb)selfrr r r __init__6szRemoteTraceback.__init__cCs|jS)N)r)rr r r __str__8szRemoteTraceback.__str__N)__name__ __module__ __qualname__rrr r r r r5s  rc@s(eZdZddZddZdS)ExceptionWithTracebackcCsDtjt|||}dj|}||_d||_dS)Nz """ %s""") tracebackformat_exceptiontypejoinexcr)rr rr r r r<s zExceptionWithTraceback.__init__cCst|j|jffS)N) rebuild_excr r)rr r r __reduce__Asz!ExceptionWithTraceback.__reduce__N)rrrrr"r r r r r;s  rcCst||_|S)N)r __cause__)r rr r r r!Dsr!cs@eZdZdZfddZddZddZS)MaybeEncodingErrorzVWraps possible unpickleable errors, so they can be safely sent through the socket.csAt||_t||_tt|j|j|jdS)N)reprr valuesuperr$r)rr r&) __class__r r rPszMaybeEncodingError.__init__cCsd|j|jfS)Nz(Error sending result: '%s'. Reason: '%s')r&r )rr r r rUs zMaybeEncodingError.__str__cCsd|jj|fS)Nz<%s: %s>)r(r)rr r r __repr__YszMaybeEncodingError.__repr__)rrr__doc__rrr)r r )r(r r$Ls  r$Fc'Cs7|dks0t|tkr*|dks0t|j}|j}t|drk|jj|jj|dk r||d}x|dks|r!||kr!y |} Wn&t t fk rt j dPYnX| dkrt j dP| \} } } } }yd| | |f}WnUt k r}}z5|r_| tk r_t||j}d|f}WYdd}~XnXy|| | |fWnbt k r}zBt||d}t j d||| | d|ffWYdd}~XnXd} } }} } }|d7}qWt j d |dS) Nr_writerz)worker got EOFError or OSError -- exitingzworker got sentinel -- exitingTFrz0Possible encoding error while sending result: %szworker exiting after %d tasks)rintAssertionErrorputgethasattrr+close_readerEOFErrorOSErrorrdebug Exception_helper_reraises_exceptionr __traceback__r$)inqueueoutqueue initializerinitargsZmaxtasksZwrap_exceptionr.r/Z completedtaskjobifuncr kwdsresultewrappedr r r worker]sF0      !     ,rEcCs |dS)z@Pickle-able helper function for use by _guarded_task_generation.Nr )Zexr r r r7sr7c@seZdZdZdZddZddfddddZdd Zd d Zd d Z ddZ fiddZ dddZ dddZ dddddZddZdddZdddZfidddd Zdddd!d"Zdddd#d$Zed%d&Zed'd(Zed)d*Zed+d,Zd-d.Zd/d0Zd1d2Zd3d4Zed5d6Zed7d8Z d9d:Z!d;d<Z"dS)=rzS Class which supports an async version of applying functions to arguments. TcOs|jj||S)N)_ctxProcess)rr rAr r r rGsz Pool.ProcessNc Cs#|p t|_|jtj|_i|_t|_||_ ||_ ||_ |dkrvt j psd}|dkrtd|dk rt| rtd||_g|_|jtjdtjd|f|_d|j_t|j_|jjtjdtjd|j|j|j|j|jf|_d|j_t|j_|jjtjdtjd|j|j |jf|_!d|j!_t|j!_|j!jt"j#||j$d|j|j%|j|j|j|j|j!|jfdd|_&dS) Nrz&Number of processes must be at least 1zinitializer must be a callabletargetr TZ exitpriority)'rrF _setup_queuesqueueQueue _taskqueue_cacheRUN_state_maxtasksperchild _initializer _initargsos cpu_count ValueErrorcallable TypeError _processes_pool_repopulate_pool threadingZThreadr_handle_workers_worker_handlerdaemonstart _handle_tasks _quick_put _outqueue _task_handler_handle_results _quick_get_result_handlerrZFinalize_terminate_pool_inqueue _terminate)r processesr;r<Zmaxtasksperchildcontextr r r rsT                              z Pool.__init__cCswd}xjttt|jD]M}|j|}|jdk r"tjd||jd}|j|=q"W|S)zCleanup after any worker processes which have exited due to reaching their specified lifetime. Returns True if any workers were cleaned up. FNzcleaning up worker %dT)reversedrangelenrZexitcoderr5r)rZcleanedr?rEr r r _join_exited_workerss"  zPool._join_exited_workersc Csxt|jt|jD]}|jdtd|j|j|j|j |j |j f}|jj ||j jdd|_ d|_|jtjdqWdS)zBring the number of pool processes up to the specified number, for use after reaping workers which have exited. rHr rGZ PoolWorkerTz added workerN)rnrYrorZrGrErircrRrSrQ_wrap_exceptionappendnamereplacer_r`rr5)rr?wr r r r[s#    zPool._repopulate_poolcCs|jr|jdS)zEClean up any exited workers and start replacements for them. N)rqr[)rr r r _maintain_pools zPool._maintain_poolcCsL|jj|_|jj|_|jjj|_|jjj|_ dS)N) rFZ SimpleQueuerircr+sendrbr2recvrf)rr r r rJszPool._setup_queuescCs.|jtkst|j|||jS)z6 Equivalent of `func(*args, **kwds)`. )rPrOr- apply_asyncr/)rr@r rAr r r applysz Pool.applycCs|j||t|jS)zx Apply `func` to each element in `iterable`, collecting the results in a list that is returned. ) _map_asyncrr/)rr@iterable chunksizer r r r szPool.mapcCs|j||t|jS)z Like `map()` method but the elements of the `iterable` are expected to be iterables as well and will be unpacked as arguments. Hence `func` and (a, b) becomes func(a, b). )r|rr/)rr@r}r~r r r r sz Pool.starmapcCs|j||t|||S)z= Asynchronous version of `starmap()` method. )r|r)rr@r}r~callbackerror_callbackr r r starmap_asyncszPool.starmap_asyncccsy>d}x1t|D]#\}}||||fifVqWWn@tk r}z ||dt|fifVWYdd}~XnXdS)zProvides a generator of tasks for imap and imap_unordered with appropriate handling for iterables which throw exceptions during iteration.rN) enumerater6r7)rZ result_jobr@r}r?xrCr r r _guarded_task_generations zPool._guarded_task_generationrcCs|jtkrtd|dkret|j}|jj|j|j|||j f|S|dkswt t j |||}t|j}|jj|j|jt ||j fdd|DSdS)zP Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. zPool not runningrcss"|]}|D] }|Vq qdS)Nr ).0chunkitemr r r @szPool.imap..N)rPrOrV IMapIteratorrNrMr.r_job _set_lengthr-r _get_tasksr)rr@r}r~rB task_batchesr r r imap's"       z Pool.imapcCs|jtkrtd|dkret|j}|jj|j|j|||j f|S|dkswt t j |||}t|j}|jj|j|jt ||j fdd|DSdS)zL Like `imap()` method but ordering of results is arbitrary. zPool not runningrcss"|]}|D] }|Vq qdS)Nr )rrrr r r r[sz&Pool.imap_unordered..N)rPrOrVIMapUnorderedIteratorrNrMr.rrrr-rrr)rr@r}r~rBrr r r imap_unorderedBs"       zPool.imap_unorderedcCs_|jtkrtdt|j||}|jj|jd|||fgdf|S)z; Asynchronous version of `apply()` method. zPool not runningrN)rPrOrV ApplyResultrNrMr.r)rr@r rArrrBr r r rz]s  +zPool.apply_asynccCs|j||t|||S)z9 Asynchronous version of `map()` method. )r|r)rr@r}r~rrr r r map_asynchszPool.map_asyncc Cs|jtkrtdt|ds6t|}|dkrztt|t|jd\}}|rz|d7}t|dkrd}tj |||}t |j |t||d|} |j j |j| j||df| S)zY Helper function to implement map, starmap and their async counterparts. zPool not running__len__Nrrr)rPrOrVr0r divmodrorZrr MapResultrNrMr.rr) rr@r}Zmapperr~rrZextrarrBr r r r|ps&   (     zPool._map_asynccCsrtj}xB|jtks6|jrP|jtkrP|jtjdqW|j j dt j ddS)Ng?zworker handler exiting) r\current_threadrPrOrN TERMINATErwtimesleeprMr.rr5)poolthreadr r r r]s  * zPool._handle_workersc Cstj}x+t|jdD] \}}d}zx|D]}|jrXtjdPy||Wq;tk r} zN|dd\} } y|| j| d| fWnt k rYnXWYdd} ~ Xq;Xq;W|rtjd|r|dnd } || dwPWdd}}} XqWtjdyFtjd|j dtjdx|D]} |dqkWWnt k rtjd YnXtjd dS) Nz'task handler found thread._state != RUNrFzdoing set_length()rztask handler got sentinelz/task handler sending sentinel to result handlerz(task handler sending sentinel to workersz/task handler got OSError when sending sentinelsztask handler exitingr) r\riterr/rPrr5r6_setKeyErrorr.r4) taskqueuer.r:rcacherZtaskseqZ set_lengthr=rCr>idxpr r r rasB            zPool._handle_taskscCs tj}xy |}Wn)ttfk rGtjddSYnX|jrt|jtksfttjdP|dkrtjdP|\}}}y||j ||Wnt k rYnXd}}}qWx|r|jtkry |}Wn)ttfk r,tjddSYnX|dkrItjdq|\}}}y||j ||Wnt k rYnXd}}}qWt |drtjdy2x+t dD]}|j jsP|qWWnttfk rYnXtjdt||jdS) Nz.result handler got EOFError/OSError -- exitingz,result handler found thread._state=TERMINATEzresult handler got sentinelz&result handler ignoring extra sentinelr2z"ensuring that outqueue is not full z7result handler exiting: len(cache)=%s, thread._state=%s)r\rr4r3rr5rPrr-rrr0rnr2pollro)r:r/rrr=r>r?objr r r res\                 zPool._handle_resultsccsDt|}x1ttj||}|s1dS||fVqWdS)N)rtuplerislice)r@itsizerr r r rs  zPool._get_taskscCstddS)Nz:pool objects cannot be passed between processes or pickled)NotImplementedError)rr r r r"szPool.__reduce__cCs5tjd|jtkr1t|_t|j_dS)Nz closing pool)rr5rPrOCLOSEr^)rr r r r1s  z Pool.closecCs0tjdt|_t|j_|jdS)Nzterminating pool)rr5rrPr^rj)rr r r terminates   zPool.terminatecCsqtjd|jttfks(t|jj|jj|j jx|j D]}|jqYWdS)Nz joining pool) rr5rPrrr-r^rrdrgrZ)rrr r r rs    z Pool.joincCsZtjd|jjx9|jrU|jjrU|jjtj dqWdS)Nz7removing tasks from inqueue until task handler finishedr) rr5Z_rlockacquireis_aliver2rryrr)r9 task_handlerrr r r _help_stuff_finish(s    zPool._help_stuff_finishc Cstjdt|_t|_tjd|j||t||jsit|dksitt|_|jdtjdt j |k r|j |rt |ddrtjdx'|D]} | j dkr| jqWtjdt j |k r!|j tjdt j |k rJ|j |rt |ddrtjd x8|D]0} | jrwtjd | j| j qwWdS) Nzfinalizing poolz&helping task handler/workers to finishrzjoining worker handlerrzterminating workerszjoining task handlerzjoining result handlerzjoining pool workerszcleaning up worker %d)rr5rrPrrorr-r.r\rrr0rprpid) clsrr9r:rZworker_handlerrZresult_handlerrrr r r rh1s8    $             zPool._terminate_poolcCs|S)Nr )rr r r __enter___szPool.__enter__cCs|jdS)N)r)rexc_typeZexc_valZexc_tbr r r __exit__bsz Pool.__exit__)#rrrr*rrrGrrqr[rwrJr{r rrrrrrzrr| staticmethodr]rarerr"r1rrr classmethodrhrrr r r r rsF   8        .<     . c@s^eZdZddZddZddZddd Zdd d Zd d ZdS)rcCsJtj|_tt|_||_||_||_|||j s<             * &%@