您的位置:首页 > 编程语言 > Python开发

Python将Mysql分表数据按小时增量装载到Hive示例

2017-05-26 11:32 441 查看
1、Hive建存储在阿里云OSS上的外部目标表
hive> show create table oss_live_history_status;
OK
CREATE EXTERNAL TABLE `oss_live_history_status`(
`id` bigint,
`room_id` bigint,
`name` string,
`live_id` string,
`live_status_id` bigint,
`game_id` bigint,
`game_name` string,
`category_id` bigint,
`live_source_id` bigint,
`style` bigint,
`model_name` string,
`screenshot` string,
`online_count` bigint,
`online_real_count` bigint,
`score` bigint,
`is_profession` bigint,
`xappkey` string,
`open_uid` string,
`live_tag_id` bigint,
`display` bigint,
`state` bigint,
`switch_time` string,
`created_time` string,
`updated_time` string)
PARTITIONED BY (
`pt_day` string,
`pt_hour` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim'='\t',
'line.delim'='\n',
'serialization.format'='\t')
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'oss://AccessKeyId:AccessKeySecret@tv-hz.oss-cn-hangzhou-internal.aliyuncs.com/hive/oss_live_history_status'
TBLPROPERTIES (
'transient_lastDdlTime'='1495014665')
Time taken: 0.076 seconds, Fetched: 42 row(s)

2、参数处理(Mysql目标库配置信息及取小时等处理)
/Users/nisj/PycharmProjects/BiDataProc/Mysql2Hive-LiveHistory/ParProc.py
# -*- coding=utf-8 -*-
import warnings
import datetime

warnings.filterwarnings("ignore")

# src Database config
# Source-MySQL connection settings, one dict per database.
# All databases share the same host and credentials (placeholders here);
# only the port and schema name differ.
def _mkSrcConfig(port, db):
    """Build one source-DB config dict with the shared host/credentials."""
    return {
        'host': 'hostInterIp',  # switch to 'hostOuterIp' when outside the internal network
        'user': 'MysqlUser',
        'passwd': 'MysqlPasswd',
        'port': port,
        'db': db,
    }

srcMysqlConfig_tv_server = _mkSrcConfig(50506, 'tv_server')
srcMysqlConfig_tv_user = _mkSrcConfig(50514, 'tv_user')
srcMysqlConfig_tv_seed = _mkSrcConfig(50029, 'tv_seed')
srcMysqlConfig_tv_event = _mkSrcConfig(50512, 'tv_event')
srcMysqlConfig_tv_hadoop_stat = _mkSrcConfig(6605, 'tv_hadoop_stat')

def getNowDay():
    """Return today's date as a 'YYYY-MM-DD' string."""
    return datetime.datetime.today().strftime('%Y-%m-%d')

def getPreviousHour():
    """Return the previous clock hour as a 'YYYY-MM-DD HH' string."""
    one_hour_ago = datetime.datetime.today() - datetime.timedelta(hours=1)
    return one_hour_ago.strftime('%Y-%m-%d %H')

def dateRange(beginDate, endDate):
    """List every 'YYYY-MM-DD' day from beginDate through endDate, inclusive.

    Returns an empty list when beginDate is after endDate.
    """
    first = datetime.datetime.strptime(beginDate, "%Y-%m-%d")
    last = datetime.datetime.strptime(endDate, "%Y-%m-%d")
    span_days = (last - first).days
    return [(first + datetime.timedelta(days=offset)).strftime("%Y-%m-%d")
            for offset in range(span_days + 1)]

def dateHourRange(beginDateHour, endDateHour):
    """List every 'YYYY-MM-DD HH' hour from beginDateHour through endDateHour, inclusive.

    Returns an empty list when beginDateHour is after endDateHour.
    """
    first = datetime.datetime.strptime(beginDateHour, "%Y-%m-%d %H")
    last = datetime.datetime.strptime(endDateHour, "%Y-%m-%d %H")
    span_hours = int((last - first).total_seconds() // 3600)
    return [(first + datetime.timedelta(hours=offset)).strftime("%Y-%m-%d %H")
            for offset in range(span_hours + 1)]

def getSrcMysqlConfig(srcMysql_config):
    """Unpack a source-DB config dict into a (host, port, user, passwd, db) tuple.

    srcMysql_config: one of the srcMysqlConfig_* dicts defined above.
    Raises KeyError if any of the five expected keys is missing.
    """
    # Fixed: the original started with a pointless `srcMysql_config = srcMysql_config`
    # self-assignment; it had no effect and is removed.
    return (srcMysql_config['host'], srcMysql_config['port'],
            srcMysql_config['user'], srcMysql_config['passwd'],
            srcMysql_config['db'])

# print getSrcMysqlConfig(srcMysql_config=srcMysqlConfig_tv_server)
# print getPreviousHour()
# print dateRange(beginDate='2015-06-14', endDate='2017-05-23')
# print dateHourRange(beginDateHour='2015-06-14 10', endDateHour='2015-06-23 11')

3、数据的增量装载处理
/Users/nisj/PycharmProjects/BiDataProc/Mysql2Hive-LiveHistory/MysqlData2Hive.py
# -*- coding=utf-8 -*-
import os
from ParProc import *

warnings.filterwarnings("ignore")

def mysqlDataDownload(srcMysql_config, src_tabName, tabType, runHour):
    """Dump one hour of rows from the 256 MySQL shard tables into a local text file.

    srcMysql_config: connection dict (see ParProc srcMysqlConfig_*).
    src_tabName: base table name; shards are <src_tabName>_0 .. <src_tabName>_255.
    tabType: only 'submeter' (sharded) tables are processed; anything else is a no-op.
    runHour: 'YYYY-MM-DD HH' — rows whose updated_time prefix equals this hour are exported.

    Output is appended, shard by shard, to
    /mnt/disk1/mysql2hive_tempdata/oss_<src_tabName>.txt (tab-delimited).
    """
    # Fixed: unpack the config once instead of calling getSrcMysqlConfig five times.
    host, port, user, passwd, db = getSrcMysqlConfig(srcMysql_config)
    PreviousHour = runHour

    # Ensure the staging directory exists.
    if not os.path.exists('/mnt/disk1/mysql2hive_tempdata/'):
        os.system('mkdir -p /mnt/disk1/mysql2hive_tempdata/')

    if tabType == 'submeter':
        # Start from an empty dump file, then append each shard's rows to it.
        os.system("rm -rf /mnt/disk1/mysql2hive_tempdata/oss_{src_tabName}.txt".format(src_tabName=src_tabName))
        for submeterPlus in range(0, 256, 1):
            # NOTE(review): passing the password via -p on the command line exposes
            # it to `ps`; a my.cnf credentials file would be safer.
            # The nested REPLACEs strip LF/CR/tab from free-text columns so they
            # cannot break the tab-delimited, newline-terminated Hive layout.
            os.system("""source /etc/profile; \
/usr/bin/mysql -h{host} -P{port} -u{user} -p{passwd} -D{db} \
-N -e"set names utf8; \
select id,room_id,REPLACE(REPLACE(REPLACE(name,char(10),''),char(13),''),'\t','') name,live_id,live_status_id,game_id,REPLACE(REPLACE(REPLACE(game_name,char(10),''),char(13),''),'\t','') game_name,category_id,live_source_id,style,REPLACE(REPLACE(REPLACE(model_name,char(10),''),char(13),''),'\t','') model_name,screenshot,online_count,online_real_count,score,is_profession,xappkey,open_uid,live_tag_id,display,state,switch_time,created_time,updated_time from {db}.{src_tabName}_{submeterPlus} where substr(updated_time,1,13)='{PreviousHour}';" \
>>/mnt/disk1/mysql2hive_tempdata/oss_{src_tabName}.txt \
""".format(host=host, port=port, user=user, passwd=passwd, db=db, src_tabName=src_tabName, submeterPlus=submeterPlus, PreviousHour=PreviousHour))

def DataUploadHive(src_tabName, runHour):
    """Publish the staged dump for runHour into the Hive hourly partition.

    Drops and re-creates the (pt_day, pt_hour) partition so repeated runs for
    the same hour are idempotent, loads the local dump file into it, then
    removes the staging file.
    """
    PreviousHour = runHour
    DayOfPreviousHour = runHour[:10]  # the 'YYYY-MM-DD' part of 'YYYY-MM-DD HH'
    hiveCommand = (
        "source /etc/profile; "
        '/usr/lib/hive-current/bin/hive -e "'
        "alter table oss_live_history_status drop partition (pt_day='{DayOfPreviousHour}',pt_hour='{PreviousHour}'); "
        "alter table oss_live_history_status add partition (pt_day='{DayOfPreviousHour}',pt_hour='{PreviousHour}') location '{DayOfPreviousHour}/{PreviousHour}';"
        "load data local inpath '/mnt/disk1/mysql2hive_tempdata/oss_{src_tabName}.txt' overwrite into table oss_{src_tabName} partition (pt_day='{DayOfPreviousHour}',pt_hour='{PreviousHour}') ;"
        '" '
    ).format(src_tabName=src_tabName, PreviousHour=PreviousHour, DayOfPreviousHour=DayOfPreviousHour)
    os.system(hiveCommand)
    # Clean up the staging file once loaded.
    os.system("rm -rf /mnt/disk1/mysql2hive_tempdata/oss_{src_tabName}.txt ".format(src_tabName=src_tabName))

def MysqlData2hive(srcMysql_config, src_tabName, tabType, runHour):
    """Run one full incremental cycle: dump the MySQL shards, then load Hive."""
    mysqlDataDownload(srcMysql_config=srcMysql_config, src_tabName=src_tabName,
                      tabType=tabType, runHour=runHour)
    DataUploadHive(src_tabName=src_tabName, runHour=runHour)

# Batch entry: on each run (hourly, via crontab) load the previous hour's increment.
# BUG FIX: the original passed the undefined name `srcMysqlConfig_jellyfish_server`,
# which raises NameError at import time; ParProc defines `srcMysqlConfig_tv_server`
# for this database (presumably the intended config — confirm port/schema).
MysqlData2hive(srcMysql_config=srcMysqlConfig_tv_server, src_tabName='live_history_status', tabType='submeter', runHour=getPreviousHour())
# Backfill example (historical range, one hour at a time):
# for runHour in dateHourRange(beginDateHour='2015-06-14 14', endDateHour='2017-05-25 08'):
#     MysqlData2hive(srcMysql_config=srcMysqlConfig_tv_server, src_tabName='live_history_status',
#                    tabType='submeter', runHour=runHour)

4、crontab定时调度
[hadoop@emr-worker-10 ~]$ crontab -l
9 * * * * /usr/bin/python /home/hadoop/nisj/Mysql2Hive-LiveHistory/MysqlData2Hive.py >> /home/hadoop/nisj/Mysql2Hive-LiveHistory/MysqlData2Hive_LiveHistory-by_hour.log 2>&1


5、说明
Mysql上的源表是由256个分表组成,数据一旦写入便不会更改。考虑到增量及数据回滚,Hive目标表考虑使用分区的方式(一个小时相对太小,可以考虑按一天或一周进行数据提取)。

数据的传输相对根据表及实际情况是定制的,并不具有工具特性。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: