您的位置:首页 > 其它

netty4.0.x源码分析—event

2013-09-23 09:35 399 查看
备注:本文的分析基于netty4.0.9final版本

1、event总体结构图

event总体结构类图如下:





2、event关键类和接口分析

1)基于NioEventLoop对关键类和接口进行分析,下面是它的关系图:



EventExecutor

相当于只有一个EventExcutor的EventExecutorGroup,它的next方法返回的是自己的引用,并且它还提供了方法判断线程是否在eventloop中执行,它是一个任务执行器。

EventExecutorGroup它继承了ScheduledExecutorService, Iterable<EventExecutor>,可以被看作是任务的调度执行器和EventExecutor容器,主要是定义了一些submit和schedule方法(用于线程的执行),以及next方法(返回一个EventExecutor实例)。

EventLoopGroup

它继承EventExecutorGroup,额外提供3个方法,一个next返回空闲的EventLoop,register方法注册Channel到EventLoop中。

EventLoop接口同时继承了EventExecutor和EventLoopGroup,因此它既是一个执行器,又是容器,提供一个parent方法,返回它所属的EventLoopGroup。其实它相当于是只有一个EventLoop的EventLoopGroup。

AbstractEventExecutor

它继承AbstractExecutorService并且实现EventExecutor接口,提供submit,schedule,已经next等方法。

SingleThreadEventExecutor

它继承AbstractEventExecutor,具体实现代码如下:

private final EventExecutorGroup parent;
private final Queue<Runnable> taskQueue;
final Queue<ScheduledFutureTask<?>> delayedTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();

private final Thread thread;
<pre name="code" class="java">    protected SingleThreadEventExecutor(
EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {

if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}

this.parent = parent;
this.addTaskWakesUp = addTaskWakesUp;

thread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
if (state < ST_SHUTTING_DOWN) {
state = ST_SHUTTING_DOWN;
}

// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
logger.error(
"Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
"before run() implementation terminates.");
}

try {
// Run all remaining tasks and shutdown hooks.
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
synchronized (stateLock) {
state = ST_TERMINATED;
}
threadLock.release();
if (!taskQueue.isEmpty()) {
logger.warn(
"An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}

terminationFuture.setSuccess(null);
}
}
}
}
});

taskQueue = newTaskQueue();
}


从上述代码可以看出,这个就是事件循环的具体实现代码了,大概意思是基于阻塞队列,从队列中取得待执行的任务执行,并且加入线程同步的考虑,开发者在使用时不需要考虑线程同步的问题。

SingleThreadEventLoop

它继承了SingleThreadEventExecutor并且实现了EventLoop接口,提供注册Channel到事件循环中的函数,以及获取EventLoopGroup和EventLoop的函数。

NioEventLoop

它继承SingleThreadEventLoop,具体参考如下代码:

/**
* The NIO {@link Selector}.
*/
Selector selector;
private SelectedSelectionKeySet selectedKeys;

private final SelectorProvider provider;

<pre name="code" class="java">    NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
super(parent, threadFactory, false);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
provider = selectorProvider;
selector = openSelector();
}

private Selector openSelector() {
final Selector selector;
try {
selector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}

if (DISABLE_KEYSET_OPTIMIZATION) {
return selector;
}

try {
SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

Class<?> selectorImplClass =
Class.forName("sun.nio.ch.SelectorImpl", false, ClassLoader.getSystemClassLoader());
selectorImplClass.isAssignableFrom(selector.getClass());
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

selectedKeysField.setAccessible(true);
publicSelectedKeysField.setAccessible(true);

selectedKeysField.set(selector, selectedKeySet);
publicSelectedKeysField.set(selector, selectedKeySet);

selectedKeys = selectedKeySet;
logger.trace("Instrumented an optimized java.util.Set into: {}", selector);
} catch (Throwable t) {
selectedKeys = null;
logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, t);
}

return selector;
}


从这几个数据属性和代码可以看出这时netty开始调用JDK的Socket函数,包括我们熟悉的selector和key。也就是说真正调用底层socket的地方是在NioEventLoop中。

2)基于NioEventLoopGroup对关键类和接口进行分析,下面是它的关系图:



EventExecutorGroup

它继承了ScheduledExecutorService, Iterable<EventExecutor>,可以被看作是任务的调度执行器和EventExecutor容器

主要是定义了一些submit和schedule方法(用于线程的执行),以及next方法(返回一个EventExecutor实例)。

EventExecutor是一个特殊的EventExecutorGroup,它的next方法返回的是自己的引用,并且它还提供了方法判断线程是否在eventloop中执行,它是一个任务执行器。

EventLoopGroup

它继承EventExecutorGroup,提供3个方法,一个next返回空闲的EventLoop,register方法注册Channel到EventLoop中。

EventLoop接口同时继承了EventExecutor和EventLoopGroup,因此它既是一个执行器,又是容器,提供一个parent方法,返回它所属的EventLoopGroup。

AbstractEventExecutorGroup

它实现EventExecutorGroup接口的submit和schedule方法。

MultithreadEventExecutorGroup

它继承AbstractEventExecutorGroup类,具体实现代码如下

/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
*   http://www.apache.org/licenses/LICENSE-2.0 *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;

import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Set;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Abstract base class for {@link EventExecutorGroup} implementations that handles their tasks with multiple threads at
* the same time.
*/
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {

private final EventExecutor[] children;
private final AtomicInteger childIndex = new AtomicInteger();
private final AtomicInteger terminatedChildren = new AtomicInteger();
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);

/**
* Create a new instance.
*
* @param nThreads          the number of threads that will be used by this instance.
* @param threadFactory     the ThreadFactory to use, or {@code null} if the default should be used.
* @param args              arguments which will passed to each {@link #newChild(ThreadFactory, Object...)} call
*/
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}

if (threadFactory == null) {
threadFactory = newDefaultThreadFactory();
}

children = new SingleThreadEventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(threadFactory, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}

for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}

final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};

for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
}

protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass());
}

@Override
public EventExecutor next() {
return children[Math.abs(childIndex.getAndIncrement() % children.length)];
}

@Override
public Iterator<EventExecutor> iterator() {
return children().iterator();
}

/**
* Return the number of {@link EventExecutor} this implementation uses. This number is the maps
* 1:1 to the threads it use.
*/
public final int executorCount() {
return children.length;
}

/**
* Return a safe-copy of all of the children of this group.
*/
protected Set<EventExecutor> children() {
Set<EventExecutor> children = Collections.newSetFromMap(new LinkedHashMap<EventExecutor, Boolean>());
Collections.addAll(children, this.children);
return children;
}

/**
* Create a new EventExecutor which will later then accessible via the {@link #next()}  method. This method will be
* called for each thread that will serve this {@link MultithreadEventExecutorGroup}.
*
*/
protected abstract EventExecutor newChild(
ThreadFactory threadFactory, Object... args) throws Exception;

}
从代码中可以看出,它定义了一个EventExcutor类型的child数组,具体类型是SingleThreadEventExecutor。也就是说它实际上是多个SingleThreadEventExecutor,这个上面已经有过介绍了。

NioEventLoopGroup

它继承MultithreadEventLoopGroup,提供了几个额外的方法,如rebuildSelectors(重新生成selector),setIoRatio(设置IO处理的时间)等,重写newChild方法,具体代码如下:

@Override
protected EventExecutor newChild(
ThreadFactory threadFactory, Object... args) throws Exception {
return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
}
实际上就是返回一个NioEventLoop对象,参考NioEventLoop的分析。

3、总结

其实整个event就是围绕着Loop和Excutor进行的,LoopGroup和ExcutorGroup相当于Loop和Excutor的容器,Group中包括了多个Loop和多个Excutor,所以单个Loop和Excutor也可以理解为一个Group,但其中只有一个Loop和Excutor。Loop用于事件循环,Excutor用于任务的提交调度执行。

备注:这里简单的对event事件的总体结构进行了分析,很多地方还不是很详细,具体细节还需要进一步分析代码。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: