
Computing Statistics for JD Second-Level Domain Data


Writing the MR program

Approach

The key carries both the domain and the uid, and every record counts as one pv. Because reduce keys arrive sorted, all records for one domain are contiguous, so the reducer only has to check each incoming key: when a new domain appears, the pv and uv counters restart; when the uid changes within a domain, uv is incremented; pv is incremented for every record.
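For example, if the reducer receives the following sorted keys with their combined counts (hosts and uids are hypothetical), it emits uv=2, pv=3 for item.jd.com and uv=1, pv=1 for www.jd.com:

item.jd.com\tuid1 -> 2
item.jd.com\tuid2 -> 1
www.jd.com\tuid3 -> 1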

Implementation

package com.zamplus.insight.mr;

import gdi.records.pb;
import gdi.task.MRTask;
import gdi.utils.converter.Converter;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import com.zamplus.insight.utils.XURI;

/**
 * Computes pv/uv statistics for JD second-level domains.
 *
 * @author wankun
 */
public class JDVisitJob extends MRTask {

    public static class JDVisitMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        private Converter converter;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            converter = createConverter(context);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException,
                InterruptedException {
            pb.StandardInput record;
            try {
                record = converter.convertOne(key, value, context);
            } catch (Converter.ConversionFailureException x) {
                context.getCounter("SKIP", "BAD_INPUT").increment(1);
                return;
            }

            // The uid is an md5 over ad + ua (device id plus user agent).
            String ad;
            if (record.hasUserId()) {
                ad = record.getUserId(); // device id
            } else {
                context.getCounter("SKIP", "NO_AD").increment(1);
                return;
            }
            String ua = "";
            if (record.hasUserAgent()) {
                ua = record.getUserAgent();
            } else {
                context.getCounter("WARN", "NO_USERAGENT").increment(1);
            }
            String uid = DigestUtils.md5Hex(ad + "." + ua);

            // Parse the domain from the url; every matching record is one pv.
            if (record.hasUrl()) {
                try {
                    XURI url = new XURI(record.getUrl());
                    String host = url.getHost();
                    if (url.getDomain().equalsIgnoreCase("jd.com")) {
                        context.write(new Text(host + "\t" + uid), new LongWritable(1));
                    }
                } catch (XURI.InvalidURIException e) {
                    context.getCounter("SKIP", "BAD_URL").increment(1);
                }
            } else {
                context.getCounter("SKIP", "NO_URL").increment(1);
            }
        }
    }

    public static class JDVisitCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long pv = 0;
            for (LongWritable value : values)
                pv += value.get();
            context.write(key, new LongWritable(pv));
        }
    }

    public static class JDVisitReducer extends Reducer<Text, LongWritable, Text, Text> {
        Map<String, String> result = new HashMap<>();
        String host = null;
        long pv = 0;
        long uv = 0;

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            String[] ss = key.toString().split("\t");
            if (ss.length != 2) {
                context.getCounter("ERROR", "BAD_REDUCE_KEY").increment(1);
                return;
            }
            // Keys arrive sorted, so all uids of a host are contiguous; a new
            // host means the previous host's counters are complete. This relies
            // on running with a single reduce task (mapred.reduce.tasks=1).
            if (host == null || !host.equals(ss[0])) {
                if (uv != 0)
                    result.put(host, uv + "\t" + pv);
                host = ss[0];
                pv = 0;
                uv = 0;
            }
            // Each reduce call sees one distinct (host, uid) key: one uv.
            uv++;
            for (LongWritable value : values)
                pv += value.get();
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            if (host != null)
                result.put(host, uv + "\t" + pv);
            for (Map.Entry<String, String> en : result.entrySet()) {
                context.write(new Text(en.getKey()), new Text(en.getValue()));
            }
        }
    }

    @Override
    public void setup() {
    }

    @Override
    public void cleanup() {
    }

    @Override
    protected Job createJob(Configuration conf) throws Exception {
        // Reuse each task JVM for multiple tasks to cut startup overhead.
        conf.set("mapreduce.job.jvm.numtasks", "15");
        Job job = Job.getInstance(conf, "[TMP] jd visit");

        job.setJarByClass(JDVisitJob.class);
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(JDVisitMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setCombinerClass(JDVisitCombiner.class);

        job.setReducerClass(JDVisitReducer.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        return job;
    }
}
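The reducer's host-boundary logic is the easiest part to get wrong, so it can be sanity-checked outside Hadoop. The sketch below is a minimal standalone simulation of the reduce phase over hypothetical, already-sorted (host\tuid, combined pv) pairs; the class name and sample data are invented for illustration and mirror the accumulation in JDVisitReducer above:

import java.util.LinkedHashMap;
import java.util.Map;

public class JDVisitLogicCheck {
    public static void main(String[] args) {
        // Hypothetical reducer input: distinct "host\tuid" keys in sorted
        // order, each paired with its combined pv count.
        String[][] input = {
            {"item.jd.com\tuid1", "2"},
            {"item.jd.com\tuid2", "1"},
            {"www.jd.com\tuid3", "1"},
        };

        Map<String, String> result = new LinkedHashMap<>();
        String host = null;
        long pv = 0, uv = 0;

        for (String[] kv : input) {
            String[] ss = kv[0].split("\t");
            // New host: the previous host's counters are complete, flush them.
            if (host == null || !host.equals(ss[0])) {
                if (uv != 0)
                    result.put(host, uv + "\t" + pv);
                host = ss[0];
                pv = 0;
                uv = 0;
            }
            uv++;                        // one distinct (host, uid) key = one uv
            pv += Long.parseLong(kv[1]); // combined pv for this key
        }
        if (host != null)
            result.put(host, uv + "\t" + pv);

        // Expected: item.jd.com -> 2\t3, www.jd.com -> 1\t1
        result.forEach((h, s) -> System.out.println(h + "\t" + s));
    }
}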


Configuration file

<?xml version="1.0" encoding="utf-8" ?>
<job xmlns="http://opt/job">
  <task>
    <sequence>
      <mapred class="com.zamplus.insight.mr.JDVisitJob">
        <input type="hdfs">/bh/fixed/roger_shanghai_raw/20150303/*</input>
        <output type="hdfs">/user/wankun/output/user_overlap/20150303</output>
        <config name="mapred.reduce.tasks">1</config>
        <config name="mapreduce.output.fileoutputformat.compress">false</config>
        <format>
          <delimited delimiter="9" quote="32" numFields="8">
            <fieldMap field="ipv4" index="0" codec="ipv4"/>
            <fieldMap field="user_id" index="1"/>
            <fieldMap field="timestamp" index="2" codec="long"/>
            <fieldMap field="url" index="3"/>
            <fieldMap field="refer" index="4" codec="base64"/>
            <fieldMap field="user_agent" index="5" codec="base64"/>
            <fieldMap field="cookie_content" index="7" codec="base64"/>
          </delimited>
        </format>
      </mapred>
    </sequence>
  </task>
  <config name="mapreduce.output.fileoutputformat.compress.codec">org.apache.hadoop.io.compress.BZip2Codec</config>
  <config name="mapreduce.output.fileoutputformat.compress">true</config>
  <config name="mapreduce.job.queuename">dmp_job</config>
  <logging>
    <stderr level="ERROR" logger="Job"/>
    <stdout level="DEBUG"/>
  </logging>
</job>
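Assuming the gdi delimited format reads delimiter and quote as ASCII codes, delimiter="9" is a tab and quote="32" is a space, i.e. tab-separated records of eight fields, where index 6 is carried but unmapped. A hypothetical record, with <TAB> marking the separator and the field values invented for illustration:

1.2.3.4<TAB>device123<TAB>1425340800000<TAB>http://item.jd.com/12345.html<TAB>cmVmZXI=<TAB>dWE=<TAB>x<TAB>Y29va2ll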


Running the job

An example invocation:

HADOOP_CLASSPATH=$HADOOP_CLASSPATH:insight-roger-1.0-SNAPSHOT.jar:gdi-common-1.0.0.jar hadoop jar gdi-common-1.0.0.jar gdi.Job -j jdvisitjob -c jdvisitjob.xml --log-date 20150303
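The result lands in the output path from the config file. With TextOutputFormat, each line is the host followed by the reducer's value (uv and pv), all tab-separated; the hosts and numbers below are hypothetical:

item.jd.com	2	3
www.jd.com	1	1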