您的位置:首页 > 编程语言 > Java开发

「java」从websocket服务器的启动分析netty3.10源码

2018-03-01 15:42 609 查看
**

1.首先是创建bootstrap对象

**

// Configure the server.

ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(

Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));

创建bootstrap对象的时候先创建一个频道工厂
ChannelFactory
,它会初始化boss线程和多个worker线程。

public NioServerSocketChannelFactory(

Executor bossExecutor, int bossCount, WorkerPool workerPool) {

this(new NioServerBossPool(bossExecutor, bossCount, null), workerPool);

}

worker线程的初始化方法在
NioWorkerPool
的构造函数调用

NioServerBossPool的初始化方法会创建NioWorker对象,每个worker对象都会创建一个死锁检测worker线程,worker对象的数量由用户定义

NioServerBoss构造方法会调用
AbstractNioSelector.openSelector(determiner)
方法打开多路复用器,同时调用DeadLockProofWorker.start()创建一个死锁检测worker线程,在这个线程中会通过用户设置的执行器executor跑NioWorker对象(它是个runable)。

NioServerBossPool的构造方法最后调用waitForWorkerThreads()方法等待所有的worker线程初始化完毕,这是通过闭锁来进行判断的,闭锁的完成条件是NioWorker对象的父方法run()运行起来后调用startupLatch.countDown()方法。

boss线程的初始化和worker差不多

**

2.绑定本地地址

**

public Channel bind(final SocketAddress localAddress) {

//bind的异步操作结果对象(里面会创建一个新的频道)

ChannelFuture future = bindAsync(localAddress);

//阻塞到绑定结束 Wait for the future.
future.awaitUninterruptibly();
if (!future.isSuccess()) {
future.getChannel().close().awaitUninterruptibly();
throw new ChannelException("Failed to bind to: " + localAddress, future.getCause());
}

return future.getChannel();


}

public ChannelFuture bindAsync(final SocketAddress localAddress) {

if (localAddress == null) {

throw new NullPointerException(“localAddress”);

}

//本质是一个频道处理器(专用于绑定频道到指定的本地地址)

Binder binder = new Binder(localAddress);//绑定者

//获取用户设置的父频道处理器

ChannelHandler parentHandler = getParentHandler();

//新建一个默认管道作为boss管道

ChannelPipeline bossPipeline = pipeline();

//添加绑定者到boss管道中

bossPipeline.addLast(“binder”, binder);

//用户设置了父频道处理器就加到boss管道中

if (parentHandler != null) {

bossPipeline.addLast(“userHandler”, parentHandler);

}

//通过用户设置的频道工厂创建一个持有boss管道的频道
Channel channel = getFactory().newChannel(bossPipeline);

//结果future
final ChannelFuture bfuture = new DefaultChannelFuture(channel, false);
//添加回调以得到结果
binder.bindFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
bfuture.setSuccess();
} else {
// Call close on bind failure
bfuture.getChannel().close();
bfuture.setFailure(future.getCause());
}
}
});
return bfuture;


}

在通过频道工厂NioServerSocketChannelFactory创建频道对象Channel的时候,NioServerSocketChannel的构造函数最后会调用fireChannelOpen(this)方法

public static void fireChannelOpen(Channel channel) {

// Notify the parent handler.

if (channel.getParent() != null) {

fireChildChannelStateChanged(channel.getParent(), channel);

}

channel.getPipeline().sendUpstream(
new UpstreamChannelStateEvent(
channel, ChannelState.OPEN, Boolean.TRUE));


}

可以看到在这里管道pipeline的sendUpstream()方法被调用,它会让管道持有的处理器handler对象顺着链表依次执行handleUpstream()方法。

而Binder是第一个注册的,所以它当然是第一个执行sendUpstream()方法(这是Binder父类SimpleChannelUpstreamHandler的方法)。最后会调用Binder的channelOpen()方法

private final class Binder extends SimpleChannelUpstreamHandler {

private final SocketAddress localAddress;
private final Map<String, Object> childOptions =
new HashMap<String, Object>();
private final DefaultChannelFuture bindFuture = new DefaultChannelFuture(null, false);
Binder(SocketAddress localAddress) {
this.localAddress = localAddress;
}

@Override
public void channelOpen(
ChannelHandlerContext ctx,
ChannelStateEvent evt) {

try {
evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());

// Split options into two categories: parent and child.
Map<String, Object> allOptions = getOptions();
Map<String, Object> parentOptions = new HashMap<String, Object>();
for (Entry<String, Object> e: allOptions.entrySet()) {
if (e.getKey().startsWith("child.")) {
childOptions.put(
e.getKey().substring(6),
e.getValue());
} else if (!"pipelineFactory".equals(e.getKey())) {
parentOptions.put(e.getKey(), e.getValue());
}
}

// Apply parent options.
evt.getChannel().getConfig().setOptions(parentOptions);
} finally {
//调用下一个handler的sendUpstream()方法
ctx.sendUpstream(evt);
}
//调用频道Channel的bind()方法绑定本地地址
evt.getChannel().bind(localAddress).addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
bindFuture.setSuccess();
} else {
bindFuture.setFailure(future.getCause());
}
}
});
}


可以看到方法的结尾会调用频道Channel的bind()方法绑定本地地址。最后他会调用ChannelPipelin的sendDownStream()方法。

public void sendDownstream(ChannelEvent e) {

DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);

if (tail == null) {

try {

getSink().eventSunk(this, e);

return;

} catch (Throwable t) {

notifyHandlerException(e, t);

return;

}

}

sendDownstream(tail, e);


}

如果有DownstreamHandler,它会让管道持有的处理器handler对象顺着链表依次执行handleDownstream()方法。而没有的时候就是执行ChannelSink的eventSunk()方法。这个sink是ChannelFactory创建channel创建的NioServerSocketPipelineSink。

public void eventSunk(

ChannelPipeline pipeline, ChannelEvent e) throws Exception {

Channel channel = e.getChannel();

if (channel instanceof NioServerSocketChannel) {

handleServerSocket(e);

} else if (channel instanceof NioSocketChannel) {

handleAcceptedSocket(e);

}

}

接着调用handleServerSocket()方法处理绑定地址事件BOUND

private static void handleServerSocket(ChannelEvent e) {

if (!(e instanceof ChannelStateEvent)) {

return;

}

ChannelStateEvent event = (ChannelStateEvent) e;
NioServerSocketChannel channel =
(NioServerSocketChannel) event.getChannel();
ChannelFuture future = event.getFuture();
ChannelState state = event.getState();
Object value = event.getValue();

switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
((NioServerBoss) channel.boss).close(channel, future);
}
break;
case BOUND:
if (value != null) {
((NioServerBoss) channel.boss).bind(channel, future, (SocketAddress) value);
} else {
((NioServerBoss) channel.boss).close(channel, future);
}
break;
default:
break;
}


}

最终调用到Boss的bind()方法,注册一个任务到boss对象的任务队列。这个任务会放到任务线程中进行处理。

void bind(final NioServerSocketChannel channel, final ChannelFuture future,

final SocketAddress localAddress) {

registerTask(new RegisterTask(channel, future, localAddress));

}

我们来看看这个任务做了什么

public void run() {

boolean bound = false;

boolean registered = false;

try {

channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());

bound = true;

future.setSuccess();
fireChannelBound(channel, channel.getLocalAddress());
channel.socket.register(selector, SelectionKey.OP_ACCEPT, channel);

registered = true;
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
} finally {
if (!registered && bound) {
close(channel, future);
}
}


}

可以看到它是通过nio的SelectableChannel绑定本地端口,有兴趣可以去了解下jdk的nio是怎么实现的。

public void run() {

boolean bound = false;

boolean registered = false;

try {

//通过nio的SelectableChannel绑定本地端口

channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());

bound = true;

future.setSuccess();
//绑定完本地地址后在发一个BOUND事件给上行处理器
fireChannelBound(channel, channel.getLocalAddress());
channel.socket.register(selector, SelectionKey.OP_ACCEPT, channel);

registered = true;
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
} finally {
if (!registered && bound) {
close(channel, future);
}
}


}

至此服务器基本启动完成
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java netty nio