实现JD二级域名数据的统计
2015-06-18 17:48
507 查看
编写MR程序
程序思路
使用key来作为domain和uid的区分，每一条记录都是一个pv。接到key后判断：如果domain为新的domain，则重新开始统计pv和uv；如果uid变化，则uv++；pv则一直累加即可。

代码实现
package com.zamplus.insight.mr; import gdi.records.pb; import gdi.task.MRTask; import gdi.utils.converter.Converter; import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.commons.codec.digest.DigestUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import com.zamplus.insight.utils.XURI; /** * 实现JD二级域名数据的统计 * * @author wankun * */ public class JDVisitJob extends MRTask { public static class JDVisitMapper extends Mapper<LongWritable, Text, Text, LongWritable> { private Converter converter; @Override protected void setup(Context context) throws IOException, InterruptedException { converter = createConverter(context); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { pb.StandardInput record; try { record = converter.convertOne(key, value, context); } catch (Converter.ConversionFailureException x) { context.getCounter("SKIP", "BAD_INPUT").increment(1); return; } // ad + ua 做uid标识 String ad; if (record.hasUserId()) { ad = record.getUserId(); // 设备id } else { context.getCounter("SKIP", "NO_AD").increment(1); return; } String ua = ""; if (record.hasUserAgent()) { ua = record.getUserAgent(); } else { context.getCounter("WARN", "NO_USERAGENT").increment(1); } String uid = DigestUtils.md5Hex(ad + "." 
+ ua); // url解析domain计算pv XURI url; if (record.hasUrl()) { try { url = new XURI(record.getUrl()); String host = url.getHost(); if (url.getDomain().equalsIgnoreCase("jd.com")) { context.write(new Text(host + "\t" + uid), new LongWritable(1)); } } catch (XURI.InvalidURIException e) { context.getCounter("SKIP", "BAD_URL").increment(1); return; } } else { context.getCounter("SKIP", "NO_URL").increment(1); return; } } } public static class JDVisitCombiner extends Reducer<Text, LongWritable, Text, LongWritable> { @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long pv = 0; for (LongWritable value : values) pv += value.get(); context.write(key, new LongWritable(pv)); } } public static class JDVisitReducer extends Reducer<Text, LongWritable, Text, Text> { Map<String, String> result = new HashMap<>(); String host = null; String uid = null; long pv = 0; long uv = 0; @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { String[] ss = key.toString().split("\t"); if (ss.length != 2) { context.getCounter("ERROR", "BAD_REDUCE_KEY").increment(1); return; } if (host == null || !host.equals(ss[0])) { if (uv != 0) result.put(host, "" + uv + "\t" + pv); host = ss[0]; pv = 0; uv = 0; } uv++; for (LongWritable value : values) pv += value.get(); } protected void cleanup(Context context) throws IOException, InterruptedException { result.put(host, "" + uv + "\t" + pv); for (Map.Entry<String, String> en : result.entrySet()) { context.write(new Text(en.getKey()), new Text(en.getValue())); } // result.entrySet().forEach(en -> context.write(new // Text(en.getKey()), new Text(en.getValue()))); } } @Override public void setup() { } @Override public void cleanup() { } @Override protected Job createJob(Configuration conf) throws Exception { // conf.set("mapred.job.reuse.jvm.num.tasks", "10"); conf.set("mapreduce.job.jvm.numtasks", 
"15"); Job job = Job.getInstance(conf, "[TMP] jd visit"); job.setJarByClass(JDVisitJob.class); job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(JDVisitMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setCombinerClass(JDVisitCombiner.class); job.setReducerClass(JDVisitReducer.class); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // job.setNumReduceTasks(0); return job; } }
配置文件
<?xml version="1.0" encoding="utf-8" ?>
<!-- Job config for JDVisitJob. mapred.reduce.tasks stays at 1 because the
     reducer carries per-host state across keys. -->
<job xmlns="http://opt/job">
  <task>
    <sequence>
      <mapred class="com.zamplus.insight.mr.JDVisitJob">
        <input type="hdfs">/bh/fixed/roger_shanghai_raw/20150303/*</input>
        <output type="hdfs">/user/wankun/output/user_overlap/20150303</output>
        <config name="mapred.reduce.tasks">1</config>
        <config name="mapreduce.output.fileoutputformat.compress">false</config>
        <format>
          <delimited delimiter="9" quote="32" numFields="8">
            <fieldMap field="ipv4" index="0" codec="ipv4"/>
            <fieldMap field="user_id" index="1"/>
            <fieldMap field="timestamp" index="2" codec="long"/>
            <fieldMap field="url" index="3"/>
            <fieldMap field="refer" index="4" codec="base64"/>
            <fieldMap field="user_agent" index="5" codec="base64"/>
            <fieldMap field="cookie_content" index="7" codec="base64"/>
          </delimited>
        </format>
      </mapred>
    </sequence>
  </task>
  <config name="mapreduce.output.fileoutputformat.compress.codec">org.apache.hadoop.io.compress.BZip2Codec</config>
  <config name="mapreduce.output.fileoutputformat.compress">true</config>
  <config name="mapreduce.job.queuename">dmp_job</config>
  <logging>
    <stderr level="ERROR" logger="Job"/>
    <stdout level="DEBUG"/>
  </logging>
</job>
运行程序
执行命令如:HADOOP_CLASSPATH=$HADOOP_CLASSPATH:insight-roger-1.0-SNAPSHOT.jar:gdi-common-1.0.0.jar hadoop jar gdi-common-1.0.0.jar gdi.Job -j jdvisitjob -c jdvisitjob.xml --log-date 20150303
相关文章推荐
- 《大道至简》阅读笔记Ⅲ
- 团队项目第二次冲刺(8)
- PowerShell传递Exchange中的自定义属性(员工编号等)
- PowerShell传递Exchange中的自定义属性(员工编号等)
- IOS ARC 机制
- 远程检测Linux服务器中内存占用情况的方法
- 屏蔽windows的全半角切换快捷键
- 内存池
- 二进制搜索方法C++通用执行
- SYNONYM和权限用法
- [Leetcode]Implement Stack using Queues
- SAT阅读填空题常考单词整理(1)
- Mongodb从库数据重新初始化步骤
- CSS选择器
- 【转】Android软件开发需要学什么
- 高精度(还有其它的以后再补充)
- 7z压缩文件时排除指定的文件
- Android中Application类用法
- 备战2015GRE之核心词组under the banner of
- grep