您的位置:首页 > 编程语言 > Python开发

python2.7 threading 模块 三 Condition Semaphore

2016-09-01 20:22 411 查看

1.Condition

Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法。线程首先acquire一个条件变量,然后判断一些条件。如果条件不满足则wait;如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件。不断的重复这一过程,从而解决复杂的同步问题。

可以认为Condition对象维护了一个锁(Lock/RLock)和一个waiting池。线程通过acquire获得Condition对象,当调用wait方法时,线程会释放Condition内部的锁并进入blocked状态,同时在waiting池中记录这个线程。当调用notify方法时,Condition对象会从waiting池中挑选一个线程,通知其调用acquire方法尝试取到锁。

以python 的queue源码(只截取了关键的代码)来说明:

class Queue:
#初始化代码
def __init__(self, maxsize=0):
#几个Condition维护者一个Lock。
# mutex is shared between the three conditions, so acquiring and
# releasing the conditions also acquires and releases mutex.
self.mutex = _threading.Lock()
# Notify not_empty whenever an item is added to the queue; a
# thread waiting to get is notified then.
self.not_empty = _threading.Condition(self.mutex)
# Notify not_full whenever an item is removed from the queue;
# a thread waiting to put is notified then.
self.not_full = _threading.Condition(self.mutex)


put方法:

def put(self, item, block=True, timeout=None):
#为了线程安全先获得各个Condition共享的锁Lock。
self.not_full.acquire()
try:
#再做一些判断
if self.maxsize > 0:
if not block:
if self._qsize() == self.maxsize:
raise Full
elif timeout is None:
#如果队列满了就**阻塞**在这里,并且**释放锁**
while self._qsize() == self.maxsize:
self.not_full.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = _time() + timeout
while self._qsize() == self.maxsize:
remaining = endtime - _time()
if remaining <= 0.0:
raise Full
self.not_full.wait(remaining)
#向队列里面push一个item
self._put(item)
self.unfinished_tasks += 1
#因为现在queue不为空了所以唤醒一个在not_empty中waiting池中的某个线程来竞争共同的Lock
self.not_empty.notify()
finally:
#最后释放Condition 维护的锁
#ps:可能这里会有疑问前面wait()方法不是释放Lock了吗?
#答:对wait()方法释放了Lock且阻塞了,但是当该线程再从wait()后开始执行的时候,**又重新获得Lock了**.
self.not_full.release()


get方法:

#get方法可以对照前面的put方便来看
def get(self, block=True, timeout=None):
self.not_empty.acquire()
try:
if not block:
if not self._qsize():
raise Empty
elif timeout is None:
while not self._qsize():
self.not_empty.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = _time() + timeout
while not self._qsize():
remaining = endtime - _time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
item = self._get()
self.not_full.notify()
return item
finally:
self.not_empty.release()


python Queue 模块实现了多生产者、多消费者队列。它特别适用于信息必须在多个线程间安全地交换的多线程程序中。这个模块中的 Queue 类实现了所有必须的锁语义 ——[python官方文档]

2.Semaphore

semaphore模块就是对PV操作的封装吧。

python 官方文档解释想不到合适的翻译。

Semaphores are often used to guard resources with limited capacity, for example, a database server.

网上找的一个小例子(只能同时访问两个网址):

import threading
import urllib2
import time, random

class GrabUrl(threading.Thread):
def __init__(self, arg0):
threading.Thread.__init__(self)
self.host=arg0
def run(self):
k=random.randint(10,20)
print "Processing " + self.host + " waiting for : " + str(k)
time.sleep(k)
print "exiting " + self.host
pool.release()

class Handler(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
for i in hosts:
pool.acquire()
graburl=GrabUrl(i)
graburl.setDaemon(True)
graburl.start()

maxconn=2
pool=threading.BoundedSemaphore(value=maxconn)
hosts=["http://yahoo.com", "http://google.com", "http://amazon.com", "http://ibm.com", "http://apple.com"]
print str(len(hosts))
handler=Handler()
handler.start()
handler.join()
print "exiting main"


运行结果:

5
Processing http://yahoo.com waiting for : 14
Processing http://google.com waiting for : 15
exiting http://yahoo.com Processing http://amazon.com waiting for : 10
exiting http://google.com Processing http://ibm.com waiting for : 10
exiting http://amazon.com Processing http://apple.com waiting for : 19
exiting main
[Finished in 24.4s]
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  python semaphore 线程