X7ROOT File Manager
Current Path:
/opt/alt/python33/lib64/python3.3/multiprocessing
opt
/
alt
/
python33
/
lib64
/
python3.3
/
multiprocessing
/
??
..
??
__init__.py
(6.66 KB)
??
__pycache__
??
connection.py
(29.37 KB)
??
dummy
??
forking.py
(14.4 KB)
??
heap.py
(6.85 KB)
??
managers.py
(35.23 KB)
??
pool.py
(22.86 KB)
??
process.py
(8.64 KB)
??
queues.py
(11.28 KB)
??
reduction.py
(9.27 KB)
??
sharedctypes.py
(5.88 KB)
??
synchronize.py
(10.48 KB)
??
util.py
(9.68 KB)
Editing: reduction.py
#
# Module to allow connection and socket objects to be transferred
# between processes
#
# multiprocessing/reduction.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

__all__ = ['reduce_socket', 'reduce_connection', 'send_handle', 'recv_handle']

import os
import sys
import socket
import threading
import struct
import signal

from multiprocessing import current_process
from multiprocessing.util import register_after_fork, debug, sub_debug
from multiprocessing.util import is_exiting, sub_warning


#
# Refuse to import on platforms that can neither duplicate Windows handles
# nor pass file descriptors as SCM_RIGHTS ancillary data.
#

if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and
                                   hasattr(socket, 'SCM_RIGHTS'))):
    raise ImportError('pickling of connections not supported')

#
# Platform specific definitions
#

if sys.platform == 'win32':
    # Windows
    __all__ += ['reduce_pipe_connection']

    import _winapi

    def send_handle(conn, handle, destination_pid):
        """Send `handle` over `conn` to the process `destination_pid`.

        The handle is duplicated into the destination process and the
        resulting DupHandle object is sent through `conn`.
        """
        dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid)
        conn.send(dh)

    def recv_handle(conn):
        """Receive an object from `conn` and return its detached handle."""
        return conn.recv().detach()

    class DupHandle(object):
        """Wrapper for a handle duplicated into another process."""

        def __init__(self, handle, access, pid=None):
            """Duplicate `handle` with `access` rights for process `pid`.

            If `pid` is None the current process id is used.
            """
            # duplicate handle for process with given pid
            if pid is None:
                pid = os.getpid()
            proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
            try:
                self._handle = _winapi.DuplicateHandle(
                    _winapi.GetCurrentProcess(),
                    handle, proc, access, False, 0)
            finally:
                _winapi.CloseHandle(proc)
            self._access = access
            self._pid = pid

        def detach(self):
            """Return the handle, usable in the calling process."""
            # retrieve handle from process which currently owns it
            if self._pid == os.getpid():
                return self._handle
            proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
                                       self._pid)
            try:
                # DUPLICATE_CLOSE_SOURCE is passed so that the copy held by
                # the owning process is closed as part of the duplication.
                return _winapi.DuplicateHandle(
                    proc, self._handle, _winapi.GetCurrentProcess(),
                    self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE)
            finally:
                _winapi.CloseHandle(proc)

    class DupSocket(object):
        """Picklable reference to a socket registered with resource_sharer."""

        def __init__(self, sock):
            """Duplicate `sock` and register the copy for later sharing."""
            new_sock = sock.dup()

            def send(conn, pid):
                share = new_sock.share(pid)
                conn.send_bytes(share)

            self._id = resource_sharer.register(send, new_sock.close)

        def detach(self):
            """Fetch the shared socket data and return a new socket object."""
            conn = resource_sharer.get_connection(self._id)
            try:
                share = conn.recv_bytes()
                return socket.fromshare(share)
            finally:
                conn.close()

    def reduce_socket(s):
        """Return a (callable, args) pair used to pickle socket `s`."""
        return rebuild_socket, (DupSocket(s),)

    def rebuild_socket(ds):
        """Rebuild a socket from the DupSocket `ds`."""
        return ds.detach()

    def reduce_connection(conn):
        """Return a (callable, args) pair used to pickle `conn`."""
        handle = conn.fileno()
        with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s:
            ds = DupSocket(s)
            return rebuild_connection, (ds, conn.readable, conn.writable)

    def rebuild_connection(ds, readable, writable):
        """Rebuild a Connection from the DupSocket `ds`."""
        from .connection import Connection
        sock = ds.detach()
        return Connection(sock.detach(), readable, writable)

    def reduce_pipe_connection(conn):
        """Return a (callable, args) pair used to pickle pipe `conn`."""
        access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) |
                  (_winapi.FILE_GENERIC_WRITE if conn.writable else 0))
        dh = DupHandle(conn.fileno(), access)
        return rebuild_pipe_connection, (dh, conn.readable, conn.writable)

    def rebuild_pipe_connection(dh, readable, writable):
        """Rebuild a PipeConnection from the DupHandle `dh`."""
        from .connection import PipeConnection
        handle = dh.detach()
        return PipeConnection(handle, readable, writable)

