您的位置:首页 > 产品设计 > UI/UE

Hadoop的SequenceFile写实例

2013-02-03 10:06 531 查看
package com.mengyao.hadoop.hdfs;

import java.io.File;
import java.io.IOException;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.DefaultCodec;

/**
* 将本地文件以SequenceFile格式上传到HDFS上。
*
*
* SequeceFile是 Hadoop API提供的一种二进制文件支持。这种二进制文件直接将<key, value>对序列化到文件中。一般对小文件可以使用这种文件合并,即将文件名作为key,文件内容作为value序列化到大文件中。这种文件格式 有以下好处:
*         1)支持压缩,且可定制为基于Record或Block压缩(Block级压缩性能较优)
*         2)本地化任务支持:因为文件可以被切分,因此MapReduce任务时数据的本地化情况应该是非常好的。
*         3)难度低:因为是Hadoop框架提供的API,业务逻辑侧的修改比较简单。
*         坏处是需要一个合并文件的过程,且合并后的文件将不方便查看。

* SequenceFile 是一个由二进制序列化过的key/value的字节流组成的文本存储文件,它可以在map/reduce过程中的input/output 的format时被使用。在map/reduce过程中,map处理文件的临时输出就是使用SequenceFile处理过的。
* SequenceFile分别提供了读、写、排序的操作类。SequenceFile的操作中有三种处理方式:
*         1) 不压缩数据直接存储。 //enum.NONE
*         2) 压缩value值不压缩key值存储的存储方式。//enum.RECORD
*         3)key/value值都压缩的方式存储。//enum.BLOCK
*
* 注意:存储在SequenceFile中的key和value不一定Hadoop的Writable序列化类型,只要是能被Java的Serializable序列化和反序列化的任意类型都可以。
*
* @author mengyao
*
*/
public class SequenceFileWriter {

public static void main(String[] args) throws IOException {
args = new String[]{"/usr/local/mapreduces/bookOutline.txt", "/mapreduces/seqfile/book1.txt"};
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
File inFile = new File(args[0]);
Path outputPath = new Path(args[1]);
IntWritable key = new IntWritable();
Text value = new Text();
SequenceFile.Writer writer = null;
try {
writer = SequenceFile.createWriter(conf,
Writer.file(outputPath),
Writer.keyClass(key.getClass()),
Writer.valueClass(value.getClass()),
Writer.bufferSize(fs.getConf().getInt("io.file.buffer.size", 4096)),
Writer.replication(fs.getDefaultReplication(null)),
Writer.blockSize(134217728),
Writer.compression(SequenceFile.CompressionType.BLOCK, new DefaultCodec()),
Writer.progressable(null),
Writer.metadata(new Metadata()));
int count = 0;
for (String line : FileUtils.readLines(inFile)) {
key.set(count++);
value.set(line);
writer.append(key, value);
}
} finally {
IOUtils.closeStream(writer);
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: