MR的高级功能 1、序列化
2018-01-25 16:22
176 查看
(*)Java的序列化:如果一个类实现了Java的序列化接口(Serializable),这个类的对象可以作为InputStream和OutputStream对象
(*)MR的序列化:(1)所有的hadoop的数据类型都实现了Hadoop的序列化
(2)如果一个类实现了Hadoop的序列化接口(Writable),这个类对象可以作为Map和Reduce的输入和输出(key value)
(3)序列化的顺序一定要跟反序列化顺序一样
===================================================================
(*)Java的序列化:如果一个类实现了Java的序列化接口(Serializable),这个类的对象可以作为InputStream和OutputStream对象
---------------------------------------------------------------------------------------------------------------
1、定义student对象实现 Serializable 接口
package demo.serializable.java;
import java.io.Serializable;
/**
 * Plain JavaBean used to demonstrate standard Java serialization:
 * because it implements {@link Serializable}, instances can be written
 * to / read from object streams.
 */
public class Student implements Serializable{
    // Explicit serialVersionUID keeps the serialized form stable even if
    // the class is later modified (the original relied on the compiler-generated one).
    private static final long serialVersionUID = 1L;

    private int stuID;      // student ID
    private String stuName; // student name

    /** No-arg constructor (required by serialization/bean conventions). */
    public Student(){
    }
    public int getStuID() {
        return stuID;
    }
    public void setStuID(int stuID) {
        this.stuID = stuID;
    }
    public String getStuName() {
        return stuName;
    }
    public void setStuName(String stuName) {
        this.stuName = stuName;
    }
}
------------------------------------------------------------------------------------------------------
package demo.serializable.java;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
public class StudentMain {
    /**
     * Demonstrates Java serialization: writes one {@code Student} object to a file.
     * Uses try-with-resources so both streams are closed even if {@code writeObject}
     * throws — the original closed them manually and would leak on an exception.
     */
    public static void main(String[] args) throws Exception {
        // Create a student object.
        Student s = new Student();
        s.setStuID(1);
        s.setStuName("Tom");
        // Save the object to a file -----> serialization.
        try (OutputStream out = new FileOutputStream("d:\\temp\\student.ooo");
             ObjectOutputStream objOut = new ObjectOutputStream(out)) {
            // Write the object; closing objOut also flushes it.
            objOut.writeObject(s);
        }
        System.out.println("完成");
    }
}
====================================================================
MR的序列化:(1)所有的hadoop的数据类型都实现了Hadoop的序列化
(2)如果一个类实现了Hadoop的序列化接口(Writable),这个类对象可以作为Map和Reduce的输入和输出(key value)
(3)序列化的顺序一定要跟反序列化顺序一样
------------------------------------------------------------------------------------------------------------------
1、定义员工 Employee 类 实现Hadoop的Writable接口
package demo.serializable.mr;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
//员工类: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
public class Employee implements Writable{
private int empno;//员工号
private String ename;//姓名
private String job;//职位
private int mgr;//老板号
private String hiredate;//入职日期
private int sal;//月薪
private int comm;//奖金
private int deptno;// 部门号
@Override
public String toString() {
return "["+this.empno+"\t"+this.ename+"\t"+this.sal+"\t"+this.deptno+"]";
}
@Override
public void write(DataOutput output) throws IOException {
// 序列化:把对象输出
output.writeInt(this.empno);
output.writeUTF(this.ename);
output.writeUTF(this.job);
output.writeInt(this.mgr);
output.writeUTF(this.hiredate);
output.writeInt(this.sal);
output.writeInt(this.comm);
output.writeInt(this.deptno);
}
@Override
public void readFields(DataInput input) throws IOException {
// 反序列化:把对象读入
this.empno = input.readInt();
this.ename = input.readUTF();
this.job = input.readUTF();
this.mgr = input.readInt();
this.hiredate = input.readUTF();
this.sal = input.readInt();
this.comm = input.readInt();
this.deptno = input.readInt();
}
public int getEmpno() {
return empno;
}
public void setEmpno(int empno) {
this.empno = empno;
}
public String getEname() {
return ename;
}
public void setEname(String ename) {
this.ename = ename;
}
public String getJob() {
return job;
}
public void setJob(String job) {
this.job = job;
}
public int getMgr() {
return mgr;
}
public void setMgr(int mgr) {
this.mgr = mgr;
}
public String getHiredate() {
return hiredate;
}
public void setHiredate(String hiredate) {
this.
f08b
hiredate = hiredate;
}
public int getSal() {
return sal;
}
public void setSal(int sal) {
this.sal = sal;
}
public int getComm() {
return comm;
}
public void setComm(int comm) {
this.comm = comm;
}
public int getDeptno() {
return deptno;
}
public void setDeptno(int deptno) {
this.deptno = deptno;
}
}
---------------------------------------------------------------------------------------------------------
EmployeeMapper extends Mapper<LongWritable, Text, LongWritable, Employee>
阶段
package demo.serializable.mr;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// k2:员工号 v2: 员工对象
// k2: employee number, v2: Employee object
public class EmployeeMapper extends Mapper<LongWritable, Text, LongWritable, Employee> {
    /**
     * Parses one CSV line into an {@code Employee} and emits (empno, employee).
     * Input record layout: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
     */
    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws IOException, InterruptedException {
        String data = v1.toString();
        // Tokenize on commas.
        String[] words = data.split(",");
        Employee e = new Employee();
        // Employee number.
        e.setEmpno(Integer.parseInt(words[0]));
        // Name.
        e.setEname(words[1]);
        // Job title.
        e.setJob(words[2]);
        // Manager number may be blank (e.g. the president) — default to 0.
        // Catch only NumberFormatException, not Exception: a broad catch would
        // also hide genuinely malformed records.
        try {
            e.setMgr(Integer.parseInt(words[3]));
        } catch (NumberFormatException ex) {
            e.setMgr(0);
        }
        // Hire date (kept as raw text).
        e.setHiredate(words[4]);
        // Salary.
        e.setSal(Integer.parseInt(words[5]));
        // Commission may be blank — default to 0.
        try {
            e.setComm(Integer.parseInt(words[6]));
        } catch (NumberFormatException ex) {
            e.setComm(0);
        }
        // Department number.
        e.setDeptno(Integer.parseInt(words[7]));
        // Emit keyed by employee number.
        context.write(new LongWritable(e.getEmpno()), e);
    }
}
-----------------------------------------------------------------------------------------------------
Reducer阶段
package demo.serializable.mr;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;
public class EmployeeReducer extends Reducer<LongWritable, Employee, LongWritable, Employee> {
    /**
     * Identity reduce: writes every employee grouped under the key back out
     * unchanged, preserving the (empno, employee) pairing from the map phase.
     */
    @Override
    protected void reduce(LongWritable k3, Iterable<Employee> v3, Context context)
            throws IOException, InterruptedException {
        for (Employee employee : v3) {
            context.write(k3, employee);
        }
    }
}
---------------------------------------------------------------------------------------------------------------
main阶段
package demo.serializable.mr;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class EmployeeMain {
    /**
     * Job driver: configures and submits the employee-serialization MR job.
     * args[0] = HDFS input path, args[1] = HDFS output path (must not exist).
     */
    public static void main(String[] args) throws Exception {
        // Create a job: job = map + reduce.
        Job job = Job.getInstance(new Configuration());
        // Entry point class (used to locate the jar on the cluster).
        job.setJarByClass(EmployeeMain.class);
        // Map phase and its output types.
        job.setMapperClass(EmployeeMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Employee.class);
        // Reduce phase and its output types.
        job.setReducerClass(EmployeeReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Employee.class);
        // HDFS input/output paths.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Submit and exit non-zero on failure so scripts can detect it
        // (the original discarded waitForCompletion's boolean result).
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
(*)MR的序列化:(1)所有的hadoop的数据类型都实现了Hadoop的序列化
(2)如果一个类实现了Hadoop的序列化接口(Writable),这个类对象可以作为Map和Reduce的输入和输出(key value)
(3)序列化的顺序一定要跟反序列化顺序一样
===================================================================
(*)Java的序列化:如果一个类实现了Java的序列化接口(Serializable),这个类的对象可以作为InputStream和OutputStream对象
---------------------------------------------------------------------------------------------------------------
1、定义student对象实现 Serializable 接口
package demo.serializable.java;
import java.io.Serializable;
/**
 * Plain JavaBean used to demonstrate standard Java serialization:
 * because it implements {@link Serializable}, instances can be written
 * to / read from object streams.
 */
public class Student implements Serializable{
    // Explicit serialVersionUID keeps the serialized form stable even if
    // the class is later modified (the original relied on the compiler-generated one).
    private static final long serialVersionUID = 1L;

    private int stuID;      // student ID
    private String stuName; // student name

    /** No-arg constructor (required by serialization/bean conventions). */
    public Student(){
    }
    public int getStuID() {
        return stuID;
    }
    public void setStuID(int stuID) {
        this.stuID = stuID;
    }
    public String getStuName() {
        return stuName;
    }
    public void setStuName(String stuName) {
        this.stuName = stuName;
    }
}
------------------------------------------------------------------------------------------------------
package demo.serializable.java;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
public class StudentMain {
    /**
     * Demonstrates Java serialization: writes one {@code Student} object to a file.
     * Uses try-with-resources so both streams are closed even if {@code writeObject}
     * throws — the original closed them manually and would leak on an exception.
     */
    public static void main(String[] args) throws Exception {
        // Create a student object.
        Student s = new Student();
        s.setStuID(1);
        s.setStuName("Tom");
        // Save the object to a file -----> serialization.
        try (OutputStream out = new FileOutputStream("d:\\temp\\student.ooo");
             ObjectOutputStream objOut = new ObjectOutputStream(out)) {
            // Write the object; closing objOut also flushes it.
            objOut.writeObject(s);
        }
        System.out.println("完成");
    }
}
====================================================================
MR的序列化:(1)所有的hadoop的数据类型都实现了Hadoop的序列化
(2)如果一个类实现了Hadoop的序列化接口(Writable),这个类对象可以作为Map和Reduce的输入和输出(key value)
(3)序列化的顺序一定要跟反序列化顺序一样
------------------------------------------------------------------------------------------------------------------
1、定义员工 Employee 类 实现Hadoop的Writable接口
package demo.serializable.mr;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
//员工类: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
public class Employee implements Writable{
private int empno;//员工号
private String ename;//姓名
private String job;//职位
private int mgr;//老板号
private String hiredate;//入职日期
private int sal;//月薪
private int comm;//奖金
private int deptno;// 部门号
@Override
public String toString() {
return "["+this.empno+"\t"+this.ename+"\t"+this.sal+"\t"+this.deptno+"]";
}
@Override
public void write(DataOutput output) throws IOException {
// 序列化:把对象输出
output.writeInt(this.empno);
output.writeUTF(this.ename);
output.writeUTF(this.job);
output.writeInt(this.mgr);
output.writeUTF(this.hiredate);
output.writeInt(this.sal);
output.writeInt(this.comm);
output.writeInt(this.deptno);
}
@Override
public void readFields(DataInput input) throws IOException {
// 反序列化:把对象读入
this.empno = input.readInt();
this.ename = input.readUTF();
this.job = input.readUTF();
this.mgr = input.readInt();
this.hiredate = input.readUTF();
this.sal = input.readInt();
this.comm = input.readInt();
this.deptno = input.readInt();
}
public int getEmpno() {
return empno;
}
public void setEmpno(int empno) {
this.empno = empno;
}
public String getEname() {
return ename;
}
public void setEname(String ename) {
this.ename = ename;
}
public String getJob() {
return job;
}
public void setJob(String job) {
this.job = job;
}
public int getMgr() {
return mgr;
}
public void setMgr(int mgr) {
this.mgr = mgr;
}
public String getHiredate() {
return hiredate;
}
public void setHiredate(String hiredate) {
this.
f08b
hiredate = hiredate;
}
public int getSal() {
return sal;
}
public void setSal(int sal) {
this.sal = sal;
}
public int getComm() {
return comm;
}
public void setComm(int comm) {
this.comm = comm;
}
public int getDeptno() {
return deptno;
}
public void setDeptno(int deptno) {
this.deptno = deptno;
}
}
---------------------------------------------------------------------------------------------------------
EmployeeMapper extends Mapper<LongWritable, Text, LongWritable, Employee>
阶段
package demo.serializable.mr;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// k2:员工号 v2: 员工对象
// k2: employee number, v2: Employee object
public class EmployeeMapper extends Mapper<LongWritable, Text, LongWritable, Employee> {
    /**
     * Parses one CSV line into an {@code Employee} and emits (empno, employee).
     * Input record layout: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
     */
    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws IOException, InterruptedException {
        String data = v1.toString();
        // Tokenize on commas.
        String[] words = data.split(",");
        Employee e = new Employee();
        // Employee number.
        e.setEmpno(Integer.parseInt(words[0]));
        // Name.
        e.setEname(words[1]);
        // Job title.
        e.setJob(words[2]);
        // Manager number may be blank (e.g. the president) — default to 0.
        // Catch only NumberFormatException, not Exception: a broad catch would
        // also hide genuinely malformed records.
        try {
            e.setMgr(Integer.parseInt(words[3]));
        } catch (NumberFormatException ex) {
            e.setMgr(0);
        }
        // Hire date (kept as raw text).
        e.setHiredate(words[4]);
        // Salary.
        e.setSal(Integer.parseInt(words[5]));
        // Commission may be blank — default to 0.
        try {
            e.setComm(Integer.parseInt(words[6]));
        } catch (NumberFormatException ex) {
            e.setComm(0);
        }
        // Department number.
        e.setDeptno(Integer.parseInt(words[7]));
        // Emit keyed by employee number.
        context.write(new LongWritable(e.getEmpno()), e);
    }
}
-----------------------------------------------------------------------------------------------------
Reducer阶段
package demo.serializable.mr;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;
public class EmployeeReducer extends Reducer<LongWritable, Employee, LongWritable, Employee> {
    /**
     * Identity reduce: writes every employee grouped under the key back out
     * unchanged, preserving the (empno, employee) pairing from the map phase.
     */
    @Override
    protected void reduce(LongWritable k3, Iterable<Employee> v3, Context context)
            throws IOException, InterruptedException {
        for (Employee employee : v3) {
            context.write(k3, employee);
        }
    }
}
---------------------------------------------------------------------------------------------------------------
main阶段
package demo.serializable.mr;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class EmployeeMain {
    /**
     * Job driver: configures and submits the employee-serialization MR job.
     * args[0] = HDFS input path, args[1] = HDFS output path (must not exist).
     */
    public static void main(String[] args) throws Exception {
        // Create a job: job = map + reduce.
        Job job = Job.getInstance(new Configuration());
        // Entry point class (used to locate the jar on the cluster).
        job.setJarByClass(EmployeeMain.class);
        // Map phase and its output types.
        job.setMapperClass(EmployeeMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Employee.class);
        // Reduce phase and its output types.
        job.setReducerClass(EmployeeReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Employee.class);
        // HDFS input/output paths.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Submit and exit non-zero on failure so scripts can detect it
        // (the original discarded waitForCompletion's boolean result).
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
相关文章推荐
- 类的高级运用、异常、集合、文件读写、序列化、重写equals和hashCode方法实例
- IOS 类似网易的频道选择功能、长按移动item、UICollectionView的高级使用
- 百度中的高级搜索功能
- Java 序列化的高级认识
- JAVA语言序列化和反序列化(3)-序列化高级认识
- [导入]微软关闭Live Search高级搜索功能
- vim高级功能
- Keil uVesion4 高级查找功能的使用方法
- Microsoft Dynamics CRM 2015 新增功能 介绍 高级查找功能
- 如何:使用 ASP.NET Login 控件的高级功能
- android之xml序列化 模拟短信备份功能
- Filter高级开发(二)——实现敏感字符过滤功能
- Redis高级功能 - 慢查询日志
- (七)vim高级功能
- 测试了下boost的序列化反序列化功能
- 高级网络功能(Docker支持的网络定制配置)
- python爬虫高级功能
- 高级功能:很有用的javascript自定义事件
- mybatis高级应用系列一:分页功能
- Python 网络爬虫 010 (高级功能) 解析 robots.txt 文件