您的位置:首页 > 编程语言 > Python开发

python学习第十课续 :线程池

2016-01-15 21:57 831 查看
线程分步走
t=threading.Thread(target=fun,args=())
t.start()
执行流程:
threading.Thread(target=fun,args=()) à
self.__target = target
self.__name = str(name or _newname())
self.__args = args
t.start  à
_start_new_thread(self.__bootstrap, ())
_start_new_thread àthread.start_new_thread àpass
self.__bootstrapàself.__bootstrap_inner()àself.run()à
self.__target(*self.__args,**self.__kwargs)

在网上找到一篇改写Thread类的run方法的例子,可以扩展思路 http://www.cnblogs.com/vamei/archive/2012/10/11/2720042.html# A program to simulate selling tickets in multi-thread way# Written by Vamei import threadingimport timeimport os # This function could be any function to do other chores.def doChore(): time.sleep(0.5) # Function for each threadclass BoothThread(threading.Thread): def __init__(self, tid, monitor): self.tid = tid self.monitor = monitor threading.Thread.__init__(self) def run(self): while True: monitor['lock'].acquire() # Lock; or wait if other thread is holding the lock if monitor['tick'] != 0: monitor['tick'] = monitor['tick'] - 1 # Sell tickets print(self.tid,':now left:',monitor['tick']) # Tickets left doChore() # Other critical operations else: print("Thread_id",self.tid," No more tickets") os._exit(0) # Exit the whole process immediately monitor['lock'].release() # Unblock doChore() # Non-critical operations # Start of the main functionmonitor = {'tick':100, 'lock':threading.Lock()} # Start 10 threadsfor k in range(10): new_thread = BoothThread(k, monitor) new_thread.start()
首先,定义了一个类BoothThread, 这个类继承自thread.Threading类。然后把要进行的操作统统放入到BoothThread类的run()方法中。实例化booththread的时候,第一个参数就不必为一个函数了,它可以定义为类booththread需要的参数。
注意
1. 在booththread的类中,因为它继承自threading.Thread,所以在它的__init__中,必须调用基本类的构造器threading.Thread.__init__(self)
2. 本例没有使用全局变量声明global,而是使用了一个词典monitor存放全局变量,然后把词典作为参数传递给线程函数。由于词典是可变数据对象,所以当它被传递给函数的时候,函数所使用的依然是同一个对象,相当于被多个线程所共享。这也是多线程乃至于多进程编程的一个技巧 (应尽量避免global声明的用法,因为它并不适用于windows平台)。

Pipe Queue
IPC,进程间通信(Inter-Process Communication),就是指多个进程之间相互通信,交换信息的方法,常用的一般是socke,rpc,pipe和消息队列等。multiprocessing提供了IPC(Pipe和Queue),使Python多进程并发,效率上更高
Pipe
import multiprocessing as mul def proc1(pipe): pipe.send('hello,i am 1') print('proc1 rec:',pipe.recv()) def proc2(pipe): pipe.send('hello,i am 2') print('proc2 rec:',pipe.recv())# pipe.send('hello, too') # Build a pipepipe = mul.Pipe() # Pass an end of the pipe to process 1p1 = mul.Process(target=proc1, args=(pipe[0],))# Pass the other end of the pipe to process 2p2 = mul.Process(target=proc2, args=(pipe[1],))p1.start()p2.start()p1.join()p2.join() 输出结果:('proc1 rec:', 'hello,i am 2')('proc2 rec:', 'hello,i am 1')
multiprocessing.Pipe([duplex])
Returns a pair (conn1, conn2) of Connection objects representing the ends of a pipe.
If duplex is True (the default) then the pipe is bidirectional. If duplex is False then the pipe is unidirectional: conn1 can only be used for receiving messages and conn2 can only be used for sending messages.
#生成两个对象,用这两个对象,来互相的交流。每个对象代表Pipe的一端(Connection对象)。我们对Pipe的某一端调用send()方法来传送对象,在另一端使用recv()来接收。
duplex默认为True,表示双向,单向管道只允许管道一端的进程输入,另一端输出,而双向管道则允许从两端输入输出。Pipe主要是用来两个进程之间进行通信。也可以多个进程占用一端,与另一端的进程通信,但同一端的进程之间不能进行通信
import multiprocessing as mul def proc1(pipe): pipe.send('hello,i am 1') print('proc1 rec:',pipe.recv()) print('proc1 rec:',pipe.recv()) def proc2(pipe): pipe.send('hello,i am 2') print('proc2 rec:',pipe.recv()) def proc3(pipe): pipe.send('hello,i am 3') pipe = mul.Pipe() p1 = mul.Process(target=proc1, args=(pipe[0],))p2 = mul.Process(target=proc2, args=(pipe[1],))p3 = mul.Process(target=proc3, args=(pipe[1],))p1.start()p2.start()p3.start()p1.join()p2.join()p3.join() 输出结果:
('proc1 rec:', 'hello,i am 2')
('proc1 rec:', 'hello,i am 3')
('proc2 rec:', 'hello,i am 1')

Queue
Queue与Pipe相类似,都是先进先出的结构。但Queue允许多个进程放入,多个进程从队列取出对象。Queue使用mutiprocessing.Queue(maxsize)创建,maxsize表示队列中可以存放对象的最大数量。
import Queue
queue = Queue.Queue(10)for i in range (5): queue.put(i)print queue.get()print queue.get()print queue.qsize() 下面的例子:(http://www.cnblogs.com/vamei/)# Written by Vameiimport osimport multiprocessingimport time#==================# input workerdef inputQ(queue): info = str(os.getpid()) + '(put):' + str(time.time()) queue.put(info) # output workerdef outputQ(queue,lock): info = queue.get() lock.acquire() print (str(os.getpid()) + '(get):' + info) lock.release()#===================# Mainrecord1 = [] # store input processesrecord2 = [] # store output processeslock = multiprocessing.Lock() # To prevent messy printqueue = multiprocessing.Queue(3) # input processesfor i in range(10): process = multiprocessing.Process(target=inputQ,args=(queue,)) process.start() record1.append(process) # output processesfor i in range(10): process = multiprocessing.Process(target=outputQ,args=(queue,lock)) process.start() record2.append(process) for p in record1: p.join() queue.close() # No more object will come, close the queue for p in record2: p.join()

multiprocessing
queues.py __all__ = ['Queue', 'SimpleQueue', 'JoinableQueue']
Queue -- type using a pipe, buffer and thread
joinablequeue -- A queue type which also supports join() and task_done() methods
simplequeue -- really just a locked pipe
connection.py
def Pipe(duplex=True)
Queue.py __all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue']
一些进程使用put()在Queue中放入字符串,这个字符串中包含PID和时间。另一些进程从Queue中取出,并打印自己的PID以及get()的字符串。
如果例中的multiprocessing.Queue换成Queue.Queue,就不能执行,为什么?
另, Queue.Queue没有close()方法,multiprocess.Queue有

线程池
简单的线程池
import Queue,time
class threadpool(object):
def __init__(self,maxnum=20):
self.queue = Queue.Queue(maxnum)
for i in xrange(maxnum):
self.queue.put(threading.Thread)
def get_thread(self):
self.queue.get()
def add_thread(self):
self.queue.put(threading.Thread)
pool = threadpool(10)
def foo(arg,pool):
print arg
time.sleep(0.2)
pool.add_thread()

for i in range(20):
thread = pool.get_thread()
t = thread(target = foo,args=(i,pool))
t.start()

用列表来代替queue实现相同功能
import time
import threading
class threadpool():
def __init__(self,maxnum = 20):
self.t_list = []
self.lock = threading.Lock()
for i in xrange(maxnum):
self.t_list.append(threading.Thread)
def get_thread(self):
self.lock.acquire() #用来模拟线程安全
while len(self.t_list) == 0:
time.sleep(1)
x = self.t_list.pop(0)
self.lock.release()
return x
def add_thread(self):
self.t_list.append(threading.Thread)
pool = threadpool(10)
def foo(i,p):
print i
time.sleep(2)
p.add_thread()
for i in xrange(30):
thread = pool.get_thread()
t = thread(target=foo,args=(i,pool))
t.start()

复杂的线程池
from Queue import Queue
import contextlib --上下文管理
import threading

WorkerStop = object() #定义一个对象,没有什么含义,只是用来当个东西用
class ThreadPool:
workers = 0
threadFactory = threading.Thread
currentThread = staticmethod(threading.currentThread)
def __init__(self, maxthreads=20, name=None):
self.q = Queue(0)
self.max = maxthreads
self.name = name
self.waiters = []
self.working = []
def start(self):
while self.workers < min(self.max, self.q.qsize()):
self.startAWorker()
def startAWorker(self):
self.workers += 1
name = "PoolThread-%s-%s" % (self.name or id(self), self.workers)
newThread = self.threadFactory(target=self._worker, name=name)
newThread.start()
def callInThread(self, func, *args, **kw):
self.callInThreadWithCallback(None, func, *args, **kw)
def callInThreadWithCallback(self, onResult, func, *args, **kw):
o = (func, args, kw, onResult)
self.q.put(o)

@contextlib.contextmanager
def _workerState(self, stateList, workerThread):
stateList.append(workerThread)
try:
yield
finally:
stateList.remove(workerThread)
def _worker(self):
ct = self.currentThread()
o = self.q.get()
while o is not WorkerStop:
with self._workerState(self.working, ct):
function, args, kwargs, onResult = o
del o
try:
result = function(*args, **kwargs)
success = True
except:
success = False
if onResult is None:
pass
else:
pass

del function, args, kwargs
if onResult is not None:
try:
onResult(success, result)
except:
#context.call(ctx, log.err)
pass
del onResult, result
with self._workerState(self.waiters, ct):
o = self.q.get()

def stop(self):
while self.workers:
self.q.put(WorkerStop)
self.workers -= 1

def show(arg):
import time
time.sleep(1)
print arg
pool = ThreadPool(20)

for i in range(500):
pool.callInThread(show, i)

pool.start()
pool.stop()
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  线程池 pipe queue