
A general-purpose class for compressing plain text into RCFile

2016-01-17 09:56
At work I recently used RCFile to store and read data in that format, so I'm writing down my notes here.

RCFile is a storage format developed by Facebook that combines the strengths of row storage and column storage: it achieves a higher compression ratio and faster column reads, and it plays an important role in large-scale data processing on MapReduce.

Reading an RCFile:

Java code:

Job setup:

Job job = new Job();
job.setJarByClass(YourDriver.class); // substitute your own driver class; the original uses a placeholder
// The input files are in RCFile format
job.setInputFormatClass(RCFileInputFormat.class);
// Plain text output
job.setOutputFormatClass(TextOutputFormat.class);
// Input path
RCFileInputFormat.addInputPath(job, new Path(srcpath));
// MultipleInputs.addInputPath(job, new Path(srcpath), RCFileInputFormat.class);
// Output path
TextOutputFormat.setOutputPath(job, new Path(respath));
// Output key type
job.setOutputKeyClass(Text.class);
// Output value type
job.setOutputValueClass(NullWritable.class);
// Mapper class
job.setMapperClass(ReadTestMapper.class);
// No reducer is set here: the mapper already converts each RCFile row to Text,
// so the default identity reducer just passes the records through.
code = (job.waitForCompletion(true)) ? 0 : 1;

// Mapper class
public class ReadTestMapper extends Mapper<LongWritable, BytesRefArrayWritable, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, BytesRefArrayWritable value, Context context)
            throws IOException, InterruptedException {
        // RCFile is row-grouped but column-stored: each incoming record's value
        // holds all column values of one row, so walk the columns and join them with tabs.
        Text txt = new Text();
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < value.size(); i++) {
            BytesRefWritable v = value.get(i);
            txt.set(v.getData(), v.getStart(), v.getLength());
            sb.append(txt.toString());
            if (i != value.size() - 1) {
                sb.append('\t');
            }
        }
        context.write(new Text(sb.toString()), NullWritable.get());
    }
}
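For reference, here is a minimal, self-contained driver sketch for the read job, wired up with ToolRunner. It assumes the ReadTestMapper above and an RCFileInputFormat that implements the new mapreduce API (Hive's bundled org.apache.hadoop.hive.ql.io.RCFileInputFormat targets the old mapred API, so a port may be needed); the class name RcFileToText is made up for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
// The import for RCFileInputFormat is omitted on purpose: its package
// depends on which new-API port you use.

public class RcFileToText extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // args[0] = RCFile input dir, args[1] = plain-text output dir
        Job job = new Job(getConf(), "rcfile-to-text");
        job.setJarByClass(RcFileToText.class);
        job.setInputFormatClass(RCFileInputFormat.class); // new-API port assumed
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        TextOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setMapperClass(ReadTestMapper.class);
        job.setNumReduceTasks(0); // map-only variant: the mapper already emits plain text
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new RcFileToText(), args));
    }
}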

Writing compressed output in RCFile format:

Java code:

Job setup:

Job job = new Job();
Configuration conf = job.getConfiguration();
// Number of columns per row
RCFileOutputFormat.setColumnNumber(conf, 4);
job.setJarByClass(YourDriver.class); // substitute your own driver class
FileInputFormat.setInputPaths(job, new Path(srcpath));
RCFileOutputFormat.setOutputPath(job, new Path(respath));
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(RCFileOutputFormat.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(BytesRefArrayWritable.class);
job.setMapperClass(OutPutTestMapper.class);
// Target date taken from the command line ('line' is presumably a Commons CLI CommandLine)
conf.set("date", line.getOptionValue(DATE));
// Compression settings
conf.setBoolean("mapred.output.compress", true);
conf.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
code = (job.waitForCompletion(true)) ? 0 : 1;
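A side note: mapred.output.compress and mapred.output.compression.codec are the Hadoop 1.x property names; they still work on later releases but are deprecated there. On Hadoop 2.x and later the equivalent standard settings would be:

// Hadoop 2.x+ replacements for the deprecated mapred.* keys above
conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
conf.set("mapreduce.output.fileoutputformat.compress.codec",
        "org.apache.hadoop.io.compress.GzipCodec");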

Mapper class:

public class OutPutTestMapper extends Mapper<LongWritable, Text, LongWritable, BytesRefArrayWritable> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String day = context.getConfiguration().get("date");
        if (!line.equals("")) {
            String[] lines = line.split(" ", -1);
            if (lines.length > 3) {
                String time_temp = lines[1];
                String times = timeStampDate(time_temp);
                String d = times.substring(0, 10);
                // Keep only the rows whose date matches the requested day
                if (day.equals(d)) {
                    // One byte[] per column; the count must match RCFileOutputFormat.setColumnNumber(conf, 4)
                    byte[][] record = {lines[0].getBytes("UTF-8"), lines[1].getBytes("UTF-8"),
                            lines[2].getBytes("UTF-8"), lines[3].getBytes("UTF-8")};
                    BytesRefArrayWritable bytes = new BytesRefArrayWritable(record.length);
                    for (int i = 0; i < record.length; i++) {
                        BytesRefWritable cu = new BytesRefWritable(record[i], 0, record[i].length);
                        bytes.set(i, cu);
                    }
                    context.write(key, bytes);
                }
            }
        }
    }
}
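The timeStampDate helper called above is not shown in the original post. Judging from the call site (the first ten characters of its result are compared against the date option), it converts the second field into a date-time string. A minimal sketch, assuming the field is a Unix timestamp in seconds and the date option uses the yyyy-MM-dd format (both are assumptions, adjust to your log format):

// Hypothetical helper, not from the original post: converts a Unix timestamp
// in seconds to "yyyy-MM-dd HH:mm:ss". The caller keeps only the first ten
// characters, i.e. the "yyyy-MM-dd" part.
private static String timeStampDate(String timestampSeconds) {
    java.text.SimpleDateFormat fmt = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    return fmt.format(new java.util.Date(Long.parseLong(timestampSeconds.trim()) * 1000L));
}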

When reposting, please credit the original source and author with a hyperlink.

Permanent link: http://smallboby.iteye.com/blog/1592531
Thanks. Corrections and suggestions are welcome.