Netty Study and Source Code Analysis (Part 1)

2016-10-26 16:22

I have wanted to study Netty for a long time, but my day-to-day work rarely calls for it, so only now have I found the time. Without further ado, let's get started:

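I read the source in IntelliJ IDEA. I noted the version as netty4, although the classes quoted below (the ChannelFactory-based ServerBootstrap, NioServerSocketChannelFactory, ChannelSink, SimpleChannelUpstreamHandler) belong to the Netty 3.x API (package org.jboss.netty), so this walkthrough applies to that line of releases. This part traces a few simple execution paths on the server side.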

First, creating a Netty server starts with a ServerBootstrap. Its constructor, and the superclass constructor it delegates to, are:
// ServerBootstrap
/**
 * Creates a new instance with the specified initial {@link ChannelFactory}.
 */
public ServerBootstrap(ChannelFactory channelFactory) {
    super(channelFactory);
}

// Bootstrap (superclass of ServerBootstrap)
/**
 * Creates a new instance with the specified initial {@link ChannelFactory}.
 */
protected Bootstrap(ChannelFactory channelFactory) {
    setFactory(channelFactory);
}


Bootstrap has a private volatile ChannelFactory factory field, which is used to create channels.

ChannelFactory has many implementations. Taking NioServerSocketChannelFactory as an example, its constructor is:

/**
 * Creates a new instance.
 *
 * @param bossExecutor
 *        the {@link Executor} which will execute the boss threads
 * @param workerExecutor
 *        the {@link Executor} which will execute the I/O worker threads
 * @param workerCount
 *        the maximum number of I/O worker threads
 */
public NioServerSocketChannelFactory(
        Executor bossExecutor, Executor workerExecutor,
        int workerCount) {
    // Two Executors (i.e. two thread pools) and the number of workers must be passed in.
    if (bossExecutor == null) {
        throw new NullPointerException("bossExecutor");
    }
    if (workerExecutor == null) {
        throw new NullPointerException("workerExecutor");
    }
    if (workerCount <= 0) {
        throw new IllegalArgumentException(
                "workerCount (" + workerCount + ") " +
                "must be a positive integer.");
    }
    this.bossExecutor = bossExecutor;
    this.workerExecutor = workerExecutor;
    sink = new NioServerSocketPipelineSink(workerExecutor, workerCount);
}
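To see the constructor's contract from the caller's side, here is a minimal sketch using only the JDK. FactorySketch is a hypothetical stand-in that copies the argument checks quoted above; it is not the Netty class, and the choice of cached thread pools and of "2 x available processors" workers is an assumption made for illustration.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for NioServerSocketChannelFactory (not the Netty class).
final class FactorySketch {
    final Executor bossExecutor;
    final Executor workerExecutor;
    final int workerCount;

    FactorySketch(Executor bossExecutor, Executor workerExecutor, int workerCount) {
        // Same validation order as the constructor quoted above.
        if (bossExecutor == null) {
            throw new NullPointerException("bossExecutor");
        }
        if (workerExecutor == null) {
            throw new NullPointerException("workerExecutor");
        }
        if (workerCount <= 0) {
            throw new IllegalArgumentException(
                    "workerCount (" + workerCount + ") must be a positive integer.");
        }
        this.bossExecutor = bossExecutor;
        this.workerExecutor = workerExecutor;
        this.workerCount = workerCount;
    }

    public static void main(String[] args) {
        // Two separate pools: one for boss threads, one for I/O worker threads.
        ExecutorService boss = Executors.newCachedThreadPool();
        ExecutorService worker = Executors.newCachedThreadPool();
        try {
            int workers = Runtime.getRuntime().availableProcessors() * 2; // assumed sizing
            FactorySketch factory = new FactorySketch(boss, worker, workers);
            System.out.println("workerCount = " + factory.workerCount);
        } finally {
            boss.shutdown();
            worker.shutdown();
        }
    }
}
```

Failing fast in the constructor means a misconfigured server is rejected before any socket or thread is created.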


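NioServerSocketPipelineSink is an implementation of ChannelSink, whose role, in the words of the Javadoc, is that it "receives and processes the terminal downstream ChannelEvents". Its fields and constructor, followed by the NioWorker constructor it calls, are: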

private static final AtomicInteger nextId = new AtomicInteger();

private final int id = nextId.incrementAndGet();
private final NioWorker[] workers;
private final AtomicInteger workerIndex = new AtomicInteger();

NioServerSocketPipelineSink(Executor workerExecutor, int workerCount) {
    // workerCount is the value passed in when NioServerSocketChannelFactory was constructed.
    workers = new NioWorker[workerCount];
    for (int i = 0; i < workers.length; i ++) {
        workers[i] = new NioWorker(id, i + 1, workerExecutor);
    }
}

NioWorker(int bossId, int id, Executor executor) {
    this.bossId = bossId;     // id of the boss (the sink's id is passed in here)
    this.id = id;             // id of this worker
    this.executor = executor; // executor that runs this worker
}
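The listing shows a workerIndex counter next to the workers array but not how it is used. A plausible use, assumed here rather than taken from the quoted code, is round-robin selection of a worker for each new connection. The sketch below illustrates that idea with plain strings standing in for NioWorker; RoundRobinSketch and nextWorker() are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of round-robin worker selection (not the Netty class).
final class RoundRobinSketch {
    private final String[] workers;
    private final AtomicInteger workerIndex = new AtomicInteger();

    RoundRobinSketch(int workerCount) {
        workers = new String[workerCount];
        for (int i = 0; i < workers.length; i++) {
            // Worker ids start at 1, mirroring "i + 1" in the constructor above.
            workers[i] = "worker-" + (i + 1);
        }
    }

    String nextWorker() {
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(workerIndex.getAndIncrement(), workers.length);
        return workers[index];
    }

    public static void main(String[] args) {
        RoundRobinSketch sink = new RoundRobinSketch(3);
        for (int i = 0; i < 4; i++) {
            System.out.println(sink.nextWorker()); // worker-1, worker-2, worker-3, worker-1
        }
    }
}
```

An AtomicInteger lets several boss threads pick workers concurrently without a lock.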


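ServerBootstrap has two overloaded bind() methods for creating a Channel: public Channel bind() and public Channel bind(final SocketAddress localAddress). The no-argument version looks up localAddress in the Bootstrap options, which are stored as key/value pairs (here String -> SocketAddress). If that option has not been set, the version that takes an address has to be called.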
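The relationship between the two overloads can be sketched as follows. This is a simplified illustration, not the Netty source: BindOptionsSketch is hypothetical, the option key "localAddress" and the IllegalStateException are assumptions, and bind(SocketAddress) simply returns the address instead of opening a channel.

```java
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the two bind() overloads (not the Netty class).
final class BindOptionsSketch {
    private final Map<String, Object> options = new HashMap<>();

    void setOption(String key, Object value) {
        options.put(key, value);
    }

    // No-argument overload: reads the address from the options map.
    SocketAddress bind() {
        SocketAddress localAddress = (SocketAddress) options.get("localAddress");
        if (localAddress == null) {
            throw new IllegalStateException("localAddress option is not set.");
        }
        return bind(localAddress);
    }

    // One-argument overload: a real implementation would open and bind a channel here.
    SocketAddress bind(SocketAddress localAddress) {
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return localAddress;
    }

    public static void main(String[] args) {
        BindOptionsSketch bootstrap = new BindOptionsSketch();
        bootstrap.setOption("localAddress", new InetSocketAddress(8080));
        System.out.println("bound to " + bootstrap.bind());
    }
}
```

The one-argument overload from the Netty source is: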

```java
public Channel bind(final SocketAddress localAddress) {
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }

    final BlockingQueue<ChannelFuture> futureQueue =
            new LinkedBlockingQueue<ChannelFuture>();

    // Binder is a subclass of SimpleChannelUpstreamHandler.
    ChannelHandler binder = new Binder(localAddress, futureQueue);
    ChannelHandler parentHandler = getParentHandler();

    // pipeline() returns a DefaultChannelPipeline instance.
    ChannelPipeline bossPipeline = pipeline();
    bossPipeline.addLast("binder", binder);
    if (parentHandler != null) {
        bossPipeline.addLast("userHandler", parentHandler);
    }

    Channel channel = getFactory().newChannel(bossPipeline);

    // Wait until the future is available.
    ChannelFuture future = null;
    boolean interrupted = false;
    do {
        try {
            future = futureQueue.poll(Integer.MAX_VALUE, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            interrupted = true;
        }
    } while (future == null);

    if (interrupted) {
        Thread.currentThread().interrupt();
    }

    // Wait for the future.
    future.awaitUninterruptibly();
    if (!future.isSuccess()) {
        future.getChannel().close().awaitUninterruptibly();
        throw new ChannelException("Failed to bind to: " + localAddress, future.getCause());
    }

    return channel;
}
```
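The "wait until the future is available" loop in bind() is a general hand-off pattern: one thread offers a result into a BlockingQueue, and the caller polls until it arrives, remembering any interruption and restoring the interrupt flag afterwards. A minimal JDK-only sketch of that pattern follows; HandoffSketch and awaitResult are hypothetical names, and the one-second poll timeout is an arbitrary choice.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the queue hand-off used by bind() (not Netty code).
final class HandoffSketch {
    static <T> T awaitResult(BlockingQueue<T> queue) {
        T result = null;
        boolean interrupted = false;
        do {
            try {
                result = queue.poll(1, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // Keep waiting, but remember that an interrupt arrived.
                interrupted = true;
            }
        } while (result == null);

        if (interrupted) {
            // Restore the flag so the caller can still observe the interrupt.
            Thread.currentThread().interrupt();
        }
        return result;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        // Stands in for the handler that publishes the bind future.
        new Thread(() -> queue.offer("bound")).start();
        System.out.println(awaitResult(queue)); // prints "bound"
    }
}
```

Swallowing the InterruptedException inside the loop and re-asserting the flag at the end is what lets the method finish its wait without losing the interrupt.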


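DefaultChannelPipeline is structured as a linked list whose nodes are DefaultChannelHandlerContext objects. Each node holds references to its previous and next nodes, and the pipeline also maintains a map from handler name to context. The bossPipeline.addLast("binder", binder) call above does the following: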

public synchronized void addLast(String name, ChannelHandler handler) {
    if (name2ctx.isEmpty()) {
        init(name, handler);
    } else {
        checkDuplicateName(name);
        DefaultChannelHandlerContext oldTail = tail;
        DefaultChannelHandlerContext newTail = new DefaultChannelHandlerContext(oldTail, null, name, handler);

        callBeforeAdd(newTail);

        oldTail.next = newTail;
        tail = newTail;
        name2ctx.put(name, newTail);

        callAfterAdd(newTail);
    }
}

private void init(String name, ChannelHandler handler) {
    DefaultChannelHandlerContext ctx = new DefaultChannelHandlerContext(null, null, name, handler);

    callBeforeAdd(ctx);

    head = tail = ctx;
    name2ctx.clear();
    name2ctx.put(name, ctx);

    callAfterAdd(ctx);
}
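The data structure behind addLast, a doubly linked list plus a name-to-node map, can be reproduced in a few lines of plain Java. The sketch below is a simplified illustration: PipelineSketch and Node are hypothetical names, handlers are plain Objects, and the callBeforeAdd/callAfterAdd hooks are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified pipeline: linked nodes plus a name index (not Netty code).
final class PipelineSketch {
    private static final class Node {
        final String name;
        final Object handler;
        final Node prev;
        Node next;

        Node(Node prev, String name, Object handler) {
            this.prev = prev;
            this.name = name;
            this.handler = handler;
        }
    }

    private Node head;
    private Node tail;
    private final Map<String, Node> name2ctx = new HashMap<>();

    synchronized void addLast(String name, Object handler) {
        if (name2ctx.containsKey(name)) {
            throw new IllegalArgumentException("Duplicate handler name: " + name);
        }
        Node newTail = new Node(tail, name, handler);
        if (tail == null) {
            head = newTail;      // first node: it is both head and tail
        } else {
            tail.next = newTail; // link the old tail forward to the new node
        }
        tail = newTail;
        name2ctx.put(name, newTail);
    }

    // Lookup by name goes through the map instead of walking the list.
    synchronized Object get(String name) {
        Node node = name2ctx.get(name);
        return node == null ? null : node.handler;
    }

    // Walks the list from head to tail to show insertion order.
    synchronized List<String> names() {
        List<String> names = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            names.add(n.name);
        }
        return names;
    }

    public static void main(String[] args) {
        PipelineSketch pipeline = new PipelineSketch();
        pipeline.addLast("binder", "binder-handler");
        pipeline.addLast("userHandler", "user-handler");
        System.out.println(pipeline.names()); // [binder, userHandler]
    }
}
```

The list preserves handler order for event traversal, while the map gives constant-time lookup and duplicate detection by name.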


Channel channel = getFactory().newChannel(bossPipeline) invokes the newChannel(bossPipeline) method of NioServerSocketChannelFactory:

public ServerSocketChannel newChannel(ChannelPipeline pipeline) {
    return new NioServerSocketChannel(this, pipeline, sink);
}


The fields and constructor of NioServerSocketChannel are:

final ServerSocketChannel socket;             // the java.nio ServerSocketChannel
final Lock shutdownLock = new ReentrantLock(); // lock used during shutdown
volatile Selector selector;                   // the NIO selector
private final ServerSocketChannelConfig config;

NioServerSocketChannel(
        ChannelFactory factory,
        ChannelPipeline pipeline,
        ChannelSink sink) {

    super(factory, pipeline, sink);

    try {
        socket = ServerSocketChannel.open(); // open the server socket channel
    } catch (IOException e) {
        throw new ChannelException(
                "Failed to open a server socket.", e);
    }

    try {
        socket.configureBlocking(false); // switch to non-blocking mode
    } catch (IOException e) {
        try {
            socket.close();
        } catch (IOException e2) {
            logger.warn(
                    "Failed to close a partially initialized socket.", e2);
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }

    config = new DefaultServerSocketChannelConfig(socket.socket());

    fireChannelOpen(this);
}
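The two java.nio calls at the heart of this constructor can be tried in isolation. The sketch below uses only the JDK; NonBlockingOpenSketch and openNonBlocking are hypothetical names, and the cleanup on failure is modeled on the constructor above (attaching the close failure as a suppressed exception instead of logging it).

```java
import java.io.IOException;
import java.nio.channels.ServerSocketChannel;

// JDK-only sketch of opening a server channel in non-blocking mode.
final class NonBlockingOpenSketch {
    static ServerSocketChannel openNonBlocking() throws IOException {
        ServerSocketChannel socket = ServerSocketChannel.open();
        try {
            socket.configureBlocking(false);
            return socket;
        } catch (IOException e) {
            // Do not leak a half-initialized channel.
            try {
                socket.close();
            } catch (IOException e2) {
                e.addSuppressed(e2);
            }
            throw e;
        }
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocketChannel socket = openNonBlocking()) {
            System.out.println("open = " + socket.isOpen()
                    + ", blocking = " + socket.isBlocking());
        }
    }
}
```

Note that the channel is only opened at this point, not bound; in the walkthrough the bind happens later, driven by the Binder handler.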


The code of the fireChannelOpen method is:

public static void fireChannelOpen(Channel channel) {
    // Notify the parent handler.
    if (channel.getParent() != null) {
        fireChildChannelStateChanged(channel.getParent(), channel);
    }

    channel.getPipeline().sendUpstream(
            new UpstreamChannelStateEvent(
                    channel, ChannelState.OPEN, Boolean.TRUE));
}

The corresponding sendUpstream methods in DefaultChannelPipeline are:

public void sendUpstream(ChannelEvent e) {
    // Fetch the ChannelHandlerContext that was placed in the pipeline earlier.
    DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
    if (head == null) {
        logger.warn(
                "The pipeline contains no upstream handlers; discarding: " + e);
        return;
    }

    sendUpstream(head, e);
}

void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
    try {
        // Right after the bootstrap starts, getHandler() returns the Binder.
        // Binder is declared as
        //     private final class Binder extends SimpleChannelUpstreamHandler
        // so the handleUpstream(ctx, e) method inherited from the parent class is called.
        ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
    } catch (Throwable t) {
        notifyHandlerException(e, t);
    }
}
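How an inherited handleUpstream ends up in Binder's channelOpen() can be illustrated with a small dispatch sketch. Everything here is a hypothetical simplification: the base class inspects the event's state and value and calls an overridable hook, which is the assumed behavior of SimpleChannelUpstreamHandler for an OPEN event carrying Boolean.TRUE. The class names, the State enum, and the log list are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of state-based upstream dispatch (not Netty code).
final class UpstreamDispatchSketch {
    enum State { OPEN, BOUND }

    static final class StateEvent {
        final State state;
        final Object value;

        StateEvent(State state, Object value) {
            this.state = state;
            this.value = value;
        }
    }

    // Base handler: routes each event to a hook that subclasses may override.
    static class SimpleUpstreamHandler {
        void handleUpstream(StateEvent e) {
            switch (e.state) {
                case OPEN:
                    if (Boolean.TRUE.equals(e.value)) {
                        channelOpen(e);
                    } else {
                        channelClosed(e);
                    }
                    break;
                default:
                    // Other states are ignored in this sketch.
                    break;
            }
        }

        void channelOpen(StateEvent e) { }

        void channelClosed(StateEvent e) { }
    }

    // Stand-in for Binder: only overrides the hook it cares about.
    static final class Binder extends SimpleUpstreamHandler {
        final List<String> log = new ArrayList<>();

        @Override
        void channelOpen(StateEvent e) {
            log.add("channelOpen");
        }
    }

    public static void main(String[] args) {
        Binder binder = new Binder();
        binder.handleUpstream(new StateEvent(State.OPEN, Boolean.TRUE));
        System.out.println(binder.log); // [channelOpen]
    }
}
```

This is the template-method pattern: the pipeline only knows handleUpstream, and the subclass decides which specific events it reacts to.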

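Before execution enters Binder's channelOpen() method, the call stack, following the code above, is:

ServerBootstrap.bind(SocketAddress)
-> NioServerSocketChannelFactory.newChannel(ChannelPipeline)
-> NioServerSocketChannel constructor
-> fireChannelOpen(Channel)
-> DefaultChannelPipeline.sendUpstream(ChannelEvent)
-> DefaultChannelPipeline.sendUpstream(DefaultChannelHandlerContext, ChannelEvent)
-> SimpleChannelUpstreamHandler.handleUpstream(ctx, e)
-> Binder.channelOpen()

The call stack that comes next is:

[call stack screenshot not preserved]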

To be continued!