Custom Counters
2015-04-16 14:10
The mapper below defines two custom counters in the group "xiaobaozi": "startText" is incremented for every input line that contains "hello", and "endText" for every line that contains "you". Apart from that, the job is an ordinary word count over an NLineInputFormat input (2 lines per split).

Code:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class CounterTest {

    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        final Text k2 = new Text();
        final LongWritable v2 = new LongWritable();

        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                throws InterruptedException, IOException {
            // Custom counters: group "xiaobaozi", counters "startText" and "endText"
            Counter counterForhello = context.getCounter("xiaobaozi", "startText");
            Counter counterForyou = context.getCounter("xiaobaozi", "endText");
            final String line = value.toString();
            if (line != null) {
                if (line.contains("hello")) {
                    counterForhello.increment(1);
                }
                if (line.contains("you")) {
                    counterForyou.increment(1);
                }
            }
            // Ordinary word count: emit <word, 1> for every token on the line
            final String[] splited = line.split("\\s");
            for (String word : splited) {
                k2.set(word);
                v2.set(1);
                context.write(k2, v2);
            }
        }
    }

    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        LongWritable v3 = new LongWritable();

        protected void reduce(Text k2, Iterable<LongWritable> v2s,
                Reducer<Text, LongWritable, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            long count = 0L;
            for (LongWritable v2 : v2s) {
                count += v2.get();
            }
            v3.set(count);
            context.write(k2, v3);
        }
    }

    public static void main(String[] args) throws Exception {
        final Configuration conf = new Configuration();
        final Job job = Job.getInstance(conf, CounterTest.class.getSimpleName());

        // 1.1 input path and format: 2 lines per split
        FileInputFormat.setInputPaths(job, "hdfs://192.168.1.100:9000/input/hehe");
        NLineInputFormat.setNumLinesPerSplit(job, Integer.parseInt("2"));
        // NLineInputFormat.setNumLinesPerSplit(job, Integer.parseInt(args[0]));
        job.setInputFormatClass(NLineInputFormat.class);

        // 1.2 mapper and its output types
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 1.3 partitioner and number of reducers
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);

        // 1.4
        // 1.5

        // 2.2 reducer and its output types
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 2.3 output path and format
        FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.1.100:9000/out1"));
        job.setOutputFormatClass(TextOutputFormat.class);

        // job.setJarByClass(CounterTest.class);
        job.waitForCompletion(true);
    }
}
Console output:
15/04/16 14:06:19 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/04/16 14:06:19 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
15/04/16 14:06:19 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
15/04/16 14:06:20 INFO input.FileInputFormat: Total input paths to process : 1
15/04/16 14:06:20 INFO mapred.JobClient: Running job: job_local535401905_0001
15/04/16 14:06:20 INFO mapred.LocalJobRunner: Waiting for map tasks
15/04/16 14:06:20 INFO mapred.LocalJobRunner: Starting task: attempt_local535401905_0001_m_000000_0
15/04/16 14:06:21 INFO mapred.Task: Using ResourceCalculatorPlugin : null
15/04/16 14:06:21 INFO mapred.MapTask: Processing split: hdfs://192.168.1.100:9000/input/hehe:18+19
15/04/16 14:06:21 INFO mapred.MapTask: io.sort.mb = 100
15/04/16 14:06:21 INFO mapred.MapTask: data buffer = 79691776/99614720
15/04/16 14:06:21 INFO mapred.MapTask: record buffer = 262144/327680
15/04/16 14:06:21 WARN snappy.LoadSnappy: Snappy native library not loaded
15/04/16 14:06:21 INFO mapred.MapTask: Starting flush of map output
15/04/16 14:06:21 INFO mapred.MapTask: Finished spill 0
15/04/16 14:06:21 INFO mapred.Task: Task:attempt_local535401905_0001_m_000000_0 is done. And is in the process of commiting
15/04/16 14:06:21 INFO mapred.LocalJobRunner:
15/04/16 14:06:21 INFO mapred.Task: Task 'attempt_local535401905_0001_m_000000_0' done.
15/04/16 14:06:21 INFO mapred.LocalJobRunner: Finishing task: attempt_local535401905_0001_m_000000_0
15/04/16 14:06:21 INFO mapred.LocalJobRunner: Starting task: attempt_local535401905_0001_m_000001_0
15/04/16 14:06:21 INFO mapred.Task: Using ResourceCalculatorPlugin : null
15/04/16 14:06:21 INFO mapred.MapTask: Processing split: hdfs://192.168.1.100:9000/input/hehe:0+18
15/04/16 14:06:21 INFO mapred.MapTask: io.sort.mb = 100
15/04/16 14:06:21 INFO mapred.MapTask: data buffer = 79691776/99614720
15/04/16 14:06:21 INFO mapred.MapTask: record buffer = 262144/327680
15/04/16 14:06:21 INFO mapred.MapTask: Starting flush of map output
15/04/16 14:06:21 INFO mapred.MapTask: Finished spill 0
15/04/16 14:06:21 INFO mapred.Task: Task:attempt_local535401905_0001_m_000001_0 is done. And is in the process of commiting
15/04/16 14:06:21 INFO mapred.LocalJobRunner:
15/04/16 14:06:21 INFO mapred.Task: Task 'attempt_local535401905_0001_m_000001_0' done.
15/04/16 14:06:21 INFO mapred.LocalJobRunner: Finishing task: attempt_local535401905_0001_m_000001_0
15/04/16 14:06:21 INFO mapred.LocalJobRunner: Map task executor complete.
15/04/16 14:06:21 INFO mapred.Task: Using ResourceCalculatorPlugin : null
15/04/16 14:06:21 INFO mapred.LocalJobRunner:
15/04/16 14:06:21 INFO mapred.Merger: Merging 2 sorted segments
15/04/16 14:06:21 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 122 bytes
15/04/16 14:06:21 INFO mapred.LocalJobRunner:
15/04/16 14:06:22 INFO mapred.JobClient: map 100% reduce 0%
15/04/16 14:06:23 INFO mapred.Task: Task:attempt_local535401905_0001_r_000000_0 is done. And is in the process of commiting
15/04/16 14:06:23 INFO mapred.LocalJobRunner:
15/04/16 14:06:23 INFO mapred.Task: Task attempt_local535401905_0001_r_000000_0 is allowed to commit now
15/04/16 14:06:23 INFO output.FileOutputCommitter: Saved output of task 'attempt_local535401905_0001_r_000000_0' to hdfs://192.168.1.100:9000/out1
15/04/16 14:06:23 INFO mapred.LocalJobRunner: reduce > reduce
15/04/16 14:06:23 INFO mapred.Task: Task 'attempt_local535401905_0001_r_000000_0' done.
15/04/16 14:06:24 INFO mapred.JobClient: map 100% reduce 100%
15/04/16 14:06:24 INFO mapred.JobClient: Job complete: job_local535401905_0001
15/04/16 14:06:24 INFO mapred.JobClient: Counters: 21
15/04/16 14:06:24 INFO mapred.JobClient:   File Output Format Counters
15/04/16 14:06:24 INFO mapred.JobClient:     Bytes Written=19
15/04/16 14:06:24 INFO mapred.JobClient:   xiaobaozi
15/04/16 14:06:24 INFO mapred.JobClient:     endText=2
15/04/16 14:06:24 INFO mapred.JobClient:     startText=4
15/04/16 14:06:24 INFO mapred.JobClient:   FileSystemCounters
15/04/16 14:06:24 INFO mapred.JobClient:     FILE_BYTES_READ=1319
15/04/16 14:06:24 INFO mapred.JobClient:     HDFS_BYTES_READ=250
15/04/16 14:06:24 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=209690
15/04/16 14:06:24 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=19
15/04/16 14:06:24 INFO mapred.JobClient:   File Input Format Counters
15/04/16 14:06:24 INFO mapred.JobClient:     Bytes Read=58
15/04/16 14:06:24 INFO mapred.JobClient:   Map-Reduce Framework
15/04/16 14:06:24 INFO mapred.JobClient:     Map output materialized bytes=130
15/04/16 14:06:24 INFO mapred.JobClient:     Map input records=4
15/04/16 14:06:24 INFO mapred.JobClient:     Reduce shuffle bytes=0
15/04/16 14:06:24 INFO mapred.JobClient:     Spilled Records=16
15/04/16 14:06:24 INFO mapred.JobClient:     Map output bytes=102
15/04/16 14:06:24 INFO mapred.JobClient:     Total committed heap usage (bytes)=682819584
15/04/16 14:06:24 INFO mapred.JobClient:     SPLIT_RAW_BYTES=202
15/04/16 14:06:24 INFO mapred.JobClient:     Combine input records=0
15/04/16 14:06:24 INFO mapred.JobClient:     Reduce input records=8
15/04/16 14:06:24 INFO mapred.JobClient:     Reduce input groups=3
15/04/16 14:06:24 INFO mapred.JobClient:     Combine output records=0
15/04/16 14:06:24 INFO mapred.JobClient:     Reduce output records=3
15/04/16 14:06:24 INFO mapred.JobClient:     Map output records=8
In the counter summary at the end of the run you can see the custom group "xiaobaozi" with its two counters:
15/04/16 14:06:24 INFO mapred.JobClient: Counters: 21
15/04/16 14:06:24 INFO mapred.JobClient:   File Output Format Counters
15/04/16 14:06:24 INFO mapred.JobClient:     Bytes Written=19
15/04/16 14:06:24 INFO mapred.JobClient:   xiaobaozi
15/04/16 14:06:24 INFO mapred.JobClient:     endText=2
15/04/16 14:06:24 INFO mapred.JobClient:     startText=4
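Besides appearing in this console summary, the counter values can also be read back programmatically in the driver once the job has finished. A minimal sketch, assuming the job object from the main method above and placed after job.waitForCompletion(true) (it needs an extra import of org.apache.hadoop.mapreduce.Counters):

// Sketch: read the aggregated custom counters back from the finished job
Counters counters = job.getCounters();
long startText = counters.findCounter("xiaobaozi", "startText").getValue();
long endText = counters.findCounter("xiaobaozi", "endText").getValue();
System.out.println("startText=" + startText + ", endText=" + endText);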
Key code:
Counter counterForhello = context.getCounter("xiaobaozi", "startText");
Counter counterForyou = context.getCounter("xiaobaozi", "endText");
final String line = value.toString();
if (line != null) {
    if (line.contains("hello")) {
        counterForhello.increment(1);
    }
    if (line.contains("you")) {
        counterForyou.increment(1);
    }
}
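The group/name strings work, but context.getCounter also has an enum-based overload: the enum's class name is used as the counter group and each constant becomes a counter, which avoids typos in the strings. A minimal sketch of the same logic with an enum (the name WordCounter is illustrative, not part of the original code):

// Hypothetical enum; its class name becomes the counter group name
enum WordCounter { START_TEXT, END_TEXT }

// Inside MyMapper.map(), the increments would then read:
if (line.contains("hello")) {
    context.getCounter(WordCounter.START_TEXT).increment(1);
}
if (line.contains("you")) {
    context.getCounter(WordCounter.END_TEXT).increment(1);
}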