X7ROOT File Manager
Current Path:
/opt/alt/python33/lib64/python3.3/multiprocessing
opt
/
alt
/
python33
/
lib64
/
python3.3
/
multiprocessing
/
??
..
??
__init__.py
(6.66 KB)
??
__pycache__
??
connection.py
(29.37 KB)
??
dummy
??
forking.py
(14.4 KB)
??
heap.py
(6.85 KB)
??
managers.py
(35.23 KB)
??
pool.py
(22.86 KB)
??
process.py
(8.64 KB)
??
queues.py
(11.28 KB)
??
reduction.py
(9.27 KB)
??
sharedctypes.py
(5.88 KB)
??
synchronize.py
(10.48 KB)
??
util.py
(9.68 KB)
Editing: pool.py
# # Module providing the `Pool` class for managing a process pool # # multiprocessing/pool.py # # Copyright (c) 2006-2008, R Oudkerk # Licensed to PSF under a Contributor Agreement. # __all__ = ['Pool'] # # Imports # import threading import queue import itertools import collections import time from multiprocessing import Process, cpu_count, TimeoutError from multiprocessing.util import Finalize, debug # # Constants representing the state of a pool # RUN = 0 CLOSE = 1 TERMINATE = 2 # # Miscellaneous # job_counter = itertools.count() def mapstar(args): return list(map(*args)) def starmapstar(args): return list(itertools.starmap(args[0], args[1])) # # Code run by worker processes # class MaybeEncodingError(Exception): """Wraps possible unpickleable errors, so they can be safely sent through the socket.""" def __init__(self, exc, value): self.exc = repr(exc) self.value = repr(value) super(MaybeEncodingError, self).__init__(self.exc, self.value) def __str__(self): return "Error sending result: '%s'. Reason: '%s'" % (self.value, self.exc) def __repr__(self): return "<MaybeEncodingError: %s>" % str(self) def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None): assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0) put = outqueue.put get = inqueue.get if hasattr(inqueue, '_writer'): inqueue._writer.close() outqueue._reader.close() if initializer is not None: initializer(*initargs) completed = 0 while maxtasks is None or (maxtasks and completed < maxtasks): try: task = get() except (EOFError, IOError): debug('worker got EOFError or IOError -- exiting') break if task is None: debug('worker got sentinel -- exiting') break job, i, func, args, kwds = task try: result = (True, func(*args, **kwds)) except Exception as e: result = (False, e) try: put((job, i, result)) except Exception as e: wrapped = MaybeEncodingError(e, result[1]) debug("Possible encoding error while sending result: %s" % ( wrapped)) put((job, i, (False, wrapped))) completed += 1 debug('worker exiting after %d tasks' % completed) # # Class representing a process pool # class Pool(object): ''' Class which supports an async version of applying functions to arguments. ''' Process = Process def __init__(self, processes=None, initializer=None, initargs=(), maxtasksperchild=None): self._setup_queues() self._taskqueue = queue.Queue() self._cache = {} self._state = RUN self._maxtasksperchild = maxtasksperchild self._initializer = initializer self._initargs = initargs if processes is None: try: processes = cpu_count() except NotImplementedError: processes = 1 if processes < 1: raise ValueError("Number of processes must be at least 1") if initializer is not None and not callable(initializer): raise TypeError('initializer must be a callable') self._processes = processes self._pool = [] self._repopulate_pool() self._worker_handler = threading.Thread( target=Pool._handle_workers, args=(self, ) ) self._worker_handler.daemon = True self._worker_handler._state = RUN self._worker_handler.start() self._task_handler = threading.Thread( target=Pool._handle_tasks, args=(self._taskqueue, self._quick_put, self._outqueue, self._pool, self._cache) ) self._task_handler.daemon = True self._task_handler._state = RUN self._task_handler.start() self._result_handler = threading.Thread( target=Pool._handle_results, args=(self._outqueue, self._quick_get, self._cache) ) self._result_handler.daemon = True self._result_handler._state = RUN self._result_handler.start() self._terminate = Finalize( self, self._terminate_pool, args=(self._taskqueue, self._inqueue, self._outqueue, self._pool, self._worker_handler, self._task_handler, self._result_handler, self._cache), exitpriority=15 ) def _join_exited_workers(self): """Cleanup after any worker processes which have exited due to reaching their specified lifetime. Returns True if any workers were cleaned up. """ cleaned = False for i in reversed(range(len(self._pool))): worker = self._pool[i] if worker.exitcode is not None: # worker exited debug('cleaning up worker %d' % i) worker.join() cleaned = True del self._pool[i] return cleaned def _repopulate_pool(self): """Bring the number of pool processes up to the specified number, for use after reaping workers which have exited. """ for i in range(self._processes - len(self._pool)): w = self.Process(target=worker, args=(self._inqueue, self._outqueue, self._initializer, self._initargs, self._maxtasksperchild) ) self._pool.append(w) w.name = w.name.replace('Process', 'PoolWorker') w.daemon = True w.start() debug('added worker') def _maintain_pool(self): """Clean up any exited workers and start replacements for them. """ if self._join_exited_workers(): self._repopulate_pool() def _setup_queues(self): from .queues import SimpleQueue self._inqueue = SimpleQueue() self._outqueue = SimpleQueue() self._quick_put = self._inqueue._writer.send self._quick_get = self._outqueue._reader.recv def apply(self, func, args=(), kwds={}): ''' Equivalent of `func(*args, **kwds)`. ''' assert self._state == RUN return self.apply_async(func, args, kwds).get() def map(self, func, iterable, chunksize=None): ''' Apply `func` to each element in `iterable`, collecting the results in a list that is returned. ''' return self._map_async(func, iterable, mapstar, chunksize).get() def starmap(self, func, iterable, chunksize=None): ''' Like `map()` method but the elements of the `iterable` are expected to be iterables as well and will be unpacked as arguments. Hence `func` and (a, b) becomes func(a, b). ''' return self._map_async(func, iterable, starmapstar, chunksize).get() def starmap_async(self, func, iterable, chunksize=None, callback=None, error_callback=None): ''' Asynchronous version of `starmap()` method. ''' return self._map_async(func, iterable, starmapstar, chunksize, callback, error_callback) def imap(self, func, iterable, chunksize=1): ''' Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. ''' if self._state != RUN: raise ValueError("Pool not running") if chunksize == 1: result = IMapIterator(self._cache) self._taskqueue.put((((result._job, i, func, (x,), {}) for i, x in enumerate(iterable)), result._set_length)) return result else: assert chunksize > 1 task_batches = Pool._get_tasks(func, iterable, chunksize) result = IMapIterator(self._cache) self._taskqueue.put((((result._job, i, mapstar, (x,), {}) for i, x in enumerate(task_batches)), result._set_length)) return (item for chunk in result for item in chunk) def imap_unordered(self, func, iterable, chunksize=1): ''' Like `imap()` method but ordering of results is arbitrary. ''' if self._state != RUN: raise ValueError("Pool not running") if chunksize == 1: result = IMapUnorderedIterator(self._cache) self._taskqueue.put((((result._job, i, func, (x,), {}) for i, x in enumerate(iterable)), result._set_length)) return result else: assert chunksize > 1 task_batches = Pool._get_tasks(func, iterable, chunksize) result = IMapUnorderedIterator(self._cache) self._taskqueue.put((((result._job, i, mapstar, (x,), {}) for i, x in enumerate(task_batches)), result._set_length)) return (item for chunk in result for item in chunk) def apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None): ''' Asynchronous version of `apply()` method. ''' if self._state != RUN: raise ValueError("Pool not running") result = ApplyResult(self._cache, callback, error_callback) self._taskqueue.put(([(result._job, None, func, args, kwds)], None)) return result def map_async(self, func, iterable, chunksize=None, callback=None, error_callback=None): ''' Asynchronous version of `map()` method. ''' return self._map_async(func, iterable, mapstar, chunksize, callback, error_callback) def _map_async(self, func, iterable, mapper, chunksize=None, callback=None, error_callback=None): ''' Helper function to implement map, starmap and their async counterparts. ''' if self._state != RUN: raise ValueError("Pool not running") if not hasattr(iterable, '__len__'): iterable = list(iterable) if chunksize is None: chunksize, extra = divmod(len(iterable), len(self._pool) * 4) if extra: chunksize += 1 if len(iterable) == 0: chunksize = 0 task_batches = Pool._get_tasks(func, iterable, chunksize) result = MapResult(self._cache, chunksize, len(iterable), callback, error_callback=error_callback) self._taskqueue.put((((result._job, i, mapper, (x,), {}) for i, x in enumerate(task_batches)), None)) return result @staticmethod def _handle_workers(pool): thread = threading.current_thread() # Keep maintaining workers until the cache gets drained, unless the pool # is terminated. while thread._state == RUN or (pool._cache and thread._state != TERMINATE): pool._maintain_pool() time.sleep(0.1) # send sentinel to stop workers pool._taskqueue.put(None) debug('worker handler exiting') @staticmethod def _handle_tasks(taskqueue, put, outqueue, pool, cache): thread = threading.current_thread() for taskseq, set_length in iter(taskqueue.get, None): i = -1 for i, task in enumerate(taskseq): if thread._state: debug('task handler found thread._state != RUN') break try: put(task) except Exception as e: job, ind = task[:2] try: cache[job]._set(ind, (False, e)) except KeyError: pass else: if set_length: debug('doing set_length()') set_length(i+1) continue break else: debug('task handler got sentinel') try: # tell result handler to finish when cache is empty debug('task handler sending sentinel to result handler') outqueue.put(None) # tell workers there is no more work debug('task handler sending sentinel to workers') for p in pool: put(None) except IOError: debug('task handler got IOError when sending sentinels') debug('task handler exiting') @staticmethod def _handle_results(outqueue, get, cache): thread = threading.current_thread() while 1: try: task = get() except (IOError, EOFError): debug('result handler got EOFError/IOError -- exiting') return if thread._state: assert thread._state == TERMINATE debug('result handler found thread._state=TERMINATE') break if task is None: debug('result handler got sentinel') break job, i, obj = task try: cache[job]._set(i, obj) except KeyError: pass while cache and thread._state != TERMINATE: try: task = get() except (IOError, EOFError): debug('result handler got EOFError/IOError -- exiting') return if task is None: debug('result handler ignoring extra sentinel') continue job, i, obj = task try: cache[job]._set(i, obj) except KeyError: pass if hasattr(outqueue, '_reader'): debug('ensuring that outqueue is not full') # If we don't make room available in outqueue then # attempts to add the sentinel (None) to outqueue may # block. There is guaranteed to be no more than 2 sentinels. try: for i in range(10): if not outqueue._reader.poll(): break get() except (IOError, EOFError): pass debug('result handler exiting: len(cache)=%s, thread._state=%s', len(cache), thread._state) @staticmethod def _get_tasks(func, it, size): it = iter(it) while 1: x = tuple(itertools.islice(it, size)) if not x: return yield (func, x) def __reduce__(self): raise NotImplementedError( 'pool objects cannot be passed between processes or pickled' ) def close(self): debug('closing pool') if self._state == RUN: self._state = CLOSE self._worker_handler._state = CLOSE def terminate(self): debug('terminating pool') self._state = TERMINATE self._worker_handler._state = TERMINATE self._terminate() def join(self): debug('joining pool') assert self._state in (CLOSE, TERMINATE) self._worker_handler.join() self._task_handler.join() self._result_handler.join() for p in self._pool: p.join() @staticmethod def _help_stuff_finish(inqueue, task_handler, size): # task_handler may be blocked trying to put items on inqueue debug('removing tasks from inqueue until task handler finished') inqueue._rlock.acquire() while task_handler.is_alive() and inqueue._reader.poll(): inqueue._reader.recv() time.sleep(0) @classmethod def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, worker_handler, task_handler, result_handler, cache): # this is guaranteed to only be called once debug('finalizing pool') worker_handler._state = TERMINATE task_handler._state = TERMINATE debug('helping task handler/workers to finish') cls._help_stuff_finish(inqueue, task_handler, len(pool)) assert result_handler.is_alive() or len(cache) == 0 result_handler._state = TERMINATE outqueue.put(None) # sentinel # We must wait for the worker handler to exit before terminating # workers because we don't want workers to be restarted behind our back. debug('joining worker handler') if threading.current_thread() is not worker_handler: worker_handler.join() # Terminate workers which haven't already finished. if pool and hasattr(pool[0], 'terminate'): debug('terminating workers') for p in pool: if p.exitcode is None: p.terminate() debug('joining task handler') if threading.current_thread() is not task_handler: task_handler.join() debug('joining result handler') if threading.current_thread() is not result_handler: result_handler.join() if pool and hasattr(pool[0], 'terminate'): debug('joining pool workers') for p in pool: if p.is_alive(): # worker has not yet exited debug('cleaning up worker %d' % p.pid) p.join() def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.terminate() # # Class whose instances are returned by `Pool.apply_async()` # class ApplyResult(object): def __init__(self, cache, callback, error_callback): self._event = threading.Event() self._job = next(job_counter) self._cache = cache self._callback = callback self._error_callback = error_callback cache[self._job] = self def ready(self): return self._event.is_set() def successful(self): assert self.ready() return self._success def wait(self, timeout=None): self._event.wait(timeout) def get(self, timeout=None): self.wait(timeout) if not self.ready(): raise TimeoutError if self._success: return self._value else: raise self._value def _set(self, i, obj): self._success, self._value = obj if self._callback and self._success: self._callback(self._value) if self._error_callback and not self._success: self._error_callback(self._value) self._event.set() del self._cache[self._job] AsyncResult = ApplyResult # create alias -- see #17805 # # Class whose instances are returned by `Pool.map_async()` # class MapResult(ApplyResult): def __init__(self, cache, chunksize, length, callback, error_callback): ApplyResult.__init__(self, cache, callback, error_callback=error_callback) self._success = True self._value = [None] * length self._chunksize = chunksize if chunksize <= 0: self._number_left = 0 self._event.set() del cache[self._job] else: self._number_left = length//chunksize + bool(length % chunksize) def _set(self, i, success_result): success, result = success_result if success: self._value[i*self._chunksize:(i+1)*self._chunksize] = result self._number_left -= 1 if self._number_left == 0: if self._callback: self._callback(self._value) del self._cache[self._job] self._event.set() else: self._success = False self._value = result if self._error_callback: self._error_callback(self._value) del self._cache[self._job] self._event.set() # # Class whose instances are returned by `Pool.imap()` # class IMapIterator(object): def __init__(self, cache): self._cond = threading.Condition(threading.Lock()) self._job = next(job_counter) self._cache = cache self._items = collections.deque() self._index = 0 self._length = None self._unsorted = {} cache[self._job] = self def __iter__(self): return self def next(self, timeout=None): self._cond.acquire() try: try: item = self._items.popleft() except IndexError: if self._index == self._length: raise StopIteration self._cond.wait(timeout) try: item = self._items.popleft() except IndexError: if self._index == self._length: raise StopIteration raise TimeoutError finally: self._cond.release() success, value = item if success: return value raise value __next__ = next # XXX def _set(self, i, obj): self._cond.acquire() try: if self._index == i: self._items.append(obj) self._index += 1 while self._index in self._unsorted: obj = self._unsorted.pop(self._index) self._items.append(obj) self._index += 1 self._cond.notify() else: self._unsorted[i] = obj if self._index == self._length: del self._cache[self._job] finally: self._cond.release() def _set_length(self, length): self._cond.acquire() try: self._length = length if self._index == self._length: self._cond.notify() del self._cache[self._job] finally: self._cond.release() # # Class whose instances are returned by `Pool.imap_unordered()` # class IMapUnorderedIterator(IMapIterator): def _set(self, i, obj): self._cond.acquire() try: self._items.append(obj) self._index += 1 self._cond.notify() if self._index == self._length: del self._cache[self._job] finally: self._cond.release() # # # class ThreadPool(Pool): from .dummy import Process def __init__(self, processes=None, initializer=None, initargs=()): Pool.__init__(self, processes, initializer, initargs) def _setup_queues(self): self._inqueue = queue.Queue() self._outqueue = queue.Queue() self._quick_put = self._inqueue.put self._quick_get = self._outqueue.get @staticmethod def _help_stuff_finish(inqueue, task_handler, size): # put sentinels at head of inqueue to make workers finish inqueue.not_empty.acquire() try: inqueue.queue.clear() inqueue.queue.extend([None] * size) inqueue.not_empty.notify_all() finally: inqueue.not_empty.release()
Upload File
Create Folder