Big Data: Developing Your Own WordCount Program
2017-12-31 20:50
Writing an object to a file is called serialization; conversely, restoring the object from that file is called deserialization.
The complete WordCount program (Mapper, Reducer, and main class):
package demo.wc;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

//public class WordCountMapper extends Mapper<k1, v1, k2, v2> {
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        /*
         * context represents the Mapper's context
         * upstream:   HDFS
         * downstream: the Reducer
         */
        // Fetch the data, e.g. "I love Beijing" -- the input is read one line at a time
        String data = value1.toString();

        // Split the line into words
        String[] words = data.split(" ");

        // Emit: k2 is the word, v2 counts one occurrence
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

package demo.wc;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

//public class WordCountReducer extends Reducer<k3, v3, k4, v4> {
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text k3, Iterable<IntWritable> v3, Context context)
            throws IOException, InterruptedException {
        /*
         * context represents the Reducer's context
         * upstream:   the Mapper
         * downstream: HDFS
         */
        // Sum up v3
        int total = 0;
        for (IntWritable v : v3) {
            total += v.get();
        }

        // Emit: k4 is the word, v4 its frequency
        context.write(k3, new IntWritable(total));
    }
}

package demo.wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountMain {

    public static void main(String[] args) throws Exception {
        // Create a job: job = map + reduce
        Job job = Job.getInstance(new Configuration());

        // The entry point of the job
        job.setJarByClass(WordCountMain.class);

        // The job's Mapper and its output types: k2, v2
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);          // k2
        job.setMapOutputValueClass(IntWritable.class); // v2

        // The job's Reducer and its output types: k4, v4
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);             // k4
        job.setOutputValueClass(IntWritable.class);    // v4

        // Input path (map) and output path (reduce)
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Run the job; true: print progress logs while it runs
        job.waitForCompletion(true);
    }
}
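The notes below call the analysis of WordCount's data flow "very important". As a worked illustration of our own (the two input lines are made up), this is how records move through the stages:

Input (k1 = byte offset, v1 = one line per map call):
    I love Beijing
    I love China

After map (k2, v2):
    (I,1) (love,1) (Beijing,1)
    (I,1) (love,1) (China,1)

After shuffle/sort, grouped by key (k3, v3):
    (Beijing,[1]) (China,[1]) (I,[1,1]) (love,[1,1])

After reduce (k4, v4), as written to HDFS:
    Beijing 1
    China   1
    I       2
    love    2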
VI. MapReduce Program Development

1. Demo: WordCount
   Jar: /root/training/hadoop-2.7.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.3.jar
   Run: hadoop jar hadoop-mapreduce-examples-2.7.3.jar wordcount /input/data.txt /output/mr/wc1213
   Analyze how the WordCount program processes its data (very important).

2. Developing your own WordCount program
   Jars to put on the classpath:
   /root/training/hadoop-2.7.3/share/hadoop/common
   /root/training/hadoop-2.7.3/share/hadoop/common/lib
   /root/training/hadoop-2.7.3/share/hadoop/mapreduce
   /root/training/hadoop-2.7.3/share/hadoop/mapreduce/lib
   Run: hadoop jar wc.jar /input/data.txt /output/day1215/wc

   Another example: develop a MapReduce job that computes the total salary of each department.
   Data: the employee table.

   SQL> select deptno,sum(sal) from emp group by deptno order by deptno;

       DEPTNO   SUM(SAL)
   ---------- ----------
           10       8750
           20      10875
           30       9400

3. Some advanced MapReduce features
   (1) Serialization: similar to Java serialization. If a class implements Hadoop's serialization mechanism (the Writable interface), its objects can be used as input and output values.
       Example 1: use an Employee class to wrap an employee record and pass it as Map/Reduce input and output.
       Important: the serialization order and the deserialization order must be the same.
       Example 2: rewrite the department salary total using the serializable Employee.
   (2) Sorting: note that sorting happens on key2.
       Defaults: numbers ascend; strings sort in dictionary order.
       Object sorting: e.g. sort employees by salary (see the sketch after the sorting demo below).
       To change the default ordering, write your own comparator.
   (3) Partitioning (Partitioner): by default MapReduce has a single partition, i.e. a single output file (a sketch follows this outline).
   (4) Combining (Combiner): run a reducer on the Mapper side first, to cut the amount of data sent to the Reducer and improve efficiency (see the same sketch).
   (5) The core of MapReduce: Shuffle.

VII. MapReduce Programming Examples
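Items (3) and (4) above describe partitioning and combining only in prose, so here is a minimal sketch of our own (class and package names are hypothetical), wired against the department salary job shown below. The partitioner routes each department to its own reduce task, producing one output file per partition instead of the default single file:

package demo.partition;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical partitioner: distribute (deptno, salary) records across the
// configured number of reduce tasks by department number
public class DeptPartitioner extends Partitioner<IntWritable, IntWritable> {

    @Override
    public int getPartition(IntWritable k2, IntWritable v2, int numPartitions) {
        return Math.abs(k2.get()) % numPartitions;
    }
}

In the main class, the wiring would be:

job.setPartitionerClass(DeptPartitioner.class);
job.setNumReduceTasks(3);                       // 3 partitions -> 3 output files
job.setCombinerClass(SalaryTotalReducer.class); // the map-side "mini reduce" from item (4);
                                                // valid here because the reducer's input and
                                                // output types are both (IntWritable, IntWritable)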
Compute the total salary of each department:
package demo.saltotal;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// k1: offset  v1: line  k2: department number  v2: salary
public class SalaryTotalMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {

    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        // Data: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
        String data = value1.toString();

        // Split the line
        String[] words = data.split(",");

        // Emit: department number, salary
        context.write(new IntWritable(Integer.parseInt(words[7])),
                new IntWritable(Integer.parseInt(words[5])));
    }
}

package demo.saltotal;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

// k3: department number  v3: salaries  k4: department number  v4: total salary
public class SalaryTotalReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

    @Override
    protected void reduce(IntWritable k3, Iterable<IntWritable> v3, Context context)
            throws IOException, InterruptedException {
        // Sum v3: the total salary of this department
        int total = 0;
        for (IntWritable v : v3) {
            total += v.get();
        }

        // Emit: department number, total salary
        context.write(k3, new IntWritable(total));
    }
}

package demo.saltotal;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SalaryTotalMain {

    public static void main(String[] args) throws Exception {
        // Create a job
        Job job = Job.getInstance(new Configuration());

        // The entry point of the job
        job.setJarByClass(SalaryTotalMain.class);

        // The job's Mapper and its output types
        job.setMapperClass(SalaryTotalMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        // The job's Reducer and its output types
        job.setReducerClass(SalaryTotalReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        // Input path and output path
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Run the job
        job.waitForCompletion(true);
    }
}
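Run it the same way as the WordCount jar. Assuming the emp data shown in the notes above, the output file should match the SQL results, one tab-separated department per line:

10	8750
20	10875
30	9400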
package demo.ser.java;

import java.io.Serializable;

public class Student implements Serializable {

    private static final long serialVersionUID = 1L;

    // Properties
    private int stuID;
    private String stuName;

    public int getStuID() { return stuID; }
    public void setStuID(int stuID) { this.stuID = stuID; }
    public String getStuName() { return stuName; }
    public void setStuName(String stuName) { this.stuName = stuName; }
}

package demo.ser.java;

import java.io.FileOutputStream;
import java.io.ObjectOutputStream;

public class TestStudent {

    public static void main(String[] args) throws Exception {
        // Create a Student object and save it to a file
        Student s = new Student();
        s.setStuID(1);
        s.setStuName("Tom");

        // Create an output stream and serialize the student into the file
        ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream("d:\\temp\\student.aaa"));
        out.writeObject(s);

        // Close the output stream
        out.close();
    }
}
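The definition above says deserialization restores the object from the file, but the demo only writes. A minimal sketch of the read-back step (the class name ReadStudent is ours):

package demo.ser.java;

import java.io.FileInputStream;
import java.io.ObjectInputStream;

public class ReadStudent {

    public static void main(String[] args) throws Exception {
        // Open an input stream on the file written by TestStudent
        ObjectInputStream in = new ObjectInputStream(new FileInputStream("d:\\temp\\student.aaa"));

        // Deserialization: restore the Student object from the file
        Student s = (Student) in.readObject();
        in.close();

        System.out.println(s.getStuID() + " " + s.getStuName()); // 1 Tom
    }
}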
package demo.ser.hadoop;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// Employee record: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
public class Employee implements Writable {

    private int empno;       // employee number
    private String ename;    // name
    private String job;      // job title
    private int mgr;         // the manager's employee number
    private String hiredate; // hire date
    private int sal;         // monthly salary
    private int comm;        // commission
    private int deptno;      // department number

    @Override
    public String toString() {
        return "Employee [empno=" + empno + ", ename=" + ename + ", deptno=" + deptno + "]";
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        // Deserialization: input -- must read the fields in the same order write() wrote them
        this.empno = input.readInt();
        this.ename = input.readUTF();
        this.job = input.readUTF();
        this.mgr = input.readInt();
        this.hiredate = input.readUTF();
        this.sal = input.readInt();
        this.comm = input.readInt();
        this.deptno = input.readInt();
    }

    @Override
    public void write(DataOutput output) throws IOException {
        // Serialization: output
        output.writeInt(this.empno);
        output.writeUTF(this.ename);
        output.writeUTF(this.job);
        output.writeInt(this.mgr);
        output.writeUTF(this.hiredate);
        output.writeInt(this.sal);
        output.writeInt(this.comm);
        output.writeInt(this.deptno);
    }

    public int getEmpno() { return empno; }
    public void setEmpno(int empno) { this.empno = empno; }
    public String getEname() { return ename; }
    public void setEname(String ename) { this.ename = ename; }
    public String getJob() { return job; }
    public void setJob(String job) { this.job = job; }
    public int getMgr() { return mgr; }
    public void setMgr(int mgr) { this.mgr = mgr; }
    public String getHiredate() { return hiredate; }
    public void setHiredate(String hiredate) { this.hiredate = hiredate; }
    public int getSal() { return sal; }
    public void setSal(int sal) { this.sal = sal; }
    public int getComm() { return comm; }
    public void setComm(int comm) { this.comm = comm; }
    public int getDeptno() { return deptno; }
    public void setDeptno(int deptno) { this.deptno = deptno; }
}

package demo.ser.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// k1: offset  v1: line  k2: employee number  v2: Employee object
public class EmployeeMapper extends Mapper<LongWritable, Text, IntWritable, Employee> {

    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        // Data: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
        String data = value1.toString();

        // Split the line
        String[] words = data.split(",");

        // Create the Employee object and set its properties
        Employee e = new Employee();
        // Employee number
        e.setEmpno(Integer.parseInt(words[0]));
        // Name
        e.setEname(words[1]);
        // Job title
        e.setJob(words[2]);
        // Manager number -- note: some employees have no manager
        try {
            e.setMgr(Integer.parseInt(words[3]));
        } catch (Exception ex) {
            // No manager number
            e.setMgr(-1);
        }
        // Hire date
        e.setHiredate(words[4]);
        // Salary
        e.setSal(Integer.parseInt(words[5]));
        // Commission -- note: some employees have no commission
        try {
            e.setComm(Integer.parseInt(words[6]));
        } catch (Exception ex) {
            // No commission
            e.setComm(0);
        }
        // Department number
        e.setDeptno(Integer.parseInt(words[7]));

        // Emit: employee number, Employee object
        context.write(new IntWritable(e.getEmpno()), e);
    }
}

package demo.ser.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EmployeeMain {

    public static void main(String[] args) throws Exception {
        // Create a job: job = map + reduce
        Job job = Job.getInstance(new Configuration());

        // The entry point of the job
        job.setJarByClass(EmployeeMain.class);

        // The job's Mapper and its output types: k2, v2
        job.setMapperClass(EmployeeMapper.class);
        job.setMapOutputKeyClass(IntWritable.class); // k2
        job.setMapOutputValueClass(Employee.class);  // v2: the Employee object

        // No Reducer is configured: the default (identity) reducer passes k2/v2
        // straight through, and the output is written via Employee.toString()

        // Input path (map) and output path (reduce)
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Run the job
        job.waitForCompletion(true);
    }
}
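The notes stress that the serialization order and the deserialization order must be the same. A hypothetical illustration of our own showing what goes wrong otherwise:

// write():      output.writeInt(this.empno);    // writes 4 raw bytes
//               output.writeUTF(this.ename);    // writes a length-prefixed string
//
// readFields(): this.ename = input.readUTF();   // WRONG ORDER: interprets the
//                                               // empno bytes as a string length
//               this.empno = input.readInt();
//
// Result: garbled fields, or an EOFException/UTFDataFormatException at runtime.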
package demo.ser.saltotal;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// Same Employee class as in demo.ser.hadoop, reused here in its own package
// Employee record: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
public class Employee implements Writable {

    private int empno;       // employee number
    private String ename;    // name
    private String job;      // job title
    private int mgr;         // the manager's employee number
    private String hiredate; // hire date
    private int sal;         // monthly salary
    private int comm;        // commission
    private int deptno;      // department number

    @Override
    public String toString() {
        return "Employee [empno=" + empno + ", ename=" + ename + ", deptno=" + deptno + "]";
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        // Deserialization: input -- same field order as write()
        this.empno = input.readInt();
        this.ename = input.readUTF();
        this.job = input.readUTF();
        this.mgr = input.readInt();
        this.hiredate = input.readUTF();
        this.sal = input.readInt();
        this.comm = input.readInt();
        this.deptno = input.readInt();
    }

    @Override
    public void write(DataOutput output) throws IOException {
        // Serialization: output
        output.writeInt(this.empno);
        output.writeUTF(this.ename);
        output.writeUTF(this.job);
        output.writeInt(this.mgr);
        output.writeUTF(this.hiredate);
        output.writeInt(this.sal);
        output.writeInt(this.comm);
        output.writeInt(this.deptno);
    }

    public int getEmpno() { return empno; }
    public void setEmpno(int empno) { this.empno = empno; }
    public String getEname() { return ename; }
    public void setEname(String ename) { this.ename = ename; }
    public String getJob() { return job; }
    public void setJob(String job) { this.job = job; }
    public int getMgr() { return mgr; }
    public void setMgr(int mgr) { this.mgr = mgr; }
    public String getHiredate() { return hiredate; }
    public void setHiredate(String hiredate) { this.hiredate = hiredate; }
    public int getSal() { return sal; }
    public void setSal(int sal) { this.sal = sal; }
    public int getComm() { return comm; }
    public void setComm(int comm) { this.comm = comm; }
    public int getDeptno() { return deptno; }
    public void setDeptno(int deptno) { this.deptno = deptno; }
}

package demo.ser.saltotal;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// k2: department number  v2: Employee object
public class SalaryTotalMapper extends Mapper<LongWritable, Text, IntWritable, Employee> {

    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        // Data: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
        String data = value1.toString();

        // Split the line
        String[] words = data.split(",");

        // Create the Employee object and set its properties
        Employee e = new Employee();
        // Employee number
        e.setEmpno(Integer.parseInt(words[0]));
        // Name
        e.setEname(words[1]);
        // Job title
        e.setJob(words[2]);
        // Manager number -- note: some employees have no manager
        try {
            e.setMgr(Integer.parseInt(words[3]));
        } catch (Exception ex) {
            // No manager number
            e.setMgr(-1);
        }
        // Hire date
        e.setHiredate(words[4]);
        // Salary
        e.setSal(Integer.parseInt(words[5]));
        // Commission -- note: some employees have no commission
        try {
            e.setComm(Integer.parseInt(words[6]));
        } catch (Exception ex) {
            // No commission
            e.setComm(0);
        }
        // Department number
        e.setDeptno(Integer.parseInt(words[7]));

        // Emit: department number, Employee object
        context.write(new IntWritable(e.getDeptno()), e);
    }
}

package demo.ser.saltotal;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

// k3: department number  v3: Employee objects  k4: department number  v4: total salary
public class SalaryTotalReducer extends Reducer<IntWritable, Employee, IntWritable, IntWritable> {

    @Override
    protected void reduce(IntWritable k3, Iterable<Employee> v3, Context context)
            throws IOException, InterruptedException {
        // Sum the salaries in v3
        int total = 0;
        for (Employee e : v3) {
            total = total + e.getSal();
        }

        // Emit: department number, total salary
        context.write(k3, new IntWritable(total));
    }
}

package demo.ser.saltotal;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SalaryTotalMain {

    public static void main(String[] args) throws Exception {
        // Create a job: job = map + reduce
        Job job = Job.getInstance(new Configuration());

        // The entry point of the job
        job.setJarByClass(SalaryTotalMain.class);

        // The job's Mapper and its output types: k2, v2
        job.setMapperClass(SalaryTotalMapper.class);
        job.setMapOutputKeyClass(IntWritable.class); // k2
        job.setMapOutputValueClass(Employee.class);  // v2

        // The job's Reducer and its output types: k4, v4
        job.setReducerClass(SalaryTotalReducer.class);
        job.setOutputKeyClass(IntWritable.class);    // k4
        job.setOutputValueClass(IntWritable.class);  // v4

        // Input path (map) and output path (reduce)
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Run the job
        job.waitForCompletion(true);
    }
}
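One design note of our own: unlike the earlier IntWritable-only version, this job cannot reuse its reducer as a combiner, because a combiner's output types must match the map output types:

// Hypothetical -- do NOT add this to SalaryTotalMain:
// job.setCombinerClass(SalaryTotalReducer.class);
// The combiner would emit (IntWritable, IntWritable), but the reduce phase
// expects the map output types (IntWritable, Employee) -> type mismatch at runtime.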
package demo.sort.hadoop.number;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// k1: offset  v1: line  k2: the number  v2: what should v2 be? --> a null value
public class NumberMapper extends Mapper<LongWritable, Text, IntWritable, NullWritable> {

    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        // A number, e.g. 10
        String data = value1.toString().trim();
        int number = Integer.parseInt(data);

        // Emit: the number must be key2, because sorting happens on key2
        context.write(new IntWritable(number), NullWritable.get());
    }
}

package demo.sort.hadoop.number;

import org.apache.hadoop.io.IntWritable;

// Sort the numbers with our own rule: descending order
public class MyNumberComparator extends IntWritable.Comparator {

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        // Negate the default (ascending) comparison to get descending order
        return -super.compare(b1, s1, l1, b2, s2, l2);
    }
}

package demo.sort.hadoop.number;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NumberMain {

    public static void main(String[] args) throws Exception {
        // Create a job: job = map + reduce
        Job job = Job.getInstance(new Configuration());

        // The entry point of the job
        job.setJarByClass(NumberMain.class);

        // The job's Mapper and its output types: k2, v2
        job.setMapperClass(NumberMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);    // k2
        job.setMapOutputValueClass(NullWritable.class); // v2: a null value

        // Plug in our own comparison rule
        job.setSortComparatorClass(MyNumberComparator.class);

        // Input path (map) and output path (reduce)
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Run the job
        job.waitForCompletion(true);
    }
}
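The sorting notes above also mention sorting objects, e.g. employees by salary, but that code is not shown. A minimal sketch of our own (package name and trimmed-down class are hypothetical): the object becomes key2 and implements WritableComparable instead of Writable, and its compareTo() supplies the ordering used during the shuffle:

package demo.sort.hadoop.object;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Hypothetical, trimmed-down Employee used as key2 so the shuffle sorts by salary
public class Employee implements WritableComparable<Employee> {

    private int empno;
    private String ename;
    private int sal;

    @Override
    public int compareTo(Employee o) {
        // Ascending by salary; swap the arguments for descending order
        return Integer.compare(this.sal, o.sal);
    }

    @Override
    public void write(DataOutput output) throws IOException {
        output.writeInt(this.empno);
        output.writeUTF(this.ename);
        output.writeInt(this.sal);
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        // Same order as write()
        this.empno = input.readInt();
        this.ename = input.readUTF();
        this.sal = input.readInt();
    }

    // Getters and setters omitted for brevity
}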