您的位置:首页 > 编程语言 > Python开发

Python自定义进程池(生产者/消费者模型)

2014-05-21 14:32 507 查看
代码说明一切:
#encoding=utf-8
#author: walker
#date: 2014-05-21
#summary: 自定义进程池遍历目录下文件

from multiprocessing import Process, Queue, Lock
import time, os

#消费者
class Consumer(Process):
def __init__(self, queue, ioLock):
super(Consumer, self).__init__()
self.queue = queue
self.ioLock = ioLock

def run(self):
while True:
task = self.queue.get()	#队列中无任务时,会阻塞进程
if isinstance(task, str) and task == 'quit':
break;
time.sleep(1)	#假定任务处理需要1秒钟
self.ioLock.acquire()
print( str(os.getpid()) + '  ' + task)
self.ioLock.release()
self.ioLock.acquire()
print 'Bye-bye'
self.ioLock.release()

#生产者
def Producer():
queue = Queue()    #这个队列是进程/线程安全的
ioLock = Lock()
subNum = 4	#子进程数量
workers = build_worker_pool(queue, ioLock, subNum)
start_time = time.time()

for parent, dirnames, filenames in os.walk(r'D:\test'):
for filename in filenames:
queue.put(filename)
ioLock.acquire()
print('qsize:' + str(queue.qsize()))
ioLock.release()
while queue.qsize() > subNum * 10:	#控制队列中任务数量
time.sleep(1)

for worker in workers:
queue.put('quit')

for worker in workers:
worker.join()

ioLock.acquire()
print('Done! Time taken: {}'.format(time.time() - start_time))
ioLock.release()

#创建进程池
def build_worker_pool(queue, ioLock, size):
workers = []
for _ in range(size):
worker = Consumer(queue, ioLock)
worker.start()
workers.append(worker)
return workers

if __name__ == '__main__':
Producer()
ps:
self.ioLock.acquire()
...
self.ioLock.release()
可用
with self.ioLock:
...
替代。
#encoding=utf-8
#author: walker
#date: 2017-03-14
#summary: 一个子进程生产,一个子进程消费

import os, sys, time
from multiprocessing import Process, Pool, Queue, Manager

#生产
def Produce(q):
print('Produce %d ...' % os.getpid())
for i in range(1, 20):
while q.full():
print('sleep %d/%d ...' % (i, q.qsize()))
time.sleep(1)
q.put(i)

q.put(0)		#用0通知结束

#消费
def Consume(q):
print('Consume %d ...' % os.getpid())
while True:
num = q.get()
if 0 == num:	#收到结束信号
print('receive 0')
break
print('Consumer ' + str(num))
time.sleep(2)
print('Consumer end ' + str(num))

if __name__ == '__main__':
q = Queue(10)				#可用
q = Manager().Queue(10)		#可用

print(os.getpid())

producerProcess = Process(target=Produce, args=(q,))		#生产进程
consumerProcess = Process(target=Consume, args=(q,))		#消费进程

producerProcess.start()
consumerProcess.start()

producerProcess.join()
consumerProcess.join()
#encoding=utf-8
#author: walker
#date: 2017-03-14
#summary: 一个子进程生产,进程池消费

import os, sys, time
from multiprocessing import Process, Pool, Queue, Manager

#生产
def Produce(q, poolSize):
print('Produce ...')
for i in range(1, 100):
while q.full():
print('sleep %d/%d ...' % (i, q.qsize()))
time.sleep(1)
q.put(i)

for _ in range(0, poolSize):
q.put(0)		#用0通知结束

#消费
def Consume(q):
print('Consume ...')
while True:
num = q.get()
if 0 == num:	#收到结束信号
print('receive 0')
break
print('Consumer ' + str(num))
time.sleep(2)
print('Consumer end ' + str(num))

if __name__ == '__main__':
#q = Queue(10)				#不可用
q = Manager().Queue(10)		#可用

poolSize = 4
producerProcess = Process(target=Produce, args=(q, poolSize))		#生产进程
consumerPool = Pool(processes=poolSize)	#消费进程池,默认子进程个数为os.cpu_count()
for _ in range(0, poolSize):
consumerPool.apply_async(func=Consume, args=(q,))

producerProcess.start()
consumerPool.close()

producerProcess.join()
consumerPool.join()
#encoding=utf-8
#author: walker
#date: 2017-03-14
#summary: 主进程生产,进程池消费

import os, sys, time
from multiprocessing import Process, Pool, Queue, Manager

#消费
def Consume(q):
print('Consume ...')
num = q.get()
print('Consume %d ...' % num)
time.sleep(2)
print('Consumer %d over' % num)

if __name__ == '__main__':
#q = Queue(10)				#不可用
q = Manager().Queue(10)		#可用

pool = Pool(processes = 4)
for i in range(1, 100):		#生产
while q.full():
print('sleep %d ...' % q.qsize())
time.sleep(1)
q.put(i)
print(i)
pool.apply_async(Consume, (q,))

pool.close()
pool.join()
*** Updated 2016-01-06 ***
一个好玩的例子:
#encoding=utf-8
#author: walker
#date: 2016-01-06
#summary: 一个多进程的好玩例子

import os, sys, time
from multiprocessing import Pool

cur_dir_fullpath = os.path.dirname(os.path.abspath(__file__))

g_List = ['a']

#修改全局变量g_List
def ModifyDict_1():
global g_List
g_List.append('b')

#修改全局变量g_List
def ModifyDict_2():
global g_List
g_List.append('c')

#处理一个
def ProcOne(num):
print('ProcOne ' + str(num) + ', g_List:' + repr(g_List))

#处理所有
def ProcAll():
pool = Pool(processes = 4)
for i in range(1, 20):
#ProcOne(i)
#pool.apply(ProcOne, (i,))
pool.apply_async(ProcOne, (i,))

pool.close()
pool.join()

ModifyDict_1()  #修改全局变量g_List

if __name__ == '__main__':
ModifyDict_2()  #修改全局变量g_List

print('In main g_List :' + repr(g_List))

ProcAll()
Windows7 下运行的结果:
λ python3 demo.py
In main g_List :['a', 'b', 'c']
ProcOne 1, g_List:['a', 'b']
ProcOne 2, g_List:['a', 'b']
ProcOne 3, g_List:['a', 'b']
ProcOne 4, g_List:['a', 'b']
ProcOne 5, g_List:['a', 'b']
ProcOne 6, g_List:['a', 'b']
ProcOne 7, g_List:['a', 'b']
ProcOne 8, g_List:['a', 'b']
ProcOne 9, g_List:['a', 'b']
ProcOne 10, g_List:['a', 'b']
ProcOne 11, g_List:['a', 'b']
ProcOne 12, g_List:['a', 'b']
ProcOne 13, g_List:['a', 'b']
ProcOne 14, g_List:['a', 'b']
ProcOne 15, g_List:['a', 'b']
ProcOne 16, g_List:['a', 'b']
ProcOne 17, g_List:['a', 'b']
ProcOne 18, g_List:['a', 'b']
ProcOne 19, g_List:['a', 'b']
Ubuntu 14.04下运行的结果:

In main g_List :['a', 'b', 'c']
ProcOne 1, g_List:['a', 'b', 'c']
ProcOne 2, g_List:['a', 'b', 'c']
ProcOne 3, g_List:['a', 'b', 'c']
ProcOne 5, g_List:['a', 'b', 'c']
ProcOne 4, g_List:['a', 'b', 'c']
ProcOne 8, g_List:['a', 'b', 'c']
ProcOne 9, g_List:['a', 'b', 'c']
ProcOne 7, g_List:['a', 'b', 'c']
ProcOne 11, g_List:['a', 'b', 'c']
ProcOne 6, g_List:['a', 'b', 'c']
ProcOne 12, g_List:['a', 'b', 'c']
ProcOne 13, g_List:['a', 'b', 'c']
ProcOne 10, g_List:['a', 'b', 'c']
ProcOne 14, g_List:['a', 'b', 'c']
ProcOne 15, g_List:['a', 'b', 'c']
ProcOne 16, g_List:['a', 'b', 'c']
ProcOne 17, g_List:['a', 'b', 'c']
ProcOne 18, g_List:['a', 'b', 'c']
ProcOne 19, g_List:['a', 'b', 'c']
  可以看见Windows7下第二次修改没有成功,而Ubuntu下修改成功了。据uliweb作者limodou讲,原因是Windows下是重启实现的子进程;Linux下是fork实现的。

相关阅读:
0、官方多进程文档
1、Python 并行任务技巧
2、python中的多进程处理
3、python的threading和multiprocessing模块
4、python下使用ctypes获取threading线程id

*** walker * 2014-05-21 ***
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息