您的位置:首页 > 其它

mapreduce

2015-11-24 14:54 363 查看
public class AppUserProfileChangeChannelJoin {

public static class Mapper extends Mapper<Object, Writable, Text, Text> {

private UserProfile up = new UserProfile();

@Override

protected void map(Object key, Writable value, Context context) throws IOException, InterruptedException {

String[] tmp = StringUtils.splitPreserveAllTokens(value.toString(), ZPConstants.FIELDS_TAB);

if (tmp.length == 1) {

context.write(new Text(key.toString()), new Text(value.toString()));

} else if (tmp.length >= 27) {

up.parseToUserProfile(tmp);

if (up.isValidRecord()) {

String deviceId = up.getAppkey().trim() + ZPConstants.FIELDS_SPLIT + up.getIdfa().trim()

+ ZPConstants.FIELDS_SPLIT + up.getMac().trim() + ZPConstants.FIELDS_SPLIT + up.getAndroidid().trim()

+ ZPConstants.FIELDS_SPLIT + up.getImei().trim();

context.write(new Text(deviceId), new Text(up.toString()));

}

}

}

}

public static class Reducer extends Reducer<Text, Text, Text, Text> {

protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

String channel = null;

UserProfile up = null;

for (Text value : values) {

String tmp = value.toString();

String[] fields = StringUtils.splitPreserveAllTokens(tmp, ZPConstants.FIELDS_TAB);

if (fields.length >= 27) {

up = new UserProfile();

up.parseToUserProfile(fields);

} else if (fields.length == 1) {

channel = fields[0];

}

}

if (up != null && channel != null && StringUtils.isNotBlank(channel) && StringUtils.isNotEmpty(channel)) {

up.setChannelid(channel);

context.write(new Text(""), new Text(up.toString()));

}

if (up != null && channel == null) {

context.write(new Text(""), new Text(up.toString()));

}

}

}

/**
 * Configures the join job, runs it to completion, and terminates the JVM with the outcome.
 *
 * <p>The job reads and writes sequence files, uses {@code Text} for all map and final output
 * keys and values, and disables speculative execution for reduce tasks.
 *
 * <p>Note that {@code System.exit} is invoked on both success (status 0) and failure (status 1),
 * so control never returns to the caller of this method.
 *
 * @throws Exception if job construction or execution fails
 */
@Override
protected void execute() throws Exception {
    Job job = new Job(getConfiguration(), getJobName());

    // Jar lookup plus the map and reduce implementations.
    job.setJarByClass(AppUserProfileChangeChannelJoin.class);
    job.setMapperClass(AppUserProfileChangeChannelMapper.class);
    job.setReducerClass(AppUserProfileChangeChannelReducer.class);

    // Sequence files on both ends.
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);

    // Text keys and values for the intermediate and the final output.
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    job.setReduceSpeculativeExecution(false);

    boolean succeeded = job.waitForCompletion(true);
    int exitStatus = succeeded ? 0 : 1;
    System.exit(exitStatus);
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: