
Importing Hive Summary Data into MySQL (drawing lessons from colleagues)

2018-01-05 11:25
This approach follows the DataX style of JSON configuration: the target table, the query SQL, and so on are written into a JSON-like config. The target table has to be created in advance, and the database connection details are kept in plain text. The scheme as a whole is worth borrowing from, but quite a few things still need improvement.
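One of the more obvious improvements is to stop keeping the MySQL credentials in plain text inside the script. A minimal sketch of one alternative, reading them from environment variables (the MYSQL_HOST/MYSQL_PORT/MYSQL_USER/MYSQL_PASS names and the load_mysql_conn helper are assumptions for illustration, not part of the original script):

import os

def load_mysql_conn():
    # Assumed environment variable names; a secrets manager or a separate
    # config file kept out of version control would work the same way.
    return {
        "funnyai_data": {
            "ip": os.environ["MYSQL_HOST"],
            "port": int(os.environ.get("MYSQL_PORT", "6603")),
            "db": "funnyai_data",
            "username": os.environ["MYSQL_USER"],
            "password": os.environ["MYSQL_PASS"],
        }
    }

mysql_conn = load_mysql_conn()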
The following two articles may also be useful references:
Automated transfer of Hive summary statistics into MySQL - batch-run parameters kept in a text config, extraction and use --> http://blog.csdn.net/babyfish13/article/details/73188712
Automated transfer of Hive summary statistics into MySQL --> http://blog.csdn.net/babyfish13/article/details/72701512
/Users/nisj/PycharmProjects/BiDataProc/Demand/hiveSum2Mysql.py
#!/usr/bin/env python
# encoding: utf-8

import datetime
import os

task_info = {
    "enable": True,
    "comment": "HoneyComb - users: genuinely new device registrations",
    "reader": {
        "type": "hive",
        "hive_name": "hive_old",
        "query_sql": """
CREATE TEMPORARY TABLE tab_today_new_device_uid
AS
select a.appkey appkey, a.appsource appsource,a.device_ds device_ds, b.uid uid
from (
select appkey, appsource, device_ds
from oss_bi_all_real_device_ds_log
where pt_day='{0}'
group by appkey, appsource, device_ds
) a inner join (
select device_ds, parms['uid'] uid
from oss_bi_all_device_log
where pt_day='{0}'
group by device_ds, parms['uid']
) b on a.device_ds=b.device_ds
group by a.appkey, a.appsource, a.device_ds, b.uid;

CREATE TEMPORARY TABLE tab_lastday_newdevice_todayuid
AS
select a.appkey appkey, a.appsource appsource,a.device_ds device_ds, b.uid uid
from (
select appkey, appsource, device_ds
from oss_bi_all_real_device_ds_log
where pt_day=date_sub('{0}',1)
group by appkey, appsource, device_ds
) a inner join (
select device_ds, parms['uid'] uid
from oss_bi_all_device_log
where pt_day='{0}'
group by device_ds, parms['uid']
) b on a.device_ds=b.device_ds
group by a.appkey, a.appsource, a.device_ds, b.uid;

CREATE TEMPORARY TABLE tab_today_old_device
AS
select a.device_ds device_ds
from (
select parms['arg_device_ds'] device_ds, parms['uid'] uid
from oss_bi_all_login_log
where pt_day='{0}' and parms['errorCode'] is null
and parms['arg_device_ds'] != '' and parms['arg_device_ds'] is not null
group by parms['arg_device_ds'], parms['uid']
) a left join (
select uid
from oss_bi_all_user_profile
where pt_day='{0}' and substring(created_time, 1, 10) = '{0}'
) b on a.uid=b.uid where b.uid is null
group by a.device_ds;

with tab_currday_sum as(
SELECT A.appkey, A.appsource,
count(distinct A.device_ds) device_ds_cnt,
count(distinct A.uid) uid_cnt
FROM tab_today_new_device_uid A
LEFT JOIN tab_today_old_device B ON A.device_ds=B.device_ds
WHERE B.device_ds is null
GROUP BY A.appkey, A.appsource),
tab_lastday_remain_sum as(
SELECT A.appkey, A.appsource,
count(distinct A.device_ds) device_ds_cnt,
count(distinct A.uid) uid_cnt
FROM tab_lastday_newdevice_todayuid A
LEFT JOIN tab_today_old_device B ON A.device_ds=B.device_ds
WHERE B.device_ds is null
GROUP BY A.appkey, A.appsource)
select a1.appkey,a1.appsource,split(index_key,':')[0] index_key,split(index_key,':')[1] index_value
from tab_currday_sum a1
lateral view explode(str_to_map(concat('currday_device_ds_cnt:',a1.device_ds_cnt,'&','currday_uid_cnt:',a1.uid_cnt),'&',',')) mid_list1 as index_key,index_value
union all
select a1.appkey,a1.appsource,split(index_key,':')[0] index_key,split(index_key,':')[1] index_value
from tab_lastday_remain_sum a1
lateral view explode(str_to_map(concat('lastday_currdayremain_device_ds_cnt:',a1.device_ds_cnt,'&','lastday_device_currday_uid_cnt:',a1.uid_cnt),'&',',')) mid_list1 as index_key,index_value
;
"""
},
"writer": {
"type": "mysql",
"conn": "funnyai_data",
"clear_sql": """
delete from honeycomb_retention_ds_daily_stat
where `date`="{0}";
""",
"insert_sql": """
insert into honeycomb_retention_ds_daily_stat
(`date`, `appkey`, `appsource`, `s_type`, `index_cnt`)
values
("{0}", "{1}", "{2}", "{3}", {4});
"""
}
}

hive_conn = {
    "hive_old": {
        "jdbc_conn": "jdbc:hive2://HiveHostIp:10000"
    },
    "hive_new": {
        "jdbc_conn": "jdbc:hive2://HiveHostIp:10000"
    }
}

mysql_conn = {
    "funnyai_data": {
        "ip": "HostIP",
        "port": 6603,
        "db": "funnyai_data",
        "username": "MysqlUser",
        "password": "MysqlPass"
    }
}

def get_day_ago(day_ago, date_time=None):
    """Return the date n days ago, formatted as YYYY-MM-DD."""
    # Evaluate "now" at call time rather than once at import time.
    if date_time is None:
        date_time = datetime.datetime.now()
    return (date_time - datetime.timedelta(days=day_ago)).strftime('%Y-%m-%d')

def get_mysql_cmd(conn, sql):
    """Build a mysql CLI command line for the named connection."""
    if conn in mysql_conn:
        my_conf = mysql_conn[conn]
        return " /usr/bin/mysql -h{0} -P{1} -u{2} -p{3} -e \"\"\"set names utf8; use {4}; {5}\"\"\" ".format(
            my_conf['ip'], my_conf['port'], my_conf['username'],
            my_conf['password'], my_conf['db'], sql)
    else:
        return None

def run_query_hive(task_conf, pt_day):
    mp_name_sql = """
        SET mapred.job.name=' run_honey_task ({0}) ';
    """.format(pt_day)
    # Replace double quotes with single quotes and escape backticks so the SQL
    # survives being wrapped in the shell command below.
    hive_sql = mp_name_sql + task_conf['reader']['query_sql'].format(
        pt_day).replace('"', "'").replace('`', '\`')
    jdbc_conn = hive_conn[task_conf['reader']['hive_name']]['jdbc_conn']
    query_cmd = " source ~/.bash_profile && beeline --outputformat=csv2 --showHeader=false -u '{0}' -n hadoop -p '' -e \"\"\"{1}\"\"\" ".format(
        jdbc_conn, hive_sql)

    print query_cmd

    query_rs_list = os.popen(query_cmd).read().split("\n")
    if len(query_rs_list) > 1:
        return query_rs_list[:-1]
    else:
        raise Exception("Ooops, Query No Data!!!!!!!!!!!!!!!!")

def run_upinsert_mysql(task_conf, pt_day, lines):
    if 'max_bulk_insert' in task_conf['writer']:
        MAX_INSERT = task_conf['writer']['max_bulk_insert']
    else:
        MAX_INSERT = 10
    run_sql_list = []

    # Assemble the clear (delete) SQL for the target date.
    clear_sql = task_conf['writer']['clear_sql'].format(pt_day).replace(
        '"', "'").replace('`', '\`')
    if not clear_sql.strip()[-1:] == ';':
        clear_sql += ';'
    run_sql_list.append(clear_sql)

    # Assemble the insert SQL, batching MAX_INSERT statements per mysql call.
    insert_sql = ''
    insert_count = 0
    for line in lines:
        if insert_count >= MAX_INSERT:
            run_sql_list.append(insert_sql)
            insert_count = 0
            insert_sql = ''

        words = line.strip().split(',')
        print words
        insert_sql += task_conf['writer']['insert_sql'].format(
            pt_day, *words).replace('"', "'").replace('`', '\`')
        if not insert_sql.strip()[-1:] == ';':
            insert_sql += ';'
        insert_count += 1

    if insert_count > 0:
        run_sql_list.append(insert_sql)

    # Execute each batch of SQL statements.
    for run_sql in run_sql_list:
        upinsert_cmd = get_mysql_cmd(task_conf['writer']['conn'],
                                     run_sql) + " && echo 'upinsert success' "
        print upinsert_cmd

        # Run the command and print its output.
        print os.popen(upinsert_cmd).read()

def run_one_day(pt_day):
    query_rs = []

    if task_info['reader']['type'] == 'hive':
        try:
            query_rs = run_query_hive(task_info, pt_day)
        except Exception, e:
            print e

    if task_info['writer']['type'] == 'mysql':
        run_upinsert_mysql(task_info, pt_day, query_rs)

if __name__ == '__main__':
    run_one_day('2017-12-25')
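The run date is hard-coded in the entry point above, and the get_day_ago helper defined in the script is otherwise unused. For a daily scheduled run, a natural variant, not shown in the original post, is to pass yesterday's date instead:

if __name__ == '__main__':
    # Assumption: invoked once a day (e.g. from cron) for yesterday's partition.
    run_one_day(get_day_ago(1))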