通过hadoop二次排序分析hadoop工作过程
2017-07-22 15:57
399 查看
1.什么是hadoop,可以做什么
hadoop是Apache基金会开发的分布式系统基础架构,可以开发和运行大规模数据的软件平台。Hadoop有两个重要模块,HDFS与MapReduce。
HDFS是Hadoop的分布式存储平台,对于hadoop集群,拥有namenode和datanode节点。namenode节点职责是对存储系统进行控制,想一想namenode可以控制整个集群,是不是任务重大呢?所以我们需要避免namenode出现单点故障,因此我们可以再用一台namenode机器然后使用zookeeper配置主备,当主namenode出故障时候,备用namenode可以及时顶起任务。我们阅读其它关于hadoop的内容会发现集群还会有一个SecondNameNode节点,那么SecondNameNode节点的作用是什么呢?SecondNameNode职责在于对namenode产生的日志文件进行合并。
MapReduce是集群的分布式计算平台。这个平台在老版本中的角色是jobtracker和tasktracker;在新版本中的角色有ResourceManager和NodeManager。我们以新版本的角色为例分析MapReduce过程。
a. 客户端向ResourceManager申请作业ID,将作业需要的jar文件、配置信息和分片信息上传到hdfs上面。数据分片信息决定了ResourceManager需要为该作业启动多少个map任务,而reduce任务数目则在集群配置信息中指定。
b. 完成后提交任务,ResourceManager开始执行任务
c. ResourceManager首先初始化任务,创建作业对象,将任务放置调度器中;然后向hdfs请求处理数据的详细描述信息
d. NodeManager向ResourceManager领取自己的具体任务描述,在hdfs取回运行时数据,在本地启动JVM虚拟机运行子进程。
e. 在任务运行的同时,一个NodeManager启动MRAppMaster进程,这个进程获取所有任务信息,并汇集任务完成情况向ResourceManager汇报。
f. reduce完成后结果输入到hdfs
2.什么是hadoop的二次排序
所谓二次排序,就是对第一个字段进行排序,对第二个字段进行排序。根据二次排序我们也可以实现三次排序。对于hadoop来说,默认情况下会进行一次排序,也就是对第一个字段进行排序。
3.hadoop怎样做二次排序
上面有讲到MapReduce的流程,我们分析下二次排序的流程。MapReduce会先对原始文件进行分片,每个分片对应着一个map过程。中间经过环形缓冲区的排序,分区,自定义二次排序,合并,shuffle后传到reduce阶段;在reduce阶段,将不同源头的数据合并后会按照key值进行分组,每处理完一个分组后调用reduce函数,输出结果。
解决思路:
我们指导MapReduce的排序总会对key进行排序,在数据不是key的情况下,我们需要将其变为key值,因此我们需要将key,value组合成一个key值,即(k,v) -> v
4.二次排序代码
客户端代码
mapper代码
reduce代码
kv组合代码(因为在Job类中设置了比较器,所以这个类中的compare方法未实现)
比较器
分区代码
分组代码
输入
结果展示
代码注意点:需要将hadoop下面的jar包导入,否则会出现noClassDefFoundError
参考内容:
1. http://blog.csdn.net/lzm1340458776/article/details/42875751
2. hadoop权威指南第二版
hadoop是Apache基金会开发的分布式系统基础架构,可以开发和运行大规模数据的软件平台。Hadoop有两个重要模块,HDFS与MapReduce。
HDFS是Hadoop的分布式存储平台,对于hadoop集群,拥有namenode和datanode节点。namenode节点职责是对存储系统进行控制,想一想namenode可以控制整个集群,是不是任务重大呢?所以我们需要避免namenode出现单点故障,因此我们可以再用一台namenode机器然后使用zookeeper配置主备,当主namenode出故障时候,备用namenode可以及时顶起任务。我们阅读其它关于hadoop的内容会发现集群还会有一个SecondNameNode节点,那么SecondNameNode节点的作用是什么呢?SecondNameNode职责在于对namenode产生的日志文件进行合并。
MapReduce是集群的分布式计算平台。这个平台在老版本中的角色是jobtracker和tasktracker;在新版本中的角色有ResourceManager和NodeManager。我们以新版本的角色为例分析MapReduce过程。
a. 客户端向ResourceManager申请作业ID,将作业需要的jar文件、配置信息和分片信息上传到hdfs上面。数据分片信息决定了ResourceManager需要为该作业启动多少个map任务,而reduce任务数目则在集群配置信息中指定。
b. 完成后提交任务,ResourceManager开始执行任务
c. ResourceManager首先初始化任务,创建作业对象,将任务放置调度器中;然后向hdfs请求处理数据的详细描述信息
d. NodeManager向ResourceManager领取自己的具体任务描述,在hdfs取回运行时数据,在本地启动JVM虚拟机运行子进程。
e. 在任务运行的同时,一个NodeManager启动MRAppMaster进程,这个进程获取所有任务信息,并汇集任务完成情况向ResourceManager汇报。
f. reduce完成后结果输入到hdfs
2.什么是hadoop的二次排序
所谓二次排序,就是对第一个字段进行排序,对第二个字段进行排序。根据二次排序我们也可以实现三次排序。对于hadoop来说,默认情况下会进行一次排序,也就是对第一个字段进行排序。
3.hadoop怎样做二次排序
上面有讲到MapReduce的流程,我们分析下二次排序的流程。MapReduce会先对原始文件进行分片,每个分片对应着一个map过程。中间经过环形缓冲区的排序,分区,自定义二次排序,合并,shuffle后传到reduce阶段;在reduce阶段,将不同源头的数据合并后会按照key值进行分组,每处理完一个分组后调用reduce函数,输出结果。
解决思路:
我们指导MapReduce的排序总会对key进行排序,在数据不是key的情况下,我们需要将其变为key值,因此我们需要将key,value组合成一个key值,即(k,v) -> v
4.二次排序代码
客户端代码
package job; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import mapper.SortMapper; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import reducer.SortReducer; import util.Group; import util.KVPair; import util.MyCompare; import util.MyPartition; import com.sun.xml.internal.ws.policy.privateutil.PolicyUtils.Text; public class MyJob { private static final String INPUT = "hdfs://master:9000/test.txt"; private static final String OUTPUT = "hdfs://master:9000/out.txt"; public static void main(String args[]) { Configuration conf = new Configuration(); try { FileSystem fs = FileSystem.get(new URI(OUTPUT), conf); if(fs.exists(new Path(OUTPUT))) { fs.delete(new Path(OUTPUT), true); } Job job = new Job(conf, MyJob.class.getName()); FileInputFormat.setInputPaths(job, INPUT); // job.setInputFormatClass(KeyValueTextInputFormat.class); job.setMapperClass(SortMapper.class); job.setMapOutputKeyClass(KVPair.class); job.setMapOutputValueClass(IntWritable.class); job.setPartitionerClass(MyPartition.class); job.setNumReduceTasks(1); job.setGroupingComparatorClass(Group.class); job.setSortComparatorClass(MyCompare.class); job.setReducerClass(SortReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileOutputFormat.setOutputPath(job, new Path(OUTPUT)); System.exit(job.waitForCompletion(true)?0:1); } catch (IOException e) { e.printStackTrace(); } catch (URISyntaxException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
mapper代码
package mapper; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import util.KVPair; public class SortMapper extends Mapper<Object, Text, KVPair, IntWritable> { @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { if(null == value) return; System.out.println("--------------enter map--------------"); String[] it = value.toString().split(" "); KVPair kv = new KVPair(); IntWritable v = null; Text t = new Text(); if(it.length == 2){ t.set(it[0]); kv.setKey(t); v = new IntWritable(Integer.valueOf(it[1])); kv.setValue(v); context.write(kv, v); } System.out.println("--------------end map--------------"); } }
reduce代码
package reducer; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import util.KVPair; public class SortReducer extends Reducer<KVPair, IntWritable, Text, Text>{ @Override protected void reduce(KVPair key, Iterable<IntWritable> value, Context context) throws IOException, InterruptedException { System.out.println("--------------enter reduce----------------"); StringBuilder sb = new StringBuilder(); for(IntWritable v : value) { sb.append(v.get() + ","); } if(sb.length() > 0) { sb.deleteCharAt(sb.length()-1); } Text t = new Text(); t.set(sb.toString()); context.write(key.getKey(), t); System.out.println("--------------end reduce----------------"); } }
kv组合代码(因为在Job类中设置了比较器,所以这个类中的compare方法未实现)
package util; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; public class KVPair implements WritableComparable<KVPair>{ Text key; IntWritable value; public KVPair() { this.key = new Text(); this.value = new IntWritable(); } public Text getKey() { return key; } public void setKey(Text key) { this.key = key; } public IntWritable getValue() { return value; } public void setValue(IntWritable value) { this.value = value; } @Override public void readFields(DataInput in) throws IOException { if(null == key) try { throw new Exception("key is null"); } catch (Exception e) { e.printStackTrace(); } this.key.readFields(in); this.value.readFields(in); } @Override public void write(DataOutput out) throws IOException { this.key.write(out); this.value.write(out); } @Override public int compareTo(KVPair o) { // TODO Auto-generated method stub return 0; } @Override public String toString() { return "KVPair [key=" + key + ", value=" + value + "]"; } }
比较器
package util; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class MyCompare extends WritableComparator { protected MyCompare(){ super(KVPair.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { System.out.println("--------enter sort---------"); KVPair k1 = (KVPair) a; KVPair k2 = (KVPair) b; int minus = k1.getKey().compareTo(k2.getKey()); System.out.println("--------end sort---------"); if(minus != 0) { return minus; } else { return k1.getValue().compareTo(k2.getValue()); } } }
分区代码
package util; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Partitioner; public class MyPartition extends Partitioner<KVPair, IntWritable> { @Override public int getPartition(KVPair key, IntWritable vale, int num) { System.out.print("-----enter partition-----"); System.out.print("-----end partition-----"); return (key.getKey().hashCode()) % num; } }
分组代码
package util; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class Group extends WritableComparator { protected Group() { super(KVPair.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { System.out.println("--------enter group---------"); KVPair k1 = (KVPair) a; KVPair k2 = (KVPair) b; System.out.println("--------end group---------"); return k1.getKey().compareTo(k2.getKey()); } }
输入
a 1 a 2 a 3 a 4 b 5 b 3 b 8 b 0 c 10 c 4 c 6 c 8 c 2 c 1 c 5 c 2 d 10 d 4 d 5 d 8 d 8 d 10 e 4 e 4 e 10 e 3 e 2
结果展示
a 1,2,3,4 b 0,3,5,8 c 1,2,2,4,5,6,8,10 d 4,5,8,8,10,10 e 2,3,4,4,10
代码注意点:需要将hadoop下面的jar包导入,否则会出现noClassDefFoundError
参考内容:
1. http://blog.csdn.net/lzm1340458776/article/details/42875751
2. hadoop权威指南第二版
相关文章推荐
- Linux内核分析第一周学习博客 --- 通过反汇编方式学习计算机工作过程
- Hadoop Mapreduce分区、分组、二次排序过程详解
- hadoop 0.21的bakupnode的工作过程分析
- Hadoop Mapreduce分区、分组、二次排序过程详解
- Hadoop 0.21的bakupnode的工作过程分析
- Hadoop Mapreduce分区、分组、连接以及辅助排序(也叫二次排序)过程详解
- Hadoop链式MapReduce、多维排序、倒排索引、自连接算法、二次排序、Join性能优化、处理员工信息Join实战、URL流量分析、TopN及其排序、求平均值和最大最小值、数据清洗ETL、分析气
- 通过分析一个C程序的汇编指令执行过程,理解计算机的工作。
- 利用hadoop二次排序进行用户行为分析
- Hadoop Mapreduce分区、分组、二次排序过程详解[转]
- Hadoop Mapreduce分区、分组、二次排序过程详解
- 通过分析一个C程序的汇编指令执行过程,理解计算机的工作。
- hadoop通过log分析mapreduce的过程及定位错误、分析问题
- Hadoop Mapreduce分区、分组、二次排序过程详解[转]
- IntelliJ IDEA通过Tomcat启动项目过程分析
- Mapreduce运行过程分析(基于Hadoop2.4)——(三)
- Android camera fw学习(二)-open camera操作过程&准备工作分析。
- Hadoop MapReduce 二次排序
- 通过反汇编一个简单的C程序,分析汇编代码理解计算机是如何工作的(原创)
- Hadoop进程启动过程分析