hadoop 的RMI实现分析。(请参考hadoop RMI 的源码)
2007-09-11 12:02
633 查看
Hadoop中的分布式对象
Hadoop的分布式对象的实现主要是靠 Serve,RPC,Client这三个类。其中RPC封装了Serve和Client,向用户提供统一的调用接口,(即用户只需要和RPC打交道就可以实现RMI的基本功能)。研究RPC对深入理解RMI很有帮助RPC说起:
任何一个对象如果他想拥有提供远程服务的能力,那么必须调用
server RPC.getServer(Instance ,bindedAddress,pot,1,fasle,conf)
然后调用 返回的server的start方法,例如jobtrack在提供远程服务的操作:
InetSocketAddress addr = getAddress(conf);this.localMachine = addr.getHostName();
this.port = addr.getPort();
this.interTrackerServer = RPC.getServer(this, addr.getHostName(), addr.getPort(), 10, false, conf);
this.interTrackerServer.start();
而对于Client应用程序来说他要想使用远程对象提供的服务需要 RPC.getClient(Conf)得到一个Client对象然后调用Client中的Call方法,Call原形为:Call(Wtitable,interAddress)其中Wtitable事实上是实现了Wtitable接口的invocation类。其实这一过程通常是由代理proxy完成。Rpc的代理使用时getProxy(….)它的代理对象主要实现了远程对象所提供的服务的接口。
Client端的实现:
1.首先在建立client对象以后,调用client的call方法,
在call方法中:
调用getConnetion与远程主机建立连接;
调用将call中invocation参数包装成一个Call对象;
将此Call对象用SentParm()方法发送出去;;
调用发送出去的Call对象的Wait方法(继承于Object类);
远程对象返回Value被保存在了Call的value字段中;
Call对象Notify正在等待返回结果的call方法;
至此client中的远程方法调用完成。
2.关于连接(Connetion)的建立及在client/server通讯过程中的管理:
1).Connetin的建立是由getConnetion创立的。每个Connection 都是一个线程它们在程序运行时不断的监听Socket上的server返回值。Connetion监听到serve端关闭了连接的时候就将调用close函数关闭socket。
2).每个Client在建立的时候都同时建立一个Deamon线程:ConnetionCuller即由此线程负责定期检查所有的Connection是否IsIdol()太长的时间,然后将所有的idol太久的Connection的shouldColeconnection的设为true,并notify 这个connction。
3).这时要注意了,Connection的run方法中在每次读Socket之前都要调用WaitForWork方法。而waitforWork的返回值为!shouldcloseConnection。
//wait till someone signals us to start reading RPC response or
//close the connection. If we are idle long enough (blocked in wait),
//the ConnectionCuller thread will wake us up and ask us to close the
//connection.
//We need to wait when inUse is 0 or socket is null (it may be null if
//the Connection object has been created but the socket connection
//has not been setup yet). We stop waiting if we have been asked to close
//connection
while ((inUse == 0 || socket == null) && !shouldCloseConnection) {
try {
wait();//依靠connectionculler,或者由数据到来唤醒
} catch (InterruptedException e) {}
}
return !shouldCloseConnection;
}
注:一些相关的思考当soket上由数据到来的时候调用socket.wait一定会收到notify,但是如果socket被包装到某个类中,那末这个类的对象调用wait是否会收到notify的信息呢?
分析以上代码可以知道如果wait是被ConnectionCuller唤醒的则返回的是false(此时connection中的run跳出循环结束连接Close),如果是被到达的数据唤醒的那么返回值是true。一言以蔽之ConnectionCuller决定了某个Connection是否要被关闭,而具体的关闭动作是由Connection中的run完成的。
这正如上面waitforwork前的注释所说。
3.这个Cilent一些细节实现技巧:
需要注意的是Client和serve之间的连接关闭问题,一次远程调用完成后并没有规定谁负责关闭这个连接,这就产生了一个问题,长时间idol的连接怎样回收?
这里应用了一个ConnectionCuller来回收所有的idol connection。
Serve端的实现:
1. Serve端采用了无阻塞IO的形式,在Serve通过start启动的时候,serve启动了一个侦听线程Listener,和多个处理线程handler,listener负责建立连接,和读取调用请求,并把读取到的调用请求以Call的形式放入队列CallerQueue中。而handler负责从CallerQueue中的caller取出,调用在RPC中实现的call方法进行处理。
2. Listener的具体实现:
1> listener作为一个Deamon线程从一开始就不断监听指定端口的连接请求。当有连接到来时调用DoAccept方法将由此连接请求得到的SocketChannel加入到Selector中进行NIO,同时把这个SocketChannel和一个Connetion对象邦定(后面当这个Socket上有Caller到来的时候就可以调用这个Connection上的处理函数,从Socket上读取Call将这个call放到队列中)。
2> handler从一开始就不断察看CallQueue中是否有请求到来,如果有,那么简单的调用serve的Call方法处理得到调用结果value,然后将结果写回与call绑定的Socket连接(即这个call来自的socket连接)。
注:handler的数目在Serve运行的过程中是固定的,而Connection根据请求的连接数目是动态变化的。在这里起到了线程池的作用。
以下是listener的run方法:
Public run(){
LOG.info(getName() + ": starting");
SERVER.set(Server.this);
while (running) {
SelectionKey key = null;
try {
selector.select();
Iterator iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
key = (SelectionKey)iter.next();
iter.remove();
try {
if (key.isValid()) {
if (key.isAcceptable())
doAccept(key);
else if (key.isReadable())
doRead(key);
}
} catch (IOException e) {
key.cancel();
}
key = null;
}
} catch (OutOfMemoryError e) {
// we can run out of memory if we have too many threads
// log the event and sleep for a minute and give
// some thread(s) a chance to finish
LOG.warn("Out of Memory in server select", e);
closeCurrentConnection(key, e);
cleanupConnections(true);
try { Thread.sleep(60000); } catch (Exception ie) {}
} catch (Exception e) {
closeCurrentConnection(key, e);
}
cleanupConnections(false);
}
LOG.info("Stopping " + this.getName());
synchronized (this) {
try {
acceptChannel.close();
selector.close();
} catch (IOException e) { }
selector= null;
acceptChannel= null;
connectionList = null;
}
相关文章推荐
- Hadoop源码分析笔记(十):数据节点--流式接口的实现
- 分布式系统Hadoop源码阅读与分析(一):作业调度器实现机制
- Hadoop跨集群数据拷贝工具DISTCP内部源码实现分析
- Hadoop NameNode 高可用实现源码分析
- epoll源码实现分析[整理]
- muduo源码分析之实现TCP网络库(连接的接收和关闭)
- Hadoop源码分析HDFS ClientProtocol——create
- 可变参数列表实现机制与printf()函数源码分析
- hadoop(2.7.3) 源码分析--RPC部分
- Hadoop RPC机制+源码分析
- python协程的实现(greenlet源码分析)
- Android实现异步任务机制AsyncTask 的使用及源码分析
- 【MyBatis源码分析】插件实现原理
- 阿里巴巴Dubbo实现的源码分析
- STL源码分析----神奇的 list 的 sort 算法实现
- [Android源码分析]蓝牙文件传输过程解析之UI实现
- 集合框架源码分析三(实现类篇ArrayList,LinkedList,HashMap)
- google开源库glog源码实现分析
- hadoop源码分析 jobsplit
- HBase源码分析之org.apache.hadoop.hbase.ipc包