您的位置:首页 > 数据库 > SQL

sqoop迁移mysql数据到hive中

2018-01-29 17:31 531 查看
一、先创建sqoop job,使用增量导入,python脚本如下:

#!/usr/bin/python

import os

import sys

import glob

import time

import datetime

##import all row fields --check-column create_time

def job_group0(name):
    """Create a saved Sqoop job that incrementally imports one MySQL table.

    The job is called ``name``; it reads MySQL table ``name`` from the
    ``smart_hardware`` database and appends the rows to Hive table
    ``phi_health.<name>``.  ``create_time`` is the incremental check column,
    starting from ``1900-01-01``, and the ``--where`` filter keeps only rows
    created before the current date.

    Args:
        name: Used verbatim as the Sqoop job name, the MySQL table name and
            the Hive table name.  It is interpolated into a shell command
            unescaped, so pass trusted identifiers only.
    """
    # NOTE(review): the MySQL credentials are embedded in the JDBC URL on the
    # command line; consider Sqoop's --password-file option instead.
    # The command must be one logical string: a raw line break inside the
    # quoted literal is a syntax error, so it is assembled from adjacent
    # literals that concatenate to the exact single-line command.
    cmd = (
        'sqoop job --create %s -- import -m 1 '
        '--connect "jdbc:mysql://192.168.76.12:3306/smart_hardware'
        '?useSSL=false&user=phidoop&password=phidoop" '
        '--table %s --where "create_time < current_date()" '
        '--hive-import --hive-database phi_health --hive-table %s '
        '--incremental append --check-column create_time '
        '--last-value \'1900-01-01\' '
    ) % (name, name, name)
    os.system(cmd)

##import all row fields --check-column create_time

def job_group1(num1, num2, name):
    """Create one saved Sqoop job per numbered shard table ``<name>_<i>``.

    For every ``i`` in ``range(num1, num2)`` a job named ``<name>_<i>`` is
    created that incrementally imports MySQL table ``<name>_<i>``.  All shards
    append into the same Hive table ``phi_health.<name>``.  ``create_time`` is
    the incremental check column, starting from ``1900-01-01``, and only rows
    created before the current date are selected.

    Args:
        num1: First shard index (inclusive).
        num2: Shard index to stop at (exclusive).
        name: Base table name; also the Hive target table.  Interpolated into
            a shell command unescaped, so pass trusted identifiers only.
    """
    # NOTE(review): the MySQL credentials are embedded in the JDBC URL on the
    # command line; consider Sqoop's --password-file option instead.
    # Adjacent literals concatenate to the exact single-line command; a raw
    # line break inside the quoted literal would be a syntax error.
    template = (
        'sqoop job --create %s_%s -- import -m 1 '
        '--connect "jdbc:mysql://192.168.76.12:3306/smart_hardware'
        '?useSSL=false&user=phidoop&password=phidoop" '
        '--table %s_%s --where "create_time < current_date()" '
        '--hive-import --hive-database phi_health --hive-table %s '
        '--incremental append --check-column create_time '
        '--last-value \'1900-01-01\' '
    )
    for i in range(num1, num2):
        cmd = template % (name, i, name, i, name)
        # Echo the command so the job-creation log shows what was run.
        # The parenthesised form prints the same text on Python 2 and 3.
        print(cmd)
        os.system(cmd)

##import all row fields --check-column date

def job_group2(name):
    """Create a saved Sqoop job for a table whose check column is ``date``.

    The job is called ``name``; it reads MySQL table ``name`` from the
    ``smart_hardware`` database and appends the rows to Hive table
    ``phi_health.<name>``.  ``date`` is the incremental check column, starting
    from ``1900-01-01``, and the ``--where`` filter keeps only rows whose
    ``date`` is before the current date.

    Args:
        name: Used verbatim as the Sqoop job name, the MySQL table name and
            the Hive table name.  It is interpolated into a shell command
            unescaped, so pass trusted identifiers only.
    """
    # NOTE(review): the MySQL credentials are embedded in the JDBC URL on the
    # command line; consider Sqoop's --password-file option instead.
    # Adjacent literals concatenate to the exact single-line command; a raw
    # line break inside the quoted literal would be a syntax error.
    cmd = (
        'sqoop job --create %s -- import -m 1 '
        '--connect "jdbc:mysql://192.168.76.12:3306/smart_hardware'
        '?useSSL=false&user=phidoop&password=phidoop" '
        '--table %s --where "date < current_date()" '
        '--hive-import --hive-database phi_health --hive-table %s '
        '--incremental append --check-column date '
        '--last-value \'1900-01-01\' '
    ) % (name, name, name)
    os.system(cmd)

##import all row fields --check-column date

def job_group3(num1, num2, name):
    """Create one saved Sqoop job per shard table, checked on ``date``.

    For every ``i`` in ``range(num1, num2)`` a job named ``<name>_<i>`` is
    created that incrementally imports MySQL table ``<name>_<i>``.  All shards
    append into the same Hive table ``phi_health.<name>``.  ``date`` is the
    incremental check column, starting from ``1900-01-01``, and only rows
    whose ``date`` is before the current date are selected.

    Args:
        num1: First shard index (inclusive).
        num2: Shard index to stop at (exclusive).
        name: Base table name; also the Hive target table.  Interpolated into
            a shell command unescaped, so pass trusted identifiers only.
    """
    # NOTE(review): the MySQL credentials are embedded in the JDBC URL on the
    # command line; consider Sqoop's --password-file option instead.
    # Adjacent literals concatenate to the exact single-line command; a raw
    # line break inside the quoted literal would be a syntax error.
    template = (
        'sqoop job --create %s_%s -- import -m 1 '
        '--connect "jdbc:mysql://192.168.76.12:3306/smart_hardware'
        '?useSSL=false&user=phidoop&password=phidoop" '
        '--table %s_%s --where "date < current_date()" '
        '--hive-import --hive-database phi_health --hive-table %s '
        '--incremental append --check-column date '
        '--last-value \'1900-01-01\' '
    )
    for i in range(num1, num2):
        cmd = template % (name, i, name, i, name)
        # Echo the command so the job-creation log shows what was run.
        # The parenthesised form prints the same text on Python 2 and 3.
        print(cmd)
        os.system(cmd)

if __name__ == "__main__":
    # Unsharded tables whose incremental check column is create_time.
    for table in ("balance_mac_manger_info", "balance_measure_info"):
        job_group0(table)

    # Sharded tables (<name>_<i>) checked on create_time:
    # (first index, stop index (exclusive), base table name).
    for first, stop, table in (
        (0, 5, "balance_mac_measure_info"),
        (0, 20, "blood_pressure_measure_info"),
        (0, 50, "balance_measure_info"),
    ):
        job_group1(first, stop, table)

    # user_body_info and its shards use the `date` column instead.
    job_group2("user_body_info")
    job_group3(0, 10, "user_body_info")

二、执行sqoop job的脚本job_exec.py如下:

#!/usr/bin/python

import os

import sys

import glob

import time

import datetime

def job_exec_group0(name):
    """Run the saved Sqoop job called ``name``.

    Args:
        name: Name of a previously created Sqoop job.  It is interpolated
            into a shell command unescaped, so pass trusted identifiers only.
    """
    cmd = 'sqoop job --exec %s' % (name)
    # Building the command is not enough: it has to be handed to the shell,
    # otherwise the unsharded jobs are silently skipped on every run.
    os.system(cmd)

## execute the saved sqoop jobs <name>_<num1> .. <name>_<num2 - 1>

def job_exec_group1(num1, num2, name):
    """Run the saved Sqoop jobs ``<name>_<i>`` for each shard index.

    The jobs are executed one after another, in ascending index order, for
    every ``i`` in ``range(num1, num2)``.

    Args:
        num1: First shard index (inclusive).
        num2: Shard index to stop at (exclusive).
        name: Base job name shared by all shards.  Interpolated into a shell
            command unescaped, so pass trusted identifiers only.
    """
    for i in range(num1, num2):
        cmd = 'sqoop job --exec %s_%s' % (name, i)
        os.system(cmd)

if __name__ == "__main__":
    # Jobs created for unsharded tables, run first.
    for job in (
        "balance_mac_manger_info",
        "balance_measure_info",
        "user_body_info",
    ):
        job_exec_group0(job)

    # Per-shard jobs: (first index, stop index (exclusive), base job name).
    for first, stop, job in (
        (0, 5, "balance_mac_measure_info"),
        (0, 20, "blood_pressure_measure_info"),
        (0, 10, "user_body_info"),
        (0, 50, "balance_measure_info"),
    ):
        job_exec_group1(first, stop, job)

三、将job_exec.py写进定时脚本:

30 1 * * * nohup /var/lib/hadoop-hdfs/sqoop/job_exec.py &

每天凌晨1点30分自动执行(crontab 直接调用该脚本,需先执行 chmod +x /var/lib/hadoop-hdfs/sqoop/job_exec.py 赋予可执行权限)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: