您的位置:首页 > 产品设计 > UI/UE

Hadoop 中SequenceFile的简介

2014-12-10 22:33 267 查看

概念

SequenceFile是一个由二进制序列化过的key/value的字节流组成的文本存储文件,它可以在map/reduce过程中的input/output
的format时被使用。在map/reduce过程中,map处理文件的临时输出就是使用SequenceFile处理过的。 所以一般的SequenceFile均是在FileSystem中生成,供map调用的原始文件。

特点

SequenceFile是
Hadoop 的一个重要数据文件类型,它提供key-value的存储,但与传统key-value存储(比如hash表,btree)不同的是,它是append only的,于是你不能对已存在的key进行写操作。
SequenceFile可以作为小文件的容器,可以通过它将小文件包装起来传递给MapReduce进行处理。

SequenceFile提供了两种定位文件位置的方法

seek(long poisitiuion):poisition必须是记录的边界,否则调用next()方法时会报错
sync(long
poisition):Poisition可以不是记录的边界,如果不是边界,会定位到下一个同步点,如果Poisition之后没有同步点了,会跳转到文件的结尾位置

状态

SequenceFile 有三种压缩态:
  Uncompressed – 未进行压缩的状
  record compressed -
对每一条记录的value值进行了压缩(文件头中包含上使用哪种压缩算法的信息)
  block compressed – 当数据量达到一定大小后,将停止写入进行整体压缩,整体压缩的方法是把所有的keylength,key,vlength,value
分别合在一起进行整体压缩,块的压缩效率要比记录的压缩效率高

格式

每一个SequenceFile都包含一个“头”(header)。Header包含了以下几部分。
1.SEQ三个字母的byte数组
2.Version number的byte,目前为数字3的byte
3.Key和Value的类名
4.压缩相关的信息
5.其他用户定义的元数据
6.同步标记,sync marker
对于每一条记录(K-V),其内部格式根据是否压缩而不同。SequenceFile的压缩方式有两种,“记录压缩”(record compression)和“块压缩”(block compression)。如果是记录压缩,则只压缩Value的值。如果是块压缩,则将多条记录一并压缩,包括Key和Value。具体格式如下面两图所示:





 

扩展实现

MapFile  一个key-value 对应的查找数据结构,由数据文件/data 和索引文件 /index 组成,数据文件中包含所有需要存储的key-value对,按key的顺序排列。索引文件包含一部分key值,用以指向数据文件的关键位置
SetFile – 基于 MapFile 实现的,他只有key,value为不可变的数据。
ArrayFile – 也是基于 MapFile 实现,他就像我们使用的数组一样,key值为序列化的数字。 
    
BloomMapFile – 他在 MapFile 的基础上增加了一个 /bloom 文件,包含的是二进制的过滤表,在每一次写操作完成时,会更新这个过滤表。

操作

读写文件
package com.eric.hadoop.io.sequencefile;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
/**
* @author Eric.sunah 2014年12月10日
*
*/
public class SequenceFileDemo {

private static final String OPERA_FILE = "./output.seq";
/**
* 随便从网上截取的一段文本
*/
private static String[]    testArray = { "<plugin>                                                                     ",
"  <groupId>org.apache.avro</groupId>                                         ",
"  <artifactId>avro-maven-plugin</artifactId>                                 ",
"  <version>1.7.7</version>                                                   ",
"  <executions>                                                               ",
"    <execution>                                                              ",
"      <phase>generate-sources</phase>                                        ",
"      <goals>                                                                ",
"        <goal>schema</goal>                                                  ",
"      </goals>                                                               ",
"      <configuration>                                                        ",
"        <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory> ",
"        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory> ",
"      </configuration>                                                       ",
"    </execution>                                                             ",
"  </executions>                                                              ",
"</plugin>                                                                    ",
"<plugin>                                                                     ",
"  <groupId>org.apache.maven.plugins</groupId>                                ",
"  <artifactId>maven-compiler-plugin</artifactId>                             ",
"  <configuration>                                                            ",
"    <source>1.6</source>                                                     ",
"    <target>1.6</target>                                                     ",
"  </configuration>                                                           ", "</plugin>"};

public static void main(String[] args) throws IOException {
writeSequenceFile(OPERA_FILE);
readSequenceFile(OPERA_FILE);
}

private static void readSequenceFile(String inputFile) throws IOException {
Configuration config = new Configuration();
Path path = new Path(inputFile);
SequenceFile.Reader reader = null;
try {

FileSystem fs = FileSystem.get(URI.create(inputFile), config);
reader = new SequenceFile.Reader(fs, path, config);
IntWritable key = new IntWritable();
Text value = new Text();
long posion = reader.getPosition();
// reader.next()返回非空的话表示正在读,如果返回null表示已经读到文件结尾的地方
while (reader.next(key, value)) {
//打印同步点的位置信息
String syncMark = reader.syncSeen() ? "*" : "";
System.out.printf("[%s\t%s]\t%s\t%s\n", posion, syncMark, key, value);
posion = reader.getPosition();
}
} finally {
IOUtils.closeStream(reader);
}

}

/**
* 写Sequence File 文件
*
* @param outputFile
* @throws IOException
*/
private static void writeSequenceFile(String outputFile) throws IOException {
Configuration config = new Configuration();
Path path = new Path(outputFile);
IntWritable key = new IntWritable();
Text value = new Text();
SequenceFile.Writer writer = null;
try {

FileSystem fs = FileSystem.get(URI.create(outputFile), config);
writer = SequenceFile.createWriter(fs, config, path, key.getClass(), value.getClass());
for (int i = 1; i < 2000; i++) {
key.set(2000 - i);
value.set(testArray[i % testArray.length]);
System.out.printf("[%s]\t%s\t%s\n", writer.getLength() + "", key, value);
// 通过Append方法进行写操作
writer.append(key, value);
}
} finally {
IOUtils.closeStream(writer);
}

}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息