Hadoop多个输入案例
2016-03-08 19:04
309 查看
需求:将原始数据文件和类别数据文件根据用户id合并成一个文件。
原始数据文件:用户id,详细信息
类别数据文件:用户id,所属类别
两个Mapper分别为OriDataMapper,IdKindDataMapper,输出key,value均为Text。
其中DataReducer读取Mapper产生的数据。具体Reducer读入的格式为:
<用户id,{详细信息,所属类别}>
根据具体需求,对数据进行相应操作操作。
关键代码:
原始数据文件:用户id,详细信息
类别数据文件:用户id,所属类别
两个Mapper分别为OriDataMapper,IdKindDataMapper,输出key,value均为Text。
private void job1(Configuration config, Path outputdata, String idkinddata, String outkindData) throws Exception { Job job1 = Job.getInstance(config); job1.setJobName("PostProcessor"); job1.setJarByClass(Postprocessor.class); // 多个输入文件的mapreduce job MultipleInputs.addInputPath(job1, outputdata, TextInputFormat.class, OriDataMapper.class); MultipleInputs.addInputPath(job1, new Path(idkinddata), TextInputFormat.class, IdKindDataMapper.class); // 设置Reducer相关属性 job1.setReducerClass(DataReducer.class); job1.setNumReduceTasks(1); job1.setOutputKeyClass(Text.class); job1.setOutputValueClass(Text.class); Path outputPath = new Path(outkindData); FileSystem.get(config).delete(outputPath, true); FileOutputFormat.setOutputPath(job1, outputPath); job1.waitForCompletion(true); }
public class IdKindDataMapper extends Mapper<LongWritable, Text, Text, Text>{ Text outputKey = new Text(); Text outputValue = new Text(); @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String str = value.toString(); String[] cols = str.split("\t"); outputKey.set(cols[0]); outputValue.set(cols[1]); context.write(outputKey, outputValue); } }
public class OriDataMapper extends Mapper<LongWritable, Text, Text, Text>{ Text outputKey = new Text(); Text outputValue = new Text(); @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String str = value.toString(); String[] cols = str.split("\t"); outputKey.set(cols[0]); outputValue.set(cols[1]); context.write(outputKey, outputValue); } }
其中DataReducer读取Mapper产生的数据。具体Reducer读入的格式为:
<用户id,{详细信息,所属类别}>
根据具体需求,对数据进行相应操作操作。
关键代码:
// 多个输入文件的mapreduce job MultipleInputs.addInputPath(job1, outputdata, TextInputFormat.class, OriDataMapper.class); MultipleInputs.addInputPath(job1, new Path(idkinddata), TextInputFormat.class, IdKindDataMapper.class);
相关文章推荐
- 老庙黄金2016春晚抢红包活动技术架构详解
- Xilinx—Zynq架构介绍
- 【2015-2016 XVI Open CupL】【找规律】Liesbeth and the String 字符串变换何时变为长度1
- Nginx:作为cdn缓存时,follow 302
- opencv hsv颜色空间区分出来某一特定颜色的简单程序
- 【2015-2016 XVI Open CupJ】【DP bitset记录方案】Judgement n人投票前后权威值改变是否有实际区别
- openssl 使用非阻塞 bio
- Apache Maven 打包可执行jar
- 教你如何安全设置Linux操作系统密码
- 【2015-2016 XVI Open CupI】【交互题 队列筛法】Judgement 赌博机2^20内数循环1的个数为奇数赢 恰好赢到200元钱
- 【2015-2016 XVI Open CupH】【STL大融合】Hierarchy 公司人来人往 公寓入住进出 求公司老大与寝室长
- 【2015-2016 XVI Open CupD】【ST-RMQ】dir -C 文件划分成最少的行
- Centos 6,7 用户的权限管理
- 【2015-2016 XVI Open CupC】【暴力】Constant Ratio 等差数列前若干项恰好为n
- 【2015-2016 XVI Open CupA】【贪心 确定性思想 正难则反 本身具有拓扑序】Abstract Picture 每行每列各染色一次 恢复颜色方案
- linux创建vg、lv
- centos6 编译安装nodejs4.3
- CentOS 7 下 通过shell + expect 实现 scp 文件(目录)传输
- LINUX内核分析第三周学习总结——构造一个简单的Linux系统MenuOS
- 电商总结(四)基于共享存储的图片服务器架构