您的位置:首页 > 编程语言 > Python开发

Learning Python(16)--多线程编程(threading,Queue模块)

2016-03-18 21:47 609 查看

Threading模块

在开始介绍Python的多线程模块之前,首先要明确Python多线程的一个重要的问题。先看看Python2.7的官方文档的描述:

CPython implementation detail: In CPython, due to the Global Interpreter Lock, only one thread can execute Python code at once (even though certain performance-oriented libraries might overcome this limitation). If you want your application to make better use of the computational resources of multi-core machines, you are advised to use multiprocessing. However, threading is still an appropriate model if you want to run multiple I/O-bound tasks simultaneously.

这段话的意思是,在CPython解释器中,由于解释器全局锁(GIL)的存在,在一个时刻只可能有一个线程在运行。如果要想你的应用可以充分利用多核机器的计算性能,那最好使用multiprocessing模块的多进程。当然,如果想要同时运行多个I/O密集型的任务,使用多线程模块依然是一个很好的选择。

创建线程(Thread Object)

共有两种创建线程的方法:向构造函数中传入一个可调用的对象(函数);或者在threading.Thread的子类中重写run()方法,即子类中只需要重写init()与run()方法。

线程类:

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={})


group
必须为None,为
ThreadGroup
类的实现而保留。

target
为run()方法要调用对象。默认为None,表示不会调用任何对象。

name
线程名

args
向被调用对象传入的参数元祖(tuple),默认为()

kwargs
向被调用对象传入的关键词参数字典(dict),默认dict

start()
方法:启动线程。每个线程最多被启动一次,否次会引发
RunTimeError
。start()方法中会调用run()方法。

run()
方法:这个方法代表这线程已进入活跃状态,它会被start()方法调用。当使用子类的方法创建线程时,必须在子类中重写此方法。

join([timeout])
方法:阻塞当前线程,直到调用join方法的线程中断后,才会继续当前线程。

is_alive()
方法:判断线程是否在运行。

setDaemon(True)
方法:设置线程为守护线程,该方法必须在start()前调用。一个线程为守护线程,则当主线程运行完毕后,若守护线程没有运行完毕则会随主线程一起退出。而对于非守护线程,主线程运行完毕后会等待非守护线程结束然后一起退出。

"""
创建线程实例(1):通过向构造函数传入可调用对象创建线程
"""
import threading

def func(a):
print "This is a children thread..."
print a

test_thread = threading.Thread(target=func, name="test thread",args=(3,))

test_thread.start()


"""
创建线程实例(2):通过在子类中重写run()方法创建线程类
"""
import threading

class Demo(threading.Thread):
def __init__(self, a):
threading.Thread.__init__(self, name="Demo thread")
self.a = a
def run(self):
print self.name
print self.a

demo = Demo(3)
demo.start()


"""
join()方法使用
"""
import threading
import time

def doWaiting():
print 'start waiting1: ' + time.strftime('%H:%M:%S') + "\n"
time.sleep(3)
print 'stop waiting1: ' + time.strftime('%H:%M:%S') + "\n"
def doWaiting1():
print 'start waiting2: ' + time.strftime('%H:%M:%S') + "\n"
time.sleep(8)
print 'stop waiting2: ', time.strftime('%H:%M:%S') + "\n"
tsk = []
thread1 = threading.Thread(target = doWaiting)
thread1.start()
tsk.append(thread1)
thread2 = threading.Thread(target = doWaiting1)
thread2.start()
tsk.append(thread2)

print 'start join: ' + time.strftime('%H:%M:%S') + "\n"
for tt in tsk:
tt.join()
a = input()
print 'end join: ' + time.strftime('%H:%M:%S') + "\n"


线程1与线程2分别调用join()方法后,主线程阻塞,此时并没有运行到
a=input()
输入a的值只是会记录在内存中,没有实际赋给a.两个线程运行完毕后,主线程才会继续执行.需要注意的一点是,如果有多个子线程,用for循环启动它们时,不能在一个for循环中同时调用start()与join()方法,因为如果这样做的话只会启动第一个线程,然后阻塞主线程,其他线程并没有启动.

"""
setDaemon()方法使用
"""
import threading
import time

def doWaiting():
print 'start waiting1: ' + time.strftime('%H:%M:%S') + "\n"
time.sleep(3)
print 'stop waiting1: ' + time.strftime('%H:%M:%S') + "\n"
def doWaiting1():
print 'start waiting2: ' + time.strftime('%H:%M:%S') + "\n"
time.sleep(8)
print 'stop waiting2: ', time.strftime('%H:%M:%S') + "\n"

thread1 = threading.Thread(target = doWaiting)
thread1.setDaemon(True)
thread1.start()
thread2 = threading.Thread(target = doWaiting1)
thread2.setDaemon(True)
thread2.start()

print 'end : ' + time.strftime('%H:%M:%S') + "\n"


此程序中将两个子线程都设置为守护线程.启动程序后,打印完end…,则程序立即退出,两个子线程也随之退出.如下:

start waiting1: 20:28:55
start waiting2: 20:28:55

end : 20:28:56
Process finished with exit code 0


若不将它们设置为守护线程,则输入如下:

start waiting1: 20:31:16
start waiting2: 20:31:16

end join: 20:31:17
stop waiting1: 20:31:19
stop waiting2:  20:31:24

Process finished with exit code 0


线程间的资源共享同步(Lock,RLock)

多线程情况下最常见的问题之一:数据共享。当多个线程都要去修改某一个共享数据的时候,我们需要对数据访问进行同步。

Lock Object:一个锁对象有两种状态,“Locked”与“unlocked”状态。一把锁被创建时处于“打开”状态,它有两个基本的方法,acquire()与release()方法。当锁处于打开状态时,acquire()方法会将状态改变为“Locked”状态。若锁处于锁闭状态,则acquire()方法会被阻塞,直至另外一个线程调用该锁的release()方法将锁改变为打开状态,然后acquire()方法又会将锁改变为锁闭状态。

需要注意的是,若有多个线程阻塞在acquire()方法等待锁的打开,则只有一个线程会在锁打开后获得该锁,具体是哪个线程可能会根据实现而变化。

Lock.acquire([blocking])—-Acquire a lock, blocking or non-blocking

Lock.release()—-Release a lock.

RLock Object:python提供了Lock对象的变种: RLock对象。RLock对象内部维护着一个Lock对象,它是一种可重入的对象。对于Lock对象而言,如果一个线程连续两次进行acquire操作,那么由于第一次acquire之后没有release,第二次acquire将挂起线程。这会导致Lock对象永远不会release,使得线程死锁。RLock对象允许一个线程多次对其进行acquire操作,因为在其内部通过一个counter变量维护着线程acquire的次数。而且每一次的acquire操作必须有一个release操作与之对应,在所有的release操作完成之后,别的线程才能申请该RLock对象。

RLock.acquire([blocking=1])

RLock.release()

条件同步(Condition Object)

    锁只能提供最基本的同步。假如只在发生某些事件时才访问一个“临界区”,这时需要使用条件变量Condition。

     Condition对象是对Lock对象的包装,在创建Condition对象时,其构造函数需要一个Lock对象作为参数,如果没有这个Lock对象参数,Condition将在内部自行创建一个Rlock对象。在Condition对象上,当然也可以调用acquire和release操作,因为内部的Lock对象本身就支持这些操作。但是Condition的价值在于其提供的wait和notify的语义。

    条件变量是如何工作的呢?首先一个线程成功获得一个条件变量后,调用此条件变量的wait()方法会导致这个线程释放这个锁,并进入“blocked”状态,直到另一个线程调用同一个条件变量的notify()方法来唤醒那个进入“blocked”状态的线程。如果调用这个条件变量的notifyAll()方法的话就会唤醒所有的在等待的线程。

    如果程序或者线程永远处于“blocked”状态的话,就会发生死锁。所以如果使用了锁、条件变量等同步机制的话,一定要注意仔细检查,防止死锁情况的发生。对于可能产生异常的临界区要使用异常处理机制中的finally子句来保证释放锁。等待一个条件变量的线程必须用notify()方法显式的唤醒,否则就永远沉默。保证每一个wait()方法调用都有一个相对应的notify()调用,当然也可以调用notifyAll()方法以防万一。

acquire([timeout])/release(): 调用关联的锁的相应方法。

wait([timeout]): 调用这个方法将使线程进入Condition的等待池等待通知,并释放锁。使用前线程必须已获得锁定,否则将抛出异常。

notify(): 调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁定(进入锁定池);其他线程仍然在等待池中。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。

notifyAll(): 调用这个方法将通知等待池中所有的线程,这些线程都将进入锁定池尝试获得锁定。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。

条件同步(Event Object)

还有一种简单的线程间的通信方法:一个线程发送一个Event信号,另外的线程在等待。一个Event对象的set()方法会把标志位设为True,clear()方法会把flag设置为false。wait()方法会阻塞线程直至,对象的标志位变为True。

* is_set():如果标志位为True,则返回True

* set():将Event对象的标志位设置为True。这样所有等待它变为true的线程都会被唤醒。

* clear():将标志位重新设置为False

* wait([timeout]):阻塞线程,直至标志位变为True

Python中还可以使用Queue模块中的Queue()对象实现线程间的数据传送

    Python中的Queue对象也提供了对线程同步的支持。使用Queue对象可以实现多个生产者和多个消费者形成的FIFO的队列。

    生产者将数据依次存入队列,消费者依次从队列中取出数据。

Queue模块实现了一个支持多producer和多consumer的FIFO队列。当共享信息需要安全的在多线程之间交换时,Queue非常有用。Queue的默认长度是无限的,但是可以设置其构造函数的maxsize参数来设定其长度。Queue的put方法在队尾插入,该方法的原型是:

put(item, block=True, timeout=None)

如果可选参数block为true并且timeout为None(缺省值),线程被block,直到队列空出一个数据单元。如果timeout大于0,在timeout的时间内,仍然没有可用的数据单元,Full exception被抛出。反之,如果block参数为false(忽略timeout参数),item被立即加入到空闲数据单元中,如果没有空闲数据单元,Full exception被抛出。

get(block=True, timeout=None)

Queue的get方法是从队首取数据,其参数和put方法一样。如果block参数为true且timeout为None(缺省值),线程被block,直到队列中有数据。如果timeout大于0,在timeout时间内,仍然没有可取数据,Empty exception被抛出。反之,如果block参数为false(忽略timeout参数),队列中的数据被立即取出。如果此时没有可取数据,Empty exception也会被抛出。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: