您的位置:首页 > 运维架构

Hadoop源码分析笔记(二):Hadoop序列化与压缩

2013-05-28 22:13 513 查看

Hadoop文件序列化

本文着重讲述Hadoop面向海量数据处理的输入输出。代码请见org.apache.hadoop.io包。

序列化用途

对象的序列化是用于将对象编码成一个字节流,以及从字节流中重新够着对象。

一般来说,序列化有以下三种主要用途:

一、作为一种持久化格式:一个对象被序列化以后,它的编码可以被存储到磁盘中,以用于反序列化。

二、作为一种通信数据格式:序列化结果可以从一个正在运行的虚拟机,通过网络被传输到另一个虚拟机上面。

三、作为一种拷贝和clone的机制:将对象序列化到内存的缓冲区中,然后通过反序列化,可以得到一个已存在对象进行深拷贝的新对象。

Java序列化机制

Java本身提供的序列化机制是将对象转为连续的byte数据,这些数据可以在日后还原为原先的对象状态。在Java中使一个类的实例可被序列化非常简单,之需要在类中声明实现Serializable接口就可。这个借口本身只是一个标志。不需要实现任何成员函数。

任何类可以通过声明它实现了Serializable接口,立即就可以获得Java提供的序列化功能。如Hadoop中的Block类也可以修改为,其实在实际应用中Hadoop的Block类并没有使用Java提供的序列化机制。

public class Block implements Writable,Comparable<Block>,Serializable

由于序列化主要应用在于I/O相关的一些操作上,其实现了通过一对输入/输出流来实现的。如果想对某个对象执行序列化的动作,可以在某种OutputStream对象的基础上面创建一个对象流ObjectOutputStream对象,然后调用writeObject()方法就可以达到目的。

由于Java本身提供的序列化机制,在将对象序列化的时候会保持大量的附加信息,以至于序列化以后的对象过于庞大,对于需要保持和处理大规模数据的Hadoop来说,需要一个新的序列化机制。

Hadoop序列化机制

和Java的序列化机制不同(在对象ObjectOutputStream对象上调用writeObject()方法),Hadoop的序列化机制通过调用对象的write()方法(它带有一个类型为DataOutput的参数),将序列化到流中。反序列化则通过调用对象的readFields(),从流中读取数据。另外,在Java序列化机制中,反序列化会不断创建新的对象,但Hadoop的序列化机制的反序列化过程中,用户可以复用对象,因此也更效率。下面贴出Block类的wirte()方法和readFields方法,如下所示:

/////////////////////////////////////
// Writable
/////////////////////////////////////
public void write(DataOutput out) throws IOException {
out.writeLong(blockId);
out.writeLong(numBytes);
out.writeLong(generationStamp);
}

public void readFields(DataInput in) throws IOException {
this.blockId = in.readLong();
this.numBytes = in.readLong();
this.generationStamp = in.readLong();
if (numBytes < 0) {
throw new IOException("Unexpected block size: " + numBytes);
}
}


下面我们贴上一段实例看下,Hadoop中org.apache.hadoop.hdfs.protocol.Block类的的使用:

Block block=new Block(2080L,232L,232L);
..............
ByteArrayOutPutStream bout=new ByteArrayOutputStream();
DataOutputStream dout=new DataOutputStream(bout);
//序列化对象到输出流中
block.write(dout);
dout.close();
........


经观察,Block类的write方法,在Block对象序列化时只输出了3长整形,因此,相对于Java提供的序列化机制来说,Hadoop的序列化结果紧凑而且快速。

Hadoop序列化机制的特点

一、紧凑:可以节省数据中心带宽等稀缺资源。

二、快速:在进程间通信时会大量使用序列化机制。

三、可扩展:系统发展时,系统间通信的协议会升级,类的定义会发生变化,序列化机制需要指出这些升级和变化。

四、互操作:支持不同开发语言之间的通信。

Hadoop Writable机制

为了支持以上特性,Hadoop引入org.apache.hadoop.io.Writable接口,作为所有序列化对象必须支持实现的接口。实现如下:

public interface Writable {
/**
* Serialize the fields of this object to <code>out</code>.
*/
void write(DataOutput out) throws IOException;

/**
* Deserialize the fields of this object from <code>in</code>.
*/
void readFields(DataInput in) throws IOException;


下面是一段自我实现的Writable接口的实例。

public class MyWritable implements Writable {
// Some data
private int counter;
private long timestamp;

public void write(DataOutput out) throws IOException {
out.writeInt(counter);
out.writeLong(timestamp);
}

public void readFields(DataInput in) throws IOException {
counter = in.readInt();
timestamp = in.readLong();
}

public static MyWritable read(DataInput in) throws IOException {
MyWritable w = new MyWritable();
w.readFields(in);
return w;
}
}


此外,在Hadoop序列化机制中还包括另外几个重要接口:WritableComparable、RawComparator和WritableComparator。

其中,WritableComparable,很显然,它提供了类型比较的能力,这对于MapReduce中的排序等有重要的意义,该接口继承自Writable接口和Comparable接口,而我们知道在Java中Comparable接口用户进行类型比较。像Hadoop中的ByteWritable、IntWritable、DoubleWritable等其他Java基本类型对于的Writable类型,都继承自WritableComparable接口。

另外,鉴于Hadoop中对于效率的看重,因此Hadoop I/O包中还提供具有高效比较能力的RawComparator接口。它允许执行者比较流中读取的未序列化为对象的记录,从而省去了创建对象的所有开销。这三个接口的相关定义如下:

public interface RawComparator<T> extends Comparator<T> {

public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);

}

public interface WritableComparable<T> extends Writable, Comparable<T> {
}

public class WritableComparator implements RawComparator {
......
}

针对Java基本类型、字符串、枚举、Writable、空值、Writable的其他子类,ObjectWritable提供了一个封装,适用于字段需要使用多种类型。ObjectWritable可应用与Hadoop远程过程调用中参数的序列化和反序列化。ObjectWritable的另一个典型应用时在需要序列化不同类型的对象到某一个字段,如在SequenceFile的值中保持不同类型的对象(如LongWriteable值或者Text值)时,可以将该值声明为ObjectWritable。

Hadoop序列化框架

目前现在我们开发中普遍应用Avro、Thrift、Protocol Buffer等序列化框架。作为Hadoop的序列化框架,该框架由Serialization实现(org.apache.hadoop.io.serializer包中)。Serialization是一个接口,使用抽象工厂的设计模式,提供了一系列和序列化相关并相互依赖对象的接口。目前Hadoop支持两个Serialization的实现,分别是支持Writable机制的WritableSerialization和支持Java序列化的JavaSerialization。相关接口如下:

public interface Serialization<T> {

/**
* Allows clients to test whether this {@link Serialization}
* supports the given class.
*/
boolean accept(Class<?> c);

/**
* @return a {@link Serializer} for the given class.
*/
Serializer<T> getSerializer(Class<T> c);

/**
* @return a {@link Deserializer} for the given class.
*/
Deserializer<T> getDeserializer(Class<T> c);
}

public interface Serializer<T> {
/**
* <p>Prepare the serializer for writing.</p>
*/
void open(OutputStream out) throws IOException;

/**
* <p>Serialize <code>t</code> to the underlying output stream.</p>
*/
void serialize(T t) throws IOException;

/**
* <p>Close the underlying output stream and clear up any resources.</p>
*/
void close() throws IOException;



Hadoop数据压缩

压缩广泛应用与海量数据处理中,在Hadoop中,压缩应用与文件存储、Map阶段到Reduce阶段的数据交换等情景。在使用压缩方式方面,主要考虑压缩速度和压缩文件的可分割性。常见的压缩格式如gzip、LZO、DEFLATE不支持分割,而zip、bzip格式支持分割文件。Hadoop的压缩算法以及编码解码器源码目录是org.apche.hadoop.io.conpress包中。Hadoop通过编码/解码器为基础的抽象工厂方法,支持多种压缩方法。org.apache.hadoop.io.compress.CompressionCodec

接口如下,而它的实例则根据压缩方法由org.apache.hadoop.io.compress.CompressionCodecFactory提供:

public interface CompressionCodec {

/**
* Create a {@link CompressionOutputStream} that will write to the given
* {@link OutputStream}.
*
* @param out the location for the final output stream
* @return a stream the user can write uncompressed data to have it compressed
* @throws IOException
*/
CompressionOutputStream createOutputStream(OutputStream out)
throws IOException;

/**
* Create a {@link CompressionOutputStream} that will write to the given
* {@link OutputStream} with the given {@link Compressor}.
*
* @param out the location for the final output stream
* @param compressor compressor to use
* @return a stream the user can write uncompressed data to have it compressed
* @throws IOException
*/
CompressionOutputStream createOutputStream(OutputStream out,
Compressor compressor)
throws IOException;

/**
* Get the type of {@link Compressor} needed by this {@link CompressionCodec}.
*/
Class<? extends Compressor> getCompressorType();

/**
* Create a new {@link Compressor} for use by this {@link CompressionCodec}.
*/
Compressor createCompressor();

/**
* Create a stream decompressor that will read from the given input stream.
*
* @param in the stream to read compressed bytes from
* @return a stream to read uncompressed bytes from
*/
CompressionInputStream createInputStream(InputStream in) throws IOException;

/**
* Create a {@link CompressionInputStream} that will read from the given
*
* @param in the stream to read compressed bytes from
* @param decompressor decompressor to use
* @return a stream to read uncompressed bytes from
* @throws IOException
*/
CompressionInputStream createInputStream(InputStream in,
Decompressor decompressor)
throws IOException;

/**
* Get the type of {@link Decompressor} needed by this {@link CompressionCodec}.
*/
Class<? extends Decompressor> getDecompressorType();

/**
* Create a new {@link Decompressor} for use by this {@link CompressionCodec}.
*/
Decompressor createDecompressor();

/**
* Get the default filename extension for this kind of compression.
*/
String getDefaultExtension();
}

压缩器(Compressor)和解压器(Decmopressor)是Hadoop压缩框架中的一对重要概念,commpressor可以插入压缩输出流的实现中,提供具体的压缩功能,相反Decompressor提供具体的解压功能并插入到CompressionInputStream流中。对应的实现分别是java.util.zip.Deflater和java.util.zip.Inflater。

压缩流(CompressorOutPutStream)和解压缩流(CompressionInputStream)是Hadoop压缩框架的另一对重要概念,它提供了基于流的压缩解压缩能力。

另外,考虑到数据压缩往往是计算密集型的操作,考虑到性能,Hadoop还提供了一些本地库来压缩和解压。

版权申明:本文部分摘自【蔡斌、陈湘萍】所著【Hadoop技术内幕 深入解析Hadoop Common和HDFS架构设计与实现原理】一书,仅作为学习笔记,用于技术交流,其商业版权由原作者保留,推荐大家购买图书研究,转载请保留原作者,谢谢!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: