您的位置:首页 > 其它

netty 服务端发布源码分析

2017-04-09 22:53 417 查看
//从serverBootstrap的doBind开始
//eventloop执行的时候会判断当前Thread和eventloop启动的thread是不是同一个,不是的话会创建一个
//所有的注册到selector方法,绑定ip和端口方法,都是封装成一个个task
//每个eventLoop会维护一个selector和taskQueue,负责处理客户端请求和内部任务,如ServerSocketChannel注册和ServerSocket绑定等。
private ChannelFuture doBind(final SocketAddress localAddress) {

//创建NioServerSocketChannel
//初始化NioServerSocketChannel
//绑定到parentEventLoopGroop上,从它的group中挑选一个eventloop
//绑定selector事件
//initAndRegister这个方法同时会调用ServerbootStrap的init(),init()方法设置pipeline和ChannelHandler
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}

//通过ChannelFuture判断注册是否完成,因为这个时候是交个eventloop去执行的,异步执行
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
//serverSocketChannel已经初始化好了,Channel上的Pipeline也设置好了\
//socketChannel也已经注册到eventloop上的selector上了
//eventloop后面又执行了一个绑定ip和端口的task,也就是传统的socketChannel.socket()绑定端口和ip
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
//如果异步执行还没有完成,则绑定监听器,重写operationComplete方法
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586 promise.registered();

doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}

/**
绑定的流程
创建NioServerSocketChannel
初始化NioServerSocketChannel
绑定到parentEventLoopGroop上,从它的group中挑选一个eventloop

*/
final ChannelFuture initAndRegister() {
Channel channel = null;
try {

//创建NioServerSocketChannel实例
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}

//io.netty.channel.nio.NioEventLoopGroup@5a955565
//注册的时候调用next,bossGroup虽然创建了eventloop,但是只注册到第一个eventloop
//executors[idx.getAndIncrement() & executors.length - 1];
//config()===>ServerBootstrapConfig config
//group()====>io.netty.channel.nio.NioEventLoopGroup
//register(Channel channel)====>next().register(channel)====>NioEventLoop.register()
//===>SingleThreadEventExecutor.
/*
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
//io.netty.channel.nio.AbstractNioMessageChannel$NioMessageUnsafe.register(this,promise)
//AbstractChannel.AbstractUnsafe.register(EventLoop eventLoop, final ChannelPromise promise)
promise.channel().unsafe().register(this, promise);
return promise;
}

AbstractNioChannel.doRegister();

@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}

*/

//这里注册到selector,同时创建一个线程SingleThreadEventExecutor在轮询,也就是nioEventLoop的run方法
/*
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});

Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());

FastThreadLocalThread(threadGroup, r, name);===>r指向DefaultRunnableDecorator
DefaultRunnableDecorator====>匿名指向==>SingleThreadEventExecutor.this.run();===>指向NioEventloop.run()方法

*/
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}

// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
//    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
//    added to the event loop's task queue for later execution.
//    i.e. It's safe to attempt bind() or connect() now:
//         because bind() or connect() will be executed *after* the scheduled registration task is executed
//         because register(), bind(), and connect() are all bound to the same thread.

return regFuture;
}

//对Channe进行初始化,包括设置pipeline
//serverBootstrap,在Pipeline中设置添加ServerBootstrapAcceptor

@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
channel.config().setOptions(options);
}

final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}

ChannelPipeline p = channel.pipeline();

final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}

p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}

// We add this handler via the EventLoop as the user may have used a ChannelInitializer as handler.
// In this case the initChannel(...) method will only be called after this method returns. Because
// of this we need to ensure we add our handler in a delayed fashion so all the users handler are
// placed in front of the ServerBootstrapAcceptor.
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
//添加Acceptor Handler
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}

//绑定ip端口,这个是封装成task传递给eventLoop去执行的,此时eventLoop已经启动,在轮询
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
//这个方法表示serverSocketChannel已经注册到selector上了
// This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}

@Override
public void execute(Runnable command) {
//io.netty.util.concurrent.DefaultThreadFactory
threadFactory.newThread(command).start();
}

new FastThreadLocalThread();
//nioEventLoopGroup-2-

io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator
io.netty.util.concurrent.SingleThreadEventExecutor.run

//最后还是指向NioEventLoop的run方法

@Override
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));

// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
//    'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
//    'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).

if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}

cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}

private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}

boolean success = false;
updateLastExecutionTime();
try {
//最后还是指向NioEventLoop的run方法
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
for (;;) {
int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}

// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
"before run() implementation terminates.");
}

try {
// Run all remaining tasks and shutdown hooks.
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.release();
if (!taskQueue.isEmpty()) {
logger.warn(
"An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}

terminationFuture.setSuccess(null);
}
}
}
}
});
}

//NioServerSocketChannel封装
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: