您的位置:首页 > 其它

Netty 学习 - 异步操作中的Future和Promise

2016-12-25 16:06 435 查看
   本文继续介绍Netty的相关知识,主要讲解异步操作中的Future和Promise

由于Netty中的Future都是异步IO操作,结果是未知的,因此命名为ChannelFuture,表示跟Channel的操作有关

ChannelFuture提供了一系列不同于JDK Future的API,用于获取操作结果,添加事件监听器,取消IO操作,同步等待。

Netty强烈建议直接通过添加监听器的方式获取IO结果,而不是通过同步等待的方式

 

如果用户操作调用了sync或者await方法,会在对应的future对象上阻塞用户线程,例如future.channel().closeFuture().sync()

而最终触发future对象的notify动作都是通过eventLoop线程轮询任务完成的,例如对关闭的sync,因为不论是用户直接关闭或者eventLoop的轮询状态关闭,都会在eventLoop的线程内完成notify动作,所以不要在IO线程内调用future对象的sync或者await方法,因为应用程序代码都是编写的channelHandler,而channelHandler是在eventLoop的线程内执行的,所以是不能在channelHandler中调用sync或者await方法的

对于Future而言,提供的API有get与get(timeout) 前者是一直阻塞的get方法,后者带了超时时间,具体实现方式是在Future上执行wait方法,将当前运行线程进行阻塞,达到阻塞效果,而后通过eventLoop线程对Future对象进行notifyAll,保证唤醒阻塞的线程

而Future只有获取查询的API,类似于JDK中的Future,为了扩展对异步结果的写操作,Netty提供了Promise,继承于Future,可以用于设置IO操作的结果,在AbstratChannel的代码中可以看到对相关的IO操作都会新建Promise作为具体IO函数的参数,这样就能异步立即返回Promise,当IO操作后续发生异常或者完成时,将会设置promise的结果,在设置结果的过程中,包括以下三步

1  设置result的值

2  notifyAll,唤醒在本Promise上等待的线程

3  回调listener

下面以最常见的关闭等待操作为例,在大部分的网上例子中都会有如下的代码:

            Channel channel = b.bind(8080).sync().channel();

            channel.closeFuture().sync();

那么下面的channel.closeFuture().sync()实际是如何工作

channel.closeFuture()不做任何操作,只是简单的返回channel对象中的closeFuture对象,对于每个Channel对象,都会有唯一的一个CloseFuture,用来表示关闭的Future,

所有执行channel.closeFuture().sync()就是执行的CloseFuturn的sync方法,从上面的解释可以知道,这步是会将当前线程阻塞在CloseFuture上

一般来说,编写以上代码的都是在Main线程中用来启动ServerBootStrap的,所以Main线程会被阻塞,保证服务端Channel的正常运行

那么什么时候Main线程会被唤醒继续执行呢

在channel中有close方法,当调用close方法后,会按照pipeline.close   -  tail.close   - head.close - unsafe.close -  abstractChannel.doClose0 

 

private void doClose0(ChannelPromise promise) {

            try {

                doClose();

                closeFuture.setClosed();

                safeSetSuccess(promise);

            } catch (Throwable t) {

                closeFuture.setClosed();

                safeSetFailure(promise, t);

            }

        }

其中doClose方法是抽象方法,用来真正关闭IO连接,例如javaChannel.close

第二步则是用来处理CloseFuture的关闭,setClosed会执行trySuccess,这样就会在CloseFuture对象上执行notifyAll以及回调listener等,唤醒Main线程

第三步则是用来处理用户Future,前面说过在AbstratChannel的代码中可以看到对相关的IO操作都会新建Promise作为具体IO函数的参数,例如对于channel.close方法,分别有参数的方法和没有参数的方法,没有参数的方法实际在底层会自动new一个promise用来异步的返回结果,也可以在应用程序中主动编写一个Promise用来处理应用逻辑

在Promise对象上执行

  @Override

    public boolean trySuccess(V result) {

        if (setSuccess0(result)) {

            notifyListeners();

            return true;

        }

        return false;

    }

其中setSuccess0中会有  checkNotifyWaiters();的步骤,唤醒在该Promise对象上等待的线程,再notifyListeners进行回调Listener,完成设置动作

 

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: