您的位置:首页 > Web前端 > BootStrap

java netty之ServerBootstrap的启动

2016-02-17 17:06 986 查看
通过前面的几篇文章,对整个netty部分的架构已经运行原理都有了一定的了解,那么这篇文章来分析一个经常用到的类:ServerBootstrap,一般对于服务器端的编程它用到的都还算是比较的多。。看一看它的初始化,以及它的运行原理。。。

首先我们还是引入一段代码,通过分析这段代码来分析ServerBootstrap的运行。。。

[java] view plain copy

EventLoopGroup bossGroup = new NioEventLoopGroup(); //这个是用于serversocketchannel的eventloop

EventLoopGroup workerGroup = new NioEventLoopGroup(); //这个是用于处理accept到的channel

try {

ServerBootstrap b = new ServerBootstrap(); //构建serverbootstrap对象

b.group(bossGroup, workerGroup); //设置时间循环对象,前者用来处理accept事件,后者用于处理已经建立的连接的io

b.channel(NioServerSocketChannel.class); //用它来建立新accept的连接,用于构造serversocketchannel的工厂类

b.childHandler(new ChannelInitializer<SocketChannel>(){ //为accept channel的pipeline预添加的inboundhandler

@Override //当新连接accept的时候,这个方法会调用

protected void initChannel(SocketChannel ch) throws Exception {

// TODO Auto-generated method stub

ch.pipeline().addLast(new MyChannelHandler()); //为当前的channel的pipeline添加自定义的处理函数

}

});

//bind方法会创建一个serverchannel,并且会将当前的channel注册到eventloop上面,

//会为其绑定本地端口,并对其进行初始化,为其的pipeline加一些默认的handler

ChannelFuture f = b.bind(80).sync();

f.channel().closeFuture().sync(); //相当于在这里阻塞,直到serverchannel关闭

} finally {

bossGroup.shutdownGracefully();

workerGroup.shutdownGracefully();

}

这段代码在前面的文章也有用到,基本上其意思也都在上面的注释中说的比较清楚了,那么我们接下来具体的分析其中的方法调用,首先是ServerBootstrap的group方法:

[java] view plain copy

//这里parent用于执行server的accept时间事件,child才是用于执行获取的channel连接的事件

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {

super.group(parentGroup);

if (childGroup == null) {

throw new NullPointerException("childGroup");

}

if (this.childGroup != null) {

throw new IllegalStateException("childGroup set already");

}

this.childGroup = childGroup;

return this;

}

这个方法是用来设置eventloopgroup,首先调用了父类的group方法(abstractbootstrap),就不将父类的方法列出来了,其实意思都差不多,eventloopgroup属性的值。。。

好了,接下来我们再来看一下channel方法:

[java] view plain copy

//构造serversocketchannel factory

public B channel(Class<? extends C> channelClass) {

if (channelClass == null) {

throw new NullPointerException("channelClass");

}

return channelFactory(new BootstrapChannelFactory<C>(channelClass)); //构造工厂类

}

/**

* {@link ChannelFactory} which is used to create {@link Channel} instances from

* when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)}

* is not working for you because of some more complex needs. If your {@link Channel} implementation

* has a no-args constructor, its highly recommend to just use {@link #channel(Class)} for

* simplify your code.

*/

@SuppressWarnings("unchecked")

public B channelFactory(ChannelFactory<? extends C> channelFactory) {

if (channelFactory == null) {

throw new NullPointerException("channelFactory");

}

if (this.channelFactory != null) {

throw new IllegalStateException("channelFactory set already");

}

this.channelFactory = channelFactory; //设置

return (B) this;

}

该方法主要是用于构造用于产生channel的工厂类,在我们这段代码说白了就是用于实例化serversocketchannel的工厂类。。。

接下来我们再来看一下childHandler方法:

[java] view plain copy

//设置childHandler,这个是当有channel accept之后为其添加的handler

public ServerBootstrap childHandler(ChannelHandler childHandler) {

if (childHandler == null) {

throw new NullPointerException("childHandler");

}

this.childHandler = childHandler;

return this;

}

这个很简单吧,就是一个赋值,具体说他有什么用,前面的注释有说明,不过以后的分析会说明它有什么用的。。。

接下来我们来看一下bind方法,这个比较重要吧:

[java] view plain copy

//最终将会创建serverchannel,然后会将其绑定到这个地址,然后对其进行初始化

public ChannelFuture bind(int inetPort) {

return bind(new InetSocketAddress(inetPort));

}

好吧,接下来再来看bind方法:

[java] view plain copy

public ChannelFuture bind(SocketAddress localAddress) {

validate();

if (localAddress == null) {

throw new NullPointerException("localAddress");

}

return doBind(localAddress);

}

好吧,再来看看doBind方法:

[java] view plain copy

private ChannelFuture doBind(final SocketAddress localAddress) {

final ChannelFuture regPromise = initAndRegister(); //在这里创建serverchanel,并对其进行初始化,并将其注册到eventloop当中去

final Channel channel = regPromise.channel();

final ChannelPromise promise = channel.newPromise();

if (regPromise.isDone()) {

doBind0(regPromise, channel, localAddress, promise); //将当前的serverchannel绑定地址

} else {

regPromise.addListener(new ChannelFutureListener() {

@Override

public void operationComplete(ChannelFuture future) throws Exception {

doBind0(future, channel, localAddress, promise);

}

});

}

return promise;

}

这里调用了一个比较重要的方法:initAndRegister,我们来看看它的定义:

[java] view plain copy

//创建初始化以及注册serverchanel

final ChannelFuture initAndRegister() {

//利用工厂类创建channel

final Channel channel = channelFactory().newChannel();

try {

init(channel); //init函数留给了后面来实现,用于初始化channel,例如为其的pipeline加上handler

} catch (Throwable t) {

channel.unsafe().closeForcibly();

return channel.newFailedFuture(t);

}

ChannelPromise regPromise = channel.newPromise();

group().register(channel, regPromise); //将当前创建的serverchannel注册到eventloop上面去

if (regPromise.cause() != null) {

if (channel.isRegistered()) {

channel.close();

} else {

channel.unsafe().closeForcibly();

}

}

// If we are here and the promise is not failed, it's one of the following cases:

// 1) If we attempted registration from the event loop, the registration has been completed at this point.

// i.e. It's safe to attempt bind() or connect() now beause the channel has been registered.

// 2) If we attempted registration from the other thread, the registration request has been successfully

// added to the event loop's task queue for later execution.

// i.e. It's safe to attempt bind() or connect() now:

// because bind() or connect() will be executed *after* the scheduled registration task is executed

// because register(), bind(), and connect() are all bound to the same thread.

return regPromise;

}

代码还是很简单,而且也相对比较好理解,无非就是利用前面说到过的channel工厂类来创建一个serversocketchannel,然后调用init方法对这个刚刚生成的channel进行一些初始化的操作,然后在调用eventloopgroup的register方法,将当前这个channel的注册到group上,那么以后这个channel的事件都在这个group上面执行,说白了也就是一些accept。、。。

好,我们先来看看这个init方法吧:

[java] view plain copy

@Override

//初始化chanel,当用channel factory构造channel以后,会调用这个函数来初始化,说白了就是为当前的channel的pipeline加入一些handler

void init(Channel channel) throws Exception {

//先初始化一些配置

final Map<ChannelOption<?>, Object> options = options();

synchronized (options) {

channel.config().setOptions(options);

}

//初始化一些属性

final Map<AttributeKey<?>, Object> attrs = attrs();

synchronized (attrs) {

for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {

@SuppressWarnings("unchecked")

AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();

channel.attr(key).set(e.getValue());

}

}

//获取当前channel的pipeline

ChannelPipeline p = channel.pipeline();

if (handler() != null) {

p.addLast(handler());

}

final EventLoopGroup currentChildGroup = childGroup;

final ChannelHandler currentChildHandler = childHandler;

final Entry<ChannelOption<?>, Object>[] currentChildOptions;

final Entry<AttributeKey<?>, Object>[] currentChildAttrs;

synchronized (childOptions) {

currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));

}

synchronized (childAttrs) {

currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));

}

p.addLast(new ChannelInitializer<Channel>() {

@Override

public void initChannel(Channel ch) throws Exception {

//这是一个inboundher,将其加入到serverchannel的pipeline上面去

ch.pipeline().addLast(new ServerBootstrapAcceptor(

currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));

}

});

}

代码还是相对很简单,首先初始化一些配置参数,然后初始化属性,最后还要为当前的channel的pipeline添加一个handler,这个handler用来当channel注册到eventloop上面之后对其进行一些初始化,我们还是来看看channelInitalizer的定义吧:

[java] view plain copy

public abstract class ChannelInitializer<C extends Channel> extends ChannelStateHandlerAdapter {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class);

/**

* This method will be called once the {@link Channel} was registered. After the method returns this instance

* will be removed from the {@link ChannelPipeline} of the {@link Channel}.

*

* @param ch the {@link Channel} which was registered.

* @throws Exception is thrown if an error occours. In that case the {@link Channel} will be closed.

*/

protected abstract void initChannel(C ch) throws Exception;

@SuppressWarnings("unchecked")

@Override

public final void channelRegistered(ChannelHandlerContext ctx)

throws Exception {

boolean removed = false;

boolean success = false;

try {

//调用用户定义的init函数对当前的channel进行初始化

initChannel((C) ctx.channel());

ctx.pipeline().remove(this);

removed = true;

ctx.fireChannelRegistered();

success = true;

} catch (Throwable t) {

logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);

} finally {

if (!removed) {

ctx.pipeline().remove(this);

}

if (!success) {

ctx.close();

}

}

}

@Override

public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {

ctx.fireInboundBufferUpdated();

}

}

它有一个channelRegistered方法,这个方法是在当前pipeline所属的channel注册到eventloop上面之后会激活的方法,它则是调用了用户自定义的函数来初始化channel,然后在将当前handler移除。。。也就是执行

[java] view plain copy

ch.pipeline().addLast(new ServerBootstrapAcceptor(

currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));

这里又为当前的serversocketchannel添加了另外一个handler,来看看该类型的定义吧:

[java] view plain copy

private static class ServerBootstrapAcceptor

extends ChannelStateHandlerAdapter implements ChannelInboundMessageHandler<Channel> {

private final EventLoopGroup childGroup;

private final ChannelHandler childHandler;

private final Entry<ChannelOption<?>, Object>[] childOptions;

private final Entry<AttributeKey<?>, Object>[] childAttrs;

@SuppressWarnings("unchecked")

ServerBootstrapAcceptor(

EventLoopGroup childGroup, ChannelHandler childHandler,

Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {

this.childGroup = childGroup; //这个是用于管理accept的channel的eventloop

this.childHandler = childHandler;

this.childOptions = childOptions;

this.childAttrs = childAttrs;

}

@Override

public MessageBuf<Channel> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {

return Unpooled.messageBuffer();

}

@Override

@SuppressWarnings("unchecked")

//当有数据进来的时候,会调用这个方法来处理数据,这里进来的数据就是accept的channel

public void inboundBufferUpdated(ChannelHandlerContext ctx) {

MessageBuf<Channel> in = ctx.inboundMessageBuffer(); //获取buf

for (;;) {

Channel child = in.poll();

if (child == null) {

break;

}

child.pipeline().addLast(childHandler); //为accept的channel的pipeline加入用户定义的初始化handler

for (Entry<ChannelOption<?>, Object> e: childOptions) {

try {

if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {

logger.warn("Unknown channel option: " + e);

}

} catch (Throwable t) {

logger.warn("Failed to set a channel option: " + child, t);

}

}

for (Entry<AttributeKey<?>, Object> e: childAttrs) {

child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());

}

try {

childGroup.register(child); //将当前accept的channel注册到eventloop

} catch (Throwable t) {

child.unsafe().closeForcibly();

logger.warn("Failed to register an accepted channel: " + child, t);

}

}

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

final ChannelConfig config = ctx.channel().config();

if (config.isAutoRead()) {

// stop accept new connections for 1 second to allow the channel to recover

// See https://github.com/netty/netty/issues/1328
config.setAutoRead(false);

ctx.channel().eventLoop().schedule(new Runnable() {

@Override

public void run() {

config.setAutoRead(true);

}

}, 1, TimeUnit.SECONDS);

}

// still let the exceptionCaught event flow through the pipeline to give the user

// a chance to do something with it

ctx.fireExceptionCaught(cause);

}

}

主要是有一个比较重要的方法,inboundBufferUpdated,这个方法是在有数据进来的时候会调用的,用于处理进来的数据,也就是accept到的channel,这里就知道我们定义的chidHandler的用处了吧,netty会将这个handler直接加入到刚刚accept到的channel的pipeline上面去。。。最后还要讲当前accept到的channel注册到child eventloop上面去,这里也就完完全全的明白了最开始定义的两个eventloopgroup的作用了。。。

好了,serversocketchannel的init以及register差不多了,然后会调用doBind0方法,将当前的serversocketchannel绑定到一个本地端口,

[java] view plain copy

//将chanel绑定到一个本地地址

private static void doBind0(

final ChannelFuture regFuture, final Channel channel,

final SocketAddress localAddress, final ChannelPromise promise) {

// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up

// the pipeline in its channelRegistered() implementation.

channel.eventLoop().execute(new Runnable() {

@Override

//匿名内部类想要访问外面的参数,那么外面的参数必须是要final的才行

public void run() {

if (regFuture.isSuccess()) {

//调用channel的bind方法,将当前的channl绑定到一个本地地址,其实是调用的是pipeline的bind方法,但是最终又是调用的当前

//channel的unsafe对象的bind方法

channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

} else {

promise.setFailure(regFuture.cause());

}

}

});

}

其实这里调用bind方法最终还是调用serversocketchannel的unsafe对象的bind方法。。。。

到这里,整个serverbootstrap 就算初始化完成了,而且也可以开始运行了。。。

[java] view plain copy

b.childHandler(new ChannelInitializer<SocketChannel>(){ //为accept channel的pipeline预添加的inboundhandler

@Override //当新连接accept的时候,这个方法会调用

protected void initChannel(SocketChannel ch) throws Exception {

// TODO Auto-generated method stub

ch.pipeline().addLast(new MyChannelHandler()); //为当前的channel的pipeline添加自定义的处理函数

}

});

这段代码的意思是对于刚刚accept到的channel,将会在它的pipeline上面添加handler,这个handler的用处主要是就是用户自定义的initChannel方法,就是初始化这个channel,说白了就是为它的pipeline上面添加自己定义的handler。。。

这样整个serverbootstrap是怎么运行的也就差不多了。。。

刚开始接触到netty的时候觉得这里一头雾水,通过这段时间对其代码的阅读,总算搞懂了其整个运行的原理,而且觉得其设计还是很漂亮的,虽然有的时候会觉得有那么一点点的繁琐。。。。

整个运行过程总结为一下几个步骤:

(1)创建用于两个eventloopgroup对象,一个用于管理serversocketchannel,一个用于管理accept到的channel

(2)创建serverbootstrap对象,

(3)设置eventloopgroup

(4)创建用于构建用到的channel的工厂类

(5)设置childhandler,它的主要功能主要是用户定义代码来初始化accept到的channel

(6)创建serversocketchannel,并对它进行初始化,绑定端口,以及register,并为serversocketchannel的pipeline设置默认的handler

通过这几个步骤,整个serverbootstrap也就算是运行起来了。。。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: