您的位置:首页 > 编程语言 > Python开发

Python多线程调用Hive接口的MapReduce示例

2017-01-03 15:54 519 查看
1、单线程Hive调用
/Users/nisj/PycharmProjects/EsDataProc/bi-static/hive-user_appsource_detail.py
# -*- coding=utf-8 -*-
import warnings
import datetime
import time
import os

warnings.filterwarnings("ignore")

today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)

# batch_date = today - datetime.timedelta(days=52)

now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print "当前时间是:",now_time

def user_register_appsource(batch_date):
    """Rebuild the bi_user_appsource Hive partition for one day.

    Drops and re-creates the partition pt_day=batch_date, then fills it by
    joining the users created that day (bi_user_profile) against the same
    day's access log (bi_all_access_log), decoding the hex uid with the
    RadixChange UDF.

    batch_date -- partition day as a 'YYYY-MM-DD' string.
    Returns the exit status of the hive command (0 on success) so callers
    can detect failed days; the original silently discarded it.
    """
    # NOTE(review): batch_date is spliced into a shell-quoted HiveQL string;
    # it must come from a trusted source (dateRange output), never user input.
    # Named substitution replaces five identical positional %s arguments;
    # the rendered command is byte-identical to the original.
    hive_cmd = """/usr/lib/hive-current/bin/hive -e " \
add jar /home/hadoop/bi/static_bygame/jar/hadoop_udf_radixChange.jar; \
create temporary function RadixChange as 'com.kascend.hadoop.RadixChange'; \
alter table bi_user_appsource drop if exists partition (pt_day='%(d)s'); \
alter table bi_user_appsource add partition (pt_day='%(d)s'); \
insert overwrite table bi_user_appsource partition(pt_day='%(d)s') \
select a1.uid,a2.appsource,a1.created_time \
from (select uid,created_time from bi_user_profile where substr(created_time,1,10)='%(d)s' ) a1 \
left join (select appkey,appsource,RadixChange(uid,16,10) uid \
from bi_all_access_log \
where appsource is not null and pt_day='%(d)s' \
group by appkey,appsource,RadixChange(uid,16,10)) a2 on a1.uid=a2.uid \
; \
" \
""" % {'d': batch_date}
    return os.system(hive_cmd)

def dateRange(beginDate, endDate):
    """Return every day from beginDate to endDate inclusive as 'YYYY-MM-DD' strings."""
    days = []
    cursor = datetime.datetime.strptime(beginDate, "%Y-%m-%d")
    label = beginDate[:]
    # Compare the rendered date strings lexicographically; valid ISO dates
    # sort the same as their calendar order.
    while label <= endDate:
        days.append(label)
        cursor += datetime.timedelta(1)
        label = cursor.strftime("%Y-%m-%d")
    return days

if __name__ == '__main__':
date_list = dateRange("2015-05-27", "2016-12-28")
for batch_date in date_list:
print batch_date
user_register_appsource(batch_date)

now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print "当前时间是:",now_time

2、多线程Hive调用
/Users/nisj/PycharmProjects/EsDataProc/bi-static/hive-user_appsource_detail_thread.py
# -*- coding=utf-8 -*-
import warnings
import datetime
import time
import os
import threadpool

warnings.filterwarnings("ignore")

today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)

# batch_date = today - datetime.timedelta(days=52)

now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print "当前时间是:",now_time

def user_register_appsource(batch_date):
    """Rebuild the bi_user_uksi Hive partition for one day.

    Drops and re-creates the partition pt_day=batch_date, then fills it by
    joining the users created that day (bi_user_profile) against the same
    day's access log (bi_all_access_log), keeping identifier/appkey/appsource
    and decoding the hex uid with the RadixChange UDF.

    batch_date -- partition day as a 'YYYY-MM-DD' string.
    Returns the exit status of the hive command (0 on success) so callers
    can detect failed days; the original silently discarded it.
    """
    # NOTE(review): batch_date is spliced into a shell-quoted HiveQL string;
    # it must come from a trusted source (dateRange output), never user input.
    # Named substitution replaces five identical positional %s arguments;
    # the rendered command is byte-identical to the original.
    hive_cmd = """/usr/lib/hive-current/bin/hive -e " \
add jar /home/hadoop/nisj/udf-jar/hadoop_udf_radixChange.jar; \
create temporary function RadixChange as 'com.kascend.hadoop.RadixChange'; \
alter table bi_user_uksi drop if exists partition (pt_day='%(d)s'); \
alter table bi_user_uksi add partition (pt_day='%(d)s'); \
insert overwrite table bi_user_uksi partition(pt_day='%(d)s') \
select a1.uid,a2.identifier,a2.appkey,a2.appsource,a1.created_time \
from (select uid,created_time from bi_user_profile where substr(created_time,1,10)='%(d)s' ) a1 \
left join (select appkey,appsource,RadixChange(uid,16,10) uid,identifier \
from bi_all_access_log \
where pt_day='%(d)s' \
group by appkey,appsource,RadixChange(uid,16,10),identifier) a2 on a1.uid=a2.uid \
; \
" \
""" % {'d': batch_date}
    return os.system(hive_cmd)

def dateRange(beginDate, endDate):
    """Build the inclusive list of 'YYYY-MM-DD' day strings between two dates."""
    collected = []
    stamp = datetime.datetime.strptime(beginDate, "%Y-%m-%d")
    text = beginDate[:]
    while text <= endDate:
        collected.append(text)
        stamp = stamp + datetime.timedelta(days=1)
        text = stamp.strftime("%Y-%m-%d")
    return collected

date_list = dateRange("2016-12-01", "2016-12-26")
requests = threadpool.makeRequests(user_register_appsource, date_list)
main_pool = threadpool.ThreadPool(4)
[main_pool.putRequest(req) for req in requests]
if __name__ == '__main__':
while True:
try:
time.sleep(3)
main_pool.poll()
except KeyboardInterrupt:
print("**** Interrupted!")
break
except threadpool.NoResultsPending:
break

if main_pool.dismissedWorkers:
print("Joining all dismissed worker threads...")
main_pool.joinAllDismissedWorkers()

now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print "当前时间是:",now_time

3、用到的相关函数
生成日期列表的函数
def dateRange(beginDate, endDate):
    """List all 'YYYY-MM-DD' day strings from beginDate through endDate, inclusive."""
    out = []
    walker = datetime.datetime.strptime(beginDate, "%Y-%m-%d")
    current = beginDate[:]
    while current <= endDate:
        out.append(current)
        walker = walker + datetime.timedelta(1)
        current = walker.strftime("%Y-%m-%d")
    return out

进行进制转换的UDF(java实现)
package com.kascend.hadoop;
import java.util.Stack;
import org.apache.hadoop.hive.ql.exec.UDF;

public class RadixChange extends UDF {
    // Digit alphabet shared by both conversion directions; supports base <= 16.
    private static final String DIGITS = "0123456789ABCDEF";

    /**
     * Render a non-negative decimal value in an arbitrary base (<= 16).
     * Kept for backward compatibility; delegates to the long overload.
     *
     * @param num  non-negative value to convert
     * @param base target base, at most 16
     * @return textual representation of num in the target base
     */
    public static String baseString(int num, int base) {
        return baseString((long) num, base);
    }

    /**
     * Long overload of {@link #baseString(int, int)}. Uses long so hex uids
     * larger than Integer.MAX_VALUE (e.g. 16-digit hex) do not overflow.
     *
     * @param num  non-negative value to convert
     * @param base target base, at most 16
     * @return textual representation of num in the target base
     */
    public static String baseString(long num, int base) {
        if (base > 16) {
            throw new RuntimeException("进制数超出范围,base<=16");
        }
        // BUG FIX: the original returned "" for num == 0 because the digit
        // loop never executed; zero must render as "0".
        if (num == 0) {
            return "0";
        }
        StringBuilder str = new StringBuilder();
        Stack<Character> s = new Stack<Character>();
        while (num != 0) {
            s.push(DIGITS.charAt((int) (num % base)));
            num /= base;
        }
        while (!s.isEmpty()) {
            str.append(s.pop());
        }
        return str.toString();
    }

    /**
     * Convert num between two bases, both at most 16. Case-insensitive input.
     *
     * @param num      the number to convert, as a string
     * @param srcBase  base the input is written in
     * @param destBase base to render the output in
     * @return num rendered in destBase
     */
    public static String evaluate(String num, int srcBase, int destBase) {
        num = num.toUpperCase();
        if (srcBase == destBase) {
            return num;
        }
        char[] chars = num.toCharArray();
        int len = chars.length;
        if (destBase != 10) {
            // Normalise to decimal first, then render in the target base below.
            num = evaluate(num, srcBase, 10);
        } else {
            // BUG FIX: the original accumulated into an int via Math.pow
            // (double), overflowing and losing precision for long hex uids;
            // positional accumulation into a long is exact up to 2^63-1.
            long n = 0;
            for (int i = 0; i < len; i++) {
                n = n * srcBase + DIGITS.indexOf(chars[i]);
            }
            return n + "";
        }
        return baseString(Long.parseLong(num), destBase);
    }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: