Loading Hive Summary Data into MySQL with Python
2017-03-29 10:59
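0. Precomputing temporary tables in Hive
This part only implements the logic inside Hive and stores the results in Hive tables; it is independent of the Python code that follows.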
-- Top 100 rooms per game by average view time (total view time / distinct viewer UIDs in the room), at daily granularity
drop table if exists xx_view_time_rank_byidentifier;
create table xx_view_time_rank_byidentifier as
with tab_view_time_basic as (
select pt_day,gameid,roomid,count(distinct identifier) view_identifier_cnt,sum(view_time) view_total_time,sum(view_time)/count(distinct identifier) view_time_per_identifier
from recommend_data_view
-- where pt_day='2017-01-06'
group by pt_day,gameid,roomid
having count(distinct identifier)>10)
select * from (
select pt_day,gameid,roomid,view_identifier_cnt,view_total_time,view_time_per_identifier,row_number()over(partition by pt_day,gameid order by view_time_per_identifier desc) rk
from tab_view_time_basic) x
where rk<=100
;

-- Inspect and verify the data
select * from xx_view_time_rank_byidentifier where gameid<>'-1' order by pt_day,gameid,rk;

-- Platform view time per room, by day
drop table if exists xx_view_time_byroomid;
create table xx_view_time_byroomid as
select pt_day,roomid,count(distinct identifier) view_identifier_cnt,sum(view_time) view_total_time,sum(view_time)/count(distinct identifier) view_time_per_identifier
from recommend_data_view
-- where pt_day='2017-01-06'
group by pt_day,roomid
;

-- Inspect and verify the data
select * from xx_view_time_byroomid order by pt_day,roomid limit 1000;
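To make the ranking rule concrete, here is a small pure-Python sketch of the same logic on toy rows: group by day, game, and room, keep rooms with enough distinct viewers, then rank by average view time within each game. The function name `rank_rooms`, its parameters, and the sample rows are assumptions made for illustration and are not part of the original job. It is written for Python 3.

```python
from collections import defaultdict


def rank_rooms(rows, min_viewers=10, top_n=100):
    """rows: iterable of (pt_day, gameid, roomid, identifier, view_time)."""
    viewers = defaultdict(set)  # (pt_day, gameid, roomid) -> distinct identifiers
    total = defaultdict(int)    # (pt_day, gameid, roomid) -> summed view_time
    for pt_day, gameid, roomid, identifier, view_time in rows:
        key = (pt_day, gameid, roomid)
        viewers[key].add(identifier)
        total[key] += view_time

    # Keep rooms with more than min_viewers distinct viewers (the HAVING clause)
    per_game = defaultdict(list)
    for key, ids in viewers.items():
        if len(ids) > min_viewers:
            pt_day, gameid, roomid = key
            avg = total[key] / len(ids)
            per_game[(pt_day, gameid)].append((roomid, len(ids), total[key], avg))

    # Rank within each (pt_day, gameid) by average view time, descending
    result = []
    for (pt_day, gameid), rooms in sorted(per_game.items()):
        rooms.sort(key=lambda r: r[3], reverse=True)
        for rk, (roomid, cnt, tot, avg) in enumerate(rooms[:top_n], start=1):
            result.append((pt_day, gameid, roomid, cnt, tot, avg, rk))
    return result


# Hypothetical toy data; the viewer threshold is lowered so the example stays small
sample = [
    ("2017-01-06", "g1", "r1", "u1", 100),
    ("2017-01-06", "g1", "r1", "u2", 300),
    ("2017-01-06", "g1", "r2", "u1", 50),
    ("2017-01-06", "g1", "r2", "u3", 50),
    ("2017-01-06", "g1", "r2", "u3", 20),
]
ranked = rank_rooms(sample, min_viewers=1)
for row in ranked:
    print(row)
```

Room r1 averages 200 per viewer and r2 averages 60, so r1 gets rank 1. Note that a viewer who appears twice in a room (u3 above) is counted once, as with `count(distinct identifier)`.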
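1. MySQL target table structure
For this run, the target tables must be created in the target database in advance. Creating them automatically is something to consider later.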
drop table if exists xx_view_time_byroomid;
CREATE TABLE `xx_view_time_byroomid`(
  `pt_day` varchar(10),
  `roomid` varchar(20),
  `view_identifier_cnt` bigint,
  `view_total_time` bigint,
  `view_time_per_identifier` double(22,6),
  etl_time datetime
);

drop table if exists xx_view_time_rank_byidentifier;
CREATE TABLE `xx_view_time_rank_byidentifier`(
  `pt_day` varchar(10),
  `gameid` varchar(20),
  `roomid` varchar(10),
  `view_identifier_cnt` bigint,
  `view_total_time` bigint,
  `view_time_per_identifier` double(22,6),
  `rk` int,
  etl_time datetime
);
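As one possible direction for the automatic table creation mentioned above, the DDL could be generated from a column list and issued with `CREATE TABLE IF NOT EXISTS`, so that rerunning it is harmless. This is only a sketch: `build_create_table_sql` and `BYROOMID_COLUMNS` are hypothetical names, and the column types are copied from the hand-written DDL above.

```python
def build_create_table_sql(table, columns):
    """Build a CREATE TABLE IF NOT EXISTS statement.

    columns: list of (name, mysql_type) pairs, in table order.
    """
    col_defs = ",\n".join(
        "  `{0}` {1}".format(name, col_type) for name, col_type in columns
    )
    return "CREATE TABLE IF NOT EXISTS `{0}` (\n{1}\n);".format(table, col_defs)


# Column list mirroring the hand-written DDL for xx_view_time_byroomid
BYROOMID_COLUMNS = [
    ("pt_day", "varchar(10)"),
    ("roomid", "varchar(20)"),
    ("view_identifier_cnt", "bigint"),
    ("view_total_time", "bigint"),
    ("view_time_per_identifier", "double(22,6)"),
    ("etl_time", "datetime"),
]

ddl = build_create_table_sql("xx_view_time_byroomid", BYROOMID_COLUMNS)
print(ddl)
```

The generated statement could then be passed to the `mysql` client in the same way the load scripts pass their delete and insert statements.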
2. Configuration file for the target MySQL database
/Users/nisj/PycharmProjects/BiDataProc/HiveSum2Mysql/TargetMysqlConfig.ini
[reportMysql]
host = xxx.xxx.xxx.xxx
port = 3306
user = hadoop
pawd = xxxxxx
dbnm = xxxxxx
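For reference, a file with this layout can be read with the standard-library `configparser` module in Python 3. The sketch below parses an in-memory sample rather than the real file; the host, password, and database values are placeholders, and `load_mysql_config` is a hypothetical helper name.

```python
import configparser

# Placeholder values; the real file keeps its own host, password, and database name
SAMPLE_INI = """
[reportMysql]
host = 127.0.0.1
port = 3306
user = hadoop
pawd = secret
dbnm = report
"""


def load_mysql_config(text, section="reportMysql"):
    """Parse INI text and return the connection settings as a dict."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    cfg = parser[section]
    return {
        "host": cfg["host"],
        "port": cfg.getint("port"),  # converted to int
        "user": cfg["user"],
        "passwd": cfg["pawd"],
        "db": cfg["dbnm"],
    }


config = load_mysql_config(SAMPLE_INI)
print(config["host"], config["port"], config["db"])
```

Returning a dict keyed by name avoids depending on the position of each value in a tuple.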
3. Parameter handling code
This module mainly handles dates and times and reads the database configuration file.
/Users/nisj/PycharmProjects/BiDataProc/HiveSum2Mysql/ParProc.py
# -*- coding=utf-8 -*-
import warnings
import datetime
import ConfigParser

warnings.filterwarnings("ignore")


def getNowDay():
    DayNow = datetime.datetime.today().strftime('%Y-%m-%d')
    return DayNow


def getYesterDay():
    YesterDay = (datetime.datetime.today() - datetime.timedelta(1)).strftime('%Y-%m-%d')
    return YesterDay


def dateRange(beginDate, endDate):
    # Return every day from beginDate to endDate (inclusive) as 'YYYY-MM-DD' strings
    dates = []
    dt = datetime.datetime.strptime(beginDate, "%Y-%m-%d")
    date = beginDate[:]
    while date <= endDate:
        dates.append(date)
        dt = dt + datetime.timedelta(1)
        date = dt.strftime("%Y-%m-%d")
    return dates


def getSrcMysqlConfig():
    config = ConfigParser.ConfigParser()
    config.read("TargetMysqlConfig.ini")
    # Read the target MySQL connection settings
    host = config.get("reportMysql", "host")
    port = config.get("reportMysql", "port")
    user = config.get("reportMysql", "user")
    pawd = config.get("reportMysql", "pawd")
    dbnm = config.get("reportMysql", "dbnm")
    return (host, port, user, pawd, dbnm)

# print getYesterDay()
# print getNowDay()
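The `dateRange` helper compares date strings directly, which works because zero-padded `YYYY-MM-DD` strings sort in calendar order. A variant that compares date objects instead might look like the following Python 3 sketch; `date_range` is a hypothetical name, not a function from the original module.

```python
import datetime


def date_range(begin_date, end_date):
    """Return every day from begin_date to end_date inclusive, as 'YYYY-MM-DD' strings."""
    fmt = "%Y-%m-%d"
    start = datetime.datetime.strptime(begin_date, fmt).date()
    end = datetime.datetime.strptime(end_date, fmt).date()
    days = (end - start).days
    # range() is empty when end is before start, so the result is then an empty list
    return [(start + datetime.timedelta(n)).strftime(fmt) for n in range(days + 1)]


print(date_range("2017-02-27", "2017-03-02"))
# ['2017-02-27', '2017-02-28', '2017-03-01', '2017-03-02']
```

Parsing both arguments up front also means a malformed end date raises an error immediately instead of silently producing an unexpected range.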
4. Summarizing the Hive data and transferring it to the MySQL database
/Users/nisj/PycharmProjects/BiDataProc/HiveSum2Mysql/HiveDataSum2Mysql.py
# -*- coding=utf-8 -*-
import os
import re
import time
from ParProc import *

warnings.filterwarnings("ignore")


def hiveDataSum2Mysql_timePerByRoomid(runDay):
    # Read the target MySQL connection settings once
    host, port, user, passwd, db = getSrcMysqlConfig()

    # Delete rows already loaded for this day so the job can be rerun
    os.system("""source /etc/profile; \
        /usr/bin/mysql -h{host} -P{port} -u{user} -p{passwd} -e "use {db}; \
        delete from xx_view_time_byroomid where pt_day='{pt_day}'; " """.format(host=host, port=port, user=user, passwd=passwd, db=db, pt_day=runDay))

    # Run the summary query in Hive and capture its tab-separated output
    Sum_Data = os.popen("""/usr/lib/hive-current/bin/hive -e " \
        select pt_day,roomid,count(distinct identifier) view_identifier_cnt,sum(view_time) view_total_time,sum(view_time)/count(distinct identifier) view_time_per_identifier \
        from recommend_data_view \
        where pt_day='{runDay}' \
        group by pt_day,roomid \
        ;" """.format(runDay=runDay)).readlines()

    Sum_Data_list = []
    for sum_list in Sum_Data:
        row = re.split('\t', sum_list.replace('\n', ''))
        Sum_Data_list.append(row)

    # Command prefix shared by every batch insert
    insert_head = """/usr/bin/mysql -h{host} -P{port} -u{user} -p{passwd} -e "use {db}; \
        insert into xx_view_time_byroomid(pt_day, roomid, view_identifier_cnt, view_total_time, view_time_per_identifier, etl_time) \
        values """.format(host=host, port=port, user=user, passwd=passwd, db=db)

    i = 0
    insert_mysql_sql = insert_head
    for sumd in Sum_Data_list:
        pt_day = sumd[0]
        roomid = sumd[1]
        view_identifier_cnt = sumd[2]
        view_total_time = sumd[3]
        view_time_per_identifier = sumd[4]
        etl_time = time.strftime('%Y-%m-%d %X', time.localtime())

        i += 1
        insert_mysql_sql = insert_mysql_sql + """('{pt_day}','{roomid}','{view_identifier_cnt}','{view_total_time}','{view_time_per_identifier}','{etl_time}'),""".format(pt_day=pt_day, roomid=roomid, view_identifier_cnt=view_identifier_cnt, view_total_time=view_total_time, view_time_per_identifier=view_time_per_identifier, etl_time=etl_time)

        # Send one multi-row insert for every 1000 rows
        if (i % 1000 == 0):
            insert_mysql_sql = insert_mysql_sql.rstrip(',') + """ ;" """
            os.system(insert_mysql_sql)
            insert_mysql_sql = insert_head

    # Send the remaining rows; skip when nothing is pending (no rows, or an exact multiple of 1000)
    if i % 1000 != 0:
        insert_mysql_sql = insert_mysql_sql.rstrip(',') + """ ;" """
        os.system(insert_mysql_sql)


def hiveDataSum2Mysql_timePerTop100(runDay):
    # Read the target MySQL connection settings once
    host, port, user, passwd, db = getSrcMysqlConfig()

    # Delete rows already loaded for this day so the job can be rerun
    os.system("""source /etc/profile; \
        /usr/bin/mysql -h{host} -P{port} -u{user} -p{passwd} -e "use {db}; \
        delete from xx_view_time_rank_byidentifier where pt_day='{pt_day}'; " """.format(host=host, port=port, user=user, passwd=passwd, db=db, pt_day=runDay))

    # Run the ranking query in Hive and capture its tab-separated output
    Sum_Data = os.popen("""/usr/lib/hive-current/bin/hive -e " \
        with tab_view_time_basic as ( \
        select pt_day,gameid,roomid,count(distinct identifier) view_identifier_cnt,sum(view_time) view_total_time,sum(view_time)/count(distinct identifier) view_time_per_identifier \
        from recommend_data_view \
        where pt_day='{runDay}' \
        group by pt_day,gameid,roomid \
        having count(distinct identifier)>10) \
        select * from ( \
        select pt_day,gameid,roomid,view_identifier_cnt,view_total_time,view_time_per_identifier,row_number()over(partition by pt_day,gameid order by view_time_per_identifier desc) rk \
        from tab_view_time_basic) x \
        where rk<=100 \
        ;" """.format(runDay=runDay)).readlines()

    Sum_Data_list = []
    for sum_list in Sum_Data:
        row = re.split('\t', sum_list.replace('\n', ''))
        Sum_Data_list.append(row)

    # Command prefix shared by every batch insert
    insert_head = """/usr/bin/mysql -h{host} -P{port} -u{user} -p{passwd} -e "use {db}; \
        insert into xx_view_time_rank_byidentifier(pt_day, gameid, roomid, view_identifier_cnt, view_total_time, view_time_per_identifier, rk, etl_time) \
        values """.format(host=host, port=port, user=user, passwd=passwd, db=db)

    i = 0
    insert_mysql_sql = insert_head
    for sumd in Sum_Data_list:
        pt_day = sumd[0]
        gameid = sumd[1]
        roomid = sumd[2]
        view_identifier_cnt = sumd[3]
        view_total_time = sumd[4]
        view_time_per_identifier = sumd[5]
        rk = sumd[6]
        etl_time = time.strftime('%Y-%m-%d %X', time.localtime())

        i += 1
        insert_mysql_sql = insert_mysql_sql + """('{pt_day}','{gameid}','{roomid}','{view_identifier_cnt}','{view_total_time}','{view_time_per_identifier}',{rk},'{etl_time}'),""".format(pt_day=pt_day, gameid=gameid, roomid=roomid, view_identifier_cnt=view_identifier_cnt, view_total_time=view_total_time, view_time_per_identifier=view_time_per_identifier, rk=rk, etl_time=etl_time)

        # Send one multi-row insert for every 1000 rows
        if (i % 1000 == 0):
            insert_mysql_sql = insert_mysql_sql.rstrip(',') + """ ;" """
            os.system(insert_mysql_sql)
            insert_mysql_sql = insert_head

    # Send the remaining rows; skip when nothing is pending (no rows, or an exact multiple of 1000)
    if i % 1000 != 0:
        insert_mysql_sql = insert_mysql_sql.rstrip(',') + """ ;" """
        os.system(insert_mysql_sql)

Note how the code batches the MySQL inserts: rows are accumulated into a single multi-row insert statement, which is sent to MySQL once every 1000 rows.
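The batching idea can be isolated from the shell calls. The Python 3 sketch below parses tab-separated lines and builds one multi-row insert statement per batch, producing no statement at all when there are no rows. All helper names (`parse_hive_rows`, `chunked`, `quote`, `build_insert_statements`) and the sample lines are assumptions for illustration; the `etl_time` column is left out for brevity, and the quoting is deliberately naive.

```python
def parse_hive_rows(lines):
    """Split tab-separated output lines into lists of fields, skipping blank lines."""
    return [line.rstrip("\n").split("\t") for line in lines if line.strip()]


def chunked(rows, size):
    """Yield consecutive slices of rows with at most `size` items each."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


def quote(value):
    """Naive quoting for illustration only: escape backslashes and single quotes."""
    return "'" + str(value).replace("\\", "\\\\").replace("'", "''") + "'"


def build_insert_statements(table, columns, rows, batch_size=1000):
    """Return one multi-row insert statement per batch of rows."""
    head = "insert into {0}({1}) values ".format(table, ", ".join(columns))
    statements = []
    for batch in chunked(rows, batch_size):
        values = ",".join(
            "(" + ",".join(quote(v) for v in row) + ")" for row in batch
        )
        statements.append(head + values + ";")
    return statements


# Hypothetical sample of Hive CLI output lines
lines = [
    "2017-03-26\t1001\t12\t3600\t300.0\n",
    "2017-03-26\t1002\t20\t8000\t400.0\n",
    "2017-03-26\t1003\t15\t3000\t200.0\n",
]
columns = ["pt_day", "roomid", "view_identifier_cnt", "view_total_time", "view_time_per_identifier"]
rows = parse_hive_rows(lines)
statements = build_insert_statements("xx_view_time_byroomid", columns, rows, batch_size=2)
for statement in statements:
    print(statement)
```

With three rows and a batch size of 2, this yields two statements. For data that may contain quotes or other special characters, a database driver with parameterized queries (for example the DB-API `executemany` method) is likely a safer choice than building SQL strings by hand.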
5. Main control and scheduling
/Users/nisj/PycharmProjects/BiDataProc/HiveSum2Mysql/Hive2MysqlCtl.py
# -*- coding=utf-8 -*-
from HiveDataSum2Mysql import *

warnings.filterwarnings("ignore")


def Hive2Mysql(runDay):
    hiveDataSum2Mysql_timePerByRoomid(runDay=runDay)
    hiveDataSum2Mysql_timePerTop100(runDay=runDay)

# Batch Ctl
# Hive2Mysql(runDay='2017-03-26')
# for runDay in dateRange(beginDate='2017-03-01', endDate='2017-03-27'):
#     Hive2Mysql(runDay=runDay)

# Run only when executed directly, so that importing this module
# (as BatchThread.py does) does not trigger a load for yesterday
if __name__ == '__main__':
    Hive2Mysql(runDay=getYesterDay())
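Instead of commenting lines in and out to switch between a single day and a backfill range, the days to run could be chosen from command-line options. The following Python 3 sketch is one way to do that; `resolve_run_days` and the `--begin` and `--end` options are hypothetical and do not exist in the original script.

```python
import argparse
import datetime


def resolve_run_days(argv, today=None):
    """Return the list of days to load.

    No options: yesterday only. --begin alone: that single day.
    --begin and --end: every day in the inclusive range.
    """
    parser = argparse.ArgumentParser(description="Pick the days to load")
    parser.add_argument("--begin", help="first day, YYYY-MM-DD")
    parser.add_argument("--end", help="last day, YYYY-MM-DD")
    args = parser.parse_args(argv)

    fmt = "%Y-%m-%d"
    today = today or datetime.date.today()
    yesterday = today - datetime.timedelta(days=1)
    begin = datetime.datetime.strptime(args.begin, fmt).date() if args.begin else yesterday
    end = datetime.datetime.strptime(args.end, fmt).date() if args.end else begin
    return [
        (begin + datetime.timedelta(n)).strftime(fmt)
        for n in range((end - begin).days + 1)
    ]


# Fixed "today" so the example is reproducible
print(resolve_run_days([], today=datetime.date(2017, 3, 28)))
print(resolve_run_days(["--begin", "2017-03-01", "--end", "2017-03-03"]))
```

Each returned day would then be passed to `Hive2Mysql(runDay=...)` in a loop.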
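6. Running the program in parallel
At present, the parallel run has a few problems. I suspect the multithreading module is the cause; this still needs investigation.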
/Users/nisj/PycharmProjects/BiDataProc/HiveSum2Mysql/BatchThread.py
# -*- coding=utf-8 -*-
import threadpool
from Hive2MysqlCtl import *

warnings.filterwarnings("ignore")

today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)

now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print("Current time: " + now_time)

# One request per day in the backfill range
runDay_list = dateRange(beginDate='2017-01-01', endDate='2017-03-27')

requests = []
request_Hive2Mysql_batchCtl = threadpool.makeRequests(Hive2Mysql, runDay_list)
requests.extend(request_Hive2Mysql_batchCtl)

# Pool of 10 worker threads
main_pool = threadpool.ThreadPool(10)
for req in requests:
    main_pool.putRequest(req)

if __name__ == '__main__':
    while True:
        try:
            time.sleep(30)
            main_pool.poll()
        except KeyboardInterrupt:
            print("**** Interrupted!")
            break
        except threadpool.NoResultsPending:
            # All requests have finished
            break

    if main_pool.dismissedWorkers:
        print("Joining all dismissed worker threads...")
        main_pool.joinAllDismissedWorkers()

now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print("Current time: " + now_time)
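As a possible alternative to the third-party `threadpool` module, the same fan-out can be written with the standard-library `concurrent.futures` module in Python 3. This swaps in a different technique and is not a diagnosis of the problem described above. In the sketch, `run_days_in_parallel` is a hypothetical helper and `fake_job` is a stand-in for `Hive2Mysql`.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_days_in_parallel(job, run_days, max_workers=10):
    """Run job(run_day) for each day in a thread pool.

    Returns (succeeded, failed): a sorted list of days that finished,
    and a dict mapping each failed day to the exception it raised.
    """
    succeeded, failed = [], {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(job, day): day for day in run_days}
        for future in as_completed(futures):
            day = futures[future]
            try:
                future.result()  # re-raises any exception from the worker
                succeeded.append(day)
            except Exception as exc:
                failed[day] = exc
    return sorted(succeeded), failed


def fake_job(run_day):
    # Stand-in for Hive2Mysql(runDay=run_day); fails for one day on purpose
    if run_day == "2017-01-02":
        raise RuntimeError("simulated failure for " + run_day)


ok, failed = run_days_in_parallel(
    fake_job, ["2017-01-01", "2017-01-02", "2017-01-03"], max_workers=2
)
print("succeeded:", ok)
print("failed:", sorted(failed))
```

Collecting failures per day makes it easier to see which days need a rerun, since a worker exception would otherwise go unnoticed.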