Netty源码分析(六)—Future和Promis分析
2017-11-24 19:42
756 查看
Netty源码分析(六)—Future和Promis分析
Future用来在异步执行中获取提前执行的结果个人主页:tuzhenyu’s page
原文地址:Netty源码分析(六)—Future和Promis分析
(0) JDK中的Callable/Future模型
Callable与Runnable的区别在于Callable线程执行有返回值,Callable/Future模型是通过Future获取Callable线程的返回值;Callable线程的执行必须通过线程池的submit()提交;public static void main(String[] args) throws InterruptedException,ExecutionException{ final ExecutorService exec = Executors.newFixedThreadPool(5); Callable call = new Callable(){ public String call() throws Exception{ Thread.sleep(1000 * 5); return "Other less important but longtime things."; } }; Future task = exec.submit(call); //重要的事情 Thread.sleep(1000 * 3); System.out.println("Let's do important things."); //其他不重要的事情 String obj = (String)task.get(); System.out.println(obj); //关闭线程池 exec.shutdown(); }
(1) JDK AIO中的Future使用
AsynchronousServerSocketChannel异步Socket执行accept()方法时不会阻塞线程,程序会继续执行;Future的get方法会判断任务是否执行完成,如果完成就返回结果,否则阻塞线程,直到任务完成。public static void main(String[] args) throws Exception{ AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(30000)); while (true){ Future<AsynchronousSocketChannel> future = serverSocketChannel.accept(); AsynchronousSocketChannel socketChann 4000 el = future.get(); socketChannel.write(ByteBuffer.wrap("哈哈哈".getBytes("UTF-8"))).get(); } }
(2) Netty中的Future
Netty扩展了Java的Future,最主要的改进就是增加了监听器Listener接口,通过监听器可以让异步执行更加有效率,不需要通过get来等待异步执行结束,而是通过监听器回调来精确地控制异步执行结束的时间点。ChannelFuture接口扩展了Netty的Future接口,表示一种没有返回值的异步调用,同时关联了Channel,跟一个Channel绑定
private void doConnect(final Logger logger,final String host, final int port) { ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); future.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture f) throws Exception { if (!f.isSuccess()) { logger.info("Started Tcp Client Failed"); f.channel().eventLoop().schedule( new Runnable() { @Override public void run() { doConnect(logger,host,port); } }, 200, TimeUnit.MILLISECONDS); } } }); }
(3) Netty中的Promise
Promise接口也扩展了Future接口,它表示一种可写的Future,可以通过setSeccess()方法或者setFailure()方法设置执行的状态;Promise通过状态的设置和检测器Listener的添加可以实现回调机制;public static void main(String[] args) { final DefaultEventExecutor executor = new DefaultEventExecutor(); Promise<Integer> promise = executor.newPromise(); promise.addListener(new GenericFutureListener<Future<? super Integer>>() { public void operationComplete(Future<? super Integer> future) throws Exception { System.out.println("promise is finish"); } }); System.out.println("after promise"); promise.setSuccess(10); System.out.println("after set promise"); }
after promise after set promise promise is finish
(4) Future源码分析
JDK的Future接口主要用来获取异步执行的结果,通过get()方法获取执行结果,如果没有执行完成则阻塞线程;// 取消异步操作 boolean cancel(boolean mayInterruptIfRunning); // 异步操作是否取消 boolean isCancelled(); // 异步操作是否完成,正常终止、异常、取消都是完成 boolean isDone(); // 阻塞直到取得异步操作结果 V get() throws InterruptedException, ExecutionException; // 同上,但最长阻塞时间为timeout V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
Netty的Future接口添加了addListener()方法用来添加监听器,当异步操作完成时候进行回调;
// 异步操作完成且正常终止 boolean isSuccess(); // 异步操作是否可以取消 boolean isCancellable(); // 异步操作失败的原因 Throwable cause(); // 添加一个监听者,异步操作完成时回调,类比javascript的回调函数 Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); // 阻塞直到异步操作完成 Future<V> await() throws InterruptedException; // 同上,但异步操作失败时抛出异常 Future<V> sync() throws InterruptedException; // 非阻塞地返回异步结果,如果尚未完成返回null V getNow();
AbstractFuture主要实现Future的get()方法,取得Future关联的异步操作结果;
调用await()方法阻塞当前线程,直至异步执行完成被唤醒;
public V get() throws InterruptedException, ExecutionException { await(); Throwable cause = cause(); if (cause == null) { return getNow(); } if (cause instanceof CancellationException) { throw (CancellationException) cause; } throw new ExecutionException(cause); }
CompleteFuture表示一个异步操作已完成的结果,该类的实例在异步操作完成时创建,返回给用户;同时通过addListener()方法添加的回调函数也是添加到CompleteFuture中,在异步操作完成时执行;
在CompleteFuture中定义有一个EvnetExecutor执行器,也就是一个线程用来当异步操作完成后执行监听器中的Listener
因为CompleteFuture是异步操作完成后的结果,所以立即通知Listener执行
// 执行器,用来异步操作执行完毕后执行Listener中定义的操作 private final EventExecutor executor; // 这有一个构造方法,可知executor是必须的 protected CompleteFuture(EventExecutor executor) { this.executor = executor; }
public Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) { // 由于这是一个已完成的Future,所以立即通知Listener执行 DefaultPromise.notifyListener(executor(), this, listener); return this; } public Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener) { // 由于已完成,Listener中的操作已完成,没有需要删除的Listener return this; }
(5) Promise源码分析
Promise的默认实现是DefaultPromise类,主要的类属性包括执行器,监视器等private volatile Object result; private final EventExecutor executor; private Object listeners;
DefaultPromise实例的创建与一个执行器也就是一个执行线程绑定;
public DefaultPromise(EventExecutor executor) { this.executor = checkNotNull(executor, "executor"); }
添加监听器在DefaultPromise状态变化时回调执行,将监视器赋值给类属性判断异步操作是否完成,如果完成则执行监视器
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) { checkNotNull(listener, "listener"); synchronized (this) { addListener0(listener); } if (isDone()) { notifyListeners(); } return this; }
通过setSucess方法改变DefaultPromise状态完成异步操作,唤醒监听器执行功能线程;
public Promise<V> setSuccess(V result) { if (setSuccess0(result)) { notifyListeners(); return this; } throw new IllegalStateException("complete already: " + this); }
Promise的sync()方法和awaite()方法类似,都是用来将线程阻塞等待被唤醒;sync()方法和awaite()方法区别在于sync()方法在异步操作失败时会抛出异常;
public Promise<V> sync() throws InterruptedException { await(); rethrowIfFailed(); // 异步操作失败抛出异常 return this; }
public Promise<V> await() throws InterruptedException { // 异步操作已经完成,直接返回 if (isDone()) { return this; } if (Thread.interrupted()) { throw new InterruptedException(toString()); } // 死锁检测 checkDeadLock(); // 同步使修改waiters的线程只有一个 synchronized (this) { while (!isDone()) { // 等待直到异步操作完成 incWaiters(); // ++waiters; try { wait(); // JDK方法 } finally { decWaiters(); // --waiters } } } return this; }
结论
Future/Promise主要是用来执行异步回调,主要区别在于Promise能够设置异步执行的状态相关文章推荐
- netty(九)源码分析之Future和Promise
- Netty线程模型、Future、Channel总结和源码分析
- netty源码分析(四)Netty提供的Future与ChannelFuture优势分析与源码讲解
- Netty5源码分析(七) -- 异步执行Future和Promise
- netty源码分析之ChannelFuture
- netty源码分析之Future/Promise
- Netty中的Future源码解读
- netty源码分析(一)-启动
- 【Netty源码分析】客户端connect服务端过程
- Netty源码分析之ChannelPipeline
- 【Netty源码分析】数据读取过程
- netty4.0.x源码分析—channel
- netty源码分析
- netty源码分析之ChannelHandler
- 源码之下无秘密 ── 做最好的 Netty 源码分析教程
- netty源码分析(十八)Netty底层架构系统总结与应用实践
- netty源码分析(十四)Netty初始化流程总结及Channel与ChannelHandlerContext作用域分析
- Netty源码分析之DelimiterBasedFrameDecoder
- DotNetty网络通信框架学习之源码分析
- 【图灵学院09】RPC底层通讯原理之Netty线程模型源码分析