SOA研究(3)-RPC的第二次尝试
2016-06-04 15:38
281 查看
在之前的文章里面,我们实现了简单的RPC调用,通过不断发消息,然后接收消息,显然它还不算真正的RPC调用。
定义refer方法,引用需要的远端接口。
因此,每次发送消息的时候需要发送一个消息ID给server,server返回响应的时候也需要带上这个消息ID。client根据这个ID找到被挂起的线程唤醒它,才能得到返回的结果。
有任务的时候加入到这个队列中
client轮询的时候需要检查task队列是否有任务,有的话注册相应的事件,多路复用选择器再进行操作。
收到响应后唤醒挂起的线程,这时才能返回真正调用的结果
https://github.com/Jdoing/example中以下目录example/example-soa/src/main/java/nio3/
设计思路
这次我们对它进行改进,模仿dubbo,定义Invoker类,它代表可以执行的实体。定义export方法,导出需要暴露的接口。定义refer方法,引用需要的远端接口。
NIO编程难点
NIO编程的难点在于它是非阻塞的,所以结果不是立即返回的,一般可以认为是异步返回的(虽然阻塞和异步有区别,但是这里理解为异步可能更简单些)。调用发送消息接口的线程需要挂起等待,而被调用的线程(网络传输的线程)却不能阻塞。直到有对应的消息返回时,才唤醒这个线程。这也是为什么NIO可以保持多个长连接,性能却不会受太大影响的原因。因为调用的NIO的线程在Channel上等待,而NIO自己却不会阻塞等待。NIO只会处理Channel中敢兴趣的事件。因此,每次发送消息的时候需要发送一个消息ID给server,server返回响应的时候也需要带上这个消息ID。client根据这个ID找到被挂起的线程唤醒它,才能得到返回的结果。
Java动态代理
一般在client端用动态代理实现远端调用。这样实现好处在于client调用时透明,不需要感知远端接口的存在。public <T> T refer(final Class<?> clazz) throws IOException { final SocketChannel channel = client.newSocketChanel(); return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, new InvocationHandler() { public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Invocation invocation = new Invocation(); invocation.setClazz(clazz); invocation.setMethodName(method.getName()); invocation.setParameterTypes(method.getParameterTypes()); invocation.setArguments(args); Request request = new Request(); request.setInvocation(invocation); client.addResultHolder(request.getMsgId()); client.send(channel, request); Response response = (Response) client.getResult(request.getMsgId()); return response.getResult(); } }); }
client实现
Client设计比较麻烦,一般需要一个队列,把发送的任务放入到队列中,然后唤醒selector。否则直接注册connect事件是不行。//任务队列 private Queue<Task> tasks = new ConcurrentLinkedQueue<>(); private static class Task { public static final int REGISTER = 1; public static final int CHANGEOPS = 2; public SocketChannel channel; public int type; public int ops; public Object data; public Task(SocketChannel channel, int type, int ops, Object data) { this.channel = channel; this.type = type; this.ops = ops; this.data = data; } }
有任务的时候加入到这个队列中
public void send(SocketChannel channel, Object object) throws InterruptedException { if (channel.isConnected()) { //add write task tasks.add(new Task(channel, Task.CHANGEOPS, SelectionKey.OP_WRITE, object)); } else { //add connect task tasks.add(new Task(channel, Task.REGISTER, SelectionKey.OP_CONNECT, object)); } //唤醒selector,这里很重要.负责selector一直阻塞,不会收到注册事件. selector.wakeup(); }
client轮询的时候需要检查task队列是否有任务,有的话注册相应的事件,多路复用选择器再进行操作。
while (true) { if (tasks.peek() != null) { Task task = tasks.remove(); switch (task.type) { case Task.CHANGEOPS: SelectionKey key = task.channel.keyFor(selector); key.interestOps(task.ops); key.attach(task.data); break; case Task.REGISTER: SelectionKey key2 = task.channel.register(selector, task.ops); key2.attach(task.data); break; default: throw new IllegalArgumentException("task.type error"); } } selector.select();
收到响应后唤醒挂起的线程,这时才能返回真正调用的结果
private void doRead(SelectionKey key) throws IOException, ClassNotFoundException { System.out.print("read data from server====>"); SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(Constant.BUFFER_SIZE); if (channel.read(buffer) > 0) { Object object = NIOUtil.getObject(buffer); Response response; if (object instanceof Response) { response = (Response) object; } else { throw new ClassCastException("object不能转换为result"); } //找到对应挂起的线程,唤醒它 ResultHolder resultHolder = resultHolderMap.get(response.getMsgId()); resultHolder.lock.lock(); try { resMap.put(response.getMsgId(), response); resultHolder.done.signal(); } finally { resultHolder.lock.unlock(); } //不要随便设置OP_WRITE,否则会耗尽CPU,只有在需要的时候才设置 // channel.register(selector, SelectionKey.OP_WRITE); } else { System.out.println("no data to read!"); } }
server实现
相对简单,用Map保存invoker的映射。每次收到请求时找到对应的Invoker调用返回结果给client。public class NIOServer extends Thread { private Selector selector; private ServerSocketChannel ssc; private ConcurrentHashMap<Class<?>, ServerInvoker> invokerMap = new ConcurrentHashMap<>(); public NIOServer() { } public boolean containInvoker(Class<?> clazz) { return invokerMap.containsKey(clazz); } public void addInvoker(Class<?> clazz, ServerInvoker invoker) { invokerMap.put(clazz, invoker); } public void run() { System.out.println("服务端线程已经启动!"); try { while (selector.select() > 0) { Set<SelectionKey> selectionKeySet = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeySet.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); try { processKey(key); } catch (ClosedSelectorException cek) { cek.printStackTrace(); } catch (CancelledKeyException ck) { ck.printStackTrace(); key.cancel(); } catch (Throwable e) { e.printStackTrace(); } } } if (selector != null) { selector.close(); } } catch (Throwable e) { e.printStackTrace(); } } private void processKey(SelectionKey key) throws Exception { if (key.isAcceptable()) { doAccept(key); } if (key.isReadable()) { doRead(key); } if (key.isWritable()) { doWrite(key); } } private void doAccept(SelectionKey key) throws IOException { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel channel = server.accept(); channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_READ); } private void doRead(SelectionKey key) throws IOException { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(Constant.BUFFER_SIZE); try { buffer.clear(); if (channel.read(buffer) > 0) { System.out.println("read data from client..."); buffer.flip(); ObjectInputStream objectInputStream = new ObjectInputStream(new ByteArrayInputStream(buffer.array())); try { Request request = (Request) objectInputStream.readObject(); Invocation invocation = request.getInvocation(); Class<?> clazz = invocation.getClazz(); if (invokerMap.containsKey(clazz)) { ServerInvoker invoker = invokerMap.get(clazz); Object result = invoker.invoke(invocation); Response response = new Response(); response.setResult(result); response.setMsgId(request.getMsgId()); channel.register(selector, SelectionKey.OP_WRITE, response); } } finally { objectInputStream.close(); } } } catch (Throwable e) { e.printStackTrace(); key.cancel(); } } private void doWrite(SelectionKey key) throws Exception { Object data = key.attachment(); if (data == null) { return; } System.out.println("write data to client..."); SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = NIOUtil.getByteBuffer(data); while (buffer.hasRemaining()) { channel.write(buffer); } channel.register(selector, SelectionKey.OP_READ); } public void initServer(int port) throws IOException { ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); ssc.bind(new InetSocketAddress(port)); selector = Selector.open(); ssc.register(selector, SelectionKey.OP_ACCEPT); } public void stopServer() throws IOException { if (!selector.isOpen()) { selector.close(); } } public static void main(String[] args) throws IOException { NIOServer server = new NIOServer(); try { server.initServer(8859); server.start(); } catch (Exception e) { e.printStackTrace(); server.stopServer(); } } }
源码
托管在github上了,直接运行RpcProvider和RpcConsumer就行了。https://github.com/Jdoing/example中以下目录example/example-soa/src/main/java/nio3/
相关文章推荐
- spymemcached源码中Reactor模式分析
- Java IO与NIO的一些文件拷贝测试
- Java NIO工作原理的全面分析
- java的nio的使用示例分享
- Java NIO和IO的区别
- java十分钟速懂知识点——NIO
- Java IO/NIO学习总结
- windows 64位 安装zookeeper
- dubbo管理控制台安装和使用
- (IO密集型事务)同步,异步与CPU使用率关系
- 工作中的zookeeper
- 再说异步调用和NIO
- dubbo系列----rpc初探
- 企业信息化谈1
- SOA 的命运在何方
- DNS SOA replication configuration
- 架构设计师与SOA之一
- SOA,面向服务架构