mapreduce
2015-11-24 14:54
363 查看
/**
 * MapReduce job that joins user-profile records with channel records on a device key and
 * writes each profile back out, with its channel id replaced by the joined channel when one
 * is present.
 *
 * <p>NOTE(review): {@link #execute()} is annotated {@code @Override} and calls
 * {@code getConfiguration()} / {@code getJobName()}, none of which are declared here.
 * Presumably the real class header extends a project job base class that was lost from this
 * copy. Confirm and restore the {@code extends} clause.
 */
public class AppUserProfileChangeChannelJoin {

    /** Minimum number of tab-separated fields for a line to be parsed as a user profile. */
    private static final int MIN_PROFILE_FIELDS = 27;

    /**
     * Keys every usable input record by device id.
     *
     * <p>A record with exactly one tab-separated field is treated as a channel record and
     * re-emitted unchanged under its input key. A record with at least
     * {@code MIN_PROFILE_FIELDS} fields is parsed as a {@code UserProfile}. If valid, it is
     * emitted under a key built from its trimmed app key, IDFA, MAC, Android id and IMEI.
     * Records with any other field count are ignored.
     *
     * <p>Named to match the class referenced from {@code execute()}. A nested class called
     * {@code Mapper} that extends {@code Mapper} resolves to itself (cyclic inheritance) and
     * does not compile.
     */
    public static class AppUserProfileChangeChannelMapper
            extends Mapper<Object, Writable, Text, Text> {

        // Reused across map() calls: context.write() serializes its arguments immediately.
        private final UserProfile up = new UserProfile();
        private final Text outKey = new Text();
        private final Text outValue = new Text();

        @Override
        protected void map(Object key, Writable value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = StringUtils.splitPreserveAllTokens(line, ZPConstants.FIELDS_TAB);
            if (fields.length == 1) {
                // Channel record. NOTE(review): the input key must already be in the same
                // FIELDS_SPLIT-joined device-id format built below, or the join never
                // matches. Verify against the producer of the channel file.
                outKey.set(key.toString());
                outValue.set(line);
                context.write(outKey, outValue);
            } else if (fields.length >= MIN_PROFILE_FIELDS) {
                // NOTE(review): `up` is reused across records; this assumes
                // parseToUserProfile overwrites every field it later reads.
                up.parseToUserProfile(fields);
                if (up.isValidRecord()) {
                    outKey.set(deviceId(up));
                    outValue.set(up.toString());
                    context.write(outKey, outValue);
                }
            }
        }

        /** Builds the join key from the profile's trimmed device identifiers. */
        private static String deviceId(UserProfile profile) {
            // NOTE(review): assumes none of these getters returns null (trim() would NPE).
            return profile.getAppkey().trim()
                    + ZPConstants.FIELDS_SPLIT + profile.getIdfa().trim()
                    + ZPConstants.FIELDS_SPLIT + profile.getMac().trim()
                    + ZPConstants.FIELDS_SPLIT + profile.getAndroidid().trim()
                    + ZPConstants.FIELDS_SPLIT + profile.getImei().trim();
        }
    }

    /**
     * For each device key, writes the profile (if any) with an empty output key.
     *
     * <p>If the key also carried a non-blank channel record, that channel replaces the
     * profile's channel id first. A profile whose joined channel is blank is now written
     * unchanged; previously it was silently dropped. Keys with only a channel record produce
     * no output.
     *
     * <p>Named to match the class referenced from {@code execute()}. A nested class called
     * {@code Reducer} that extends {@code Reducer} resolves to itself and does not compile.
     */
    public static class AppUserProfileChangeChannelReducer
            extends Reducer<Text, Text, Text, Text> {

        private final Text outKey = new Text("");
        private final Text outValue = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String channel = null;
            UserProfile up = null;
            for (Text value : values) {
                String[] fields =
                        StringUtils.splitPreserveAllTokens(value.toString(), ZPConstants.FIELDS_TAB);
                if (fields.length >= MIN_PROFILE_FIELDS) {
                    up = new UserProfile();
                    up.parseToUserProfile(fields);
                } else if (fields.length == 1) {
                    channel = fields[0];
                }
            }
            // NOTE(review): with several profile or channel records for one key, the last one
            // iterated wins. Reducer value order is not guaranteed, so the pick is arbitrary.
            if (up == null) {
                return;
            }
            // isNotBlank also rejects null and empty, covering the original's separate checks.
            if (StringUtils.isNotBlank(channel)) {
                up.setChannelid(channel);
            }
            outValue.set(up.toString());
            context.write(outKey, outValue);
        }
    }

    /**
     * Configures and runs the join job, then terminates the JVM with its status.
     *
     * <p>Sequence-file input and output, with {@code Text} keys and values on both the map
     * and reduce side. Reduce speculative execution is disabled.
     *
     * @throws Exception if job setup or submission fails
     */
    @Override
    protected void execute() throws Exception {
        // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
        Job job = Job.getInstance(getConfiguration(), getJobName());
        job.setJarByClass(AppUserProfileChangeChannelJoin.class); // class that contains mapper
        job.setMapperClass(AppUserProfileChangeChannelMapper.class);
        job.setReducerClass(AppUserProfileChangeChannelReducer.class);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setReduceSpeculativeExecution(false);
        // NOTE(review): input/output paths are not set here; presumably they arrive through
        // the configuration returned by getConfiguration(). Verify.
        // Exits the JVM (0 = success, 1 = failure) instead of returning to the caller.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
public static class Mapper extends Mapper<Object, Writable, Text, Text> {
private UserProfile up = new UserProfile();
@Override
protected void map(Object key, Writable value, Context context) throws IOException, InterruptedException {
String[] tmp = StringUtils.splitPreserveAllTokens(value.toString(), ZPConstants.FIELDS_TAB);
if (tmp.length == 1) {
context.write(new Text(key.toString()), new Text(value.toString()));
} else if (tmp.length >= 27) {
up.parseToUserProfile(tmp);
if (up.isValidRecord()) {
String deviceId = up.getAppkey().trim() + ZPConstants.FIELDS_SPLIT + up.getIdfa().trim()
+ ZPConstants.FIELDS_SPLIT + up.getMac().trim() + ZPConstants.FIELDS_SPLIT + up.getAndroidid().trim()
+ ZPConstants.FIELDS_SPLIT + up.getImei().trim();
context.write(new Text(deviceId), new Text(up.toString()));
}
}
}
}
public static class Reducer extends Reducer<Text, Text, Text, Text> {
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
String channel = null;
UserProfile up = null;
for (Text value : values) {
String tmp = value.toString();
String[] fields = StringUtils.splitPreserveAllTokens(tmp, ZPConstants.FIELDS_TAB);
if (fields.length >= 27) {
up = new UserProfile();
up.parseToUserProfile(fields);
} else if (fields.length == 1) {
channel = fields[0];
}
}
if (up != null && channel != null && StringUtils.isNotBlank(channel) && StringUtils.isNotEmpty(channel)) {
up.setChannelid(channel);
context.write(new Text(""), new Text(up.toString()));
}
if (up != null && channel == null) {
context.write(new Text(""), new Text(up.toString()));
}
}
}
/**
 * Configures and runs the join job, then terminates the JVM with its status.
 *
 * <p>Sequence-file input and output, with {@code Text} keys and values on both the map and
 * reduce side. Reduce speculative execution is disabled.
 *
 * <p>NOTE(review): {@code @Override}, {@code getConfiguration()} and {@code getJobName()}
 * presumably come from a project job base class not visible here. Confirm.
 *
 * @throws Exception if job setup or submission fails
 */
@Override
protected void execute() throws Exception {
    // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
    Job job = Job.getInstance(getConfiguration(), getJobName());
    job.setJarByClass(AppUserProfileChangeChannelJoin.class); // class that contains mapper
    job.setMapperClass(AppUserProfileChangeChannelMapper.class);
    job.setReducerClass(AppUserProfileChangeChannelReducer.class);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setReduceSpeculativeExecution(false);
    // NOTE(review): input/output paths are not set here; presumably they arrive through the
    // configuration returned by getConfiguration(). Verify.
    // Exits the JVM (0 = success, 1 = failure) instead of returning to the caller.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
相关文章推荐
- oracle中记录存在性的判断
- Spring JdbcTemplate 查询方法中的RowMapper实现汇总
- block 解析 - 内存
- android点滴
- Win10系统在脱机状态下不能使用pin码登陆的详细解决方法
- (二)linux下ping不通的解决方法
- spring mvc 注释配置
- oracle的jdbc连接方式:oci和thin
- iOS中Block介绍 基础
- Quartz.NET管理周期性任务
- 精通 JS正则表达式
- 开发:用自定义控件来实现常用的标题栏
- gradle编译出错:Execution failed for task ':app:compileTestDebugJava'
- linux查看目录大小
- grafana-zabbix添加图形&模板化
- php发送短信验证码完成注册功能
- grafana-zabbix添加图形&模板化
- 冒泡排序
- 抽象工厂设计模式
- 《ArcGIS Runtime SDK for .NET开发笔记》 --Hello Word