netty 客户端初始化 and 注册过程分析
2017-09-23 00:51
513 查看
一、BootStrap.connect() 分析
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { ........................................ 参数校验 ........................................ validate(); ........................................ 解析与连接 ........................................ return doResolveAndConnect(remoteAddress, localAddress); }
doResolveAndConnect 方法
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) { ........... ........................................ 初始化和注册 ........................................ final ChannelFuture regFuture = initAndRegister(); ........... } }
initAndRegister方法
final ChannelFuture initAndRegister() { Channel channel = null; try { .................................................. 1. 实例化NioSocketChannel .................................................. channel = channelFactory.newChannel(); .................................................. 2. NioSocketChannel的 option方法设置 & atrr 方法设置 .................................................. init(channel); } catch (Throwable t) { if (channel != null) { channel.unsafe().closeForcibly(); } GlobalEventExecutor.INSTANCE).setFailure(t); } .................................................. 3. config().group()为 NioEventLoopGroup的实例 config().group().register()的实现为MultiThreadEventLoopGroup.register 方法 .................................................. ChannelFuture regFuture = config().group().register(channel); }
二、MultiThreadEventLoopGroup.register(Channel channel)
@Override public EventLoop next() { return (EventLoop) super.next(); } @Override public ChannelFuture register(Channel channel) { ....................................... register 方法为:SingleThreadEventLoop.register(Channel) ....................................... return next().register(channel); }
三、SingleThreadEventLoop .register(Channel) 方法
@Override public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this)); } @Override public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); ....................................... promise.channel().unsafe() 为 NioSocketChannelUnsafe 实例 实际调用方法为 AbstractUnsafe.register ....................................... promise.channel().unsafe().register(this, promise); return promise; }
四、 AbstractUnsafe. register
public final void register(EventLoop eventLoop, final ChannelPromise promise) { ...... .......................... 设置NioSocketChannel 的eventloop =SingleThreadEventLoop .......................... AbstractChannel.this.eventLoop = eventLoop; .......................... eventLoop.execute():启动一个新的线程然后执行 实际调用方法为 SingleThreadEventLoop.execute .......................... eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); ...... }
1. register0 过程
.......................... AbstractUnsafe 标记为已注册 .......................... boolean firstRegistration = neverRegistered; .......................... .......................... doRegister(); neverRegistered = false; .......................... NioSocketChannel 标记为已注册 .......................... registered = true; .......................... .......................... pipeline.invokeHandlerAddedIfNeeded(); .......................... .......................... safeSetSuccess(promise); .......................... .......................... pipeline.fireChannelRegistered();
1.1 doRegister()分析
实际调用方法为 NioSochetChannel 的 doRegister@Override protected void doRegister() throws Exception { for (;;) { try { .......................... 将jdk的 socketChannel 注册到多路复用器上 .......................... selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } } }
2. eventLoop.execute(Runnable) 过程
这里的eventLoop 为SingleThreadEventLoop实例实际调用方法 SingleThreadEventExecutor.execute,代码如下:
public void execute(Runnable task) { boolean inEventLoop = inEventLoop(); .......................... 启动线程 .......................... startThread(); .......................... 将任务添加到 taskQueue中 .......................... addTask(task); }
2.1 startThread();
private void startThread() { doStartThread(); }
2.1.1 doStartThread();
executor.execute(new Runnable() { @Override public void run() { ... ..... .......................... 为了标记 eventLoo 是否已经启动 .......................... thread = Thread.currentThread(); boolean success = false; updateLastExecutionTime(); try { SingleThreadEventExecutor.this.run(); success = true; } } .... .... });
2.1.1.1 executor.execute(runnable)
实现方法为ThreadPerTaskExecutor.executepublic void execute(Runnable command) { threadFactory.newThread(command).start(); }
2.1.1.2 SingleThreadEventExecutor.this.run()
此方法非常关键,是eventLoop的核心实现方法为 :NioEventLoop.run()方法
@Override protected void run() { for (;;) { .......................... 查询就绪的selectedKey .......................... select(wakenUp.getAndSet(false)); .......................... 执行就绪的selectedKey .......................... processSelectedKeys(); .......................... 执行任务 .......................... runAllTasks(); } }
processSelectedKeys();
processSelectedKeysOptimized 方法
private void processSelectedKeysOptimized() { for (int i = 0; i < selectedKeys.size; ++i) { final SelectionKey k = selectedKeys.keys[i]; selectedKeys.keys[i] = null; final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (needsToSelectAgain) { selectedKeys.reset(i + 1); selectAgain(); i = -1; } } }
processSelectedKey 方法
int readyOps = k.readyOps(); .......................... 处理 connnect 事件 .......................... if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } .......................... 处理 write 事件 .......................... if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); } .......................... 处理 read 和 accept 事件 .......................... if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); }
runAllTasks();
protected boolean runAllTasks() { assert inEventLoop(); boolean fetchedAll; boolean ranAtLeastOne = false; do { .......................... ScheduledTaskQueue 中的task 放入 taskQueue 中 .......................... fetchedAll = fetchFromScheduledTaskQueue(); .......................... 执行taskQueue中的任务 .......................... if (runAllTasksFrom(taskQueue)) { ranAtLeastOne = true; } } while (!fetchedAll); .......................... 执行tailQueue中的任务 .......................... afterRunningAllTasks(); return ranAtLeastOne; }
2.2 addTask(task);
protected void addTask(Runnable task) { .......................... taskQueue 中增加task .......................... if (!offerTask(task)) { reject(task); } }
相关文章推荐
- Netty 源码分析(三):服务器端的初始化和注册过程
- Netty源码分析(二)—客户端初始化
- Spring源码分析----IOC容器的实现(IoC容器的初始化过程(定位、载入解析、注册))
- Netty源码之Channel初始化和注册过程
- 【Netty源码分析】客户端connect服务端过程
- Netty源码分析之客户端启动过程
- 【Netty源码分析】客户端connect服务端过程
- Netty 4.0源码分析1:服务端启动过程中的Channel与EventLoopGroup的注册
- 【Netty源码分析】客户端connect服务端过程
- netty 客户端连接过程分析
- JNI注册调用源码分析完整过程-安卓4.4
- Android 中WiFi初始化过程分析
- 异步回调过程分析(客户端:cURL获取网页源码)
- Netty学习之旅----源码分析netty服务端初始化流程(Reactor主从模式实现)
- Elasticsearch源码分析十一--查询分析器Analyzer注册过程
- dubbo作为消费者注册过程分析
- kernel 3.0.31 usb_init 子系统初始化过程分析
- rtems初始化过程分析
- Dalvik虚拟机JNI方法的注册过程分析
- Netty5源码分析--2.客户端启动过程