Netty 5 Source Code Analysis (4): Study Notes
2015-11-26 12:06
NioMessageUnsafe: registering the EventLoop and the promise
The Unsafe interface
/**
 * <em>Unsafe</em> operations that should <em>never</em> be called from user-code. These methods
 * are only provided to implement the actual transport, and must be invoked from an I/O thread except for the
 * following methods:
 * <ul>
 *   <li>{@link #invoker()}</li>
 *   <li>{@link #localAddress()}</li>
 *   <li>{@link #remoteAddress()}</li>
 *   <li>{@link #closeForcibly()}</li>
 *   <li>{@link #register(EventLoop, ChannelPromise)}</li>
 *   <li>{@link #deregister(ChannelPromise)}</li>
 *   <li>{@link #voidPromise()}</li>
 * </ul>
 */
interface Unsafe {

    /**
     * Return the assigned {@link RecvByteBufAllocator.Handle} which will be used to allocate {@link ByteBuf}'s when
     * receiving data.
     */
    RecvByteBufAllocator.Handle recvBufAllocHandle();

    /**
     * Returns the {@link ChannelHandlerInvoker} which is used by default unless specified by a user.
     */
    ChannelHandlerInvoker invoker();

    /**
     * Return the {@link SocketAddress} to which is bound local or
     * {@code null} if none.
     */
    SocketAddress localAddress();

    /**
     * Return the {@link SocketAddress} to which is bound remote or
     * {@code null} if none is bound yet.
     */
    SocketAddress remoteAddress();

    /**
     * Register the {@link Channel} of the {@link ChannelPromise} and notify
     * the {@link ChannelFuture} once the registration was complete.
     * <p>
     * It's only safe to submit a new task to the {@link EventLoop} from within a
     * {@link ChannelHandler} once the {@link ChannelPromise} succeeded. Otherwise
     * the task may or may not be rejected.
     * </p>
     */
    void register(EventLoop eventLoop, ChannelPromise promise);

    /**
     * Bind the {@link SocketAddress} to the {@link Channel} of the {@link ChannelPromise} and notify
     * it once its done.
     */
    void bind(SocketAddress localAddress, ChannelPromise promise);

    /**
     * Connect the {@link Channel} of the given {@link ChannelFuture} with the given remote {@link SocketAddress}.
     * If a specific local {@link SocketAddress} should be used it need to be given as argument. Otherwise just
     * pass {@code null} to it.
     *
     * The {@link ChannelPromise} will get notified once the connect operation was complete.
     */
    void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);

    /**
     * Disconnect the {@link Channel} of the {@link ChannelFuture} and notify the {@link ChannelPromise} once the
     * operation was complete.
     */
    void disconnect(ChannelPromise promise);

    /**
     * Close the {@link Channel} of the {@link ChannelPromise} and notify the {@link ChannelPromise} once the
     * operation was complete.
     */
    void close(ChannelPromise promise);

    /**
     * Closes the {@link Channel} immediately without firing any events. Probably only useful
     * when registration attempt failed.
     */
    void closeForcibly();

    /**
     * Deregister the {@link Channel} of the {@link ChannelPromise} from {@link EventLoop} and notify the
     * {@link ChannelPromise} once the operation was complete.
     */
    void deregister(ChannelPromise promise);

    /**
     * Schedules a read operation that fills the inbound buffer of the first {@link ChannelHandler} in the
     * {@link ChannelPipeline}. If there's already a pending read operation, this method does nothing.
     */
    void beginRead();

    /**
     * Schedules a write operation.
     */
    void write(Object msg, ChannelPromise promise);

    /**
     * Flush out all write operations scheduled via {@link #write(Object, ChannelPromise)}.
     */
    void flush();

    /**
     * Return a special ChannelPromise which can be reused and passed to the operations in {@link Unsafe}.
     * It will never be notified of a success or error and so is only a placeholder for operations
     * that take a {@link ChannelPromise} as argument but for which you not want to get notified.
     */
    ChannelPromise voidPromise();

    /**
     * Returns the {@link ChannelOutboundBuffer} of the {@link Channel} where the pending write requests are stored.
     */
    ChannelOutboundBuffer outboundBuffer();
}
Handle allocates the ByteBuf
/**
 * Allocates a new receive buffer whose capacity is probably large enough to read all inbound data and small enough
 * not to waste its space.
 */
public interface RecvByteBufAllocator {

    /**
     * Creates a new handle. The handle provides the actual operations and keeps the internal information which is
     * required for predicting an optimal buffer capacity.
     */
    Handle newHandle();

    interface Handle {
        /**
         * Creates a new receive buffer whose capacity is probably large enough to read all inbound data and small
         * enough not to waste its space.
         */
        ByteBuf allocate(ByteBufAllocator alloc);

        /**
         * Similar to {@link #allocate(ByteBufAllocator)} except that it does not allocate anything but just tells the
         * capacity.
         */
        int guess();

        /**
         * Records the actual number of read bytes in the previous read operation so that the allocator allocates
         * the buffer with potentially more correct capacity.
         *
         * @param actualReadBytes the actual number of read bytes in the previous read operation
         */
        void record(int actualReadBytes);
    }
}
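To make the guess()/record() feedback loop concrete, here is a minimal sketch of a handle that adapts its next capacity to the previous read. The class SimpleAdaptiveHandle and its doubling/halving rule are assumptions made for illustration, not Netty's actual allocator, and java.nio.ByteBuffer stands in for ByteBuf so the example runs without Netty.

```java
import java.nio.ByteBuffer;

/** Hypothetical handle, not Netty's implementation: adapts its guess to the last read size. */
final class SimpleAdaptiveHandle {
    private final int min;
    private final int max;
    private int next;

    SimpleAdaptiveHandle(int min, int initial, int max) {
        this.min = min;
        this.max = max;
        this.next = initial;
    }

    /** Capacity the next receive buffer would get, without allocating anything. */
    int guess() {
        return next;
    }

    /** Feeds back how many bytes the previous read actually produced. */
    void record(int actualReadBytes) {
        if (actualReadBytes >= next) {
            next = Math.min(max, next * 2);   // buffer was filled completely: grow
        } else if (actualReadBytes < next / 2) {
            next = Math.max(min, next / 2);   // less than half was used: shrink
        }
    }

    /** Stand-in for allocate(ByteBufAllocator), using a plain ByteBuffer. */
    ByteBuffer allocate() {
        return ByteBuffer.allocate(guess());
    }
}
```

A read loop would call allocate(), read into the buffer, then call record(bytesRead) so that the following allocate() is sized better.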
The register method sets the channel's EventLoop, which here is the NioEventLoop, and then submits a task to it:
eventLoop.execute(new OneTimeTask() {
    @Override
    public void run() {
        register0(promise);
    }
});
Calling execute(Runnable) is what sets the thread field inside SingleThreadEventExecutor, which eventLoop.inEventLoop() later checks (so this is where it gets assigned).
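The idea that the first execute() call is what gives the executor its thread, and that inEventLoop() is then just a thread comparison, can be sketched with plain JDK classes. TinyEventLoop below is a hypothetical stand-in written for this note; it is not SingleThreadEventExecutor and leaves out shutdown, scheduling and error handling.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical single-threaded loop whose thread is created on the first execute(). */
final class TinyEventLoop {
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    private volatile Thread thread;   // stays null until the loop thread starts running

    /** True only when called from the loop's own thread. */
    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void execute(Runnable task) {
        tasks.add(task);
        if (started.compareAndSet(false, true)) {
            Thread t = new Thread(this::run, "tiny-event-loop");
            t.setDaemon(true);        // do not keep the JVM alive for the demo
            t.start();
        }
    }

    private void run() {
        thread = Thread.currentThread();   // the assignment that inEventLoop() relies on
        try {
            for (;;) {
                tasks.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Demo helper: reports what inEventLoop() returns from inside a submitted task. */
    static boolean taskRunsOnLoopThread() {
        TinyEventLoop loop = new TinyEventLoop();
        CompletableFuture<Boolean> seen = new CompletableFuture<>();
        loop.execute(() -> seen.complete(loop.inEventLoop()));
        return seen.join();
    }
}
```

From the calling thread inEventLoop() is false, while inside a submitted task it is true, which is the distinction the invoker code further down branches on.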
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
eventLoop.acceptNewTasks();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
Next, AbstractNioChannel's doRegister() is called, which registers the ServerSocketChannel with the NioEventLoop's Selector
@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(((NioEventLoop) eventLoop().unwrap()).selector, 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                ((NioEventLoop) eventLoop().unwrap()).selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}
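The register(selector, 0, this) call above is plain JDK NIO: the channel is registered with an empty interest set and an attachment, and the interest set can be changed on the returned key afterwards. A minimal sketch using only java.nio follows; the class RegisterSketch and its summary string are made up for this note.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

/** Hypothetical demo of registering a channel with interest ops 0 and an attachment. */
final class RegisterSketch {

    /** Returns "initialOps:attachmentKept:acceptEnabledLater". */
    static String registerWithZeroOps(Object attachment) {
        try (Selector selector = Selector.open();
             ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(false);                       // required before register()
            SelectionKey key = ch.register(selector, 0, attachment);
            int initialOps = key.interestOps();                // nothing is selected for yet
            boolean attachmentKept = key.attachment() == attachment;
            key.interestOps(SelectionKey.OP_ACCEPT);           // interest can be switched on later
            boolean acceptEnabled = key.interestOps() == SelectionKey.OP_ACCEPT;
            return initialOps + ":" + attachmentKept + ":" + acceptEnabled;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Registering with 0 means the selector knows about the channel and its attachment but will not report any readiness events until the interest set is updated.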
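javaChannel() is the JDK ServerSocketChannel that was created when the NioServerSocketChannel was constructed. Then comes pipeline.fireChannelRegistered(); the pipeline is built while NioServerSocketChannel is being initialized, with the current NioServerSocketChannel passed in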
In the DefaultChannelPipeline constructor, head's next context is the TailContext and tail's prev is head. The two contexts carry @Skip on different methods, which is what distinguishes inbound from outbound.
DefaultChannelPipeline(AbstractChannel channel) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    this.channel = channel;

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}
The pipeline.addLast we call all the time in everyday code simply links our inbound and outbound handlers into this list
@Override
public ChannelPipeline addLast(String name, ChannelHandler handler) {
    return addLast((ChannelHandlerInvoker) null, name, handler);
}

@Override
public ChannelPipeline addLast(ChannelHandlerInvoker invoker, String name, ChannelHandler handler) {
    synchronized (this) {
        name = filterName(name, handler);
        addLast0(name, new DefaultChannelHandlerContext(this, invoker, name, handler));
    }
    return this;
}

private void addLast0(final String name, AbstractChannelHandlerContext newCtx) {
    checkMultiplicity(newCtx);

    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;

    name2ctx.put(name, newCtx);

    callHandlerAdded(newCtx);
}
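Stripped of names, invokers and callbacks, addLast0 is an insertion just before the tail sentinel of a doubly linked list. The sketch below reproduces only that pointer manipulation; TinyPipeline and Node are hypothetical names, and nothing here touches Netty.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical pipeline: a doubly linked list with fixed head and tail sentinels. */
final class TinyPipeline {
    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    private final Node head = new Node("head");
    private final Node tail = new Node("tail");

    TinyPipeline() {
        head.next = tail;   // same initial wiring as the constructor shown earlier
        tail.prev = head;
    }

    /** Inserts a new node immediately before tail, mirroring addLast0. */
    TinyPipeline addLast(String name) {
        Node newNode = new Node(name);
        Node prev = tail.prev;
        newNode.prev = prev;
        newNode.next = tail;
        prev.next = newNode;
        tail.prev = newNode;
        return this;
    }

    /** Names in head-to-tail order, for inspection. */
    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            out.add(n.name);
        }
        return out;
    }
}
```

Because head and tail always exist, addLast never has to special-case an empty list.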
With the tail and head AbstractChannelHandlerContext instances created, fireChannelRegistered starts from head
public ChannelPipeline fireChannelRegistered() {
    head.fireChannelRegistered();
    return this;
}
HeadContext is a subclass of AbstractChannelHandlerContext, and AbstractChannelHandlerContext holds references to the AbstractChannel, the DefaultChannelPipeline and the ChannelHandlerInvoker.
@Override
public ChannelHandlerContext fireChannelRegistered() {
    AbstractChannelHandlerContext next = findContextInbound();
    next.invoker().invokeChannelRegistered(next);
    return this;
}
next.invoker().invokeChannelRegistered(next);
This calls the default invoker, DefaultChannelHandlerInvoker
@Override
public void invokeChannelRegistered(final ChannelHandlerContext ctx) {
    if (executor.inEventLoop()) {
        invokeChannelRegisteredNow(ctx);
    } else {
        executor.execute(new OneTimeTask() {
            @Override
            public void run() {
                invokeChannelRegisteredNow(ctx);
            }
        });
    }
}
The ChannelHandlerInvokerUtil class
public static void invokeChannelRegisteredNow(ChannelHandlerContext ctx) {
    try {
        ctx.handler().channelRegistered(ctx);
    } catch (Throwable t) {
        notifyHandlerException(ctx, t);
    }
}
Put plainly, this is the template method pattern: the framework calls the corresponding callback on our own handler
The next inbound context is looked up in the ChannelHandlerContext linked list. ctx.skipFlags is generated by reflecting on HeadContext, based on method names and the @Skip annotation
static int skipFlags0(Class<? extends ChannelHandler> handlerType) {
    int flags = 0;
    try {
        if (isSkippable(handlerType, "handlerAdded")) {
            flags |= MASK_HANDLER_ADDED;
        }
        if (isSkippable(handlerType, "handlerRemoved")) {
            flags |= MASK_HANDLER_REMOVED;
        }
        if (isSkippable(handlerType, "exceptionCaught", Throwable.class)) {
            flags |= MASK_EXCEPTION_CAUGHT;
        }
        if (isSkippable(handlerType, "channelRegistered")) {
            flags |= MASK_CHANNEL_REGISTERED;
        }
        if (isSkippable(handlerType, "channelUnregistered")) {
            flags |= MASK_CHANNEL_UNREGISTERED;
        }
        if (isSkippable(handlerType, "channelActive")) {
            flags |= MASK_CHANNEL_ACTIVE;
        }
        if (isSkippable(handlerType, "channelInactive")) {
            flags |= MASK_CHANNEL_INACTIVE;
        }
        if (isSkippable(handlerType, "channelRead", Object.class)) {
            flags |= MASK_CHANNEL_READ;
        }
        if (isSkippable(handlerType, "channelReadComplete")) {
            flags |= MASK_CHANNEL_READ_COMPLETE;
        }
        if (isSkippable(handlerType, "channelWritabilityChanged")) {
            flags |= MASK_CHANNEL_WRITABILITY_CHANGED;
        }
        if (isSkippable(handlerType, "userEventTriggered", Object.class)) {
            flags |= MASK_USER_EVENT_TRIGGERED;
        }
        if (isSkippable(handlerType, "bind", SocketAddress.class, ChannelPromise.class)) {
            flags |= MASK_BIND;
        }
        if (isSkippable(handlerType, "connect", SocketAddress.class, SocketAddress.class, ChannelPromise.class)) {
            flags |= MASK_CONNECT;
        }
        if (isSkippable(handlerType, "disconnect", ChannelPromise.class)) {
            flags |= MASK_DISCONNECT;
        }
        if (isSkippable(handlerType, "close", ChannelPromise.class)) {
            flags |= MASK_CLOSE;
        }
        if (isSkippable(handlerType, "deregister", ChannelPromise.class)) {
            flags |= MASK_DEREGISTER;
        }
        if (isSkippable(handlerType, "read")) {
            flags |= MASK_READ;
        }
        if (isSkippable(handlerType, "write", Object.class, ChannelPromise.class)) {
            flags |= MASK_WRITE;
        }
        if (isSkippable(handlerType, "flush")) {
            flags |= MASK_FLUSH;
        }
    } catch (Exception e) {
        // Should never reach here.
        PlatformDependent.throwException(e);
    }
    return flags;
}
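How an annotation plus reflection can turn into a bit mask may be easier to see in a reduced form. In the sketch below the @Skip annotation, the Adapter base class and the two-method handler are all defined locally as assumptions; only the shape of isSkippable and the two mask values are taken from the excerpts in this post. It relies on the standard Java rule that a method annotation does not carry over to an overriding method.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/** Hypothetical reduction of the skip-flag computation to two callbacks. */
final class SkipFlagsSketch {
    @Retention(RetentionPolicy.RUNTIME)   // must be visible to reflection at run time
    @Target(ElementType.METHOD)
    @interface Skip {}

    static final int MASK_CHANNEL_REGISTERED = 1 << 3;
    static final int MASK_CHANNEL_ACTIVE = 1 << 5;

    /** Base class: every callback is marked skippable by default. */
    static class Adapter {
        @Skip public void channelRegistered() {}
        @Skip public void channelActive() {}
    }

    /** Overrides one callback without @Skip, so only that callback must be invoked. */
    static class MyHandler extends Adapter {
        @Override public void channelActive() {}
    }

    static boolean isSkippable(Class<?> type, String method) throws NoSuchMethodException {
        // getMethod resolves to the most specific public declaration, i.e. the override if any.
        return type.getMethod(method).isAnnotationPresent(Skip.class);
    }

    static int skipFlags(Class<?> type) {
        int flags = 0;
        try {
            if (isSkippable(type, "channelRegistered")) {
                flags |= MASK_CHANNEL_REGISTERED;
            }
            if (isSkippable(type, "channelActive")) {
                flags |= MASK_CHANNEL_ACTIVE;
            }
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
        return flags;
    }
}
```

With these definitions, skipFlags(Adapter.class) has both bits set, while skipFlags(MyHandler.class) only has the channelRegistered bit, because the overriding channelActive no longer carries @Skip.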
The AbstractChannelHandlerContext class
static final int MASK_HANDLER_ADDED = 1;
static final int MASK_HANDLER_REMOVED = 1 << 1;

private static final int MASK_EXCEPTION_CAUGHT = 1 << 2;
private static final int MASK_CHANNEL_REGISTERED = 1 << 3;
private static final int MASK_CHANNEL_UNREGISTERED = 1 << 4;
private static final int MASK_CHANNEL_ACTIVE = 1 << 5;
private static final int MASK_CHANNEL_INACTIVE = 1 << 6;
private static final int MASK_CHANNEL_READ = 1 << 7;
private static final int MASK_CHANNEL_READ_COMPLETE = 1 << 8;
private static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 9;
private static final int MASK_USER_EVENT_TRIGGERED = 1 << 10;

private static final int MASK_BIND = 1 << 11;
private static final int MASK_CONNECT = 1 << 12;
private static final int MASK_DISCONNECT = 1 << 13;
private static final int MASK_CLOSE = 1 << 14;
private static final int MASK_DEREGISTER = 1 << 15;
private static final int MASK_READ = 1 << 16;
private static final int MASK_WRITE = 1 << 17;
private static final int MASK_FLUSH = 1 << 18;

private static final int MASKGROUP_INBOUND = MASK_EXCEPTION_CAUGHT |
        MASK_CHANNEL_REGISTERED |
        MASK_CHANNEL_UNREGISTERED |
        MASK_CHANNEL_ACTIVE |
        MASK_CHANNEL_INACTIVE |
        MASK_CHANNEL_READ |
        MASK_CHANNEL_READ_COMPLETE |
        MASK_CHANNEL_WRITABILITY_CHANGED |
        MASK_USER_EVENT_TRIGGERED;

private static final int MASKGROUP_OUTBOUND = MASK_BIND |
        MASK_CONNECT |
        MASK_DISCONNECT |
        MASK_CLOSE |
        MASK_DEREGISTER |
        MASK_READ |
        MASK_WRITE |
        MASK_FLUSH;
Filtering ChannelHandlerContext instances via @Skip
private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while ((ctx.skipFlags & MASKGROUP_INBOUND) == MASKGROUP_INBOUND);
    return ctx;
}
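The loop condition reads: keep walking while the context skips every inbound callback. A small self-contained sketch shows the effect on a four-node list; the Ctx class, the reduced mask groups (two inbound bits, one outbound bit) and the node names are assumptions chosen to keep the example short.

```java
/** Hypothetical reduction of findContextInbound to a singly linked list of contexts. */
final class FindInboundSketch {
    static final int MASK_CHANNEL_REGISTERED = 1 << 3;
    static final int MASK_CHANNEL_READ = 1 << 7;
    static final int MASK_WRITE = 1 << 17;

    // Reduced groups for illustration; the real groups contain more bits.
    static final int MASKGROUP_INBOUND = MASK_CHANNEL_REGISTERED | MASK_CHANNEL_READ;
    static final int MASKGROUP_OUTBOUND = MASK_WRITE;

    static final class Ctx {
        final String name;
        final int skipFlags;   // a set bit means "this callback can be skipped"
        Ctx next;

        Ctx(String name, int skipFlags) {
            this.name = name;
            this.skipFlags = skipFlags;
        }
    }

    /** Advances past every context that skips all inbound callbacks. */
    static Ctx findContextInbound(Ctx from) {
        Ctx ctx = from;
        do {
            ctx = ctx.next;
        } while ((ctx.skipFlags & MASKGROUP_INBOUND) == MASKGROUP_INBOUND);
        return ctx;
    }

    /** Builds head -> encoder -> logic -> tail and returns the first inbound context after head. */
    static String firstInboundAfterHead() {
        Ctx head = new Ctx("head", MASKGROUP_INBOUND);                            // outbound only
        Ctx encoder = new Ctx("encoder", MASKGROUP_INBOUND);                      // outbound only
        Ctx logic = new Ctx("logic", MASKGROUP_OUTBOUND | MASK_CHANNEL_REGISTERED); // handles channelRead
        Ctx tail = new Ctx("tail", MASKGROUP_OUTBOUND);                           // handles all inbound
        head.next = encoder;
        encoder.next = logic;
        logic.next = tail;
        return findContextInbound(head).name;
    }
}
```

Note that a context is only passed over when all inbound bits are set, so a handler that implements even one inbound callback is still visited for every inbound event.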
At this point the bootstrap's initAndRegister() method has finished, and we are back in the bootstrap's bind() method, where doBind(final SocketAddress localAddress) continues
private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.executor = channel.eventLoop();
                }
                doBind0(regFuture, channel, localAddress, promise);
            }
        });
        return promise;
    }
}
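The control flow of doBind, bind right away if registration has already completed and otherwise bind from a listener, can be sketched with the JDK's CompletableFuture. This is only an analogy under stated assumptions: BindAfterRegisterSketch, the "bound:" result string and the port parameter are invented for the example, and CompletableFuture is not how Netty's ChannelFuture is implemented.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical analogy for doBind: act now if registration is done, else when it completes. */
final class BindAfterRegisterSketch {

    static CompletableFuture<String> doBind(CompletableFuture<Void> regFuture, int port) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (regFuture.isDone() && !regFuture.isCompletedExceptionally()) {
            // Fast path: registration already succeeded, so bind immediately.
            promise.complete("bound:" + port);
        } else {
            // Slow path: defer the bind (or the failure) until registration finishes.
            regFuture.whenComplete((ignored, cause) -> {
                if (cause != null) {
                    promise.completeExceptionally(cause);
                } else {
                    promise.complete("bound:" + port);
                }
            });
        }
        return promise;
    }
}
```

In both paths the caller gets a promise back immediately; only the moment at which that promise completes differs.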
To be continued..