首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

multiprocessing

2.6版本中的新功能。

1.介绍

multiprocessing是一个使用类似于threading模块的API支持产卵过程的软件包。该multiprocessing包提供本地和远程并发性,通过使用子进程而不是线程有效地侧移全局解释器锁。由于这个原因,该multiprocessing模块允许程序员充分利用给定机器上的多个处理器。它可以在Unix和Windows上运行。

multiprocessing模块还引入了threading模块中没有模拟量的API 。这方面的一个主要例子是Pool提供了一种方便的手段,即跨越多个输入值并行执行一个函数,在输入数据之间分配输入数据(数据并行性)。以下示例演示了在模块中定义这些函数的通用做法,以便子进程可以成功导入该模块。这个数据并行性的基本例子Pool

代码语言:javascript
复制
from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, [1, 2, 3]))

将打印到标准输出

代码语言:javascript
复制
[1, 4, 9]

1.1. Process类别

multiprocessing,进程是通过创建一个Process对象,然后调用它的start()方法产生的。Process遵循API的threading.Thread。一个多进程程序的简单例子是

代码语言:javascript
复制
from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

为了显示所涉及的各个进程ID,下面是一个扩展示例:

代码语言:javascript
复制
from multiprocessing import Process
import os

def info(title):
    print title
    print 'module name:', __name__
    if hasattr(os, 'getppid'):  # only available on Unix
        print 'parent process:', os.getppid()
    print 'process id:', os.getpid()

def f(name):
    info('function f')
    print 'hello', name

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

有关解释为什么(在Windows上)if __name__ == '__main__'零件是必需的,请参阅编程准则。

1.2. 在进程之间交换对象

multiprocessing 支持两种进程之间的通信通道:

Queues

这个Queue类是一个近似的克隆Queue.Queue。例如:

代码语言:javascript
复制
from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print q.get()    # prints "[42, None, 'hello']"
    p.join()

队列是线程和进程安全的。

Pipes

Pipe()函数返回一对由管道连接的连接对象,默认情况下为双工(双向)。例如:

代码语言:javascript
复制
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print parent_conn.recv()   # prints "[42, None, 'hello']"
    p.join()

通过返回的两个连接对象Pipe()表示管道的两端。每个连接对象都有send()recv()方法(等等)。请注意,如果两个进程(或线程)试图同时读取或写入管道的同一端,则管道中的数据可能会损坏。当然,不会同时使用管道的不同端点进行腐蚀的风险。

1.3. 进程之间的同步

multiprocessing包含来自所有同步基元的等价物threading。例如,可以使用锁来确保一次只有一个进程打印到标准输出:

代码语言:javascript
复制
from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    print 'hello world', i
    l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

如果不使用来自不同流程的锁定输出,则容易混淆。

1.4. 在进程之间共享状态

如上所述,在进行并发编程时,通常最好尽可能避免使用共享状态。使用多个进程时尤其如此。

但是,如果您确实需要使用某些共享数据,则multiprocessing可以提供一些方法。

共享内存

数据可以使用Value或存储在共享内存映射中Array。例如,下面的代码

代码语言:javascript
复制
from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print num.value
    print arr[:]

将打印

代码语言:javascript
复制
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

'd''i'创建时使用的参数numarr被使用的那种的TypeCodes array模块:'d'表示一个双精度浮点数和'i'指示符号整数。这些共享对象将是进程和线程安全的。

为了更灵活地使用共享内存,可以使用multiprocessing.sharedctypes支持创建从共享内存分配的任意ctypes对象的模块。

服务器进程

通过Manager()控制服务器进程返回的管理器对象,该进程包含Python对象并允许其他进程使用代理来操作它们。

通过返回的Manager()将支持类型listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventQueueValueArray。例如,

代码语言:javascript
复制
from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    manager = Manager()

    d = manager.dict()
    l = manager.list(range(10))

    p = Process(target=f, args=(d, l))
    p.start()
    p.join()

    print d
    print l

将打印

代码语言:javascript
复制
{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理员可以通过网络在不同计算机上的进程共享。但是,它们比使用共享内存要慢。

1.5. 使用一群工人

Pool类代表工作进程池。它具有允许以几种不同方式将任务卸载到工作进程的方法。

例如:

代码语言:javascript
复制
from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes

    # print "[0, 1, 4,..., 81]"
    print pool.map(f, range(10))

    # print same numbers in arbitrary order
    for i in pool.imap_unordered(f, range(10)):
        print i

    # evaluate "f(20)" asynchronously
    res = pool.apply_async(f, (20,))      # runs in *only* one process
    print res.get(timeout=1)              # prints "400"

    # evaluate "os.getpid()" asynchronously
    res = pool.apply_async(os.getpid, ()) # runs in *only* one process
    print res.get(timeout=1)              # prints the PID of that process

    # launching multiple evaluations asynchronously *may* use more processes
    multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
    print [res.get(timeout=1) for res in multiple_results]

    # make a single worker sleep for 10 secs
    res = pool.apply_async(time.sleep, (10,))
    try:
        print res.get(timeout=1)
    except TimeoutError:
        print "We lacked patience and got a multiprocessing.TimeoutError"

请注意,池的方法只能由创建它的进程使用。

注意

这个包中的功能要求__main__模块可以由孩子导入。编程指南中介绍了这一点,但值得在此指出。这意味着一些示例(如Pool示例)在交互式解释器中不起作用。例如:

代码语言:javascript
复制
>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
...     return x*x
...
>>> p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'

(如果你尝试这样做,它实际上会输出三个以半随机方式交错的完整回溯,然后你可能不得不停止主处理。)

2.参考

multiprocessing软件包主要复制threading模块的API 。

2.1. Process和例外

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={})

过程对象表示在单独的过程中运行的活动。本Process类具有的所有方法等同threading.Thread

应始终使用关键字参数调用构造函数。团队应该永远是None; 它仅仅是为了兼容而存在threading.Threadtarget是要由run()方法调用的可调用对象。它默认为None,意味着什么都不叫。名称是进程名称。缺省情况下,一个唯一的名称由'Process-N1:N2:...:Nk'形式构成,其中N1,N2,...,Nk是一个整数序列,其长度由进程的生成决定。args是目标调用的参数元组。kwargs是目标调用的关键字参数字典。默认情况下,没有参数传递给目标.

如果一个子类重写构造函数,它必须确保它在进行任何其他操作之前调用基类构造函数(Process.__init__())。

run()

表示进程活动的方法。

您可以在子类中重写此方法。标准run()方法调用传递给对象构造函数的可调用对象作为目标参数,如果有的话,分别从argskwargs参数中获取顺序和关键字参数。

start()

开始流程的活动。

每个过程对象最多只能调用一次。它安排run()在独立的进程中调用对象的方法。

join([timeout])

阻塞调用线程,直到调用该join()方法的进程终止或发生可选超时。

如果超时None没有超时。

一个过程可以连接多次。

进程无法自行加入,因为这会导致死锁。尝试在启动之前加入进程是错误的。

name

该进程的名称。

该名称是仅用于识别目的的字符串。它没有语义。多个进程可以被赋予相同的名称。初始名称由构造函数设置。

is_alive()

返回过程是否存在。

粗略地说,从该start()方法返回的那一刻起,一个进程对象处于活动状态,直到子进程终止。

daemon

进程的守护进程标志,一个布尔值。这必须在start()调用之前设置。

初始值是从创建过程继承的。

当进程退出时,它将尝试终止其所有守护进程的子进程。

请注意,守护进程不允许创建子进程。否则,守护进程会在其父进程退出时终止其子进程。另外,这些不是 Unix守护进程或服务,它们是正常的进程,如果非守护进程退出,它们将被终止(而不是加入)。

除了threading.ThreadAPI之外,Process对象还支持以下属性和方法:

pid

返回进程ID。在这个过程产生之前,这将是None

exitcode

孩子的退出代码。这将是None如果该过程还没有结束。负值-N表示孩子被信号N终止。

authkey

进程的身份验证密钥(一个字节字符串)。

multiprocessing初始化主进程正在使用分配一个随机串os.urandom()

Process创建一个对象时,它将继承其父进程的身份验证密钥,但可以通过设置authkey为另一个字节字符串来更改它。

请参阅认证密钥。

terminate()

终止该过程。在Unix上,这是使用SIGTERM信号完成的; 在Windows TerminateProcess()上使用。请注意,退出处理程序和最后的子句等将不会执行。

请注意,流程的后代流程不会终止 - 它们将简单地变成孤儿。

警告

如果在关联进程正在使用管道或队列时使用此方法,那么管道或队列可能会损坏并可能会被其他进程无法使用。同样,如果进程获得了锁或信号量等,那么终止它可能会导致其他进程死锁。

需要注意的是start()join()is_alive()terminate()exitcode方法只能由创建进程对象的过程调用。

一些方法的示例用法Process

代码语言:javascript
复制
>>> import multiprocessing, time, signal
>>> p = multiprocessing.Process(target=time.sleep, args=(1000,))
>>> print p, p.is_alive()
<Process(Process-1, initial)> False
>>> p.start()
>>> print p, p.is_alive()
<Process(Process-1, started)> True
>>> p.terminate()
>>> time.sleep(0.1)
>>> print p, p.is_alive()
<Process(Process-1, stopped[SIGTERM])> False
>>> p.exitcode == -signal.SIGTERM
True

exception multiprocessing.BufferTooShort

Connection.recv_bytes_into()当提供的缓冲区对象太小而不能读取消息时引发异常。

如果e是的一个实例BufferTooShort,然后e.args[0]会给出消息作为字节串。

2.2. 管道和队列

当使用多个进程时,通常使用消息传递进行进程之间的通信,并避免使用任何同步原语(如锁)。

对于传递消息,可以使用Pipe()(用于两个进程之间的连接)或队列(允许多个生产者和消费者)。

Queuemultiprocessing.queues.SimpleQueueJoinableQueue类型是仿照多生产,多消费FIFO队列Queue.Queue的标准库类。他们的区别在于Queue缺乏task_done()join()引入的Python 2.5的方法Queue.Queue类。

如果使用,JoinableQueue必须调用JoinableQueue.task_done()从队列中移除的每个任务,否则用于计算未完成任务数的信号量最终可能会溢出,引发异常。

请注意,您也可以使用管理员对象创建共享队列 - 请参阅经理。

注意

multiprocessing使用常规Queue.EmptyQueue.Full例外来表示超时。它们在multiprocessing命名空间中不可用,因此您需要从中导入Queue

注意

当一个对象被放在一个队列中时,该对象被腌制,后面的线程将清理后的数据刷新到底层管道。这有一些令人惊讶的后果,但不应该导致任何实际困难 - 如果他们真的打扰你,那么你可以使用由经理创建的队列。

  1. 在一个空队列上放置一个对象之后,在队列的empty()方法返回之前可能会有一个无限小的延迟,False并且get_nowait()可以在不提升的情况下返回Queue.Empty

2. 如果多个进程正在排列对象,则可能会在另一端无序地接收对象。然而,由相同过程入队的对象将始终按预期顺序相对于彼此。

警告

如果一个进程正在使用打死Process.terminate()os.kill()当它试图使用Queue,则在队列中的数据可能会损坏。这可能会导致任何其他进程在尝试稍后使用队列时发生异常。

警告

如上所述,如果子进程已将项目放入队列(并且未使用JoinableQueue.cancel_join_thread),则该进程将不会终止,直到所有缓冲项目已被刷新到管道。

这意味着如果你尝试加入这个过程,你可能会遇到死锁,除非你确定放入队列的所有物品都已被消耗掉。同样,如果子进程是非守护进程,那么父进程在尝试加入所有非守护进程子进程时可能会在退出时挂起。

请注意,使用管理器创建的队列不存在此问题。请参阅编程准则。

有关使用队列进行进程间通信的示例,请参阅示例。

multiprocessing.Pipe([duplex])

返回一对(conn1, conn2)Connection代表的配管的端部的对象。

如果双工True(默认),那么管道是双向的。如果双工False管道,那么管道是单向的:conn1只能用于接收消息,conn2并且只能用于发送消息。

class multiprocessing.Queue([maxsize])

返回使用管道和几个锁/信号量实现的进程共享队列。当一个进程首先将一个项目放入队列中时,启动一个进给线程,该进程线程将对象从缓冲区传送到管道中。

标准库模块的常见Queue.EmptyQueue.Full例外情况Queue引发超时。

Queue实现Queue.Queue除了task_done()和之外的所有方法join()

qsize()

返回队列的近似大小。由于多线程/多处理语义,这个数字是不可靠的。

请注意,NotImplementedError在Mac OS X等Unix平台上可能会出现这种情况,sem_getvalue()但未实现。

empty()

如果队列为空则返回True,否则返回False。由于多线程/多处理语义,这是不可靠的。

full()

如果队列已满则返回True,否则返回False。由于多线程/多处理语义,这是不可靠的。

put(obj[, block[, timeout]])

将obj放入队列中。如果可选参数True(默认)并且超时None(默认),则在需要时禁止,直到有空闲插槽可用。如果超时是一个正数,它会阻止至多超时秒数,并Queue.Full在该时间内没有空闲插槽时引发异常。否则(False),如果空闲插槽立即可用,则在队列中放置一个项目,否则引发Queue.Full异常(在这种情况下超时被忽略)。

put_nowait(obj)

相当于put(obj, False)

get([block[, timeout]])

从队列中移除并返回一个项目。如果可选的参数True默认值,并且超时None默认值,则在必要时阻塞,直到项目可用。如果超时时间为正数,则最多会阻止超时秒数,Queue.Empty如果在该时间内没有可用项目,则会引发异常。否则(块是False),如果一个项立即可用,则返回一个项目,否则引发Queue.Empty异常(在这种情况下超时被忽略)。

get_nowait()

相当于get(False)

Queue有一些其他方法未找到Queue.Queue。大多数代码通常不需要这些方法:

close()

指示当前进程不会有更多数据放入此队列中。一旦将所有缓冲数据刷新到管道,后台线程将退出。这在队列被垃圾收集时自动调用。

join_thread()

加入后台线程。这只能在close()被调用后才能使用。它会阻塞,直到后台线程退出,确保缓冲区中的所有数据已被刷新到管道。

默认情况下,如果进程不是队列的创建者,那么在退出时它将尝试加入队列的后台线程。该过程可以调用cancel_join_thread()做出join_thread()什么也不做。

cancel_join_thread()

防止join_thread()阻塞。特别是,这可以防止后台线程在进程退出时自动加入 - 请参阅join_thread()

这个方法的名字可能更好allow_exit_without_flush()。这很可能会导致入队数据丢失,并且几乎可以肯定不需要使用它。如果您需要当前进程立即退出而不等待将排入队列的数据刷新到底层管道,并且您不关心丢失的数据,那才真正在那里。

注意

该类的功能需要在主机操作系统上运行共享信号量。没有一个,这个类中的功能将被禁用,并尝试实例化一个Queue将导致一个ImportError。有关其他信息,请参阅问题3770。对于下面列出的任何专用队列类型也是如此。

class multiprocessing.queues.SimpleQueue

这是一个简化的Queue类型,非常接近锁定Pipe

empty()

如果队列为空则返回True,否则返回False

get()

从队列中移除并返回一个项目。

put(item)

项目放入队列中。

class multiprocessing.JoinableQueue([maxsize])

JoinableQueue,一个Queue子类,是另外有一个队列task_done()join()方法。

task_done()

表明以前排队的任务已完成。由队列消费者线程使用。对于每个get()用于获取任务的对象,后续调用会task_done()告知队列该任务的处理已完成。

如果join()当前处于阻塞状态,则在所有项目都处理完毕后(即task_done()接收到已put()进入队列的每个项目的呼叫),它将恢复。

提出了一个ValueError好象叫更多的时间比中放入队列中的项目。

join()

阻塞,直到队列中的所有项目都被获取并处理。

每当将项目添加到队列中时,未完成任务的数量就会增加。无论何时消费者线程调用task_done()以指示该项目已被检索并且所有工作都已完成,计数就会减少。当未完成任务的计数降至零时,join()取消阻止。

2.3. 杂

multiprocessing.active_children()

返回当前进程的所有活着的孩子的列表。

调用它会产生“加入”已经完成的任何过程的副作用。

multiprocessing.cpu_count()

返回系统中的CPU数量。可能会提高NotImplementedError

multiprocessing.current_process()

返回Process当前进程对应的对象。

一种类似物threading.current_thread()

multiprocessing.freeze_support()

添加对何时使用的程序multiprocessing已被冻结以生成Windows可执行文件的支持。(已经用py2exePyInstallercx_Freeze进行过测试。)

需要if __name__ == '__main__'在主模块的行后面直接调用该函数。例如:

代码语言:javascript
复制
from multiprocessing import Process, freeze_support

def f():
    print 'hello world!'

if __name__ == '__main__':
    freeze_support()
    Process(target=f).start()

如果freeze_support()行被忽略,则尝试运行冻结的可执行文件将会引发RuntimeError

freeze_support()在Windows以外的任何操作系统上调用时,调用都不起作用。另外,如果模块正在通过Windows上的Python解释器正常运行(该程序尚未被冻结),则freeze_support()不起作用。

multiprocessing.set_executable()

设置启动子进程时使用的Python解释器的路径。(默认sys.executable使用)。嵌入者可能需要做一些类似的事情

代码语言:javascript
复制
set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

然后才能创建子进程。(仅限Windows)

注意

multiprocessing不包含的类似物threading.active_count()threading.enumerate()threading.settrace()threading.setprofile()threading.Timer,或threading.local

2.4. 连接对象

连接对象允许发送和接收可选对象或字符串。他们可以被认为是面向消息的连接套接字。

连接对象通常使用Pipe()- 另请参阅监听器和客户端。

class multiprocessing.Connectionsend(obj)

将对象发送到应该使用的连接的另一端recv()

该对象必须是可挑选的。非常大的泡菜(大约32 MB +,但取决于操作系统)可能会引发ValueError异常。

recv()

返回使用连接的另一端发送的对象send()。阻止,直到有东西可以接收。EOFError如果没有什么可以接收而另一端关闭,就会引发。

fileno()

返回连接使用的文件描述符或句柄。

close()

关闭连接。

当连接被垃圾收集时这被自动调用。

poll([timeout])

返回是否有可供读取的数据。

如果没有指定超时,它将立即返回。如果超时是一个数字,那么这指定了要阻止的最大时间(以秒为单位)。如果超时None则使用无限超时。

send_bytes(buffer[, offset[, size]])

将支持缓冲区接口的对象的字节数据作为完整消息发送。

如果给出偏移量,则数据从缓冲区中的该位置读取。如果给出大小,那么将从缓冲区读取那么多字节。非常大的缓冲区(大约32 MB +,但取决于操作系统)可能会引发ValueError异常

recv_bytes([maxlength])

将连接另一端发送的字节数据的完整消息作为字符串返回。阻止,直到有东西可以接收。提出EOFError,如果没有什么留下来接收和另一端已经关闭。

如果最大长度被指定并且所述消息是长于最大长度然后IOError升至并连接将不再是可读的。

recv_bytes_into(buffer[, offset])

将从连接另一端发送的完整字节数据消息读入缓冲区,并返回消息中的字节数。阻止,直到有东西可以接收。EOFError如果没有什么可以接收而另一端关闭,就会引发。

缓冲区必须是满足可写缓冲区接口的对象。如果给出偏移量,那么消息将从该位置写入缓冲区。偏移量必须是小于缓冲区长度的非负整数(以字节为单位)。

如果缓冲区太短,则会BufferTooShort引发异常,并且完整的消息可用,e.args[0]因为e异常实例在哪里。

例如:

代码语言:javascript
复制
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes('thank you')
>>> a.recv_bytes()
'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])

警告

Connection.recv()方法会自动取消接收的数据,这可能会带来安全风险,除非您可以信任发送该消息的进程。

因此,除非连接对象是使用Pipe()您制作的,否则只应在执行某种验证之后使用recv()send()方法。请参阅认证密钥。

警告

如果某个进程在尝试读取或写入管道时被终止,那么管道中的数据可能会被破坏,因为可能无法确定消息边界位于何处。

2.5. 同步原语

通常,同步原语在多进程程序中并不像在多线程程序中那样必要。请参阅threading模块的文档。

请注意,也可以使用管理器对象创建同步基元 - 请参阅管理器。

class multiprocessing.BoundedSemaphore([value])

一个有界的信号量对象:一个紧密的类比threading.BoundedSemaphore

与其近似模拟存在单一差异:其acquire方法的第一个参数是名为block的,并且它支持可选的第二个参数超时,与之一致Lock.acquire()

注意

在Mac OS X上,这是无法区分的,Semaphore因为sem_getvalue()没有在该平台上实现。

class multiprocessing.Condition([lock])

一个条件变量:一个克隆threading.Condition

如果指定了,那么它应该是一个Lock或一个RLock对象multiprocessing

class multiprocessing.Event

克隆的threading.Event。这个方法在退出时返回内部信号量的状态,所以它会一直返回True,除非超时并且操作超时。

在版本2.7中更改:以前,该方法始终返回None

class multiprocessing.Lock

非递归锁定对象:一个非常类似的threading.Lock。一旦进程或线程获得锁定,随后尝试从任何进程或线程获取它将被阻塞,直到它被释放; 任何进程或线程都可以释放它。threading.Lock适用于线程的概念和行为在multiprocessing.Lock适用于进程或线程时在此处被复制,除非另有说明。

请注意,Lock它实际上是一个工厂函数,它返回multiprocessing.synchronize.Lock使用默认上下文初始化的实例。

Lock支持上下文管理器协议,因此可以在with语句中使用。

acquire(block=True, timeout=None)

获取锁定,阻止或不阻止。

block参数设置为True(默认值)时,方法调用将被阻塞,直到锁处于解锁状态,然后将其设置为锁定并返回True。请注意,这第一个参数的名称不同于threading.Lock.acquire()

参数设置为False,方法调用不会阻止。如果锁当前处于锁定状态,则返回False;否则将锁设置为锁定状态并返回True

当用正值,浮点值超时调用,只要无法获取锁,就会阻塞超时指定的秒数。具有负值的调用超时值相当于超时值为零。超时值为None(缺省值)的调用将超时时间设置为无限。该超时参数有,如果没有实际意义参数被设置为False,并因此忽略。返回True如果锁已经被收购或False如果超时周期已经过去。请注意超时这种方法的模拟中不存在论据threading.Lock.acquire()

release()

释放一个锁。这可以从任何进程或线程调用,不仅是最初获取锁的进程或线程。

除了在解锁的锁上调用threading.Lock.release()时,行为与其中的行为相同ValueError

class multiprocessing.RLock

递归锁定对象:一个紧密的模拟对象threading.RLock。递归锁必须由获取它的进程或线程释放。一旦进程或线程获得了递归锁定,相同的进程或线程可能会再次获取而不会阻塞; 该进程或线程必须每次释放一次它被获取。

请注意,RLock它实际上是一个工厂函数,它返回multiprocessing.synchronize.RLock使用默认上下文初始化的实例。

RLock支持上下文管理器协议,因此可以在with语句中使用。

acquire(block=True, timeout=None)

获取锁定,阻止或不阻止。

block参数设置为的情况下调用时True,阻塞直到锁处于解锁状态(不属于任何进程或线程),除非该锁已被当前进程或线程所拥有。当前进程或线程接着获取锁的所有权(如果它还没有所有权),并且锁内的递归级别增加1,导致返回值为True。请注意,与实现相比,第一个参数的行为存在一些差异threading.RLock.acquire(),从参数本身的名称开始。

当调用block参数设置为时False,不要阻塞。如果锁已被另一个进程或线程获取(并因此被拥有),则当前进程或线程不会获得所有权,并且锁内的递归级别不会更改,导致返回值为False。如果锁处于未锁定状态,当前进程或线程将获得所有权,并递增递归级别,从而返回值为True

timeout参数的使用和行为与in中的相同Lock.acquire()。请注意,此方法的模拟中不存在超时参数threading.RLock.acquire()

release()

释放一个锁,递减递归级别。如果在递减之后递归级别为零,则将锁重置为解锁(不属于任何进程或线程),并且如果有任何其他进程或线程被阻塞,等待锁解锁,则准许其中一个进程继续。如果在递减之后递归级别仍然不为零,则锁保持锁定并由调用进程或线程拥有。

只有在调用进程或线程拥有锁定时才调用此方法。一个AssertionError如果该方法是通过一个过程调用或线程以外的雇主或升高如果锁处于解锁(无主)的状态。请注意,在这种情况下引发的异常类型与实现的行为不同threading.RLock.release()

class multiprocessing.Semaphore([value])

信号量对象:一个近似的类比threading.Semaphore

与其近似模拟存在单一差异:其acquire方法的第一个参数是名为block的,并且它支持可选的第二个参数超时,与之一致Lock.acquire()

注意

所述acquire()的方法BoundedSemaphoreLockRLockSemaphore具有未由当量支撑的超时参数threading。签名的acquire(block=True, timeout=None)关键字参数是可以接受的。如果blockTruetimeout不是,None那么它指定一个以秒为单位的超时。如果False那么超时将被忽略。

在Mac OS X上,sem_timedwait不受支持,因此acquire()使用超时调用将使用休眠循环模拟该函数的行为。

注意

如果所产生的SIGINT信号Ctrl-C,而主线程是由呼叫阻塞到达BoundedSemaphore.acquire()Lock.acquire()RLock.acquire()Semaphore.acquire()Condition.acquire()Condition.wait()则该呼叫将被立即中断和KeyboardInterrupt将提高。

这与threading在等效阻塞调用进行时忽略SIGINT 的行为不同。

注意

某些此软件包的功能需要在主机操作系统上运行共享信号量。没有一个,multiprocessing.synchronize模块将被禁用,并尝试导入它将导致一个ImportError。有关其他信息,请参阅问题3770

2.6. 共享ctypes对象

可以使用可以由子进程继承的共享内存创建共享对象。

multiprocessing.Value(typecode_or_type, *args[, lock])

返回ctypes从共享内存分配的对象。默认情况下,返回值实际上是对象的同步包装器。

typecode_or_type确定返回对象的类型:它是一个ctypes类型或array模块使用的类型的一个字符类型。*参数传递给类型的构造函数。

如果True(默认值),那么一个新的递归锁对象被创建同步访问值。如果是一个Lock或一个RLock对象,那么将用于同步对该值的访问。如果是False那么访问返回的对象不会被锁自动保护,因此它不一定是“过程安全的”。

操作就像+=它涉及的读取和写入不是原子。因此,例如,如果你想以原子方式递增共享值,那么仅仅是做不到

代码语言:javascript
复制
counter.value += 1

假设关联的锁是递归的(它是默认的),你可以改为这样做

代码语言:javascript
复制
with counter.get_lock():
    counter.value += 1

请注意,锁定是仅有关键字的参数。

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)

返回从共享内存分配的ctypes数组。默认情况下,返回值实际上是数组的同步包装器。

typecode_or_type确定返回数组元素的类型:它是模块使用的ctypes类型或单字符typecode array。如果size_or_initializer是一个整数,那么它决定了数组的长度,并且该数组最初将归零。否则,size_or_initializer是用于初始化数组的序列,其长度决定数组的长度。

如果True(默认值),那么一个新的锁定对象被创建同步访问值。如果是一个Lock或一个RLock对象,那么将用于同步对该值的访问。如果是False那么访问返回的对象不会被锁自动保护,因此它不一定是“过程安全的”。

请注意,锁定是仅有关键字的参数。

请注意,一个ctypes.c_char具有原始属性的数组允许用它来存储和检索字符串。

2.6.1. multiprocessing.sharedctypes模块

multiprocessing.sharedctypes模块提供了ctypes从共享内存中分配可由子进程继承的对象的功能。

注意

虽然可以将指针存储在共享内存中,但请记住这将指向特定进程的地址空间中的某个位置。但是,指针在第二个进程的上下文中很可能无效,并且试图从第二个进程取消引用该指针可能会导致崩溃。

multiprocessing.sharedctypes.RawArray(typecode_or_type, size_or_initializer)

返回从共享内存分配的ctypes数组。

typecode_or_type确定返回数组元素的类型:它是模块使用的ctypes类型或单字符typecode array。如果size_or_initializer是一个整数,那么它确定数组的长度,并且该数组将被初始化为零。否则,size_or_initializer是一个用于初始化数组的序列,其长度决定数组的长度。

请注意,设置和获取元素可能是非原子的 - 请使用它Array()来确保使用锁自动同步访问。

multiprocessing.sharedctypes.RawValue(typecode_or_type, *args)

返回从共享内存分配的ctypes对象。

typecode_or_type确定返回对象的类型:它是一个ctypes类型或array模块使用的类型的一个字符类型。*参数传递给类型的构造函数。

请注意,设置和获取该值可能是非原子的 - 使用它Value()来确保访问使用锁自动同步。

请注意,ctypes.c_charhas valueraw属性允许用它来存储和检索字符串 - 请参阅文档ctypes

multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *args[, lock])

相同RawArray(),除了取决于的值的过程安全的同步封装器可以返回来代替原始ctypes的阵列。

如果True(默认值),那么一个新的锁定对象被创建同步访问值。如果是一个Lock或一个RLock对象,那么将用于同步对该值的访问。如果是False那么访问返回的对象不会被锁自动保护,因此它不一定是“过程安全的”。

请注意,锁定是仅有关键字的参数。

multiprocessing.sharedctypes.Value(typecode_or_type, *args[, lock])

相同RawValue(),除了取决于的值的过程安全的同步封装器可以返回来代替原始ctypes的对象。

如果True(默认值),那么一个新的锁定对象被创建同步访问值。如果是一个Lock或一个RLock对象,那么将用于同步对该值的访问。如果是False那么访问返回的对象不会被锁自动保护,因此它不一定是“过程安全的”。

请注意,锁定是仅有关键字的参数。

multiprocessing.sharedctypes.copy(obj)

返回从ctypes对象obj的副本共享内存中分配的ctypes对象。

multiprocessing.sharedctypes.synchronized(obj[, lock])