else:
    # Unix

    # On MacOSX we should acknowledge receipt of fds -- see Issue14669
    ACKNOWLEDGE = sys.platform == 'darwin'

    def send_handle(conn, handle, destination_pid):
        """Send file descriptor `handle` over the Unix socket behind `conn`.

        `destination_pid` is accepted for signature parity with the Windows
        implementation and is not used here.  Raises RuntimeError if an
        acknowledgement is expected (ACKNOWLEDGE) but not received.
        """
        with socket.fromfd(conn.fileno(), socket.AF_UNIX,
                           socket.SOCK_STREAM) as s:
            s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS,
                                struct.pack("@i", handle))])
        if ACKNOWLEDGE and conn.recv_bytes() != b'ACK':
            raise RuntimeError('did not receive acknowledgement of fd')

    def recv_handle(conn):
        """Receive a file descriptor from the Unix socket behind `conn`.

        Returns the received descriptor as an int.  Raises RuntimeError if
        the ancillary data is missing or is not an SCM_RIGHTS message.
        """
        size = struct.calcsize("@i")
        with socket.fromfd(conn.fileno(), socket.AF_UNIX,
                           socket.SOCK_STREAM) as s:
            msg, ancdata, flags, addr = s.recvmsg(1, socket.CMSG_LEN(size))
            try:
                if ACKNOWLEDGE:
                    conn.send_bytes(b'ACK')
                cmsg_level, cmsg_type, cmsg_data = ancdata[0]
                if (cmsg_level == socket.SOL_SOCKET and
                        cmsg_type == socket.SCM_RIGHTS):
                    return struct.unpack("@i", cmsg_data[:size])[0]
            except (ValueError, IndexError, struct.error):
                # Malformed or absent ancillary data is reported below.
                pass
            raise RuntimeError('Invalid data received')

    class DupFd(object):
        """Picklable reference to an fd registered with resource_sharer."""

        def __init__(self, fd):
            """Duplicate `fd` and register the copy for later sharing."""
            new_fd = os.dup(fd)

            def send(conn, pid):
                send_handle(conn, new_fd, pid)

            def close():
                os.close(new_fd)

            self._id = resource_sharer.register(send, close)

        def detach(self):
            """Fetch the registered descriptor and return it as an int."""
            conn = resource_sharer.get_connection(self._id)
            try:
                return recv_handle(conn)
            finally:
                conn.close()

    def reduce_socket(s):
        """Return a (callable, args) pair used to pickle socket `s`."""
        df = DupFd(s.fileno())
        return rebuild_socket, (df, s.family, s.type, s.proto)

    def rebuild_socket(df, family, type, proto):
        """Rebuild a socket from the DupFd `df` and its socket parameters."""
        fd = df.detach()
        try:
            # socket.fromfd() duplicates fd, so the received descriptor is
            # always closed here -- including when fromfd() raises.
            return socket.fromfd(fd, family, type, proto)
        finally:
            os.close(fd)

    def reduce_connection(conn):
        """Return a (callable, args) pair used to pickle `conn`."""
        df = DupFd(conn.fileno())
        return rebuild_connection, (df, conn.readable, conn.writable)

    def rebuild_connection(df, readable, writable):
        """Rebuild a Connection from the DupFd `df`."""
        from .connection import Connection
        fd = df.detach()
        return Connection(fd, readable, writable)

#
# Server which shares registered resources with clients
#

class ResourceSharer(object):
    """Serve registered resources to clients that ask for them by key.

    Each registered resource is a (send, close) pair of callables stored
    under an integer key.  A listener thread is started on first
    registration; when a client connects and sends (key, pid), the thread
    calls send(conn, pid) and then close().
    """

    def __init__(self):
        self._key = 0
        self._cache = {}
        self._old_locks = []
        self._lock = threading.Lock()
        self._listener = None
        self._address = None
        self._thread = None
        register_after_fork(self, ResourceSharer._afterfork)

    def register(self, send, close):
        """Register a resource and return its (address, key) identifier.

        `send(conn, pid)` is called to transfer the resource to a client;
        `close()` is called when the resource is no longer needed.
        """
        with self._lock:
            if self._address is None:
                self._start()
            self._key += 1
            self._cache[self._key] = (send, close)
            return (self._address, self._key)

    @staticmethod
    def get_connection(ident):
        """Connect to the sharer named by `ident` and request its resource.

        `ident` is an (address, key) pair as returned by register().  The
        open client connection is returned; the caller must close it.
        """
        from .connection import Client
        address, key = ident
        c = Client(address, authkey=current_process().authkey)
        c.send((key, os.getpid()))
        return c

    def stop(self, timeout=None):
        """Stop the listener thread and close all registered resources.

        `timeout` is passed to the thread join.  If the thread is still
        alive afterwards a warning is logged and shutdown continues.
        """
        from .connection import Client
        with self._lock:
            if self._address is not None:
                # A None message tells the serving thread to exit its loop.
                c = Client(self._address, authkey=current_process().authkey)
                c.send(None)
                c.close()
                self._thread.join(timeout)
                if self._thread.is_alive():
                    # Use the imported sub_warning helper; the previously
                    # referenced name `sub_warn` was never defined.
                    sub_warning('ResourceSharer thread did not stop when asked')
                self._listener.close()
                self._thread = None
                self._address = None
                self._listener = None
                for key, (send, close) in self._cache.items():
                    close()
                self._cache.clear()

    def _afterfork(self):
        """Reset state in a forked child: close resources, drop listener."""
        for key, (send, close) in self._cache.items():
            close()
        self._cache.clear()
        # If self._lock was locked at the time of the fork, it may be broken
        # -- see issue 6721.  Replace it without letting it be gc'ed.
        self._old_locks.append(self._lock)
        self._lock = threading.Lock()
        if self._listener is not None:
            self._listener.close()
        self._listener = None
        self._address = None
        self._thread = None

    def _start(self):
        """Create the listener and start the daemon thread that serves it."""
        from .connection import Listener
        assert self._listener is None
        debug('starting listener and thread for sending handles')
        self._listener = Listener(authkey=current_process().authkey)
        self._address = self._listener.address
        t = threading.Thread(target=self._serve)
        t.daemon = True
        t.start()
        self._thread = t

    def _serve(self):
        """Thread body: accept clients and send them requested resources.

        The loop ends when a client sends None.  Any exception raised while
        handling a request is logged (unless the process is exiting) and the
        loop continues with the next client.
        """
        if hasattr(signal, 'pthread_sigmask'):
            signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG))
        while 1:
            try:
                conn = self._listener.accept()
                try:
                    msg = conn.recv()
                    if msg is None:
                        break
                    key, destination_pid = msg
                    send, close = self._cache.pop(key)
                    try:
                        send(conn, destination_pid)
                    finally:
                        # The entry has already been popped from the cache,
                        # so release the resource even if sending failed.
                        close()
                finally:
                    # Always close the accepted connection, including on the
                    # shutdown message and on errors.
                    conn.close()
            except:
                # Deliberately broad: the serving thread must survive any
                # failure while handling a single request.
                if not is_exiting():
                    import traceback
                    sub_warning(
                        'thread for sharing handles raised exception :\n' +
                        '-'*79 + '\n' + traceback.format_exc() + '-'*79
                        )

resource_sharer = ResourceSharer()
Upload File
Create Folder