Hadoop Example 8, Join Part 1: Getting Each Employee's Department Info
2015-09-05 22:39
Required output format: employee number, employee name, department name, department number.
1. Raw data
Employee data (tab-separated; blank cells are empty columns in the file):
empno  ename   job       mgr   hiredate    sal   comm  deptno  loc
7499   allen   salesman  7698  1981-02-20  1600  300   30
7782   clark   managers  7639  1981-06-09  2450        10
7654   martin  salesman  7698  1981-03-20  1250  1400  30      boston
7900   james   clerk     7698  1981-01-09  950         30
7788   scott   analyst   7566  1981-09-01  3000  100   20
Department data (tab-separated):
deptno  dname       loc
30      sales       chicago
20      research    dallas
10      accounting  newyork
2. How the join works:
Use the join key (here, the department number) as the map output key, which is also the reduce input key. Records that share the same join key then land in the same reducer's key/value list after the shuffle.
Design one common bean for both tables, and give it a flag field so each record remembers which table it came from.
In the reduce phase, the flag makes it easy to tell employee records from department records; the join itself happens in the reducer, as the illustration below shows.
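For example, with the sample data above, the reduce call for key 30 receives one department record and three employee records in its value list (arrival order within the list is not guaranteed):

key=30 -> [ {flag=1, deptNo=30, deptName=sales},
            {flag=0, empNo=7499, empName=allen,  deptNo=30},
            {flag=0, empNo=7654, empName=martin, deptNo=30},
            {flag=0, empNo=7900, empName=james,  deptNo=30} ]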
3. The intermediate bean
The bean that stores the data (it must be serializable because it is shipped across the network, and Hadoop sorts and groups during processing, hence the WritableComparable interface; strictly speaking, Writable alone would do here, since Emplyee is only ever used as a value and the LongWritable key handles the sorting and grouping, which is why compareTo can simply return 0):
package cn.edu.bjut.joinone;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Emplyee implements WritableComparable<Emplyee> {

    private String empNo = "";
    private String empName = "";
    private String deptNo = "";
    private String deptName = "";
    private int flag = 0; // 1 = department record, 0 = employee record

    public Emplyee() {}

    public Emplyee(String empNo, String empName, String deptNo,
            String deptName, int flag) {
        super();
        this.empNo = empNo;
        this.empName = empName;
        this.deptNo = deptNo;
        this.deptName = deptName;
        this.flag = flag;
    }

    // Copy constructor: needed because Hadoop reuses the value object
    // it hands to the reducer (see section 5).
    public Emplyee(Emplyee e) {
        this.empNo = e.getEmpNo();
        this.empName = e.getEmpName();
        this.deptNo = e.getDeptNo();
        this.deptName = e.getDeptName();
        this.flag = e.getFlag();
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(getEmpNo());
        out.writeUTF(getEmpName());
        out.writeUTF(getDeptNo());
        out.writeUTF(getDeptName());
        out.writeInt(getFlag());
    }

    public void readFields(DataInput in) throws IOException {
        this.empNo = in.readUTF();
        this.empName = in.readUTF();
        this.deptNo = in.readUTF();
        this.deptName = in.readUTF();
        this.flag = in.readInt();
    }

    // The bean is only a value and is never sorted, so no ordering is needed.
    public int compareTo(Emplyee o) {
        return 0;
    }

    @Override
    public String toString() {
        return "empNo=" + empNo + ", empName=" + empName
                + ", deptNo=" + deptNo + ", deptName=" + deptName;
    }

    public String getEmpNo() { return empNo; }
    public void setEmpNo(String empNo) { this.empNo = empNo; }
    public String getEmpName() { return empName; }
    public void setEmpName(String empName) { this.empName = empName; }
    public String getDeptNo() { return deptNo; }
    public void setDeptNo(String deptNo) { this.deptNo = deptNo; }
    public String getDeptName() { return deptName; }
    public void setDeptName(String deptName) { this.deptName = deptName; }
    public int getFlag() { return flag; }
    public void setFlag(int flag) { this.flag = flag; }
}
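As a quick local sanity check (not part of the original job), the Writable contract can be exercised without a cluster. EmplyeeRoundTripTest below is a hypothetical helper class, assumed to sit in the same package as Emplyee:

package cn.edu.bjut.joinone;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class EmplyeeRoundTripTest {

    public static void main(String[] args) throws IOException {
        Emplyee before = new Emplyee("7499", "allen", "30", "sales", 0);

        // Serialize with write(), just as Hadoop does between map and reduce.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        before.write(new DataOutputStream(bytes));

        // Deserialize into a fresh instance with readFields().
        Emplyee after = new Emplyee();
        after.readFields(new DataInputStream(
                new ByteArrayInputStream(bytes.toByteArray())));

        // Prints: empNo=7499, empName=allen, deptNo=30, deptName=sales
        System.out.println(after);
    }
}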
4. The Mapper:
package cn.edu.bjut.joinone;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class JoinMapper extends Mapper<LongWritable, Text, LongWritable, Emplyee> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] arr = line.split("\t");
        if (arr.length <= 3) {
            // Three fields or fewer: a department record (deptno, dname, loc).
            Emplyee e = new Emplyee();
            e.setDeptNo(arr[0]);
            e.setDeptName(arr[1]);
            e.setFlag(1);
            context.write(new LongWritable(Long.parseLong(e.getDeptNo())), e);
        } else {
            // Otherwise: an employee record; deptno is the eighth field.
            Emplyee e = new Emplyee();
            e.setEmpNo(arr[0]);
            e.setEmpName(arr[1]);
            e.setDeptNo(arr[7]);
            e.setFlag(0);
            context.write(new LongWritable(Long.parseLong(e.getDeptNo())), e);
        }
    }
}
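The mapper tells the two record types apart purely by field count: a department line splits into three fields, an employee line into more. For the sample data it emits, for instance:

30\tsales\tchicago        -> (30, {flag=1, deptNo=30, deptName=sales})
7499\tallen\t...\t300\t30 -> (30, {flag=0, empNo=7499, empName=allen, deptNo=30})

One caveat: if the input files still contain the header rows shown in section 1, Long.parseLong throws a NumberFormatException on them, so either strip the headers from the files or skip lines whose deptno field is not numeric.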
5. The Reducer:
package cn.edu.bjut.joinone;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class JoinReducer extends Reducer<LongWritable, Emplyee, Text, NullWritable> {

    @Override
    protected void reduce(LongWritable key, Iterable<Emplyee> values, Context context)
            throws IOException, InterruptedException {
        Emplyee e = null;                              // the department record, if any
        List<Emplyee> list = new ArrayList<Emplyee>(); // all employees for this deptno
        for (Emplyee emplyee : values) {
            // Copy each element: Hadoop reuses the same object across iterations.
            if (0 == emplyee.getFlag()) {
                list.add(new Emplyee(emplyee));
            } else {
                e = new Emplyee(emplyee);
            }
        }
        // Inner join: emit employees only when their department record exists.
        if (null != e) {
            for (Emplyee emplyee : list) {
                emplyee.setDeptName(e.getDeptName());
                context.write(new Text(emplyee.toString()), NullWritable.get());
            }
        }
    }
}
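Two details are worth noting. First, the copy constructor is not optional: Hadoop reuses the same Emplyee instance while iterating over values, so storing the reference directly would leave the list holding copies of the last record. Second, all employees of one department are buffered in memory, which is fine at this scale but can become a bottleneck if the join key is heavily skewed.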
6. The driver:
package cn.edu.bjut.joinone;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MainJob {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "join"); // new Job(conf, "join") is deprecated

        job.setJarByClass(MainJob.class);

        job.setMapperClass(JoinMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Emplyee.class);

        job.setReducerClass(JoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));

        // Delete the output directory if it already exists, so reruns don't fail.
        Path outPath = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);

        job.waitForCompletion(true);
    }
}
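To run the job (the jar name and HDFS paths below are illustrative):

hadoop jar join.jar cn.edu.bjut.joinone.MainJob /input /output

With the sample data from section 1 (headers removed, empty comm columns kept as empty tab-separated fields), the output file part-r-00000 should contain one line per employee, formatted by Emplyee.toString(); keys reach the reducer sorted, but the order of employees within a department is not guaranteed:

empNo=7782, empName=clark, deptNo=10, deptName=accounting
empNo=7788, empName=scott, deptNo=20, deptName=research
empNo=7499, empName=allen, deptNo=30, deptName=sales
empNo=7654, empName=martin, deptNo=30, deptName=sales
empNo=7900, empName=james, deptNo=30, deptName=sales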