您的位置:首页 > 编程语言 > Python开发

Python-mysql 抽取数据 脚本

2012-02-15 20:40 661 查看
全部十几款游戏,大概 200+ 区组、3000+ 服务器,需要抽取日志进行运营计算,所以紧急学习 Python 实现自动化:5 个进程,每个进程 2 个线程,抽取所有区组的数据,4 天内完成。下面直接上代码。

启动main

import multiprocessing
from com.gyyx.common.inverted import inverted

def do_calculation(row):
    """Pool worker: run the full extraction for one dist row.

    row -- one record produced by ``inverted.trn_getdist()``; it is passed
           unchanged to ``inverted.trn_dist``.

    Returns None, so ``pool.map`` collects a list of ``None`` values.
    """
    print('run', multiprocessing.current_process().name)
    # Parenthesised form prints the same text on Python 2 and Python 3.
    print(row)
    inverted.trn_dist(row)

def start_process():
    """Pool initializer: print the name of the worker process that started."""
    print('Starting', multiprocessing.current_process().name)

if __name__ == '__main__':
    # Rows describing every dist to process; each row becomes one pool task.
    dist_rows = inverted.trn_getdist()

    pool_size = 5
    pool = multiprocessing.Pool(
        processes=pool_size,
        initializer=start_process,
    )
    try:
        pool_outputs = pool.map(do_calculation, dist_rows)
    finally:
        # Always release the workers, even when a task raised.
        pool.close()  # no more tasks
        pool.join()   # wrap up current tasks

    print('Pool    :' + str(pool_outputs))


每组服务器调用

from com.gyyx.DBUtility import MySQLHelper
from com.gyyx.tools.commons import commons
from com.gyyx.common.important_log import important_log
from com.gyyx.common.cost_log import cost_log
from com.gyyx.common.credit_log import credit_log
from com.gyyx.tools.threadManager import WorkManager
from threading import Thread
from multiprocessing import Pool
from time import sleep

class inverted(object):
    """Entry points for extracting the three log tables of one dist."""

    @staticmethod
    def trn_getdist():
        """Return the rows of ``dist_wd`` with ``flag=1`` and ``dist>0``.

        Each row carries ``dist``, ``ip1``, ``ip2`` and ``ip3``; the query is
        run with ``dict='d'``, which makes ``MySQLHelper.executeList`` use a
        dictionary cursor.
        """
        sql = 'SELECT dist,ip1,ip2,ip3 FROM dist_wd WHERE flag=1 AND dist>0'
        return MySQLHelper.executeList(dbs=commons.db191_game_conf, sql=sql, dict='d')

    @staticmethod
    def _source_params(row, ip_key, db):
        """Build the connection parameters for one source database of ``row``."""
        return {
            'dist': row['dist'],
            'ip': row[ip_key],
            'port': 3306,
            'db': db,
            'user': commons.distuser,
            'pwd': commons.distpwd,
        }

    @staticmethod
    def trn_dist(row):
        """Extract important_log, cost_log and credit_log for one dist row.

        row -- mapping with at least ``dist``, ``ip1`` and ``ip2``.

        important_log (db ``ldb`` on ``ip2``) runs in its own thread for the
        whole call.  cost_log and credit_log both read db ``adb`` on ``ip1``
        and are run one after the other, so at most two extraction threads
        are alive at any time.
        """
        imglog = important_log(inverted._source_params(row, 'ip2', 'ldb'))
        thimg = Thread(target=imglog.doData)
        thimg.start()

        costlog = cost_log(inverted._source_params(row, 'ip1', 'adb'))
        thcost = Thread(target=costlog.doData)
        thcost.start()

        # cost_log must finish before credit_log starts.
        thcost.join()

        creditlog = credit_log(inverted._source_params(row, 'ip1', 'adb'))
        thcredit = Thread(target=creditlog.doData)
        thcredit.start()

        thimg.join()
        thcredit.join()


逻辑基础类

from datetime import datetime
'''
Created on Jan 10, 2012

@author: admin
'''

class base_log(object):
    """Year-long extraction loops shared by the ``*_log`` extractor classes.

    Both entry points expect ``obj_log`` to provide:

    * ``year``                   -- the calendar year to extract (int)
    * ``createTable()``          -- prepare the destination table
    * ``getData(bdate, edate)``  -- fetch rows with bdate <= time < edate
    * ``setDate(rows)``          -- store the fetched rows

    Boundaries are passed as ``YYYYmmddHHMMSS`` strings.
    """

    _STAMP = '%Y%m%d%H%M%S'

    @staticmethod
    def _next_month_start(year, month):
        """Return midnight on the first day of the month after ``month``."""
        if month == 12:
            return datetime(year + 1, 1, 1, 0, 0, 0)
        return datetime(year, month + 1, 1, 0, 0, 0)

    @staticmethod
    def _transfer(obj_log, begin, end):
        """Fetch the half-open window [begin, end) and store it."""
        stamp = base_log._STAMP
        rows = obj_log.getData(begin.strftime(stamp), end.strftime(stamp))
        obj_log.setDate(rows)
        # ``rows`` goes out of scope here, so each window's data is released
        # before the next window is fetched.

    @staticmethod
    def doData(obj_log):
        """Copy one year of data in windows of five days.

        Every month is split into windows starting on day 1, 6, 11, 16, 21
        and 26; the last window runs up to the first day of the next month,
        so months of any length are covered completely.
        """
        obj_log.createTable()
        for month in range(1, 13):
            for day in range(1, 27, 5):
                begin = datetime(obj_log.year, month, day, 0, 0, 0)
                if day + 5 > 30:
                    # Only true for day 26: close the window at month end.
                    end = base_log._next_month_start(obj_log.year, month)
                else:
                    end = datetime(obj_log.year, month, day + 5, 0, 0, 0)
                base_log._transfer(obj_log, begin, end)

    @staticmethod
    def doDataMonth(obj_log):
        """Copy one year of data in windows of one calendar month."""
        obj_log.createTable()
        for month in range(1, 13):
            begin = datetime(obj_log.year, month, 1, 0, 0, 0)
            end = base_log._next_month_start(obj_log.year, month)
            base_log._transfer(obj_log, begin, end)


逻辑扩展类 1 important_log

from com.gyyx.DBUtility import MySQLHelper
from com.gyyx.tools.commons import commons
from com.gyyx.common.base_log import base_log

'''
Created on Jan 9, 2012

@author: admin
'''

class important_log(object):
    """Copies one dist's ``important_log`` rows into a central table.

    Rows are read from the source table ``important_log`` over ``self.conn``
    and written to ``<dist>_important_log`` over ``self.dbcenter``.
    """

    def __init__(self, param):
        """Open the source and destination connections.

        param -- mapping with the required keys ``dist``, ``ip``, ``user``
                 and ``pwd`` and the optional keys ``port`` (default 3306)
                 and ``db`` (default ``'ldb'``).
        """
        self.dist = param['dist']
        self.ip = param['ip']
        self.user = param['user']
        self.pwd = param['pwd']
        # dict.get replaces has_key(), which no longer exists in Python 3.
        self.port = param.get('port', 3306)
        self.db = param.get('db', 'ldb')
        self.year = commons.log_year
        self.tablename = str(self.dist) + '_important_log'
        self.dbs = {'host': self.ip, 'port': self.port, 'db': self.db,
                    'user': self.user, 'passwd': self.pwd}
        self.conn = MySQLHelper.getConn(self.dbs)
        self.dbcenter = MySQLHelper.getConn(commons.db199_important)

    def getData(self, bdate, edate):
        """Return the source rows with ``bdate <= update_time < edate``.

        Both bounds are strings that are embedded verbatim in the query;
        rows are fetched with ``dict='d'`` (dictionary cursor).
        """
        sql = ("SELECT id,update_time,`server`,type,severity,action,para1,para2,para3,memo"
               " FROM important_log WHERE update_time>='" + bdate
               + "' AND update_time<'" + edate + "'")
        return MySQLHelper.executeListL(conn=self.conn, sql=sql, dict='d')

    def setDate(self, set):
        """Insert the fetched rows ``set`` into the destination table."""
        MySQLHelper.executeInsertL(conn=self.dbcenter, tn=self.tablename, dicts=set)

    def createTable(self):
        """Drop the destination table if it exists and create it empty."""
        sql = 'DROP TABLE IF EXISTS `' + self.tablename + '`'
        MySQLHelper.executeNoQueryL(conn=self.dbcenter, sql=sql)
        sql = (
            'CREATE TABLE `' + self.tablename + '` (\n'
            "`id` int(11) NOT NULL AUTO_INCREMENT,\n"
            "`update_time` varchar(14) NOT NULL default '',\n"
            "`server` varchar(32) NOT NULL default '',\n"
            "`type` varchar(32) NOT NULL default '',\n"
            "`severity` int(11) unsigned NOT NULL default '0',\n"
            "`action` varchar(32) NOT NULL default '',\n"
            "`para1` varchar(128) NOT NULL default '',\n"
            "`para2` varchar(128) NOT NULL default '',\n"
            "`para3` varchar(128) NOT NULL default '',\n"
            "`memo` text NOT NULL,\n"
            "PRIMARY KEY  (`id`),\n"
            "KEY `type_action` (`type`,`action`),\n"
            "KEY `time` (`update_time`)\n"
            ") ENGINE=MyISAM DEFAULT CHARSET=latin1;\n"
        )
        MySQLHelper.executeNoQueryL(conn=self.dbcenter, sql=sql)

    def doData(self):
        """Run the five-day-window extraction loop of ``base_log`` on self."""
        base_log.doData(self)

逻辑扩展类 2 cost_log
from com.gyyx.DBUtility import MySQLHelper
from com.gyyx.tools.commons import commons
from com.gyyx.common.base_log import base_log

'''
Created on Jan 10, 2012

@author: admin
'''

class cost_log(object):
    """Copies one dist's ``cost_log`` rows into a central table.

    Rows are read from the source table ``cost_log`` over ``self.conn`` and
    written to ``<dist>_cost_log`` over ``self.dbcenter``.
    """

    def __init__(self, param):
        """Open the source and destination connections.

        param -- mapping with the required keys ``dist``, ``ip``, ``user``
                 and ``pwd`` and the optional keys ``port`` (default 3306)
                 and ``db`` (default ``'adb'``).
        """
        self.dist = param['dist']
        self.ip = param['ip']
        self.user = param['user']
        self.pwd = param['pwd']
        # dict.get replaces has_key(), which no longer exists in Python 3.
        self.port = param.get('port', 3306)
        self.db = param.get('db', 'adb')
        self.year = commons.log_year
        self.tablename = str(self.dist) + '_cost_log'
        self.dbs = {'host': self.ip, 'port': self.port, 'db': self.db,
                    'user': self.user, 'passwd': self.pwd}
        self.conn = MySQLHelper.getConn(self.dbs)
        self.dbcenter = MySQLHelper.getConn(commons.db199_cost)

    def getData(self, bdate, edate):
        """Return the source rows with ``bdate <= cost_time < edate``.

        Both bounds are strings that are embedded verbatim in the query;
        rows are fetched with ``dict='d'`` (dictionary cursor).
        """
        sql = ("SELECT id,account,dist,cost_time,cost_coin,item_price,buy_item,"
               "buy_item_amount,silver_coin_cost,silver_coin_left,gold_coin_cost,"
               "gold_coin_left FROM cost_log WHERE cost_time>='" + bdate
               + "' AND cost_time<'" + edate + "'")
        return MySQLHelper.executeListL(conn=self.conn, sql=sql, dict='d')

    def setDate(self, set):
        """Insert the fetched rows ``set`` into the destination table."""
        MySQLHelper.executeInsertL(conn=self.dbcenter, tn=self.tablename, dicts=set)

    def createTable(self):
        """Drop the destination table if it exists and create it empty."""
        sql = 'DROP TABLE IF EXISTS `' + self.tablename + '`'
        MySQLHelper.executeNoQueryL(conn=self.dbcenter, sql=sql)
        sql = (
            'CREATE TABLE `' + self.tablename + '` (\n'
            "`id` int(11) NOT NULL AUTO_INCREMENT,\n"
            "`account` varchar(32) NOT NULL DEFAULT '',\n"
            "`dist` varchar(32) NOT NULL DEFAULT '',\n"
            "`cost_time` datetime NOT NULL DEFAULT '0000-00-00 00:00:00',\n"
            "`cost_coin` int(11) NOT NULL DEFAULT '0',\n"
            "`item_price` int(11) NOT NULL DEFAULT '0',\n"
            "`buy_item` varchar(32) NOT NULL DEFAULT '',\n"
            "`buy_item_amount` int(11) NOT NULL DEFAULT '0',\n"
            "`silver_coin_cost` int(11) NOT NULL DEFAULT '0',\n"
            "`silver_coin_left` int(11) NOT NULL DEFAULT '0',\n"
            "`gold_coin_cost` int(11) NOT NULL DEFAULT '0',\n"
            "`gold_coin_left` int(11) NOT NULL DEFAULT '0',\n"
            "PRIMARY KEY (`id`),\n"
            "KEY `account` (`account`),\n"
            "KEY `cost_time` (`cost_time`)\n"
            ") ENGINE=MyISAM DEFAULT CHARSET=latin1;\n"
        )
        MySQLHelper.executeNoQueryL(conn=self.dbcenter, sql=sql)

    def doData(self):
        """Run the five-day-window extraction loop of ``base_log`` on self."""
        base_log.doData(self)

逻辑扩展类 3 credit_log
from com.gyyx.DBUtility import MySQLHelper
from com.gyyx.tools.commons import commons
from com.gyyx.common.base_log import base_log

'''
Created on Jan 10, 2012

@author: admin
'''

class credit_log(object):
    """Copies one dist's ``credit_log`` rows into a central table.

    Rows are read from the source table ``credit_log`` over ``self.conn``
    and written to ``<dist>_credit_log`` over ``self.dbcenter``.
    """

    def __init__(self, param):
        """Open the source and destination connections.

        param -- mapping with the required keys ``dist``, ``ip``, ``user``
                 and ``pwd`` and the optional keys ``port`` (default 3306)
                 and ``db`` (default ``'adb'``).
        """
        self.dist = param['dist']
        self.ip = param['ip']
        self.user = param['user']
        self.pwd = param['pwd']
        # dict.get replaces has_key(), which no longer exists in Python 3.
        self.port = param.get('port', 3306)
        self.db = param.get('db', 'adb')
        self.year = commons.log_year
        self.tablename = str(self.dist) + '_credit_log'
        self.dbs = {'host': self.ip, 'port': self.port, 'db': self.db,
                    'user': self.user, 'passwd': self.pwd}
        self.conn = MySQLHelper.getConn(self.dbs)
        self.dbcenter = MySQLHelper.getConn(commons.db199_credit)

    def getData(self, bdate, edate):
        """Return the source rows with ``bdate <= time < edate``.

        Both bounds are strings that are embedded verbatim in the query;
        rows are fetched with ``dict='d'`` (dictionary cursor).
        """
        sql = ("SELECT id,transaction_id,account,gold_coin,silver_coin,time"
               " FROM credit_log WHERE time>='" + bdate
               + "' AND time<'" + edate + "'")
        return MySQLHelper.executeListL(conn=self.conn, sql=sql, dict='d')

    def setDate(self, set):
        """Insert the fetched rows ``set`` into the destination table."""
        MySQLHelper.executeInsertL(conn=self.dbcenter, tn=self.tablename, dicts=set)

    def createTable(self):
        """Drop the destination table if it exists and create it empty."""
        sql = 'DROP TABLE IF EXISTS `' + self.tablename + '`'
        MySQLHelper.executeNoQueryL(conn=self.dbcenter, sql=sql)
        sql = (
            'CREATE TABLE `' + self.tablename + '` (\n'
            "`id` int(11) NOT NULL AUTO_INCREMENT,\n"
            "`transaction_id` varchar(32) NOT NULL DEFAULT '',\n"
            "`account` varchar(32) NOT NULL DEFAULT '',\n"
            "`gold_coin` int(11) NOT NULL DEFAULT '0',\n"
            "`silver_coin` int(11) NOT NULL DEFAULT '0',\n"
            "`time` datetime NOT NULL DEFAULT '0000-00-00 00:00:00',\n"
            "PRIMARY KEY (`id`),\n"
            "KEY `account` (`account`)\n"
            ") ENGINE=MyISAM DEFAULT CHARSET=latin1;\n"
        )
        MySQLHelper.executeNoQueryL(conn=self.dbcenter, sql=sql)

    def doData(self):
        """Run the five-day-window extraction loop of ``base_log`` on self."""
        base_log.doData(self)


数据操作类 DBUtility

import MySQLdb

class MySQLHelper(object):
    """Static convenience wrappers around MySQLdb connections and cursors.

    Methods whose name ends in ``L`` take an already-open connection and
    leave it open.  The other methods open a connection from a ``dbs``
    settings dict (see ``getConn``) and close it before returning.

    The ``{}`` defaults double as "not supplied" sentinels: they are only
    compared with ``== {}`` and never mutated.
    """

    # Maximum number of rows sent per executemany() call by the insert helpers.
    _BATCH = 500

    @staticmethod
    def _report(error):
        """Print the concrete exception type and message of ``error``."""
        print("%s : %s" % (type(error).__name__, error))

    @staticmethod
    def _open_cursor(conn, as_dict):
        """Return a plain cursor when ``as_dict == {}``, else a DictCursor."""
        if as_dict == {}:
            return conn.cursor()
        return conn.cursor(cursorclass=MySQLdb.cursors.DictCursor)

    @staticmethod
    def _fetch(conn, sql, param, as_dict):
        """Run a query on ``conn`` and return all rows, or ``{}`` on error."""
        cursor = MySQLHelper._open_cursor(conn, as_dict)
        try:
            if param == {}:
                cursor.execute(sql)
            else:
                cursor.execute(sql, param)
            return cursor.fetchall()
        except Exception as data:
            MySQLHelper._report(data)
            return {}
        finally:
            cursor.close()

    @staticmethod
    def _run(conn, cursor, sql, param):
        """Execute a statement (executemany when ``param`` is given), commit."""
        if param == {}:
            rownum = cursor.execute(sql)
        else:
            rownum = cursor.executemany(sql, param)
        conn.commit()
        return rownum

    @staticmethod
    def _insert_statement(tn, keys):
        """Build ``insert <tn> (<cols>) values (%s,...)`` for ``keys``."""
        col = ','.join(keys)
        val = ','.join(['%s'] * len(keys))
        return "insert " + tn + " (" + col + ") values (" + val + ")"

    @staticmethod
    def _row_values(dicts, keys):
        """Turn row mappings into value lists ordered like ``keys``."""
        return [[row[key] for key in keys] for row in dicts]

    @staticmethod
    def executeList(dbs, sql, param={}, dict={}):
        """Open a connection, run a query and return all rows.

        dbs   -- connection settings, see ``getConn``
        sql   -- the query text
        param -- query arguments; ``{}`` means "no arguments"
        dict  -- ``{}`` selects a plain cursor, any other value a DictCursor

        Returns the ``fetchall()`` result, or ``{}`` when the query fails
        (the error is printed).  The connection is always closed.
        """
        conn = MySQLHelper.getConn(dbs)
        try:
            return MySQLHelper._fetch(conn, sql, param, dict)
        finally:
            conn.close()

    @staticmethod
    def executeListL(conn, sql, param={}, dict={}):
        """Like ``executeList`` but on the caller's open connection ``conn``.

        Exactly one cursor is opened and it is closed before returning.
        """
        return MySQLHelper._fetch(conn, sql, param, dict)

    @staticmethod
    def executeNoQuery(dbs, sql, param={}):
        """Open a connection, execute a statement, commit and close.

        With ``param == {}`` the statement is run once; otherwise ``param``
        is passed to ``executemany``.  Returns the driver's row count, or 0
        when the statement fails (the error is printed).
        """
        conn = MySQLHelper.getConn(dbs)
        try:
            cursor = conn.cursor()
            try:
                return MySQLHelper._run(conn, cursor, sql, param)
            except Exception as data:
                MySQLHelper._report(data)
                return 0
            finally:
                cursor.close()
        finally:
            conn.close()

    @staticmethod
    def executeNoQueryL(conn, sql, param={}):
        """Execute a statement on ``conn`` and commit, retrying once.

        With ``param == {}`` the statement is run once; otherwise ``param``
        is passed to ``executemany``.  A failed attempt is printed and
        repeated once on a fresh cursor; if that fails too, 0 is returned.
        Failure to create the first cursor is not caught, so callers can
        react to an unusable connection.
        """
        cursor = conn.cursor()
        try:
            return MySQLHelper._run(conn, cursor, sql, param)
        except Exception as data:
            MySQLHelper._report(data)
        finally:
            cursor.close()

        # Second and last attempt on a fresh cursor.
        cursor = None
        try:
            cursor = conn.cursor()
            return MySQLHelper._run(conn, cursor, sql, param)
        except Exception as data:
            MySQLHelper._report(data)
            return 0
        finally:
            if cursor is not None:
                cursor.close()

    @staticmethod
    def executeInsert(dbs, tn, dicts):
        """Insert row mappings into table ``tn`` over a new connection.

        dbs   -- connection settings, see ``getConn``
        tn    -- destination table name
        dicts -- sequence of row mappings; the keys of the first row give
                 the column list used for every row

        Rows are sent in batches of at most 500.  Returns the summed row
        count, or 0 when ``dicts`` is empty or the insert cannot be done.
        """
        if not dicts:
            return 0
        conn = None
        try:
            keys = list(dicts[0].keys())
            sql = MySQLHelper._insert_statement(tn, keys)
            params = MySQLHelper._row_values(dicts, keys)
            conn = MySQLHelper.getConn(dbs)
            rownum = 0
            for start in range(0, len(params), MySQLHelper._BATCH):
                batch = params[start:start + MySQLHelper._BATCH]
                try:
                    done = MySQLHelper.executeNoQueryL(conn, sql, batch)
                except Exception as data:
                    # The connection could not even produce a cursor:
                    # reconnect once and repeat this batch.
                    MySQLHelper._report(data)
                    stale, conn = conn, None
                    try:
                        stale.close()
                        conn = MySQLHelper.getConn(dbs)
                        done = MySQLHelper.executeNoQueryL(conn, sql, batch)
                    except Exception as data:
                        MySQLHelper._report(data)
                        return 0
                rownum += done or 0
            return rownum
        except Exception as data:
            MySQLHelper._report(data)
            return 0
        finally:
            if conn is not None:
                conn.close()

    @staticmethod
    def executeInsertL(conn, sql=None, dicts=None, tn=None):
        """Insert rows over the open connection ``conn`` in batches of 500.

        Two calling styles are supported:

        * ``executeInsertL(conn, sql, rows)`` -- ``sql`` is a parameterised
          insert statement and ``rows`` a sequence of argument sequences.
        * ``executeInsertL(conn=..., tn=table, dicts=rows)`` -- ``rows`` is
          a sequence of row mappings; the statement is generated from the
          table name and the keys of the first row.

        Returns the summed row count; 0 when there are no rows (an empty
        sequence or the ``{}`` returned by a failed query).
        Raises TypeError when neither ``sql`` nor ``tn`` is supplied.
        """
        if sql is None and tn is None:
            raise TypeError("executeInsertL() needs either sql or tn")
        if not dicts:
            return 0
        if sql is None:
            keys = list(dicts[0].keys())
            sql = MySQLHelper._insert_statement(tn, keys)
            dicts = MySQLHelper._row_values(dicts, keys)
        return MySQLHelper.split(conn, sql, dicts, MySQLHelper._BATCH)

    @staticmethod
    def getConn(dbs):
        """Open a MySQLdb connection described by the dict ``dbs``.

        Required keys: ``host``, ``user``, ``passwd``.  Optional keys:
        ``port`` (default 3306) and ``db`` (default ``'test'``).
        """
        return MySQLdb.connect(
            host=dbs['host'],
            port=dbs.get('port', 3306),
            user=dbs['user'],
            passwd=dbs['passwd'],
            db=dbs.get('db', 'test'),
        )

    @staticmethod
    def split(conn, sql, list={}, per=500):
        """Run ``sql`` via ``executemany`` over ``list`` in chunks of ``per``.

        conn -- open connection
        sql  -- parameterised statement
        list -- sequence of argument sequences; ``{}`` or any empty
                sequence means "nothing to do"
        per  -- maximum number of rows per chunk (must be >= 1)

        Returns the summed row count reported for all chunks.
        Raises ValueError when ``per`` is smaller than 1.
        """
        rows = list  # the parameter name shadows the builtin; use an alias
        if per < 1:
            raise ValueError("per must be a positive integer")
        if not rows:
            return 0
        rownum = 0
        for start in range(0, len(rows), per):
            chunk = rows[start:start + per]
            rownum += MySQLHelper.executeNoQueryL(conn, sql, chunk) or 0
        return rownum


通用信息类

'''
Created on Jan 9, 2012

@author: admin
'''

class commons(object):
    """Shared configuration: connection settings, credentials and log year.

    The ``db*`` dicts use the key names expected by ``MySQLdb.connect``
    (``host``, ``user``, ``passwd``, ``db``).
    """

    # Destination databases for the extracted logs.
    db199_important = {'host': 'xxx.xxx.xxx.xxx', 'user': 'admin', 'db': 'tj_wd_important_2010', 'passwd': 'pwd'}
    db199_cost = {'host': 'xxx.xxx.xxx.xxx', 'user': 'admin', 'db': 'tj_wd_cost_2010', 'passwd': 'pwd'}
    db199_credit = {'host': 'xxx.xxx.xxx.xxx', 'user': 'admin', 'db': 'tj_wd_credit_2010', 'passwd': 'pwd'}

    # Database holding the game configuration.
    db191_game_conf = {'host': 'xxx.xxx.xxx.xxx', 'user': 'admin', 'passwd': 'pwd', 'db': 'game_conf'}

    # Credentials shared by all per-dist source databases.
    distuser = 'admin'
    distpwd = 'pwd'
    # Calendar year whose logs are extracted.
    log_year = 2010

    def __init__(self):
        """Constructor; there is no per-instance state to set up."""






















                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息