A Detailed Look at MapReduce Counters
2016-03-22 15:44
Counters: a counter records a job's progress and status; you can think of it as a kind of log. We usually insert a counter at some point in the program to record changes in the data or in the progress, and it is more convenient to analyze than a log.
1. Built-in Counters
Hadoop ships with many built-in counters. Where can we see them?
Let's start with the simplest wordcount program.
The source file on HDFS:
[hadoop@master logfile]$ hadoop fs -cat /MR_Counter/diary
Today is 2016-3-22
I study mapreduce counter
I realized that mapreduce counter is simple
WordCount.java:
package com.oner.mr.mrcounter;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {

    public static void main(String[] args) throws IOException,
            InterruptedException, URISyntaxException, ClassNotFoundException {
        Path inPath = new Path("/MR_Counter/");// input directory
        Path outPath = new Path("/MR_Counter/out");// output directory
        Configuration conf = new Configuration();
        // conf.set("fs.defaultFS", "hdfs://master:9000");
        FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), conf,
                "hadoop");
        if (fs.exists(outPath)) {// delete the output directory if it already exists
            fs.delete(outPath, true);
        }
        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCount.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, inPath);
        FileOutputFormat.setOutputPath(job, outPath);
        job.waitForCompletion(true);
    }

    public static class MyMapper extends
            Mapper<LongWritable, Text, Text, LongWritable> {
        private static Text k = new Text();
        private static LongWritable v = new LongWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = line.split(" ");
            for (String word : words) {
                k.set(word);
                context.write(k, v);
            }
        }
    }

    public static class MyReducer extends
            Reducer<Text, LongWritable, Text, LongWritable> {
        private static LongWritable v = new LongWritable();

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values,
                Reducer<Text, LongWritable, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (LongWritable value : values) {
                sum += value.get();
            }
            v.set(sum);
            context.write(key, v);
        }
    }
}
Package it into a jar and run: hadoop jar wc.jar com.oner.mr.mrcounter.WordCount
The console prints the following information (the comments after // were added by me):
16/03/22 14:25:30 INFO mapreduce.Job: Counters: 49 // this job has 49 counters in total
    File System Counters // file system counters
        FILE: Number of bytes read=235
        FILE: Number of bytes written=230421
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=189
        HDFS: Number of bytes written=86
        HDFS: Number of read operations=6
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters // job counters
        Launched map tasks=1 // 1 map task launched
        Launched reduce tasks=1 // 1 reduce task launched
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=12118
        Total time spent by all reduces in occupied slots (ms)=11691
        Total time spent by all map tasks (ms)=12118
        Total time spent by all reduce tasks (ms)=11691
        Total vcore-seconds taken by all map tasks=12118
        Total vcore-seconds taken by all reduce tasks=11691
        Total megabyte-seconds taken by all map tasks=12408832
        Total megabyte-seconds taken by all reduce tasks=11971584
    Map-Reduce Framework // MapReduce framework counters
        Map input records=3
        Map output records=14
        Map output bytes=201
        Map output materialized bytes=235
        Input split bytes=100
        Combine input records=0
        Combine output records=0
        Reduce input groups=10
        Reduce shuffle bytes=235
        Reduce input records=14
        Reduce output records=10
        Spilled Records=28
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=331
        CPU time spent (ms)=2820
        Physical memory (bytes) snapshot=306024448
        Virtual memory (bytes) snapshot=1690583040
        Total committed heap usage (bytes)=136122368
    Shuffle Errors // shuffle error counters
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters // file input format counters
        Bytes Read=89 // the map read 89 bytes from HDFS
    File Output Format Counters // file output format counters
        Bytes Written=86 // the reduce wrote 86 bytes to HDFS
The information above comes from the built-in counters, which fall into these groups:
File System Counters
Job Counters
Map-Reduce Framework
Shuffle Errors
File Input Format Counters
File Output Format Counters
2. Custom Counters
Hadoop also supports custom counters. In Hadoop 2.x a custom counter is obtained through Context's getCounter() method (strictly speaking, the method is declared on the TaskAttemptContext interface, which Context extends):
public Counter getCounter(Enum<?> counterName): Get the Counter for the given counterName
public Counter getCounter(String groupName, String counterName): Get the Counter for the given groupName and counterName
So a counter can be looked up either by an enum constant or by a pair of strings.
The commonly used Counter methods are the following:
String getName():Get the name of the counter
String getDisplayName():Get the display name of the counter
long getValue():Get the current value
void setValue(long value):Set this counter by the given value
void increment(long incr):Increment this counter by the given value
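The semantics of these methods can be shown with a minimal standalone sketch. This is not Hadoop's implementation: the Counter class below, the SensitiveWords enum, and the sample line are hypothetical stand-ins used only to illustrate getName(), getValue(), setValue() and increment(), and how an enum-keyed group resembles the getCounter(Enum<?>) overload.

```java
import java.util.EnumMap;

public class CounterSketch {

    // Hypothetical enum, mirroring the enum-based getCounter(Enum<?>) lookup.
    enum SensitiveWords { MAPREDUCE }

    // Simplified stand-in for org.apache.hadoop.mapreduce.Counter.
    static class Counter {
        private final String name;
        private long value;

        Counter(String name) { this.name = name; }

        String getName()          { return name; }   // name of the counter
        long getValue()           { return value; }  // current value
        void setValue(long v)     { value = v; }     // set to the given value
        void increment(long incr) { value += incr; } // add the given value
    }

    public static void main(String[] args) {
        // One counter per enum constant, kept in a group.
        EnumMap<SensitiveWords, Counter> group = new EnumMap<>(SensitiveWords.class);
        group.put(SensitiveWords.MAPREDUCE, new Counter(SensitiveWords.MAPREDUCE.name()));

        Counter c = group.get(SensitiveWords.MAPREDUCE);
        String line = "I study mapreduce counter and mapreduce is simple"; // hypothetical input
        for (String word : line.split(" ")) {
            if (word.equalsIgnoreCase("mapreduce")) {
                c.increment(1L); // same call the mapper below makes
            }
        }
        System.out.println(c.getName() + "=" + c.getValue()); // prints MAPREDUCE=2
    }
}
```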
Suppose we now want the console to report how many times certain sensitive words occur in the source file, taking "mapreduce" as the sensitive word. How do we do that?
package com.oner.mr.mrcounter;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class WordCount {
public static void main(String[] args) throws IOException,
InterruptedException, URISyntaxException, ClassNotFoundException {
Path inPath = new Path("/MR_Counter/");// input directory
Path outPath = new Path("/MR_Counter/out");// output directory
Configuration conf = new Configuration();
// conf.set("fs.defaultFS", "hdfs://master:9000");
FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), conf,
"hadoop");
if (fs.exists(outPath)) {// delete the output directory if it already exists
fs.delete(outPath, true);
}
Job job = Job.getInstance(conf);
job.setJarByClass(WordCount.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
job.waitForCompletion(true);
}
public static class MyMapper extends
Mapper<LongWritable, Text, Text, LongWritable> {
private static Text k = new Text();
private static LongWritable v = new LongWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
Counter sensitiveCounter = context.getCounter("Sensitive Words:",
"mapreduce");// create a counter in group "Sensitive Words:" with name "mapreduce"
String line = value.toString();
String[] words = line.split(" ");
for (String word : words) {
if (word.equalsIgnoreCase("mapreduce")) {// increment the counter whenever "mapreduce" appears
sensitiveCounter.increment(1L);
}
k.set(word);
context.write(k, v);
}
}
}
public static class MyReducer extends
Reducer<Text, LongWritable, Text, LongWritable> {
private static LongWritable v = new LongWritable();
@Override
protected void reduce(Text key, Iterable<LongWritable> values,
Reducer<Text, LongWritable, Text, LongWritable>.Context context)
throws IOException, InterruptedException {
int sum = 0;
for (LongWritable value : values) {
sum += value.get();
}
v.set(sum);
context.write(key, v);
}
}
}
After repackaging the jar and running again, the console indeed shows a new counter group, Sensitive Words:, containing a counter named mapreduce with a value of 2.
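One detail worth noting: the value 2 printed at the end is a job-wide total. Each task maintains its own counter values, and the framework sums them into the job totals shown on the console. A minimal standalone sketch of that aggregation (the two per-task snapshots here are hypothetical; this is not Hadoop's implementation):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CounterTotalsSketch {
    public static void main(String[] args) {
        // Hypothetical counter snapshots from two map tasks, keyed by "group/name".
        Map<String, Long> task0 = new HashMap<>();
        task0.put("Sensitive Words:/mapreduce", 1L);
        Map<String, Long> task1 = new HashMap<>();
        task1.put("Sensitive Words:/mapreduce", 1L);

        // The framework sums per-task counters into the job-wide totals.
        Map<String, Long> jobTotals = new HashMap<>();
        for (Map<String, Long> task : List.of(task0, task1)) {
            task.forEach((name, value) -> jobTotals.merge(name, value, Long::sum));
        }
        System.out.println(jobTotals); // prints {Sensitive Words:/mapreduce=2}
    }
}
```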