A MapReduce example: calling Hive from Python with multiple threads
2017-01-03 15:54
1. Single-threaded Hive invocation
/Users/nisj/PycharmProjects/EsDataProc/bi-static/hive-user_appsource_detail.py
```python
# -*- coding=utf-8 -*-
from __future__ import print_function  # print() behaves the same on Python 2 and 3

import warnings
import datetime
import time
import os

warnings.filterwarnings("ignore")

# Not used below; kept for ad-hoc runs such as the commented-out batch_date
today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)
# batch_date = today - datetime.timedelta(days=52)

now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print("Current time:", now_time)


def user_register_appsource(batch_date):
    # Rebuild partition pt_day=batch_date of bi_user_appsource: users registered
    # on batch_date, left-joined to the appsource values in that day's access log.
    # RadixChange(uid,16,10) turns the log's hex uid into decimal so the join on
    # uid lines up. Each trailing backslash joins the lines into one shell command.
    os.system("""/usr/lib/hive-current/bin/hive -e " \
        add jar /home/hadoop/bi/static_bygame/jar/hadoop_udf_radixChange.jar; \
        create temporary function RadixChange as 'com.kascend.hadoop.RadixChange'; \
        alter table bi_user_appsource drop if exists partition (pt_day='%s'); \
        alter table bi_user_appsource add partition (pt_day='%s'); \
        insert overwrite table bi_user_appsource partition(pt_day='%s') \
        select a1.uid,a2.appsource,a1.created_time \
        from (select uid,created_time from bi_user_profile where substr(created_time,1,10)='%s' ) a1 \
        left join (select appkey,appsource,RadixChange(uid,16,10) uid \
        from bi_all_access_log \
        where appsource is not null and pt_day='%s' \
        group by appkey,appsource,RadixChange(uid,16,10)) a2 on a1.uid=a2.uid \
        ; \
        " \
        """ % (batch_date, batch_date, batch_date, batch_date, batch_date))


def dateRange(beginDate, endDate):
    # Return every date from beginDate to endDate (inclusive) as 'YYYY-MM-DD' strings.
    dates = []
    dt = datetime.datetime.strptime(beginDate, "%Y-%m-%d")
    date = beginDate[:]
    while date <= endDate:  # zero-padded ISO strings compare in date order
        dates.append(date)
        dt = dt + datetime.timedelta(1)
        date = dt.strftime("%Y-%m-%d")
    return dates


if __name__ == '__main__':
    date_list = dateRange("2015-05-27", "2016-12-28")
    for batch_date in date_list:
        print(batch_date)
        user_register_appsource(batch_date)  # one Hive job per day, run one after another

    now_time = time.strftime('%Y-%m-%d %X', time.localtime())
    print("Current time:", now_time)
```
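The loop above discards the value `os.system` returns, so a day whose Hive job fails is silently skipped. Below is a minimal sketch of the same sequential loop that records failures. It assumes Python 3 and the Hive CLI path used above. `HIVE_CMD`, `SQL_TEMPLATE`, `build_sql` and `run_days` are hypothetical names, and the template is shortened to the two partition statements; the full `insert overwrite` query would go in the same way. Passing the statement to `subprocess.call` as its own argument also means the shell never re-parses the quotes inside the SQL.

```python
import subprocess
import sys

# Assumed CLI path, taken from the script above.
HIVE_CMD = ("/usr/lib/hive-current/bin/hive", "-e")

# Hypothetical, shortened template; {day} is filled in once per batch date.
SQL_TEMPLATE = (
    "alter table bi_user_appsource drop if exists partition (pt_day='{day}'); "
    "alter table bi_user_appsource add partition (pt_day='{day}');"
)


def build_sql(day, template=SQL_TEMPLATE):
    """Return the HiveQL text for one batch date."""
    return template.format(day=day)


def run_days(days, cmd=HIVE_CMD):
    """Run one job per day, in order, and return the days whose job failed.

    The SQL is passed as a separate argv element, so no shell quoting is needed.
    """
    failed = []
    for day in days:
        status = subprocess.call(list(cmd) + [build_sql(day)])
        if status != 0:
            print("job for", day, "exited with status", status, file=sys.stderr)
            failed.append(day)
    return failed
```

Calling `run_days(dateRange("2015-05-27", "2016-12-28"))` would then return the list of dates that need a re-run.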
2. Multi-threaded Hive invocation
/Users/nisj/PycharmProjects/EsDataProc/bi-static/hive-user_appsource_detail_thread.py
```python
# -*- coding=utf-8 -*-
from __future__ import print_function  # print() behaves the same on Python 2 and 3

import warnings
import datetime
import time
import os

import threadpool  # third-party package (pip install threadpool)

warnings.filterwarnings("ignore")

# Not used below; kept for ad-hoc runs such as the commented-out batch_date
today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)
# batch_date = today - datetime.timedelta(days=52)

now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print("Current time:", now_time)


def user_register_appsource(batch_date):
    # Same pattern as the single-threaded script, but fills bi_user_uksi
    # (adds identifier and appkey, and does not filter out NULL appsource).
    os.system("""/usr/lib/hive-current/bin/hive -e " \
        add jar /home/hadoop/nisj/udf-jar/hadoop_udf_radixChange.jar; \
        create temporary function RadixChange as 'com.kascend.hadoop.RadixChange'; \
        alter table bi_user_uksi drop if exists partition (pt_day='%s'); \
        alter table bi_user_uksi add partition (pt_day='%s'); \
        insert overwrite table bi_user_uksi partition(pt_day='%s') \
        select a1.uid,a2.identifier,a2.appkey,a2.appsource,a1.created_time \
        from (select uid,created_time from bi_user_profile where substr(created_time,1,10)='%s' ) a1 \
        left join (select appkey,appsource,RadixChange(uid,16,10) uid,identifier \
        from bi_all_access_log \
        where pt_day='%s' \
        group by appkey,appsource,RadixChange(uid,16,10),identifier) a2 on a1.uid=a2.uid \
        ; \
        " \
        """ % (batch_date, batch_date, batch_date, batch_date, batch_date))


def dateRange(beginDate, endDate):
    # Return every date from beginDate to endDate (inclusive) as 'YYYY-MM-DD' strings.
    dates = []
    dt = datetime.datetime.strptime(beginDate, "%Y-%m-%d")
    date = beginDate[:]
    while date <= endDate:
        dates.append(date)
        dt = dt + datetime.timedelta(1)
        date = dt.strftime("%Y-%m-%d")
    return dates


if __name__ == '__main__':
    date_list = dateRange("2016-12-01", "2016-12-26")
    # One work request per date; 4 worker threads, each waiting on its own Hive process.
    requests = threadpool.makeRequests(user_register_appsource, date_list)
    main_pool = threadpool.ThreadPool(4)
    for req in requests:
        main_pool.putRequest(req)

    # Poll every 3 seconds until every request has finished (or Ctrl-C is pressed).
    while True:
        try:
            time.sleep(3)
            main_pool.poll()
        except KeyboardInterrupt:
            print("**** Interrupted!")
            break
        except threadpool.NoResultsPending:
            break

    if main_pool.dismissedWorkers:
        print("Joining all dismissed worker threads...")
        main_pool.joinAllDismissedWorkers()

    now_time = time.strftime('%Y-%m-%d %X', time.localtime())
    print("Current time:", now_time)
```
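The `threadpool` package used above is an old third-party module, and its PyPI page marks it obsolete. On Python 3, the standard library's `concurrent.futures.ThreadPoolExecutor` provides the same fan-out. Threads are enough here because each worker spends its time waiting on an external Hive process, so the GIL is not the bottleneck. Below is a minimal sketch built on that swapped-in standard-library pool; it is not the author's code. `run_parallel` is a hypothetical helper, and `max_workers=4` mirrors `ThreadPool(4)`.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_parallel(job, days, max_workers=4):
    """Run job(day) for every day on a pool of worker threads.

    Returns a dict {day: job(day)}. If a job raises, the exception is
    re-raised here once the remaining submitted jobs have finished
    (leaving the with-block waits for them).
    """
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Submit everything up front; at most max_workers jobs run at once.
        futures = {pool.submit(job, day): day for day in days}
        for fut in as_completed(futures):
            results[futures[fut]] = fut.result()
    return results
```

Suppose `user_register_appsource` ended with `return os.system(...)`. Then `run_parallel(user_register_appsource, dateRange("2016-12-01", "2016-12-26"))` would map each date to the status `os.system` returned, where non-zero means failure.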
3. Helper functions used
Function that generates a list of dates
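```python
import datetime


def dateRange(beginDate, endDate):
    # Return every date from beginDate to endDate (inclusive) as 'YYYY-MM-DD' strings.
    dates = []
    dt = datetime.datetime.strptime(beginDate, "%Y-%m-%d")
    date = beginDate[:]
    while date <= endDate:  # zero-padded ISO strings compare in date order
        dates.append(date)
        dt = dt + datetime.timedelta(1)
        date = dt.strftime("%Y-%m-%d")
    return dates
```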
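`dateRange` compares the loop date and `endDate` as strings, which orders correctly only for zero-padded 'YYYY-MM-DD' input. For example, `dateRange("2016-12-01", "2016-12-5")` runs on to 2016-12-31. Below is a minimal sketch, not the author's function, that parses both ends with `strptime` and counts days with `timedelta` instead. `date_range` is a hypothetical name.

```python
import datetime


def date_range(begin, end, fmt="%Y-%m-%d"):
    """Return every date from begin to end (inclusive) formatted with fmt.

    Both ends are parsed, so '2016-12-5' and '2016-12-05' mean the same day;
    an empty list is returned when end is before begin.
    """
    start = datetime.datetime.strptime(begin, fmt).date()
    stop = datetime.datetime.strptime(end, fmt).date()
    n_days = (stop - start).days
    return [(start + datetime.timedelta(days=i)).strftime(fmt)
            for i in range(n_days + 1)]
```

`date_range("2016-12-01", "2016-12-26")` gives the same 26 dates that `dateRange` produces for the multi-threaded run.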
UDF for radix (base) conversion, implemented in Java
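```java
package com.kascend.hadoop;

import org.apache.hadoop.hive.ql.exec.UDF;

/**
 * Hive UDF that converts a number string between bases 2 to 16,
 * e.g. RadixChange(uid, 16, 10) turns a hexadecimal uid into decimal.
 */
public class RadixChange extends UDF {

    private static final String DIGITS = "0123456789ABCDEF";

    /**
     * Convert a non-negative number to a string in the given base.
     * @param num  value to convert
     * @param base target base (2 to 16)
     * @return num written in the target base
     */
    public static String baseString(long num, int base) {
        if (base < 2 || base > 16) {
            throw new IllegalArgumentException("Base out of range, must be 2 <= base <= 16");
        }
        if (num == 0) {
            return "0"; // the digit loop below would otherwise return ""
        }
        StringBuilder str = new StringBuilder();
        while (num != 0) {
            str.append(DIGITS.charAt((int) (num % base))); // least-significant digit first
            num /= base;
        }
        return str.reverse().toString();
    }

    /**
     * Convert between any two bases up to 16.
     * @param num      number as a string, e.g. "1a2f"
     * @param srcBase  base of num
     * @param destBase base of the result
     * @return the converted number, or null if num is NULL or contains an invalid digit
     */
    public String evaluate(String num, int srcBase, int destBase) {
        if (num == null) {
            return null; // NULL column value: return NULL instead of throwing
        }
        if (srcBase < 2 || srcBase > 16) {
            throw new IllegalArgumentException("Base out of range, must be 2 <= base <= 16");
        }
        num = num.toUpperCase();
        if (srcBase == destBase) {
            return num;
        }
        // Parse into a long; the original int arithmetic clamped values above 0x7FFFFFFF.
        long n = 0;
        for (char c : num.toCharArray()) {
            int d = DIGITS.indexOf(c);
            if (d < 0 || d >= srcBase) {
                return null; // not a digit in srcBase
            }
            n = n * srcBase + d;
        }
        return destBase == 10 ? Long.toString(n) : baseString(n, destBase);
    }
}
```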
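To spot-check `RadixChange(uid,16,10)` output outside Hive, the conversion can be reproduced in Python. Below is a minimal sketch that assumes the same 0-9/A-F digit set and the same 2 to 16 base limit as the Java class. `radix_change` is a hypothetical helper, and returning `None` for unusable input is this sketch's own choice. Python's unbounded integers mean it does not overflow on long ids.

```python
DIGITS = "0123456789ABCDEF"


def radix_change(num, src_base, dest_base):
    """Convert the digit string num from src_base to dest_base (2..16).

    Returns None for None input or for a character that is not a digit in
    src_base (a choice made for this sketch).
    """
    if not (2 <= src_base <= 16 and 2 <= dest_base <= 16):
        raise ValueError("base must be between 2 and 16")
    if num is None:
        return None
    num = num.upper()
    if src_base == dest_base:
        return num
    # Parse: accumulate digits most-significant first (Horner's rule).
    n = 0
    for ch in num:
        d = DIGITS.find(ch)
        if d < 0 or d >= src_base:
            return None
        n = n * src_base + d
    if n == 0:
        return "0"
    # Format: peel off remainders, least-significant digit first.
    out = []
    while n:
        n, r = divmod(n, dest_base)
        out.append(DIGITS[r])
    return "".join(reversed(out))
```

For instance, `radix_change("1a", 16, 10)` gives `"26"`, which is what the query expects `RadixChange(uid,16,10)` to produce for a uid of `1a`.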