Python 多线程代码实例
2016-10-14 16:46
441 查看
因业务需求，写了一个 Python 多线程处理数据的程序，记录下来，方便以后参考：
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Clip received log files to the fy column widths and hand them to fyloader.

The name-service key ``bossid.fy.mapping.itil.dic`` holds whitespace-separated
mapping entries.  Each entry is colon-separated: field 0 is the bossid, field 1
the fy id, field 2 the fy table name and field 4 an on/off switch ("1" means
enabled).  The examples kept from the original header only show the first
three fields::

    2611:171:video_p2p_pieces
    2125:82:live_pcclients
    1365:142:appnews_1365
    1446:143:appnews_1446
    2181:144:appnews_2181

One worker thread is started per enabled entry.

Deployment-specific locations (data directory, column cache, loader binary)
are class attributes of ``BossDataProcesser``; adjust them there.
"""

import datetime
import json
import os
import os.path
import re
import subprocess
import sys
import threading
import time

try:
    import urllib2  # Python 2
except ImportError:
    # Python 3 exposes the same Request/urlopen names from urllib.request.
    import urllib.request as urllib2

try:
    from importlib import reload as _reload  # Python 3.4+
except ImportError:
    _reload = reload  # noqa: F821 -- ``reload`` is a builtin on Python 2

NAMEAPI_DIR = "/usr/local/zk_l5_name/zk_ex_named/names"
if NAMEAPI_DIR not in sys.path:
    sys.path.append(NAMEAPI_DIR)

# The original tried eval('import nameapi') to refresh an already imported
# module; eval() cannot execute statements, so that branch always raised
# SyntaxError.  Reload through the real reload function instead.
try:
    if 'nameapi' in sys.modules:
        nameapi = _reload(sys.modules['nameapi'])
    else:
        import nameapi
except ImportError:
    # Deferred: the classes below do not need nameapi, only main() does,
    # and main() raises a descriptive ImportError when it is missing.
    nameapi = None


class Th(threading.Thread):
    """Worker thread that loads the data of a single mapping entry."""

    def __init__(self, strBossIdConf, processer):
        """Remember the mapping entry and the processor that will handle it.

        Args:
            strBossIdConf: one colon-separated mapping entry.
            processer: object providing ``LoadBossIdData(conf)``.
        """
        threading.Thread.__init__(self)
        self._strBossIdConf = strBossIdConf
        self._processer = processer

    def run(self):
        """Thread body: load the data for the stored mapping entry."""
        self._processer.LoadBossIdData(self._strBossIdConf)


class BossDataProcesser:
    """Normalises data files to a column layout and loads them with fyloader.

    The methods keep no per-call state on the instance; the class attributes
    below are only read.
    """

    # Local cache of column definitions, one "bossid|YYYYMMDD|json" per line.
    COL_INFO_CACHE = "/tmp/fycolinfo.info"
    # Root of the received data; files live in <root><bossid>/<YYYYMMDD>/.
    DATA_ROOT = "/data1/webitil/logreceiver/data/"
    # Loader executable invoked for every processed file.
    LOADER_BIN = "/usr/local/bin/fyloader"
    # Field delimiter: a comma that is not preceded by a backslash.
    _FIELD_SPLIT = re.compile(r'(?<!\\),')

    def _FetchJson(self, strurl):
        """GET *strurl* and return the decoded JSON document."""
        response = urllib2.urlopen(urllib2.Request(strurl))
        try:
            return json.loads(response.read())
        finally:
            response.close()

    def _ExtractColLengths(self, objJson, strTypeKey, strLenKey, strVarchar):
        """Turn a column-description document into a list of length limits.

        Args:
            objJson: decoded JSON; the columns are read from its "data" entry.
            strTypeKey: key holding a column's type name.
            strLenKey: key holding a column's declared length.
            strVarchar: type name that marks a length-limited column.

        Returns:
            One item per column: the declared length (as found in the JSON)
            for columns of type *strVarchar*, otherwise -1.  Empty when the
            document has no "data" entry.
        """
        cols = objJson.get("data") if isinstance(objJson, dict) else None
        return [
            col[strLenKey] if col[strTypeKey] == strVarchar else -1
            for col in (cols or [])
        ]

    def GetBossIdColInfo(self, strBossId):
        """Fetch the column length limits of a bossid from the beehive API.

        Returns:
            List with the declared length for every 'varchar' column and -1
            for all other columns.
        """
        strurl = ('http://beehive.boss.webdev.com/bossid/api/getAllIds?bossid='
                  + str(strBossId))
        return self._ExtractColLengths(
            self._FetchJson(strurl), 'type', 'len', 'varchar')

    def GetFyIdColInfo(self, strFyId):
        """Fetch the column length limits of an fy table id from the fy API.

        Returns:
            List with the declared length for every 'VARCHAR' column and -1
            for all other columns.
        """
        strurl = ('http://fy.webdev.com/php/interface/getTableInfo.php?id='
                  + str(strFyId))
        return self._ExtractColLengths(
            self._FetchJson(strurl), 'f_Type', 'f_Length', 'VARCHAR')

    def GetBossidColInfoFromFile(self, strBossId):
        """Look up today's column length limits for a bossid in the cache file.

        The cache (``COL_INFO_CACHE``) is read line by line; the first line
        whose bossid matches and whose day equals today's ``YYYYMMDD`` wins.

        Args:
            strBossId: bossid as int or numeric string.

        Returns:
            List of length limits (see ``GetFyIdColInfo``), or an empty list
            when the cache file cannot be opened or holds no entry for today,
            so that callers can fall back to the HTTP lookup.

        Raises:
            ValueError: if *strBossId* is not numeric or the cached JSON is
                invalid.
        """
        wantedId = int(strBossId)
        today = datetime.datetime.today().strftime("%Y%m%d")
        try:
            fileobj = open(self.COL_INFO_CACHE)
        except (IOError, OSError):
            # An unreadable cache is a cache miss, not a fatal error.
            return []

        strcol = ''
        with fileobj:
            for strLine in fileobj:
                # maxsplit keeps any '|' inside the JSON payload intact.
                lstitem = strLine.split("|", 2)
                if len(lstitem) < 3:
                    continue  # blank or malformed line
                bossid, day, strjson = lstitem
                try:
                    sameId = int(bossid) == wantedId
                except ValueError:
                    continue  # non-numeric bossid column
                if sameId and day == today:
                    strcol = strjson.strip()
                    break

        if not strcol:
            return []
        return self._ExtractColLengths(
            json.loads(strcol), 'f_Type', 'f_Length', 'VARCHAR')

    def ProcessFile(self, filename, bossid, fyid):
        """Write a normalised copy of *filename* to ``filename + ".ok"``.

        Column limits come from the local cache; when the cache has no entry
        they are fetched from the fy API instead.  Every input line is passed
        through ``ProcessLine``.
        """
        lstColInfo = self.GetBossidColInfoFromFile(bossid)
        if not lstColInfo:
            print("enter")
            lstColInfo = self.GetFyIdColInfo(fyid)

        with open(filename) as fileobj:
            with open(filename + ".ok", 'w') as filewriteobj:
                for strLine in fileobj:
                    filewriteobj.write(self.ProcessLine(strLine, lstColInfo))

    def ProcessLine(self, strLine, lstColInfo):
        """Clip and pad one record so that it has exactly one field per column.

        The line is split on commas that are not backslash-escaped.  For each
        column whose limit is positive the field is cut to that many
        characters; a limit of 0 blanks the field; negative limits leave it
        alone.

        NOTE(review): the last split item is only kept when the line has more
        items than columns; otherwise it is dropped before padding with '0'.
        That looks designed for records ending in a trailing delimiter plus
        newline -- confirm against the real data format.

        Args:
            strLine: raw input line, normally including its line terminator.
            lstColInfo: per-column limits (ints or numeric strings).

        Returns:
            *strLine* unchanged when *lstColInfo* is empty, otherwise the
            comma-joined fields followed by ``os.linesep``.
        """
        colCount = len(lstColInfo)
        if colCount == 0:
            return strLine

        items = self._FIELD_SPLIT.split(strLine)
        itemCount = len(items)

        for index, rawLen in enumerate(lstColInfo):
            maxLen = int(rawLen)
            if index >= itemCount:
                # Column missing from this line; it is padded below.  The
                # original indexed past the end here for zero-length columns.
                continue
            if maxLen > 0:
                items[index] = items[index][:maxLen]
            elif maxLen == 0:
                items[index] = ""

        if itemCount > colCount:
            fields = items[:colCount]
        else:
            # Drop the last item and fill up to the column count with '0'.
            fields = items[:itemCount - 1] + ['0'] * (colCount - itemCount + 1)
        return ",".join(fields) + os.linesep

    def GetProcessFiles(self, dir, lstFiles):
        """Collect the files in *dir* that belong to the previous minute.

        A file qualifies when its name contains the ``YYYYMMDDHHMM`` stamp of
        one minute ago and does not contain ".ok".

        Args:
            dir: directory to scan; a missing directory yields no files.
            lstFiles: list that the full paths are appended to.

        Returns:
            *lstFiles*, for convenience.
        """
        lastMinute = datetime.datetime.now() - datetime.timedelta(minutes=1)
        stamp = lastMinute.strftime("%Y%m%d%H%M")
        if os.path.isdir(dir):
            for name in os.listdir(dir):
                if stamp in name and ".ok" not in name:
                    lstFiles.append(os.path.join(dir, name))
        return lstFiles

    def _BuildLoaderCommand(self, fyid, strTable, okfile):
        """Return the fyloader argument vector for one processed file.

        The values mirror the shell command the original passed to
        ``os.system``: its ``--escaped "\\\\"`` (four backslashes inside shell
        double quotes) reached the loader as two backslashes, and
        ``--delimiter ","`` as a bare comma.
        """
        return [
            self.LOADER_BIN,
            '--escaped', '\\\\',
            '--delimiter', ',',
            '--abort_on_threshold', '0.2',
            '-l', '200000',
            fyid,
            strTable,
            okfile,
        ]

    def _RunLoader(self, fyid, strTable, okfile):
        """Run fyloader without a shell and report failures on stderr."""
        cmd = self._BuildLoaderCommand(fyid, strTable, okfile)
        try:
            status = subprocess.call(cmd)
        except OSError as err:
            sys.stderr.write(
                "fyloader could not be started for %s: %s\n" % (okfile, err))
            return
        if status != 0:
            sys.stderr.write(
                "fyloader exited with status %d for %s\n" % (status, okfile))

    def LoadBossIdData(self, conf):
        """Process and load last minute's files of one mapping entry.

        Args:
            conf: colon-separated entry; fields 0-2 are bossid, fy id and fy
                table name.  Data is loaded into ``<table>_<YYYYMMDD>``.

        NOTE(review): the directory and table suffix use today's date while
        the file filter uses the previous minute, so files stamped 23:59 may
        be skipped by the run just after midnight -- confirm whether that
        matters for the receiver's directory layout.
        """
        lstitem = conf.split(":")
        bossid, fyid, fytable = lstitem[0], lstitem[1], lstitem[2]
        today = datetime.datetime.today().strftime("%Y%m%d")
        datadir = self.DATA_ROOT + bossid + "/" + today
        if not os.path.exists(datadir):
            return

        for datafile in self.GetProcessFiles(datadir, []):
            okfile = datafile + '.ok'
            try:
                self.ProcessFile(datafile, bossid, fyid)
                self._RunLoader(fyid, fytable + '_' + today, okfile)
            finally:
                # The .ok file is a temporary artefact; never leave it behind.
                if os.path.exists(okfile):
                    os.remove(okfile)


def main():
    """Start one worker per enabled mapping entry and wait for all of them."""
    if nameapi is None:
        raise ImportError(
            "nameapi could not be imported from " + NAMEAPI_DIR)

    # Wall-clock time: time.clock() measured CPU time on Unix, which says
    # little about a job that mostly waits on I/O and child processes.
    start = time.time()
    processer = BossDataProcesser()
    lstTh = []

    # The first element of the result is ignored, as in the original;
    # presumably a status code -- verify against the nameapi documentation.
    _, value = nameapi.getValueByKey('bossid.fy.mapping.itil.dic')
    for conf in value.split():
        lstitem = conf.split(":")
        if len(lstitem) < 5:
            sys.stderr.write("skipping malformed mapping entry: %s\n" % conf)
            continue
        if lstitem[4] == "1":
            t = Th(conf, processer)
            t.start()
            lstTh.append(t)

    for t in lstTh:
        t.join()

    print("cost:%f s" % (time.time() - start))


if __name__ == '__main__':
    main()
相关文章推荐
- Python实现多线程下载文件的代码实例
- day11_python多线程实例代码——02
- Python实现多线程下载文件的代码实例
- Python多线程下载文件实例代码
- Python 爬虫多线程详解及实例代码
- Python 爬虫多线程详解及实例代码
- python面向对象多线程爬虫爬取搜狐页面的实例代码
- Python操作Mysql实例代码
- Python操作Mysql实例代码教程(查询手册)
- Python操作Mysql实例代码教程
- python利用hook技术破解https的实例代码
- 多线程死锁代码实例
- Python: UTF8转换代码实例
- Python连接MySQL的实例代码
- Python操作Mysql实例代码教程
- python发送邮件的实例代码(支持html、图片、附件)
- python 多线程生产者消费者代码
- python执行多线程的实例
- 初识python多线程(转+实例)
- Python多线程写文件实例