您的位置:首页 > 其它

SOA研究(3)-RPC的第二次尝试

2016-06-04 15:38 281 查看
在之前的文章里面,我们实现了简单的RPC调用,通过不断发消息,然后接收消息,显然它还不算真正的RPC调用。

设计思路

这次我们对它进行改进,模仿dubbo,定义Invoker类,它代表可以执行的实体。定义export方法,导出需要暴露的接口。

定义refer方法,引用需要的远端接口。

NIO编程难点

NIO编程的难点在于它是非阻塞的,所以结果不是立即返回的,一般可以认为是异步返回的(虽然阻塞和异步有区别,但是这里理解为异步可能更简单些)。调用发送消息接口的线程需要挂起等待,而被调用的线程(网络传输的线程)却不能阻塞。直到有对应的消息返回时,才唤醒这个线程。这也是为什么NIO可以保持多个长连接,性能却不会受太大影响的原因。因为调用的NIO的线程在Channel上等待,而NIO自己却不会阻塞等待。NIO只会处理Channel中敢兴趣的事件。

因此,每次发送消息的时候需要发送一个消息ID给server,server返回响应的时候也需要带上这个消息ID。client根据这个ID找到被挂起的线程唤醒它,才能得到返回的结果。

Java动态代理

一般在client端用动态代理实现远端调用。这样实现好处在于client调用时透明,不需要感知远端接口的存在。

public <T> T refer(final Class<?> clazz) throws IOException {
final SocketChannel channel = client.newSocketChanel();
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, new InvocationHandler() {
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Invocation invocation = new Invocation();
invocation.setClazz(clazz);
invocation.setMethodName(method.getName());
invocation.setParameterTypes(method.getParameterTypes());
invocation.setArguments(args);
Request request = new Request();
request.setInvocation(invocation);
client.addResultHolder(request.getMsgId());
client.send(channel, request);
Response response = (Response) client.getResult(request.getMsgId());
return response.getResult();
}
});
}


client实现

Client设计比较麻烦,一般需要一个队列,把发送的任务放入到队列中,然后唤醒selector。否则直接注册connect事件是不行。

//任务队列
private Queue<Task> tasks = new ConcurrentLinkedQueue<>();
private static class Task {
public static final int REGISTER = 1;
public static final int CHANGEOPS = 2;
public SocketChannel channel;
public int type;
public int ops;
public Object data;
public Task(SocketChannel channel, int type, int ops, Object data) {
this.channel = channel;
this.type = type;
this.ops = ops;
this.data = data;
}
}


有任务的时候加入到这个队列中

public void send(SocketChannel channel, Object object) throws InterruptedException {
if (channel.isConnected()) {
//add write task
tasks.add(new Task(channel, Task.CHANGEOPS, SelectionKey.OP_WRITE, object));
} else {
//add connect task
tasks.add(new Task(channel, Task.REGISTER, SelectionKey.OP_CONNECT, object));
}
//唤醒selector,这里很重要.负责selector一直阻塞,不会收到注册事件.
selector.wakeup();
}


client轮询的时候需要检查task队列是否有任务,有的话注册相应的事件,多路复用选择器再进行操作。

while (true) {
if (tasks.peek() != null) {
Task task = tasks.remove();
switch (task.type) {
case Task.CHANGEOPS:
SelectionKey key = task.channel.keyFor(selector);
key.interestOps(task.ops);
key.attach(task.data);
break;
case Task.REGISTER:
SelectionKey key2 = task.channel.register(selector, task.ops);
key2.attach(task.data);
break;
default:
throw new IllegalArgumentException("task.type error");
}
}
selector.select();


收到响应后唤醒挂起的线程,这时才能返回真正调用的结果

private void doRead(SelectionKey key) throws IOException, ClassNotFoundException {
System.out.print("read data from server====>");
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(Constant.BUFFER_SIZE);
if (channel.read(buffer) > 0) {
Object object = NIOUtil.getObject(buffer);
Response response;
if (object instanceof Response) {
response = (Response) object;
} else {
throw new ClassCastException("object不能转换为result");
}
//找到对应挂起的线程,唤醒它
ResultHolder resultHolder = resultHolderMap.get(response.getMsgId());
resultHolder.lock.lock();
try {
resMap.put(response.getMsgId(), response);
resultHolder.done.signal();
} finally {
resultHolder.lock.unlock();
}
//不要随便设置OP_WRITE,否则会耗尽CPU,只有在需要的时候才设置
//            channel.register(selector, SelectionKey.OP_WRITE);
} else {
System.out.println("no data to read!");
}
}


server实现

相对简单,用Map保存invoker的映射。每次收到请求时找到对应的Invoker调用返回结果给client。

public class NIOServer extends Thread {
private Selector selector;
private ServerSocketChannel ssc;
private ConcurrentHashMap<Class<?>, ServerInvoker> invokerMap = new ConcurrentHashMap<>();
public NIOServer() {
}
public boolean containInvoker(Class<?> clazz) {
return invokerMap.containsKey(clazz);
}
public void addInvoker(Class<?> clazz, ServerInvoker invoker) {
invokerMap.put(clazz, invoker);
}
public void run() {
System.out.println("服务端线程已经启动!");
try {
while (selector.select() > 0) {
Set<SelectionKey> selectionKeySet = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeySet.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
try {
processKey(key);
} catch (ClosedSelectorException cek) {
cek.printStackTrace();
} catch (CancelledKeyException ck) {
ck.printStackTrace();
key.cancel();
} catch (Throwable e) {
e.printStackTrace();
}
}
}
if (selector != null) {
selector.close();
}
} catch (Throwable e) {
e.printStackTrace();
}
}
private void processKey(SelectionKey key) throws Exception {
if (key.isAcceptable()) {
doAccept(key);
}
if (key.isReadable()) {
doRead(key);
}
if (key.isWritable()) {
doWrite(key);
}
}
private void doAccept(SelectionKey key) throws IOException {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel = server.accept();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
}
private void doRead(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(Constant.BUFFER_SIZE);
try {
buffer.clear();
if (channel.read(buffer) > 0) {
System.out.println("read data from client...");
buffer.flip();
ObjectInputStream objectInputStream = new ObjectInputStream(new ByteArrayInputStream(buffer.array()));
try {
Request request = (Request) objectInputStream.readObject();
Invocation invocation = request.getInvocation();
Class<?> clazz = invocation.getClazz();
if (invokerMap.containsKey(clazz)) {
ServerInvoker invoker = invokerMap.get(clazz);
Object result = invoker.invoke(invocation);
Response response = new Response();
response.setResult(result);
response.setMsgId(request.getMsgId());
channel.register(selector, SelectionKey.OP_WRITE, response);
}
} finally {
objectInputStream.close();
}
}
} catch (Throwable e) {
e.printStackTrace();
key.cancel();
}
}
private void doWrite(SelectionKey key) throws Exception {
Object data = key.attachment();
if (data == null) {
return;
}
System.out.println("write data to client...");
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = NIOUtil.getByteBuffer(data);
while (buffer.hasRemaining()) {
channel.write(buffer);
}
channel.register(selector, SelectionKey.OP_READ);
}
public void initServer(int port) throws IOException {
ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(port));
selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
}
public void stopServer() throws IOException {
if (!selector.isOpen()) {
selector.close();
}
}
public static void main(String[] args) throws IOException {
NIOServer server = new NIOServer();
try {
server.initServer(8859);
server.start();
} catch (Exception e) {
e.printStackTrace();
server.stopServer();
}
}
}


源码

托管在github上了,直接运行RpcProvider和RpcConsumer就行了。

https://github.com/Jdoing/example中以下目录example/example-soa/src/main/java/nio3/
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  soa nio