hadoop reduce 阶段遍历 Iterable 的 2 个“坑”
2017-08-15 20:44
197 查看
之前有童鞋问到了这样一个问题:为什么我在 reduce 阶段遍历了一次 Iterable 之后,再次遍历的时候,数据都没了呢?可能有童鞋想当然的回答:Iterable
只能单向遍历一次,就这样简单的原因。。。事实果真如此吗?
还是用代码说话:
[java] view
plain copy
package com.test;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class T {
public static void main(String[] args) {
// 只要实现了Iterable接口的对象都可以使用for-each循环。
// Iterable接口只由iterator方法构成,
// iterator()方法是java.lang.Iterable接口,被Collection继承。
/*public interface Iterable<T> {
Iterator<T> iterator();
}*/
Iterable<String> iter = new Iterable<String>() {
public Iterator<String> iterator() {
List<String> l = new ArrayList<String>();
l.add("aa");
l.add("bb");
l.add("cc");
return l.iterator();
}
};
for(int count : new int[] {1, 2}){
for (String item : iter) {
System.out.println(item);
}
System.out.println("---------->> " + count + " END.");
}
}
}
结果当然是很正常的完整无误的打印了两遍 Iterable
的值。那究竟是什么原因导致了 reduce 阶段的 Iterable
只能被遍历一次呢?
我们先看一段测试代码:
测试数据:
[java] view
plain copy
a 3
a 4
b 50
b 60
a 70
b 8
a 9
[java] view
plain copy
<pre name="code" class="java">import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class TestIterable {
public static class M1 extends Mapper<Object, Text, Text, Text> {
private Text oKey = new Text();
private Text oVal = new Text();
String[] lineArr;
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
lineArr = value.toString().split(" ");
oKey.set(lineArr[0]);
oVal.set(lineArr[1]);
context.write(oKey, oVal);
}
}
public static class R1 extends Reducer<Text, Text, Text, Text> {
List<String> valList = new ArrayList<String>();
List<Text> textList = new ArrayList<Text>();
String strAdd;
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException,
InterruptedException {
valList.clear();
textList.clear();
strAdd = "";
for (Text val : values) {
valList.add(val.toString());
textList.add(val);
}
// 坑之 1 :为神马输出的全是最后一个值?why?
for(Text text : textList){
strAdd += text.toString() + ", ";
}
System.out.println(key.toString() + "\t" + strAdd);
System.out.println(".......................");
// 我这样干呢?对了吗?
strAdd = "";
for(String val : valList){
strAdd += val + ", ";
}
System.out.println(key.toString() + "\t" + strAdd);
System.out.println("----------------------");
// 坑之 2 :第二次遍历的时候为什么得到的都是空?why?
valList.clear();
strAdd = "";
for (Text val : values) {
valList.add(val.toString());
}
for(String val : valList){
strAdd += val + ", ";
}
System.out.println(key.toString() + "\t" + strAdd);
System.out.println(">>>>>>>>>>>>>>>>>>>>>>");
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("mapred.job.queue.name", "regular");
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
System.out.println("------------------------");
Job job = new Job(conf, "TestIterable");
job.setJarByClass(TestIterable.class);
job.setMapperClass(M1.class);
job.setReducerClass(R1.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 输入输出路径
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileSystem.get(conf).delete(new Path(otherArgs[1]), true);
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
在
Eclipse 控制台中的结果如下:
[java] view
plain copy
a 9, 9, 9, 9,
.......................
a 3, 4, 70, 9,
----------------------
a
>>>>>>>>>>>>>>>>>>>>>>
b 8, 8, 8,
.......................
b 50, 60, 8,
----------------------
b
>>>>>>>>>>>>>>>>>>>>>>
关于第
1 个坑:对象重用( objects
reuse )
reduce方法的javadoc中已经说明了会出现的问题:
The framework calls this method for each <key, (list of values)> pair in the grouped inputs. Output values must be of the same type as input values. Input keys must not be altered. The framework will reuse
the key and value objects that are passed into the reduce, therefore the application should clone the objects they want to keep a copy of.
也就是说虽然reduce方法会反复执行多次,但key和value相关的对象只有两个,reduce会反复重用这两个对象。所以如果要保存key或者value的结果,只能将其中的值取出另存或者重新clone一个对象(例如Text
store = new Text(value) 或者 String a = value.toString()),而不能直接赋引用。因为引用从始至终都是指向同一个对象,你如果直接保存它们,那最后它们都指向最后一个输入记录。会影响最终计算结果而出错。
看到这里,我想你会恍然大悟:这不是刚毕业找工作,面试官常问的问题:String 是不可变对象但为什么能相加呢?为什么字符串相加不提倡用 String,而用 StringBuilder ?如果你还不清楚这个问题怎么回答,建议你看看这篇《深入理解
String, StringBuffer 与 StringBuilder 的区别》http://my.oschina.net/leejun2005/blog/102377
关于第 2 个坑:http://stackoverflow.com/questions/6111248/iterate-twice-on-values
The Iterator you receive from that Iterable's iterator() method is special. The values may not all be in memory; Hadoop may
be streaming them from disk. They aren't really backed by a Collection, so it's nontrivial to allow multiple iterations.
只能单向遍历一次,就这样简单的原因。。。事实果真如此吗?
还是用代码说话:
[java] view
plain copy
package com.test;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class T {
public static void main(String[] args) {
// 只要实现了Iterable接口的对象都可以使用for-each循环。
// Iterable接口只由iterator方法构成,
// iterator()方法是java.lang.Iterable接口,被Collection继承。
/*public interface Iterable<T> {
Iterator<T> iterator();
}*/
Iterable<String> iter = new Iterable<String>() {
public Iterator<String> iterator() {
List<String> l = new ArrayList<String>();
l.add("aa");
l.add("bb");
l.add("cc");
return l.iterator();
}
};
for(int count : new int[] {1, 2}){
for (String item : iter) {
System.out.println(item);
}
System.out.println("---------->> " + count + " END.");
}
}
}
结果当然是很正常的完整无误的打印了两遍 Iterable
的值。那究竟是什么原因导致了 reduce 阶段的 Iterable
只能被遍历一次呢?
我们先看一段测试代码:
测试数据:
[java] view
plain copy
a 3
a 4
b 50
b 60
a 70
b 8
a 9
[java] view
plain copy
<pre name="code" class="java">import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class TestIterable {
public static class M1 extends Mapper<Object, Text, Text, Text> {
private Text oKey = new Text();
private Text oVal = new Text();
String[] lineArr;
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
lineArr = value.toString().split(" ");
oKey.set(lineArr[0]);
oVal.set(lineArr[1]);
context.write(oKey, oVal);
}
}
public static class R1 extends Reducer<Text, Text, Text, Text> {
List<String> valList = new ArrayList<String>();
List<Text> textList = new ArrayList<Text>();
String strAdd;
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException,
InterruptedException {
valList.clear();
textList.clear();
strAdd = "";
for (Text val : values) {
valList.add(val.toString());
textList.add(val);
}
// 坑之 1 :为神马输出的全是最后一个值?why?
for(Text text : textList){
strAdd += text.toString() + ", ";
}
System.out.println(key.toString() + "\t" + strAdd);
System.out.println(".......................");
// 我这样干呢?对了吗?
strAdd = "";
for(String val : valList){
strAdd += val + ", ";
}
System.out.println(key.toString() + "\t" + strAdd);
System.out.println("----------------------");
// 坑之 2 :第二次遍历的时候为什么得到的都是空?why?
valList.clear();
strAdd = "";
for (Text val : values) {
valList.add(val.toString());
}
for(String val : valList){
strAdd += val + ", ";
}
System.out.println(key.toString() + "\t" + strAdd);
System.out.println(">>>>>>>>>>>>>>>>>>>>>>");
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("mapred.job.queue.name", "regular");
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
System.out.println("------------------------");
Job job = new Job(conf, "TestIterable");
job.setJarByClass(TestIterable.class);
job.setMapperClass(M1.class);
job.setReducerClass(R1.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 输入输出路径
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileSystem.get(conf).delete(new Path(otherArgs[1]), true);
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
在
Eclipse 控制台中的结果如下:
[java] view
plain copy
a 9, 9, 9, 9,
.......................
a 3, 4, 70, 9,
----------------------
a
>>>>>>>>>>>>>>>>>>>>>>
b 8, 8, 8,
.......................
b 50, 60, 8,
----------------------
b
>>>>>>>>>>>>>>>>>>>>>>
关于第
1 个坑:对象重用( objects
reuse )
reduce方法的javadoc中已经说明了会出现的问题:
The framework calls this method for each <key, (list of values)> pair in the grouped inputs. Output values must be of the same type as input values. Input keys must not be altered. The framework will reuse
the key and value objects that are passed into the reduce, therefore the application should clone the objects they want to keep a copy of.
也就是说虽然reduce方法会反复执行多次,但key和value相关的对象只有两个,reduce会反复重用这两个对象。所以如果要保存key或者value的结果,只能将其中的值取出另存或者重新clone一个对象(例如Text
store = new Text(value) 或者 String a = value.toString()),而不能直接赋引用。因为引用从始至终都是指向同一个对象,你如果直接保存它们,那最后它们都指向最后一个输入记录。会影响最终计算结果而出错。
看到这里,我想你会恍然大悟:这不是刚毕业找工作,面试官常问的问题:String 是不可变对象但为什么能相加呢?为什么字符串相加不提倡用 String,而用 StringBuilder ?如果你还不清楚这个问题怎么回答,建议你看看这篇《深入理解
String, StringBuffer 与 StringBuilder 的区别》http://my.oschina.net/leejun2005/blog/102377
关于第 2 个坑:http://stackoverflow.com/questions/6111248/iterate-twice-on-values
The Iterator you receive from that Iterable's iterator() method is special. The values may not all be in memory; Hadoop may
be streaming them from disk. They aren't really backed by a Collection, so it's nontrivial to allow multiple iterations.
相关文章推荐
- 关于 hadoop reduce 阶段遍历 Iterable 的 2 个“坑”
- 关于 hadoop reduce 阶段遍历 Iterable 的 2 个“坑”
- hadoop reduce 阶段遍历 Iterable 的 2 个“坑”
- 关于hadoop reduce阶段遍历Iterable的注意事项
- 关于 hadoop reduce 阶段遍历 Iterable 的 2 个“坑”
- hadoop reduce 阶段遍历 Iterable 的 2 个“坑”
- 关于hadoop reduce阶段遍历Iterable的注意事项
- Hadoop reduce阶段遍历Iterable的问题及解决方案
- 关于 hadoop reduce 阶段遍历 Iterable 的 2 个“坑”
- 关于 hadoop reduce 阶段遍历 Iterable 的 2 个“坑”
- 关于 hadoop reduce 阶段遍历 Iterable 的 2 个“坑”---(为何数据会显示最后一个和二次迭代时,数据消失)
- Hadoop备忘:Reduce阶段Iterable<VALUEIN> values中的每个值都共享一个对象
- Hadoop提供的reduce函数中Iterable 接口只能遍历一次的问题
- Hadoop备忘:Reduce阶段Iterable<VALUEIN> values中的每个值都共享一个对象
- Hadoop-Mapreduce map—>reduce阶段图解
- Reduce阶段对value遍历两次的解决方法
- hadoop执行mapreduce任务,能够map,不能reduce,Shuffle阶段报错
- Hadoop MapTask/ReduceTask各阶段耗费时间的测试
- Hadoop MapTask/ReduceTask各阶段耗费时间的测试
- Hadoop提供的reduce函数中Iterable 接口只能遍历一次的问题