为使用来同步访问的ctypes对象返回一个流程安全的包装器对象。如果None(缺省值),然后一个multiprocessing.RLock对象被自动创建。

一个同步包装除了包装它的对象之外还有两个方法:get_obj()返回包装对象并get_lock()返回用于同步的锁对象。

请注意,通过包装器访问ctypes对象可能比访问原始ctypes对象要慢很多。

下表比较了用于从共享内存创建共享ctypes对象的语法与正常ctypes语法。(在表中MyStruct是。的一些子类ctypes.Structure。)

ctypes的

使用类型共享类型

使用typecode共享类型

c_double(2.4)

RawValue(c_double,2.4)

RawValue('d',2.4)

MyStruct(4,6)

RawValue(MyStruct,4,6)

?

(c_short * 7)()

RawArray(c_short,7)

RawArray('h',7)

(c_int * 3)(9,2,8)

RawArray(c_int,(9,2,8))

RawArray('i',(9,2,8))

下面是一个例子,其中一些ctypes对象被一个子进程修改:

代码语言:javascript
复制
from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double

class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]

def modify(n, x, s, A):
    n.value **= 2
    x.value **= 2
    s.value = s.value.upper()
    for a in A:
        a.x **= 2
        a.y **= 2

if __name__ == '__main__':
    lock = Lock()

    n = Value('i', 7)
    x = Value(c_double, 1.0/3.0, lock=False)
    s = Array('c', 'hello world', lock=lock)
    A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)

    p = Process(target=modify, args=(n, x, s, A))
    p.start()
    p.join()

    print n.value
    print x.value
    print s.value
    print [(a.x, a.y) for a in A]

打印的结果是

代码语言:javascript
复制
49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]

2.7. 经理

管理人员提供了一种方法来创建可以在不同进程之间共享的数据。管理员对象控制管理共享对象的服务器进程。其他进程可以通过使用代理来访问共享对象。

multiprocessing.Manager()

返回SyncManager可用于在进程之间共享对象的已启动对象。返回的管理器对象对应于衍生的子进程,并具有创建共享对象并返回相应代理的方法。

当垃圾收集或其父进程退出时,Manager进程将立即关闭。管理员类在multiprocessing.managers模块中定义:

class multiprocessing.managers.BaseManager([address[, authkey]])

创建一个BaseManager对象。

一旦创建,应该调用start()get_server().serve_forever()确保管理器对象引用已启动的管理器进程。

地址是管理进程侦听新连接的地址。如果地址None那么任意一个被选择。

authkey是将用于检查到服务器进程的传入连接的有效性的身份验证密钥。如果AUTHKEYNone那么current_process().authkey。否则使用authkey,它必须是一个字符串。

start([initializer[, initargs]])

启动一个子流程来启动管理器。如果初始化程序不是,None那么子进程将initializer(*initargs)在启动时调用。

get_server()

返回一个Server代表管理器控制下的实际服务器的对象。Server对象支持serve_forever()方法:

代码语言:javascript
复制
>>> from multiprocessing.managers import BaseManager
>>> manager = BaseManager(address=('', 50000), authkey='abc')
>>> server = manager.get_server()
>>> server.serve_forever()

Server另外还有一个address属性。

connect()

将本地管理器对象连接到远程管理器进程:

代码语言:javascript
复制
>>> from multiprocessing.managers import BaseManager
>>> m = BaseManager(address=('127.0.0.1', 5000), authkey='abc')
>>> m.connect()

shutdown()

停止经理使用的过程。这仅start()在用于启动服务器进程时才可用。

这可以被多次调用。

register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])

可用于注册类型或可与经理类一起调用的类方法。

typeid是一个“类型标识符”,用于标识特定类型的共享对象。这必须是一个字符串。

callable是可调用的,用于为此类型标识创建对象。如果将使用from_address()classmethod 创建管理器实例,或者如果create_method参数是False可以保留为None

proxytype是其中的一个子类,BaseProxy用于使用此typeid为共享对象创建代理。如果None那么代理类是自动创建的。

exposed用于指定一个方法名称序列,该类型的代理应该被允许使用访问BaseProxy._callmethod()。(如果暴露None随后proxytype._exposed_被用于代替如果它存在)。在其中没有指定暴露列表中的情况下,共享对象的所有“公共方法”将是可访问的。(这里的“公共方法”是指任何具有__call__()方法且名称不以其开头的属性'_'。)

method_to_typeid是一个映射,用于指定那些应该返回代理的公开方法的返回类型。它将方法名称映射到typeid字符串。(如果method_to_typeidNone然后proxytype._method_to_typeid_,如果它存在来代替。)如果一个方法的名字不在此映射的关键或如果映射是None则该方法返回该对象将在值被复制。

create_method确定是否应该使用名称typeid创建一个方法,该方法可用于通知服务器进程创建新的共享对象并为其返回代理。默认情况下是True

BaseManager 实例也有一个只读属性:

address

经理使用的地址。

class multiprocessing.managers.SyncManager

其中的一个子类BaseManager可用于进程的同步。这种类型的对象被返回multiprocessing.Manager()

它还支持创建共享列表和字典。

BoundedSemaphore([value])

创建一个共享threading.BoundedSemaphore对象并为其返回一个代理。

Condition([lock])

创建一个共享threading.Condition对象并为其返回一个代理。

如果提供了,那么它应该是一个threading.Lock或一个threading.RLock对象的代理。

Event()

创建一个共享threading.Event对象并为其返回一个代理。

Lock()

创建一个共享threading.Lock对象并为其返回一个代理。

Namespace()

创建一个共享Namespace对象并为其返回一个代理。

Queue([maxsize])

创建一个共享Queue.Queue对象并为其返回一个代理。

RLock()

创建一个共享threading.RLock对象并为其返回一个代理。

Semaphore([value])

创建一个共享threading.Semaphore对象并为其返回一个代理。

Array(typecode, sequence)

创建一个数组并为其返回一个代理。

Value(typecode, value)

创建一个具有可写value属性的对象并返回它的代理。

dict()dict(mapping)dict(sequence)

创建一个共享dict对象并为其返回一个代理。

list()list(sequence)

创建一个共享list对象并为其返回一个代理。

注意

字典和列表代理中的可变值或项目的修改不会通过管理器传播,因为代理无法知道其值或项目何时被修改。要修改这样的项目,您可以将修改的对象重新分配给容器代理:

代码语言:javascript
复制
# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# reassigning the dictionary, the proxy is notified of the change
lproxy[0] = d

class multiprocessing.managers.Namespace

可以注册的类型SyncManager

名称空间对象没有公共方法,但具有可写的属性。它的表示显示了它的属性值。

但是,当为名称空间对象使用代理时,以下列属性开头的属性'_'将成为代理的属性,而不是所指对象的属性:

代码语言:javascript
复制
>>> manager = multiprocessing.Manager()
>>> Global = manager.Namespace()
>>> Global.x = 10
>>> Global.y = 'hello'
>>> Global._z = 12.3    # this is an attribute of the proxy
>>> print Global
Namespace(x=10, y='hello')

2.7.1. 定制经理

要创建自己的经理,需要创建一个子类BaseManager并使用register()该类方法向经理类注册新类型或可调用对象。例如:

代码语言:javascript
复制
from multiprocessing.managers import BaseManager

class MathsClass(object):
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    manager = MyManager()
    manager.start()
    maths = manager.Maths()
    print maths.add(4, 3)         # prints 7
    print maths.mul(7, 8)         # prints 56

2.7.2. 使用远程管理器

可以在一台机器上运行管理服务器,并让客户机从其他机器上使用它(假设所涉及的防火墙允许)。

运行以下命令将为远程客户端可以访问的单个共享队列创建一个服务器:

代码语言:javascript
复制
>>> from multiprocessing.managers import BaseManager
>>> import Queue
>>> queue = Queue.Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey='abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

一个客户端可以如下访问服务器:

代码语言:javascript
复制
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey='abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')

另一个客户端也可以使用:

代码语言:javascript
复制
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey='abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'

本地进程也可以访问该队列,使用客户端上面的代码远程访问它:

代码语言:javascript
复制
>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
...     def __init__(self, q):
...         self.q = q
...         super(Worker, self).__init__()
...     def run(self):
...         self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey='abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

2.8. 代理对象

代理是一个对象,它指的是一个共享对象,它可能存在于不同的进程中。共享对象被认为是代理的对象。多个代理对象可能具有相同的对象。

代理对象具有调用其指示对象的相应方法的方法(尽管代理中并不需要指示对象的每种方法)。代理通常可以以与其所指对象相同的方式使用:

代码语言:javascript
复制
>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print l
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print repr(l)
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]

注意,应用于str()代理将返回指示对象的表示,而应用repr()将返回代理的表示。

代理对象的一个??重要特性是它们是可选择的,因此它们可以在进程之间传递。但是,请注意,如果将代理发送给相应的经理进程,则取消其代理将自己生成指示对象。这意味着,例如,一个共享对象可以包含第二个:

代码语言:javascript
复制
>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b)         # referent of a now contains referent of b
>>> print a, b
[[]] []
>>> b.append('hello')
>>> print a, b
[['hello']] ['hello']

注意

代理类型multiprocessing无法支持按值进行比较。所以,例如,我们有:

代码语言:javascript
复制
>>> manager.list([1,2,3]) == [1,2,3]
False

在进行比较时,应该只使用指示对象的副本。

class multiprocessing.managers.BaseProxy

代理对象是的子类的实例BaseProxy

_callmethod(methodname[, args[, kwds]])

调用并返回代理所指对象方法的结果。

如果proxy是代理人的对象是obj表达式

代码语言:javascript
复制
proxy._callmethod(methodname, args, kwds)

将评估表达

代码语言:javascript
复制
getattr(obj, methodname)(*args, **kwds)

在经理的过程中。

返回的值将是调用结果的副本或新共享对象的代理 - 请参阅method_to_typeid参数的文档BaseManager.register()

如果通话引发异常,则通过重新提出_callmethod()。如果经理进程中出现其他异常,则将其转换为RemoteError异常并由其提出_callmethod()

特别要注意的是,如果methodname尚未公开,将会引发异常。

使用的一个例子_callmethod()

代码语言:javascript
复制
>>> l = manager.list(range(10))
>>> l._callmethod('__len__')
10
>>> l._callmethod('__getslice__', (2, 7))   # equiv to `l[2:7]`
[2, 3, 4, 5, 6]
>>> l._callmethod('__getitem__', (20,))     # equiv to `l[20]`
Traceback (most recent call last):
...
IndexError: list index out of range

_getvalue()

返回指称对象的副本。

如果指示物是不可抽出的,那么这将引发异常。

__repr__()

返回代理对象的表示。

__str__()

返回指示对象的表示。

2.8.1. 清理

代理对象使用weakref回调函数,以便当它被垃圾回收后,它将从拥有其指示对象的管理器中注销自身。

当不再有任何代理引用共享对象时,会从管理进程中删除共享对象。

2.9. 进程池

可以创建一个过程池,执行与Pool课程一起提交给它的任务。

class multiprocessing.Pool([processes[, initializer[, initargs[, maxtasksperchild]]]])

一个进程池对象,用于控制可提交作业的工作进程池。它支持超时和回调的异步结果,并具有并行映射实现。

进程是要使用的工作进程的数量。如果过程None然后通过返回的数字cpu_count()被使用。如果初始化程序不是,None则每个工作进程initializer(*initargs)在启动时都会调用。

请注意,池对象的方法只应由创建池的进程调用。

版本2.7中的新增功能:maxtasksperchild是工作进程在退出并被新工作进程替换之前可以完成的任务数量,以便释放未使用的资源。默认的maxtasksperchildNone,这意味着工作进程的生存时间与池一样长。

注意

一般情况下,工作进程在Pool池的工作队列的整个持续时间内都处于活动状态。在其他系统(如Apache,mod_wsgi等)中发现的释放工作人员资源的频繁模式是允许池中的工作人员在退出,清理和产生新的进程之前完成一定量的工作取代旧的。该maxtasksperchild参数来Pool自曝这种能力给最终用户。

apply(func[, args[, kwds]])

apply()内置功能的等价物。它阻塞直到结果准备就绪,所以apply_async()更适合并行执行工作。此外,func只能在游泳池的其中一名工作人员身上执行。

apply_async(func[, args[, kwds[, callback]]])

apply()返回结果对象的方法的变体。

如果指定了回调,那么它应该是一个可接受的参数。当结果变为就绪时,将对其应用回调(除非该呼叫失败)。回调应该立即完成,否则处理结果的线程将被阻塞。

map(func, iterable[, chunksize])

map()内置函数的并行等价物(尽管它只支持一个可迭代的参数)。它阻塞,直到结果准备就绪。

这种方法将迭代器切成许多块,并将它作为单独的任务提交给进程池。这些块的(近似)大小可以通过将chunksize设置为正整数来指定。

map_async(func, iterable[, chunksize[, callback]])

map()返回结果对象的方法的变体。

如果指定了回调,那么它应该是一个可接受的参数。当结果变为就绪时,将对其应用回调(除非该呼叫失败)。回调应该立即完成,否则处理结果的线程将被阻塞。

imap(func, iterable[, chunksize])

相当于itertools.imap()

所述CHUNKSIZE参数是与由所使用的一个map()方法。对于使用了一个较大的值很长iterables CHUNKSIZE可以使作业完成的太多比使用的默认值加快1

此外,如果CHUNKSIZE1next()通过返回的迭代器的方法imap()方法有一个可选的超时参数:next(timeout)将提高multiprocessing.TimeoutError如果结果不能内退回超时秒。

imap_unordered(func, iterable[, chunksize])

同样imap(),除了从返回的迭代结果的排序应该考虑随心所欲。(只有当只有一个工作进程时,才能保证“正确”的顺序。)

close()

防止将更多任务提交到池中。所有任务完成后,工作进程将退出。

terminate()

立即停止工作进程而不完成杰出的工作。当池对象被垃圾收集时terminate()会立即调用。

join()

等待工作进程退出。必须调用close()terminate()在使用之前join()

class multiprocessing.pool.AsyncResult

Pool.apply_async()和返回的结果的类Pool.map_async()

get([timeout])

到达时返回结果。如果超时不是None,并且结果未在超时秒内到达,multiprocessing.TimeoutError则会引发。如果远程调用引发异常,那么该异常将被重新调整get()

wait([timeout])

等到结果可用或超时秒数通过。

ready()

返回通话是否完成。

successful()

返回是否完成呼叫而不引发异常。AssertionError如果结果尚未准备好,将会升高。

以下示例演示了如何使用池:

代码语言:javascript
复制
from multiprocessing import Pool
import time

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes

    result = pool.apply_async(f, (10,))   # evaluate "f(10)" asynchronously in a single process
    print result.get(timeout=1)           # prints "100" unless your computer is *very* slow

    print pool.map(f, range(10))          # prints "[0, 1, 4,..., 81]"

    it = pool.imap(f, range(10))
    print it.next()                       # prints "0"
    print it.next()                       # prints "1"
    print it.next(timeout=1)              # prints "4" unless your computer is *very* slow

    result = pool.apply_async(time.sleep, (10,))
    print result.get(timeout=1)           # raises multiprocessing.TimeoutError

2.10. 听众和客户

通常在进程之间传递消息是使用队列或使用Connection返回的对象完成的Pipe()

但是,multiprocessing.connection模块允许一些额外的灵活性。它基本上为处理套接字或Windows命名管道提供了高层面的面向消息的API,并且还支持使用该模块的摘要式身份验证hmac

multiprocessing.connection.deliver_challenge(connection, authkey)

将随机生成的消息发送到连接的另一端并等待回复。

如果回复与使用authkey作为密钥的消息摘要匹配,则欢迎消息将发送到连接的另一端。否则AuthenticationError会提出。

multiprocessing.connection.answer_challenge(connection, authkey)

接收消息,使用authkey作为密钥来计算消息的摘要,然后将摘要发回。

如果没有收到欢迎消息,则会AuthenticationError提出。

multiprocessing.connection.Client(address[, family[, authenticate[, authkey]]])

尝试建立到正在使用地址地址的侦听器的连接,返回一个Connection

连接的类型由参数决定,但通常可以省略,因为它通常可以从地址格式中推断出来。(请参阅地址格式)

如果身份验证Trueauthkey是字符串,则使用摘要式身份验证。用于身份验证的关键将是要么AUTHKEY或者current_process().authkey)如果AUTHKEYNone。如果认证失败,则AuthenticationError引发。请参阅认证密钥。

class multiprocessing.connection.Listener([address[, family[, backlog[, authenticate[, authkey]]]]])

绑定套接字或Windows命名管道的包装器,它正在侦听连接。

地址是监听器对象的绑定套接字或命名管道使用的地址。

注意

如果使用地址'0.0.0.0',则该地址在Windows上不会是可连接的终点。如果您需要可连接的端点,则应使用“127.0.0.1”。

family是要使用的套接字(或命名管道)的类型。这可以是一个字符串'AF_INET'(对于TCP套接字),'AF_UNIX'(对于Unix域套接字)或'AF_PIPE'(对于Windows命名管道)。其中只有第一个保证可用。如果家庭None地址格式推断出家庭的话。如果地址也是None默认选择。这个默认值是被认为是最快可用的家族。请参阅地址格式。请注意,如果系列'AF_UNIX'和地址,None那么将使用创建的私有临时目录创建套接字tempfile.mkstemp()

如果侦听器对象使用套接字,则在绑定后,backlog(默认为1)将传递给listen()套接字的方法。

如果认证TrueFalse默认)或authkey不是,None那么使用摘要认证。

如果authkey是一个字符串,那么它将被用作认证密钥; 否则它一定是None

如果AUTHKEYNone身份验证True然后current_process().authkey被用作认证密钥。如果AUTHKEYNone身份验证False则没有进行身份认证。如果认证失败,则AuthenticationError引发。请参阅认证密钥。

accept()

接受监听器对象的绑定套接字或命名管道上的连接并返回一个Connection对象。如果尝试进行身份验证并失败,则会AuthenticationError引发此问题。

close()

关闭侦听器对象的绑定套接字或命名管道。当收听器被垃圾收集时,这被自动调用。但明确地称它是明智的。

侦听器对象具有以下只读属性:

address

Listener对象正在使用的地址。

last_accepted

上次接受连接的地址。如果这不可用,那就是了None

该模块定义了两个例外:

exception multiprocessing.connection.AuthenticationError

出现身份验证错误时引发异常。

例子

以下服务器代码创建一个'secret password'用作身份验证密钥的侦听器。然后等待连接并将一些数据发送到客户端:

代码语言:javascript
复制
from multiprocessing.connection import Listener
from array import array

address = ('localhost', 6000)     # family is deduced to be 'AF_INET'
listener = Listener(address, authkey='secret password')

conn = listener.accept()
print 'connection accepted from', listener.last_accepted

conn.send([2.25, None, 'junk', float])

conn.send_bytes('hello')

conn.send_bytes(array('i', [42, 1729]))

conn.close()
listener.close()

以下代码连接到服务器并从服务器接收一些数据:

代码语言:javascript
复制
from multiprocessing.connection import Client
from array import array

address = ('localhost', 6000)
conn = Client(address, authkey='secret password')

print conn.recv()                 # => [2.25, None, 'junk', float]

print conn.recv_bytes()            # => 'hello'

arr = array('i', [0, 0, 0, 0, 0])
print conn.recv_bytes_into(arr)     # => 8
print arr                         # => array('i', [42, 1729, 0, 0, 0])

conn.close()

2.10.1. 地址格式

  • 一个'AF_INET'地址是以下形式的元组(hostname, port),其中主机名是一个字符串,端口是一个整数。
  • 一个'AF_UNIX'地址是代表文件系统中的文件名的字符串。
  • An 'AF_PIPE' address is a string of the form r'\\.\pipe\PipeName'。要用于Client()连接到名为ServerName的远程计算机上的命名管道,应该使用表单的地址。r'\\ServerName\pipe\PipeName'

请注意,任何以两个反斜杠开头的字符串默认为'AF_PIPE'地址而不是'AF_UNIX'地址。

2.11. 验证密钥

当使用时Connection.recv,收到的数据会自动取消。不幸的是,从不可信来源取出数据是一种安全风险。因此Listener,请Client()使用该hmac模块提供摘要式身份验证。

认证密钥是一个可以被认为是密码的字符串:一旦建立连接,两端都会要求证明对方知道认证密钥。(证明两端都使用相同的密钥并没有涉及发送通过连接键。)

如果请求身份验证但未指定身份验证密钥,current_process().authkey则使用返回值(请参阅Process)。该值将由Process当前进程创建的任何对象自动继承。这意味着(默认情况下)多进程程序的所有进程将共享一个身份验证密钥,可以在设置它们之间的连接时使用该身份验证密钥。

合适的认证密钥也可以通过使用生成os.urandom()

2.12. 记录

有些支持日志记录功能。但请注意,logging程序包不使用进程共享锁,因此可能(取决于处理程序类型)来自不同进程的消息混淆。

multiprocessing.get_logger()

返回使用的记录器multiprocessing。如有必要,将创建一个新的。

首次创建时,记录器具有级别logging.NOTSET并且没有默认处理程序。发送到此记录器的消息不会默认传播到根记录器。

请注意,在Windows上,子进程只会继承父进程记录器的级别 - 记录器的其他任何自定义都不会被继承。

multiprocessing.log_to_stderr()

该函数执行一个调用,get_logger()但除了返回由get_logger创建的记录器之外,它还添加了一个处理程序,该处理程序将输出发送到sys.stderr使用格式'[%(levelname)s/%(processName)s] %(message)s'

以下是打开日志记录的示例会话:

代码语言:javascript
复制
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0

除了具有这两个日志记录功能之外,多处理还会公开两个附加日志记录级别属性。这些是SUBWARNINGSUBDEBUG。下表说明了这些论文在正常层次结构中的位置。

水平

数字值

SUBWARNING

25

SUBDEBUG

5

有关日志级别的完整表格,请参阅logging模块。

这些额外的日志级别主要用于多处理模块中的某些调试消息。以下是与上面相同的示例,但已SUBDEBUG启用:

代码语言:javascript
复制
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(multiprocessing.SUBDEBUG)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../pymp-djGBXN/listener-...'
>>> del m
[SUBDEBUG/MainProcess] finalizer calling ...
[INFO/MainProcess] sending shutdown message to manager
[DEBUG/SyncManager-...] manager received shutdown message
[SUBDEBUG/SyncManager-...] calling <Finalize object, callback=unlink, ...
[SUBDEBUG/SyncManager-...] finalizer calling <built-in function unlink> ...
[SUBDEBUG/SyncManager-...] calling <Finalize object, dead>
[SUBDEBUG/SyncManager-...] finalizer calling <function rmtree at 0x5aa730> ...
[INFO/SyncManager-...] manager exiting with exitcode 0

2.13. multiprocessing.dummy模块

multiprocessing.dummy复制API的multiprocessing但不过是threading模块的包装。

3.编程指南

使用时应遵守一定的准则和习惯用语multiprocessing

3.1. 所有平台

避免共享状态

尽可能避免在进程间转移大量数据。

最好坚持使用队列或管道进行进程之间的通信,而不是使用threading模块中较低级别的同步原语。

Picklability

确保代理方法的参数是可挑选的。

代理线程安全

除非用锁保护,否则不要使用来自多个线程的代理对象。

(使用相同代理的不同进程永远不会有问题。)

加入僵尸进程

在Unix上,当进程结束但尚未加入时,它变成僵尸。不应该有很多,因为每当新进程开始(或被active_children()调用)时,所有已完成的尚未加入的进程都将被加入。也调用完成的进程Process.is_alive将加入该进程。即使如此,明确加入您开始的所有流程也可能是一种很好的做法。

继承不如pickle/unpickle

在Windows上,许多类型multiprocessing需要可选择,以便子进程可以使用它们。但是,通常应避免使用管道或队列将共享对象发送到其他进程。相反,您应该安排程序,以便需要访问在别处创建的共享资源的进程可以从祖先进程继承。

避免终止进程

使用Process.terminate方法停止进程可能会导致进程当前正在使用的任何共享资源(例如锁,信号量,管道和队列)被破坏或不可用于其他进程。

因此,最好只考虑Process.terminate在不使用任何共享资源的进程上使用。

加入使用队列的进程

请记住,已将项目放入队列的进程将在终止之前等待,直到所有缓冲项目都由“馈线”线程馈送到基础管道。(子进程可以调用cancel_join_thread()队列的方法来避免这种行为。)

这意味着无论什么时候使用队列,都需要确保放入队列中的所有项目在加入之前最终都会被删除。否则,您无法确定将项目放入队列的进程将终止。还要记住,非守护进程会自动加入。

一个会导致死锁的例子如下:

代码语言:javascript
复制
from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()

这里的一个解决方法是交换最后两行(或者直接删除p.join()行)。

明确地将资源传递给子进程

在Unix上,子进程可以使用全局资源在父进程中创建的共享资源。但是,最好将该对象作为参数传递给子进程的构造函数。

除了使代码(可能)与Windows兼容外,这也确保了只要子进程仍然存在,对象将不会被垃圾收集到父进程中。如果在父进程中垃圾收集对象时某些资源被释放,这可能很重要。

举例来说

代码语言:javascript
复制
from multiprocessing import Process, Lock

def f():
    ... do something using "lock" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f).start()

应改写为

代码语言:javascript
复制
from multiprocessing import Process, Lock

def f(l):
    ... do something using "l" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f, args=(lock,)).start()

谨防用sys.stdin“像对象的文件”

multiprocessing 原本被无条件地称为:

代码语言:javascript
复制
os.close(sys.stdin.fileno())

multiprocessing.Process._bootstrap()方法中 - 这导致了流程中的问题。这已被更改为:

代码语言:javascript
复制
sys.stdin.close()
sys.stdin = open(os.devnull)

它解决了进程之间相互冲突的基本问题,导致了错误的文件描述符错误,但却给应用程序带来了潜在的危险,这些应用程序将替换sys.stdin()为带有输出缓冲的“类文件对象”。这种危险是,如果多个进程调用close()这个类文件对象,可能会导致相同的数据被多次刷新到对象,从而导致损坏。

如果您编写一个类似文件的对象并实现自己的缓存,那么无论何时添加到缓存中,都可以通过存储pid来使其更安全,并在pid更改时放弃缓存。例如:

代码语言:javascript
复制
@property
def cache(self):
    pid = os.getpid()
    if pid != self._pid:
        self._pid = pid
        self._cache = []
    return self._cache

有关更多信息,请参阅问题5155问题5313问题5331

3.2. 视窗

由于Windows缺少os.fork()它有一些额外的限制:

更多的可拾取性

确保所有参数Process.__init__()都是可选择的。这尤其意味着绑定或未绑定的方法不能直接用作targetWindows上的参数 - 只需定义一个函数并使用它即可。

此外,如果您继承了子类,Process那么确保在Process.start调用方法时可以选择实例。

全局变量

请记住,如果在子进程中运行的代码尝试访问全局变量,则它所看到的值(如果有)可能与Process.start被调用时父进程中的值不同。

但是,仅仅是模块级别常量的全局变量不会导致问题。

主模块安全导入

确保主模块可以通过新的Python解释器安全地导入,而不会引起意外的副作用(例如开始一个新过程)。

例如,在Windows下运行以下模块将会失败,并显示RuntimeError

代码语言:javascript
复制
from multiprocessing import Process

def foo():
    print 'hello'

p = Process(target=foo)
p.start()

相反,应该使用if __name__ == '__main__':以下方式来保护程序的“切入点” :

代码语言:javascript
复制
from multiprocessing import Process, freeze_support

def foo():
    print 'hello'

if __name__ == '__main__':
    freeze_support()
    p = Process(target=foo)
    p.start()

freeze_support()如果程序正常运行而不是冻结,可以省略该行。)

这允许新产生的Python解释器安全地导入模块,然后运行模块的foo()功能。

如果在主模块中创建池或管理器,则会应用类似的限制。

4.例子

演示如何创建和使用定制的管理人员和代理人:

代码语言:javascript
复制
#
# This module shows how to use arbitrary callables with a subclass of
# `BaseManager`.
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator

##

class Foo(object):
    def f(self):
        print 'you called Foo.f()'
    def g(self):
        print 'you called Foo.g()'
    def _h(self):
        print 'you called Foo._h()'

# A simple generator function
def baz():
    for i in xrange(10):
        yield i*i

# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
    _exposed_ = ('next', '__next__')
    def __iter__(self):
        return self
    def next(self):
        return self._callmethod('next')
    def __next__(self):
        return self._callmethod('__next__')

# Function to return the operator module
def get_operator_module():
    return operator

##

class MyManager(BaseManager):
    pass

# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)

# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))

# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)

# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)

##

def test():
    manager = MyManager()
    manager.start()

    print '-' * 20

    f1 = manager.Foo1()
    f1.f()
    f1.g()
    assert not hasattr(f1, '_h')
    assert sorted(f1._exposed_) == sorted(['f', 'g'])

    print '-' * 20

    f2 = manager.Foo2()
    f2.g()
    f2._h()
    assert not hasattr(f2, 'f')
    assert sorted(f2._exposed_) == sorted(['g', '_h'])

    print '-' * 20

    it = manager.baz()
    for i in it:
        print '<%d>' % i,
    print

    print '-' * 20

    op = manager.operator()
    print 'op.add(23, 45) =', op.add(23, 45)
    print 'op.pow(2, 94) =', op.pow(2, 94)
    print 'op.getslice(range(10), 2, 6) =', op.getslice(range(10), 2, 6)
    print 'op.repeat(range(5), 3) =', op.repeat(range(5), 3)
    print 'op._exposed_ =', op._exposed_

##

if __name__ == '__main__':
    freeze_support()
    test()

使用Pool

代码语言:javascript
复制
#
# A test of `multiprocessing.Pool` class
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import multiprocessing
import time
import random
import sys

#
# Functions used by test code
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )

def calculatestar(args):
    return calculate(*args)

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

def f(x):
    return 1.0 / (x-5.0)

def pow3(x):
    return x**3

def noop(x):
    pass

#
# Test code
#

def test():
    print 'cpu_count() = %d\n' % multiprocessing.cpu_count()

    #
    # Create pool
    #

    PROCESSES = 4
    print 'Creating pool with %d processes\n' % PROCESSES
    pool = multiprocessing.Pool(PROCESSES)
    print 'pool = %s' % pool
    print

    #
    # Tests
    #

    TASKS = [(mul, (i, 7)) for i in range(10)] + \
            [(plus, (i, 8)) for i in range(10)]

    results = [pool.apply_async(calculate, t) for t in TASKS]
    imap_it = pool.imap(calculatestar, TASKS)
    imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

    print 'Ordered results using pool.apply_async():'
    for r in results:
        print '\t', r.get()
    print

    print 'Ordered results using pool.imap():'
    for x in imap_it:
        print '\t', x
    print

    print 'Unordered results using pool.imap_unordered():'
    for x in imap_unordered_it:
        print '\t', x
    print

    print 'Ordered results using pool.map() --- will block till complete:'
    for x in pool.map(calculatestar, TASKS):
        print '\t', x
    print

    #
    # Simple benchmarks
    #

    N = 100000
    print 'def pow3(x): return x**3'

    t = time.time()
    A = map(pow3, xrange(N))
    print '\tmap(pow3, xrange(%d)):\n\t\t%s seconds' % \
          (N, time.time() - t)

    t = time.time()
    B = pool.map(pow3, xrange(N))
    print '\tpool.map(pow3, xrange(%d)):\n\t\t%s seconds' % \
          (N, time.time() - t)

    t = time.time()
    C = list(pool.imap(pow3, xrange(N), chunksize=N//8))
    print '\tlist(pool.imap(pow3, xrange(%d), chunksize=%d)):\n\t\t%s' \
          ' seconds' % (N, N//8, time.time() - t)

    assert A == B == C, (len(A), len(B), len(C))
    print

    L = [None] * 1000000
    print 'def noop(x): pass'
    print 'L = [None] * 1000000'

    t = time.time()
    A = map(noop, L)
    print '\tmap(noop, L):\n\t\t%s seconds' % \
          (time.time() - t)

    t = time.time()
    B = pool.map(noop, L)
    print '\tpool.map(noop, L):\n\t\t%s seconds' % \
          (time.time() - t)

    t = time.time()
    C = list(pool.imap(noop, L, chunksize=len(L)//8))
    print '\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \
          (len(L)//8, time.time() - t)

    assert A == B == C, (len(A), len(B), len(C))
    print

    del A, B, C, L

    #
    # Test error handling
    #

    print 'Testing error handling:'

    try:
        print pool.apply(f, (5,))
    except ZeroDivisionError:
        print '\tGot ZeroDivisionError as expected from pool.apply()'
    else:
        raise AssertionError('expected ZeroDivisionError')

    try:
        print pool.map(f, range(10))
    except ZeroDivisionError:
        print '\tGot ZeroDivisionError as expected from pool.map()'
    else:
        raise AssertionError('expected ZeroDivisionError')

    try:
        print list(pool.imap(f, range(10)))
    except ZeroDivisionError:
        print '\tGot ZeroDivisionError as expected from list(pool.imap())'
    else:
        raise AssertionError('expected ZeroDivisionError')

    it = pool.imap(f, range(10))
    for i in range(10):
        try:
            x = it.next()
        except ZeroDivisionError:
            if i == 5:
                pass
        except StopIteration:
            break
        else:
            if i == 5:
                raise AssertionError('expected ZeroDivisionError')

    assert i == 9
    print '\tGot ZeroDivisionError as expected from IMapIterator.next()'
    print

    #
    # Testing timeouts
    #

    print 'Testing ApplyResult.get() with timeout:',
    res = pool.apply_async(calculate, TASKS[0])
    while 1:
        sys.stdout.flush()
        try:
            sys.stdout.write('\n\t%s' % res.get(0.02))
            break
        except multiprocessing.TimeoutError:
            sys.stdout.write('.')
    print
    print

    print 'Testing IMapIterator.next() with timeout:',
    it = pool.imap(calculatestar, TASKS)
    while 1:
        sys.stdout.flush()
        try:
            sys.stdout.write('\n\t%s' % it.next(0.02))
        except StopIteration:
            break
        except multiprocessing.TimeoutError:
            sys.stdout.write('.')
    print
    print

    #
    # Testing callback
    #

    print 'Testing callback:'

    A = []
    B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]

    r = pool.apply_async(mul, (7, 8), callback=A.append)
    r.wait()

    r = pool.map_async(pow3, range(10), callback=A.extend)
    r.wait()

    if A == B:
        print '\tcallbacks succeeded\n'
    else:
        print '\t*** callbacks failed\n\t\t%s != %s\n' % (A, B)

    #
    # Check there are no outstanding tasks
    #

    assert not pool._cache, 'cache = %r' % pool._cache

    #
    # Check close() methods
    #

    print 'Testing close():'

    for worker in pool._pool:
        assert worker.is_alive()

    result = pool.apply_async(time.sleep, [0.5])
    pool.close()
    pool.join()

    assert result.get() is None

    for worker in pool._pool:
        assert not worker.is_alive()

    print '\tclose() succeeded\n'

    #
    # Check terminate() method
    #

    print 'Testing terminate():'

    pool = multiprocessing.Pool(2)
    DELTA = 0.1
    ignore = pool.apply(pow3, [2])
    results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
    pool.terminate()
    pool.join()

    for worker in pool._pool:
        assert not worker.is_alive()

    print '\tterminate() succeeded\n'

    #
    # Check garbage collection
    #

    print 'Testing garbage collection:'

    pool = multiprocessing.Pool(2)
    DELTA = 0.1
    processes = pool._pool
    ignore = pool.apply(pow3, [2])
    results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]

    results = pool = None

    time.sleep(DELTA * 2)

    for worker in processes:
        assert not worker.is_alive()

    print '\tgarbage collection succeeded\n'


if __name__ == '__main__':
    multiprocessing.freeze_support()

    assert len(sys.argv) in (1, 2)

    if len(sys.argv) == 1 or sys.argv[1] == 'processes':
        print ' Using processes '.center(79, '-')
    elif sys.argv[1] == 'threads':
        print ' Using threads '.center(79, '-')
        import multiprocessing.dummy as multiprocessing
    else:
        print 'Usage:\n\t%s [processes | threads]' % sys.argv[0]
        raise SystemExit(2)

    test()

同步类型,如锁,条件和队列:

代码语言:javascript
复制
#
# A test file for the `multiprocessing` package
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import time, sys, random
from Queue import Empty

import multiprocessing               # may get overwritten


#### TEST_VALUE

def value_func(running, mutex):
    random.seed()
    time.sleep(random.random()*4)

    mutex.acquire()
    print '\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished'
    running.value -= 1
    mutex.release()

def test_value():
    TASKS = 10
    running = multiprocessing.Value('i', TASKS)
    mutex = multiprocessing.Lock()

    for i in range(TASKS):
        p = multiprocessing.Process(target=value_func, args=(running, mutex))
        p.start()

    while running.value > 0:
        time.sleep(0.08)
        mutex.acquire()
        print running.value,
        sys.stdout.flush()
        mutex.release()

    print
    print 'No more running processes'


#### TEST_QUEUE

def queue_func(queue):
    for i in range(30):
        time.sleep(0.5 * random.random())
        queue.put(i*i)
    queue.put('STOP')

def test_queue():
    q = multiprocessing.Queue()

    p = multiprocessing.Process(target=queue_func, args=(q,))
    p.start()

    o = None
    while o != 'STOP':
        try:
            o = q.get(timeout=0.3)
            print o,
            sys.stdout.flush()
        except Empty:
            print 'TIMEOUT'

    print


#### TEST_CONDITION

def condition_func(cond):
    cond.acquire()
    print '\t' + str(cond)
    time.sleep(2)
    print '\tchild is notifying'
    print '\t' + str(cond)
    cond.notify()
    cond.release()

def test_condition():
    cond = multiprocessing.Condition()

    p = multiprocessing.Process(target=condition_func, args=(cond,))
    print cond

    cond.acquire()
    print cond
    cond.acquire()
    print cond

    p.start()

    print 'main is waiting'
    cond.wait()
    print 'main has woken up'

    print cond
    cond.release()
    print cond
    cond.release()

    p.join()
    print cond


#### TEST_SEMAPHORE

def semaphore_func(sema, mutex, running):
    sema.acquire()

    mutex.acquire()
    running.value += 1
    print running.value, 'tasks are running'
    mutex.release()

    random.seed()
    time.sleep(random.random()*2)

    mutex.acquire()
    running.value -= 1
    print '%s has finished' % multiprocessing.current_process()
    mutex.release()

    sema.release()

def test_semaphore():
    sema = multiprocessing.Semaphore(3)
    mutex = multiprocessing.RLock()
    running = multiprocessing.Value('i', 0)

    processes = [
        multiprocessing.Process(target=semaphore_func,
                                args=(sema, mutex, running))
        for i in range(10)
        ]

    for p in processes:
        p.start()

    for p in processes:
        p.join()


#### TEST_JOIN_TIMEOUT

def join_timeout_func():
    print '\tchild sleeping'
    time.sleep(5.5)
    print '\n\tchild terminating'

def test_join_timeout():
    p = multiprocessing.Process(target=join_timeout_func)
    p.start()

    print 'waiting for process to finish'

    while 1:
        p.join(timeout=1)
        if not p.is_alive():
            break
        print '.',
        sys.stdout.flush()


#### TEST_EVENT

def event_func(event):
    print '\t%r is waiting' % multiprocessing.current_process()
    event.wait()
    print '\t%r has woken up' % multiprocessing.current_process()

def test_event():
    event = multiprocessing.Event()

    processes = [multiprocessing.Process(target=event_func, args=(event,))
                 for i in range(5)]

    for p in processes:
        p.start()

    print 'main is sleeping'
    time.sleep(2)

    print 'main is setting event'
    event.set()

    for p in processes:
        p.join()


#### TEST_SHAREDVALUES

def sharedvalues_func(values, arrays, shared_values, shared_arrays):
    for i in range(len(values)):
        v = values[i][1]
        sv = shared_values[i].value
        assert v == sv

    for i in range(len(values)):
        a = arrays[i][1]
        sa = list(shared_arrays[i][:])
        assert a == sa

    print 'Tests passed'

def test_sharedvalues():
    values = [
        ('i', 10),
        ('h', -2),
        ('d', 1.25)
        ]
    arrays = [
        ('i', range(100)),
        ('d', [0.25 * i for i in range(100)]),
        ('H', range(1000))
        ]

    shared_values = [multiprocessing.Value(id, v) for id, v in values]
    shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays]

    p = multiprocessing.Process(
        target=sharedvalues_func,
        args=(values, arrays, shared_values, shared_arrays)
        )
    p.start()
    p.join()

    assert p.exitcode == 0


####

def test(namespace=multiprocessing):
    global multiprocessing

    multiprocessing = namespace

    for func in [ test_value, test_queue, test_condition,
                  test_semaphore, test_join_timeout, test_event,
                  test_sharedvalues ]:

        print '\n\t######## %s\n' % func.__name__
        func()

    ignore = multiprocessing.active_children()      # cleanup any old processes
    if hasattr(multiprocessing, '_debug_info'):
        info = multiprocessing._debug_info()
        if info:
            print info
            raise ValueError('there should be no positive refcounts left')


if __name__ == '__main__':
    multiprocessing.freeze_support()

    assert len(sys.argv) in (1, 2)

    if len(sys.argv) == 1 or sys.argv[1] == 'processes':
        print ' Using processes '.center(79, '-')
        namespace = multiprocessing
    elif sys.argv[1] == 'manager':
        print ' Using processes and a manager '.center(79, '-')
        namespace = multiprocessing.Manager()
        namespace.Process = multiprocessing.Process
        namespace.current_process = multiprocessing.current_process
        namespace.active_children = multiprocessing.active_children
    elif sys.argv[1] == 'threads':
        print ' Using threads '.center(79, '-')
        import multiprocessing.dummy as namespace
    else:
        print 'Usage:\n\t%s [processes | manager | threads]' % sys.argv[0]
        raise SystemExit(2)

    test(namespace)

显示如何使用队列将任务提供给工作进程集合并收集结果的示例:

代码语言:javascript
复制
#
# Simple example which uses a pool of workers to carry out some tasks.
#
# Notice that the results will probably not come out of the output
# queue in the same in the same order as the corresponding tasks were
# put on the input queue.  If it is important to get the results back
# in the original order then consider using `Pool.map()` or
# `Pool.imap()` (which will save on the amount of code needed anyway).
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import time
import random

from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

#
# Function used to calculate result
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)

#
# Functions referenced by tasks
#

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

#
#
#

def test():
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # Get and print results
    print 'Unordered results:'
    for i in range(len(TASKS1)):
        print '\t', done_queue.get()

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for i in range(len(TASKS2)):
        print '\t', done_queue.get()

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')


if __name__ == '__main__':
    freeze_support()
    test()

一个工作进程池如何SimpleHTTPServer.HttpServer在共享一个监听套接字时分别运行一个实例。

代码语言:javascript
复制
#
# Example where a pool of http servers share a single listening socket
#
# On Windows this module depends on the ability to pickle a socket
# object so that the worker processes can inherit a copy of the server
# object.  (We import `multiprocessing.reduction` to enable this pickling.)
#
# Not sure if we should synchronize access to `socket.accept()` method by
# using a process-shared lock -- does not seem to be necessary.
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import os
import sys

from multiprocessing import Process, current_process, freeze_support
from BaseHTTPServer import HTTPServer
from SimpleHTTPServer import SimpleHTTPRequestHandler

if sys.platform == 'win32':
    import multiprocessing.reduction    # make sockets pickable/inheritable


def note(format, *args):
    sys.stderr.write('[%s]\t%s\n' % (current_process().name, format%args))


class RequestHandler(SimpleHTTPRequestHandler):
    # we override log_message() to show which process is handling the request
    def log_message(self, format, *args):
        note(format, *args)

def serve_forever(server):
    note('starting server')
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass


def runpool(address, number_of_processes):
    # create a single server object -- children will each inherit a copy
    server = HTTPServer(address, RequestHandler)

    # create child processes to act as workers
    for i in range(number_of_processes-1):
        Process(target=serve_forever, args=(server,)).start()

    # main process also acts as a worker
    serve_forever(server)


def test():
    DIR = os.path.join(os.path.dirname(__file__), '..')
    ADDRESS = ('localhost', 8000)
    NUMBER_OF_PROCESSES = 4

    print 'Serving at http://%s:%d using %d worker processes' % \
          (ADDRESS[0], ADDRESS[1], NUMBER_OF_PROCESSES)
    print 'To exit press Ctrl-' + ['C', 'Break'][sys.platform=='win32']

    os.chdir(DIR)
    runpool(ADDRESS, NUMBER_OF_PROCESSES)


if __name__ == '__main__':
    freeze_support()
    test()

一些简单的基准测试比较multiprocessingthreading

代码语言:javascript
复制
#
# Simple benchmarks for the multiprocessing package
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import time, sys, multiprocessing, threading, Queue, gc

if sys.platform == 'win32':
    _timer = time.clock
else:
    _timer = time.time

delta = 1


#### TEST_QUEUESPEED

def queuespeed_func(q, c, iterations):
    a = '0' * 256
    c.acquire()
    c.notify()
    c.release()

    for i in xrange(iterations):
        q.put(a)

    q.put('STOP')

def test_queuespeed(Process, q, c):
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        p = Process(target=queuespeed_func, args=(q, c, iterations))
        c.acquire()
        p.start()
        c.wait()
        c.release()

        result = None
        t = _timer()

        while result != 'STOP':
            result = q.get()

        elapsed = _timer() - t

        p.join()

    print iterations, 'objects passed through the queue in', elapsed, 'seconds'
    print 'average number/sec:', iterations/elapsed


#### TEST_PIPESPEED

def pipe_func(c, cond, iterations):
    a = '0' * 256
    cond.acquire()
    cond.notify()
    cond.release()

    for i in xrange(iterations):
        c.send(a)

    c.send('STOP')

def test_pipespeed():
    c, d = multiprocessing.Pipe()
    cond = multiprocessing.Condition()
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        p = multiprocessing.Process(target=pipe_func,
                                    args=(d, cond, iterations))
        cond.acquire()
        p.start()
        cond.wait()
        cond.release()

        result = None
        t = _timer()

        while result != 'STOP':
            result = c.recv()

        elapsed = _timer() - t
        p.join()

    print iterations, 'objects passed through connection in',elapsed,'seconds'
    print 'average number/sec:', iterations/elapsed


#### TEST_SEQSPEED

def test_seqspeed(seq):
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        t = _timer()

        for i in xrange(iterations):
            a = seq[5]

        elapsed = _timer()-t

    print iterations, 'iterations in', elapsed, 'seconds'
    print 'average number/sec:', iterations/elapsed


#### TEST_LOCK

def test_lockspeed(l):
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        t = _timer()

        for i in xrange(iterations):
            l.acquire()
            l.release()

        elapsed = _timer()-t

    print iterations, 'iterations in', elapsed, 'seconds'
    print 'average number/sec:', iterations/elapsed


#### TEST_CONDITION

def conditionspeed_func(c, N):
    c.acquire()
    c.notify()

    for i in xrange(N):
        c.wait()
        c.notify()

    c.release()

def test_conditionspeed(Process, c):
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        c.acquire()
        p = Process(target=conditionspeed_func, args=(c, iterations))
        p.start()

        c.wait()

        t = _timer()

        for i in xrange(iterations):
            c.notify()
            c.wait()

        elapsed = _timer()-t

        c.release()
        p.join()

    print iterations * 2, 'waits in', elapsed, 'seconds'
    print 'average number/sec:', iterations * 2 / elapsed

####

def test():
    manager = multiprocessing.Manager()

    gc.disable()

    print '\n\t######## testing Queue.Queue\n'
    test_queuespeed(threading.Thread, Queue.Queue(),
                    threading.Condition())
    print '\n\t######## testing multiprocessing.Queue\n'
    test_queuespeed(multiprocessing.Process, multiprocessing.Queue(),
                    multiprocessing.Condition())
    print '\n\t######## testing Queue managed by server process\n'
    test_queuespeed(multiprocessing.Process, manager.Queue(),
                    manager.Condition())
    print '\n\t######## testing multiprocessing.Pipe\n'
    test_pipespeed()

    print

    print '\n\t######## testing list\n'
    test_seqspeed(range(10))
    print '\n\t######## testing list managed by server process\n'
    test_seqspeed(manager.list(range(10)))
    print '\n\t######## testing Array("i", ..., lock=False)\n'
    test_seqspeed(multiprocessing.Array('i', range(10), lock=False))
    print '\n\t######## testing Array("i", ..., lock=True)\n'
    test_seqspeed(multiprocessing.Array('i', range(10), lock=True))

    print

    print '\n\t######## testing threading.Lock\n'
    test_lockspeed(threading.Lock())
    print '\n\t######## testing threading.RLock\n'
    test_lockspeed(threading.RLock())
    print '\n\t######## testing multiprocessing.Lock\n'
    test_lockspeed(multiprocessing.Lock())
    print '\n\t######## testing multiprocessing.RLock\n'
    test_lockspeed(multiprocessing.RLock())
    print '\n\t######## testing lock managed by server process\n'
    test_lockspeed(manager.Lock())
    print '\n\t######## testing rlock managed by server process\n'
    test_lockspeed(manager.RLock())

    print

    print '\n\t######## testing threading.Condition\n'
    test_conditionspeed(threading.Thread, threading.Condition())
    print '\n\t######## testing multiprocessing.Condition\n'
    test_conditionspeed(multiprocessing.Process, multiprocessing.Condition())
    print '\n\t######## testing condition managed by a server process\n'
    test_conditionspeed(multiprocessing.Process, manager.Condition())

    gc.enable()

if __name__ == '__main__':
    multiprocessing.freeze_support()
    test()

扫码关注腾讯云开发者

领取腾讯云代金券

http://www.vxiaotou.com