您的位置:首页 > Web前端 > Node.js

hadoop源代码分析(2)-hdfs.server.datanode包-DataXceiverServer类【原创】

2013-01-27 18:12 696 查看

一 准备

  hadop版本:1.0.3

  学习方法:在学习datanode类过程中,发现它引用DataXceiverServer这个类,同时配合DataNode的理解,学习DataXceiverServer。

  时间:2013-01-27

二 DataXceiverServer功能描述

  DataXceiverServer类是DataNode的辅助类,它最主要是用来实现客户端或其他数据节点与当前节点通信,并负责接收/发送数据块。这个类的创建是为了监听来自客户端或其他数据节点的请求。 它的实现通信的方法不是用hadoop IPC,而是用jdk本身就有的ServerSocket。

三 DataXceiverServer如何实现其功能

  1、通信的准备工作

  按照我们通俗的理解,接收和发送数据块,需要知道数据块的大小,传输使用手段,通信的双方,用多少个线程来通信,带宽情况。这些都在这个类里有实现。

  2、通信

  通信用的方式是ServerSocket,并且采用线程的方式,因此实现类Runnable接口。最主要的方法如代码所示。

View DataXceiverServer Code

package org.apache.hadoop.hdfs.server.datanode;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.channels.AsynchronousCloseException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.server.balancer.Balancer;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;

import org.apache.hadoop.conf.Configuration;

/**
* DataXceiverServer类用来接接收/发送数据块。这个类的创建是为了监听来自客户端或其他数据节点的请求。
* 这个小服务器没有利用hadoop IPC。
*/
class DataXceiverServer implements Runnable, FSConstants {
public static final Log LOG = DataNode.LOG;

ServerSocket ss;
DataNode datanode;
// Record all sockets opened for data transfer
/**
* 返回由指定的映射支持的同步映射,为数据传出记录所有打开的sockets
*/
Map<Socket, Socket> childSockets = Collections.synchronizedMap(
new HashMap<Socket, Socket>());

/**
* Maximal number of concurrent xceivers per node.
* Enforcing the limit is required in order to avoid data-node
* running out of memory.
* 每个节点当前最大的xceivers数目
* 必须强制限制数目时为类防止datanode内存溢出
*/
static final int MAX_XCEIVER_COUNT = 256;
int maxXceiverCount = MAX_XCEIVER_COUNT;

/** A manager to make sure that cluster balancing does not
* take too much resources.
*
* It limits the number of block moves for balancing and
* the total amount of bandwidth they can use.
* BlockBalanceThrotter数据块均衡管理器确保集群均衡,不消耗太多的资源
* 为了均衡,它限制数据块移动的数目和他们能用的总带宽
*/
static class BlockBalanceThrottler extends BlockTransferThrottler {
private int numThreads;

/**Constructor
*
* @param bandwidth Total amount of bandwidth can be used for balancing
*/
private BlockBalanceThrottler(long bandwidth) {
super(bandwidth);
LOG.info("Balancing bandwith is "+ bandwidth + " bytes/s");
}

/** Check if the block move can start.
*
* Return true if the thread quota is not exceeded and
* the counter is incremented; False otherwise.
* 数据块同时移动的最大数目为5,超过5,返回false,不然线程数自增。
*/
synchronized boolean acquire() {
if (numThreads >= Balancer.MAX_NUM_CONCURRENT_MOVES) {
return false;
}
numThreads++;
return true;
}

/** Mark that the move is completed. The thread counter is decremented. */
synchronized void release() {
numThreads--;
}
}

BlockBalanceThrottler balanceThrottler;

/**
* We need an estimate for block size to check if the disk partition has
* enough space. For now we set it to be the default block size set
* in the server side configuration, which is not ideal because the
* default block size should be a client-size configuration.
* A better solution is to include in the header the estimated block size,
* i.e. either the actual block size or the default block size.
* 我们需要评估数据块大小来检检查磁盘分区是否有足够的空间。当前我们把它设置为默认的数据块大小。
* 在服务器端的配置,默认的数据块大小就显得不理想,因为默认数据块大小本应是为客户端配置的。
* 一个更号的解决方案是把比如实际数据块大小或默认数据块大小包含在待评估的数据块大小的头部。
*/
long estimateBlockSize;

/**
* 数据接收服务器构造方法
* @param ss 服务器端套接字
* @param conf 配置
* @param datanode 数据节点
*/
DataXceiverServer(ServerSocket ss, Configuration conf,
DataNode datanode) {

this.ss = ss;
this.datanode = datanode;
/**
* 根据配置项读取手工配置的最大接收数目,与MAX_XCEIVER_COUNT
*/
this.maxXceiverCount = conf.getInt("dfs.datanode.max.xcievers",
MAX_XCEIVER_COUNT);
// 根据配置项dfs.block.size读取数据块大小,默认为64M。
this.estimateBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);

//set up parameter for cluster balancing,默认为1M。
this.balanceThrottler = new BlockBalanceThrottler(
conf.getLong("dfs.balance.bandwidthPerSec", 1024L*1024));
}

/**
* 运行数据块接收服务器,前提,datanode要运行
*/
public void run() {
while (datanode.shouldRun) {
try {
Socket s = ss.accept();
/**
* 侦听并接受来自客户端或其他服务器的连接请求,ss为执行当前方法的数据节点
*/
s.setTcpNoDelay(true);
new Daemon(datanode.threadGroup,
new DataXceiver(s, datanode, this)).start();
} catch (SocketTimeoutException ignored) {
// wake up to see if should continue to run
} catch (AsynchronousCloseException ace) {
LOG.warn(datanode.dnRegistration + ":DataXceiveServer:"
+ StringUtils.stringifyException(ace));
datanode.shouldRun = false;
} catch (IOException ie) {
LOG.warn(datanode.dnRegistration + ":DataXceiveServer: IOException due to:"
+ StringUtils.stringifyException(ie));
} catch (Throwable te) {
LOG.error(datanode.dnRegistration + ":DataXceiveServer: Exiting due to:"
+ StringUtils.stringifyException(te));
datanode.shouldRun = false;
}
}
try {
ss.close();
} catch (IOException ie) {
LOG.warn(datanode.dnRegistration + ":DataXceiveServer: Close exception due to: "
+ StringUtils.stringifyException(ie));
}
LOG.info("Exiting DataXceiveServer");
}
/**
* 杀死此线程,前提时确保数据节点已经关闭运行,然后关掉ServerSocket
*/
void kill() {
assert datanode.shouldRun == false :
"shoudRun should be set to false before killing";
try {
this.ss.close();
} catch (IOException ie) {
LOG.warn(datanode.dnRegistration + ":DataXceiveServer.kill(): "
+ StringUtils.stringifyException(ie));
}

// close all the sockets that were accepted earlier
/**
* 关闭各个socket的安全方法
*/
synchronized (childSockets) {
for (Iterator<Socket> it = childSockets.values().iterator();
it.hasNext();) {
Socket thissock = it.next();
try {
thissock.close();
} catch (IOException e) {
}
}
}
}
}


五 DataXceiverServer相关类、接口简述

  DataXceiverServer实现Runnalble和FSConstants接口,关联org.apache.hadoop.hdfs.server.balancer.Balancer,内部类:BlockBalanceThrottler。Runnalble接口:实现此接口的类视为创建一个线程,它必须实现void run()方法,启动该线程将导致在独立执行中调用DataXceiverServer的run方法。

  FSConstants接口:定义一些文件系统常量,在DataXceiverServer类用到的常量有:DEFAULT_BLOCK_SIZE:默认数据块大小(64M)。

  Balancer:磁盘空间平衡器,在DataXceiverServer中限制类最大线程数不能超过Balancer.MAX_NUM_CONCURRENT_MOVES(值为5)。

  BlockBalanceThrottler数据块均衡管理器确保集群均衡,不消耗太多的资源。为了均衡,它限制数据块移动的数目和他们能用的总带宽。限制数据块最大数目为5,带宽为1M。

六 结语

  1、感觉hadoop的源码层层嵌套,同时逻辑性又特别强,有难度。这个类的学习让我对套接字的通信、线程使用、线程安全、后台线程都得到进一步的加深。

  2、备忘:学习DataXceiver类。

  原文出处:http://www.cnblogs.com/caoyuanzhanlang

  

草原战狼淘宝小店:http://xarxf.taobao.com/ 淘宝搜小矮人鞋坊,主营精致美丽时尚女鞋,为您的白雪公主挑一双哦。谢谢各位博友的支持。

==========================================================================================================

  =================================== 以上分析仅代表个人观点,欢迎指正与交流 ============================================

  =================================== 尊重劳动成果,转载请注明出处,万分感谢 ============================================

  ==========================================================================================================

  


  
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