î j f¦Dã@svdZdZddlZddlZddlmZddlZddlmZddlZddlm Z ddl m Z ddl Z ddl Z e jƒZdad d „Zd ZGd d „d eƒZGdd„deƒZGdd„deƒZdd„Zdd„Zdd„Zdadadd„ZGdd„deƒZGdd„dejƒZ ej!eƒdS)a* Implements ProcessPoolExecutor. The follow diagram and text describe the data-flow through the system: |======================= In-process =====================|== Out-of-process ==| +----------+ +----------+ +--------+ +-----------+ +---------+ | | => | Work Ids | => | | => | Call Q | => | | | | +----------+ | | +-----------+ | | | | | ... | | | | ... | | | | | | 6 | | | | 5, call() | | | | | | 7 | | | | ... | | | | Process | | ... | | Local | +-----------+ | Process | | Pool | +----------+ | Worker | | #1..n | | Executor | | Thread | | | | | +----------- + | | +-----------+ | | | | <=> | Work Items | <=> | | <= | Result Q | <= | | | | +------------+ | | +-----------+ | | | | | 6: call() | | | | ... | | | | | | future | | | | 4, result | | | | | | ... | | | | 3, except | | | +----------+ +------------+ +--------+ +-----------+ +---------+ Executor.submit() called: - creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict - adds the id of the _WorkItem to the "Work Ids" queue Local worker thread: - reads work ids from the "Work Ids" queue and looks up the corresponding WorkItem from the "Work Items" dict: if the work item has been cancelled then it is simply removed from the dict, otherwise it is repackaged as a _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q" until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because calls placed in the "Call Q" can no longer be cancelled with Future.cancel(). - reads _ResultItems from "Result Q", updates the future stored in the "Work Items" dict and deletes the dict entry Process #1..n: - reads _CallItems from "Call Q", executes the calls, and puts the resulting _ResultItems in "Result Q" z"Brian Quinlan (brian@sweetapp.com)éN)Ú_base)ÚFull)Ú SimpleQueue)ÚwaitFcCsadattjƒƒ}x!|D]\}}|jdƒqWx|D]\}}|jƒqCWdS)NT)Ú _shutdownÚlistÚ_threads_queuesÚitemsÚputÚjoin)r ÚtÚq©rú?/opt/alt/python34/lib64/python3.4/concurrent/futures/process.pyÚ _python_exitLs réc@seZdZdd„ZdS)Ú _WorkItemcCs(||_||_||_||_dS)N)ÚfutureÚfnÚargsÚkwargs)ÚselfrrrrrrrÚ__init__\s   z_WorkItem.__init__N)Ú__name__Ú __module__Ú __qualname__rrrrrr[s rc@s"eZdZdddd„ZdS)Ú _ResultItemNcCs||_||_||_dS)N)Úwork_idÚ exceptionÚresult)rrrrrrrrcs  z_ResultItem.__init__)rrrrrrrrrbs rc@seZdZdd„ZdS)Ú _CallItemcCs(||_||_||_||_dS)N)rrrr)rrrrrrrrris   z_CallItem.__init__N)rrrrrrrrr hs r cCs¼xµ|jddƒ}|dkr8|jtjƒƒdSy|j|j|jŽ}WnAtk r—}z!|jt|j d|ƒƒWYdd}~XqX|jt|j d|ƒƒqWdS)aøEvaluates calls from call_queue and places the results in result_queue. This worker is run in a separate process. Args: call_queue: A multiprocessing.Queue of _CallItems that will be read and evaluated by the worker. result_queue: A multiprocessing.Queue of _ResultItems that will written to by the worker. shutdown: A multiprocessing.Event that will be set as a signal to the worker that it should exit when call_queue is empty. ÚblockTNrr) Úgetr ÚosÚgetpidrrrÚ BaseExceptionrr)Ú call_queueÚ result_queueZ call_itemÚrÚerrrÚ_process_workeros  r*cCsžx—|jƒrdSy|jddƒ}Wntjk rDdSYqX||}|jjƒrŒ|jt||j|j |j ƒddƒq||=qqWdS)aMFills call_queue with _WorkItems from pending_work_items. This function never blocks. Args: pending_work_items: A dict mapping work ids to _WorkItems e.g. {5: <_WorkItem...>, 6: <_WorkItem...>, ...} work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids are consumed and the corresponding _WorkItems from pending_work_items are transformed into _CallItems and put in call_queue. call_queue: A multiprocessing.Queue that will be filled with _CallItems derived from _WorkItems. Nr!FT) Úfullr"ÚqueueZEmptyrZset_running_or_notify_cancelr r rrr)Úpending_work_itemsZwork_idsr&rÚ work_itemrrrÚ_add_call_item_to_queue‹s       r/c s8d‰‡fdd†}‡‡fdd†}|j}xût||ˆƒdd„ˆjƒDƒ} | snt‚t|g| ƒ} || krœ|jƒ} nœ|ƒ‰ˆdk rÌdˆ_dˆ_d‰nx3|jƒD]%\} } | j j t d ƒƒ~ qÙW|j ƒxˆjƒD]}|j ƒqW|ƒdSt| tƒrƒ|ƒsVt‚ˆj| ƒ}|jƒˆsë|ƒdSnh| dk rë|j| jdƒ} | dk rë| jrÏ| j j | jƒn| j j| jƒ~ qën|ƒ‰|ƒr*y|s|ƒdSWq*tk r&Yq*Xnd‰q9WdS) a‘Manages the communication between this process and the worker processes. This function is run in a local thread. Args: executor_reference: A weakref.ref to the ProcessPoolExecutor that owns this thread. Used to determine if the ProcessPoolExecutor has been garbage collected and that this function can exit. process: A list of the multiprocessing.Process instances used as workers. pending_work_items: A dict mapping work ids to _WorkItems e.g. {5: <_WorkItem...>, 6: <_WorkItem...>, ...} work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]). call_queue: A multiprocessing.Queue that will be filled with _CallItems derived from _WorkItems for processing by the process workers. result_queue: A multiprocessing.Queue of _ResultItems generated by the process workers. NcstpˆdkpˆjS)N)rÚ_shutdown_threadr)ÚexecutorrrÚ shutting_downÊsz/_queue_management_worker..shutting_downcsutdd„ˆjƒDƒƒ}x$td|ƒD]}ˆjdƒq/WˆjƒxˆjƒD]}|jƒq]WdS)Ncss|]}|jƒVqdS)N)Zis_alive)Ú.0Úprrrú ÏszD_queue_management_worker..shutdown_worker..r)ÚsumÚvaluesÚrangeZ put_nowaitÚcloser )Znb_children_aliveÚir4)r&Ú processesrrÚshutdown_workerÍs  z1_queue_management_worker..shutdown_workercSsg|]}|j‘qSr)Úsentinel)r3r4rrrú às z,_queue_management_worker..Tz^A process in the process pool was terminated abruptly while the future was running or pending.)Z_readerr/r7ÚAssertionErrorrZrecvÚ_brokenr0r rZ set_exceptionÚBrokenProcessPoolÚclearZ terminateÚ isinstanceÚintÚpopr rrZ set_resultrr)Zexecutor_referencer;r-Zwork_ids_queuer&r'r2r<ÚreaderZ sentinelsZreadyZ result_itemrr.r4r)r&r1r;rÚ_queue_management_worker°sf                   rGc Cstrtrttƒ‚qndaytjdƒ}Wnttfk rUdSYnX|dkrfdS|dkrvdSd|attƒ‚dS)NTÚSC_SEM_NSEMS_MAXréz@system provides too few semaphores (%d available, 256 necessary)éÿÿÿÿ)Ú_system_limits_checkedÚ_system_limitedÚNotImplementedErrorr#ÚsysconfÚAttributeErrorÚ ValueError)Z nsems_maxrrrÚ_check_system_limits%s    rQc@seZdZdZdS)rAzy Raised when a process in a ProcessPoolExecutor terminated abruptly while a future was in the running state. N)rrrÚ__doc__rrrrrA<s rAc@sveZdZddd„Zdd„Zdd„Zdd „Zejjj e_ d d d „Z ejj j e _ dS) ÚProcessPoolExecutorNcCs½tƒ|dkr+tjƒp"d|_n ||_tj|jtƒ|_d|j_t ƒ|_ t jƒ|_ d|_ i|_d|_tjƒ|_d|_d|_i|_dS)a/Initializes a new ProcessPoolExecutor instance. Args: max_workers: The maximum number of processes that can be used to execute the given calls. If None or not given then as many worker processes will be created as the machine has processors. NrTFr)rQr#Ú cpu_countÚ _max_workersÚmultiprocessingZQueueÚEXTRA_QUEUED_CALLSÚ _call_queueZ _ignore_epiperÚ _result_queuer,Ú _work_idsÚ_queue_management_threadÚ _processesr0Ú threadingZLockÚ_shutdown_lockr@Ú _queue_countÚ_pending_work_items)rZ max_workersrrrrDs            zProcessPoolExecutor.__init__c Cs£|jdd„}|jdkrŸ|jƒtjdtdtj||ƒ|j|j |j |j |jfƒ|_d|j_ |jj ƒ|jt|j.weakref_cbÚtargetrT)rYr[Ú_adjust_process_countr]ZThreadrGÚweakrefÚrefr\r`rZrXZdaemonÚstartr)rrbrrrÚ_start_queue_management_threadis    z2ProcessPoolExecutor._start_queue_management_threadcCshxatt|jƒ|jƒD]D}tjdtd|j|jfƒ}|j ƒ||j|j ,s6          % s h