您的位置:首页 > 其它

Netty源码分析(六)—Future和Promis分析

2017-11-24 19:42 756 查看

Netty源码分析(六)—Future和Promis分析

Future用来在异步执行中获取提前执行的结果

个人主页:tuzhenyu’s page

原文地址:Netty源码分析(六)—Future和Promis分析

(0) JDK中的Callable/Future模型

Callable与Runnable的区别在于Callable线程执行有返回值,Callable/Future模型是通过Future获取Callable线程的返回值;Callable线程的执行必须通过线程池的submit()提交;

public static void main(String[] args) throws InterruptedException,ExecutionException{

final ExecutorService exec = Executors.newFixedThreadPool(5);

Callable call = new Callable(){

public String call() throws Exception{

Thread.sleep(1000 * 5);

return "Other less important but longtime things.";

}

};

Future task = exec.submit(call);

//重要的事情

Thread.sleep(1000 * 3);

System.out.println("Let's do important things.");

//其他不重要的事情

String obj = (String)task.get();

System.out.println(obj);

//关闭线程池

exec.shutdown();

}


(1) JDK AIO中的Future使用

AsynchronousServerSocketChannel异步Socket执行accept()方法时不会阻塞线程,程序会继续执行;Future的get方法会判断任务是否执行完成,如果完成就返回结果,否则阻塞线程,直到任务完成。

public static void main(String[] args) throws Exception{
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(30000));
while (true){
Future<AsynchronousSocketChannel> future = serverSocketChannel.accept();
AsynchronousSocketChannel socketChann
4000
el = future.get();
socketChannel.write(ByteBuffer.wrap("哈哈哈".getBytes("UTF-8"))).get();
}
}


(2) Netty中的Future

Netty扩展了Java的Future,最主要的改进就是增加了监听器Listener接口,通过监听器可以让异步执行更加有效率,不需要通过get来等待异步执行结束,而是通过监听器回调来精确地控制异步执行结束的时间点。

ChannelFuture接口扩展了Netty的Future接口,表示一种没有返回值的异步调用,同时关联了Channel,跟一个Channel绑定

private void doConnect(final Logger logger,final String host, final int port) {

ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));

future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture f) throws Exception {
if (!f.isSuccess()) {
logger.info("Started Tcp Client Failed");
f.channel().eventLoop().schedule( new Runnable() {
@Override
public void run() {
doConnect(logger,host,port);
}
}, 200, TimeUnit.MILLISECONDS);
}
}
});
}


(3) Netty中的Promise

Promise接口也扩展了Future接口,它表示一种可写的Future,可以通过setSeccess()方法或者setFailure()方法设置执行的状态;Promise通过状态的设置和检测器Listener的添加可以实现回调机制;

public static void main(String[] args) {
final DefaultEventExecutor executor = new DefaultEventExecutor();
Promise<Integer> promise = executor.newPromise();
promise.addListener(new GenericFutureListener<Future<? super Integer>>() {
public void operationComplete(Future<? super Integer> future) throws Exception {
System.out.println("promise is finish");
}
});
System.out.println("after promise");
promise.setSuccess(10);
System.out.println("after set promise");
}


after promise

after set promise

promise is finish


(4) Future源码分析

JDK的Future接口主要用来获取异步执行的结果,通过get()方法获取执行结果,如果没有执行完成则阻塞线程;

// 取消异步操作
boolean cancel(boolean mayInterruptIfRunning);
// 异步操作是否取消
boolean isCancelled();
// 异步操作是否完成,正常终止、异常、取消都是完成
boolean isDone();
// 阻塞直到取得异步操作结果
V get() throws InterruptedException, ExecutionException;
// 同上,但最长阻塞时间为timeout
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;


Netty的Future接口添加了addListener()方法用来添加监听器,当异步操作完成时候进行回调;

// 异步操作完成且正常终止
boolean isSuccess();
// 异步操作是否可以取消
boolean isCancellable();
// 异步操作失败的原因
Throwable cause();
// 添加一个监听者,异步操作完成时回调,类比javascript的回调函数
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
// 阻塞直到异步操作完成
Future<V> await() throws InterruptedException;
// 同上,但异步操作失败时抛出异常
Future<V> sync() throws InterruptedException;
// 非阻塞地返回异步结果,如果尚未完成返回null
V getNow();


AbstractFuture主要实现Future的get()方法,取得Future关联的异步操作结果;

调用await()方法阻塞当前线程,直至异步执行完成被唤醒;

public V get() throws InterruptedException, ExecutionException {
await();

Throwable cause = cause();
if (cause == null) {
return getNow();
}
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
throw new ExecutionException(cause);
}


CompleteFuture表示一个异步操作已完成的结果,该类的实例在异步操作完成时创建,返回给用户;同时通过addListener()方法添加的回调函数也是添加到CompleteFuture中,在异步操作完成时执行;

在CompleteFuture中定义有一个EvnetExecutor执行器,也就是一个线程用来当异步操作完成后执行监听器中的Listener

因为CompleteFuture是异步操作完成后的结果,所以立即通知Listener执行

// 执行器,用来异步操作执行完毕后执行Listener中定义的操作
private final EventExecutor executor;

// 这有一个构造方法,可知executor是必须的
protected CompleteFuture(EventExecutor executor) {
this.executor = executor;
}


public Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
// 由于这是一个已完成的Future,所以立即通知Listener执行
DefaultPromise.notifyListener(executor(), this, listener);
return this;
}

public Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener) {
// 由于已完成,Listener中的操作已完成,没有需要删除的Listener
return this;
}


(5) Promise源码分析

Promise的默认实现是DefaultPromise类,主要的类属性包括执行器,监视器等

private volatile Object result;
private final EventExecutor executor;
private Object listeners;


DefaultPromise实例的创建与一个执行器也就是一个执行线程绑定;

public DefaultPromise(EventExecutor executor) {
this.executor = checkNotNull(executor, "executor");
}


添加监听器在DefaultPromise状态变化时回调执行,将监视器赋值给类属性判断异步操作是否完成,如果完成则执行监视器

public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
checkNotNull(listener, "listener");

synchronized (this) {
addListener0(listener);
}

if (isDone()) {
notifyListeners();
}

return this;
}


通过setSucess方法改变DefaultPromise状态完成异步操作,唤醒监听器执行功能线程;

public Promise<V> setSuccess(V result) {
if (setSuccess0(result)) {
notifyListeners();
return this;
}
throw new IllegalStateException("complete already: " + this);
}


Promise的sync()方法和awaite()方法类似,都是用来将线程阻塞等待被唤醒;sync()方法和awaite()方法区别在于sync()方法在异步操作失败时会抛出异常;

public Promise<V> sync() throws InterruptedException {
await();
rethrowIfFailed();  // 异步操作失败抛出异常
return this;
}


public Promise<V> await() throws InterruptedException {
// 异步操作已经完成,直接返回
if (isDone()) {
return this;
}
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
// 死锁检测
checkDeadLock();
// 同步使修改waiters的线程只有一个
synchronized (this) {
while (!isDone()) { // 等待直到异步操作完成
incWaiters();   // ++waiters;
try {
wait(); // JDK方法
} finally {
decWaiters(); // --waiters
}
}
}
return this;
}


结论

Future/Promise主要是用来执行异步回调,主要区别在于Promise能够设置异步执行的状态
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: