广告统计数据恢复改进及实现
2016-12-03 10:29
621 查看
最近在做广告效果数据统计优化相关的工作,在优化中发先现有的实时统计方法存在许多的缺点,下面做一下简单的分析:
2).按天统计应用、位置上的广告请求数、请求人数、曝光人数、曝光次数、点击人数、点击次数等
2).按小时统计应用、位置上的广告请求数、请求人数、曝光人数、曝光次数、点击人数、点击次数等
1).数据出错回复代价较大
该实现方案目前有一些不足之处,当统计业务出现异常的时候,这个时候如果消费MQ的线程是正常运行的,并且对数据进行了清洗写入了Hive库中,但在进计算的时候出现问题,此时虽然MQ数据已经入到Hive库中,但是任务异常并尝试多次后任务自动结束,当任务再次启动并消费MQ,此时消费的MQ中已经不包含上次的业务统计时间了 ,因此这段时间内的数据就丢失了计算,如果需要恢复这段时间的数据,就需要重新注册一个GROUP去消费MQ,但是重新注册GROUP消费MQ的时候,该GROUP会消费对应TOPIC所包含的历史数据,所以不管是恢复一分钟的数据、还是一天的数据,相对来说代价都是一样的。
2).数据量大的时候任务延时较为严重
可以从需求看出,按天统计广告效果数据的时候,因为要对广告位上的这一天的数据都要进行一次计算,也就是说落地的数据都要在Spark Streaming任务时间间隔每次一任务都要重新读取计算,虽然Spark擅长处理大批量的数据,但是在现有集群机器数量有限,任务较多,资源紧张的时候,处理大批量的数据就不可避免的出现任务延时等情况。
鉴于现方案在恢复数据的时候需要重新消费历史数据,这里采用离线的方式,不消费MQ直接对Hive库中的数据进行读取计算广告统计需求,按天和按小时统计计算逻辑是一致的,只是想要的数据粒度不一样,按天统计只需要对所有数据去重(去除重复上报的数据)根据应用和位置、内容源等进分组计算,人数根据imei、次数根据对应的事件进行区分计算即可,按小时计算的难点就是数据去重,然后把按把oper_time(毫秒级精度)转换成需要计算的粒度(这里统计粒度是十分钟)。
难点分析:
HQL中如果有group by语句那么select出的字段必须在group by 中出现、或者是聚合函数(sum/avg等),如果某些字段不想进行group,但又想在select中出现,Hive提供了collect_set(column)函数,该函数返回的是根据column去重的列表。因为去除重复上报的数据,现在采取的机制是:除了时间(oper_time)字段不一样以外,其他字段一样则去除重复的数据,所以oper_time不能出现在group字段中,但是计算的时候需要oper_time,这时候collect_set(column)函数,就可以使用上了,解决了去重的问题,剩下的一个难点就是把oper_time(毫秒级精度)转换成需要计算的粒度(这里统计粒度是十分钟)。这里使用floor()这个向下取整函数,毫秒级转换为精度为10分钟级别数据算法如下:
oper_time = floor(oper_time/1000/60/10)*60*10
公式解释:除以1000是转换成秒级别精度除以60是转换为分钟级别的粒度,除以10是转换为10分钟界别的,此时取整后就是10分钟级别的,但是相对于妙级别的数据长度,向下取整后数据缩小了600倍所以*600 ,这时候计算出来的数据就是我们需要的粒度为10分钟的时间数据。解决了上面两个难点后,就可以像我们平时写SQL一样根据条件去计算了。
问题2解决方案:
可以看出,是资源与数据量的冲突,如果集群资源充足,则多分配点内存、CPU则可以实时的计算统计,但如果资源不够则可以采用另一种方案,把实时的变为准实时,用离线的方式,半个小时或者一个小时运行一次,这样对机器的压力就小很多了。
需求描述:
按天统计广告效果数据:
1).按天统计应用、位置、内容源上的广告请求数、请求人数、曝光人数、曝光次数、点击人数、点击次数等2).按天统计应用、位置上的广告请求数、请求人数、曝光人数、曝光次数、点击人数、点击次数等
按小时统计广告效果数据:
1).按小时统计应用、位置、内容源上的广告请求数、请求人数、曝光人数、曝光次数、点击人数、点击次数等2).按小时统计应用、位置上的广告请求数、请求人数、曝光人数、曝光次数、点击人数、点击次数等
现有解决方案
目前广告效果统计采用Spark Streaming整合MQ做实时在线计算,实时消费MQ并按业务需求对数据进行清洗并存储到Hive库中,Spark Streaming 根据任务时间间隔,计算出所消费的MQ在这段时间范围内所包含的业务统计时间(oper_time),并根据统计时间从Hive库中读取出这段时间内的数据,按业务需求统计广告位上的请求数、请求人数、曝光、点击、下载等效果数据数据。1.现方案存在的问题
现有方案存在两个问题:1).数据出错回复代价较大
该实现方案目前有一些不足之处,当统计业务出现异常的时候,这个时候如果消费MQ的线程是正常运行的,并且对数据进行了清洗写入了Hive库中,但在进计算的时候出现问题,此时虽然MQ数据已经入到Hive库中,但是任务异常并尝试多次后任务自动结束,当任务再次启动并消费MQ,此时消费的MQ中已经不包含上次的业务统计时间了 ,因此这段时间内的数据就丢失了计算,如果需要恢复这段时间的数据,就需要重新注册一个GROUP去消费MQ,但是重新注册GROUP消费MQ的时候,该GROUP会消费对应TOPIC所包含的历史数据,所以不管是恢复一分钟的数据、还是一天的数据,相对来说代价都是一样的。
2).数据量大的时候任务延时较为严重
可以从需求看出,按天统计广告效果数据的时候,因为要对广告位上的这一天的数据都要进行一次计算,也就是说落地的数据都要在Spark Streaming任务时间间隔每次一任务都要重新读取计算,虽然Spark擅长处理大批量的数据,但是在现有集群机器数量有限,任务较多,资源紧张的时候,处理大批量的数据就不可避免的出现任务延时等情况。
2、基于现方案的改进及实现
问题1解决方案如下:鉴于现方案在恢复数据的时候需要重新消费历史数据,这里采用离线的方式,不消费MQ直接对Hive库中的数据进行读取计算广告统计需求,按天和按小时统计计算逻辑是一致的,只是想要的数据粒度不一样,按天统计只需要对所有数据去重(去除重复上报的数据)根据应用和位置、内容源等进分组计算,人数根据imei、次数根据对应的事件进行区分计算即可,按小时计算的难点就是数据去重,然后把按把oper_time(毫秒级精度)转换成需要计算的粒度(这里统计粒度是十分钟)。
难点分析:
HQL中如果有group by语句那么select出的字段必须在group by 中出现、或者是聚合函数(sum/avg等),如果某些字段不想进行group,但又想在select中出现,Hive提供了collect_set(column)函数,该函数返回的是根据column去重的列表。因为去除重复上报的数据,现在采取的机制是:除了时间(oper_time)字段不一样以外,其他字段一样则去除重复的数据,所以oper_time不能出现在group字段中,但是计算的时候需要oper_time,这时候collect_set(column)函数,就可以使用上了,解决了去重的问题,剩下的一个难点就是把oper_time(毫秒级精度)转换成需要计算的粒度(这里统计粒度是十分钟)。这里使用floor()这个向下取整函数,毫秒级转换为精度为10分钟级别数据算法如下:
oper_time = floor(oper_time/1000/60/10)*60*10
公式解释:除以1000是转换成秒级别精度除以60是转换为分钟级别的粒度,除以10是转换为10分钟界别的,此时取整后就是10分钟级别的,但是相对于妙级别的数据长度,向下取整后数据缩小了600倍所以*600 ,这时候计算出来的数据就是我们需要的粒度为10分钟的时间数据。解决了上面两个难点后,就可以像我们平时写SQL一样根据条件去计算了。
问题2解决方案:
可以看出,是资源与数据量的冲突,如果集群资源充足,则多分配点内存、CPU则可以实时的计算统计,但如果资源不够则可以采用另一种方案,把实时的变为准实时,用离线的方式,半个小时或者一个小时运行一次,这样对机器的压力就小很多了。
附件
完整脚本:!/usr/bin/env python coding=UTF-8 import os import sys import getopt import datetime import time ##执行命令 def execute_shell(db,hql): #将hql语句进行字符转义 hql = hql.replace("\"","'") #执行查询,并取得执行的状态和输出 cmd = HIVE_PATH+"hive -S -e\" use "+database";"+hql+"\"" status,output = commands.getstatusoutput(cmd) if status !=0: return None else: print "success" output = str(output).split("\n") return output def execute(db,yyyymmdd): sql_comment="from bdl_fdt_ad_ssp_log to adl_fdt_ad_ssp_log_hour_result" sql=""" insert overwrite table adl_fdt_ad_ssp_log_hour_result partition(stat_date=%(yyyymmdd)s) select api_type,mz_appid,s_mz_id, sum(case when oper_type = 'EXPOSURE' then 1 else 0 end) as expo_times, count(distinct case when oper_type = 'EXPOSURE' then s_imei else null end) as expo_peoples, sum(case when oper_type = 'DCLICK' then 1 else 0 end) as click_times, count(distinct case when oper_type = 'DCLICK' then s_imei else null end) as click_peoples, sum(case when oper_type = 'QUERY' then 1 else 0 end) as query_times, count(distinct case when oper_type = 'QUERY' then s_imei else null end) as query_peoples, sum(case when i_r 4000 esp_adcount>0 then 1 else 0 end) as i_resp_adcount, sum(case when oper_type = 'DOWNLOAD' then 1 else 0 end) as download_times, count(distinct case when oper_type = 'DOWNLOAD' then s_imei else null end) as download_peoples, sum(case when oper_type = 'DOWNLOAD_COMPLETED' then 1 else 0 end) as download_completed_times, count(distinct case when oper_type = 'DOWNLOAD_COMPLETED' then s_imei else null end) as download_completed_peoples, sum(case when oper_type = 'INSTALL_COMPLETED' then 1 else 0 end) as install_completed_times, count(distinct case when oper_type = 'INSTALL_COMPLETED' then s_imei else null end) as install_completed_peoples, sum(case when oper_type = 'CLOSE' then 1 else 0 end) as close_times, count(distinct case when oper_type = 'CLOSE' then s_imei else null end) as close_peoples, sum(case when oper_type = 'QUERY' and addition = 'true' then 1 else 0 end) as query_addition_times, sum(case when oper_type = 'EXPOSURE' and addition = 'true' then 1 else 0 end) as expo_addition_times, oper_time as stat_time from (select s_mz_id,s_imei,oper_type,api_type,mz_appid,s_request_id,i_resp_adcount,addition,stat_date,floor(collect_set(oper_time)[0]/600000)*600 oper_time from bdl_fdt_ad_ssp_log where stat_date = %(yyyymmdd)s group by s_mz_id,s_imei,oper_type,api_type,mz_appid,s_request_id,i_resp_adcount,addition,stat_date ) bdl where bdl.stat_date = %(yyyymmdd)s group by bdl.api_type,bdl.mz_appid,bdl.s_mz_id,oper_time union all select 0,mz_appid,s_mz_id, sum(case when oper_type = 'EXPOSURE' then 1 else 0 end) as expo_times, count(distinct case when oper_type = 'EXPOSURE' then s_imei else null end) as expo_peoples, sum(case when oper_type = 'DCLICK' then 1 else 0 end) as click_times, count(distinct case when oper_type = 'DCLICK' then s_imei else null end) as click_peoples, count(distinct case when oper_type = 'QUERY' then s_request_id else null end) as query_times, count(distinct case when oper_type = 'QUERY' then s_imei else null end) as query_peoples, if(count(distinct case when oper_type = 'QUERY' then s_request_id else null end)<sum(case when i_resp_adcount>0 then 1 else 0 end),count(distinct case when oper_type = 'QUERY' then s_request_id else null end),sum(case when i_resp_adcount>0 then 1 else 0 end)) as i_resp_adcount, sum(case when oper_type = 'DOWNLOAD' then 1 else 0 end) as download_times, count(distinct case when oper_type = 'DOWNLOAD' then s_imei else null end) as download_peoples, sum(case when oper_type = 'DOWNLOAD_COMPLETED' then 1 else 0 end) as download_completed_times, count(distinct case when oper_type = 'DOWNLOAD_COMPLETED' then s_imei else null end) as download_completed_peoples, sum(case when oper_type = 'INSTALL_COMPLETED' then 1 else 0 end) as install_completed_times, count(distinct case when oper_type = 'INSTALL_COMPLETED' then s_imei else null end) as install_completed_peoples, sum(case when oper_type = 'CLOSE' then 1 else 0 end) as close_times, count(distinct case when oper_type = 'CLOSE' then s_imei else null end) as close_peoples, sum(case when oper_type = 'QUERY' and addition = 'true' then 1 else 0 end) as query_addition_times, sum(case when oper_type = 'EXPOSURE' and addition = 'true' then 1 else 0 end) as expo_addition_times, oper_time as stat_time from (select s_mz_id,s_imei,oper_type,api_type,mz_appid,s_request_id,i_resp_adcount,addition,stat_date,floor(collect_set(oper_time)[0]/600000)*600 oper_time from bdl_fdt_ad_ssp_log where stat_date = %(yyyymmdd)s group by s_mz_id,s_imei,oper_type,api_type,mz_appid,s_request_id,i_resp_adcount,addition,stat_date ) bdl where bdl.stat_date = %(yyyymmdd)s group by bdl.mz_appid,bdl.s_mz_id,oper_time; """%{'yyyymmdd':yyyymmdd} execute_shell(db,hql) if __name__ == "__main__": #运行计算sql脚本 execute(db,20161203) ###数据原始表 CREATE TABLE `bdl_fdt_ad_ssp_log`( `s_mz_id` string, `s_imei` string, `oper_type` string, `api_type` string, `mz_appid` string, `oper_time` bigint, `s_request_id` string, `i_resp_adcount` bigint, `i_resp_code` bigint, `addition` string) PARTITIONED BY ( `stat_date` bigint) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION ###统计结果表 CREATE TABLE `adl_fdt_ad_ssp_log_hour_result`( `api_type` string, `mz_appid` string, `s_mz_id` string, `expo_times` bigint, `expo_peoples` bigint, `click_times` bigint, `click_peoples` bigint, `query_times` bigint, `query_peoples` bigint, `i_resp_adcount` bigint, `download_times` bigint, `download_peoples` bigint, `download_completed_times` bigint, `download_completed_peoples` bigint, `install_completed_times` bigint, `install_completed_peoples` bigint, `close_times` bigint, `close_peoples` bigint, `query_addition_times` bigint, `expo_addition_times` bigint, `stat_time` bigint) PARTITIONED BY ( `stat_date` bigint) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.RCFileInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.RCFileOutputFormat'
相关文章推荐
- 通过logmnr工具实现数据恢复
- 对CTreeCtrl数据加载方式的改进,实现代码的重用
- 应用jQuery实现表格数据的动态添加与统计
- MYSQL使用.frm恢复数据表结构的实现方法
- 应用jQuery实现表格数据的动态添加与统计
- DataTable用中使用Compute 实现简单的DataTable数据的统计.....
- oracle 实现按周,月,季度,年查询统计数据
- java代码实现mysql数据备份与恢复
- 采用一个自创的"验证框架"实现对数据实体的验证[改进篇]
- WAP联盟统计数据定时任务的实现
- [转] Linux AS4系统损坏后数据的恢复实现方法
- DataTable用中使用Compute 实现简单的DataTable数据的统计.....
- 用Jxl实现将统计数据导出到excel表中
- 用 union 将现有数据,按条件拆分,并同时做合计统计,最后实现数据条件分组统计。
- EDB PPAS/PostgreSQL异地容灾,并实现“0数据丢失”的灾难恢复
- 利用 GROUP BY 和 MAX 实现对表数据分组统计后的过滤查询(Oracle920)
- Delphi6使用TAdoDataSet来实现数据的备份与恢复
- 第二章 数据备份和恢复技术 --基于mkCDrec的核心网网络运维系统的备份和恢复的研究与实现
- 比较两种方法实现数据统计效果