netty5.0之server端NioServerSocketChannel的init和register流程
2014-12-05 19:31
519 查看
Channel经历了initAndRegister和doBind0两步骤,本篇主要学习intAndRegister的流程,该流程主要做了:使用group初始化channel;channel参数赋值,初始化监听handler;第一次启动SingleThreadEventExecutor,并把register任务加入taskQueue中。代码见FIg
1。
Fig 1
从方法名称看出channel有init和register两步,下面看代码
tag1.1
ServerBootStrap override了AbstractBootStrap。group.next()就是轮询从children[]中取值,
ServerBootStrap$ServerBootstrapChannelFactory的newChannel如下:
ServerBootStrap override父类方法
需要注意两点:
1、ChannelInitializer的initChannel方式是在何处被调用(在下篇说明)
2、ServerBootstrapAcceptor作为handler到pipeline中
从下图中看出调用了AbstractUnsafe的register方法,由于eventloop(每一个channel都有属于自己的eventloop)是NioEventLoop的实例,所以层次调用了doStartThread
Fig 2
我们看看SingleThreadEventExecutor的execute和doStartThread方法
doStartThread方法的一个主要任务是启动:SingleThreadventExecutor的run方法,run方法的作用就是运行taskQueue的每个任务,一个任务就是channel的bind,在下一篇文章讲解。
1。
Fig 1
从方法名称看出channel有init和register两步,下面看代码
final ChannelFuture initAndRegister() { Channel channel; try { channel = createChannel();//tag1.1 } catch (Throwable t) { return VoidChannel.INSTANCE.newFailedFuture(t); } try { init(channel);//tag1.2 } catch (Throwable t) { channel.unsafe().closeForcibly(); return channel.newFailedFuture(t); } ChannelPromise regFuture = channel.newPromise(); channel.unsafe().register(regFuture);//tag1.3 if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } // If we are here and the promise is not failed, it's one of the following cases: // 1) If we attempted registration from the event loop, the registration has been completed at this point. // i.e. It's safe to attempt bind() or connect() now beause the channel has been registered. // 2) If we attempted registration from the other thread, the registration request has been successfully // added to the event loop's task queue for later execution. // i.e. It's safe to attempt bind() or connect() now: // because bind() or connect() will be executed *after* the scheduled registration task is executed // because register(), bind(), and connect() are all bound to the same thread. return regFuture; }对代码中标注的tag1.1,tag1.2,tag1.3进行说明
tag1.1
ServerBootStrap override了AbstractBootStrap。group.next()就是轮询从children[]中取值,
@Override Channel createChannel() { EventLoop eventLoop = group().next(); return channelFactory().newChannel(eventLoop, childGroup); }
ServerBootStrap$ServerBootstrapChannelFactory的newChannel如下:
private static final class ServerBootstrapChannelFactory<T extends ServerChannel> implements ServerChannelFactory<T> { private final Class<? extends T> clazz; ServerBootstrapChannelFactory(Class<? extends T> clazz) { this.clazz = clazz; } @Override public T newChannel(EventLoop eventLoop, EventLoopGroup childGroup) { try { Constructor<? extends T> constructor = clazz.getConstructor(EventLoop.class, EventLoopGroup.class); return constructor.newInstance(eventLoop, childGroup); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + clazz, t); } } @Override public String toString() { return StringUtil.simpleClassName(clazz) + ".class"; } }tag1.2
ServerBootStrap override父类方法
需要注意两点:
1、ChannelInitializer的initChannel方式是在何处被调用(在下篇说明)
2、ServerBootstrapAcceptor作为handler到pipeline中
@Override void init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options(); synchronized (options) { channel.config().setOptions(options); } final Map<AttributeKey<?>, Object> attrs = attrs(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); if (handler() != null) { p.addLast(handler()); } final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new ServerBootstrapAcceptor(currentChildHandler, currentChildOptions, currentChildAttrs)); } }); }tag1.3
从下图中看出调用了AbstractUnsafe的register方法,由于eventloop(每一个channel都有属于自己的eventloop)是NioEventLoop的实例,所以层次调用了doStartThread
Fig 2
我们看看SingleThreadEventExecutor的execute和doStartThread方法
doStartThread方法的一个主要任务是启动:SingleThreadventExecutor的run方法,run方法的作用就是运行taskQueue的每个任务,一个任务就是channel的bind,在下一篇文章讲解。
@Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } boolean inEventLoop = inEventLoop(); if (inEventLoop) { addTask(task); } else { startThread(); addTask(task); if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp) { wakeup(inEventLoop); } }
private void doStartThread() { assert thread == null; executor.execute(new Runnable() { @Override public void run() { thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); } boolean success = false; updateLastExecutionTime(); try { SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { if (state < ST_SHUTTING_DOWN) { state = ST_SHUTTING_DOWN; } // Check if confirmShutdown() was called at the end of the loop. if (success && gracefulShutdownStartTime == 0) { logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " + "before run() implementation terminates."); } try { // Run all remaining tasks and shutdown hooks. for (;;) { if (confirmShutdown()) { break; } } } finally { try { cleanup(); } finally { synchronized (stateLock) { state = ST_TERMINATED; } threadLock.release(); if (!taskQueue.isEmpty()) { logger.warn( "An event executor terminated with " + "non-empty task queue (" + taskQueue.size() + ')'); } terminationFuture.setSuccess(null); } } } } }); }
相关文章推荐
- netty5.0之server端NioServerSocketChannel的bind分析
- netty5笔记-总体流程分析4-NioServerSocketChannel
- netty NioServerSocketChannel注册流程一
- java NIO之ServerSocketChannel的使用
- Java netty之NioServerSocketChannel
- Java NIO ServerSocketChannel
- Java NIO ServerSocketChannel
- Java NIO(10-ServerSocketChannel)
- NIO ServerSocketChannel ScoketChannel
- 《Java 源码分析》:Java NIO 之 ServerSocketChannel
- Java NIO ServerSocketChannel
- Java NIO ServerSocketChannel
- 【Java NIO的深入研究】 ServerSocketChannel
- NIO编程之ServerSocketChannel用法详解
- Java NIO ServerSocketChannel
- JAVA-NIO之Socket/ServerSocket Channel(详解)
- Java Socket:Java-NIO-ServerSocketChannel
- Java NIO 学习(四)--ServerSocketChannel与SocketChannel
- Java NIO 之 ServerSocketChannel SocketChannel
- Java Socket:Java-NIO-ServerSocketChannel