您的位置:首页 > 运维架构 > Apache

Nutch 2.0 之 Apache Gora MR介绍

2014-11-19 18:02 344 查看
Nutch 2.0 之 Apache Gora MR介绍
-----------------


1. 介绍

  Apapche Gora内建了对于Apache Hadoop的支持,而Gora的dataStore可以用来做为InputFormat与OutputFormat的输入与输出,然而这些输出的对象都会被序列化,Gora扩展了Avro的DatumWriters来实现的。


2. 一个简单的例子

  下面是gora-tutorial中的一个简单的例子来说明Gora MR的用法,这个例子是使用了上一个例子的输出,作为其输入,用MR来分析日志的信息,用于统计单个URL一天之中的访问量,然后把分析后的结果写回HBase中的一个例子,当然,你也可以把输出写回SQL或者虽的数据库中,这个例子可以在t gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalytics.java找到。

  为了对日志数据进行分析,mapper方法以pageview为输入,输出的Key为<URL,timestamp>对,Value为1,表示一次,在reducer中对相同key的value进行聚合,得到一个总数,然后生成一个MetricDatum对象,把这个对象序列化到数据库中去。

  数据模型的定义,这里我们用到了MetricDatum这个对象,它的定义在gora-tutorial/src/main/avro/metricdatum.json,我们要用bin/gora compile来产生它的一个java文件,

json的定义文件如下:

[html] view
plaincopy

{  

  "type": "record",  

  "name": "MetricDatum",  

  "namespace": "org.apache.gora.tutorial.log.generated",  

  "fields" : [  

    {"name": "metricDimension", "type": "string"},  

    {"name": "timestamp", "type": "long"},  

    {"name": "metric", "type" : "long"}  

  ]  

}  

   我们可以看到,它的类型为record, 名称空间为org.apache.gora.tutorial.log.generated,还有三个字段,分别定义了字段名与类型。

  下面我们还要定义datastore的一个映射文件,内容如下:
  

[html] view
plaincopy

<class name="org.apache.gora.tutorial.log.generated.MetricDatum" keyClass="java.lang.String" table="Metrics">  

    <field name="metricDimension" family="common"  qualifier="metricDimension"/>  

    <field name="timestamp" family="common" qualifier="ts"/>  

    <field name="metric" family="common" qualifier="metric"/>  

  </class>  

  
  我们可以看到,在class中定义了类名,key的类型,还有表名Metrics。在field中定义了相应的字段名,family还有qualifier。

  好了,下面我们来命令这个MR程式,输入如下命令:

[html] view
plaincopy

$ bin/gora loganalytics  

  有一些输入,部分如下:

[html] view
plaincopy

12/07/21 13:35:16 INFO mapreduce.GoraRecordWriter: gora.buffer.write.limit = 10000  

12/07/21 13:35:17 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting  

12/07/21 13:35:17 INFO mapred.LocalJobRunner: reduce > reduce  

12/07/21 13:35:17 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.  

12/07/21 13:35:17 INFO mapred.JobClient:  map 100% reduce 100%  

12/07/21 13:35:17 INFO mapred.JobClient: Job complete: job_local_0001  

12/07/21 13:35:17 INFO mapred.JobClient: Counters: 12  

12/07/21 13:35:17 INFO mapred.JobClient:   FileSystemCounters  

12/07/21 13:35:17 INFO mapred.JobClient:     FILE_BYTES_READ=561645  

12/07/21 13:35:17 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=1126606  

12/07/21 13:35:17 INFO mapred.JobClient:   Map-Reduce Framework  

12/07/21 13:35:17 INFO mapred.JobClient:     Reduce input groups=3033  

12/07/21 13:35:17 INFO mapred.JobClient:     Combine output records=0  

12/07/21 13:35:17 INFO mapred.JobClient:     Map input records=10000  

12/07/21 13:35:17 INFO mapred.JobClient:     Reduce shuffle bytes=0  

12/07/21 13:35:17 INFO mapred.JobClient:     Reduce output records=3033  

12/07/21 13:35:17 INFO mapred.JobClient:     Spilled Records=20000  

12/07/21 13:35:17 INFO mapred.JobClient:     Map output bytes=512510  

12/07/21 13:35:17 INFO mapred.JobClient:     Combine input records=0  

12/07/21 13:35:17 INFO mapred.JobClient:     Map output records=10000  

12/07/21 13:35:17 INFO mapred.JobClient:     Reduce input records=10000  

12/07/21 13:35:17 INFO log.LogAnalytics: Log completed with success  

完成以后可以查看一下HBase的Metrics表,使用如下命令

[html] view
plaincopy

hbase(main):007:0> scan 'Metrics' ,{LIMIT=>1}    

ROW                              COLUMN+CELL                                                                                     

 /?a=1__-znawtuabsy&k=96804_1236 column=common:metric, timestamp=1342848916670, value=\x00\x00\x00\x00\x00\x00\x00\x09           

 902400000                                                                                                                       

 /?a=1__-znawtuabsy&k=96804_1236 column=common:metricDimension, timestamp=1342848916670, value=/?a=1__-znawtuabsy&k=96804        

 902400000                                                                                                                       

 /?a=1__-znawtuabsy&k=96804_1236 column=common:ts, timestamp=1342848916670, value=\x00\x00\x01\x1F\xFD \xD0\x00                  

 902400000                                                                                                                       

1 row(s) in 0.0200 seconds  

我们已经可以看到分析后的数据已经存储到Metrics表中了。


3. 程式代码分析

   在定义Hadoop的任务的时候,我们可以选择是否使用Gora来做为其输入与输出, Gora定义了自己的GoraInputFormat 与GoraOutputFormat,而且Gora还定义自己的GoraMapper与GoraReducer类,提供一些方法的初始化静态方法 。在下面的代码中,就使用了Gora(Mapper|reducer)的一些静态方法来进行初始始化。下面是Job初始化代码:

[java] view
plaincopy

 public Job createJob(DataStore<Long, Pageview> inStore  

  , DataStore<String, MetricDatum> outStore, int numReducer) throws IOException {  

Job job = new Job(getConf());  

  

  

job.setJobName("Log Analytics");  

job.setNumReduceTasks(numReducer);  

job.setJarByClass(getClass());  

  

  

/* Mappers are initialized with GoraMapper.initMapper() or  

 * GoraInputFormat.setInput()*/  

GoraMapper.initMapperJob(job, inStore, TextLong.class, LongWritable.class  

    , LogAnalyticsMapper.class, true);  

  

  

/* Reducers are initialized with GoraReducer#initReducer(). 

 * If the output is not to be persisted via Gora, any reducer  

 * can be used instead. */  

GoraReducer.initReducerJob(job, outStore, LogAnalyticsReducer.class);  

  

return job;  

  

      

  下面是它的run函数的代码:

[java] view
plaincopy

  @Override  

 public int run(String[] args) throws Exception {  

     

   DataStore<Long, Pageview> inStore;  

   DataStore<String, MetricDatum> outStore;  

   Configuration conf = new Configuration();      

  

  

   if(args.length > 0) {  

     String dataStoreClass = args[0];  

     inStore = DataStoreFactory.  

         getDataStore(dataStoreClass, Long.class, Pageview.class, conf);  

     if(args.length > 1) {  

       dataStoreClass = args[1];  

     }  

     outStore = DataStoreFactory.  

         getDataStore(dataStoreClass,   

         String.class, MetricDatum.class, conf);  

   } else {  

inStore = DataStoreFactory.getDataStore(Long.class, Pageview.class, conf);  

outStore = DataStoreFactory.getDataStore(String.class, MetricDatum.class, conf);  

   }  

     

   Job job = createJob(inStore, outStore, 3);  

   boolean success = job.waitForCompletion(true);  

     

   inStore.close();  

   outStore.close();  

     

   log.info("Log completed with " + (success ? "success" : "failure"));  

     

   return success ? 0 : 1;  

 }  

  我们可以看到,生成两个输入与输出的dataStore。

  下来看一下自定义的LogAnalyticsMapper代码,它扩展自GoraMapper,它的输入为<Long,Pageview>,部分代码如下:

[java] view
plaincopy

 private TextLong tuple;  

  

  

protected void map(Long key, Pageview pageview, Context context)   

  throws IOException ,InterruptedException {  

    

  Utf8 url = pageview.getUrl();  

  long day = getDay(pageview.getTimestamp());  

    

  tuple.getKey().set(url.toString());  

  tuple.getValue().set(day);  

    

  context.write(tuple, one);  

};  

而LogAnalyticsReducer也扩展自GoraReducer,部分代码如下:

[java] view
plaincopy

 protected void reduce(TextLong tuple  

    , Iterable<LongWritable> values, Context context)   

  throws IOException ,InterruptedException {  

    

  long sum = 0L; //sum up the values  

  for(LongWritable value: values) {  

    sum+= value.get();  

  }  

    

  String dimension = tuple.getKey().toString();  

  long timestamp = tuple.getValue().get();  

    

  metricDatum.setMetricDimension(new Utf8(dimension));  

  metricDatum.setTimestamp(timestamp);  

    

  String key = metricDatum.getMetricDimension().toString();  

  metricDatum.setMetric(sum);  

    

  context.write(key, metricDatum);  

};  

我们可以看到,它输出的Value类型是metricDatum,这个就是我们之前在json中定义的数据模型,它会被序列化到数据库中去。


4. 参考

http://gora.apache.org/docs/current/tutorial.html

转载   :http://blog.csdn.net/amuseme_lu/article/details/7770424
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: