您的位置:首页 > 运维架构

Hadoop RPC 源码解析

2014-02-20 15:10 393 查看
RPC源码分析(package org.apache.hadoop.ipc):(Client阻塞IO编程)(服务端非阻塞NIO编程)



通过Java的动态代理(Dynamic Proxy)与反射(Reflect)实现

一:建立连接
getConnection分析:
Connection connection;
//如果connections连接池中有对应的连接对象,就不需重新创建了;如果没有就需重新创建一个连接对象。
//但请注意,该//连接对象只是存储了remoteId的信息,其实还并没有和服务端建立连接。
do {
synchronized (connections) {
connection = connections.get(remoteId);
if (connection == null) {
connection = new Connection(remoteId);
connections.put(remoteId, connection);
}
}
} while (!connection.addCall(call)); //将call对象放入对应连接中的calls池,shouldCloseConnection=false连接未关闭放入call跳出循环下执行
//这句代码才是真正的完成了和服务端建立连接
connection.setupIOstreams();
return connection;
connection.setupIOstreams分析:

setupConnection();//生成socket绑定socket ip地址
InputStream inStream = NetUtils.getInputStream(socket);
OutputStream outStream = NetUtils.getOutputStream(socket);

......
this.in = new DataInputStream(new BufferedInputStream(new PingInputStream(inStream)));//包装client对象属性 in,out输入输出流
this.out = new DataOutputStream(new BufferedOutputStream(outStream));

//当连接socket建立后,启动线程接收 Connection extends Thread start the receiver thread after the socket connection
has been set up
start();

setupConnection() 分析:

this.socket = socketFactory.createSocket();
//通过socketFactory创建client.socket,绑定地址InetSocketAddress

5)Client$Connection包含ConnectionId 线程
二:发送数据
connection.sendParam(call)分析:

d.writeInt(call.id);//将call.id写入d
call.param.write(d);//将param(Invocation实现writeable)写入d
byte[] data = d.getData();//获取d数据字节数组
int dataLength = d.getLength();//获取d数据字节数组长度
out.writeInt(dataLength); //首先向输出流Out,写入字节数组长度
out.write(data, 0, dataLength);//将字节数据data写入Out,根据偏移量
out.flush();//刷新 Out
三:接收数据

Connection.run方法体

public void run() {
while (waitForWork()) {//calls池未空,连接未关闭,running 则循环等待接收 wait here for work - read or close connection
receiveResponse();//接收响应
}
close();
}

int id = in.readInt(); // try to read an id
Call call = calls.get(id); //calls池中获取对应call
int state = in.readInt(); // read call status
if (state == Status.SUCCESS.state) {
Writable value = ReflectionUtils.newInstance(valueClass, conf);//反射生成存储返回值对象
value.readFields(in);
//将流值读取进Value对象(Writable类型)
call.setValue(value); //将value对象赋给call,
调用callComplete();//唤醒在call等待线程
calls.remove(id); //将calls池除去该id
}



(1)Listener线程
该线程负责监听客户端请求以及数据的接受,然后将接收到的数据组成一个Call实例,放到请求队列里面。
(2) Handler线程
该线程负责从请求队列中取出调用请求,通过调用抽象方法
public abstract Writable call(Class<?> protocol, Writable param, long receiveTime)来进行处理调用请求,并且将结果返回给客户端。
(3) Responder线程
响应数据由Handler线程返回给客户端,但如果有未写完的数据,则由Responder线程返回客户端。
Listener线程构造方法:

public Listener() throws IOException {
address = new InetSocketAddress(bindAddress, port);
// 创建ServerSocketChannel,并设置成非阻塞式
acceptChannel = ServerSocketChannel.open();
acceptChannel.configureBlocking(false);
// 将server socket绑定到本地端口
bind(acceptChannel.socket(), address, backlogLength);
port = acceptChannel.socket().getLocalPort();
// 获得一个selector
selector= Selector.open();
readers = new Reader[readThreads];
readPool = Executors.newFixedThreadPool(readThreads);
//启动多个reader线程,为了防止请求多时服务端响应延时的问题
for (int i = 0; i < readThreads; i++) {
Selector readSelector = Selector.open();
Reader reader = new Reader(readSelector);
readers[i] = reader;
readPool.execute(reader);
}
// 注册连接事件
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
this.setName("IPC Server listener on " + port);
this.setDaemon(true);
}

Listener.run方法:

public void run() {
•••
while (running) {
SelectionKey key = null;
try {
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
try {
if (key.isValid()) {
if (key.isAcceptable())
doAccept(key);//具体的连接方法
}
} catch (IOException e) {
}
key = null;
}
} catch (OutOfMemoryError e) {
•••
}

Server.Listener.doAccept()分析:

void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
Connection c = null;
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel;
while ((channel = server.accept()) != null) { //建立连接
channel.configureBlocking(false);
channel.socket().setTcpNoDelay(tcpNoDelay);
Reader reader = getReader(); //从readers池中获得一个reader
try {
reader.startAdd(); // 激活readSelector,设置adding为true
SelectionKey readKey = reader.registerChannel(channel);//将读事件设置成兴趣事件
c = new Connection(readKey, channel, System.currentTimeMillis());//创建一个连接对象

readKey.attach(c); //将connection对象注入readKey
synchronized (connectionList) {
connectionList.add(numConnections, c);
numConnections++;
}
•••
} finally {
//设置adding为false,采用notify()唤醒一个reader,其实代码十三中启动的每个reader都使
//用了wait()方法等待。因篇幅有限,就不贴出源码了
reader.finishAdd();
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: