Python custom process pool (producer/consumer model)
2014-05-21 14:32
The code says it all:
```python
#encoding=utf-8
#author: walker
#date: 2014-05-21
#summary: a custom process pool that walks the files under a directory

from multiprocessing import Process, Queue, Lock
import time, os

# Consumer
class Consumer(Process):
    def __init__(self, queue, ioLock):
        super(Consumer, self).__init__()
        self.queue = queue
        self.ioLock = ioLock

    def run(self):
        while True:
            task = self.queue.get()  # blocks the process while the queue is empty
            if isinstance(task, str) and task == 'quit':
                break
            time.sleep(1)  # assume each task takes 1 second to process
            self.ioLock.acquire()
            print(str(os.getpid()) + ' ' + task)
            self.ioLock.release()
        self.ioLock.acquire()
        print('Bye-bye')
        self.ioLock.release()

# Producer
def Producer():
    queue = Queue()  # this queue is process/thread safe
    ioLock = Lock()
    subNum = 4  # number of child processes
    workers = build_worker_pool(queue, ioLock, subNum)
    start_time = time.time()

    for parent, dirnames, filenames in os.walk(r'D:\test'):
        for filename in filenames:
            queue.put(filename)
            ioLock.acquire()
            print('qsize:' + str(queue.qsize()))
            ioLock.release()
            while queue.qsize() > subNum * 10:  # cap the number of pending tasks
                time.sleep(1)

    for worker in workers:
        queue.put('quit')

    for worker in workers:
        worker.join()

    ioLock.acquire()
    print('Done! Time taken: {}'.format(time.time() - start_time))
    ioLock.release()

# Build the process pool
def build_worker_pool(queue, ioLock, size):
    workers = []
    for _ in range(size):
        worker = Consumer(queue, ioLock)
        worker.start()
        workers.append(worker)
    return workers

if __name__ == '__main__':
    Producer()
```

ps: `self.ioLock.acquire() ... self.ioLock.release()` can be written as `with self.ioLock: ...` instead.
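A minimal sketch of the ps above: the consumer loop written with `with` managing the lock. It also makes two assumptions that are not in the original. First, `None` is used as the stop sentinel, so a file actually named `quit` cannot stop a worker. Second, results come back through a second queue so the caller can check them. `Worker`, `run_workers` and `STOP` are hypothetical names.

```python
import os
from multiprocessing import Lock, Process, Queue

STOP = None  # assumed stop sentinel; unlike 'quit' it cannot clash with a real file name


class Worker(Process):
    """Hypothetical variant of the Consumer class above that uses `with` for the lock."""

    def __init__(self, tasks, results, io_lock):
        super().__init__()
        self.tasks = tasks
        self.results = results
        self.io_lock = io_lock

    def run(self):
        while True:
            task = self.tasks.get()  # blocks while the queue is empty
            if task is STOP:
                break
            # Equivalent to acquire() ... release(), but the lock is also
            # released if the body raises an exception.
            with self.io_lock:
                print(os.getpid(), task)
            self.results.put(task.upper())  # stand-in for real work
        with self.io_lock:
            print('Bye-bye')


def run_workers(items, size=4):
    """Hypothetical driver: items is a list of strings; returns the processed items."""
    tasks, results, io_lock = Queue(), Queue(), Lock()
    workers = [Worker(tasks, results, io_lock) for _ in range(size)]
    for worker in workers:
        worker.start()
    for item in items:
        tasks.put(item)
    for _ in workers:
        tasks.put(STOP)  # one sentinel per worker
    # Read every result before join(): a child that still has queued data
    # waiting to be flushed may not exit, so joining first can deadlock.
    collected = [results.get() for _ in range(len(items))]
    for worker in workers:
        worker.join()
    return collected


if __name__ == '__main__':
    print(sorted(run_workers(['a.txt', 'b.txt', 'c.txt'])))
```

Results arrive in whatever order the workers finish, which is why the usage line sorts them.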
```python
#encoding=utf-8
#author: walker
#date: 2017-03-14
#summary: one child process produces, one child process consumes

import os, sys, time
from multiprocessing import Process, Pool, Queue, Manager

# Produce
def Produce(q):
    print('Produce %d ...' % os.getpid())
    for i in range(1, 20):
        while q.full():
            print('sleep %d/%d ...' % (i, q.qsize()))
            time.sleep(1)
        q.put(i)
    q.put(0)  # 0 signals the end

# Consume
def Consume(q):
    print('Consume %d ...' % os.getpid())
    while True:
        num = q.get()
        if 0 == num:  # end signal received
            print('receive 0')
            break
        print('Consumer ' + str(num))
        time.sleep(2)
        print('Consumer end ' + str(num))

if __name__ == '__main__':
    q = Queue(10)  # works
    #q = Manager().Queue(10)  # also works
    print(os.getpid())
    producerProcess = Process(target=Produce, args=(q,))  # producer process
    consumerProcess = Process(target=Consume, args=(q,))  # consumer process
    producerProcess.start()
    consumerProcess.start()
    producerProcess.join()
    consumerProcess.join()
```
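With a bounded queue, the `while q.full()` polling in Produce is optional, because `Queue(maxsize).put()` already blocks until a consumer makes room. According to the multiprocessing docs, `qsize()` may also raise `NotImplementedError` on platforms such as macOS, so dropping it makes the code more portable. The following is a sketch under those assumptions. `produce`, `consume` and `run` are hypothetical names, and squaring each number stands in for real work.

```python
import time
from multiprocessing import Process, Queue

SENTINEL = 0  # as in the post: 0 means "no more work"


def produce(q, n):
    for i in range(1, n):
        q.put(i)  # blocks while the queue already holds maxsize items
    q.put(SENTINEL)


def consume(q, out):
    while True:
        num = q.get()
        if num == SENTINEL:
            break
        time.sleep(0.01)  # stand-in for the real work
        out.put(num * num)
    out.put(SENTINEL)  # tell the parent that this consumer has finished


def run(n=20, maxsize=10):
    """Hypothetical driver: returns the squares of 1..n-1 computed by the consumer."""
    q, out = Queue(maxsize), Queue()
    producer = Process(target=produce, args=(q, n))
    consumer = Process(target=consume, args=(q, out))
    producer.start()
    consumer.start()
    results = list(iter(out.get, SENTINEL))  # read until the consumer's sentinel
    producer.join()
    consumer.join()
    return results


if __name__ == '__main__':
    print(run())
```

There is only one consumer, so the results come back in the order they were produced.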
```python
#encoding=utf-8
#author: walker
#date: 2017-03-14
#summary: one child process produces, a process pool consumes

import os, sys, time
from multiprocessing import Process, Pool, Queue, Manager

# Produce
def Produce(q, poolSize):
    print('Produce ...')
    for i in range(1, 100):
        while q.full():
            print('sleep %d/%d ...' % (i, q.qsize()))
            time.sleep(1)
        q.put(i)
    for _ in range(0, poolSize):
        q.put(0)  # 0 signals the end, one per consumer

# Consume
def Consume(q):
    print('Consume ...')
    while True:
        num = q.get()
        if 0 == num:  # end signal received
            print('receive 0')
            break
        print('Consumer ' + str(num))
        time.sleep(2)
        print('Consumer end ' + str(num))

if __name__ == '__main__':
    #q = Queue(10)  # does not work: a plain Queue cannot be passed to Pool tasks
    q = Manager().Queue(10)  # works
    poolSize = 4
    producerProcess = Process(target=Produce, args=(q, poolSize))  # producer process
    consumerPool = Pool(processes=poolSize)  # consumer pool; the default number of workers is os.cpu_count()
    for _ in range(0, poolSize):
        consumerPool.apply_async(func=Consume, args=(q,))
    producerProcess.start()
    consumerPool.close()
    producerProcess.join()
    consumerPool.join()
```
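Why does the plain `Queue(10)` not work with the pool? A `multiprocessing.Queue` may only be shared by inheritance, for example by passing it to `Process(args=...)` before `start()`. Pool tasks, by contrast, are pickled when they are submitted, and pickling a plain Queue raises `RuntimeError`. With `apply_async` that error is stored in the returned `AsyncResult` and only surfaces when `.get()` is called. The code above never calls `.get()`, so the consumers would simply never run. `Manager().Queue()` returns a proxy object that can be pickled, which is why it works. This sketch makes the difference visible; `take_one` and `try_queue` are hypothetical helpers.

```python
from multiprocessing import Manager, Pool, Queue


def take_one(q):
    return q.get()


def try_queue(q):
    """Hypothetical helper: submit one task that receives q, report what happened."""
    with Pool(processes=1) as pool:
        result = pool.apply_async(take_one, (q,))
        try:
            # A pickling error is stored in the AsyncResult and re-raised here.
            return 'ok: %r' % result.get(timeout=10)
        except RuntimeError as e:
            return 'error: %s' % e


if __name__ == '__main__':
    print(try_queue(Queue()))  # plain multiprocessing.Queue

    with Manager() as manager:
        proxied = manager.Queue()  # proxy object, can be pickled
        proxied.put(1)
        print(try_queue(proxied))
```

Keeping the `AsyncResult` objects and calling `.get()` on them, even just to check for errors, is a cheap way to avoid this kind of silent failure.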
```python
#encoding=utf-8
#author: walker
#date: 2017-03-14
#summary: the main process produces, a process pool consumes

import os, sys, time
from multiprocessing import Process, Pool, Queue, Manager

# Consume
def Consume(q):
    print('Consume ...')
    num = q.get()
    print('Consume %d ...' % num)
    time.sleep(2)
    print('Consumer %d over' % num)

if __name__ == '__main__':
    #q = Queue(10)  # does not work: a plain Queue cannot be passed to Pool tasks
    q = Manager().Queue(10)  # works
    pool = Pool(processes = 4)
    for i in range(1, 100):  # produce
        while q.full():
            print('sleep %d ...' % q.qsize())
            time.sleep(1)
        q.put(i)
        print(i)
        pool.apply_async(Consume, (q,))
    pool.close()
    pool.join()
```

*** Updated 2016-01-06 ***
A fun example:
```python
#encoding=utf-8
#author: walker
#date: 2016-01-06
#summary: a fun multiprocessing example

import os, sys, time
from multiprocessing import Pool

cur_dir_fullpath = os.path.dirname(os.path.abspath(__file__))

g_List = ['a']

# Modify the global variable g_List
def ModifyDict_1():
    global g_List
    g_List.append('b')

# Modify the global variable g_List
def ModifyDict_2():
    global g_List
    g_List.append('c')

# Process one item
def ProcOne(num):
    print('ProcOne ' + str(num) + ', g_List:' + repr(g_List))

# Process all items
def ProcAll():
    pool = Pool(processes = 4)
    for i in range(1, 20):
        #ProcOne(i)
        #pool.apply(ProcOne, (i,))
        pool.apply_async(ProcOne, (i,))

    pool.close()
    pool.join()

ModifyDict_1()  # modify the global variable g_List

if __name__ == '__main__':
    ModifyDict_2()  # modify the global variable g_List
    print('In main g_List :' + repr(g_List))
    ProcAll()
```

Output on Windows 7:
```
λ python3 demo.py
In main g_List :['a', 'b', 'c']
ProcOne 1, g_List:['a', 'b']
ProcOne 2, g_List:['a', 'b']
ProcOne 3, g_List:['a', 'b']
ProcOne 4, g_List:['a', 'b']
ProcOne 5, g_List:['a', 'b']
ProcOne 6, g_List:['a', 'b']
ProcOne 7, g_List:['a', 'b']
ProcOne 8, g_List:['a', 'b']
ProcOne 9, g_List:['a', 'b']
ProcOne 10, g_List:['a', 'b']
ProcOne 11, g_List:['a', 'b']
ProcOne 12, g_List:['a', 'b']
ProcOne 13, g_List:['a', 'b']
ProcOne 14, g_List:['a', 'b']
ProcOne 15, g_List:['a', 'b']
ProcOne 16, g_List:['a', 'b']
ProcOne 17, g_List:['a', 'b']
ProcOne 18, g_List:['a', 'b']
ProcOne 19, g_List:['a', 'b']
```

Output on Ubuntu 14.04:
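```
In main g_List :['a', 'b', 'c']
ProcOne 1, g_List:['a', 'b', 'c']
ProcOne 2, g_List:['a', 'b', 'c']
ProcOne 3, g_List:['a', 'b', 'c']
ProcOne 5, g_List:['a', 'b', 'c']
ProcOne 4, g_List:['a', 'b', 'c']
ProcOne 8, g_List:['a', 'b', 'c']
ProcOne 9, g_List:['a', 'b', 'c']
ProcOne 7, g_List:['a', 'b', 'c']
ProcOne 11, g_List:['a', 'b', 'c']
ProcOne 6, g_List:['a', 'b', 'c']
ProcOne 12, g_List:['a', 'b', 'c']
ProcOne 13, g_List:['a', 'b', 'c']
ProcOne 10, g_List:['a', 'b', 'c']
ProcOne 14, g_List:['a', 'b', 'c']
ProcOne 15, g_List:['a', 'b', 'c']
ProcOne 16, g_List:['a', 'b', 'c']
ProcOne 17, g_List:['a', 'b', 'c']
ProcOne 18, g_List:['a', 'b', 'c']
ProcOne 19, g_List:['a', 'b', 'c']
```

On Windows 7 the second modification (ModifyDict_2) is not visible in the child processes, while on Ubuntu it is. According to limodou, the author of uliweb, the reason is that Windows starts each child process from scratch by re-running the program, while Linux creates it with fork. More precisely, with the spawn start method used on Windows, each child re-imports the main script under the module name `__mp_main__`. Module-level code such as the `ModifyDict_1()` call therefore runs again, but the `if __name__ == '__main__':` block, and with it `ModifyDict_2()`, does not. A forked child instead starts with a copy of the parent's memory, which already contains `['a', 'b', 'c']`.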
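Both results can be reproduced on a single Linux machine by requesting the start method explicitly with `multiprocessing.get_context()`. This is a sketch; `g_list`, `child_view` and `view_from_child` are hypothetical names modeled on the example above. The `'fork'` branch only runs where fork is available, which excludes Windows. macOS has also defaulted to spawn since Python 3.8.

```python
import multiprocessing as mp

g_list = ['a']
g_list.append('b')  # module level: re-executed when a spawned child imports this file


def child_view():
    return list(g_list)  # what the child process sees


def view_from_child(method):
    """Hypothetical helper: run child_view() in one worker started with `method`."""
    ctx = mp.get_context(method)
    with ctx.Pool(processes=1) as pool:
        return pool.apply(child_view)


if __name__ == '__main__':
    g_list.append('c')  # parent only: a spawned child skips this block
    print('parent:', g_list)
    print('spawn child:', view_from_child('spawn'))  # behaves like Windows
    if 'fork' in mp.get_all_start_methods():  # fork is unavailable on Windows
        print('fork child:', view_from_child('fork'))  # behaves like Linux in the post
```

The practical lesson is that state a child process depends on should be set up at module level or passed explicitly as arguments. Relying on changes made inside the `__main__` block only works under fork.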
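Related reading:
0. The official multiprocessing documentation.
1. Python parallel task tips
2. Multiprocessing in Python
3. Python's threading and multiprocessing modules
4. Getting a threading thread id with ctypes in Python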
*** walker * 2014-05-21 ***