
Python Multithreading: Producer-Consumer-Terminator

2015-08-07 10:29
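Multithreading in Python is simple: subclass `threading.Thread` and override its `run()` method. If you need to pass in parameters, you can also override `__init__()`, but the new `__init__()` must first call `threading.Thread.__init__()` so that the necessary thread initialization is done. Here is a simple multithreaded Hello World: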

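```python
import threading

class MyThread(threading.Thread):
    def __init__(self, para1, para2):
        # Initialize the Thread base class before anything else
        threading.Thread.__init__(self)
        self.para1 = para1
        self.para2 = para2

    def run(self):
        # run() executes in the new thread once start() is called
        print(self.para1)
        print(self.para2)

thread1 = MyThread('Hello, earthman.', 'Goodbye.')
thread2 = MyThread('Hello, ET.', 'Bye.')
thread1.start()
thread2.start()
# Wait for both threads to finish
thread1.join()
thread2.join()
```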


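The "producer-consumer" model is a very common multithreading scenario: the program maintains a data buffer, a "producer" thread puts data into it, and a "consumer" thread takes data out of it. This is very easy to implement in Python, because Python has a thread-safe and highly efficient double-ended queue, `deque` (in the `collections` module). Items can be inserted and removed at both ends, so you can build FIFO, LIFO, or even more complex structures as you like, and if you pass the `maxlen` parameter it automatically becomes a circular buffer. Appends and pops at either end are thread-safe, so you do not need to add locking around them yourself. (Iterating over a deque while another thread modifies it is a different matter: it raises `RuntimeError`.)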
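A minimal sketch of the deque operations just described (Python 3; the values and the `worker` function are arbitrary illustrations): `append()` plus `popleft()` gives FIFO order, `append()` plus `pop()` gives LIFO order, and several threads can `append()` to the same deque without an explicit lock.

```python
from collections import deque
import threading

# FIFO: insert on the right, take from the left.
fifo = deque()
for item in (1, 2, 3):
    fifo.append(item)
fifo_order = [fifo.popleft() for _ in range(len(fifo))]
print(fifo_order)  # [1, 2, 3]

# LIFO (stack): insert and take on the same (right) end.
lifo = deque()
for item in (1, 2, 3):
    lifo.append(item)
lifo_order = [lifo.pop() for _ in range(len(lifo))]
print(lifo_order)  # [3, 2, 1]

# Concurrent appends from several threads need no extra lock:
# deque.append() is thread-safe, so no item is lost.
shared = deque()

def worker(n):
    for i in range(n):
        shared.append(i)

workers = [threading.Thread(target=worker, args=(10000,)) for _ in range(4)]
for t in workers:
    t.start()
for t in workers:
    t.join()
print(len(shared))  # 40000
```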

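In many scenarios the producer and the consumers chat away happily from the moment the system starts. But every party must end, and every system eventually shuts down, so let us add one more role, the spoilsport of the group, called the "terminator": it monitors the system state and, when necessary, tells everyone to pack up. The whole system consists of one producer, several consumers, and one terminator, related as follows: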

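- The producer collects data; each time it puts new data into the buffer, it notifies all consumers.
- The consumers analyze the data in the buffer (they may only read it, or they may take it out).
- The terminator monitors the system state and, when necessary, tells the producer and all consumers to stop working and exit.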

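The notifications can be implemented with the condition variable `Condition()` from the `threading` module. A `Condition()` object has its own lock and supports the `acquire()` and `release()` methods, and it also provides `wait()` and `notify()`/`notifyAll()` (in Python 3 the preferred spelling is `notify_all()`; `notifyAll()` is a deprecated alias). A thread that holds the lock can call `wait()` to go to sleep; `wait()` releases the lock automatically. A thread that holds the lock can also call `notify()`/`notifyAll()` to wake sleeping threads and then release the lock explicitly; a woken thread returns from `wait()` only after it has reacquired the lock.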
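The sketch below (Python 3; the names `cv`, `ready`, `waiter` and `results` are made up for illustration) walks through that sequence: the waiting thread holds the lock while it checks a flag, `wait()` releases the lock while it sleeps, and the notifying thread changes the flag and calls `notify_all()` while holding the lock. `with cv:` is shorthand for `cv.acquire()` ... `cv.release()`.

```python
import threading

cv = threading.Condition()
ready = False      # shared flag, only read or written while holding cv's lock
results = []

def waiter():
    with cv:                    # cv.acquire() ... cv.release()
        while not ready:        # always re-check the flag after wait() returns
            cv.wait()           # releases the lock while asleep, reacquires it before returning
        results.append('woken')

t = threading.Thread(target=waiter)
t.start()

with cv:
    ready = True
    cv.notify_all()             # wake every thread sleeping in cv.wait()
t.join()
print(results)  # ['woken']
```

If the main thread happens to notify before the waiter reaches `wait()`, the waiter simply sees `ready == True` and never sleeps, which is why the flag is checked under the lock.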


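Combining the condition variable with a state variable that indicates whether the system is about to shut down provides the synchronization that the "producer-consumer-terminator" model needs. In the program below, the main thread plays the terminator: it lets the system run for two seconds and then calls `shutdown()`. The complete code is as follows: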

```python
#!/usr/bin/env python3
from collections import deque
import threading
import time

BUFFER_SIZE = 5


class State(object):
    """A set of mechanisms for synchronization and notification between threads.

    Attributes:
        shutting: A boolean indicating whether the system is shutting down. Its
            value should only be changed by calling the shutdown() method.
    """

    def __init__(self):
        self.shutting = False
        self.__cv = threading.Condition()

    def shutdown(self):
        """Set shutting to True and wake up all waiting threads."""
        # "with" acquires the condition's lock and releases it on exit.
        with self.__cv:
            self.shutting = True
            self.__cv.notify_all()

    def gotdata(self):
        """Wake up all threads waiting in waitfordata()."""
        with self.__cv:
            self.__cv.notify_all()

    def waitfordata(self):
        """Sleep until another thread calls either gotdata() or shutdown()."""
        # Blocking acquire: if acquiring failed and we returned at once, the
        # consumer would call consume() again without any new data.
        with self.__cv:
            # shutting is checked under the lock, so a shutdown() that has
            # already happened is never missed.
            if not self.shutting:
                self.__cv.wait()  # releases the lock while sleeping


class MyThread(threading.Thread):
    def __init__(self, buffer, state):
        threading.Thread.__init__(self)
        self.buffer = buffer
        self.state = state

    def run(self):
        """Main loop for consumers. The producer overrides this method."""
        self.state.waitfordata()
        while not self.state.shutting:
            self.consume(self.buffer)
            self.state.waitfordata()

    def consume(self, buffer):
        """Override this method in a consumer thread."""
        pass


class Producer(MyThread):
    """A thread that produces data periodically and saves it into the buffer."""

    def run(self):
        data = 0
        while not self.state.shutting:
            time.sleep(0.2)
            data += 1
            self.buffer.append(data)  # thread-safe; drops the oldest item when full
            self.state.gotdata()


class Consumer1(MyThread):
    """A thread that analyzes the data in the buffer (here: prints it)."""

    def consume(self, buffer):
        # Copy first: iterating the deque while the producer appends to it
        # can raise "RuntimeError: deque mutated during iteration".
        snapshot = list(buffer)
        print(' '.join(str(data) for data in snapshot))


class Consumer2(MyThread):
    """A thread that analyzes the data in the buffer (here: its length)."""

    def consume(self, buffer):
        time.sleep(0.1)
        print('Buffer length:', len(buffer))


def main():
    thread_classes = (Producer, Consumer1, Consumer2)
    buffer = deque(maxlen=BUFFER_SIZE)
    state = State()
    threads = [cls(buffer, state) for cls in thread_classes]
    for thread in threads:
        thread.start()
    # The main thread acts as the terminator: run for 2 seconds, then stop.
    time.sleep(2)
    state.shutdown()
    for thread in threads:
        thread.join()


if __name__ == '__main__':
    main()
```


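When the program creates the `deque`, it passes the `maxlen` parameter to cap the queue's length, which conveniently turns it into a circular buffer: each time `append()` adds an item on the right end of a full queue, the leftmost item is discarded automatically.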
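To see the circular-buffer behavior in isolation, this short sketch (Python 3; the `snapshots` list exists only for illustration) appends 1 to 7 to a deque created with `maxlen=5`, the same `BUFFER_SIZE` the program uses. The printed lines follow the same sliding-window pattern as the Consumer1 lines in the program's output.

```python
from collections import deque

BUFFER_SIZE = 5
buffer = deque(maxlen=BUFFER_SIZE)
snapshots = []
for data in range(1, 8):
    buffer.append(data)             # when full, the leftmost item is dropped
    snapshots.append(list(buffer))  # record the contents after each append

for snap in snapshots:
    print(' '.join(str(d) for d in snap))
# 1
# 1 2
# 1 2 3
# 1 2 3 4
# 1 2 3 4 5
# 2 3 4 5 6
# 3 4 5 6 7
```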

Output:

```
1
Buffer length: 1
1 2
Buffer length: 2
1 2 3
Buffer length: 3
1 2 3 4
Buffer length: 4
1 2 3 4 5
Buffer length: 5
2 3 4 5 6
Buffer length: 5
3 4 5 6 7
Buffer length: 5
4 5 6 7 8
Buffer length: 5
5 6 7 8 9
Buffer length: 5
```
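One property of `State` worth knowing: a consumer that is still inside `consume()` when the producer calls `gotdata()` is not waiting on the condition, so that notification is lost and the consumer keeps sleeping until the next one even though the buffer has changed. If that matters, one possible variant is sketched below. It is a hypothetical sketch, not part of the original program: the class `VersionedState`, the `version` counter and the `last_seen` argument are invented here. It counts notifications and uses `Condition.wait_for()` (Python 3.2 or later), so `waitfordata()` returns immediately if new data arrived while the consumer was busy.

```python
import threading


class VersionedState(object):
    """Hypothetical variant of State that does not lose gotdata() calls."""

    def __init__(self):
        self.shutting = False
        self.version = 0                  # incremented on every gotdata()
        self._cv = threading.Condition()

    def shutdown(self):
        with self._cv:
            self.shutting = True
            self._cv.notify_all()

    def gotdata(self):
        with self._cv:
            self.version += 1
            self._cv.notify_all()

    def waitfordata(self, last_seen):
        """Block until version != last_seen or shutdown() has been called.

        Returns the current version, to be passed in on the next call.
        """
        with self._cv:
            # wait_for() re-checks the predicate each time the thread wakes up
            self._cv.wait_for(lambda: self.shutting or self.version != last_seen)
            return self.version


# Data arrives while the (imaginary) consumer is busy:
state = VersionedState()
state.gotdata()
print(state.waitfordata(0))  # 1: returns at once instead of sleeping
state.shutdown()
print(state.waitfordata(1))  # 1: returns because shutting is True
```

A consumer would keep the value returned by each call and pass it back in, for example starting with `seen = state.waitfordata(0)` and then calling `seen = state.waitfordata(seen)` after each `consume()`.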