您的位置:首页 > 编程语言 > Python开发

python 多进程 —— multiprocessing.Pool

2017-06-02 12:01 441 查看

Pool

如果要启动大量的子进程,可以用进程池的方式批量创建子进程.

class multiprocessing.Pool([processes[, initializer[, initargs[, maxtasksperchild]]]])


控制可以提交作业的工作进程池的进程池对象。它支持超时和回调的异步结果,并具有并行映射实现。

processes 是要使用的工作进程数。如果进程为None,则使用cpu_count()返回的数字。如果初始化程序不是无,则每个工作进程将在启动时调用初始化程序(* initargs)。

请注意,池对象的方法只能由创建池的进程调用。

2.7版本中的新功能:maxtasksperchild是一个工作进程可以在退出之前完成的任务数,并将其替换为一个新的工作进程,以使未使用的资源被释放。默认的maxtasksperchild是None,这意味着工作进程将生存只要池。

apply(func[, args[, kwds]])


等效于
apply()
内置函数。 它阻塞直到结果准备就绪,所以
apply_async()
更适合并行执行工作。 此外,func仅在池的一个工作人员中执行。

apply_async(func[, args[, kwds[, callback]]])


返回结果对象的
apply()
方法的变体。

如果指定回调,那么它应该是一个可接受单个参数的可调用函数。 当结果变得准备好时,应答回调(除非调用失败)。 回调应该立即完成,否则处理结果的线程将被阻止。

close()


防止任何更多的任务被提交到池中。 一旦完成所有任务,工作进程将退出。

join()


等待工作进程退出。 在使用join()之前必须调用close()或terminate()。

使用 Pool.apply()

# -*- coding:utf-8 -*-
from multiprocessing import Pool
import os
import time
import random

def long_time_task(name):
print('Run task %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__ == '__main__':
print('Parent process %s.' % os.getpid())
p = Pool(5)
for i in range(5):
# p.apply_async(long_time_task, args=(i,))
p.apply(long_time_task, args=(i,))

print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')


输出:

Parent process 3924.
Run task 0 (2568)...
Task 0 runs 2.63 seconds.
Run task 1 (5708)...
Task 1 runs 0.01 seconds.
Run task 2 (12152)...
Task 2 runs 2.62 seconds.
Run task 3 (6888)...
Task 3 runs 2.92 seconds.
Run task 4 (8256)...
Task 4 runs 0.50 seconds.
Waiting for all subprocesses done...
All subprocesses done.

Process finished with exit code 0


使用 Pool.apply_async()

# -*- coding:utf-8 -*-
from multiprocessing import Pool
import os
import time
import random

def long_time_task(name):
print('Run task %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__ == '__main__':
print('Parent process %s.' % os.getpid())
p = Pool(5)
for i in range(5):
p.apply_async(long_time_task, args=(i,))
# p.apply(long_time_task, args=(i,))

print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')


输出:

Parent process 12272.
Waiting for all subprocesses done...
Run task 0 (10036)...
Run task 1 (5400)...
Run task 2 (12152)...
Run task 3 (10372)...
Run task 4 (7548)...
Task 1 runs 0.58 seconds.
Task 4 runs 0.83 seconds.
Task 3 runs 1.92 seconds.
Task 2 runs 2.40 seconds.
Task 0 runs 2.48 seconds.
All subprocesses done.

Process finished with exit code 0


对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: