您的位置:首页 > Web前端 > Node.js

DataNode本地数据存储和管理--ncp_block_verification.log.curr和dncp_block_verification.log.prev

2016-04-07 17:18 591 查看
DataNode本地数据存储和管理

由于DataNode数可以成千上万,NameNode只有一个,为了减轻NameNode的负担,NameNode上并不永久保存那个DataNode上有那些数据块的信息,而是通过DataNode启动时的上报,来更新NameNode上的映射表。

DataNode和NameNode建立连接以后,就会不断地和NameNode保持心跳。心跳的返回包含了NameNode对DataNode的一些命令,如删除数据库或者是把数据块复制到另一个DataNode。NameNode不会发起到DataNode的请求,在通信过程中,它们是严格的客户端/服务器架构。

在客户端做写操作的时候,DataNode需要相互配合,保证写操作的一致性。

DataNode的实现包括两部分,一部分是对本地数据块的管理,另一部分,就是和其他的实体打交道。

-----------------------DataNode本地数据存储结构Storage类------------------------------------------

所有的数据就存放在dfs/data里面:

storage里存的东西是一些出错信息;

in_use.lock是一个空文件,它的作用是如果需要对整个系统做排斥操作,应用应该获取它,以它为锁;

current存的是当前有效的数据块,detach存放的是分离文件(用于分离硬链接,即copy-on-write),tmp保存的是一些操作需要的临时数据块。current下包含了一系列的数据块文件和数据块元数据文件;

数据块文件显然保存了HDFS中的数据,数据块最大可以刡64M。每个数据块文件都会有对应的数据块元数据文件。里面存放的是数据块的校验信息。下面是数据块文件名和它的元数据文件名的例子:

blk_3148782637964391313 ---->数据文件

blk_3148782637964391313_242812.meta --->元数据文件(校验文件)

上面的例子中,3148782637964391313是数据块的ID号,242812是数据块的stamp,用于一致性检查。

在current目录下有下面几个文件:

dncp_block_verification.log.curr和dncp_block_verification.log.prev,它记录了一些DataNode对文件系定时统做一致性检查需要的信息。

VERSION,保存了一些文件系统的元信息。

DataNode存储的目录结构:

${dfs.data.dir}/current/VERSION

/blk_<id_1>

/blk_<id_1>_genStamp.meta

/blk_<id_2>

/blk_<id_2>_genStamp.meta

/...

/blk_<id_64>

/blk_<id_64>_genStamp.meta

/subdir0/

/subdir1/

/...

/subdir63/

/previous/

/detach/

/tmp/

/in_use.lock

/storage

类Storage保存了和存储相关的信息,它继承了StorageInfo。StorageInfo是一个简单的存储信息类,包含变量:

public int layoutVersion; // HDFS的固定数据结构的版本

public int namespaceID; // storage的namespaceId

public long cTime; //创建的时间

其中,HDFS的固定数据结构的版本是由一个叫layoutVersion负整数定义的,这个版本号与Hadoop分布的发行号是不相干的。无论何时版本号都是减一(比如:版本-18过了就是-19)。当版本号变小时HDFS就需要升级,因此如果HDFS保存的layout是个旧的版本,那么新的namenode(datanode)就无法操作。

Storage.StorageDirectory是存储文件夹类:包含了备份/恢复机制,Storage.StorageDirectory.analyzeStorage通过判断文件夹current,previous,previous.tmp,removed.tmp,finalized.Tmp,checkpoint.tmp的存在组合情况,来检测该文件夹下的一致性(由于升级/回滚/提交finalize是一个过程,所以可能出现中途中断的情况),返回相应的状态。

StorageDirectory.doRecover根据返回的状态在doRecover中进行相应的恢复操作:

COMPLETE_UPGRADE:mv previous.tmp -> previous

RECOVER_UPGRADE:mv previous.tmp -> current

COMPLETE_FINALIZE:rm finalized.tmp

COMPLETE_ROLLBACK:rm removed.tmp

RECOVER_ROLLBACK:mv removed.tmp -> current

COMPLETE_CHECKPOINT:mv lastcheckpoint.tmp -> previous.checkpoint

RECOVER_CHECKPOINT:mv lastcheckpoint.tmp -> current

current文件夹下的VERSION就是通过StorageDirectory.read/write进行生成读取的。

storageDirectory.lock/unlock用于控制多个并发程序对文件夹的访问操作。lock通过调用FileChannel.tryLock对文件in_use.lock进行锁定,要对这个文件夹进行操作时候要先获取这个锁,如果获取不到就不能访问。在获取文件夹的状态时要进行lock,因为获取后要在doRecover中根据状态对文件夹进行相应的恢复操作,在doRecover结束后进行unlock。即在对文件夹操作前要lock,操作结束后unlock。

Storage.DirIterator是一个用于访问存储文件夹类型为DirIterator.dirType的迭代器。其hasNext和next都与List操作一样,只是对要访问的类型做了限定,是DirIterator.dirType类型的元素,其他类型文件视为透明。

数据是存储在Storage.storageDirs这些文件夹下的。

DataStorage是Storage的子类,用于描述datanode上的存储文件夹(datanode下可以有多个文件夹用于存放数据,命名为subdir0...。因为HDFS规定了一个目录能存放Block的数目(dfs.datanode.numblocks,默认64)。如果数目多了会用创建新的子目录来存放新块。),并拥有update/rollback/finalize等存储文件夹操作,在recoverTransitionRead调用StorageDirectory.analyzeStorage分析文件夹的状态,出错时调用doRecover进行恢复。

-----------------------DataNode本地数据存储管理FSDataset类------------------------------------------

所有和数据块相关的操作,都在FSDataset相关的类中进行处理。

DataNode的存储结构由大到小为卷FSVolume、目录FSDir、文件(block和元数据等)。

DataNodeBlockInfo类存放的是Block在文件系统上的信息:FSVolume所属的卷,File所属的文件,detached是否分离(即是否不与其他文件建立硬链接)。

DataNodeBlockInfo.DetachFile是用于分离文件。在升级update后,previous目录和current目录会通过硬链接指到同一个文件,previous保存的是旧版本文件,当要修改current的文件时,需要将这个文件进行分离(copy-on-write),使得不会影响到previous中的文件,detachFile就是做这个的:在detach目录下创建复制一个相同的文件,然后将这个文件复制到current下,这样current下文件的硬链接就断掉了。

在DataNode中,不需要考虑块属于哪个文件,‘块文件’指的是数据块的文件,而非所属的文件。

由于HDFS规定了一个目录能存放Block的数目,所以一个Storage上存在多个目录。

对应的,FSDataset中用FSVolume来对应一个Storage(一个dfs中可以有多个Storage),FSDir对应一个目录,所有的FSVolume由FSVolumeSet管理,

FSDataset中通过一个FSVolumeSet对象,就可以管理它的所有存储空间。当然这些文件、文件夹都是指存放在current下。

FSDir是一个目录类,包含变量:

private synchronized List<Thread> tryUpdateBlock(      Block oldblock, Block newblock) throws IOException {    //check ongoing create threads    final ActiveFile activefile = ongoingCreates.get(oldblock);//获取与此块相关的文件及访问这个块的线程    if (activefile != null && !activefile.threads.isEmpty()) {//如果近期有对此块进行操作,返回存活的操作线程      //remove dead threads      for(Iterator<Thread> i = activefile.threads.iterator(); i.hasNext(); ) {        final Thread t = i.next();        if (!t.isAlive()) {          i.remove();        }      }      //return living threads      //近期有对旧块进行操作,将存活的线程返回      if (!activefile.threads.isEmpty()) {        return new ArrayList<Thread>(activefile.threads);      }    }    //近期无对此块操作的线程,就更新块    //No ongoing create threads is alive.  Update block.    File blockFile = findBlockFile(oldblock.getBlockId());//获得旧块的文件    if (blockFile == null) {      throw new IOException("Block " + oldblock + " does not exist.");    }    //获得旧块的元数据文件    File oldMetaFile = findMetaFile(blockFile);    //获得旧块中的genStamp    long oldgs = parseGenerationStamp(blockFile, oldMetaFile);        //rename meta file to a tmp file    //先将旧块元数据文件重命名    File tmpMetaFile = new File(oldMetaFile.getParent(),        oldMetaFile.getName()+"_tmp" + newblock.getGenerationStamp());    if (!oldMetaFile.renameTo(tmpMetaFile)){      throw new IOException("Cannot rename block meta file to " + tmpMetaFile);    }    //旧块stamp比新块stamp大,不合法    if (oldgs > newblock.getGenerationStamp()) {      throw new IOException("Cannot update block (id=" + newblock.getBlockId()          + ") generation stamp from " + oldgs          + " to " + newblock.getGenerationStamp());    }        //update length    //新块的大小大于旧块的大小    if (newblock.getNumBytes() > oldblock.getNumBytes()) {      throw new IOException("Cannot update block file (=" + blockFile          + ") length from " + oldblock.getNumBytes() + " to " + newblock.getNumBytes());    }    //新块的大小小于旧块的大小    if (newblock.getNumBytes() < oldblock.getNumBytes()) {      //截断旧块和旧块的元数据文件      truncateBlock(blockFile, tmpMetaFile, oldblock.getNumBytes(), newblock.getNumBytes());    }    //rename the tmp file to the new meta file (with new generation stamp)    //将重命名后的旧元数据文件重命名为新块元数据文件    File newMetaFile = getMetaFile(blockFile, newblock);    if (!tmpMetaFile.renameTo(newMetaFile)) {      throw new IOException("Cannot rename tmp meta file to " + newMetaFile);    }    updateBlockMap(ongoingCreates, oldblock, newblock);    updateBlockMap(volumeMap, oldblock, newblock);    // verify that the contents of the stored block     // matches the block file on disk.    validateBlockMetadata(newblock);    return null;  }  //truncateBlock对旧块blockFile和对应的元数据文件metaFile进行截断,截断后旧块长度为newlen(newlen<oldlen)。  static private void truncateBlock(File blockFile, File metaFile,      long oldlen, long newlen) throws IOException {    if (newlen == oldlen) {      return;    }    if (newlen > oldlen) {      throw new IOException("Cannout truncate block to from oldlen (=" + oldlen          + ") to newlen (=" + newlen + ")");    }    //由于只是对就块进行截断,所有新块的最后一个校验和字段可能在旧块中不一样,    //所有setLength进行截断后,要读取最后一个校验和字段    DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum();     int checksumsize = dcs.getChecksumSize();    int bpc = dcs.getBytesPerChecksum();    long n = (newlen - 1)/bpc + 1;//校验和的段数    long newmetalen = BlockMetadataHeader.getHeaderSize() + n*checksumsize;//新的校验和文件的长度    long lastchunkoffset = (n - 1)*bpc;//最后一个校验和字段的偏移位置    int lastchunksize = (int)(newlen - lastchunkoffset); //最后一个校验和的开始位置    byte[] b = new byte[Math.max(lastchunksize, checksumsize)];     RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");//对旧块进行读取    try {      //truncate blockFile       blockRAF.setLength(newlen);//截断旧块      //read last chunk      blockRAF.seek(lastchunkoffset);//读取最后一个校验和字段      blockRAF.readFully(b, 0, lastchunksize);    } finally {      blockRAF.close();    }    //compute checksum    dcs.update(b, 0, lastchunksize);//计算新块的最后一个校验和字段的校验和    dcs.writeValue(b, 0, false);    //update metaFile     RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");    try {      metaRAF.setLength(newmetalen);//截断旧块的元数据文件      metaRAF.seek(newmetalen - checksumsize);//移到最后一个校验和字段      metaRAF.write(b, 0, checksumsize);//将最后一个校验和字段校验值写入    } finally {      metaRAF.close();    }  }


FSDataset.finalizeBlock用于将tmp目录下的块和元数据文件移动到current下,同时将此块从ongoingcreate近期操作列表中移除,并修改volumeMap中块的位置值。提交(或叫:结束finalize)通过writeToBlock打开的block,返意味着写过程没有出错,可以正式把Block从tmp文件夹放到current文件夹。

FSDataset.unfinalizeBlock的工作相反:将块b从ongoingcreate、volumeMap中清除走。被FSDataset.cleanupBlock调用。

FSDataset.invalidate(Block invalidBlks[])用于将invalidBlks块和相应的元数据文件删除掉,从volumeMap中删除掉这个块记录,同时将更新块所在的卷的空间使用情况。

FSDataset.findBlockFile(long blockId) 用于根据blockId查找对应的快文件:先从ongoingcreate近期访问列表中找,找不到再找volumeMap哈希表。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: