您的位置:首页 > 运维架构

Hadoop中Partition解析 和 storm的grouping

2016-06-19 14:23 281 查看

自定义Grouping测试

Storm是支持自定义分组的,本篇文章就是探究Storm如何编写一个自定义分组器,以及对Storm分组器如何分组数据的理解。这是我写的一个自定义分组,总是把数据分到第一个Task:

public class MyFirstStreamGrouping implements CustomStreamGrouping {
private static Logger log = LoggerFactory.getLogger(MyFirstStreamGrouping.class);
private List<Integer> tasks;
    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream,List<Integer> targetTasks)
        List<Integer> targetTasks) {
        this.tasks = targetTasks;
        log.info(tasks.toString());
    }
    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        log.info(values.toString());
        return Arrays.asList(tasks.get(0));
    }
}

从上面的代码可以看出,该自定义分组会把数据归并到第一个Task
Arrays.asList(tasks.get(0));
,也就是数据到达后总是被派发到第一组。测试代码:
TopologyBuilder builder = new TopologyBuilder();builder.setSpout("words", new TestWordSpout(), 2);//自定义分组,builder.setBolt("exclaim1", new DefaultStringBolt(), 3).customGrouping("words", new MyFirstStreamGrouping());
[/code]和之前的测试用例一样,Spout总是发送
new String[] {“nathan”, “mike”, “jackson”, “golda”, “bertels”}
列表的字符串。我们运行验证一下:
11878 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson11943 [Thread-41-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [nathan]11944 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan11979 [Thread-29-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [mike]11980 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike12045 [Thread-41-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [jackson]12045 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson12080 [Thread-29-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [jackson]12081 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson12145 [Thread-41-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [mike]12146 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike
[/code]从这个运行日志我们可以看出,数据总是派发到一个Blot:Thread-25-exclaim1。因为我时本地测试,Thread-25-exclaim1是线程名。而派发的线程是数据多个线程的。因此该测试符合预期,即总是发送到一个Task,并且这个Task也是第一个。

理解自定义分组实现

自己实现一个自定义分组难吗?其实如果你理解了Hadoop的Partitioner,Storm的CustomStreamGrouping和它也是一样的道理。Hadoop MapReduce的Map完成后会把Map的中间结果写入磁盘,在写磁盘前,线程首先根据数据最终要传送到的Reducer把数据划分成相应的分区,然后不同的分区进入不同的Reduce。我们先来看看Hadoop是怎样把数据怎样分组的,这是Partitioner唯一一个方法:
public class Partitioner<K, V> {@Overridepublic int getPartition(K key, V value, int numReduceTasks) {return 0;}}
[/code]上面的代码中:Map输出的数据都会经过getPartition()方法,用来确定下一步的分组。numReduceTasks是一个Job的Reduce数量,而返回值就是确定该条数据进入哪个Reduce。返回值必须大于等于0,小于numReduceTasks,否则就会报错。返回0就意味着这条数据进入第一个Reduce。对于随机分组来说,这个方法可以这么实现:
public int getPartition(K key, V value, int numReduceTasks) {return hash(key) % numReduceTasks;}
[/code]其实Hadoop 默认的Hash分组策略也正是这么实现的。这样好处是,数据在整个集群基本上是负载平衡的。搞通了Hadoop的Partitioner,我们来看看Storm的CustomStreamGrouping。这是CustomStreamGrouping类的源码:
public interface CustomStreamGrouping extends Serializable {void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks);List<Integer> chooseTasks(int taskId, List<Object> values);}
[/code]一模一样的道理,targetTasks就是Storm运行时告诉你,当前有几个目标Task可以选择,每一个都给编上了数字编号。而 
chooseTasks(int taskId, List

2、hadoop 解析Partition

Map的结果,会通过partition分发到Reducer上,Reducer做完Reduce操作后,通过OutputFormat,进行输出,下面我们就来分析参与这个过程的类。Mapper的结果,可能送到Combiner做合并,Combiner在系统中并没有自己的基类,而是用Reducer作为Combiner的基类,他们对外的功能是一样的,只是使用的位置和使用时的上下文不太一样而已。Mapper最终处理的键值对<key, value>,是需要送到Reducer去合并的,合并的时候,有相同key的键/值对会送到同一个Reducer那。哪个key到哪个Reducer的分配过程,是由Partitioner规定的。它只有一个方法,getPartition(Text key, Text value, int numPartitions)  输入是Map的结果对<key, value>和Reducer的数目,输出则是分配的Reducer(整数编号)。就是指定Mappr输出的键值对到哪一个reducer上去。系统缺省的Partitioner是HashPartitioner,它以key的Hash值对Reducer的数目取模,得到对应的Reducer。这样保证如果有相同的key值,肯定被分配到同一个reducre上。如果有N个reducer,编号就为0,1,2,3……(N-1)。Reducer是所有用户定制Reducer类的基类,和Mapper类似,它也有setup,reduce,cleanup和run方法,其中setup和cleanup含义和Mapper相同,reduce是真正合并Mapper结果的地方,它的输入是key和这个key对应的所有value的一个迭代器,同时还包括Reducer的上下文。系统中定义了两个非常简单的Reducer,IntSumReducer和LongSumReducer,分别用于对整形/长整型的value求和。Reduce的结果,通过Reducer.Context的方法collect输出到文件中,和输入类似,Hadoop引入了OutputFormat。OutputFormat依赖两个辅助接口:RecordWriter和OutputCommitter,来处理输出。RecordWriter提供了write方法,用于输出<key, value>和close方法,用于关闭对应的输出。OutputCommitter提供了一系列方法,用户通过实现这些方法,可以定制OutputFormat生存期某些阶段需要的特殊操作。我们在TaskInputOutputContext中讨论过这些方法(明显,TaskInputOutputContext是OutputFormat和Reducer间的桥梁)。OutputFormat和RecordWriter分别对应着InputFormat和RecordReader,系统提供了空输出NullOutputFormat(什么结果都不输出,NullOutputFormat.RecordWriter只是示例,系统中没有定义),LazyOutputFormat(没在类图中出现,不分析),FilterOutputFormat(不分析)和基于文件FileOutputFormat的SequenceFileOutputFormat和TextOutputFormat输出。基于文件的输出FileOutputFormat利用了一些配置项配合工作,包括:mapred.output.compress:是否压缩;mapred.output.compression.codec:压缩方法;mapred.output.dir:输出路径;mapred.work.output.dir:输出工作路径。FileOutputFormat还依赖于FileOutputCommitter,通过FileOutputCommitter提供一些和Job,Task相关的临时文件管理功能。如FileOutputCommitter的setupJob,会在输出路径下创建一个名为_temporary的临时目录,cleanupJob则会删除这个目录。SequenceFileOutputFormat输出和TextOutputFormat输出分别对应输入的SequenceFileInputFormat和TextInputFormat。

2.代码实例

package org.apache.hadoop.examples;    import java.io.IOException;  import java.util.*;  import org.apache.hadoop.fs.Path;  import org.apache.hadoop.conf.*;  import org.apache.hadoop.io.*;  import org.apache.hadoop.mapred.*;  import org.apache.hadoop.util.*;    /**  * 输入文本,以tab间隔  * kaka    1       28  * hua     0       26  * chao    1  * tao     1       22  * mao     0       29      22  * */    //Partitioner函数的使用    public class MyPartitioner {      // Map函数      public static class MyMap extends MapReduceBase implements              Mapper<LongWritable, Text, Text, Text> {          public void map(LongWritable key, Text value,                  OutputCollector<Text, Text> output, Reporter reporter)                  throws IOException {              String[] arr_value = value.toString().split("\t");              //测试输出  //          for(int i=0;i<arr_value.length;i++)  //          {  //              System.out.print(arr_value[i]+"\t");  //          }  //          System.out.print(arr_value.length);  //          System.out.println();                     Text word1 = new Text();              Text word2 = new Text();              if (arr_value.length > 3) {                  word1.set("long");                  word2.set(value);              } else if (arr_value.length < 3) {                  word1.set("short");                  word2.set(value);              } else {                  word1.set("right");                  word2.set(value);              }              output.collect(word1, word2);          }      }            public static class MyReduce extends MapReduceBase implements              Reducer<Text, Text, Text, Text> {          public void reduce(Text key, Iterator<Text> values,                  OutputCollector<Text, Text> output, Reporter reporter)                  throws IOException {              int sum = 0;              System.out.println(key);              while (values.hasNext()) {                  output.collect(key, new Text(values.next().getBytes()));                  }          }      }        // 接口Partitioner继承JobConfigurable,所以这里有两个override方法      public static class MyPartitionerPar implements Partitioner<Text, Text> {          /**          * getPartition()方法的          * 输入参数:键/值对<key,value>与reducer数量numPartitions          * 输出参数:分配的Reducer编号,这里是result          * */          @Override          public int getPartition(Text key, Text value, int numPartitions) {              // TODO Auto-generated method stub              int result = 0;              System.out.println("numPartitions--" + numPartitions);              if (key.toString().equals("long")) {                  result = 0 % numPartitions;              } else if (key.toString().equals("short")) {                  result = 1 % numPartitions;              } else if (key.toString().equals("right")) {                  result = 2 % numPartitions;              }              System.out.println("result--" + result);              return result;          }                    @Override          public void configure(JobConf arg0)           {              // TODO Auto-generated method stub          }      }        //输入参数:/home/hadoop/input/PartitionerExample /home/hadoop/output/Partitioner      public static void main(String[] args) throws Exception {          JobConf conf = new JobConf(MyPartitioner.class);          conf.setJobName("MyPartitioner");                    //控制reducer数量,因为要分3个区,所以这里设定了3个reducer          conf.setNumReduceTasks(3);            conf.setMapOutputKeyClass(Text.class);          conf.setMapOutputValueClass(Text.class);            //设定分区类          conf.setPartitionerClass(MyPartitionerPar.class);            conf.setOutputKeyClass(Text.class);          conf.setOutputValueClass(Text.class);            //设定mapper和reducer类          conf.setMapperClass(MyMap.class);          conf.setReducerClass(MyReduce.class);            conf.setInputFormat(TextInputFormat.class);          conf.setOutputFormat(TextOutputFormat.class);            FileInputFormat.setInputPaths(conf, new Path(args[0]));          FileOutputFormat.setOutputPath(conf, new Path(args[1]));            JobClient.runJob(conf);      }  } 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  hadoop