您的位置:首页 > 其它

Netty 客户端初始化与注册过程分析

2017-09-23 00:51 513 查看

一、Bootstrap.connect() 分析

public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {

........................................
参数校验
........................................
validate();

........................................
解析与连接
........................................
return doResolveAndConnect(remoteAddress, localAddress);
}


doResolveAndConnect 方法

private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
...........

........................................
初始化和注册
........................................
final ChannelFuture regFuture = initAndRegister();

...........
}
}


initAndRegister方法

final ChannelFuture initAndRegister() {
Channel channel = null;
try {
..................................................
1. 实例化NioSocketChannel
..................................................
channel = channelFactory.newChannel();

..................................................
2. NioSocketChannel 的 option 设置 & attr 设置
..................................................
init(channel);

} catch (Throwable t) {
if (channel != null) {

channel.unsafe().closeForcibly();
}
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}

..................................................
3. config().group()为 NioEventLoopGroup的实例

config().group().register() 的实现为 MultithreadEventLoopGroup.register 方法
..................................................
ChannelFuture regFuture = config().group().register(channel);

}


二、MultithreadEventLoopGroup.register(Channel channel)

@Override
public EventLoop next() {
return (EventLoop) super.next();
}

@Override
public ChannelFuture register(Channel channel) {

.......................................
register 方法为:SingleThreadEventLoop.register(Channel)
.......................................

return next().register(channel);
}


三、SingleThreadEventLoop.register(Channel) 方法

@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}

@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");

.......................................
promise.channel().unsafe() 为  NioSocketChannelUnsafe 实例
实际调用方法为 AbstractUnsafe.register
.......................................
promise.channel().unsafe().register(this, promise);
return promise;
}


四、AbstractUnsafe.register

public final void register(EventLoop eventLoop, final ChannelPromise promise) {

......

..........................
设置NioSocketChannel 的eventloop =SingleThreadEventLoop
..........................
AbstractChannel.this.eventLoop = eventLoop;

..........................
eventLoop.execute():若 eventLoop 的线程尚未启动则先启动线程,然后把任务加入 taskQueue 等待执行
实际调用方法为 SingleThreadEventExecutor.execute(SingleThreadEventLoop 继承自该类)
..........................
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});

......
}


1. register0 过程

..........................
记录 AbstractUnsafe 是否为首次注册(neverRegistered 在 doRegister() 之后才被置为 false)
..........................
boolean firstRegistration = neverRegistered;

..........................

..........................
doRegister();

neverRegistered = false;

..........................
NioSocketChannel 标记为已注册
..........................
registered = true;

..........................

..........................
pipeline.invokeHandlerAddedIfNeeded();

..........................

..........................
safeSetSuccess(promise);

..........................

..........................
pipeline.fireChannelRegistered();


1.1 doRegister()分析

实际调用方法为 NioSocketChannel 的 doRegister

@Override
protected void doRegister() throws Exception {

for (;;) {
try {
..........................
将jdk的 socketChannel 注册到多路复用器上
..........................

selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
}
}
}


2. eventLoop.execute(Runnable) 过程

这里的eventLoop 为SingleThreadEventLoop实例

实际调用方法 SingleThreadEventExecutor.execute,代码如下:

public void execute(Runnable task) {
boolean inEventLoop = inEventLoop();
..........................
启动线程
..........................
startThread();

..........................
将任务添加到 taskQueue中
..........................
addTask(task);

}


2.1 startThread();

private void startThread() {

doStartThread();

}


2.1.1 doStartThread();

executor.execute(new Runnable() {
@Override
public void run() {

... .....

..........................
记录当前线程,用于标记 eventLoop 是否已经启动
..........................

thread = Thread.currentThread();

boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
}
}

....  ....

});


2.1.1.1 executor.execute(runnable)

实现方法为ThreadPerTaskExecutor.execute

public void execute(Runnable command) {
threadFactory.newThread(command).start();
}


2.1.1.2 SingleThreadEventExecutor.this.run()

此方法非常关键,是eventLoop的核心

实现方法为 :NioEventLoop.run()方法

@Override
protected void run() {
for (;;) {

..........................
查询就绪的selectedKey
..........................

select(wakenUp.getAndSet(false));

..........................
执行就绪的selectedKey
..........................
processSelectedKeys();

..........................
执行任务
..........................
runAllTasks();

}
}


processSelectedKeys();

processSelectedKeysOptimized 方法

private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
selectedKeys.keys[i] = null;

final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}

if (needsToSelectAgain) {
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}


processSelectedKey 方法


int readyOps = k.readyOps();

..........................
处理 connect 事件
..........................
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);

unsafe.finishConnect();
}

..........................
处理 write 事件
..........................

if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
..........................
处理 read  和  accept 事件
..........................
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}


runAllTasks();

protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;

do {
..........................
ScheduledTaskQueue 中的task 放入 taskQueue 中
..........................
fetchedAll = fetchFromScheduledTaskQueue();

..........................
执行taskQueue中的任务
..........................

if (runAllTasksFrom(taskQueue)) {
ranAtLeastOne = true;
}
} while (!fetchedAll);

..........................
执行tailQueue中的任务
..........................

afterRunningAllTasks();
return ranAtLeastOne;
}


2.2 addTask(task);

protected void addTask(Runnable task) {

..........................
taskQueue 中增加task
..........................

if (!offerTask(task)) {
reject(task);
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: