您的位置:首页 > 编程语言 > Python开发

Python将Hive汇总数据装载到Mysql

2017-03-29 10:59 447 查看
0、Hive里的临时表预计算
此部分只是将功能在Hive里实现了,将结果存放在Hive表里;与后面的Python代码程序无关联性。
-- For each game, rank its rooms per day by average viewing time
-- (total viewing time / distinct viewer identifiers in the room) and keep the top 100.
-- Only rooms with more than 10 distinct viewers take part in the ranking.
DROP TABLE IF EXISTS xx_view_time_rank_byidentifier;
CREATE TABLE xx_view_time_rank_byidentifier
AS
WITH tab_view_time_basic AS (
    SELECT pt_day,
           gameid,
           roomid,
           COUNT(DISTINCT identifier)                  AS view_identifier_cnt,
           SUM(view_time)                              AS view_total_time,
           SUM(view_time) / COUNT(DISTINCT identifier) AS view_time_per_identifier
    FROM recommend_data_view
    -- WHERE pt_day = '2017-01-06'
    GROUP BY pt_day, gameid, roomid
    HAVING COUNT(DISTINCT identifier) > 10
)
SELECT *
FROM (
    SELECT pt_day, gameid, roomid, view_identifier_cnt, view_total_time, view_time_per_identifier,
           ROW_NUMBER() OVER (PARTITION BY pt_day, gameid ORDER BY view_time_per_identifier DESC) AS rk
    FROM tab_view_time_basic
) x
WHERE rk <= 100
;

-- Inspect and validate the result.
SELECT * FROM xx_view_time_rank_byidentifier WHERE gameid <> '-1' ORDER BY pt_day, gameid, rk;

-- Platform-wide viewing time per room, one row per room per day.
DROP TABLE IF EXISTS xx_view_time_byroomid;
CREATE TABLE xx_view_time_byroomid
AS
SELECT pt_day,
       roomid,
       COUNT(DISTINCT identifier)                  AS view_identifier_cnt,
       SUM(view_time)                              AS view_total_time,
       SUM(view_time) / COUNT(DISTINCT identifier) AS view_time_per_identifier
FROM recommend_data_view
-- WHERE pt_day = '2017-01-06'
GROUP BY pt_day, roomid
;

-- Inspect and validate the result.
SELECT * FROM xx_view_time_byroomid ORDER BY pt_day, roomid LIMIT 1000;

1、Mysql目标表结构
在此次的处理中,目标表要在目标库上提前建好。后续会考虑目标库自动建表的可能。
DROP TABLE IF EXISTS `xx_view_time_byroomid`;
-- Daily per-room viewing-time totals, loaded by hiveDataSum2Mysql_timePerByRoomid.
CREATE TABLE `xx_view_time_byroomid` (
  `pt_day`                   varchar(10),
  `roomid`                   varchar(20),
  `view_identifier_cnt`      bigint,
  `view_total_time`          bigint,
  `view_time_per_identifier` double(22,6),
  `etl_time`                 datetime
);

DROP TABLE IF EXISTS `xx_view_time_rank_byidentifier`;
-- Daily per-game top-100 rooms by average viewing time,
-- loaded by hiveDataSum2Mysql_timePerTop100.
CREATE TABLE `xx_view_time_rank_byidentifier` (
  `pt_day`                   varchar(10),
  `gameid`                   varchar(20),
  -- Same width as xx_view_time_byroomid.roomid: both tables hold roomid values
  -- taken from recommend_data_view, so a narrower column here could truncate
  -- (or, in strict mode, reject) ids that load fine into the other table.
  `roomid`                   varchar(20),
  `view_identifier_cnt`      bigint,
  `view_total_time`          bigint,
  `view_time_per_identifier` double(22,6),
  `rk`                       int,
  `etl_time`                 datetime
);

2、目标Mysql数据库配置文件
/Users/nisj/PycharmProjects/BiDataProc/HiveSum2Mysql/TargetMysqlConfig.ini
; Connection settings for the target (report) MySQL database.
; Read by getSrcMysqlConfig() in ParProc.py; all values are returned as strings.
[reportMysql]
host = xxx.xxx.xxx.xxx
port = 3306
user = hadoop
; pawd: password for `user`; dbnm: database that holds the target tables.
pawd = xxxxxx
dbnm = xxxxxx

3、参数处理代码
主要处理日期时间及数据库配置文件信息的获取。
/Users/nisj/PycharmProjects/BiDataProc/HiveSum2Mysql/ParProc.py
# -*- coding=utf-8 -*-
import warnings
import datetime
import ConfigParser

warnings.filterwarnings("ignore")


def getNowDay():
    """Return today's local date formatted as 'YYYY-MM-DD'."""
    today = datetime.date.today()
    return today.strftime('%Y-%m-%d')

def getYesterDay():
    """Return the local date one day before today, formatted as 'YYYY-MM-DD'."""
    oneDay = datetime.timedelta(days=1)
    yesterday = datetime.date.today() - oneDay
    return yesterday.strftime('%Y-%m-%d')

def dateRange(beginDate, endDate):
    """Return every calendar day from beginDate to endDate, inclusive.

    Args:
        beginDate: first day, a 'YYYY-MM-DD' string (unpadded month/day such
            as '2017-3-1' is accepted too).
        endDate: last day, same format as beginDate.

    Returns:
        A list of zero-padded 'YYYY-MM-DD' strings, empty when beginDate is
        after endDate.

    Raises:
        ValueError: if either bound is not a valid date.
    """
    fmt = "%Y-%m-%d"
    day = datetime.datetime.strptime(beginDate, fmt).date()
    last = datetime.datetime.strptime(endDate, fmt).date()
    dates = []
    # Compare real dates, not strings: a lexical comparison is only correct
    # when both bounds are zero-padded.
    while day <= last:
        dates.append(day.strftime(fmt))
        day += datetime.timedelta(days=1)
    return dates

def getSrcMysqlConfig(configFile=None):
    """Read the target MySQL connection settings from the [reportMysql] section.

    Despite the "Src" in its name, this reads the target database config
    (TargetMysqlConfig.ini by default).

    Args:
        configFile: path of the ini file. When omitted, TargetMysqlConfig.ini
            in the current working directory is used if it exists, otherwise
            the copy in the same directory as this module.

    Returns:
        A tuple (host, port, user, pawd, dbnm) of strings.

    Raises:
        IOError: the config file could not be read.
        ConfigParser.Error (configparser.Error on Python 3): the section or
            one of the options is missing.
    """
    import os
    try:
        import ConfigParser as configparser  # Python 2
    except ImportError:
        import configparser  # Python 3

    if configFile is None:
        configFile = "TargetMysqlConfig.ini"
        # Keep the historical CWD-relative lookup, but fall back to the
        # module's own directory so the script can be run from anywhere.
        if not os.path.isfile(configFile):
            configFile = os.path.join(os.path.dirname(os.path.abspath(__file__)), configFile)

    # Raw parser: no '%' interpolation, so passwords containing '%' are
    # returned verbatim instead of raising an interpolation error.
    config = configparser.RawConfigParser()
    # read() silently ignores missing files and returns the list it did read.
    if not config.read(configFile):
        raise IOError("cannot read MySQL config file: %s" % configFile)

    section = "reportMysql"
    return tuple(config.get(section, key) for key in ("host", "port", "user", "pawd", "dbnm"))

# print getYesterDay()
# print getNowDay()

4、Hive数据汇总及将数据传输到Mysql数据库
/Users/nisj/PycharmProjects/BiDataProc/HiveSum2Mysql/HiveDataSum2Mysql.py
# -*- coding=utf-8 -*-
import os
import re
import time
from ParProc import *

warnings.filterwarnings("ignore")

# Client binaries invoked by this module.
_HIVE_BIN = "/usr/lib/hive-current/bin/hive"
_MYSQL_BIN = "/usr/bin/mysql"
# Maximum number of rows packed into one multi-row INSERT statement.
_INSERT_BATCH_SIZE = 1000


def _checkRunDay(runDay):
    """Return runDay unchanged after checking it is a valid 'YYYY-MM-DD' date.

    runDay is spliced into HiveQL and MySQL statement text, so anything that
    is not a real date is rejected with ValueError.
    """
    import datetime
    datetime.datetime.strptime(runDay, '%Y-%m-%d')
    return runDay


def _parseHiveOutput(output, columnCount):
    """Split Hive CLI stdout into rows (lists) of tab-separated fields.

    Empty lines are skipped. Any other line without exactly columnCount
    fields raises ValueError, so stray non-data output is reported instead
    of being loaded (or crashing later with an IndexError).
    """
    rows = []
    for line in output.splitlines():
        if not line:
            continue
        fields = line.split('\t')
        if len(fields) != columnCount:
            raise ValueError('unexpected Hive output line: %r' % (line,))
        rows.append(fields)
    return rows


def _runHive(sql, columnCount):
    """Run sql through the Hive CLI and return the parsed result rows.

    The SQL is passed as an argument list (no shell). The call raises
    subprocess.CalledProcessError if Hive exits non-zero, so a failed query
    is never mistaken for an empty result.
    """
    import subprocess
    # close_fds avoids leaking other threads' pipes into the child when this
    # runs inside the BatchThread pool (Python 2's default is False).
    output = subprocess.check_output([_HIVE_BIN, '-e', sql],
                                     universal_newlines=True, close_fds=True)
    return _parseHiveOutput(output, columnCount)


def _sqlValue(field):
    """Render one Hive output field as a MySQL literal.

    The text NULL (how the Hive CLI prints SQL NULL by default - confirm if
    the cluster overrides it) becomes a real NULL. Any other value becomes a
    single-quoted string. Backslashes and quotes are escaped so the value
    cannot end the literal early.
    """
    if field == 'NULL':
        return 'NULL'
    return "'" + field.replace('\\', '\\\\').replace("'", "\\'") + "'"


def _buildReplaceSql(table, columns, runDay, rows, etlTime):
    """Build a MySQL script that replaces table's rows for pt_day=runDay.

    The script does three things:
    - deletes the existing rows for runDay;
    - inserts rows, each row's fields in the order of columns followed by
      etlTime for the etl_time column;
    - batches the inserts into multi-row INSERTs of at most
      _INSERT_BATCH_SIZE rows.

    Everything is wrapped in START TRANSACTION / COMMIT. This is only atomic
    if the table engine is transactional (e.g. InnoDB). No INSERT is emitted
    for an empty rows list, which avoids the invalid "values ;" statement.
    """
    head = "insert into {0}({1}) values ".format(table, ', '.join(list(columns) + ['etl_time']))
    etlLiteral = _sqlValue(etlTime)
    statements = ["START TRANSACTION;",
                  "delete from {0} where pt_day={1};".format(table, _sqlValue(runDay))]
    for start in range(0, len(rows), _INSERT_BATCH_SIZE):
        batch = rows[start:start + _INSERT_BATCH_SIZE]
        values = ','.join('(' + ','.join([_sqlValue(field) for field in row] + [etlLiteral]) + ')'
                          for row in batch)
        statements.append(head + values + ';')
    statements.append("COMMIT;")
    return '\n'.join(statements) + '\n'


def _runMysql(script):
    """Feed script to the mysql client (on stdin) for the configured target DB.

    Without --force, the mysql client stops at the first failing statement.
    COMMIT is then never reached, so a transactional table keeps its previous
    contents. Raises RuntimeError if the client exits non-zero.
    """
    import subprocess
    host, port, user, passwd, db = getSrcMysqlConfig()
    # NOTE(review): the password is still visible in the process list, as it
    # was before; consider --defaults-extra-file instead.
    args = [_MYSQL_BIN, '-h' + host, '-P' + str(port), '-u' + user]
    if passwd:
        args.append('-p' + passwd)
    args.append(db)
    proc = subprocess.Popen(args, stdin=subprocess.PIPE,
                            universal_newlines=True, close_fds=True)
    proc.communicate(script)
    if proc.returncode != 0:
        raise RuntimeError('mysql exited with status %d' % proc.returncode)


def hiveDataSum2Mysql_timePerByRoomid(runDay):
    """Reload one day of per-room viewing-time totals from Hive into MySQL.

    Aggregates recommend_data_view for pt_day=runDay by (pt_day, roomid), then
    replaces that day's rows in xx_view_time_byroomid. Hive is queried before
    anything is deleted, so a failed Hive run leaves MySQL untouched.

    Args:
        runDay: the day to load, a 'YYYY-MM-DD' string.
    """
    runDay = _checkRunDay(runDay)
    rows = _runHive("""
select pt_day,roomid,count(distinct identifier) view_identifier_cnt,sum(view_time) view_total_time,sum(view_time)/count(distinct identifier) view_time_per_identifier
from recommend_data_view
where pt_day='{runDay}'
group by pt_day,roomid
;""".format(runDay=runDay), 5)
    etlTime = time.strftime('%Y-%m-%d %X', time.localtime())
    _runMysql(_buildReplaceSql(
        'xx_view_time_byroomid',
        ['pt_day', 'roomid', 'view_identifier_cnt', 'view_total_time', 'view_time_per_identifier'],
        runDay, rows, etlTime))


def hiveDataSum2Mysql_timePerTop100(runDay):
    """Reload one day of per-game top-100 rooms by average viewing time.

    Ranks rooms with more than 10 distinct viewers within each (pt_day, gameid)
    by view_time_per_identifier. It keeps ranks 1-100 and replaces that day's
    rows in xx_view_time_rank_byidentifier. Hive is queried before anything
    is deleted.

    Args:
        runDay: the day to load, a 'YYYY-MM-DD' string.
    """
    runDay = _checkRunDay(runDay)
    rows = _runHive("""
with tab_view_time_basic as (
select pt_day,gameid,roomid,count(distinct identifier) view_identifier_cnt,sum(view_time) view_total_time,sum(view_time)/count(distinct identifier) view_time_per_identifier
from recommend_data_view
where pt_day='{runDay}'
group by pt_day,gameid,roomid
having count(distinct identifier)>10)
select * from (
select pt_day,gameid,roomid,view_identifier_cnt,view_total_time,view_time_per_identifier,row_number()over(partition by pt_day,gameid order by view_time_per_identifier desc) rk
from tab_view_time_basic) x
where rk<=100
;""".format(runDay=runDay), 7)
    etlTime = time.strftime('%Y-%m-%d %X', time.localtime())
    _runMysql(_buildReplaceSql(
        'xx_view_time_rank_byidentifier',
        ['pt_day', 'gameid', 'roomid', 'view_identifier_cnt', 'view_total_time',
         'view_time_per_identifier', 'rk'],
        runDay, rows, etlTime))
注意代码中用Python向MySQL批量插入数据的方法:每累计1000行拼成一条多值INSERT语句后执行一次。

5、程序的总控调度
/Users/nisj/PycharmProjects/BiDataProc/HiveSum2Mysql/Hive2MysqlCtl.py
# -*- coding=utf-8 -*-
from HiveDataSum2Mysql import *

warnings.filterwarnings("ignore")


def Hive2Mysql(runDay):
    """Load both daily Hive summaries for runDay ('YYYY-MM-DD') into MySQL.

    Runs the per-room viewing-time load first, then the per-game top-100
    ranking load. An exception from the first load propagates and skips
    the second.
    """
    hiveDataSum2Mysql_timePerByRoomid(runDay=runDay)
    hiveDataSum2Mysql_timePerTop100(runDay=runDay)


# Batch / backfill usage:
#   Hive2Mysql(runDay='2017-03-26')
#   for runDay in dateRange(beginDate='2017-03-01', endDate='2017-03-27'):
#       Hive2Mysql(runDay=runDay)

if __name__ == '__main__':
    # Default scheduled run: load yesterday. Guarded so that importing this
    # module (e.g. `from Hive2MysqlCtl import *` in BatchThread.py) does not
    # start an extra load as a side effect.
    Hive2Mysql(runDay=getYesterDay())

6、程序的并行调度
目前,程序的并行执行还有一点问题,怀疑是多线程模块的问题,待研究。
/Users/nisj/PycharmProjects/BiDataProc/HiveSum2Mysql/BatchThread.py
# -*- coding=utf-8 -*-
import threadpool
from Hive2MysqlCtl import *

warnings.filterwarnings("ignore")


def _printNow():
    """Print the current local time; used to bracket the batch run."""
    now_time = time.strftime('%Y-%m-%d %X', time.localtime())
    # A single pre-formatted argument prints identically under Python 2 and 3.
    print("当前时间是: %s" % now_time)


def main(beginDate='2017-01-01', endDate='2017-03-27', poolSize=10, pollSeconds=30):
    """Backfill every day from beginDate to endDate (inclusive) in parallel.

    Each day is handed to Hive2Mysql as one threadpool request, run on a
    pool of poolSize worker threads. Results are polled every pollSeconds
    until none are pending or the user presses Ctrl-C.

    Args:
        beginDate: first day to load, 'YYYY-MM-DD'.
        endDate: last day to load, 'YYYY-MM-DD'.
        poolSize: number of worker threads.
        pollSeconds: seconds to sleep between result polls.
    """
    _printNow()

    runDays = dateRange(beginDate=beginDate, endDate=endDate)
    requests = threadpool.makeRequests(Hive2Mysql, runDays)
    main_pool = threadpool.ThreadPool(poolSize)
    for req in requests:
        main_pool.putRequest(req)

    while True:
        try:
            # Sleep-then-poll (rather than a blocking wait) keeps the main
            # thread responsive to Ctrl-C.
            time.sleep(pollSeconds)
            main_pool.poll()
        except KeyboardInterrupt:
            print("**** Interrupted!")
            break
        except threadpool.NoResultsPending:
            break

    if main_pool.dismissedWorkers:
        print("Joining all dismissed worker threads...")
        main_pool.joinAllDismissedWorkers()

    _printNow()


if __name__ == '__main__':
    main()
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: