Hadoop RPC源码解析——Server类(一)
2015-02-15 21:01
351 查看
Hadoop RPC主要由三大部分组成:Client、Server和RPC,如下表所示。
这三部分在Hadoop RPC架构中的位置如下图所示:
Hadoop采用了Master/Slave结构。其中,Master是整个系统的单节点,这是制约系统性能和扩展性的最关键因素之一,而Master通过Server接受并处理所有Slave发送的请求,这就要求Server类将高并发和可扩展性作为设计目标。为此Server采用了具有提高并发处理能力的技术,如线程池、事件驱动和Reactor设计模式等。
Reactor是并发编程中的一种基本事件驱动的设计模式。它具有以下两个特点:通过派发/分离I/O操作事件提高系统的并发性能;提供了粗粒度的并发控制,使用单线程实现,避免复杂的同步处理。典型的Reactor模式的工作原理图
ipc包下的server类实现了典型的Reactor设计模式,它主要分为三个步骤:接收请求、处理请求和返回结果,如下图3-12所示。
1、 接收请求:该阶段的主要任务是接收来自各个客户端的请求,并将它们封装成固定的格式(call)放到一个共享队列(callQueue)中。该阶段内部又分为两个子阶段:建立连接和接收请求,分别由Listener和Reader完成。整个Server只有一个Listener线程,它统一负责监听来自客户端的连接请求。一旦有新的请求到达,它会采用轮询的方式从线程池中选择一个Reader线程进行处理。Listener和Reader线程内部各包含了一个Selector对象,分别用于监听可接受事件和可读事件。因此Listener线程主要监听是否有新连接请求到达,而Reader线程主要监听客户端连接中是否有新的RPC请求到达,并将RPC请求封装成Call对象,放到callQueue中。
2、 处理请求
该阶段主要任务是从callQueue中获取Call对象,在执行对应的函数调用后尝试直接将结果返回给客户端。但由于某些函数调用返回结构很大或者网络速度过慢,可能难以将结果一次性发送给客户端,此时Handler将尝试将后续任务交给Responder线程。
3、 返回结果
Server端仅有一个Responder线程,它内部包含一个Selector对象,用于监听可写事件。当Handler没能将结果一次性发送给客户端时,会向该Selector对象注册可写事件,进而由Responder线程采用异步方式继续发送未发送完成的结果。
本文主要解析Server类。Server类主要包含了以下几个内部类
分析完客户端的代码,接下来我们分析服务器端的代码,也就是Server类。由以上的分析我们知道,Client端的底层通信直接采用了阻塞式的IO编程,但服务器端却没有采用阻塞式编程,因为当很多Client想要连接到服务端时,如果采用阻塞的IO,那么会对服务器端的性能造成很大的影响。因此Hadoop采用了Java NIO来实现Server端。
Server类是一个抽象类,不能被初始化,那么问题就来了,Hadoop中怎样初始化RPC的服务器呢。但同时我们应该要想到,当NameNode初始化时一定会初始化RPC的服务端,下面列出NameNode初始化的源代码
通过以上代码可知,RPC的server对象是通过RPC类的getServer()方法得到的,下面来具体看这个方法。
查看Listener的构造方法
上面我们看到,当该线程刚开始运行时由于没有客户端连接到服务器,所以会一直阻塞在“readSelector.select()”处。
回到代码十五,我们接下来看Responder类的构造函数
注意,其中的内部类Responder、Listener、Handler都继承了Thread,所以这里将这三个类对应的实例启动后,就启动了这些线程,并相继调用这些线程的run()方法。
从上表得知Server中的内部类Listener用来监听连接,那么我们就来看一下Listener类的run()方法
代码二十一对接收到的连接进行一些处理,如启动readers中的一个可用线程,该线程专门用来接受这个连接的读事件,在该连接上注册感兴趣的事件为读事件,然后将该连接附加到该key上。下面分别查看代码二十一中涉及的代码。首先查看getReader()方法。
接下来查看startAdd()方法,在该方法中,唤醒了reader的run()方法中阻塞的select()方法。由于adding为true,所以会进入while循环处于wait状态。如果当执行wakeup时,readSelector线程并没有阻塞在select()处,那么任何时候只要执行到select()方法,该方法都会返回,如果没有返回值那就在while里面等待,直到在finishAdd()中唤醒该线程。
接下来查看代码二十一中registerChannel()方法。该方法非常简单,就是将channel感兴趣的读事件注册到readSelector上。
回到代码二十一的Connection构造方法。下面查看Server的内部类Connection的构造方法Connection。
内部类 | 功能 |
Client | 连接服务器、传递函数名和相应的参数、等待结果 |
Server | 主要接受Client的请求、执行相应的函数、返回结果 |
RPC | 外部编程接口,主要是为通信的服务方提供代理 |
Hadoop采用了Master/Slave结构。其中,Master是整个系统的单节点,这是制约系统性能和扩展性的最关键因素之一,而Master通过Server接受并处理所有Slave发送的请求,这就要求Server类将高并发和可扩展性作为设计目标。为此Server采用了具有提高并发处理能力的技术,如线程池、事件驱动和Reactor设计模式等。
Reactor是并发编程中的一种基本事件驱动的设计模式。它具有以下两个特点:通过派发/分离I/O操作事件提高系统的并发性能;提供了粗粒度的并发控制,使用单线程实现,避免复杂的同步处理。典型的Reactor模式的工作原理图
ipc包下的server类实现了典型的Reactor设计模式,它主要分为三个步骤:接收请求、处理请求和返回结果,如下图3-12所示。
1、 接收请求:该阶段的主要任务是接收来自各个客户端的请求,并将它们封装成固定的格式(call)放到一个共享队列(callQueue)中。该阶段内部又分为两个子阶段:建立连接和接收请求,分别由Listener和Reader完成。整个Server只有一个Listener线程,它统一负责监听来自客户端的连接请求。一旦有新的请求到达,它会采用轮询的方式从线程池中选择一个Reader线程进行处理。Listener和Reader线程内部各包含了一个Selector对象,分别用于监听可接受事件和可读事件。因此Listener线程主要监听是否有新连接请求到达,而Reader线程主要监听客户端连接中是否有新的RPC请求到达,并将RPC请求封装成Call对象,放到callQueue中。
2、 处理请求
该阶段主要任务是从callQueue中获取Call对象,在执行对应的函数调用后尝试直接将结果返回给客户端。但由于某些函数调用返回结构很大或者网络速度过慢,可能难以将结果一次性发送给客户端,此时Handler将尝试将后续任务交给Responder线程。
3、 返回结果
Server端仅有一个Responder线程,它内部包含一个Selector对象,用于监听可写事件。当Handler没能将结果一次性发送给客户端时,会向该Selector对象注册可写事件,进而由Responder线程采用异步方式继续发送未发送完成的结果。
本文主要解析Server类。Server类主要包含了以下几个内部类
内部类 | 功能 |
Call | 用于存储客户端发来的请求 |
Listener | 监听类,用于监听客户端发来的请求,同时该类内部还有一个静态类Listener.Reader,当监听器监听到用户请求时,并让Reader读取用户请求 |
Responder | 响应RPC请求类,请求处理完毕,由Responder发送给请求客户端 |
Connection | 连接类,真正的客户端请求读取逻辑在这个类中 |
Handler | 请求处理类,会循环阻塞读取callQueue中的call对象,对其进行操作 |
Server类是一个抽象类,不能被初始化,那么问题就来了,Hadoop中怎样初始化RPC的服务器呢。但同时我们应该要想到,当NameNode初始化时一定会初始化RPC的服务端,下面列出NameNode初始化的源代码
//代码十二 //org.apache.hadoop.hdfs.server.namenode#initialize /** * 初始化NameNode */ private void initialize(Configuration conf) throws IOException { // 创建 rpc server InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf); if (dnSocketAddr != null) { int serviceHandlerCount = conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY, DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT); //获得serviceRPCServer this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(), dnSocketAddr.getPort(), serviceHandlerCount, false, conf, namesystem.getDelegationTokenSecretManager()); this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress(); setRpcServiceServerAddress(conf); } //获得Server this.server = RPC.getServer(this, socAddr.getHostName(), socAddr.getPort(), handlerCount, false, conf, namesystem .getDelegationTokenSecretManager()); …… startHttpServer(conf); this.server.start(); //启动RPC Server if (serviceRpcServer != null) { serviceRpcServer.start(); } startTrashEmptier(conf); }
通过以上代码可知,RPC的server对象是通过RPC类的getServer()方法得到的,下面来具体看这个方法。
//代码十三 //RPC#getServer public static Server getServer(final Object instance, final String bindAddress, final int port,final int numHandlers, final boolean verbose, Configuration conf, SecretManager<? extends TokenIdentifier> secretManager) throws IOException { return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager); }查看Server类的构造函数
//代码十四 // RPC.Server#Server public Server(Object instance, Configuration conf, String bindAddress, int port, int numHandlers, boolean verbose, SecretManager<? extends TokenIdentifier> secretManager) throws IOException { su 4000 per(bindAddress, port, Invocation.class, numHandlers, conf, classNameBase(instance.getClass().getName()), secretManager); this.instance = instance; this.verbose = verbose; }这里我们可以发现在Serever类的构造函数中,调用了父类的构造函数。因此可以说getServer()方法是一个创建Server对象的工厂方法,但创建的却是RPC.Server类,而该类又调用了父类ipc.Server类的构造函数,因此我们就明白了Server的初始化方法。查看父类的构造函数
//代码十五 //Server#Server protected Server(String bindAddress, int port, Class<? extends Writable> paramClass, int handlerCount, Configuration conf, String serverName, SecretManager<? extends TokenIdentifier> secretManager) throws IOException { this.bindAddress = bindAddress; this.conf = conf; this.port = port; this.paramClass = paramClass; this.handlerCount = handlerCount; …… // Start the listener here and let it bind to the port //创建一个Listener类的实例 listener = new Listener(); this.port = listener.getAddress().getPort(); this.rpcMetrics = RpcInstrumentation.create(serverName, this.port); this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false); // Create the responder here //创建一个Responder类的实例 responder = new Responder(); if (isSecurityEnabled) { SaslRpcServer.init(conf); } }
查看Listener的构造方法
//代码十六 //Server.Listener#Listener public Listener() throws IOException { address = new InetSocketAddress(bindAddress, port); // Create a new server socket and set to non blocking mode acceptChannel = ServerSocketChannel.open();//获得一个Server Socket实例 acceptChannel.configureBlocking(false);//设置成非阻塞模式 // Bind the server socket to the local host and port bind(acceptChannel.socket(), address, backlogLength);//将server socket绑定到address的地址和port端口 port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port // create a selector; selector= Selector.open();//获得一个Selector实例 readers = new Reader[readThreads]; //创建一个Reader对象的数组 readPool = Executors.newFixedThreadPool(readThreads);//创建一个线程池 for (int i = 0; i < readThreads; i++) { Selector readSelector = Selector.open(); Reader reader = new Reader(readSelector); readers[i] = reader; readPool.execute(reader); } // Register accepts on the server socket with the selector. acceptChannel.register(selector, SelectionKey.OP_ACCEPT);//注册accept事件 this.setName("IPC Server listener on " + port); this.setDaemon(true); }这里我们创建了一个Listener的内部类Reader的数组readers,里面又创建了readThreads个Reader类的实例,并且启动了这些Reader线程,此后这些线程就会去执行run()方法。查看Reader类的run()方法
//代码十七 //Server.Listener.Reader#run public void run() { LOG.info("Starting SocketReader"); synchronized (this) { while (running) { SelectionKey key = null; try { //阻塞直到有连接到来,在startAdd()中来wakeup readSelector.select(); while (adding) { //等待,直到finishAdd后来notify this.wait(1000); } //获得感兴趣的事件发生的channl的key并读其中的数据 Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator(); while (iter.hasNext()) { key = iter.next(); iter.remove(); if (key.isValid()) { if (key.isReadable()) { doRead(key); } } key = null; } } catch (InterruptedException e) { if (running) { // unexpected -- log it LOG.info(getName() + " caught: " + StringUtils.stringifyException(e)); } } catch (IOException ex) { LOG.error("Error in Reader", ex); } } } }
上面我们看到,当该线程刚开始运行时由于没有客户端连接到服务器,所以会一直阻塞在“readSelector.select()”处。
回到代码十五,我们接下来看Responder类的构造函数
//代码十八 // Server#Responder#Responder Responder() throws IOException { this.setName("IPC Server Responder"); this.setDaemon(true); writeSelector = Selector.open(); // create a selector pending = 0; }在该类中,创建了一个Selector对象,并赋值给成员变量writeSelector。其中“pending”代表等待被注册的连接数。接下来就是要启动Server去提供服务。我们查看Server类的start()方法
//代码十九 // Server#start /** * 启动服务器。在处理调用之前必须先启动服务器*/ public synchronized void start() { responder.start(); //启动Responder listener.start(); //启动Listener handlers = new Handler[handlerCount]; //创建handler池 for (int i = 0; i < handlerCount; i++) { handlers[i] = new Handler(i); //创建handler handlers[i].start(); //启动handler } }
注意,其中的内部类Responder、Listener、Handler都继承了Thread,所以这里将这三个类对应的实例启动后,就启动了这些线程,并相继调用这些线程的run()方法。
从上表得知Server中的内部类Listener用来监听连接,那么我们就来看一下Listener类的run()方法
代码二十 // Server.Listener#run public void run() { …… while (running) { SelectionKey key = null; try { //等待连接过来,若有感兴趣的事件发生,则返回 selector.select(); Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { key = iter.next(); iter.remove(); try { if (key.isValid()) { if (key.isAcceptable()) //如果为可接受事件,则进入下面函数进行具体处理 doAccept(key); } } catch (IOException e) { } key = null; } } catch (OutOfMemoryError e) { …… try { Thread.sleep(60000); } catch (Exception ie) {} } catch (Exception e) { closeCurrentConnection(key, e); } cleanupConnections(false); } …… } }查看doAccept()方法来了解具体是怎样接受请求的
//代码二十一 // Server.Listener#doAccept void doAccept(SelectionKey key) throws IOException, OutOfMemoryError { Connection c = null; ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel channel; //建立连接 while ((channel = server.accept()) != null) { channel.configureBlocking(false); channel.socket().setTcpNoDelay(tcpNoDelay); //从readers池中获得一个reader线程 Reader reader = getReader(); try { //激活readSelector,设置adding为true reader.startAdd(); //将读事件设置成感兴趣的事件 SelectionKey readKey = reader.registerChannel(channel); //创建一个连接对象 c = new Connection(readKey, channel, System.currentTimeMillis()); //将connection对象注入readKey readKey.attach(c); synchronized (connectionList) { connectionList.add(numConnections, c); numConnections++; } if (LOG.isDebugEnabled()) LOG.debug("Server connection from " + c.toString() + "; # active connections: " + numConnections + "; # queued calls: " + callQueue.size()); } finally { reader.finishAdd(); } } }
代码二十一对接收到的连接进行一些处理,如启动readers中的一个可用线程,该线程专门用来接受这个连接的读事件,在该连接上注册感兴趣的事件为读事件,然后将该连接附加到该key上。下面分别查看代码二十一中涉及的代码。首先查看getReader()方法。
//代码二十二 // Server.Listener#getReader //该方法将返回Readers中下一个可用的reader,循环数组实现 Reader getReader() { currentReader = (currentReader + 1) % readers.length; return readers[currentReader]; }
接下来查看startAdd()方法,在该方法中,唤醒了reader的run()方法中阻塞的select()方法。由于adding为true,所以会进入while循环处于wait状态。如果当执行wakeup时,readSelector线程并没有阻塞在select()处,那么任何时候只要执行到select()方法,该方法都会返回,如果没有返回值那就在while里面等待,直到在finishAdd()中唤醒该线程。
//代码二十三 // Server.Listener.Reader#startAdd //该函数用于让reader处于等待新channel被注册到readSelect上的状态 public void startAdd() { adding = true; readSelector.wakeup(); }
接下来查看代码二十一中registerChannel()方法。该方法非常简单,就是将channel感兴趣的读事件注册到readSelector上。
//代码二十四 // Server.Listener.Reader#startAdd public synchronized SelectionKey registerChannel(SocketChannel channel) throws IOException { return channel.register(readSelector, SelectionKey.OP_READ); }
回到代码二十一的Connection构造方法。下面查看Server的内部类Connection的构造方法Connection。
//代码二十五 // Server.Connection#Connection public Connection(SelectionKey key, SocketChannel channel, long lastContact) { this.channel = channel; this.lastContact = lastContact; this.data = null; this.dataLengthBuffer = ByteBuffer.allocate(4); this.unwrappedData = null; this.unwrappedDataLengthBuffer = ByteBuffer.allocate(4); this.socket = channel.socket(); this.addr = socket.getInetAddress(); if (addr == null) { this.hostAddress = "*Unknown*"; } else { this.hostAddress = addr.getHostAddress(); } this.remotePort = socket.getPort(); this.responseQueue = new LinkedList<Call>(); if (socketSendBufferSize != 0) { try { socket.setSendBufferSize(socketSendBufferSize); } catch (IOException e) { LOG.warn("Connection: unable to set socket send buffer size to " + socketSendBufferSize); } } }创建完Connection对象后,我们将它加入到connectionList这个Server类的成员变量中,该变量存储了从客户端到服务器的所有连接。处理完以上事情后就可以把检测通道上的读事件交给readSelector了,所以我们调用finishAdd()方法来唤醒reader类的run()方法中正在wait的线程。finishAdd()方法代码如下,很简单,就是唤醒线程,让该线程监听channel上的读事件
//代码二十六 // Server.Listener.Reader#finishAdd public synchronized void finishAdd() { adding = false; this.notify(); }下面的分析具体参见Hadoop RPC源码分析——server类(二)
相关文章推荐
- Hadoop RPC源码解析——Server类(二)
- Hadoop RPC远程过程调用源码解析及实例
- Hadoop源码解析之 rpc通信 client到server通信
- Hadoop源码分析之一(RPC机制之Server)
- Hadoop RPC源码解析——Client类
- Hadoop RPC源码分析之Server
- hadoop 2.6 源码解读之RPC Server 类高性能设计
- Hadoop最新版 RPC远程过程调用源码解析及实例
- Hadoop源码解析之 rpc通信 client到server通信
- HadoopRPC源码解析
- hadoop源码研读之路(六)----RPC的Client端和Server端
- Hadoop RPC 源码解析
- Hadoop RPC远程过程调用源码解析及实例
- 细水长流Hadoop源码分析(3)RPC Server初始化启动过程
- hadoop源码研读之路(六)----RPC的Client端和Server端
- Hadoop 源码解析-rpc扩展
- Hadoop RPC源码阅读-服务端Server
- Hadoop之RPC Server源码分析
- Hadoop源码分析之一(RPC机制之Server)
- Hadoop RPC源码解析——RPC框架详解