ž ­ÿfíCc@sldZdZddlZddlZddlmZddlZddlZddlm Z m Z ddl m Z ddl Z ddlZejƒZdadd„Zd ZGd d „d eƒZGd d „d eƒZGdd„deƒZdd„Zdd„Zdd„Zdadadd„ZGdd„de ƒZ!Gdd„dej"ƒZ#ej$eƒdS(u+ Implements ProcessPoolExecutor. The follow diagram and text describe the data-flow through the system: |======================= In-process =====================|== Out-of-process ==| +----------+ +----------+ +--------+ +-----------+ +---------+ | | => | Work Ids | => | | => | Call Q | => | | | | +----------+ | | +-----------+ | | | | | ... | | | | ... | | | | | | 6 | | | | 5, call() | | | | | | 7 | | | | ... | | | | Process | | ... | | Local | +-----------+ | Process | | Pool | +----------+ | Worker | | #1..n | | Executor | | Thread | | | | | +----------- + | | +-----------+ | | | | <=> | Work Items | <=> | | <= | Result Q | <= | | | | +------------+ | | +-----------+ | | | | | 6: call() | | | | ... | | | | | | future | | | | 4, result | | | | | | ... | | | | 3, except | | | +----------+ +------------+ +--------+ +-----------+ +---------+ Executor.submit() called: - creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict - adds the id of the _WorkItem to the "Work Ids" queue Local worker thread: - reads work ids from the "Work Ids" queue and looks up the corresponding WorkItem from the "Work Items" dict: if the work item has been cancelled then it is simply removed from the dict, otherwise it is repackaged as a _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q" until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because calls placed in the "Call Q" can no longer be cancelled with Future.cancel(). - reads _ResultItems from "Result Q", updates the future stored in the "Work Items" dict and deletes the dict entry Process #1..n: - reads _CallItems from "Call Q", executes the calls, and puts the resulting _ResultItems in "Request Q" u"Brian Quinlan (brian@sweetapp.com)iN(u_base(u SimpleQueueuFull(uwaitcCsadattjƒƒ}x!|D]\}}|jdƒqWx|D]\}}|jƒqCWdS(NT(uTrueu _shutdownulistu_threads_queuesuitemsuputuNoneujoin(uitemsutuq((u?/opt/alt/python33/lib64/python3.3/concurrent/futures/process.pyu _python_exitKs u _python_exiticBs |EeZdZdd„ZdS(u _WorkItemcCs(||_||_||_||_dS(N(ufutureufnuargsukwargs(uselfufutureufnuargsukwargs((u?/opt/alt/python33/lib64/python3.3/concurrent/futures/process.pyu__init__[s   u_WorkItem.__init__N(u__name__u __module__u __qualname__u__init__(u __locals__((u?/opt/alt/python33/lib64/python3.3/concurrent/futures/process.pyu _WorkItemZsu _WorkItemcBs&|EeZdZdddd„ZdS(u _ResultItemcCs||_||_||_dS(N(uwork_idu exceptionuresult(uselfuwork_idu exceptionuresult((u?/opt/alt/python33/lib64/python3.3/concurrent/futures/process.pyu__init__bs  u_ResultItem.__init__N(u__name__u __module__u __qualname__uNoneu__init__(u __locals__((u?/opt/alt/python33/lib64/python3.3/concurrent/futures/process.pyu _ResultItemasu _ResultItemcBs |EeZdZdd„ZdS(u _CallItemcCs(||_||_||_||_dS(N(uwork_idufnuargsukwargs(uselfuwork_idufnuargsukwargs((u?/opt/alt/python33/lib64/python3.3/concurrent/futures/process.pyu__init__hs   u_CallItem.__init__N(u__name__u __module__u __qualname__u__init__(u __locals__((u?/opt/alt/python33/lib64/python3.3/concurrent/futures/process.pyu _CallItemgsu _CallItemcCs»x´|jddƒ}|dkr8|jtjƒƒdSy|j|j|jŽ}WnAt k r—}z!|jt |j d|ƒƒWYdd}~XqX|jt |j d|ƒƒqdS(uøEvaluates calls from call_queue and places the results in result_queue. This worker is run in a separate process. Args: call_queue: A multiprocessing.Queue of _CallItems that will be read and evaluated by the worker. result_queue: A multiprocessing.Queue of _ResultItems that will written to by the worker. shutdown: A multiprocessing.Event that will be set as a signal to the worker that it should exit when call_queue is empty. ublockNu exceptionuresultT( ugetuTrueuNoneuputuosugetpidufnuargsukwargsu BaseExceptionu _ResultItemuwork_id(u call_queueu result_queueu call_itemurue((u?/opt/alt/python33/lib64/python3.3/concurrent/futures/process.pyu_process_workerns  u_process_workercCsx–|jƒrdSy|jddƒ}Wntjk rDdSYqX||}|jjƒrŒ|jt||j |j |j ƒddƒq||=qqdS(uMFills call_queue with _WorkItems from pending_work_items. This function never blocks. Args: pending_work_items: A dict mapping work ids to _WorkItems e.g. {5: <_WorkItem...>, 6: <_WorkItem...>, ...} work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids are consumed and the corresponding _WorkItems from pending_work_items are transformed into _CallItems and put in call_queue. call_queue: A multiprocessing.Queue that will be filled with _CallItems derived from _WorkItems. NublockFT( ufullugetuFalseuqueueuEmptyufutureuset_running_or_notify_canceluputu _CallItemufnuargsukwargsuTrue(upending_work_itemsuwork_idsu call_queueuwork_idu work_item((u?/opt/alt/python33/lib64/python3.3/concurrent/futures/process.pyu_add_call_item_to_queueŠs       u_add_call_item_to_queuec s1d‰‡fdd†}‡‡fdd†}|j}xôt||ˆƒdd„ˆjƒDƒ} | snt‚t|g| ƒ} || krœ|jƒ} n™|ƒ‰ˆdk rÌd ˆ_d ˆ_ d‰nx0|j ƒD]"\} } | j j t dƒƒqÙW|jƒxˆjƒD]}|jƒqW|ƒdSt| tƒr€|ƒsSt‚ˆj| ƒ}|jƒˆså|ƒdSne| dk rå|j| jdƒ} | dk rå| jrÌ| j j | jƒqâ| j j| jƒqån|ƒ‰|ƒr$y|s |ƒdSWq$tk r Yq$Xnd‰q9dS( u‘Manages the communication between this process and the worker processes. This function is run in a local thread. Args: executor_reference: A weakref.ref to the ProcessPoolExecutor that owns this thread. Used to determine if the ProcessPoolExecutor has been garbage collected and that this function can exit. process: A list of the multiprocessing.Process instances used as workers. pending_work_items: A dict mapping work ids to _WorkItems e.g. {5: <_WorkItem...>, 6: <_WorkItem...>, ...} work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]). call_queue: A multiprocessing.Queue that will be filled with _CallItems derived from _WorkItems for processing by the process workers. result_queue: A multiprocessing.Queue of _ResultItems generated by the process workers. cstpˆdkpˆjS(N(u _shutdownuNoneu_shutdown_thread((uexecutor(u?/opt/alt/python33/lib64/python3.3/concurrent/futures/process.pyu shutting_downÉsu/_queue_management_worker..shutting_downcsutdd„ˆjƒDƒƒ}x$td|ƒD]}ˆjdƒq/WˆjƒxˆjƒD]}|jƒq]WdS(Ncss|]}|jƒVqdS(N(uis_alive(u.0up((u?/opt/alt/python33/lib64/python3.3/concurrent/futures/process.pyu ÎsuD_queue_management_worker..shutdown_worker..i(usumuvaluesurangeu put_nowaituNoneucloseujoin(unb_children_aliveuiup(u call_queueu processes(u?/opt/alt/python33/lib64/python3.3/concurrent/futures/process.pyushutdown_workerÌs  u1_queue_management_worker..shutdown_workercSsg|]}|j‘qS((usentinel(u.0up((u?/opt/alt/python33/lib64/python3.3/concurrent/futures/process.pyu ßs u,_queue_management_worker..u^A process in the process pool was terminated abruptly while the future was running or pending.NT(uNoneu_readeru_add_call_item_to_queueuvaluesuAssertionErroruwaiturecvuTrueu_brokenu_shutdown_threaduitemsufutureu set_exceptionuBrokenProcessPooluclearu terminateu isinstanceuintupopujoinuwork_idu exceptionu set_resulturesultuFull(uexecutor_referenceu processesupending_work_itemsuwork_ids_queueu call_queueu result_queueu shutting_downushutdown_workerureaderu sentinelsureadyu result_itemuwork_idu work_itemup((u call_queueuexecutoru processesu?/opt/alt/python33/lib64/python3.3/concurrent/futures/process.pyu_queue_management_worker¯sb                 u_queue_management_workerc Cstrtrttƒ‚qndaytjdƒ}Wnttfk rUdSYnX|dkrfdS|dkrvdSd|attƒ‚dS(NuSC_SEM_NSEMS_MAXiiu@system provides too few semaphores (%d available, 256 necessary)Tiÿÿÿÿ(u_system_limits_checkedu_system_limiteduNotImplementedErroruTrueuosusysconfuAttributeErroru ValueError(u nsems_max((u?/opt/alt/python33/lib64/python3.3/concurrent/futures/process.pyu_check_system_limits s    u_check_system_limitscBs|EeZdZdZdS(uBrokenProcessPooluy Raised when a process in a ProcessPoolExecutor terminated abruptly while a future was in the running state. N(u__name__u __module__u __qualname__u__doc__(u __locals__((u?/opt/alt/python33/lib64/python3.3/concurrent/futures/process.pyuBrokenProcessPool7suBrokenProcessPoolcBsz|EeZdZd dd„Zdd„Zdd„Zdd„Zej jj e_ d d d „Z ej j j e _ d S( uProcessPoolExecutorcCs·tƒ|dkr%tjƒ|_n ||_tj|jtƒ|_d|j_ t ƒ|_ t jƒ|_ d|_i|_d|_tjƒ|_d|_d|_i|_dS(u/Initializes a new ProcessPoolExecutor instance. Args: max_workers: The maximum number of processes that can be used to execute the given calls. If None or not given then as many worker processes will be created as the machine has processors. iNTF(u_check_system_limitsuNoneumultiprocessingu cpu_countu _max_workersuQueueuEXTRA_QUEUED_CALLSu _call_queueuTrueu _ignore_epipeu SimpleQueueu _result_queueuqueueu _work_idsu_queue_management_threadu _processesuFalseu_shutdown_threadu threadinguLocku_shutdown_locku_brokenu _queue_countu_pending_work_items(uselfu max_workers((u?/opt/alt/python33/lib64/python3.3/concurrent/futures/process.pyu__init__?s            uProcessPoolExecutor.__init__c Cs£|jdd„}|jdkrŸ|jƒtjdtdtj||ƒ|j |j |j |j |jfƒ|_d|j_|jjƒ|jt|j.weakref_cbutargetuargsT(u _result_queueu_queue_management_threaduNoneu_adjust_process_countu threadinguThreadu_queue_management_workeruweakrefurefu _processesu_pending_work_itemsu _work_idsu _call_queueuTrueudaemonustartu_threads_queues(uselfu weakref_cb((u?/opt/alt/python33/lib64/python3.3/concurrent/futures/process.pyu_start_queue_management_threadds    u2ProcessPoolExecutor._start_queue_management_threadcCshxatt|jƒ|jƒD]D}tjdtd|j|jfƒ}|j ƒ||j|j s%  uProcessPoolExecutorF(%u__doc__u __author__uatexituosuconcurrent.futuresu_baseuqueueumultiprocessingumultiprocessing.queuesu SimpleQueueuFullumultiprocessing.connectionuwaitu threadinguweakrefuWeakKeyDictionaryu_threads_queuesuFalseu _shutdownu _python_exituEXTRA_QUEUED_CALLSuobjectu _WorkItemu _ResultItemu _CallItemu_process_workeru_add_call_item_to_queueu_queue_management_workeru_system_limits_checkeduNoneu_system_limitedu_check_system_limitsu RuntimeErroruBrokenProcessPooluExecutoruProcessPoolExecutoruregister(((u?/opt/alt/python33/lib64/python3.3/concurrent/futures/process.pyu,s4          % o h