用Python在Mysql与Hive间数据计算一例
2017-08-15 19:13
495 查看
1、自带粉丝的计算
功能说明:
从mysql中取房间号roomId,然后在Hive中计算其首播日期,将首播日期返插回到Mysql中;然后取Mysql中的roomId和首播日期,在Hive中计算主播的自带粉丝数据;最后,将自带粉丝数更新到Mysql中。
/Users/nisj/PycharmProjects/BiDataProc/Demand/Cc0810/ccQuery.py
2、<1>之前的一个版本
与1功能相同,但代码细节上因为功能逻辑的变更,稍有区别。<1>可能更规范合理一些,<2>可忽略。
/Users/nisj/PycharmProjects/BiDataProc/Demand/Cc0810/ccQuery0813.py
功能说明:
从mysql中取房间号roomId,然后在Hive中计算其首播日期,将首播日期返插回到Mysql中;然后取Mysql中的roomId和首播日期,在Hive中计算主播的自带粉丝数据;最后,将自带粉丝数更新到Mysql中。
/Users/nisj/PycharmProjects/BiDataProc/Demand/Cc0810/ccQuery.py
# -*- coding=utf-8 -*- import datetime import os import warnings import sys import re reload(sys) sys.setdefaultencoding('utf8') warnings.filterwarnings("ignore") yesterday = (datetime.date.today() - datetime.timedelta(days=1)).strftime('%Y-%m-%d') def get10DayRangeAndYesterday(first_rec_date): day10Start=(datetime.datetime.strptime(first_rec_date, '%Y-%m-%d') - datetime.timedelta(days=2)).strftime('%Y-%m-%d') day10End=(datetime.datetime.strptime(first_rec_date, '%Y-%m-%d') + datetime.timedelta(days=7)).strftime('%Y-%m-%d') yesterday = (datetime.date.today() - datetime.timedelta(days=1)).strftime('%Y-%m-%d') return day10Start, day10End, yesterday def getRoomId(): roomIds = os.popen("""source /etc/profile; \ /usr/bin/mysql -h199.199.199.199 -P6605 -umysqlUser -pmysqlPass -N -e "select room_id \ from jellyfish_hadoop_stat.invite_anchor \ where fans_count=0; \ " """).readlines(); roomId_list = [] for roomIdList in roomIds: roomId = re.split('\t', roomIdList.replace('\n', '')) roomId_list.append(roomId) return roomId_list def getFirstRecDatesAndUpdate2Mysql(): roomId_con='' for roomId in getRoomId(): roomId_con = roomId_con + ',' + roomId[0] roomId_con = roomId_con[1:] first_rec_dates=os.popen("""source /etc/profile; \ /usr/lib/hive-current/bin/hive -e "select room_id,switch_date from \ (select room_id,switch_date,cast(sum(hour(time_minus)*60+minute(time_minus)+second(time_minus)/60) as bigint) live_duration,row_number()over(partition by room_id order by switch_date) rk \ from ( \ select room_id,substr(switch_time,1,10) switch_date,cast(updated_time as timestamp)-cast(switch_time as timestamp) time_minus \ from xxx_live_history_status \ where room_id in({roomId_con})) x \ group by room_id,switch_date \ having cast(sum(hour(time_minus)*60+minute(time_minus)+second(time_minus)/60) as bigint)>=120) xx \ where rk=1; \ " """.format(roomId_con=roomId_con)).readlines(); first_rec_date_list = [] for first_rec_dateList in first_rec_dates: first_rec_date = re.split('\t', first_rec_dateList.replace('\n', '')) first_rec_date_list.append(first_rec_date) for first_rec_date in first_rec_date_list: roomId=first_rec_date[0] first_rec_date=first_rec_date[1] os.system("""source /etc/profile; \ /usr/bin/mysql -h199.199.199.199 -P6605 -umysqlUser -pmysqlPass -e "update jellyfish_hadoop_stat.invite_anchor \ set first_rec_date='{first_rec_date}' \ where room_id={roomId}; \ " """.format(roomId=roomId, first_rec_date=first_rec_date)); def getRoomIdAndFirstRecDates(): roomIds = os.popen("""source /etc/profile; \ /usr/bin/mysql -h199.199.199.199 -P6605 -umysqlUser -pmysqlPass -N -e "select room_id,substr(first_rec_date,1,10) first_rec_date \ from jellyfish_hadoop_stat.invite_anchor \ where fans_count=0 and first_rec_date<>'2099-12-30 23:59:59'; \ " """).readlines(); roomId_list = [] for roomIdList in roomIds: roomId = re.split('\t', roomIdList.replace('\n', '')) roomId_list.append(roomId) return roomId_list def getFansCntUpdate2Mysql(): # 自带粉丝数据的计算 for roomId, first_rec_date in getRoomIdAndFirstRecDates(): if first_rec_date <= (datetime.date.today() - datetime.timedelta(days=8)).strftime('%Y-%m-%d'): day10Start = get10DayRangeAndYesterday(first_rec_date=first_rec_date)[0] day10End = get10DayRangeAndYesterday(first_rec_date=first_rec_date)[1] yesterday = get10DayRangeAndYesterday(first_rec_date=first_rec_date)[2] fans_byself_cnts=os.popen("""source /etc/profile; \ /usr/lib/hive-current/bin/hive -e "with tab_user_frist_subscriber as (select room_id,uid view_uid,state,created_time \ from (select room_id,uid,state,created_time,row_number()over(partition by uid order by created_time) rk from oss_room_subscriber_roomid where pt_day='{yesterday}') x \ where rk=1 and room_id={roomId}), \ tab_newuser10days as(select uid,nickname,created_time,last_login_time \ from oss_chushou_user_profile \ where pt_day='{yesterday}' and substr(created_time,1,10) between '{day10Start}' and 'day10End') \ select a1.room_id,'{first_rec_date}' first_rec_date,count(distinct a2.uid) fans_byself_cnt \ from tab_user_frist_subscriber a1 \ inner join tab_newuser10days a2 on a1.view_uid=a2.uid \ group by a1.room_id; \ " """.format(day10Start=day10Start, day10End=day10End, yesterday=yesterday, roomId=roomId, first_rec_date=first_rec_date)).readlines(); fans_byself_cnt_list = [] for fans_byself_cntList in fans_byself_cnts: fans_byself_cnt = re.split('\t', fans_byself_cntList.replace('\n', '')) fans_byself_cnt_list.append(fans_byself_cnt) for fans_byself_cnt in fans_byself_cnt_list: roomId=fans_byself_cnt[0] fans_byself_cnt=fans_byself_cnt[2] os.system("""source /etc/profile; \ /usr/bin/mysql -h199.199.199.199 -P6605 -umysqlUser -pmysqlPass -e "update jellyfish_hadoop_stat.invite_anchor \ set fans_count={fans_byself_cnt} \ where room_id={roomId}; \ " """.format(roomId=roomId, fans_byself_cnt=fans_byself_cnt)); # Batch Test getFirstRecDatesAndUpdate2Mysql() getFansCntUpdate2Mysql()
2、<1>之前的一个版本
与1功能相同,但代码细节上因为功能逻辑的变更,稍有区别。<1>可能更规范合理一些,<2>可忽略。
/Users/nisj/PycharmProjects/BiDataProc/Demand/Cc0810/ccQuery0813.py
# -*- coding=utf-8 -*- import datetime import os import warnings import sys import re reload(sys) sys.setdefaultencoding('utf8') warnings.filterwarnings("ignore") yesterday = (datetime.date.today() - datetime.timedelta(days=1)).strftime('%Y-%m-%d') def get10DayRangeAndYesterday(first_rec_date): day10Start=(datetime.datetime.strptime(first_rec_date, '%Y-%m-%d') - datetime.timedelta(days=2)).strftime('%Y-%m-%d') day10End=(datetime.datetime.strptime(first_rec_date, '%Y-%m-%d') + datetime.timedelta(days=7)).strftime('%Y-%m-%d') yesterday = (datetime.date.today() - datetime.timedelta(days=1)).strftime('%Y-%m-%d') return day10Start, day10End, yesterday def getRoomId(): roomIds = os.popen("""source /etc/profile; \ /usr/bin/mysql -h199.199.199.199 -P6605 -umysqlUser -pmysqlPass -N -e "select room_id \ from jellyfish_hadoop_stat.invite_anchor \ where fans_count=0; \ " """).readlines(); roomId_list = [] for roomIdList in roomIds: roomId = re.split('\t', roomIdList.replace('\n', '')) roomId_list.append(roomId) return roomId_list def dataQueryUpdate2Mysql(): # 自带粉丝数据的计算 for roomId in getRoomId(): first_rec_dates=os.popen("""source /etc/profile; \ /usr/lib/hive-current/bin/hive -e "select room_id roomid,switch_date from ( \ select room_id,substr(switch_time,1,10) switch_date,cast(updated_time as timestamp)-cast(switch_time as timestamp) time_interval,row_number()over(partition by room_id order by switch_time) rk \ from xxx_live_history_status \ where room_id={roomId} and hour(cast(updated_time as timestamp)-cast(switch_time as timestamp))>=2 ) x \ where rk=1; \ " """.format(roomId=roomId[0])).readlines(); first_rec_date_list = [] for first_rec_dateList in first_rec_dates: first_rec_date = re.split('\t', first_rec_dateList.replace('\n', '')) first_rec_date_list.append(first_rec_date) for first_rec_date in first_rec_date_list: if first_rec_date[1] <= (datetime.date.today() - datetime.timedelta(days=8)).strftime('%Y-%m-%d'): day10Start = get10DayRangeAndYesterday(first_rec_date=first_rec_date[1])[0] day10End = get10DayRangeAndYesterday(first_rec_date=first_rec_date[1])[1] yesterday = get10DayRangeAndYesterday(first_rec_date=first_rec_date[1])[2] fans_byself_cnts=os.popen("""source /etc/profile; \ /usr/lib/hive-current/bin/hive -e "with tab_user_frist_subscriber as (select room_id,uid view_uid,state,created_time \ from (select room_id,uid,state,created_time,row_number()over(partition by uid order by created_time) rk from oss_room_subscriber_roomid where pt_day='{yesterday}') x \ where rk=1 and room_id={roomId}), \ tab_newuser10days as(select uid,nickname,created_time,last_login_time \ from oss_chushou_user_profile \ where pt_day='{yesterday}' and substr(created_time,1,10) between '{day10Start}' and 'day10End') \ select a1.room_id,'{first_rec_date}' first_rec_date,count(distinct a2.uid) fans_byself_cnt \ from tab_user_frist_subscriber a1 \ inner join tab_newuser10days a2 on a1.view_uid=a2.uid \ group by a1.room_id; \ " """.format(day10Start=day10Start, day10End=day10End, yesterday=yesterday, roomId=roomId[0], first_rec_date=first_rec_date[1])).readlines(); fans_byself_cnt_list = [] for fans_byself_cntList in fans_byself_cnts: fans_byself_cnt = re.split('\t', fans_byself_cntList.replace('\n', '')) fans_byself_cnt_list.append(fans_byself_cnt) for fans_byself_cnt in fans_byself_cnt_list: roomId=fans_byself_cnt[0] first_rec_date=fans_byself_cnt[1] fans_byself_cnt=fans_byself_cnt[2] os.system("""source /etc/profile; \ /usr/bin/mysql -h199.199.199.199 -P6605 -umysqlUser -pmysqlPass -e "update jellyfish_hadoop_stat.invite_anchor \ set fans_count={fans_byself_cnt},first_rec_date='{first_rec_date}' \ where room_id={roomId}; \ " """.format(roomId=roomId, fans_byself_cnt=fans_byself_cnt, first_rec_date=first_rec_date)); # Batch Test dataQueryUpdate2Mysql()
相关文章推荐
- 用Python在Mysql与Hive间数据计算一例(续)
- 用Python调度数据在Mysql及Hive间进行计算2例
- 用Python调度数据在Mysql及Hive间进行计算2例(续)-idfa与日志比对进行留存充值数据计算
- Hadoop数据经Hive汇总计算之后导出到Mysql
- Python将Mysql分表数据按小时增量装载到Hive示例
- python脚本用sqoop把mysql数据导入hive数据仓库中
- python脚本用sqoop把mysql数据导入hive数据仓库中
- Python自动化拉取Mysql数据并装载到Hive(V3.0)
- python脚本 用sqoop把mysql数据导入hive
- Python将Hive汇总数据装载到Mysql
- python脚本用sqoop把mysql数据导入hive数据仓库中
- Python自动化拉取Mysql数据并装载到Hive(V2.0)
- Python自动化拉取Mysql数据并装载到Hive
- 使用Python脚本从Hive中取数据计算后加载到Mysql示例
- 使用Python3 xlrd pymysql 实现读取Excel数据读取以及mysql存储
- 配置MySQL5.6.15存储Hive-0.11.0元数据
- Sqoop-1.4.6安装配置及Mysql->HDFS->Hive数据导入(基于Hadoop2.7.3)
- python爬取微博图片数据存到Mysql中遇到的各种坑\python Mysql存储图片\python爬取微博图片
- python 向MySQL里插入中文数据
- 在Python中对MySQL中的数据进行可视化