Example: Incrementally Loading MySQL Sharded-Table Data into Hive by Hour with Python
2017-05-26 11:32
1. Create the Hive external target table, stored on Alibaba Cloud OSS
hive> show create table oss_live_history_status;
OK
CREATE EXTERNAL TABLE `oss_live_history_status`(
  `id` bigint,
  `room_id` bigint,
  `name` string,
  `live_id` string,
  `live_status_id` bigint,
  `game_id` bigint,
  `game_name` string,
  `category_id` bigint,
  `live_source_id` bigint,
  `style` bigint,
  `model_name` string,
  `screenshot` string,
  `online_count` bigint,
  `online_real_count` bigint,
  `score` bigint,
  `is_profession` bigint,
  `xappkey` string,
  `open_uid` string,
  `live_tag_id` bigint,
  `display` bigint,
  `state` bigint,
  `switch_time` string,
  `created_time` string,
  `updated_time` string)
PARTITIONED BY (
  `pt_day` string,
  `pt_hour` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
  'field.delim'='\t',
  'line.delim'='\n',
  'serialization.format'='\t')
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'oss://LTAIJMz:YZI5W80risJgio7@tv-hz.oss-cn-hangzhou-internal.aliyuncs.com/hive/oss_live_history_status'
TBLPROPERTIES (
  'transient_lastDdlTime'='1495014665')
Time taken: 0.076 seconds, Fetched: 42 row(s)
2. Parameter handling (MySQL database connection config, hour extraction, and related helpers)
/Users/nisj/PycharmProjects/BiDataProc/Mysql2Hive-LiveHistory/ParProc.py
# -*- coding=utf-8 -*-
import warnings
import datetime

warnings.filterwarnings("ignore")

# src Database config
srcMysqlConfig_tv_server = {
    'host': 'hostInterIp',
    # 'host': 'hostOuterIp',
    'user': 'MysqlUser',
    'passwd': 'MysqlPasswd',
    'port': 50506,
    'db': 'tv_server'
}

srcMysqlConfig_tv_user = {
    'host': 'hostInterIp',
    # 'host': 'hostOuterIp',
    'user': 'MysqlUser',
    'passwd': 'MysqlPasswd',
    'port': 50514,
    'db': 'tv_user'
}

srcMysqlConfig_tv_seed = {
    'host': 'hostInterIp',
    # 'host': 'hostOuterIp',
    'user': 'MysqlUser',
    'passwd': 'MysqlPasswd',
    'port': 50029,
    'db': 'tv_seed'
}

srcMysqlConfig_tv_event = {
    'host': 'hostInterIp',
    # 'host': 'hostOuterIp',
    'user': 'MysqlUser',
    'passwd': 'MysqlPasswd',
    'port': 50512,
    'db': 'tv_event'
}

srcMysqlConfig_tv_hadoop_stat = {
    'host': 'hostInterIp',
    # 'host': 'hostOuterIp',
    'user': 'MysqlUser',
    'passwd': 'MysqlPasswd',
    'port': 6605,
    'db': 'tv_hadoop_stat'
}


def getNowDay():
    DayNow = datetime.datetime.today().strftime('%Y-%m-%d')
    return DayNow


def getPreviousHour():
    PreviousHour = (datetime.datetime.today() - datetime.timedelta(hours=1)).strftime('%Y-%m-%d %H')
    return PreviousHour


def dateRange(beginDate, endDate):
    dates = []
    dt = datetime.datetime.strptime(beginDate, "%Y-%m-%d")
    date = beginDate[:]
    while date <= endDate:
        dates.append(date)
        dt = dt + datetime.timedelta(1)
        date = dt.strftime("%Y-%m-%d")
    return dates


def dateHourRange(beginDateHour, endDateHour):
    dateHours = []
    dth = datetime.datetime.strptime(beginDateHour, "%Y-%m-%d %H")
    dateHour = beginDateHour[:]
    while dateHour <= endDateHour:
        dateHours.append(dateHour)
        dth = dth + datetime.timedelta(hours=1)
        dateHour = dth.strftime("%Y-%m-%d %H")
    return dateHours


def getSrcMysqlConfig(srcMysql_config):
    return srcMysql_config['host'], srcMysql_config['port'], srcMysql_config['user'], srcMysql_config['passwd'], srcMysql_config['db']

# print getSrcMysqlConfig(srcMysql_config=srcMysqlConfig_tv_server)
# print getPreviousHour()
# print dateRange(beginDate='2015-06-14', endDate='2017-05-23')
# print dateHourRange(beginDateHour='2015-06-14 10', endDateHour='2015-06-23 11')
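The date-range helpers in ParProc.py are pure functions, so they can be sanity-checked without any database or Hive access. A minimal standalone sketch (re-declaring dateHourRange here so it runs on its own; it walks from the start hour to the end hour inclusive, one hour per step):

```python
import datetime

def dateHourRange(beginDateHour, endDateHour):
    # Collect every 'YYYY-MM-DD HH' string from begin to end, inclusive.
    # The string comparison works because the format sorts lexicographically.
    dateHours = []
    dth = datetime.datetime.strptime(beginDateHour, "%Y-%m-%d %H")
    dateHour = beginDateHour
    while dateHour <= endDateHour:
        dateHours.append(dateHour)
        dth = dth + datetime.timedelta(hours=1)
        dateHour = dth.strftime("%Y-%m-%d %H")
    return dateHours

# Crossing a day boundary yields one partition key per hour:
print(dateHourRange('2015-06-14 22', '2015-06-15 01'))
# ['2015-06-14 22', '2015-06-14 23', '2015-06-15 00', '2015-06-15 01']
```

Each string this returns becomes one runHour value, i.e. one (pt_day, pt_hour) partition, which is what makes the commented-out backfill loop in MysqlData2Hive.py work.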
3. Incremental data loading
/Users/nisj/PycharmProjects/BiDataProc/Mysql2Hive-LiveHistory/MysqlData2Hive.py
# -*- coding=utf-8 -*-
import os
from ParProc import *

warnings.filterwarnings("ignore")


def mysqlDataDownload(srcMysql_config, src_tabName, tabType, runHour):
    # parameter initialization
    host = getSrcMysqlConfig(srcMysql_config)[0]
    port = getSrcMysqlConfig(srcMysql_config)[1]
    user = getSrcMysqlConfig(srcMysql_config)[2]
    passwd = getSrcMysqlConfig(srcMysql_config)[3]
    db = getSrcMysqlConfig(srcMysql_config)[4]
    PreviousHour = runHour

    if not os.path.exists('/mnt/disk1/mysql2hive_tempdata/'):
        os.system('mkdir -p /mnt/disk1/mysql2hive_tempdata/')

    if tabType == 'submeter':
        os.system("rm -rf /mnt/disk1/mysql2hive_tempdata/oss_{src_tabName}.txt".format(src_tabName=src_tabName))
        for submeterPlus in range(0, 256, 1):
            os.system("""source /etc/profile; \
                /usr/bin/mysql -h{host} -P{port} -u{user} -p{passwd} -D{db} \
                -N -e"set names utf8; \
                select id,room_id,REPLACE(REPLACE(REPLACE(name,char(10),''),char(13),''),'\t','') name,live_id,live_status_id,game_id,REPLACE(REPLACE(REPLACE(game_name,char(10),''),char(13),''),'\t','') game_name,category_id,live_source_id,style,REPLACE(REPLACE(REPLACE(model_name,char(10),''),char(13),''),'\t','') model_name,screenshot,online_count,online_real_count,score,is_profession,xappkey,open_uid,live_tag_id,display,state,switch_time,created_time,updated_time from {db}.{src_tabName}_{submeterPlus} where substr(updated_time,1,13)='{PreviousHour}';" \
                >>/mnt/disk1/mysql2hive_tempdata/oss_{src_tabName}.txt \
                """.format(host=host, port=port, user=user, passwd=passwd, db=db, src_tabName=src_tabName, submeterPlus=submeterPlus, PreviousHour=PreviousHour))


def DataUploadHive(src_tabName, runHour):
    PreviousHour = runHour
    DayOfPreviousHour = runHour[:10]
    os.system("""source /etc/profile; \
        /usr/lib/hive-current/bin/hive -e "alter table oss_live_history_status drop partition (pt_day='{DayOfPreviousHour}',pt_hour='{PreviousHour}'); \
        alter table oss_live_history_status add partition (pt_day='{DayOfPreviousHour}',pt_hour='{PreviousHour}') location '{DayOfPreviousHour}/{PreviousHour}'; \
        load data local inpath '/mnt/disk1/mysql2hive_tempdata/oss_{src_tabName}.txt' overwrite into table oss_{src_tabName} partition (pt_day='{DayOfPreviousHour}',pt_hour='{PreviousHour}');" \
        """.format(src_tabName=src_tabName, PreviousHour=PreviousHour, DayOfPreviousHour=DayOfPreviousHour))
    os.system("rm -rf /mnt/disk1/mysql2hive_tempdata/oss_{src_tabName}.txt".format(src_tabName=src_tabName))


def MysqlData2hive(srcMysql_config, src_tabName, tabType, runHour):
    mysqlDataDownload(srcMysql_config, src_tabName, tabType, runHour)
    DataUploadHive(src_tabName, runHour)


# Batch Test
# Note: the config name must match one defined in ParProc.py (srcMysqlConfig_tv_server here).
MysqlData2hive(srcMysql_config=srcMysqlConfig_tv_server, src_tabName='live_history_status',
               tabType='submeter', runHour=getPreviousHour())
# for runHour in dateHourRange(beginDateHour='2015-06-14 14', endDateHour='2017-05-25 08'):
#     MysqlData2hive(srcMysql_config=srcMysqlConfig_tv_server, src_tabName='live_history_status',
#                    tabType='submeter', runHour=runHour)
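The download step above string-formats one SELECT per physical shard (live_history_status_0 through live_history_status_255) and appends each result to the same temp file. That statement construction can be factored out and verified without touching MySQL; build_shard_queries below is a hypothetical helper for illustration, not part of the original script, and uses `select *` in place of the full column list:

```python
def build_shard_queries(db, base_table, run_hour, shard_count=256):
    """Build one SELECT per physical shard table (base_table_0 .. base_table_{N-1}),
    keeping only rows whose updated_time falls inside the given hour.
    substr(updated_time,1,13) yields 'YYYY-MM-DD HH', matching the runHour format."""
    template = ("select * from {db}.{tab}_{n} "
                "where substr(updated_time,1,13)='{hour}'")
    return [template.format(db=db, tab=base_table, n=n, hour=run_hour)
            for n in range(shard_count)]

queries = build_shard_queries('tv_server', 'live_history_status', '2017-05-25 08')
print(len(queries))  # 256
print(queries[0])
# select * from tv_server.live_history_status_0 where substr(updated_time,1,13)='2017-05-25 08'
```

Generating the statements separately from executing them also makes it easy to spot-check a single shard's query by hand before running the full 256-table sweep.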
4. Scheduling with crontab
[hadoop@emr-worker-10 ~]$ crontab -l
9 * * * * /usr/bin/python /home/hadoop/nisj/Mysql2Hive-LiveHistory/MysqlData2Hive.py >> /home/hadoop/nisj/Mysql2Hive-LiveHistory/MysqlData2Hive_LiveHistory-by_hour.log 2>&1
5. Notes
The source table on MySQL consists of 256 shard tables, and rows are never modified once written. To support incremental loading and data rollback, the Hive target table uses partitions (one hour is a fairly small granularity; extracting by day or by week could also be considered).
The transfer logic is tailored to this particular table and situation; it is not a general-purpose tool.
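As a concrete illustration of the partition scheme: both partition keys derive from the single runHour string, so an hourly run, a backfill loop, and a rollback all address the same (pt_day, pt_hour) pair, and re-running an hour simply drops and re-adds that one partition. A small sketch (partition_keys is illustrative, not a function in the original script):

```python
def partition_keys(run_hour):
    """Derive (pt_day, pt_hour) the same way DataUploadHive does:
    pt_day is the first 10 characters, pt_hour is the full 'YYYY-MM-DD HH' string."""
    return run_hour[:10], run_hour

pt_day, pt_hour = partition_keys('2017-05-25 08')
print(pt_day)   # 2017-05-25
print(pt_hour)  # 2017-05-25 08
```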