
Improving and Implementing Ad Statistics Data Recovery

2016-12-03 10:29
I have recently been working on optimizing ad performance statistics. During this work I found that the existing real-time statistics approach has quite a few drawbacks. Below is a brief analysis:

Requirements:

Daily ad performance statistics:

1) Per day, for each app, placement, and content source: ad request count, unique requesting users, unique exposed users, exposure count, unique clicking users, click count, etc.

2) Per day, for each app and placement: ad request count, unique requesting users, unique exposed users, exposure count, unique clicking users, click count, etc.

Hourly ad performance statistics:

1) Per hour, for each app, placement, and content source: ad request count, unique requesting users, unique exposed users, exposure count, unique clicking users, click count, etc.

2) Per hour, for each app and placement: ad request count, unique requesting users, unique exposed users, exposure count, unique clicking users, click count, etc.

Existing Solution

Ad performance statistics are currently computed in real time with Spark Streaming integrated with MQ. The MQ is consumed in real time, and the data is cleaned according to business requirements and stored into a Hive database. Based on the job interval, Spark Streaming works out the business statistics time (oper_time) covered by the MQ messages consumed in that interval, reads the data for that time range from Hive, and aggregates per-placement performance data such as request count, unique requesting users, exposures, clicks, and downloads.

1. Problems with the Current Solution

The current solution has two problems:

1) Recovering from data errors is costly

This implementation has a weakness when the statistics job fails. Suppose the MQ-consuming thread is running normally and has already cleaned the data and written it into Hive, but the computation step then fails. Although the MQ data has landed in Hive, the job shuts itself down after several failed retries. When the job is restarted and resumes consuming MQ, the messages no longer cover the previous business statistics window, so the data in that window never gets computed. To recover it, a new GROUP has to be registered to consume the MQ, but a freshly registered GROUP consumes all of the historical data in the corresponding TOPIC, so recovering one minute of data costs roughly the same as recovering a whole day.

2) Jobs lag noticeably when the data volume is large

As the requirements show, daily statistics must process a placement's entire day of data in every pass; in other words, all of the landed data is re-read and recomputed on every Spark Streaming interval. Spark is good at processing data in large batches, but with a limited number of cluster machines, many jobs, and tight resources, processing that much data inevitably leads to job delays.

2. Improvements and Implementation Based on the Current Solution

The solution to problem 1 is as follows:

Since the current solution has to re-consume historical data to recover, we switch to an offline approach: rather than consuming MQ, we read the data already in Hive and compute the ad statistics directly from it. The computation logic is identical for daily and hourly statistics; only the desired granularity differs. For daily statistics we simply deduplicate all of the data (dropping redundantly reported records), group by app, placement, content source, and so on, count unique users by imei, and count occurrences by event type. For hourly statistics the difficulties are deduplicating the data and then converting oper_time (millisecond precision) to the granularity we compute at (ten minutes here). A minimal sketch of the daily grouping follows; the complete script is in the appendix.
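The sketch below uses the bdl_fdt_ad_ssp_log schema from the appendix; only two of the metrics are shown, and deduplication is omitted for brevity:

select api_type, mz_appid, s_mz_id,
sum(case when oper_type = 'EXPOSURE' then 1 else 0 end) as expo_times,
count(distinct case when oper_type = 'EXPOSURE' then s_imei else null end) as expo_peoples
from bdl_fdt_ad_ssp_log
where stat_date = 20161203
group by api_type, mz_appid, s_mz_id;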

Analysis of the difficulties:

In HQL, when a query contains a GROUP BY clause, every selected column must either appear in the GROUP BY or be wrapped in an aggregate function (sum/avg, etc.). For a column you do not want to group by but still want to select, Hive provides the collect_set(column) function, which returns the deduplicated list of that column's values within each group. The deduplication rule for redundantly reported records is: if every field except the time field (oper_time) is identical, the records are duplicates and should be collapsed into one. So oper_time must stay out of the GROUP BY, yet the computation still needs it, and that is exactly where collect_set(column) comes in; it solves the deduplication problem. The one remaining difficulty is converting oper_time (millisecond precision) down to the computation granularity (ten minutes here).
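For example, the deduplication subquery used in the full script below follows this pattern: group by every field except oper_time, and keep one representative timestamp per group via collect_set:

select s_mz_id, s_imei, oper_type, api_type, mz_appid, s_request_id, i_resp_adcount, addition, stat_date,
collect_set(oper_time)[0] as oper_time
from bdl_fdt_ad_ssp_log
where stat_date = 20161203
group by s_mz_id, s_imei, oper_type, api_type, mz_appid, s_request_id, i_resp_adcount, addition, stat_date;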

oper_time = floor(oper_time/1000/60/10)*60*10

Explanation of the formula: dividing by 1000 converts millisecond precision to seconds, dividing by 60 converts seconds to minutes, and dividing by 10 converts minutes to ten-minute units; after flooring, the value is at ten-minute granularity. Compared with the second-level value, however, flooring has shrunk the number by a factor of 600, so we multiply by 600, and the result is a timestamp at the ten-minute granularity we need. With these two difficulties solved, we can compute against the conditions just as we would with ordinary SQL.
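A quick sanity check with an illustrative millisecond timestamp:

-- oper_time = 1480732190123 ms
-- floor(1480732190123 / 600000) = 2467886
-- 2467886 * 600 = 1480731600, i.e. the timestamp in seconds, truncated to a ten-minute boundary
select floor(1480732190123 / 600000) * 600;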

Solution to problem 2:

As we can see, this is a conflict between resources and data volume. If the cluster has ample resources, allocating more memory and CPU makes real-time computation feasible; if not, an alternative is to turn real-time into near-real-time and compute offline, running once every half hour or hour, which puts far less pressure on the machines.
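For example, a crontab entry can schedule the script below to run every half hour (the script and log paths here are hypothetical):

0,30 * * * * python /path/to/ad_stat_recover.py >> /path/to/ad_stat_recover.log 2>&1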

Appendix

Complete script:

#!/usr/bin/env python
# coding=UTF-8

import os
import sys
import getopt

import datetime
import time
import commands

# Directory containing the hive binary (placeholder; empty assumes hive is on PATH)
HIVE_PATH = ""

## Execute an HQL statement through the hive CLI
def execute_shell(db, hql):
    # Escape double quotes inside the HQL statement
    hql = hql.replace("\"", "'")
    # Run the query and capture the exit status and output
    cmd = HIVE_PATH + "hive -S -e \"use " + db + ";" + hql + "\""
    status, output = commands.getstatusoutput(cmd)
    if status != 0:
        return None
    else:
        print "success"
        output = str(output).split("\n")
        return output

def execute(db, yyyymmdd):

    sql_comment = "from bdl_fdt_ad_ssp_log to adl_fdt_ad_ssp_log_hour_result"
    sql = """
insert overwrite table adl_fdt_ad_ssp_log_hour_result partition(stat_date=%(yyyymmdd)s)
select
api_type,mz_appid,s_mz_id,
sum(case when oper_type = 'EXPOSURE'  then 1 else 0 end) as expo_times,
count(distinct case when oper_type = 'EXPOSURE' then s_imei else null end) as expo_peoples,
sum(case when oper_type = 'DCLICK'  then 1 else 0 end) as click_times,
count(distinct case when oper_type = 'DCLICK' then s_imei else null end) as click_peoples,
sum(case when oper_type = 'QUERY'  then 1 else 0 end) as query_times,
count(distinct case when oper_type = 'QUERY' then s_imei else null end) as query_peoples,
sum(case when i_resp_adcount>0  then 1 else 0 end) as i_resp_adcount,
sum(case when oper_type = 'DOWNLOAD'  then 1 else 0 end) as download_times,
count(distinct case when oper_type = 'DOWNLOAD' then s_imei else null end) as download_peoples,
sum(case when oper_type = 'DOWNLOAD_COMPLETED'  then 1 else 0 end) as download_completed_times,
count(distinct case when oper_type = 'DOWNLOAD_COMPLETED' then s_imei else null end) as download_completed_peoples,
sum(case when oper_type = 'INSTALL_COMPLETED'  then 1 else 0 end) as install_completed_times,
count(distinct case when oper_type = 'INSTALL_COMPLETED' then s_imei else null end) as install_completed_peoples,
sum(case when oper_type = 'CLOSE'  then 1 else 0 end) as close_times,
count(distinct case when oper_type = 'CLOSE' then s_imei else null end) as close_peoples,
sum(case when oper_type = 'QUERY' and addition = 'true' then 1 else 0 end) as query_addition_times,
sum(case when oper_type = 'EXPOSURE' and addition = 'true' then 1 else 0 end) as expo_addition_times,
oper_time as stat_time
from (select s_mz_id,s_imei,oper_type,api_type,mz_appid,s_request_id,i_resp_adcount,addition,stat_date,floor(collect_set(oper_time)[0]/600000)*600 oper_time
from bdl_fdt_ad_ssp_log  where stat_date = %(yyyymmdd)s group by s_mz_id,s_imei,oper_type,api_type,mz_appid,s_request_id,i_resp_adcount,addition,stat_date ) bdl
where bdl.stat_date = %(yyyymmdd)s group by bdl.api_type,bdl.mz_appid,bdl.s_mz_id,oper_time
union all
select
0,mz_appid,s_mz_id,
sum(case when oper_type = 'EXPOSURE'  then 1 else 0 end) as expo_times,
count(distinct case when oper_type = 'EXPOSURE' then s_imei else null end) as expo_peoples,
sum(case when oper_type = 'DCLICK'  then 1 else 0 end) as click_times,
count(distinct case when oper_type = 'DCLICK' then s_imei else null end) as click_peoples,
count(distinct case when oper_type = 'QUERY'  then s_request_id else null end) as query_times,
count(distinct case when oper_type = 'QUERY' then s_imei else null end) as query_peoples,
if(count(distinct case when oper_type = 'QUERY'  then s_request_id else null end)<sum(case when i_resp_adcount>0  then 1 else 0 end),count(distinct case when oper_type = 'QUERY'  then s_request_id else null end),sum(case when i_resp_adcount>0  then 1 else 0 end)) as i_resp_adcount,
sum(case when oper_type = 'DOWNLOAD'  then 1 else 0 end) as download_times,
count(distinct case when oper_type = 'DOWNLOAD' then s_imei else null end) as download_peoples,
sum(case when oper_type = 'DOWNLOAD_COMPLETED'  then 1 else 0 end) as download_completed_times,
count(distinct case when oper_type = 'DOWNLOAD_COMPLETED' then s_imei else null end) as download_completed_peoples,
sum(case when oper_type = 'INSTALL_COMPLETED'  then 1 else 0 end) as install_completed_times,
count(distinct case when oper_type = 'INSTALL_COMPLETED' then s_imei else null end) as install_completed_peoples,
sum(case when oper_type = 'CLOSE'  then 1 else 0 end) as close_times,
count(distinct case when oper_type = 'CLOSE' then s_imei else null end) as close_peoples,
sum(case when oper_type = 'QUERY' and addition = 'true' then 1 else 0 end) as query_addition_times,
sum(case when oper_type = 'EXPOSURE' and addition = 'true' then 1 else 0 end) as expo_addition_times,
oper_time as stat_time
from (select s_mz_id,s_imei,oper_type,api_type,mz_appid,s_request_id,i_resp_adcount,addition,stat_date,floor(collect_set(oper_time)[0]/600000)*600 oper_time
from bdl_fdt_ad_ssp_log  where stat_date = %(yyyymmdd)s group by s_mz_id,s_imei,oper_type,api_type,mz_appid,s_request_id,i_resp_adcount,addition,stat_date  ) bdl
where bdl.stat_date = %(yyyymmdd)s group by bdl.mz_appid,bdl.s_mz_id,oper_time;
"""%{'yyyymmdd':yyyymmdd}
execute_shell(db,hql)

if __name__ == "__main__":

    # Run the statistics SQL for the given date
    db = "default"  # target Hive database (placeholder; adjust to your environment)
    execute(db, 20161203)
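The script imports getopt but hardcodes the date; one way to pass the date from the command line instead is a small getopt loop (the -d/--date flag here is a hypothetical choice, not part of the original script):

# Hypothetical: parse -d/--date=YYYYMMDD instead of hardcoding 20161203
opts, args = getopt.getopt(sys.argv[1:], "d:", ["date="])
yyyymmdd = 20161203
for opt, val in opts:
    if opt in ("-d", "--date"):
        yyyymmdd = int(val)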

### Raw data table
CREATE TABLE `bdl_fdt_ad_ssp_log`(
`s_mz_id` string,
`s_imei` string,
`oper_type` string,
`api_type` string,
`mz_appid` string,
`oper_time` bigint,
`s_request_id` string,
`i_resp_adcount` bigint,
`i_resp_code` bigint,
`addition` string)
PARTITIONED BY (
`stat_date` bigint)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION

### Statistics result table
CREATE TABLE `adl_fdt_ad_ssp_log_hour_result`(
`api_type` string,
`mz_appid` string,
`s_mz_id` string,
`expo_times` bigint,
`expo_peoples` bigint,
`click_times` bigint,
`click_peoples` bigint,
`query_times` bigint,
`query_peoples` bigint,
`i_resp_adcount` bigint,
`download_times` bigint,
`download_peoples` bigint,
`download_completed_times` bigint,
`download_completed_peoples` bigint,
`install_completed_times` bigint,
`install_completed_peoples` bigint,
`close_times` bigint,
`close_peoples` bigint,
`query_addition_times` bigint,
`expo_addition_times` bigint,
`stat_time` bigint)
PARTITIONED BY (
`stat_date` bigint)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.RCFileInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.RCFileOutputFormat'