e fa@sddgZddlZddlZddlZddlZddlZddlZddlZddlm Z ddlm Z m Z dZ dZ dZejZdd Zd d ZGd d d eZGdddZddZGdddeZdfddddZGdddeZGdddeZeZGdddeZGdddeZGdddeZGd ddeZ dS)!Pool ThreadPoolN)util) get_context TimeoutErrorcCstt|S)N)listmap)argsr 9/opt/alt/python34/lib64/python3.4/multiprocessing/pool.pymapstar+srcCsttj|d|dS)Nrr)r itertoolsstarmap)r r r r starmapstar.src@s(eZdZddZddZdS)RemoteTracebackcCs ||_dS)N)tb)selfrr r r __init__6szRemoteTraceback.__init__cCs|jS)N)r)rr r r __str__8szRemoteTraceback.__str__N)__name__ __module__ __qualname__rrr r r r r5s  rc@s(eZdZddZddZdS)ExceptionWithTracebackcCsDtjt|||}dj|}||_d||_dS)Nz """ %s""") tracebackformat_exceptiontypejoinexcr)rr rr r r r<s zExceptionWithTraceback.__init__cCst|j|jffS)N) rebuild_excr r)rr r r __reduce__Asz!ExceptionWithTraceback.__reduce__N)rrrrr"r r r r r;s  rcCst||_|S)N)r __cause__)r rr r r r!Dsr!cs@eZdZdZfddZddZddZS)MaybeEncodingErrorzVWraps possible unpickleable errors, so they can be safely sent through the socket.csAt||_t||_tt|j|j|jdS)N)reprr valuesuperr$r)rr r&) __class__r r rPszMaybeEncodingError.__init__cCsd|j|jfS)Nz(Error sending result: '%s'. Reason: '%s')r&r )rr r r rUs zMaybeEncodingError.__str__cCsdt|S)Nz)str)rr r r __repr__YszMaybeEncodingError.__repr__)rrr__doc__rrr*r r )r(r r$Ls  r$Fc'Cs|j}|j}t|dr>|jj|jjn|dk rW||nd}xx|dks~|r||kry |} Wn&ttfk rtj dPYnX| dkrtj dPn| \} } } } }yd| | |f}WnLt k rM}z,|r/t ||j }nd|f}WYdd}~XnXy|| | |fWnbt k r}zBt ||d}tj d||| | d|ffWYdd}~XnX|d7}q`Wtj d |dS) N_writerrz)worker got EOFError or OSError -- exitingzworker got sentinel -- exitingTFrz0Possible encoding error while sending result: %szworker exiting after %d tasks)putgethasattrr,close_readerEOFErrorOSErrorrdebug Exceptionr __traceback__r$)inqueueoutqueue initializerinitargsZmaxtasksZwrap_exceptionr-r.Z completedtaskjobifuncr kwdsresultewrappedr r r worker]sB     !     ,rCc@seZdZdZdZddZddfddddZdd Zd d Zd d Z ddZ fiddZ dddZ dddZ dddddZdddZdddZfiddddZddddd Zdddd!d"Zed#d$Zed%d&Zed'd(Zed)d*Zd+d,Zd-d.Zd/d0Zd1d2Zed3d4Zed5d6Zd7d8Z d9d:Z!dS);rzS Class which supports an async version of applying functions to arguments. TcOs|jj||S)N)_ctxProcess)rr r?r r r rEsz Pool.ProcessNc Cs,|p t|_|jtj|_i|_t|_||_ ||_ ||_ |dkryt j psd}n|dkrtdn|dk rt| rtdn||_g|_|jtjdtjd|f|_d|j_t|j_|jjtjdtjd|j|j|j|j|jf|_d|j_t|j_|jjtjdtjd|j|j |jf|_!d|j!_t|j!_|j!jt"j#||j$d|j|j%|j|j|j|j|j!|jfdd|_&dS) Nrz&Number of processes must be at least 1zinitializer must be a callabletargetr TZ exitpriority)'rrD _setup_queuesqueueQueue _taskqueue_cacheRUN_state_maxtasksperchild _initializer _initargsos cpu_count ValueErrorcallable TypeError _processes_pool_repopulate_pool threadingZThreadr_handle_workers_worker_handlerdaemonstart _handle_tasks _quick_put _outqueue _task_handler_handle_results _quick_get_result_handlerrZFinalize_terminate_pool_inqueue _terminate)r processesr9r:Zmaxtasksperchildcontextr r r rsT                            z Pool.__init__cCszd}xmttt|jD]P}|j|}|jdk r"tjd||jd}|j|=q"q"W|S)zCleanup after any worker processes which have exited due to reaching their specified lifetime. Returns True if any workers were cleaned up. FNzcleaning up worker %dT)reversedrangelenrXexitcoderr4r)rZcleanedr=rCr r r _join_exited_workerss"  zPool._join_exited_workersc Csxt|jt|jD]}|jdtd|j|j|j|j |j |j f}|jj ||j jdd|_ d|_|jtjdqWdS)zBring the number of pool processes up to the specified number, for use after reaping workers which have exited. rFr rEZ PoolWorkerTz added workerN)rlrWrmrXrErCrgrarPrQrO_wrap_exceptionappendnamereplacer]r^rr4)rr=wr r r rYs#    zPool._repopulate_poolcCs|jr|jndS)zEClean up any exited workers and start replacements for them. N)rorY)rr r r _maintain_pools zPool._maintain_poolcCsL|jj|_|jj|_|jjj|_|jjj|_ dS)N) rDZ SimpleQueuergrar,sendr`r1recvrd)rr r r rHszPool._setup_queuescCs|j|||jS)z6 Equivalent of `func(*args, **kwds)`. ) apply_asyncr.)rr>r r?r r r applysz Pool.applycCs|j||t|jS)zx Apply `func` to each element in `iterable`, collecting the results in a list that is returned. ) _map_asyncrr.)rr>iterable chunksizer r r r szPool.mapcCs|j||t|jS)z Like `map()` method but the elements of the `iterable` are expected to be iterables as well and will be unpacked as arguments. Hence `func` and (a, b) becomes func(a, b). )rzrr.)rr>r{r|r r r rsz Pool.starmapcCs|j||t|||S)z= Asynchronous version of `starmap()` method. )rzr)rr>r{r|callbackerror_callbackr r r starmap_asyncszPool.starmap_asyncrcs|jtkrtdn|dkrrt|j|jjfddt|DjfSt j ||}t|j|jjfddt|DjfddDSdS)zP Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. zPool not runningrc3s0|]&\}}j||fifVqdS)N)_job).0r=x)r>r@r r szPool.imap..c3s0|]&\}}j|t|fifVqdS)N)rr)rr=r)r@r r r%scss"|]}|D] }|Vq qdS)Nr )rchunkitemr r r r'sN) rNrMrT IMapIteratorrLrKr- enumerate _set_lengthr _get_tasks)rr>r{r| task_batchesr )r>r@r imaps z Pool.imapcs|jtkrtdn|dkrrt|j|jjfddt|DjfSt j ||}t|j|jjfddt|DjfddDSdS)zL Like `imap()` method but ordering of results is arbitrary. zPool not runningrc3s0|]&\}}j||fifVqdS)N)r)rr=r)r>r@r r r1sz&Pool.imap_unordered..c3s0|]&\}}j|t|fifVqdS)N)rr)rr=r)r@r r r8scss"|]}|D] }|Vq qdS)Nr )rrrr r r r:sN) rNrMrTIMapUnorderedIteratorrLrKr-rrrr)rr>r{r|rr )r>r@r imap_unordered)s zPool.imap_unorderedcCsb|jtkrtdnt|j||}|jj|jd|||fgdf|S)z; Asynchronous version of `apply()` method. zPool not runningN)rNrMrT ApplyResultrLrKr-r)rr>r r?r}r~r@r r r rx<s +zPool.apply_asynccCs|j||t|||S)z9 Asynchronous version of `map()` method. )rzr)rr>r{r|r}r~r r r map_asyncGszPool.map_asyncc s|jtkrtdnt|ds<t|}n|dkrtt|t|jd\}}|r|d7}qnt|dkrd}ntj |||}t |j |t||d||j j fdd t|DdfS) zY Helper function to implement map, starmap and their async counterparts. zPool not running__len__Nrrr~c3s0|]&\}}j||fifVqdS)N)r)rr=r)mapperr@r r rcsz"Pool._map_async..)rNrMrTr/r divmodrmrXrr MapResultrLrKr-r) rr>r{rr|r}r~Zextrarr )rr@r rzOs  (  zPool._map_asynccCsrtj}xB|jtks6|jrP|jtkrP|jtjdqW|j j dt j ddS)Ng?zworker handler exiting) rZcurrent_threadrNrMrL TERMINATErutimesleeprKr-rr4)poolthreadr r r r[gs  * zPool._handle_workerscCs>tj}xt|jdD]\}}d}d } yxt|D]\} }|jrmtjdPny||WqGtk r} zN|dd\} } y|| j | d| fWnt k rYnXWYdd} ~ XqGXqGW|rtjd|| dnwPWqtk r} zx|rD|ddnd \} } | |kr}|| j | dd| fn|rtjd|| dnWYdd} ~ XqXqWtjdyFtjd|j dtjd x|D]}|dqWWnt k r,tjd YnXtjd dS)Nrz'task handler found thread._state != RUNrFzdoing set_length()rztask handler got sentinelz/task handler sending sentinel to result handlerz(task handler sending sentinel to workersz/task handler got OSError when sending sentinelsztask handler exiting)rr) rZriterr.rrNrr4r5_setKeyErrorr-r3) taskqueuer-r8rcacherZtaskseqZ set_lengthr;r=rAr<ZindZexpr r r r_tsN     " ! (      zPool._handle_taskscCstj}xy |}Wn)ttfk rGtjddSYnX|jrbtjdPn|dkrtjdPn|\}}}y||j||Wqtk rYqXqWx|rn|jt krny |}Wn)ttfk rtjddSYnX|dkr/tjdqn|\}}}y||j||Wqtk rjYqXqWt |drtjdy5x.t dD] }|j j sPn|qWWqttfk rYqXntjdt||jdS) Nz.result handler got EOFError/OSError -- exitingz,result handler found thread._state=TERMINATEzresult handler got sentinelz&result handler ignoring extra sentinelr1z"ensuring that outqueue is not full z7result handler exiting: len(cache)=%s, thread._state=%s)rZrr3r2rr4rNrrrr/rlr1pollrm)r8r.rrr;r<r=objr r r rcsV                   zPool._handle_resultsccsDt|}x1ttj||}|s1dS||fVqWdS)N)rtuplerislice)r>itsizerr r r rs  zPool._get_taskscCstddS)Nz:pool objects cannot be passed between processes or pickled)NotImplementedError)rr r r r"szPool.__reduce__cCs8tjd|jtkr4t|_t|j_ndS)Nz closing pool)rr4rNrMCLOSEr\)rr r r r0s  z Pool.closecCs0tjdt|_t|j_|jdS)Nzterminating pool)rr4rrNr\rh)rr r r terminates   zPool.terminatecCsVtjd|jj|jj|jjx|jD]}|jq>WdS)Nz joining pool)rr4r\rrbrerX)rrr r r rs     z Pool.joincCsZtjd|jjx9|jrU|jjrU|jjtj dqWdS)Nz7removing tasks from inqueue until task handler finishedr) rr4Z_rlockacquireis_aliver1rrwrr)r7 task_handlerrr r r _help_stuff_finishs    zPool._help_stuff_finishc Cstjdt|_t|_tjd|j||t|t|_|jdtjdtj|k r|j n|rt |ddrtjdx-|D]"} | j dkr| j qqWntjdtj|k r |j ntjdtj|k r5|j n|rt |ddrtjd x>|D]3} | j rbtjd | j| j qbqbWndS) Nzfinalizing poolz&helping task handler/workers to finishzjoining worker handlerrrzterminating workerszjoining task handlerzjoining result handlerzjoining pool workerszcleaning up worker %d)rr4rrNrrmr-rZrrr/rnrrpid) clsrr7r8rZworker_handlerrZresult_handlerrrr r r rf s6                 zPool._terminate_poolcCs|S)Nr )rr r r __enter__;szPool.__enter__cCs|jdS)N)r)rexc_typeZexc_valZexc_tbr r r __exit__>sz Pool.__exit__)"rrrr+rprErrorYrurHryr rrrrrxrrz staticmethodr[r_rcrr"r0rrr classmethodrfrrr r r r rsD   8       3:     . c@s^eZdZddZddZddZddd Zdd d Zd d ZdS)rcCsJtj|_tt|_||_||_||_|||j s:             ,&%I