A General-Purpose Class for Compressing Plain Text into RCFile
2016-01-17 09:56
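At work I needed to write and read files in the RCFile format, so I am recording how I did it here.
RCFile, developed by Facebook, combines the advantages of row storage and column storage: it compresses better than plain row-oriented storage and reads individual columns faster. It plays an important role in large-scale data processing on MapReduce.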
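To make the "row storage plus column storage" idea concrete, here is a small Hadoop-free sketch of a row-group layout: rows are first cut into groups, and inside each group the values are laid out column by column. The class name `RowGroupSketch`, the method `toRowGroups`, and the sample data are made up for illustration; this is not RCFile's on-disk format, only the general shape of it.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy row-group layout: group rows first, then store each group column by column. */
final class RowGroupSketch {

    /**
     * Splits rows into groups of at most rowsPerGroup rows and transposes each
     * group so that the values of one column sit next to each other.
     * Result shape: groups -> columns -> values.
     */
    static List<List<List<String>>> toRowGroups(List<String[]> rows, int columnCount, int rowsPerGroup) {
        List<List<List<String>>> groups = new ArrayList<>();
        for (int start = 0; start < rows.size(); start += rowsPerGroup) {
            int end = Math.min(start + rowsPerGroup, rows.size());
            List<List<String>> columns = new ArrayList<>();
            for (int c = 0; c < columnCount; c++) {
                List<String> column = new ArrayList<>();
                for (int r = start; r < end; r++) {
                    column.add(rows.get(r)[c]);
                }
                columns.add(column);
            }
            groups.add(columns);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String[]> rows = List.of(
                new String[] {"u1", "1342000000", "GET"},
                new String[] {"u2", "1342000060", "POST"},
                new String[] {"u3", "1342000120", "GET"});
        // Two rows per group: the first group holds u1 and u2, the second holds u3.
        System.out.println(toRowGroups(rows, 3, 2));
    }
}
```

Because the values of one column end up adjacent, similar data compresses well and a reader that needs only one column can skip the others, while a whole row still lives inside a single group.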
Reading:
Job setup:

    Job job = new Job();
    // MyDriver is a placeholder for your own driver class
    job.setJarByClass(MyDriver.class);
    // Read the input as RCFile
    job.setInputFormatClass(RCFileInputFormat.class);
    // Write plain text output
    job.setOutputFormatClass(TextOutputFormat.class);
    // Input path
    RCFileInputFormat.addInputPath(job, new Path(srcpath));
    //MultipleInputs.addInputPath(job, new Path(srcpath), RCFileInputFormat.class);
    // Output path
    TextOutputFormat.setOutputPath(job, new Path(respath));
    // Output key type
    job.setOutputKeyClass(Text.class);
    // Output value type
    job.setOutputValueClass(NullWritable.class);
    // Mapper class
    job.setMapperClass(ReadTestMapper.class);
    // No reducer is set: the mapper has already converted each record to Text,
    // so the reduce phase only passes those Text lines through.
    code = (job.waitForCompletion(true)) ? 0 : 1;

Mapper class:

    public class ReadTestMapper extends Mapper<LongWritable, BytesRefArrayWritable, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, BytesRefArrayWritable value, Context context) throws IOException, InterruptedException {
            Text txt = new Text();
            // Each call receives one row; value holds all columns of that row.
            // Walk the columns and join them with tabs.
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < value.size(); i++) {
                BytesRefWritable v = value.get(i);
                // getData() returns the backing array, so start and length are required
                txt.set(v.getData(), v.getStart(), v.getLength());
                if (i == value.size() - 1) {
                    sb.append(txt.toString());
                } else {
                    sb.append(txt.toString()).append("\t");
                }
            }
            context.write(new Text(sb.toString()), NullWritable.get());
        }
    }
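The column-joining step of the mapper can be tried without a cluster. The sketch below is only an approximation: `Slice` is a hypothetical stand-in for `BytesRefWritable` (a backing array plus start and length), and `ColumnJoinSketch` and `joinColumns` are names invented here. It shows why the mapper passes `getStart()` and `getLength()` along with `getData()`: several columns may share one buffer.

```java
import java.nio.charset.StandardCharsets;

/** Hadoop-free sketch of the column-joining step in ReadTestMapper. */
final class ColumnJoinSketch {

    /** Hypothetical stand-in for BytesRefWritable: a slice of a backing array. */
    static final class Slice {
        final byte[] data;
        final int start;
        final int length;

        Slice(byte[] data, int start, int length) {
            this.data = data;
            this.start = start;
            this.length = length;
        }
    }

    /** Decodes each slice as UTF-8 and joins the columns with tabs. */
    static String joinColumns(Slice[] columns) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < columns.length; i++) {
            if (i > 0) {
                sb.append('\t');
            }
            Slice s = columns[i];
            sb.append(new String(s.data, s.start, s.length, StandardCharsets.UTF_8));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Two columns that share one buffer, distinguished only by start and length.
        byte[] buffer = "u1GET".getBytes(StandardCharsets.UTF_8);
        Slice[] row = { new Slice(buffer, 0, 2), new Slice(buffer, 2, 3) };
        System.out.println(joinColumns(row));
    }
}
```

Decoding the whole backing array instead of the slice would mix neighbouring columns together, which is the mistake the start and length arguments guard against.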
Writing compressed output in RCFile format:
Job setup:

    Job job = new Job();
    Configuration conf = job.getConfiguration();
    // Number of columns in each row
    RCFileOutputFormat.setColumnNumber(conf, 4);
    // MyDriver is a placeholder for your own driver class
    job.setJarByClass(MyDriver.class);
    FileInputFormat.setInputPaths(job, new Path(srcpath));
    RCFileOutputFormat.setOutputPath(job, new Path(respath));
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(RCFileOutputFormat.class);
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(BytesRefArrayWritable.class);
    job.setMapperClass(OutPutTestMapper.class);
    // line is the parsed command line (not shown); DATE names the date option
    conf.set("date", line.getOptionValue(DATE));
    // Compression settings. On Hadoop 2 and later these keys are deprecated names for
    // mapreduce.output.fileoutputformat.compress and mapreduce.output.fileoutputformat.compress.codec.
    conf.setBoolean("mapred.output.compress", true);
    conf.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
    code = (job.waitForCompletion(true)) ? 0 : 1;

Mapper class:

    public class OutPutTestMapper extends Mapper<LongWritable, Text, LongWritable, BytesRefArrayWritable> {
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            // Target day passed in from the driver through the job configuration
            String day = context.getConfiguration().get("date");
            if (!line.equals("")) {
                // Fields are space-separated; -1 keeps trailing empty fields
                String[] lines = line.split(" ", -1);
                if (lines.length > 3) {
                    String time_temp = lines[1];
                    // timeStampDate is a helper not shown here; it turns the timestamp into a date string
                    String times = timeStampDate(time_temp);
                    // The first 10 characters are the day part of the date string
                    String d = times.substring(0, 10);
                    if (day.equals(d)) {
                        // One byte array per column; the count matches setColumnNumber(conf, 4)
                        byte[][] record = {lines[0].getBytes("UTF-8"), lines[1].getBytes("UTF-8"), lines[2].getBytes("UTF-8"), lines[3].getBytes("UTF-8")};
                        BytesRefArrayWritable bytes = new BytesRefArrayWritable(record.length);
                        for (int i = 0; i < record.length; i++) {
                            BytesRefWritable cu = new BytesRefWritable(record[i], 0, record[i].length);
                            bytes.set(i, cu);
                        }
                        context.write(key, bytes);
                    }
                }
            }
        }
    }
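The filtering and field-splitting logic of this mapper can be checked on its own, outside Hadoop. The sketch below is built on assumptions: the post never shows `timeStampDate`, so the version here simply guesses that the second field is a Unix timestamp in seconds and formats it as `yyyy-MM-dd HH:mm:ss` in UTC. The class name `RecordFilterSketch` and the method `toRecord` are likewise invented for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hadoop-free sketch of the filter-and-split step in OutPutTestMapper. */
final class RecordFilterSketch {

    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC);

    /** Assumed behaviour of the helper the post does not show: epoch seconds to a UTC date string. */
    static String timeStampDate(String epochSeconds) {
        return FORMAT.format(Instant.ofEpochSecond(Long.parseLong(epochSeconds)));
    }

    /**
     * Returns the first four fields as UTF-8 byte arrays, or null when the line
     * is empty, has fewer than four fields, or does not fall on the given day.
     */
    static byte[][] toRecord(String line, String day) {
        if (line.isEmpty()) {
            return null;
        }
        String[] fields = line.split(" ", -1);
        if (fields.length <= 3) {
            return null;
        }
        String date = timeStampDate(fields[1]).substring(0, 10);
        if (!day.equals(date)) {
            return null;
        }
        byte[][] record = new byte[4][];
        for (int i = 0; i < record.length; i++) {
            record[i] = fields[i].getBytes(StandardCharsets.UTF_8);
        }
        return record;
    }

    public static void main(String[] args) {
        byte[][] kept = toRecord("u1 0 GET /index", "1970-01-01");
        System.out.println(kept == null ? "filtered out" : kept.length + " columns kept");
    }
}
```

A non-numeric second field makes `Long.parseLong` throw a `NumberFormatException`; a real mapper would probably want to catch that and skip the line rather than fail the task.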
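If you repost this article, please credit the original source and author with a hyperlink.
Permalink: http://smallboby.iteye.com/blog/1592531
Thanks. If anything here is wrong, please let me know.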