您的位置:首页 > 编程语言 > Python开发

python 线程池

2016-05-13 15:34 579 查看
本文的线程池是手动实现的。(标准库其实已有现成的实现:multiprocessing.pool.ThreadPool,以及 Python 3 的 concurrent.futures.ThreadPoolExecutor。)我个人认为,线程池并不一定能让代码更清晰,它更多是一种概念性的模式;直接创建线程反而更简单粗暴,也更容易定位错误信息。不过,了解线程池的实现原理仍然很有必要。

import Queue, threading, sys
from threading import Thread
import time
import urllib

# working thread
class Worker(Thread):
    """Daemon thread that pulls jobs off a shared queue and runs them.

    A job is a ``(func, args, kwds)`` tuple taken from ``workQueue``.  The
    value returned by ``func(*args, **kwds)`` is printed and then put on
    ``resultQueue``.  The thread starts itself as soon as it is constructed
    and exits once ``workQueue.get`` has waited ``Worker.timeout`` seconds
    without receiving a job.
    """

    worker_count = 0  # number of workers created so far; source of ``id``
    timeout = 1       # seconds ``run`` waits for a job before giving up

    def __init__(self, workQueue, resultQueue, **kwds):
        """Create the worker and start it immediately.

        Args:
            workQueue: queue of ``(func, args, kwds)`` job tuples to consume.
            resultQueue: queue that receives each job's return value.
            **kwds: forwarded unchanged to ``Thread.__init__``.
        """
        Thread.__init__(self, **kwds)
        self.id = Worker.worker_count
        Worker.worker_count += 1
        # Attribute form of the deprecated setDaemon(True).
        self.daemon = True
        self.workQueue = workQueue
        self.resultQueue = resultQueue
        self.start()

    def run(self):
        """Main loop: fetch a job, run it, publish the result.

        A job that raises is reported on stdout and skipped; the worker keeps
        serving the queue instead of dying with the job.
        """
        while True:
            # Only the queue read may signal "no more work".  Keeping it in
            # its own ``try`` stops a Queue.Empty raised *inside* a job from
            # being mistaken for an idle queue.
            try:
                job = self.workQueue.get(timeout=Worker.timeout)
            except Queue.Empty:
                break

            try:
                func, args, kwds = job
                res = func(*args, **kwds)
            except Exception:
                # Single-argument print(...) produces the same output under
                # the Python 2 print statement and the Python 3 function.
                print('worker[%2d] %s' % (self.id, sys.exc_info()[:2]))
                continue

            print("worker[%2d]: %s" % (self.id, str(res)))
            self.resultQueue.put(res)

class WorkerManager:
    """Owns the job/result queues and the pool of ``Worker`` threads."""

    def __init__(self, num_of_workers=10, timeout=2):
        """Create both queues and start ``num_of_workers`` workers.

        Args:
            num_of_workers: how many ``Worker`` threads to create.
            timeout: stored on ``self.timeout``; no method of this class
                reads it.
        """
        self.workQueue = Queue.Queue()
        self.resultQueue = Queue.Queue()
        self.workers = []
        self.timeout = timeout
        self._recruitThreads(num_of_workers)

    def _recruitThreads(self, num_of_workers):
        """Create ``num_of_workers`` workers bound to this manager's queues."""
        for _ in range(num_of_workers):
            self.workers.append(Worker(self.workQueue, self.resultQueue))

    def wait_for_complete(self):
        """Block until every worker thread has terminated, then report it.

        ``join()`` is called without a timeout, so a worker is always dead
        when it returns; there is nothing to re-check or re-queue afterwards.
        The list of workers is empty when this method returns.
        """
        while self.workers:
            self.workers.pop().join()
        # Single-argument print(...) is valid under Python 2 and Python 3.
        print("All jobs are are completed.")

    def add_job(self, callable, *args, **kwds):
        """Queue ``callable(*args, **kwds)`` for execution by a worker."""
        self.workQueue.put((callable, args, kwds))

    def get_result(self, *args, **kwds):
        """Return the next job result.

        All arguments are forwarded to ``resultQueue.get`` (for example
        ``block`` and ``timeout``).
        """
        return self.resultQueue.get(*args, **kwds)


  以上是线程池管理类的实现(保存为 thread_pool.py)。下面是调用示例:

import urllib2
import time
import socket
from datetime import datetime
from thread_pool import *

def main():
    """Fetch a fixed set of sites concurrently through a 50-worker pool.

    One ``do_get_con`` job is queued per entry of the site table; the
    function returns after the pool reports that all workers have finished.
    """
    # Short name -> URL.  The short name is passed to do_get_con together
    # with the URL.
    sites = {"sina":"http://www.sina.com.cn",
             "sohu":"http://www.sohu.com",
             "yahoo":"http://www.yahoo.com",
             "xiaonei":"http://www.xiaonei.com",
             "qihoo":"http://www.qihoo.com",
             "laohan":"http://www.laohan.org",
             "eyou":"http://www.eyou.com",
             "chinaren":"http://www.chinaren.com",
             "douban":"http://www.douban.com",
             "163":"http://www.163.com",
             "daqi":"http://www.daqi.com",
             "qq":"http://www.qq.com",
             "baidu_1":"http://www.baidu.com/s?wd=asdfasdf",
             "baidu_2":"http://www.baidu.com/s?wd=dddddddf",
             "google_1":"http://www.baidu.com/s?wd=sadfas",
             "google_2":"http://www.baidu.com/s?wd=sadflasd",
             "hainei":"http://www.hainei.com",
             "microsoft":"http://www.microsoft.com",
             "wlzuojia":"http://www.wlzuojia.com"}

    # Sets the default timeout (10 seconds) for sockets created from here on.
    socket.setdefaulttimeout(10)
    print('start testing')
    pool = WorkerManager(50)
    for site_name, site_url in sites.items():
        pool.add_job(do_get_con, site_name, site_url)
    pool.wait_for_complete()
    print('end testing')

def do_get_con(url_name, url_link, out_dir="/tmp/ttt"):
    """Download ``url_link`` and save the body to ``<out_dir>/<url_name>``.

    This is a best-effort job: any failure (network error, missing output
    directory, ...) is reported on stdout instead of being raised.

    Args:
        url_name: file name to write inside ``out_dir``.
        url_link: URL to fetch.
        out_dir: existing directory that receives the file.

    Returns:
        None, whether or not the download succeeded.
    """
    try:
        fd = urllib2.urlopen(url_link)
        try:
            data = fd.read()
        finally:
            # Release the connection even when read() fails.
            fd.close()
        # "wb": the response body is raw bytes and must be written untouched.
        with open("%s/%s" % (out_dir, url_name), "wb") as f_hand:
            f_hand.write(data)
    except Exception as exc:
        # Report the failure rather than hiding it; do not re-raise, so one
        # bad URL does not abort the caller.
        print("do_get_con[%s]: failed to fetch %s: %r" % (url_name, url_link, exc))

# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()


  
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: