您的位置:首页 > 其它

netty4.0.x源码分析—ChannelPipeline

2013-09-23 17:16 417 查看
备注:本文的分析基于netty 4.0.9final版本

1、ChannelPipeline结构图



2、关键类和接口分析

上一篇关于Channel的文章,在AbstractChannel的介绍中,以及提到了pipeline,这是操作处理的入口,是一个比较重要的概念,这里有必要对pipeline分析一下。

1)ChannelInboundInvoker

/**
* Interface which is shared by others which need to fire inbound events
*/
interface ChannelInboundInvoker {

/**
* A {@link Channel} was registered to its {@link EventLoop}.
*
* This will result in having the  {@link ChannelInboundHandler#channelRegistered(ChannelHandlerContext)} method
* called of the next  {@link ChannelInboundHandler} contained in the  {@link ChannelPipeline} of the
* {@link Channel}.
*/
ChannelInboundInvoker fireChannelRegistered();

/**
* A {@link Channel} was unregistered from its {@link EventLoop}.
*
* This will result in having the  {@link ChannelInboundHandler#channelUnregistered(ChannelHandlerContext)} method
* called of the next  {@link ChannelInboundHandler} contained in the  {@link ChannelPipeline} of the
* {@link Channel}.
*/
@Deprecated
ChannelInboundInvoker fireChannelUnregistered();

/**
* A {@link Channel} is active now, which means it is connected.
*
* This will result in having the  {@link ChannelInboundHandler#channelActive(ChannelHandlerContext)} method
* called of the next  {@link ChannelInboundHandler} contained in the  {@link ChannelPipeline} of the
* {@link Channel}.
*/
ChannelInboundInvoker fireChannelActive();

/**
* A {@link Channel} is inactive now, which means it is closed.
*
* This will result in having the  {@link ChannelInboundHandler#channelInactive(ChannelHandlerContext)} method
* called of the next  {@link ChannelInboundHandler} contained in the  {@link ChannelPipeline} of the
* {@link Channel}.
*/
ChannelInboundInvoker fireChannelInactive();

/**
* A {@link Channel} received an {@link Throwable} in one of its inbound operations.
*
* This will result in having the  {@link ChannelInboundHandler#exceptionCaught(ChannelHandlerContext, Throwable)}
* method  called of the next  {@link ChannelInboundHandler} contained in the  {@link ChannelPipeline} of the
* {@link Channel}.
*/
ChannelInboundInvoker fireExceptionCaught(Throwable cause);

/**
* A {@link Channel} received an user defined event.
*
* This will result in having the  {@link ChannelInboundHandler#userEventTriggered(ChannelHandlerContext, Object)}
* method  called of the next  {@link ChannelInboundHandler} contained in the  {@link ChannelPipeline} of the
* {@link Channel}.
*/
ChannelInboundInvoker fireUserEventTriggered(Object event);

/**
* A {@link Channel} received a message.
*
* This will result in having the {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}
* method  called of the next {@link ChannelInboundHandler} contained in the  {@link ChannelPipeline} of the
* {@link Channel}.
*/
ChannelInboundInvoker fireChannelRead(Object msg);

ChannelInboundInvoker fireChannelReadComplete();

/**
* Triggers an {@link ChannelInboundHandler#channelWritabilityChanged(ChannelHandlerContext)}
* event to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
*/
ChannelInboundInvoker fireChannelWritabilityChanged();
}
这接口定义比较简单,即有事件数据从底层到应用时,用它来做相应的处理。

2)ChannelOutboundInvoker

/**
* Interface which is shared by others which need to execute outbound logic.
*/
interface ChannelOutboundInvoker {

/**
* Request to bind to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation
* completes, either because the operation was successful or because of an error.
* <p>
* This will result in having the
* {@link ChannelOutboundHandler#bind(ChannelHandlerContext, SocketAddress, ChannelPromise)} method
* called of the next {@link ChannelOutboundHandler} contained in the  {@link ChannelPipeline} of the
* {@link Channel}.
*/
ChannelFuture bind(SocketAddress localAddress);

/**
* Request to connect to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation
* completes, either because the operation was successful or because of an error.
* <p>
* If the connection fails because of a connection timeout, the {@link ChannelFuture} will get failed with
* a {@link ConnectTimeoutException}. If it fails because of connection refused a {@link ConnectException}
* will be used.
* <p>
* This will result in having the
* {@link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)}
* method called of the next {@link ChannelOutboundHandler} contained in the  {@link ChannelPipeline} of the
* {@link Channel}.
*/
ChannelFuture connect(SocketAddress remoteAddress);

/**
* Request to connect to the given {@link SocketAddress} while bind to the localAddress and notify the
* {@link ChannelFuture} once the operation completes, either because the operation was successful or because of
* an error.
* <p>
* This will result in having the
* {@link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)}
* method called of the next {@link ChannelOutboundHandler} contained in the  {@link ChannelPipeline} of the
* {@link Channel}.
*/
ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);

/**
* Request to disconnect from the remote peer and notify the {@link ChannelFuture} once the operation completes,
* either because the operation was successful or because of an error.
* <p>
* This will result in having the
* {@link ChannelOutboundHandler#disconnect(ChannelHandlerContext, ChannelPromise)}
* method called of the next {@link ChannelOutboundHandler} contained in the  {@link ChannelPipeline} of the
* {@link Channel}.
*/
ChannelFuture disconnect();

/**
* Request to close this ChannelOutboundInvoker and notify the {@link ChannelFuture} once the operation completes,
* either because the operation was successful or because of
* an error.
*
* After it is closed it is not possible to reuse it again.
* <p>
* This will result in having the
* {@link ChannelOutboundHandler#close(ChannelHandlerContext, ChannelPromise)}
* method called of the next {@link ChannelOutboundHandler} contained in the  {@link ChannelPipeline} of the
* {@link Channel}.
*/
ChannelFuture close();

/**
* Request to deregister this ChannelOutboundInvoker from the previous assigned {@link EventExecutor} and notify the
* {@link ChannelFuture} once the operation completes, either because the operation was successful or because of
* an error.
* <p>
* This will result in having the
* {@link ChannelOutboundHandler#deregister(ChannelHandlerContext, ChannelPromise)}
* method called of the next {@link ChannelOutboundHandler} contained in the  {@link ChannelPipeline} of the
* {@link Channel}.
*
*/
@Deprecated
ChannelFuture deregister();

/**
* Request to bind to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation
* completes, either because the operation was successful or because of an error.
*
* The given {@link ChannelPromise} will be notified.
* <p>
* This will result in having the
* {@link ChannelOutboundHandler#bind(ChannelHandlerContext, SocketAddress, ChannelPromise)} method
* called of the next {@link ChannelOutboundHandler} contained in the  {@link ChannelPipeline} of the
* {@link Channel}.
*/
ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);

/**
* Request to connect to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation
* completes, either because the operation was successful or because of an error.
*
* The given {@link ChannelFuture} will be notified.
*
* <p>
* If the connection fails because of a connection timeout, the {@link ChannelFuture} will get failed with
* a {@link ConnectTimeoutException}. If it fails because of connection refused a {@link ConnectException}
* will be used.
* <p>
* This will result in having the
* {@link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)}
* method called of the next {@link ChannelOutboundHandler} contained in the  {@link ChannelPipeline} of the
* {@link Channel}.
*/
ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);

/**
* Request to connect to the given {@link SocketAddress} while bind to the localAddress and notify the
* {@link ChannelFuture} once the operation completes, either because the operation was successful or because of
* an error.
*
* The given {@link ChannelPromise} will be notified and also returned.
* <p>
* This will result in having the
* {@link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)}
* method called of the next {@link ChannelOutboundHandler} contained in the  {@link ChannelPipeline} of the
* {@link Channel}.
*/
ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);

/**
* Request to disconnect from the remote peer and notify the {@link ChannelFuture} once the operation completes,
* either because the operation was successful or because of an error.
*
* The given {@link ChannelPromise} will be notified.
* <p>
* This will result in having the
* {@link ChannelOutboundHandler#disconnect(ChannelHandlerContext, ChannelPromise)}
* method called of the next {@link ChannelOutboundHandler} contained in the  {@link ChannelPipeline} of the
* {@link Channel}.
*/
ChannelFuture disconnect(ChannelPromise promise);

/**
* Request to close this ChannelOutboundInvoker and notify the {@link ChannelFuture} once the operation completes,
* either because the operation was successful or because of
* an error.
*
* After it is closed it is not possible to reuse it again.
* The given {@link ChannelPromise} will be notified.
* <p>
* This will result in having the
* {@link ChannelOutboundHandler#close(ChannelHandlerContext, ChannelPromise)}
* method called of the next {@link ChannelOutboundHandler} contained in the  {@link ChannelPipeline} of the
* {@link Channel}.
*/
ChannelFuture close(ChannelPromise promise);

/**
* Request to deregister this ChannelOutboundInvoker from the previous assigned {@link EventExecutor} and notify the
* {@link ChannelFuture} once the operation completes, either because the operation was successful or because of
* an error.
*
* The given {@link ChannelPromise} will be notified.
* <p>
* This will result in having the
* {@link ChannelOutboundHandler#deregister(ChannelHandlerContext, ChannelPromise)}
* method called of the next {@link ChannelOutboundHandler} contained in the  {@link ChannelPipeline} of the
* {@link Channel}.
*/
@Deprecated
ChannelFuture deregister(ChannelPromise promise);

/**
* Request to Read data from the {@link Channel} into the first inbound buffer, triggers an
* {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)} event if data was
* read, and triggers a
* {@link ChannelInboundHandler#channelReadComplete(ChannelHandlerContext) channelReadComplete} event so the
* handler can decide to continue reading.  If there's a pending read operation already, this method does nothing.
* <p>
* This will result in having the
* {@link ChannelOutboundHandler#read(ChannelHandlerContext)}
* method called of the next {@link ChannelOutboundHandler} contained in the  {@link ChannelPipeline} of the
* {@link Channel}.
*/
ChannelOutboundInvoker read();

/**
* Request to write a message via this ChannelOutboundInvoker through the {@link ChannelPipeline}.
* This method will not request to actual flush, so be sure to call {@link #flush()}
* once you want to request to flush all pending data to the actual transport.
*/
ChannelFuture write(Object msg);

/**
* Request to write a message via this ChannelOutboundInvoker through the {@link ChannelPipeline}.
* This method will not request to actual flush, so be sure to call {@link #flush()}
* once you want to request to flush all pending data to the actual transport.
*/
ChannelFuture write(Object msg, ChannelPromise promise);

/**
* Request to flush all pending messages via this ChannelOutboundInvoker.
*/
ChannelOutboundInvoker flush();

/**
* Shortcut for call {@link #write(Object, ChannelPromise)} and {@link #flush()}.
*/
ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);

/**
* Shortcut for call {@link #write(Object)} and {@link #flush()}.
*/
ChannelFuture writeAndFlush(Object msg);
}
这个接口定义用于处理从应用到底层的事件数据。

3)ChannelPipeline

ChannelPipeline继承ChannelInboundInvoker和ChannelOutboundInvoker,它既是一个inboundinvoke,又是一个outboundinvoke,同时它也是ChannelHandler的管理者,提供了很多方法对handler进行操作。

4)DefaultChannelPipeline

在分析DefaultChannelPipeline之前,不得不先分析DefaultChannelHandlerContext(实现ChannelHandlerContext),因为ChannelPipeline的所有handler执行都是间接由ChannelhandlerContext执行的,在后面DefaultChannelPipeline的定义中,我们可以看到这点。

ChannelHandlerContext接口定义了很多方法,关键代码如下:

/**
* Return the {@link Channel} which is bound to the {@link ChannelHandlerContext}.
*/
Channel channel();

/**
* The {@link EventExecutor} that is used to dispatch the events. This can also be used to directly
* submit tasks that get executed in the event loop. For more information please refer to the
* {@link EventExecutor} javadoc.
*/
EventExecutor executor();
返回当前HandlerContext的Channel,以及返回当前EventExcutor。

DefaultChannelHandlerContext具体的final类,关键代码如下:

volatile DefaultChannelHandlerContext next;
volatile DefaultChannelHandlerContext prev;

private final boolean inbound;
private final boolean outbound;
private final AbstractChannel channel;
private final DefaultChannelPipeline pipeline;
private final String name;
private final ChannelHandler handler;
private boolean removed;

// Will be set to null if no child executor should be used, otherwise it will be set to the
// child executor.
final EventExecutor executor;
private ChannelFuture succeededFuture;
从上述代码中可以看出,DefaultChannelHandlerContext其实是一个上下文环境,它里面包括了当前Channel,当前的pipeline,以及当前的handler。另外它还提供了很多方法的实现,列如下面的函数。

@Override
public ChannelHandlerContext fireChannelActive() {
final DefaultChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelActive();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelActive();
}
});
}
return this;
}
从这个函数中,可以看出Handler的执行是由executor以线程的方式执行的。

分析完DefaultChannelHandlerContext,下面再来看DefaultChannelPipeline(实现Channelpipeline)。它是一个具体的handler管理者,handler的类型可以是inboundhandler,也可以是outboundhandler。关键代码如下:

final AbstractChannel channel;

final DefaultChannelHandlerContext head;
final DefaultChannelHandlerContext tail;

private final Map<String, DefaultChannelHandlerContext> name2ctx =
new HashMap<String, DefaultChannelHandlerContext>(4);

final Map<EventExecutorGroup, EventExecutor> childExecutors =
new IdentityHashMap<EventExecutorGroup, EventExecutor>();

public DefaultChannelPipeline(AbstractChannel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
this.channel = channel;

TailHandler tailHandler = new TailHandler();
tail = new DefaultChannelHandlerContext(this, null, generateName(tailHandler), tailHandler);

HeadHandler headHandler = new HeadHandler(channel.unsafe());
head = new DefaultChannelHandlerContext(this, null, generateName(headHandler), headHandler);

head.next = tail;
tail.prev = head;
}

从上述代码中可以看出,每个Channel都有一个对应的pipeline,每个pipeline有两个ChannelHandlerContext,分别包含默认的TailHandler和HeadHandler,并且它们构成一个双向链表结构。TailHandler处理inbound类型的数据;HeadHandler处理outbound类型的数据。

TailHandler是一个内部类,它实现ChannelInboundHandler接口,定义如下:

// A special catch-all handler that handles both bytes and messages.
static final class TailHandler implements ChannelInboundHandler {

@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }

@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { }

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { }

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { }

@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { }

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { }

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { }

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.warn(
"An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
"It usually means the last handler in the pipeline did not handle the exception.", cause);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { }
}
TailHandler的实现函数都是空的,这说明对于底层上来应用的数据,用户必须定义Handler来处理,不能使用默认的Handler进行处理。

HeadHandler也是一个内部类,它实现ChannelOutboundHandler接口,定义如下:

static final class HeadHandler implements ChannelOutboundHandler {

protected final Unsafe unsafe;

protected HeadHandler(Unsafe unsafe) {
this.unsafe = unsafe;
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// NOOP
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// NOOP
}

@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}

@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}

@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.disconnect(promise);
}

@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.close(promise);
}

@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.deregister(promise);
}

@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}

@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
unsafe.flush();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
}
HeaderHandler的实现函数都是基于unsafe对象的函数实现的,所以对于OutBound类型的数据,即应用往底层的数据,可以使用默认的Handler进行处理。

3、基于DefaultChannelPipeline的fireChannelActive函数分析handler的调用过程

1)fireChannelActive定义

@Override
public ChannelPipeline fireChannelActive() {
head.fireChannelActive();

if (channel.config().isAutoRead()) {
channel.read();
}

return this;
}


2) fireChannelActive实现分析

@Override
public ChannelHandlerContext fireChannelActive() {
final DefaultChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelActive();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelActive();
}
});
}
return this;
}<pre name="code" class="java">    private DefaultChannelHandlerContext findContextInbound() {
DefaultChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}


private void invokeChannelActive() {
try {
((ChannelInboundHandler) handler).channelActive(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}

上面的代码逻辑还是比较简单的,就是从Channelhandlercontext的Head链表中获取上下文,然后调用相应的Handler执行函数。作为开发者,这时必须定义自己的Handler,并且实现ChannelActive函数,因为这是从底层到应用的数据,这就是平时我们在编写服务器端的代码时,都会定义一个继承InboundHandler的handler,并且覆盖channelActive函数的原因。

4、总结

本文分析了ChannelPipeline对Handler的管理过程,它的主要思路是利用ChannelHandlerContext上下文获取Handler,然后间接调用Handler的函数实现处理逻辑。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: