您的位置:首页 > 其它

netty5源码分析(4)--学习笔记

2015-11-26 12:06 288 查看
NioMessageUnsafe注册EventLoop和promise

Unsafe接口

/**
* <em>Unsafe</em> operations that should <em>never</em> be called from user-code. These methods
* are only provided to implement the actual transport, and must be invoked from an I/O thread except for the
* following methods:
* <ul>
*   <li>{@link #invoker()}</li>
*   <li>{@link #localAddress()}</li>
*   <li>{@link #remoteAddress()}</li>
*   <li>{@link #closeForcibly()}</li>
*   <li>{@link #register(EventLoop, ChannelPromise)}</li>
*   <li>{@link #deregister(ChannelPromise)}</li>
*   <li>{@link #voidPromise()}</li>
* </ul>
*/
interface Unsafe {

/**
* Return the assigned {@link RecvByteBufAllocator.Handle} which will be used to allocate {@link ByteBuf}'s when
* receiving data.
*/
RecvByteBufAllocator.Handle recvBufAllocHandle();

/**
* Returns the {@link ChannelHandlerInvoker} which is used by default unless specified by a user.
*/
ChannelHandlerInvoker invoker();

/**
* Return the {@link SocketAddress} to which is bound local or
* {@code null} if none.
*/
SocketAddress localAddress();

/**
* Return the {@link SocketAddress} to which is bound remote or
* {@code null} if none is bound yet.
*/
SocketAddress remoteAddress();

/**
* Register the {@link Channel} of the {@link ChannelPromise} and notify
* the {@link ChannelFuture} once the registration was complete.
* <p>
* It's only safe to submit a new task to the {@link EventLoop} from within a
* {@link ChannelHandler} once the {@link ChannelPromise} succeeded. Otherwise
* the task may or may not be rejected.
* </p>
*/
void register(EventLoop eventLoop, ChannelPromise promise);

/**
* Bind the {@link SocketAddress} to the {@link Channel} of the {@link ChannelPromise} and notify
* it once its done.
*/
void bind(SocketAddress localAddress, ChannelPromise promise);

/**
* Connect the {@link Channel} of the given {@link ChannelFuture} with the given remote {@link SocketAddress}.
* If a specific local {@link SocketAddress} should be used it need to be given as argument. Otherwise just
* pass {@code null} to it.
*
* The {@link ChannelPromise} will get notified once the connect operation was complete.
*/
void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);

/**
* Disconnect the {@link Channel} of the {@link ChannelFuture} and notify the {@link ChannelPromise} once the
* operation was complete.
*/
void disconnect(ChannelPromise promise);

/**
* Close the {@link Channel} of the {@link ChannelPromise} and notify the {@link ChannelPromise} once the
* operation was complete.
*/
void close(ChannelPromise promise);

/**
* Closes the {@link Channel} immediately without firing any events.  Probably only useful
* when registration attempt failed.
*/
void closeForcibly();

/**
* Deregister the {@link Channel} of the {@link ChannelPromise} from {@link EventLoop} and notify the
* {@link ChannelPromise} once the operation was complete.
*/
void deregister(ChannelPromise promise);

/**
* Schedules a read operation that fills the inbound buffer of the first {@link ChannelHandler} in the
* {@link ChannelPipeline}.  If there's already a pending read operation, this method does nothing.
*/
void beginRead();

/**
* Schedules a write operation.
*/
void write(Object msg, ChannelPromise promise);

/**
* Flush out all write operations scheduled via {@link #write(Object, ChannelPromise)}.
*/
void flush();

/**
* Return a special ChannelPromise which can be reused and passed to the operations in {@link Unsafe}.
* It will never be notified of a success or error and so is only a placeholder for operations
* that take a {@link ChannelPromise} as argument but for which you not want to get notified.
*/
ChannelPromise voidPromise();

/**
* Returns the {@link ChannelOutboundBuffer} of the {@link Channel} where the pending write requests are stored.
*/
ChannelOutboundBuffer outboundBuffer();
}


Handle分配bytebuf

/**
* Allocates a new receive buffer whose capacity is probably large enough to read all inbound data and small enough
* not to waste its space.
*/
public interface RecvByteBufAllocator {

/**
* Creates a new handle.  The handle provides the actual operations and keeps the internal information which is
* required for predicting an optimal buffer capacity.
*/
Handle newHandle();

interface Handle {
/**
* Creates a new receive buffer whose capacity is probably large enough to read all inbound data and small
* enough not to waste its space.
*/
ByteBuf allocate(ByteBufAllocator alloc);

/**
* Similar to {@link #allocate(ByteBufAllocator)} except that it does not allocate anything but just tells the
* capacity.
*/
int guess();

/**
* Records the the actual number of read bytes in the previous read operation so that the allocator allocates
* the buffer with potentially more correct capacity.
*
* @param actualReadBytes the actual number of read bytes in the previous read operation
*/
void record(int actualReadBytes);
}
}


register方法设置channel的EventLoop 也就是NioEventLoop,并且

eventLoop.execute(new OneTimeTask() {
@Override
public void run() {
register0(promise);
}
});
通过执行execute(Runable)方法设置SingleThreadEventExecutor里的thread对象,用于判断eventLoop.inEventLoop()..(原来是在这赋值的)

boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
eventLoop.acceptNewTasks();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();


接着调用AbstractNioChannel的doRegister(),将ServerSocketChannel注册到NioEventLoop的Selector上

@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(((NioEventLoop) eventLoop().unwrap()).selector, 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
((NioEventLoop) eventLoop().unwrap()).selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}


javaChannel()是NioServerSocketChannel创建的时候的serversocketchannel服务器套接字通道,pipeline.fireChannelRegistered();pipeline在NioServerSocketChannel初始化是构造,把NioServerSocketChannel当前传入

DefaultChannelPipeline类构造方法 head的下一个contexthandler是TailContext,tail前一个是head,2个Context里方法上@Skip标记的方法不同,用来区别inbound和outbound,

DefaultChannelPipeline(AbstractChannel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
this.channel = channel;

tail = new TailContext(this);
head = new HeadContext(this);

head.next = tail;
tail.prev = head;
}


平常代码里经常调用的pipeline.addLast就是把我们写的inbound和outbound加载链表中

    @Override
    public ChannelPipeline addLast(String name, ChannelHandler handler) {
        return addLast((ChannelHandlerInvoker) null, name, handler);
    }

@Override
public ChannelPipeline addLast(ChannelHandlerInvoker invoker, String name, ChannelHandler handler) {
synchronized (this) {
name = filterName(name, handler);
addLast0(name, new DefaultChannelHandlerContext(this, invoker, name, handler));
}
return this;
}

private void addLast0(final String name, AbstractChannelHandlerContext newCtx) {
checkMultiplicity(newCtx);

AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;

name2ctx.put(name, newCtx);

callHandlerAdded(newCtx);
}


生成AbstractChannelHandlerContext的tail,head

public ChannelPipeline fireChannelRegistered() {
head.fireChannelRegistered();
return this;
}


HeadContext是AbstractChannelHandlerContext子类,AbstractChannelHandlerContext里持有AbstractChannel,DefaultChannelPipeline,ChannelHandlerInvoker,

@Override
public ChannelHandlerContext fireChannelRegistered() {
AbstractChannelHandlerContext next = findContextInbound();
next.invoker().invokeChannelRegistered(next);
return this;
}


next.invoker().invokeChannelRegistered(next);

掉的默认的 DefaultChannelHandlerInvoker

@Override
public void invokeChannelRegistered(final ChannelHandlerContext ctx) {
if (executor.inEventLoop()) {
invokeChannelRegisteredNow(ctx);
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
invokeChannelRegisteredNow(ctx);
}
});
}
}


ChannelHandlerInvokerUtil类

public static void invokeChannelRegisteredNow(ChannelHandlerContext ctx) {
try {
ctx.handler().channelRegistered(ctx);
} catch (Throwable t) {
notifyHandlerException(ctx, t);
}
}


说白了就是模板模式,调用的我们自己的handler的响应方法

在ChannelHandlerContext链表结构里查找Inbound的Context,ctx.skipFlags的生成通过反射HeadContext,根据方法名和@Skip注解生成

static int skipFlags0(Class<? extends ChannelHandler> handlerType) {
int flags = 0;
try {
if (isSkippable(handlerType, "handlerAdded")) {
flags |= MASK_HANDLER_ADDED;
}
if (isSkippable(handlerType, "handlerRemoved")) {
flags |= MASK_HANDLER_REMOVED;
}
if (isSkippable(handlerType, "exceptionCaught", Throwable.class)) {
flags |= MASK_EXCEPTION_CAUGHT;
}
if (isSkippable(handlerType, "channelRegistered")) {
flags |= MASK_CHANNEL_REGISTERED;
}
if (isSkippable(handlerType, "channelUnregistered")) {
flags |= MASK_CHANNEL_UNREGISTERED;
}
if (isSkippable(handlerType, "channelActive")) {
flags |= MASK_CHANNEL_ACTIVE;
}
if (isSkippable(handlerType, "channelInactive")) {
flags |= MASK_CHANNEL_INACTIVE;
}
if (isSkippable(handlerType, "channelRead", Object.class)) {
flags |= MASK_CHANNEL_READ;
}
if (isSkippable(handlerType, "channelReadComplete")) {
flags |= MASK_CHANNEL_READ_COMPLETE;
}
if (isSkippable(handlerType, "channelWritabilityChanged")) {
flags |= MASK_CHANNEL_WRITABILITY_CHANGED;
}
if (isSkippable(handlerType, "userEventTriggered", Object.class)) {
flags |= MASK_USER_EVENT_TRIGGERED;
}
if (isSkippable(handlerType, "bind", SocketAddress.class, ChannelPromise.class)) {
flags |= MASK_BIND;
}
if (isSkippable(handlerType, "connect", SocketAddress.class, SocketAddress.class, ChannelPromise.class)) {
flags |= MASK_CONNECT;
}
if (isSkippable(handlerType, "disconnect", ChannelPromise.class)) {
flags |= MASK_DISCONNECT;
}
if (isSkippable(handlerType, "close", ChannelPromise.class)) {
flags |= MASK_CLOSE;
}
if (isSkippable(handlerType, "deregister", ChannelPromise.class)) {
flags |= MASK_DEREGISTER;
}
if (isSkippable(handlerType, "read")) {
flags |= MASK_READ;
}
if (isSkippable(handlerType, "write", Object.class, ChannelPromise.class)) {
flags |= MASK_WRITE;
}
if (isSkippable(handlerType, "flush")) {
flags |= MASK_FLUSH;
}
} catch (Exception e) {
// Should never reach here.
PlatformDependent.throwException(e);
}

return flags;
}


AbstractChannelHandlerContext类

static final int MASK_HANDLER_ADDED = 1;
static final int MASK_HANDLER_REMOVED = 1 << 1;

private static final int MASK_EXCEPTION_CAUGHT = 1 << 2;
private static final int MASK_CHANNEL_REGISTERED = 1 << 3;
private static final int MASK_CHANNEL_UNREGISTERED = 1 << 4;
private static final int MASK_CHANNEL_ACTIVE = 1 << 5;
private static final int MASK_CHANNEL_INACTIVE = 1 << 6;
private static final int MASK_CHANNEL_READ = 1 << 7;
private static final int MASK_CHANNEL_READ_COMPLETE = 1 << 8;
private static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 9;
private static final int MASK_USER_EVENT_TRIGGERED = 1 << 10;

private static final int MASK_BIND = 1 << 11;
private static final int MASK_CONNECT = 1 << 12;
private static final int MASK_DISCONNECT = 1 << 13;
private static final int MASK_CLOSE = 1 << 14;
private static final int MASK_DEREGISTER = 1 << 15;
private static final int MASK_READ = 1 << 16;
private static final int MASK_WRITE = 1 << 17;
private static final int MASK_FLUSH = 1 << 18;

private static final int MASKGROUP_INBOUND = MASK_EXCEPTION_CAUGHT |
MASK_CHANNEL_REGISTERED |
MASK_CHANNEL_UNREGISTERED |
MASK_CHANNEL_ACTIVE |
MASK_CHANNEL_INACTIVE |
MASK_CHANNEL_READ |
MASK_CHANNEL_READ_COMPLETE |
MASK_CHANNEL_WRITABILITY_CHANGED |
MASK_USER_EVENT_TRIGGERED;

private static final int MASKGROUP_OUTBOUND = MASK_BIND |
MASK_CONNECT |
MASK_DISCONNECT |
MASK_CLOSE |
MASK_DEREGISTER |
MASK_READ |
MASK_WRITE |
MASK_FLUSH;


通过@Skip过滤ChannelHandlerContext

private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while ((ctx.skipFlags & MASKGROUP_INBOUND) == MASKGROUP_INBOUND);
return ctx;
}


到这里bootstrap里initAndRegister()方法执行完毕,再次回到boostrap的bind()方法,doBind(final SocketAddress localAddress) 继续

private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}

if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586 promise.executor = channel.eventLoop();
}
doBind0(regFuture, channel, localAddress, promise);
}
});
return promise;
}
}


过会继续..
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: