Hadoop IO 特性详解(2)
2015-12-27 16:41
519 查看
http://blog.csdn.net/mrcharles/article/details/50378381
这一次我们接着分析文件IO校验的相关代码,看看最底层是如何实现这种大数据集的文件校验的,不得不说设计这个系统的程序员是世界上最具有智慧的一群人,面对复杂难解的问题总是可以找到很好的解决方法。
其实对于文件校验这件事情,hadoop为什么重要上一篇文章讲过几个方面,提到的bitrot衰减其实很多人没有直观感受。我就举一个直观的例子以便于普通人感受一下bitrot的影响。一个磁盘,十年前我放500GB的日本爱情动作片在上面,第二天,第三天我再去打开它也还是没问题,高清无码,一年之后我再打开,可能也还是没问题。5年的时候,你可能发现有时候怎么会卡顿,十年之后基本上已经不能够完整读取了。可惜了你的500GB大片。
为什么?因为时间越长,硬件总会经受各种损坏,温度,湿度,外力,甚至自身运行,即使一直不用他,也会发生变化,导致数据损坏,这就是bitrot.
言归正传,看校验
这个ChecksumFileSystem在packageorg.apache.hadoop.fs中,继承FilterFileSystem,FilterFileSystem继承FileSystem类。那么首先看最上面的父类FileSystem:
代码很多,其实功能也很清晰简单:我们闭上眼睛想想如果要多文件进行操作,就需要创建文件,创建输出流,接收输入流,对输入流的数据进行校验,有时候还要追加数据到某一个文件,还包括其他一些文件的常规操作。
明白了这些想法,再来看这个类里面的代码,其实就简单了。
根据microheart的说法:
Hadoop抽象文件系统的方法分为两部分:
处理文件和目录的相关事务
以下部分来自microheart的git写的太好我又没本事分析这么透彻,看半天眼睛都花了。所以引用之。
读写文件数据
FileSystem接口
fs.FileSystem
FileSystem抽象类主要包含一下几类接口:
打开或创建文件:FileSystem.open(),FileSystem.create(),FileSystem.append()
读取文件流数据:FSDataInputStream.read()
写文件流数据:FSDataOutputStream.write()
关闭文件:FSDataInputStream.close(),FSDataOutputStream.close()
删除文件:FileSystem.delete()
文件重命名:FileSystem.rename()
创建目录:FileSystem.mkdirs()
定位文件流位置:FSDataInputStream.seek()
获取目录/文件属性:FileSystem.getFileStatus(),FileSystem.get*()
设置目录/文件属性:FileSystem.set*()
设置/获取当前目录:FileSystem.getWorkingDirectory(),FileSystem.setWorkingDirectory()
获取具体的文件系统:FileSystem.get(),FileSystem.getLocal()
FileSystem.get()为工厂模式实现,用于创建多种文件系统产品。
FileStatus
Hadoop通过FileSystem.getFileStatus()可获得文件/目录的属性,这些属性封装在FileStatus中。FileStatus返回给客户端关于文件的元数据信息,包含路径,长度、修改时间、访问时间等基本信息和分布式文件系统特有的副本数。
//Interfacethatrepresentstheclientsideinformationforafile.
publicclassFileStatusimplementsWritable,Comparable{
privatePathpath;
privatelonglength;
privatebooleanisdir;
privateshortblock_replication;
privatelongblocksize;
privatelongmodification_time;
privatelongaccess_time;
privateFsPermissionpermission;
privateStringowner;
privateStringgroup;
...
}
FileStatus实现了Writable接口,因此FileStatus对象可序列化后在网络上传输。FileStatus几乎包含了文件/目录的所有属性,这样设计的好处可以减少在分布式系统中进行网络传输的次数。
Hadoop基于流机制进行文件读写。通过FileSystem.open()可创建FSDataInputStream;通过FileSystem.create()/append()可创建FSDataOutputStream。
FSDataInputStream实现了Seekable接口和PositionedReadable接口FSDataInputStream是装饰器模式的典型运用,实现Seekable接口和PositionedReadable接口借助其装饰的InputStream对象。
Seekable接口提供了在流中进行随机存取的方法,可在流中随机定位位置,然后读取输入流。seekToNewSource()重新选择一个副本。
PositionedReadable接口提供了从输入流中某个位置读取数据的方法,这些方法读取数据后并不改变流的当前位置。read()和readFully()方法都是线程安全的,区别在于:前者试图读取指定长度的数据,后者读取制定长度的数据,直到读满缓冲区或者流结束。
FSInputStream抽象类继承InputStream,并实现PositionedReadable接口。FSInputStream拥有多个子类,具体的文件系统实现相应的输入流。
FSDataOutputStream继承DataOutputStream,Hadoop文件系统不支持随机写,因而没有实现Seekable接口。FSDataOutputStream实现了Syncable接口,Syncable.sync()将流中的数据同步至设备中。
Hadoop提供大量具体的文件系统实现,以满足用户访问各种数据需求。这些文件系统直接或者间接的继承org.apache.hadoop.fs.FileSystem。
其中FilterFileSystem类似于java.io.FilterInputStream,用于在已有的文件系统之上提供新的功能,同样是包装器设计模式的运用。ChecksumFileSystem用于在原始文件系统之上提供校验功能。
继承关系为:
其他的不多说,着重分析一下checksum的运行原理吧。
ChecksumFileSystem继承FilterFileSystem,基于CRC-32提供对文件系统的数据校验。与其他文件系统一样,ChecksumFileSystem需要提供处理文件/目录相关事务和文件读写服务。
这部分逻辑主要保持数据文件和CRC-32校验信息文件的一致性,如数据文件重命名,则校验文件也需要重命名。如果数据文件为:foo.txt,则校验文件为:.foo.txt.crc
以ChecksumFileSystem.delete()方法删除文件文件为例。若文件为目录则递归删除(recursive=true);若为普通文件,则删除对应的校验文件(若存在)。
Hadoop读文件时,需要从数据文件和校验文件中分别读出内容,并根据校验信息对读入的数据文件内容进行校验,以判断文件的完整性。注:若校验事变,ChecksumFileSystem无法确定是数据文件出错还是校验文件出错。
读数据流程与ChecksumFSInputChecker和其父类FSInputChecker相关。FSInputChecker的成员变量包含数据缓冲区、校验和缓冲区和读取位置等变量。
ChecksumFSInputChecker构造方法对基类FSInputChecker的成员进行初始化,基于CRC-32校验,校验和大小为4字节。对校验文件首先要进行版本校验,即文件头部是否匹配魔数"crc\0"
FSInputChecker.read()循环调用read1()方法直到读取len个字节或者没有数据可读,返回读取的字节数。
FSInputChecker.read1()方法为了提高效率,减少内存复制的次数,若当前FSInputChecker.buf没有数据可读且要读取的len字节数大于或等于数据块大小(buf.length,默认512字节),则通过readchecksumChunk()方法将数据直接读取目标数组中,而不需经过FSInputChecker.buf的中转。若buf没有数据可读且读取的len字节数小于数据块大小,则通过fill()方法从数据流中一次读取一个数据块。
FSInputChecker.readChecksumChunk()方法通常需要对读取的字节序列进行校验(默认为true),若校验不通过,可选择新的副本进行重读,如果进行了retriesLeft次重读仍然不能校验通过,则抛出异常。readChunk()方法是一个抽象方法,FSInputChecker的子类实现它,以定义实际读取数据的逻辑。
ChecksumFileSystem.ChecksumFSInputChecker实现了readChunk()的逻辑。readChunk()它读取数据块和校验数据和,不进行两者的校验。getChecksumFilePos()方法定位到校验和文件中pos位置对应块的边界,以便读取一个数据块对应的完整校验和。
与文件/目录元数据信息的维护和读文件相比,写文件相对起来比较复杂,ChecksumFileSystem需要维护字节流上的数据读写和基于块的校验和关系。一般而言,每{io.bytes.per.checksum}(默认512)个数据字节对应一个单独的校验和,CRC-32校验和的输出为4个字节。因此校验数据所带来的存储开销小于1%。
ChecksumFSOutputSummer继承FSOutputSummer,在基本的具体文件系统的输出流上,添加数据文件和校验文件流的输出。继承关系:OutputStream<--FSOutputSummer<--ChecksumFSOutputSummer
FSOutputSummer是一个生成校验和的通用输出流,包含4个成员变量。
FSOutputSummer逻辑非常清晰,根据提供的字节数组,每{io.bytes.per.checksum}求出一个校验和,并根据子类所实现的writeChunk()方法写出到响应的输出流中,在ChecksumFSOutputSummer中,则分别写入文件数据流和校验文件数据流。
FSOutputSummer.write()方法循环调用write1()方法进行校验和计算和数据流输出。当buf的count数等于buf.length,则将数据和校验和输出到对应的流中。
write1()方法是用了一个实用的技巧,若当前缓冲区的写入字节数为0(count=0)且需要写入的字节数据长度大于或等于块(buf.length)的长度,则直接进行校验和计算,避免将数据拷贝到缓冲区,然后再计算校验和,减少内存拷贝的次数。write1()方法尽可能的写入多的数据,但一次最多写入一个块。
ChecksumFileSystem.CheckSumFSOutputSummer提供了构造FSOutputSummer所需要的参数。校验和采用PureJavaCrc32,校验和长度4字节,缓冲大小为512字节(默认)。
构造ChecksumFSOutputSummer时,就往校验和文件流中写入魔数CHECKSUM_VERSION("crc\0")和校验块长度。FSOutputSummer抽象了大部分和数据分块、计算校验和的相关功能,ChecksumFSOutputSummer在此基础上提供了具体的文件流输出。
由此可见,hadoop对于文件校验有一套精心的设计,文件系统和文件读写都会避免错误的产生。虽然额外的校验可能会导致性能的占用,但是一些公司经过摸索也找出了解决方案:
据ggjucheng:hdfs都是存储大文件的,默认每512字节就做一个crc校验,客户端在读写文件都要做这个校验,这个对hdfs的性能消耗是比较大的,crc最开始是采用jni调用,但是jni调用都要做上下文切换,加上每512字节就做一次crc校验,所以导致jvm切换很频繁,后来修改为pure
java的crc校验,性能还提高了下,如果是几百兆就做一个crc校验,那么jni调用导致的上下文切换少些,那么jni就还有优势,但是在hadoop这个应用场景明显不合适。后来淘宝的针对hadoop的crc场景,定制了jvm,将crc指令优化为调用硬件指令,性能测试报告证明提高了hdfs性能的20%-30%。
到此,文件校验分析就结束了。接下来我会接着介绍压缩编码解码方面的原理。
Charles于2015-12-22PhnomPenh
这一次我们接着分析文件IO校验的相关代码,看看最底层是如何实现这种大数据集的文件校验的,不得不说设计这个系统的程序员是世界上最具有智慧的一群人,面对复杂难解的问题总是可以找到很好的解决方法。
其实对于文件校验这件事情,hadoop为什么重要上一篇文章讲过几个方面,提到的bitrot衰减其实很多人没有直观感受。我就举一个直观的例子以便于普通人感受一下bitrot的影响。一个磁盘,十年前我放500GB的日本爱情动作片在上面,第二天,第三天我再去打开它也还是没问题,高清无码,一年之后我再打开,可能也还是没问题。5年的时候,你可能发现有时候怎么会卡顿,十年之后基本上已经不能够完整读取了。可惜了你的500GB大片。
为什么?因为时间越长,硬件总会经受各种损坏,温度,湿度,外力,甚至自身运行,即使一直不用他,也会发生变化,导致数据损坏,这就是bitrot.
言归正传,看校验
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | /**************************************************************** *AbstractChecksumedFileSystem. *ItprovideabasicimplementationofaChecksumedFileSystem, *whichcreatesachecksumfileforeachrawfile. *Itgenerates&verifieschecksumsattheclientside. * *****************************************************************/ @InterfaceAudience.Public @InterfaceStability.Stable publicabstractclassChecksumFileSystemextendsFilterFileSystem{ privatestaticfinalbyte[]CHECKSUM_VERSION= new byte[]{ 'c' , 'r' , 'c' ,0}; privateintbytesPerChecksum=512; privatebooleanverifyChecksum= true ; privatebooleanwriteChecksum= true ; } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 | publicabstractclassFileSystemextendsConfiguredimplementsCloseable{ /** *作用是将本地文件拷贝到目标文件,如果目标还是在本地就不执行任何操作,如果是远程就执行 *@paramfsOutputFilepathofoutputfile *@paramtmpLocalFilepathtolocaltmpfile */ publicvoidcompleteLocalOutput(PathfsOutputFile,PathtmpLocalFile) throwsIOException{ moveFromLocalFile(tmpLocalFile,fsOutputFile); } /** *将本地文件src拷贝到远程中去,也就是增加到FS中,操作之后本地文件还是原封不动,保持完整。 *@paramsrcpath *@paramdstpath */ publicvoidcopyFromLocalFile(Pathsrc,Pathdst) throwsIOException{ copyFromLocalFile( false ,src,dst); } /** *在指定的地点利用给定的校验和选项创建一个FSDataOutputStream *@paramfthefilenametoopen *@parampermission访问权限 *@paramflags{@linkCreateFlag}stouseforthisstream. *@parambufferSizethesizeofthebuffertobeused. *@paramreplicationrequiredblockreplicationforthefile.副本 *@paramblockSize *@paramprogress *@paramchecksumOptchecksumparameter.Ifnull,thevalues *foundinconfwillbeused. *@throwsIOException *@see#setPermission(Path,FsPermission) */ publicFSDataOutputStreamcreate(Pathf, FsPermissionpermission, EnumSet<CreateFlag>flags, intbufferSize, shortreplication, longblockSize, Progressableprogress, ChecksumOptchecksumOpt)throwsIOException{ //Checksumoptionsareignoredbydefault.Thefilesystemsthat //implementchecksumneedtooverridethismethod.Thefull //supportiscurrentlyonlyavailableinDFS. return create(f,permission,flags.contains(CreateFlag.OVERWRITE), bufferSize,replication,blockSize,progress); } /**Returntrueifffileisachecksumfilename.是不是校验和文件呢。.crc结尾嘛,上一篇文章已经讲过这个点了*/ publicstaticbooleanisChecksumFile(Pathfile){ Stringname=file.getName(); return name.startsWith( "." )&&name.endsWith( ".crc" ); } /**Returnthenameofthechecksumfileassociatedwithafile.*/ publicPathgetChecksumFile(Pathfile){ return new Path(file.getParent(), "." +file.getName()+ ".crc" ); } /**Returnthelengthofthechecksumfilegiventhesizeofthe *actualfile. **/ publiclonggetChecksumFileLength(Pathfile,longfileSize){ return getChecksumLength(fileSize,getBytesPerSum()); } /** *Setwhethertoverifychecksum. */ @Override publicvoidsetVerifyChecksum(booleanverifyChecksum){ this .verifyChecksum=verifyChecksum; } @Override publicvoidsetWriteChecksum(booleanwriteChecksum){ this .writeChecksum=writeChecksum; } /**gettherawfilesystem*/ @Override publicFileSystemgetRawFileSystem(){ return fs; } publicbooleanreportChecksumFailure(Pathf,FSDataInputStream in , longinPos,FSDataInputStreamsums,longsumsPos){ return false ;} } |
明白了这些想法,再来看这个类里面的代码,其实就简单了。
根据microheart的说法:
Hadoop抽象文件系统的方法分为两部分:
处理文件和目录的相关事务
以下部分来自
读写文件数据
FileSystem接口
fs.FileSystem
FileSystem抽象类主要包含一下几类接口:
打开或创建文件:FileSystem.open(),FileSystem.create(),FileSystem.append()
读取文件流数据:FSDataInputStream.read()
写文件流数据:FSDataOutputStream.write()
关闭文件:FSDataInputStream.close(),FSDataOutputStream.close()
删除文件:FileSystem.delete()
文件重命名:FileSystem.rename()
创建目录:FileSystem.mkdirs()
定位文件流位置:FSDataInputStream.seek()
获取目录/文件属性:FileSystem.getFileStatus(),FileSystem.get*()
设置目录/文件属性:FileSystem.set*()
设置/获取当前目录:FileSystem.getWorkingDirectory(),FileSystem.setWorkingDirectory()
获取具体的文件系统:FileSystem.get(),FileSystem.getLocal()
FileSystem.get()为工厂模式实现,用于创建多种文件系统产品。
FileStatus
Hadoop通过FileSystem.getFileStatus()可获得文件/目录的属性,这些属性封装在FileStatus中。FileStatus返回给客户端关于文件的元数据信息,包含路径,长度、修改时间、访问时间等基本信息和分布式文件系统特有的副本数。
//Interfacethatrepresentstheclientsideinformationforafile.
publicclassFileStatusimplementsWritable,Comparable{
privatePathpath;
privatelonglength;
privatebooleanisdir;
privateshortblock_replication;
privatelongblocksize;
privatelongmodification_time;
privatelongaccess_time;
privateFsPermissionpermission;
privateStringowner;
privateStringgroup;
...
}
FileStatus实现了Writable接口,因此FileStatus对象可序列化后在网络上传输。FileStatus几乎包含了文件/目录的所有属性,这样设计的好处可以减少在分布式系统中进行网络传输的次数。
FSDataInputStream/FSDataOutputStream
Hadoop基于流机制进行文件读写。通过FileSystem.open()可创建FSDataInputStream;通过FileSystem.create()/append()可创建FSDataOutputStream。FSDataInputStream实现了Seekable接口和PositionedReadable接口FSDataInputStream是装饰器模式的典型运用,实现Seekable接口和PositionedReadable接口借助其装饰的InputStream对象。
publicclassFSDataInputStreamextendsDataInputStream
implementsSeekable,PositionedReadable,Closeable,HasFileDescriptor{
publicFSDataInputStream(InputStreamin)throwsIOException{
super(in);
if(!(ininstanceofSeekable)||!(ininstanceofPositionedReadable)){
thrownewIllegalArgumentException("InisnotaninstanceofSeekableorPositionedReadable");
}
}
publicsynchronizedvoidseek(longdesired)throwsIOException{
((Seekable)in).seek(desired);
}
publicvoidreadFully(longposition,byte[]buffer)
throwsIOException{
((PositionedReadable)in).readFully(position,buffer,0,buffer.length);
}
...
}
Seekable接口提供了在流中进行随机存取的方法,可在流中随机定位位置,然后读取输入流。seekToNewSource()重新选择一个副本。
publicinterfaceSeekable{
//Seektothegivenoffsetfromthestartofthefile.
voidseek(longpos)throwsIOException;
//Returnthecurrentoffsetfromthestartofthefile
longgetPos()throwsIOException;
//Seeksadifferentcopyofthedata.Returnstrueiffoundanewsource,falseotherwise.
booleanseekToNewSource(longtargetPos)throwsIOException;
}
PositionedReadable接口提供了从输入流中某个位置读取数据的方法,这些方法读取数据后并不改变流的当前位置。read()和readFully()方法都是线程安全的,区别在于:前者试图读取指定长度的数据,后者读取制定长度的数据,直到读满缓冲区或者流结束。
publicinterfacePositionedReadable{
publicintread(longposition,byte[]buffer,intoffset,intlength)throwsIOException;
publicvoidreadFully(longposition,byte[]buffer,intoffset,intlength)throwsIOException;
publicvoidreadFully(longposition,byte[]buffer)throwsIOException;
}
FSInputStream抽象类继承InputStream,并实现PositionedReadable接口。FSInputStream拥有多个子类,具体的文件系统实现相应的输入流。
FSDataOutputStream继承DataOutputStream,Hadoop文件系统不支持随机写,因而没有实现Seekable接口。FSDataOutputStream实现了Syncable接口,Syncable.sync()将流中的数据同步至设备中。
publicclassFSDataOutputStreamextendsDataOutputStreamimplementsSyncable{...}
Hadoop具体文件系统
Hadoop提供大量具体的文件系统实现,以满足用户访问各种数据需求。这些文件系统直接或者间接的继承org.apache.hadoop.fs.FileSystem。其中FilterFileSystem类似于java.io.FilterInputStream,用于在已有的文件系统之上提供新的功能,同样是包装器设计模式的运用。ChecksumFileSystem用于在原始文件系统之上提供校验功能。
继承关系为:
FileSystem<--FilterFileSystem<--ChecksumFileSystem<--LocalFileSystem
<--ChecksumDistributeFileSystem
其他的不多说,着重分析一下checksum的运行原理吧。
ChecksumFileSystem
ChecksumFileSystem继承FilterFileSystem,基于CRC-32提供对文件系统的数据校验。与其他文件系统一样,ChecksumFileSystem需要提供处理文件/目录相关事务和文件读写服务。
文件/目录相关事务
这部分逻辑主要保持数据文件和CRC-32校验信息文件的一致性,如数据文件重命名,则校验文件也需要重命名。如果数据文件为:foo.txt,则校验文件为:.foo.txt.crc以ChecksumFileSystem.delete()方法删除文件文件为例。若文件为目录则递归删除(recursive=true);若为普通文件,则删除对应的校验文件(若存在)。
publicbooleandelete(Pathf,booleanrecursive)throwsIOException{
FileStatusfstatus=null;
try{
fstatus=fs.getFileStatus(f);
}catch(FileNotFoundExceptione){
returnfalse;
}
if(fstatus.isDir()){
returnfs.delete(f,recursive);
}else{
PathcheckFile=getChecksumFile(f);
if(fs.exists(checkFile)){
fs.delete(checkFile,true);
}
returnfs.delete(f,true);
}
}
读文件
Hadoop读文件时,需要从数据文件和校验文件中分别读出内容,并根据校验信息对读入的数据文件内容进行校验,以判断文件的完整性。注:若校验事变,ChecksumFileSystem无法确定是数据文件出错还是校验文件出错。读数据流程与ChecksumFSInputChecker和其父类FSInputChecker相关。FSInputChecker的成员变量包含数据缓冲区、校验和缓冲区和读取位置等变量。
abstractpublicclassFSInputCheckerextendsFSInputStream{
protectedPathfile;//Thefilenamefromwhichdataisreadfrom
privateChecksumsum;
privatebooleanverifyChecksum=true;
privatebyte[]buf;//数据缓冲区
privatebyte[]checksum;//校验和缓冲区
privateintpos;
privateintcount;
privateintnumOfRetries;//出错重试次数
privatelongchunkPos=0;//cachedfileposition
...
}
ChecksumFSInputChecker构造方法对基类FSInputChecker的成员进行初始化,基于CRC-32校验,校验和大小为4字节。对校验文件首先要进行版本校验,即文件头部是否匹配魔数"crc\0"
publicChecksumFSInputChecker(ChecksumFileSystemfs,Pathfile,intbufferSize)
throwsIOException{
super(file,fs.getFileStatus(file).getReplication());
...
try{
...
if(!Arrays.equals(version,CHECKSUM_VERSION))
thrownewIOException("Notachecksumfile:"+sumFile);
this.bytesPerSum=sums.readInt();
set(fs.verifyChecksum,newPureJavaCrc32(),bytesPerSum,4);
}catch(...){//ignore
set(fs.verifyChecksum,null,1,0);
}
}
FSInputChecker.read()循环调用read1()方法直到读取len个字节或者没有数据可读,返回读取的字节数。
publicsynchronizedintread(byte[]b,intoff,intlen)throwsIOException{
...//参数校验
intn=0;
for(;;){
intnread=read1(b,off+n,len-n);
if(nread<=0)
return(n==0)?nread:n;
n+=nread;
if(n>=len)
returnn;
}
}
FSInputChecker.read1()方法为了提高效率,减少内存复制的次数,若当前FSInputChecker.buf没有数据可读且要读取的len字节数大于或等于数据块大小(buf.length,默认512字节),则通过readchecksumChunk()方法将数据直接读取目标数组中,而不需经过FSInputChecker.buf的中转。若buf没有数据可读且读取的len字节数小于数据块大小,则通过fill()方法从数据流中一次读取一个数据块。
privateintread1(byteb[],intoff,intlen)throwsIOException{
intavail=count-pos;
if(avail<=0){
if(len>=buf.length){
intnread=readChecksumChunk(b,off,len);//readachunktouserbufferdirectly;avoidonecopy
returnnread;
}else{
fill();//readachunkintothelocalbuffer
if(count<=0){
return-1;
}else{
avail=count;
}
}
}
//copycontentofthelocalbuffertotheuserbuffer
intcnt=(avail<len)?avail:len;
System.arraycopy(buf,pos,b,off,cnt);
pos+=cnt;
returncnt;
}
FSInputChecker.readChecksumChunk()方法通常需要对读取的字节序列进行校验(默认为true),若校验不通过,可选择新的副本进行重读,如果进行了retriesLeft次重读仍然不能校验通过,则抛出异常。readChunk()方法是一个抽象方法,FSInputChecker的子类实现它,以定义实际读取数据的逻辑。
privateintreadChecksumChunk(byteb[],intoff,intlen)throwsIOException{
//invalidatebuffer
count=pos=0;
intread=0;
booleanretry=true;
intretriesLeft=numOfRetries;
do{
retriesLeft--;
try{
read=readChunk(chunkPos,b,off,len,checksum);
if(read>0){
if(needChecksum()){
sum.update(b,off,read);
verifySum(chunkPos);
}
chunkPos+=read;
}
retry=false;
}catch(ChecksumExceptionce){
if(retriesLeft==0){
throwce;
}
if(seekToNewSource(chunkPos)){//重试一个新的数据副本
seek(chunkPos);
}else{
throwce;
}
}
}while(retry);
returnread;
}
ChecksumFileSystem.ChecksumFSInputChecker实现了readChunk()的逻辑。readChunk()它读取数据块和校验数据和,不进行两者的校验。getChecksumFilePos()方法定位到校验和文件中pos位置对应块的边界,以便读取一个数据块对应的完整校验和。
//ChecksumFSInputChecker.readChunk()
protectedintreadChunk(longpos,byte[]buf,intoffset,intlen,
byte[]checksum)throwsIOException{
booleaneof=false;
if(needChecksum()){
try{
longchecksumPos=getChecksumFilePos(pos);
if(checksumPos!=sums.getPos()){
sums.seek(checksumPos);
}
sums.readFully(checksum);
}catch(EOFExceptione){
eof=true;
}
len=bytesPerSum;
}
if(pos!=datas.getPos()){
datas.seek(pos);
}
intnread=readFully(datas,buf,offset,len);
if(eof&&nread>0){
thrownewChecksumException("Checksumerror:"+file+"at"+pos,pos);
}
returnnread;
}
写文件
与文件/目录元数据信息的维护和读文件相比,写文件相对起来比较复杂,ChecksumFileSystem需要维护字节流上的数据读写和基于块的校验和关系。一般而言,每{io.bytes.per.checksum}(默认512)个数据字节对应一个单独的校验和,CRC-32校验和的输出为4个字节。因此校验数据所带来的存储开销小于1%。ChecksumFSOutputSummer继承FSOutputSummer,在基本的具体文件系统的输出流上,添加数据文件和校验文件流的输出。继承关系:OutputStream<--FSOutputSummer<--ChecksumFSOutputSummer
FSOutputSummer是一个生成校验和的通用输出流,包含4个成员变量。
abstractpublicclassFSOutputSummerextendsOutputStream{
privateChecksumsum;//datachecksum计算校验和
privatebytebuf[];//internalbufferforstoringdatabeforeitischecksumed输出数据缓冲区
privatebytechecksum[];//internalbufferforstoringchecksum校验和缓冲区
privateintcount;//Thenumberofvalidbytesinthebuffer.已使用空间计数
...
}
FSOutputSummer逻辑非常清晰,根据提供的字节数组,每{io.bytes.per.checksum}求出一个校验和,并根据子类所实现的writeChunk()方法写出到响应的输出流中,在ChecksumFSOutputSummer中,则分别写入文件数据流和校验文件数据流。
//ChecksumFileSystem.CheckSumFSOutputSummer
privatestaticclassChecksumFSOutputSummerextendsFSOutputSummer{
privateFSDataOutputStreamdatas;
privateFSDataOutputStreamsums;
...
@Override
protectedvoidwriteChunk(byte[]b,intoffset,intlen,byte[]checksum)throwsIOException{
datas.write(b,offset,len);
sums.write(checksum);
}
}
FSOutputSummer.write()方法循环调用write1()方法进行校验和计算和数据流输出。当buf的count数等于buf.length,则将数据和校验和输出到对应的流中。
publicsynchronizedvoidwrite(byteb[],intoff,intlen)throwsIOException{
...//参数校验
for(intn=0;n<len;n+=write1(b,off+n,len-n)){}
}
privateintwrite1(byteb[],intoff,intlen)throwsIOException{
if(count==0&&len>=buf.length){
finalintlength=buf.length;
sum.update(b,off,length);
writeChecksumChunk(b,off,length,false);
returnlength;
}
//copyuserdatatolocalbuffer
intbytesToCopy=buf.length-count;
bytesToCopy=(len<bytesToCopy)?len:bytesToCopy;
sum.update(b,off,bytesToCopy);
System.arraycopy(b,off,buf,count,bytesToCopy);
count+=bytesToCopy;
if(count==buf.length){//localbufferisfull
flushBuffer();
}
returnbytesToCopy;
}
privatevoidwriteChecksumChunk(byteb[],intoff,intlen,booleankeep)throwsIOException{
inttempChecksum=(int)sum.getValue();
if(!keep){
sum.reset();
}
int2byte(tempChecksum,checksum);//整数转字节数组
writeChunk(b,off,len,checksum);
}
write1()方法是用了一个实用的技巧,若当前缓冲区的写入字节数为0(count=0)且需要写入的字节数据长度大于或等于块(buf.length)的长度,则直接进行校验和计算,避免将数据拷贝到缓冲区,然后再计算校验和,减少内存拷贝的次数。write1()方法尽可能的写入多的数据,但一次最多写入一个块。
ChecksumFileSystem.CheckSumFSOutputSummer提供了构造FSOutputSummer所需要的参数。校验和采用PureJavaCrc32,校验和长度4字节,缓冲大小为512字节(默认)。
publicChecksumFSOutputSummer(ChecksumFileSystemfs,Pathfile,booleanoverwrite,intbufferSize,
shortreplication,longblockSize,Progressableprogress)throwsIOException{
super(newPureJavaCrc32(),fs.getBytesPerSum(),4);
intbytesPerSum=fs.getBytesPerSum();
this.datas=fs.getRawFileSystem().create(file,overwrite,bufferSize,replication,blockSize,progress);
intsumBufferSize=fs.getSumBufferSize(bytesPerSum,bufferSize);
this.sums=fs.getRawFileSystem().create(fs.getChecksumFile(file),true,sumBufferSize,replication,blockSize);
sums.write(CHECKSUM_VERSION,0,CHECKSUM_VERSION.length);
sums.writeInt(bytesPerSum);
}
构造ChecksumFSOutputSummer时,就往校验和文件流中写入魔数CHECKSUM_VERSION("crc\0")和校验块长度。FSOutputSummer抽象了大部分和数据分块、计算校验和的相关功能,ChecksumFSOutputSummer在此基础上提供了具体的文件流输出。
由此可见,hadoop对于文件校验有一套精心的设计,文件系统和文件读写都会避免错误的产生。虽然额外的校验可能会导致性能的占用,但是一些公司经过摸索也找出了解决方案:
据
java的crc校验,性能还提高了下,如果是几百兆就做一个crc校验,那么jni调用导致的上下文切换少些,那么jni就还有优势,但是在hadoop这个应用场景明显不合适。后来淘宝的针对hadoop的crc场景,定制了jvm,将crc指令优化为调用硬件指令,性能测试报告证明提高了hdfs性能的20%-30%。
到此,文件校验分析就结束了。接下来我会接着介绍压缩编码解码方面的原理。
Charles于2015-12-22PhnomPenh
相关文章推荐
- 非常全的linux面试笔试题及参考答案
- 初步掌握HDFS的架构及原理3
- 编译安装软件包之Apache
- CentOS 6.5 MySQL5.6.26源码安装
- Linux之进程查看工具介绍
- Linux之进程查看工具介绍
- Hadoop Serialization(third edition)hadoop序列化详解(最新版) (1)
- Linux Shell 逐行读取文件 ( txt , sh , csv等)
- Hadoop编码解码【压缩解压缩】机制详解(1)
- jdbc.properties
- linux中权限的介绍,分类和实施
- CentOS 6.5 Git源码安装
- OpenStack集成容器方案解读
- 数据中心架构部署
- 政府行业IT运维管理解决方案
- CentOS 6.5 Ruby源码安装
- CentOS系统中Firefox浏览器的flash player安装
- CentOS 7.0 服务器安装
- Mac OS X Install Docker
- win7下用VMware 安装CentOS 7 64位