Importing Hive Summary Data into MySQL (Drawing Lessons from Colleagues)
2018-01-05 11:25
This approach borrows the DataX configuration style: the target table, the query SQL, and the other job parameters are written into a JSON-like task config. The target table must be created in advance, and the database connection credentials are stored in plaintext (one possible fix is sketched after the full script). The overall scheme is worth borrowing from, but there is still plenty to improve.
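For orientation, the script below packs the whole job into a single task_info dict in the spirit of a DataX job JSON. A trimmed sketch of that shape, with values shortened to placeholders (the real definitions appear in the full listing):

task_info = {
    "enable": True,
    "comment": "what this job computes",
    "reader": {
        "type": "hive",             # source side
        "hive_name": "hive_old",    # key into the hive_conn dict
        "query_sql": "select ...;"  # HiveQL; '{0}' is filled with the pt_day
    },
    "writer": {
        "type": "mysql",
        "conn": "funnyai_data",      # key into the mysql_conn dict
        "clear_sql": "delete ...;",  # clears the day first so reruns are idempotent
        "insert_sql": "insert ...;"  # row template, filled from each CSV result line
    }
}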
The following two articles may also be useful for reference:
Automated transfer of Hive summary statistics to a MySQL database - text-based batch-parameter configuration and extraction --> http://blog.csdn.net/babyfish13/article/details/73188712
Automated transfer of Hive summary statistics to a MySQL database --> http://blog.csdn.net/babyfish13/article/details/72701512
/Users/nisj/PycharmProjects/BiDataProc/Demand/hiveSum2Mysql.py
#!/usr/bin/env python
# encoding: utf-8
import datetime
import os

task_info = {
    "enable": True,
    "comment": "HoneyComb - users: genuinely new device registrations",
    "reader": {
        "type": "hive",
        "hive_name": "hive_old",
        "query_sql": """
CREATE TEMPORARY TABLE tab_today_new_device_uid AS
select a.appkey appkey, a.appsource appsource, a.device_ds device_ds, b.uid uid
from (select appkey, appsource, device_ds
      from oss_bi_all_real_device_ds_log
      where pt_day='{0}'
      group by appkey, appsource, device_ds) a
inner join (select device_ds, parms['uid'] uid
            from oss_bi_all_device_log
            where pt_day='{0}'
            group by device_ds, parms['uid']) b
on a.device_ds=b.device_ds
group by a.appkey, a.appsource, a.device_ds, b.uid;

CREATE TEMPORARY TABLE tab_lastday_newdevice_todayuid AS
select a.appkey appkey, a.appsource appsource, a.device_ds device_ds, b.uid uid
from (select appkey, appsource, device_ds
      from oss_bi_all_real_device_ds_log
      where pt_day=date_sub('{0}',1)
      group by appkey, appsource, device_ds) a
inner join (select device_ds, parms['uid'] uid
            from oss_bi_all_device_log
            where pt_day='{0}'
            group by device_ds, parms['uid']) b
on a.device_ds=b.device_ds
group by a.appkey, a.appsource, a.device_ds, b.uid;

CREATE TEMPORARY TABLE tab_today_old_device AS
select a.device_ds device_ds
from (select parms['arg_device_ds'] device_ds, parms['uid'] uid
      from oss_bi_all_login_log
      where pt_day='{0}' and parms['errorCode'] is null
        and parms['arg_device_ds'] != '' and parms['arg_device_ds'] is not null
      group by parms['arg_device_ds'], parms['uid']) a
left join (select uid
           from oss_bi_all_user_profile
           where pt_day='{0}' and substring(created_time, 1, 10)='{0}') b
on a.uid=b.uid
where b.uid is null
group by a.device_ds;

with tab_currday_sum as (
    SELECT A.appkey, A.appsource,
           count(distinct A.device_ds) device_ds_cnt, count(distinct A.uid) uid_cnt
    FROM tab_today_new_device_uid A
    LEFT JOIN tab_today_old_device B ON A.device_ds=B.device_ds
    WHERE B.device_ds is null
    GROUP BY A.appkey, A.appsource),
tab_lastday_remain_sum as (
    SELECT A.appkey, A.appsource,
           count(distinct A.device_ds) device_ds_cnt, count(distinct A.uid) uid_cnt
    FROM tab_lastday_newdevice_todayuid A
    LEFT JOIN tab_today_old_device B ON A.device_ds=B.device_ds
    WHERE B.device_ds is null
    GROUP BY A.appkey, A.appsource)
select a1.appkey, a1.appsource,
       split(index_key,':')[0] index_key, split(index_key,':')[1] index_value
from tab_currday_sum a1
lateral view explode(str_to_map(concat('currday_device_ds_cnt:',a1.device_ds_cnt,'&','currday_uid_cnt:',a1.uid_cnt),'&',',')) mid_list1 as index_key,index_value
union all
select a1.appkey, a1.appsource,
       split(index_key,':')[0] index_key, split(index_key,':')[1] index_value
from tab_lastday_remain_sum a1
lateral view explode(str_to_map(concat('lastday_currdayremain_device_ds_cnt:',a1.device_ds_cnt,'&','lastday_device_currday_uid_cnt:',a1.uid_cnt),'&',',')) mid_list1 as index_key,index_value;
"""
    },
    "writer": {
        "type": "mysql",
        "conn": "funnyai_data",
        "clear_sql": """
delete from honeycomb_retention_ds_daily_stat where `date`="{0}";
""",
        "insert_sql": """
insert into honeycomb_retention_ds_daily_stat
    (`date`, `appkey`, `appsource`, `s_type`, `index_cnt`)
values ("{0}", "{1}", "{2}", "{3}", {4});
"""
    }
}

hive_conn = {
    "hive_old": {"jdbc_conn": "jdbc:hive2://HiveHostIp:10000"},
    "hive_new": {"jdbc_conn": "jdbc:hive2://HiveHostIp:10000"}
}

mysql_conn = {
    "funnyai_data": {
        "ip": "HostIP",
        "port": 6603,
        "db": "funnyai_data",
        "username": "MysqlUser",
        "password": "MysqlPass"
    }
}


def get_day_ago(day_ago, date_time=None):
    """Return the date day_ago days ago, formatted as YYYY-MM-DD."""
    # Default the reference time at call time, not import time (the original
    # used datetime.datetime.now() as the default argument, which is evaluated
    # only once when the module loads).
    if date_time is None:
        date_time = datetime.datetime.now()
    return (date_time - datetime.timedelta(days=day_ago)).strftime('%Y-%m-%d')


def get_mysql_cmd(conn, sql):
    """Build a mysql command-line call for a named connection, or None if unknown."""
    if conn not in mysql_conn:
        return None
    my_conf = mysql_conn[conn]
    return " /usr/bin/mysql -h{0} -P{1} -u{2} -p{3} -e \"\"\"set names utf8; use {4}; {5}\"\"\" ".format(
        my_conf['ip'], my_conf['port'], my_conf['username'],
        my_conf['password'], my_conf['db'], sql)


def run_query_hive(task_conf, pt_day):
    """Run the reader's HiveQL through beeline and return the result lines."""
    mp_name_sql = " SET mapred.job.name=' run_honey_task ({0}) '; ".format(pt_day)
    # Replace all double quotes with single quotes and escape backticks,
    # so the SQL survives being embedded in the shell command below.
    hive_sql = mp_name_sql + task_conf['reader']['query_sql'].format(
        pt_day).replace('"', "'").replace('`', '\\`')
    jdbc_conn = hive_conn[task_conf['reader']['hive_name']]['jdbc_conn']
    query_cmd = (" source ~/.bash_profile && beeline --outputformat=csv2 "
                 "--showHeader=false -u '{0}' -n hadoop -p '' "
                 "-e \"\"\"{1}\"\"\" ").format(jdbc_conn, hive_sql)
    print query_cmd
    query_rs_list = os.popen(query_cmd).read().split("\n")
    if len(query_rs_list) > 1:
        return query_rs_list[:-1]  # drop the trailing empty element
    else:
        raise Exception("Ooops, Query No Data!")


def run_upinsert_mysql(task_conf, pt_day, lines):
    """Clear the target day in MySQL, then insert the query results in batches."""
    max_insert = task_conf['writer'].get('max_bulk_insert', 10)
    run_sql_list = []
    # Assemble the clearing (delete) SQL so reruns of the same day are idempotent.
    clear_sql = task_conf['writer']['clear_sql'].format(
        pt_day).replace('"', "'").replace('`', '\\`')
    if not clear_sql.strip().endswith(';'):
        clear_sql += ';'
    run_sql_list.append(clear_sql)
    # Assemble the insert SQL, max_insert rows per batch.
    insert_sql = ''
    insert_count = 0
    for line in lines:
        if insert_count >= max_insert:
            run_sql_list.append(insert_sql)
            insert_count = 0
            insert_sql = ''
        words = line.strip().split(',')
        print words
        insert_sql += task_conf['writer']['insert_sql'].format(
            pt_day, *words).replace('"', "'").replace('`', '\\`')
        if not insert_sql.strip().endswith(';'):
            insert_sql += ';'
        insert_count += 1
    if insert_count > 0:
        run_sql_list.append(insert_sql)
    # Execute each batched SQL statement and print its output.
    for run_sql in run_sql_list:
        upinsert_cmd = get_mysql_cmd(task_conf['writer']['conn'],
                                     run_sql) + " && echo 'upinsert success' "
        print upinsert_cmd
        print os.popen(upinsert_cmd).read()


def run_one_day(pt_day):
    """Read one day's summary from Hive and write it into MySQL."""
    query_rs = []
    if task_info['reader']['type'] == 'hive':
        try:
            query_rs = run_query_hive(task_info, pt_day)
        except Exception as e:
            print e
    if task_info['writer']['type'] == 'mysql':
        run_upinsert_mysql(task_info, pt_day, query_rs)


if __name__ == '__main__':
    run_one_day('2017-12-25')
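Two small improvements are worth noting. First, the hard-coded '2017-12-25' can be replaced with the otherwise unused get_day_ago helper so the daily batch always loads yesterday, e.g. run_one_day(get_day_ago(1)). Second, the plaintext MySQL credentials flagged at the top can be read from the environment instead. A minimal sketch, where FUNNYAI_MYSQL_PASS is a hypothetical variable name that would need to be exported in the batch user's profile:

import os

mysql_conn = {
    "funnyai_data": {
        "ip": "HostIP",
        "port": 6603,
        "db": "funnyai_data",
        "username": "MysqlUser",
        # Hypothetical env var; set it with `export FUNNYAI_MYSQL_PASS=...`
        # so the password never appears in the script or in version control.
        "password": os.environ.get("FUNNYAI_MYSQL_PASS", "")
    }
}

The password would still leak into the process list through the mysql -p argument; a ~/.my.cnf file or mysql_config_editor would close that hole as well.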