netty学习及源码分析(一)
2016-10-26 16:22
525 查看
netty学习及源码分析(一)
一直以来都想要研究下netty,苦于平时工作中很少有用到netty的场景,一直拖到现在才有时间学习一下netty,废话不多说,直接进入正题:源码查看工具为idea,使用的netty版本为netty4。本节主要分析netty的server端一些简单的源码执行过程分析。
首先,创建netty server时需要先启动一个ServerBootstrap,它的构造方法为:
/** * Creates a new instance with the specified initial {@link ChannelFactory}. */ public ServerBootstrap(ChannelFactory channelFactory) { super(channelFactory); } /** * Creates a new instance with the specified initial {@link ChannelFactory}. */ protected Bootstrap(ChannelFactory channelFactory) { setFactory(channelFactory); }
Bootstrap中有个private volatile ChannelFactory factory属性,该属性是用来创建channel使用的。
ChannelFactory有很多实现类,以NioServerSocketChannelFactory为例,其构造方法为:
/** * Creates a new instance. * * @param bossExecutor * the {@link Executor} which will execute the boss threads * @param workerExecutor * the {@link Executor} which will execute the I/O worker threads * @param workerCount * the maximum number of I/O worker threads */ public NioServerSocketChannelFactory( Executor bossExecutor, Executor workerExecutor, int workerCount) {//需要传入两个初始的Executor即需要构造两个线程池,并传入worker的数量 if (bossExecutor == null) { throw new NullPointerException("bossExecutor"); } if (workerExecutor == null) { throw new NullPointerException("workerExecutor"); } if (workerCount <= 0) { throw new IllegalArgumentException( "workerCount (" + workerCount + ") " + "must be a positive integer."); } this.bossExecutor = bossExecutor; this.workerExecutor = workerExecutor; sink = new NioServerSocketPipelineSink(workerExecutor, workerCount); }
NioServerSocketPipelineSink是ChannelSink的实现类,作用是Receives and processes the terminal downstream {@link ChannelEvent}s,它的属性和构造方法如下:
private static final AtomicInteger nextId = new AtomicInteger(); private final int id = nextId.incrementAndGet(); private final NioWorker[] workers; private final AtomicInteger workerIndex = new AtomicInteger(); NioServerSocketPipelineSink(Executor workerExecutor, int workerCount) { workers = new NioWorker[workerCount];//构造NioServerSocketChannelFactory时传入的worker数目 for (int i = 0; i < workers.length; i ++) { workers[i] = new NioWorker(id, i + 1, workerExecutor); }
}
NioWorker(int bossId, int id, Executor executor) { this.bossId = bossId;//boss线程的id this.id = id;//当前线程的id this.executor = executor;//执行器 }
在ServerBootstrap中有重载的两个bind()方法用来创建Channel:public Channel bind()和public Channel bind(final SocketAddress localAddress),第一个方法是去Bootstrap的options中查找localAddress,每次打开一个channel时会向options中以键值对的形式put一对String->SocketAddress。首次调用时需要调用有参的方法。
“`
public Channel bind(final SocketAddress localAddress) { if (localAddress == null) { throw new NullPointerException("localAddress"); } final BlockingQueue<ChannelFuture> futureQueue = new LinkedBlockingQueue<ChannelFuture>(); ChannelHandler binder = new Binder(localAddress, futureQueue);//是SimpleChannelUpstreamHandler的子类 ChannelHandler parentHandler = getParentHandler(); ChannelPipeline bossPipeline = pipeline();//得到的是DefaultChannelPipeline实例 bossPipeline.addLast("binder", binder); if (parentHandler != null) { bossPipeline.addLast("userHandler", parentHandler); } Channel channel = getFactory().newChannel(bossPipeline); // Wait until the future is available. ChannelFuture future = null; boolean interrupted = false; do { try { future = futureQueue.poll(Integer.MAX_VALUE, TimeUnit.SECONDS); } catch (InterruptedException e) { interrupted = true; } } while (future == null); if (interrupted) { Thread.currentThread().interrupt(); } // Wait for the future. future.awaitUninterruptibly(); if (!future.isSuccess()) { future.getChannel().close().awaitUninterruptibly(); throw new ChannelException("Failed to bind to: " + localAddress, future.getCause()); } return channel; } ```
DefaultChannelPipeline的结构是一个链表,存的节点为DefaultChannelHandlerContext对象,对象里保存着上下结点的引用信息,而且pipeline中还维护着一个名称与Context的map对象,上面 bossPipeline.addLast(“binder”, binder)的操作为:
public synchronized void addLast(String name, ChannelHandler handler) { if (name2ctx.isEmpty()) { init(name, handler); } else { checkDuplicateName(name); DefaultChannelHandlerContext oldTail = tail; DefaultChannelHandlerContext newTail = new DefaultChannelHandlerContext(oldTail, null, name, handler); callBeforeAdd(newTail); oldTail.next = newTail; tail = newTail; name2ctx.put(name, newTail); callAfterAdd(newTail); } } private void init(String name, ChannelHandler handler) { DefaultChannelHandlerContext ctx = new DefaultChannelHandlerContext(null, null, name, handler); callBeforeAdd(ctx); head = tail = ctx; name2ctx.clear(); name2ctx.put(name, ctx); callAfterAdd(ctx); }
Channel channel = getFactory().newChannel(bossPipeline)执行的操作为NioServerSocketChannelFactory中的newChannel(bossPipeline)方法:
public ServerSocketChannel newChannel(ChannelPipeline pipeline) { return new NioServerSocketChannel(this, pipeline, sink); }
在NioServerSocketChannel中的属性和构造方法如下:
final ServerSocketChannel socket;//nio包中的ServerSocketChannel对象 final Lock shutdownLock = new ReentrantLock();//shutdown时用到的锁 volatile Selector selector;//nio中用到的selector private final ServerSocketChannelConfig config; NioServerSocketChannel( ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink) { super(factory, pipeline, sink); try { socket = ServerSocketChannel.open();//打开一个socket } catch (IOException e) { throw new ChannelException( "Failed to open a server socket.", e); } try { socket.configureBlocking(false);//配置成nio } catch (IOException e) { try { socket.close(); } catch (IOException e2) { logger.warn( "Failed to close a partially initialized socket.", e2); } throw new ChannelException("Failed to enter non-blocking mode.", e); } config = new DefaultServerSocketChannelConfig(socket.socket()); fireChannelOpen(this); }
fireChannelOpen方法的代码如下:
public static void fireChannelOpen(Channel channel) {
// Notify the parent handler.
if (channel.getParent() != null) {
fireChildChannelStateChanged(channel.getParent(), channel);
}
channel.getPipeline().sendUpstream(
new UpstreamChannelStateEvent(
channel, ChannelState.OPEN, Boolean.TRUE));
}
在DefaultChannelPipeline里面对应的sendUpstream方法为:
public void sendUpstream(ChannelEvent e) {
DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);//取出之前放在pipeline中的ChannelHandlerContext对象
if (head == null) {
logger.warn(
“The pipeline contains no upstream handlers; discarding: ” + e);
return;
}
sendUpstream(head, e);
}
void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
try {
((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);//getHandler()在刚启动BootStrap时取到的是Binder,其中 private final class Binder extends SimpleChannelUpstreamHandler 故会调用父类的handleUpstream(ctx,e)方法
} catch (Throwable t) {
notifyHandlerException(e, t);
}
}
进入Binder方法中的channelOpen()方法之前的方法栈调用如下:
紧接着的调用栈为:
未完待续!
相关文章推荐
- Netty学习之旅----源码分析Netty内存泄漏检测
- Netty学习之旅------源码分析Netty解码编码器实现原理
- Netty源码学习——ChannelPipeline模型分析
- Netty学习之旅------源码分析Netty线程本地分配机制与PooledByteBuf线程级对象池原理分析
- Netty学习之旅----源码分析netty服务端初始化流程(Reactor主从模式实现)
- Netty源码学习——EventLoopGroup原理:NioEventLoopGroup分析
- Netty学习之旅------源码分析ChannelPipeline实现原理
- DotNetty网络通信框架学习之源码分析
- Netty学习之旅------源码分析Netty内存池分配机制初探--PoolArena、PoolChunk、PoolSubpage等数据结构分析
- Netty学习之旅------再谈线程模型之源码分析NioEventLoopGroup、SingleThreadEventExecutor
- Netty学习之旅----源码分析内存分配与释放原理
- Netty学习6-ChanelHandler【2】调用过程源码分析
- Netty学习:ChannelHandler执行顺序详解,附源码分析
- DEDE源码分析与学习--index.php文件解读
- WinCE6.0学习之EBoot源码分析----startup.s
- ExtJs源码分析与学习—ExtJs元素Element(二)
- Linux设备驱动程序第三版学习(2)-字符设备驱动程序源码分析(续)
- WinCE6.0学习之EBoot源码分析----startup.s(五)
- Linux 学习数据专题【管理、编程、源码分析】——Linux相关图书选购指南
- Linux设备驱动程序第三版学习(1)-字符设备驱动程序源码分析