
Reading and Writing RCFiles with MapReduce

2016-04-03 19:39
RCFile is an efficient storage format devised by Facebook. It combines the advantages of row storage and column storage and is used in systems such as Hive and Pig. Because it is a binary, block-organized format rather than plain line-oriented text, you cannot use head or tail to cut rows out of an RCFile and redirect them into a new file. Since RCFile is so widely used in Hive and related systems, let's first look at how to read an RCFile with MapReduce.

Converting an RCFile to a CSV File

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
// RCFileMapReduceInputFormat ships with HCatalog; in newer releases the
// package is org.apache.hive.hcatalog.rcfile instead.
import org.apache.hcatalog.rcfile.RCFileMapReduceInputFormat;

public class RCFileToCSV {

    public static class MapKlass extends Mapper<Object, BytesRefArrayWritable, NullWritable, Text> {

        @Override
        protected void map(Object key, BytesRefArrayWritable values, Context context)
                throws IOException, InterruptedException {
            Text txt = new Text();
            StringBuilder sb = new StringBuilder();
            // Each BytesRefWritable holds the raw bytes of one column of the current row.
            for (int i = 0; i < values.size(); i++) {
                BytesRefWritable value = values.get(i);
                txt.set(value.getData(), value.getStart(), value.getLength());
                sb.append(txt.toString());
                if (i != values.size() - 1) {
                    sb.append('\t'); // note: despite the class name, fields are joined with tabs
                }
            }
            context.write(NullWritable.get(), new Text(sb.toString()));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        // Echo the parsed arguments (debugging aid).
        for (String arg : otherArgs) {
            System.out.println(arg);
        }
        if (otherArgs.length != 2) {
            System.err.println("Usage: jar <input> <output>");
            System.exit(1);
        }

        Job job = new Job(conf, "RCFile to TXT");
        job.setJarByClass(RCFileToCSV.class);
        job.setMapperClass(MapKlass.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // Map-only job: no reducers.
        job.setNumReduceTasks(0);

        job.setInputFormatClass(RCFileMapReduceInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        RCFileMapReduceInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
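If you only want to spot-check an RCFile's contents without launching a MapReduce job, Hive also exposes a lower-level RCFile.Reader API that iterates rows directly. Below is a minimal sketch under that assumption; the class name RCFileDump and the tab-delimited printing are illustrative choices, not part of the original code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class RCFileDump {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path(args[0]);
        FileSystem fs = path.getFileSystem(conf);
        // RCFile.Reader walks the row groups and exposes one row at a time.
        RCFile.Reader reader = new RCFile.Reader(fs, path, conf);
        LongWritable rowID = new LongWritable();
        BytesRefArrayWritable row = new BytesRefArrayWritable();
        while (reader.next(rowID)) {
            reader.getCurrentRow(row);
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < row.size(); i++) {
                BytesRefWritable col = row.get(i);
                Text t = new Text();
                t.set(col.getData(), col.getStart(), col.getLength());
                if (i > 0) {
                    sb.append('\t');
                }
                sb.append(t);
            }
            System.out.println(sb);
        }
        reader.close();
    }
}

The same BytesRefArrayWritable/BytesRefWritable pair carries the column bytes here as in the mapper above; only the driving loop differs.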


Converting a CSV File to an RCFile

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
// RCFileMapReduceOutputFormat ships with HCatalog; in newer releases the
// package is org.apache.hive.hcatalog.rcfile instead.
import org.apache.hcatalog.rcfile.RCFileMapReduceOutputFormat;

public class CSVToRCFile {

    public static class ToRCFileMapper extends Mapper<Object, Text, NullWritable, BytesRefArrayWritable> {

        private byte[] fieldData;
        private int numCols;
        private BytesRefArrayWritable bytes;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // This key is populated by RCFileMapReduceOutputFormat.setColumnNumber() in main().
            numCols = context.getConfiguration().getInt("hive.io.rcfile.column.number.conf", 0);
            bytes = new BytesRefArrayWritable(numCols);
        }

        @Override
        protected void map(Object key, Text values, Context context)
                throws IOException, InterruptedException {
            bytes.clear();
            // Naive split: each line must contain at least numCols fields,
            // and the fields themselves must not contain commas.
            String[] cols = values.toString().split(",");
            for (int i = 0; i < numCols; i++) {
                fieldData = cols[i].getBytes("UTF-8");
                BytesRefWritable cu = new BytesRefWritable(fieldData, 0, fieldData.length);
                bytes.set(i, cu);
            }
            context.write(NullWritable.get(), bytes);
        }
    }

    public static void main(String[] args) throws Exception {
        int rowGroupSize = 16 * 1024 * 1024;
        int ioBufferSize = 128 * 1024;
        int numCols = 0;
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 4) {
            System.err.println("Usage: <txt> <rcfile> <table name> <column number>");
            System.exit(1);
        }

        conf.setInt("hive.io.rcfile.record.buffer.size", rowGroupSize);
        conf.setInt("io.file.buffer.size", ioBufferSize);
        // otherArgs[2] (the table name) is accepted but not used by this job.
        numCols = Integer.parseInt(otherArgs[3]);

        Job job = new Job(conf, "TXT to RCFile");
        job.setJarByClass(CSVToRCFile.class);
        job.setMapperClass(ToRCFileMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(BytesRefArrayWritable.class);

        // Map-only job: no reducers.
        job.setNumReduceTasks(0);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        job.setOutputFormatClass(RCFileMapReduceOutputFormat.class);
        RCFileMapReduceOutputFormat.setColumnNumber(job.getConfiguration(), numCols);
        RCFileMapReduceOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        RCFileMapReduceOutputFormat.setCompressOutput(job, false);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
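The hive.io.rcfile.column.number.conf key the mapper reads is the same one Hive's standalone RCFile.Writer consults, so a small RCFile can also be produced without MapReduce at all. A minimal sketch, assuming the same Hive jars on the classpath; the class name RCFileWriteDemo and the sample row are illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;

public class RCFileWriteDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int numCols = 3;
        // Same key that RCFileMapReduceOutputFormat.setColumnNumber() sets for the job above.
        conf.setInt("hive.io.rcfile.column.number.conf", numCols);
        Path path = new Path(args[0]);
        FileSystem fs = path.getFileSystem(conf);
        RCFile.Writer writer = new RCFile.Writer(fs, conf, path);

        // Build one row: each column is a BytesRefWritable over the field's bytes.
        BytesRefArrayWritable row = new BytesRefArrayWritable(numCols);
        String[] fields = {"1", "hello", "world"}; // hypothetical sample data
        for (int i = 0; i < numCols; i++) {
            byte[] data = fields[i].getBytes("UTF-8");
            row.set(i, new BytesRefWritable(data, 0, data.length));
        }
        writer.append(row);
        writer.close();
    }
}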

Note: make sure the Hadoop and Hive versions you build against are compatible.
