MapReduce (hive表SequenceFile的结果做输入)、MultipleOutputs和Reduce端迭代iterable的一些说明
2014-03-06 23:39
309 查看
很长时间以来一直写hive,嵌套脚本、偶尔写UDF. 最近用Hive的dynamic partition和多路插入做一些事情,很遗憾的结果是非常不稳定,有时能成功,有时失败。(可能是因为hive版本的问题,查了一些资料也没查的太清楚,因为服务器不能随便动,就想用mapreduce的多路输出吧)。
1.首先这个多路插入也是用的hive的表,表的输出是SequenceFile格式。
按说sequencefile格式输入,取决于内部的Key/value格式。
在驱动类里需要添加
Job job=new Job(getConf(),"dsp_data");
job.setInputFormatClass(SequenceFileInputFormat.class);
SequenceFileInputFormat.addInputPath(job, input1);
SequenceFileInputFormat.addInputPath(job, input2);
Mapper函数的输入:
public class * extends Mapper<BytesWritable , Text, TextPair,TextPair>{}
2.MultipleOutPuts使用:
private static Text value = new Text();
private MultipleOutputs<Text, Text> mos;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
mos = new MultipleOutputs<Text,Text>(context);
}
Iterator<TextPair> iter = values.iterator();
TextPair middle=iter.next();
if (! middle.getSecond().equals("0")) return;
// String[] middle_fields=middle.getFirst().toString().split("\t",-1);
while(iter.hasNext()){
TextPair xx=iter.next();
if (xx.getSecond().toString().equals("0")) continue;
String[] xx_fields=xx.getFirst().toString().split("\t");
if(xx_fields.length<3) continue;
String custom_id=xx_fields[xx_fields.length-1];
value.set(xx_fields[0]+"\t"+xx_fields[1]+"\t"+middle.getFirst().toString());
mos.write(key.getFirst(), value, custom_id+"/");
}
@Override
protected void cleanup(Context context)
throws IOException, InterruptedException {
super.cleanup(context);
mos.close();
}
3.上面的语句有点问题。
在于middle的使用,因为reduce中iterable values使用的对象都是反序列化出来的,而指定的具体的类都是由一个初始化的对象,不断更新里面的字段实现的。
上面的例子,就造成了middle指向的对象没变,但是实际对象中的内容已经被更新成了新序列化的结果,得不到middle最初赋值地方的值。
解决办法有两个:将middle中,需要的数据部分事先取出来。 另外一个实现TextPair的clone或者实现一个get方法,获得一个新对象来解决。
1.首先这个多路插入也是用的hive的表,表的输出是SequenceFile格式。
按说sequencefile格式输入,取决于内部的Key/value格式。
在驱动类里需要添加
Job job=new Job(getConf(),"dsp_data");
job.setInputFormatClass(SequenceFileInputFormat.class);
SequenceFileInputFormat.addInputPath(job, input1);
SequenceFileInputFormat.addInputPath(job, input2);
Mapper函数的输入:
public class * extends Mapper<BytesWritable , Text, TextPair,TextPair>{}
2.MultipleOutPuts使用:
private static Text value = new Text();
private MultipleOutputs<Text, Text> mos;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
mos = new MultipleOutputs<Text,Text>(context);
}
Iterator<TextPair> iter = values.iterator();
TextPair middle=iter.next();
if (! middle.getSecond().equals("0")) return;
// String[] middle_fields=middle.getFirst().toString().split("\t",-1);
while(iter.hasNext()){
TextPair xx=iter.next();
if (xx.getSecond().toString().equals("0")) continue;
String[] xx_fields=xx.getFirst().toString().split("\t");
if(xx_fields.length<3) continue;
String custom_id=xx_fields[xx_fields.length-1];
value.set(xx_fields[0]+"\t"+xx_fields[1]+"\t"+middle.getFirst().toString());
mos.write(key.getFirst(), value, custom_id+"/");
}
@Override
protected void cleanup(Context context)
throws IOException, InterruptedException {
super.cleanup(context);
mos.close();
}
3.上面的语句有点问题。
在于middle的使用,因为reduce中iterable values使用的对象都是反序列化出来的,而指定的具体的类都是由一个初始化的对象,不断更新里面的字段实现的。
上面的例子,就造成了middle指向的对象没变,但是实际对象中的内容已经被更新成了新序列化的结果,得不到middle最初赋值地方的值。
解决办法有两个:将middle中,需要的数据部分事先取出来。 另外一个实现TextPair的clone或者实现一个get方法,获得一个新对象来解决。
相关文章推荐
- mapreduce作业接受序列化文件(SequenceFile)作为文件输入的WordCount程序
- MapReduce 的格式输入----SequenceFileInputFormat ---源码分析
- hive存储格式sequencefile和rcfile的对比
- p3:An open source pcap packet and NetFlow file analysis tool using Hadoop MapReduce and Hive.
- 关于 hadoop reduce 阶段遍历 Iterable 的 2 个“坑”---(为何数据会显示最后一个和二次迭代时,数据消失)
- 新版api mapreduce reduce结果写入mysql
- hive sequencefile导入文件遇到FAILED: SemanticException Unable to load data to destination table. Error: The file that you are trying to load does not match the file format of the destination table.错误
- Hive 中的复合数据结构简介以及一些函数的用法说明
- hive 存储格式和压缩方式 一:Snappy + SequenceFile
- hive存储格式sequencefile和rcfile的对比
- MapReduce和Hive支持递归子目录作为输入
- 输入的InputFormat----SequenceFileInputFormat
- hive中如何确定一个mapreduce作业的reduce数量
- 开发一个坐标计算工具, A表示向左移动,D表示向右移动,W表示向上移动,S表示向下移动。从(0,0)点开始移动,从输入字符串里面读取一些坐标,并将最终输入结果输出到输出文件里面。
- Hive 中的复合数据结构简介以及一些函数的用法说明
- Mapreduce不设置reduce,只执行map的输出结果
- hive使用orcfile parquet sequencefile
- (c++)设圆半径r,圆柱高h 求圆周长C1、圆面积Sa、圆球表面积Sb、圆球体积Va、圆柱体积Vb。 用scanf输入数据,输出计算结果,输出时要求文字说明,取小数点后两位数字。请编程序。 PI=3
- MapReduce设置Map和Reduce函数,但是map输出结果后,reduce没有输出,也没有报错
- HBase、Hive、MapReduce、Hadoop、Spark 开发环境搭建后的一些步骤(export导出jar包方式 或 Ant 方式)