您的位置:首页 > 移动开发 > IOS开发

netty5.0之server端NioServerSocketChannel的init和register流程

2014-12-05 19:31 519 查看
Channel经历了initAndRegister和doBind0两步骤,本篇主要学习intAndRegister的流程,该流程主要做了:使用group初始化channel;channel参数赋值,初始化监听handler;第一次启动SingleThreadEventExecutor,并把register任务加入taskQueue中。代码见FIg
1。



Fig 1
从方法名称看出channel有init和register两步,下面看代码

final ChannelFuture initAndRegister() {
Channel channel;
try {
channel = createChannel();//tag1.1
} catch (Throwable t) {
return VoidChannel.INSTANCE.newFailedFuture(t);
}

try {
init(channel);//tag1.2
} catch (Throwable t) {
channel.unsafe().closeForcibly();
return channel.newFailedFuture(t);
}

ChannelPromise regFuture = channel.newPromise();
channel.unsafe().register(regFuture);//tag1.3
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}

// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
//    i.e. It's safe to attempt bind() or connect() now beause the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
//    added to the event loop's task queue for later execution.
//    i.e. It's safe to attempt bind() or connect() now:
//         because bind() or connect() will be executed *after* the scheduled registration task is executed
//         because register(), bind(), and connect() are all bound to the same thread.

return regFuture;
}
对代码中标注的tag1.1,tag1.2,tag1.3进行说明

tag1.1

ServerBootStrap override了AbstractBootStrap。group.next()就是轮询从children[]中取值,

@Override
Channel createChannel() {
EventLoop eventLoop = group().next();
return channelFactory().newChannel(eventLoop, childGroup);
}

ServerBootStrap$ServerBootstrapChannelFactory的newChannel如下:

private static final class ServerBootstrapChannelFactory<T extends ServerChannel>
implements ServerChannelFactory<T> {

private final Class<? extends T> clazz;

ServerBootstrapChannelFactory(Class<? extends T> clazz) {
this.clazz = clazz;
}

@Override
public T newChannel(EventLoop eventLoop, EventLoopGroup childGroup) {
try {
Constructor<? extends T> constructor = clazz.getConstructor(EventLoop.class, EventLoopGroup.class);
return constructor.newInstance(eventLoop, childGroup);
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}

@Override
public String toString() {
return StringUtil.simpleClassName(clazz) + ".class";
}
}
tag1.2
ServerBootStrap override父类方法

需要注意两点:

1、ChannelInitializer的initChannel方式是在何处被调用(在下篇说明)

2、ServerBootstrapAcceptor作为handler到pipeline中

@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
channel.config().setOptions(options);
}

final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}

ChannelPipeline p = channel.pipeline();
if (handler() != null) {
p.addLast(handler());
}

final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}

p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ServerBootstrapAcceptor(currentChildHandler, currentChildOptions,
currentChildAttrs));
}
});
}
tag1.3
从下图中看出调用了AbstractUnsafe的register方法,由于eventloop(每一个channel都有属于自己的eventloop)是NioEventLoop的实例,所以层次调用了doStartThread



Fig 2
我们看看SingleThreadEventExecutor的execute和doStartThread方法

doStartThread方法的一个主要任务是启动:SingleThreadventExecutor的run方法,run方法的作用就是运行taskQueue的每个任务,一个任务就是channel的bind,在下一篇文章讲解。

@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}

boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}

if (!addTaskWakesUp) {
wakeup(inEventLoop);
}
}


private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}

boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
if (state < ST_SHUTTING_DOWN) {
state = ST_SHUTTING_DOWN;
}

// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
"before run() implementation terminates.");
}

try {
// Run all remaining tasks and shutdown hooks.
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
synchronized (stateLock) {
state = ST_TERMINATED;
}
threadLock.release();
if (!taskQueue.isEmpty()) {
logger.warn(
"An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}

terminationFuture.setSuccess(null);
}
}
}
}
});
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: