Python多线程,生产者-消费者-终结者
2015-08-07 10:29
471 查看
Python实现多线程很简单,直接继承
“生产者-消费者”模型是很常见的多线程场景,程序维护一个数据缓冲区,一个“生产者”线程往里面放数据,另一个“消费者”线程从中取数据。这在Python中实现起来非常容易,因为Python有个线程安全、又非常高效的双端队列结构
在许多场景中,生产者和消费者自系统启动便始终谈笑风生。然而天下没有不散的筵席,系统总有终结的一刻,所以此处再考虑一个棒打鸳鸯的角色,叫做“终结者”,负责监控系统状态,在必要时通知大家撤退。整个系统包含一个生产者、若干个消费者和一个终结者,关系如下:
生产者负责采集数据,每次将新数据放入缓冲区后,通知所有消费者
消费者分析缓冲区中的数据(可以只读也可以取出数据)
终结者监控系统状态,在必要时通知所有生产者和消费者停止工作退出
通知机制可以用
用条件变量再配合一个表示系统是否要终结的状态变量,便实现了“生产者-消费者-终结者”模型中需要的同步机制。完整代码如下:
程序里建立
运行结果:
1
Buffer length: 1
1 2
Buffer length: 2
1 2 3
Buffer length: 3
1 2 3 4
Buffer length: 4
1 2 3 4 5
Buffer length: 5
2 3 4 5 6
Buffer length: 5
3 4 5 6 7
Buffer length: 5
4 5 6 7 8
Buffer length: 5
5 6 7 8 9
Buffer length: 5
threading.Thread类,覆盖掉
run()方法即可。必要时还可以覆盖
__init__()方法以便于传递参数,不过要保证在新的
__init__()中首先调用
threading.Thread的
__init__()来完成一些必要的线程初始化工作。下面是一个简单的多线程版HelloWorld:
import threading class MyThread(threading.Thread): def __init__(self, para1, para2): threading.Thread.__init__(self) self.para1 = para1 self.para2 = para2 def run(self): print self.para1 print self.para2 thread1 = MyThread('Hello, earthman.', 'Goodbye.') thread2 = MyThread('Hello, ET.', 'Bye.') thread1.start() thread2.start()
“生产者-消费者”模型是很常见的多线程场景,程序维护一个数据缓冲区,一个“生产者”线程往里面放数据,另一个“消费者”线程从中取数据。这在Python中实现起来非常容易,因为Python有个线程安全、又非常高效的双端队列结构
deque,左右两端都能插入和取出数据,可以随心所欲地实现FIFO、LIFO甚至更复杂的结构,若指定
maxlen参数,还能自动变成循环缓冲区。这一切都不用操心加锁的问题。
在许多场景中,生产者和消费者自系统启动便始终谈笑风生。然而天下没有不散的筵席,系统总有终结的一刻,所以此处再考虑一个棒打鸳鸯的角色,叫做“终结者”,负责监控系统状态,在必要时通知大家撤退。整个系统包含一个生产者、若干个消费者和一个终结者,关系如下:
生产者负责采集数据,每次将新数据放入缓冲区后,通知所有消费者
消费者分析缓冲区中的数据(可以只读也可以取出数据)
终结者监控系统状态,在必要时通知所有生产者和消费者停止工作退出
通知机制可以用
threading模块中提供的条件变量
Condition()实现。
Condition()类自带一个锁,支持
acquire()和
release()方法,又实现了
wait()和
notify()/
notifyAll()方法。持有锁的线程可以调用
wait()进入休眠状态,此方法会自动释放锁。持有锁的线程也可以调用
notify()/
notifyAll()来唤醒休眠线程,然后显式释放锁,休眠线程获得锁后从
wait()函数返回。
用条件变量再配合一个表示系统是否要终结的状态变量,便实现了“生产者-消费者-终结者”模型中需要的同步机制。完整代码如下:
#!/usr/bin/python2 from collections import deque import threading import time BUFFER_SIZE = 5 class State(object): """A set of mechanism for synchronization and notification between threads. Attributes: shutting: A boolean indicating if the system is shutting down. Its value should only be changed by calling the shutdown() method. """ def __init__(self): self.shutting = False self.__cv = threading.Condition() def shutdown(self): """Set shutting to True and notify other threads who are waiting.""" self.shutting = True self.__cv.acquire() self.__cv.notifyAll() self.__cv.release() def gotdata(self): """Notify other threads who are waiting.""" if self.__cv.acquire(False): # if self.shutting: # self.__cv.release() # return self.__cv.notifyAll() self.__cv.release() def waitfordata(self): """Sleep until another thread calls either gotdata() or shutdown().""" if self.__cv.acquire(False): if self.shutting: self.__cv.release() return self.__cv.wait() self.__cv.release() class MyThread(threading.Thread): def __init__(self, buffer, state): threading.Thread.__init__(self) self.buffer = buffer self.state = state def run(self): """This is for consumers. The producer should override this method.""" self.state.waitfordata() while not self.state.shutting: self.consume(self.buffer) self.state.waitfordata() def consume(self, buffer): """This method should be overridden for a consumer thread.""" return class Producer(MyThread): """A thread who produces data periodically and save them into buffer.""" def run(self): data = 0 while not self.state.shutting: time.sleep(0.2) data += 1 self.buffer.append(data) self.state.gotdata() class Consumer1(MyThread): """A thread who analyzes the data in buffer.""" def consume(self, buffer): for data in buffer: print data, print '' class Consumer2(MyThread): """A thread who analyzes the data in buffer.""" def consume(self, buffer): time.sleep(0.1) print 'Buffer length: ', len(buffer) def main(): thread_classes = (Producer, Consumer1, Consumer2) buffer = deque(maxlen = BUFFER_SIZE) state = State() threads = [] for cls in thread_classes: threads.append(cls(buffer, state)) for thread in threads: thread.start() time.sleep(2) state.shutdown() for thread in threads: thread.join() if __name__ == '__main__': main()
程序里建立
deque时用
maxlen参数指定了队列的最大长度,这样就很方便地实现了一个循环缓冲区,每次用
append()方法从队列右边插入数据的时候,若队列已满,最左边的数据会自动被丢弃。
运行结果:
1
Buffer length: 1
1 2
Buffer length: 2
1 2 3
Buffer length: 3
1 2 3 4
Buffer length: 4
1 2 3 4 5
Buffer length: 5
2 3 4 5 6
Buffer length: 5
3 4 5 6 7
Buffer length: 5
4 5 6 7 8
Buffer length: 5
5 6 7 8 9
Buffer length: 5
相关文章推荐
- python 工作自动签到记录
- 零基础学python-初识python与python的解释过程
- 零基础学python-初识python与python的解释过程
- python装饰器的理解
- 670个常用的Python库和示例代码
- python_class_1 How to use def
- 教你用200行Python代码“换脸”
- 用python来爬某电影网站的下载地址
- Python调用百度API之天气查询
- python中MySQLdb模块用法实例
- Read Large Files in Python
- python基础学习笔记<内建模块与第三方模块>
- python基础学习笔记<进阶>
- Python开源异步并发框架
- Python开源异步并发框架
- pycurl,Python cURL library
- pycurl,Python cURL library
- python中if __name__ == '__main__': 的解析
- python中if __name__ == '__main__': 的解析
- Python转码问题的解决方法:ignore,replace,xmlcharrefreplace