一步一步分析odl【第二弹】openflow协议栈的运行流程之建立tcp连接
2015-09-24 00:01
633 查看
接上一弹继续讲。
在上一弹最后讲到在SwitchConnectionProvider的startup()方法中创建了一个Tcp server,这个Server的实例为TcpHandler,
TcpHandler实现了ServerFacade接口,而ServerFacade接口又继承自如下接口,可见TcpHandler其实是实现了Runnable接口,是一个线程。
在startup()方法最后启动了这个线程:
那接下来我们看看这个线程都干了什么,找到TcpHandleer的run()方法:
channelInitializer是由ChannelInitializerFactory的createPublishingChannelInitializer()方法创建的实例
可见channelInitializer就是TcpChannelInitializer,channelInitializer持有如下实例:
deserializationFactory(DeserializationFactory) 用于解码tcp的数据包为openflow消息
serializationFactory(SerializationFactory) 用于将openflow消息编码为tcp数据包
switchConnectionHandler(SwitchConnectionHandlerImpl)用于管理设备连接事件(后面会讲到)
接下来查看下TcpChannelInitializer的构造方法,
可见在构造TcpChannelInitializer时还创建了一个ConnectionAdapterFactoryImpl实例,这是一个工厂类,由其创建的ConnectionAdapterImpl实例,是整个openflowjava的核心类非常重要。
TcpChannelInitializer明显就是用来处理tcp连接的,当有一条新的tcp请求发送到指定的端口时(6653或6633)会触发
TcpChannelInitializer的initChannel()方法,(如何触发的我觉得可以不用关心,有兴趣的同学可以自习学习netty的类库),下面给一个netty的框架图直观的感受下:
Netty逻辑架构图
从网络(TCP/UDP层)读取的inbound消息,需要经过解码,将二进制的数据报转换成应用层协议消息或者业务消息(对于odl来说就是openflow消息),才能够被上层的应用逻辑识别和处理;同理,用户发送到网络的outbound业务消息,需要经过编码转换成二进制字节数组(对于Netty就是ByteBuf)才能够发送到网络对端。编码和解码功能是NIO框架的有机组成部分,无论是由业务定制扩展实现,还是NIO框架内置编解码能力,该功能是必不可少的。
接着看initChannel()方法,这里面的OFDecoder和OFEncoder就对应上图中的DecoderHandler和EncoderHandler。
注意下这句:
调用的是SwitchConnectionHandlerImpl的onSwitchConnected方法
可见每一个tcp连接对应一个ConnectionAdapter实例,每一个ConnectionAdapter对应一个ConnectionConductor实例(ConnectionConductorImpl),创建ConnectionConductor实例的过程如下:
ConnectionConductorImpl的构造方法和init方法:
可见ConnectionConductorImpl中持有ConnectionAdapterImpl的实例以及QueueProcessorLightImpl的实例,而ConnectionConductorImpl同时又作为一个观察者,存在于ConnectionAdapterImpl的实例中。
讲到这,客户端与openflow协议栈的tcp连接已经建立了,接下来就是设备与协议栈之间的通信过程了,放在下一弹吧。
在上一弹最后讲到在SwitchConnectionProvider的startup()方法中创建了一个Tcp server,这个Server的实例为TcpHandler,
public class TcpHandler implements ServerFacade
TcpHandler实现了ServerFacade接口,而ServerFacade接口又继承自如下接口,可见TcpHandler其实是实现了Runnable接口,是一个线程。
public interface ServerFacade extends ShutdownProvider, OnlineProvider, Runnable
在startup()方法最后启动了这个线程:
new Thread(serverFacade).start(); result = serverFacade.getIsOnlineFuture();
那接下来我们看看这个线程都干了什么,找到TcpHandleer的run()方法:
/** * Starts server on selected port. * 在指定的端口上启动server */ @Override public void run() { /* * 下面基本上就是一个标准的Netty server的启动过程 */ //首先是新建两个线程池,根据配置(threadConfig)来设置线程池的大小 //如果threadConfig为null则使用默认设置,目前odl中使用的是默认值 //默认值为:Runtime.getRuntime().availableProcessors() * 2 //可见默认值也不是个固定的数 //bossGroup线程池用来接受客户端的连接请求 //workerGroup线程池用来处理boss线程池里面的连接的数据 if (threadConfig != null) { bossGroup = new NioEventLoopGroup(threadConfig.getBossThreadCount()); workerGroup = new NioEventLoopGroup(threadConfig.getWorkerThreadCount()); } else { bossGroup = new NioEventLoopGroup(); workerGroup = new NioEventLoopGroup(); } /* * We generally do not perform IO-unrelated tasks, so we want to have * all outstanding tasks completed before the executing thread goes * back into select. * 我们通常不执行IO无关的任务,所以我们想要把所有未完成的任务都完成后 * 再返回调用这个线程的地方 * Any other setting means netty will measure the time it spent selecting * and spend roughly proportional time executing tasks. */ workerGroup.setIoRatio(100); final ChannelFuture f; try { ServerBootstrap b = new ServerBootstrap(); //下面的一些配置主要关心channelInitializer就好了 //channelInitializer是用来处理连接的 b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.DEBUG)) .childHandler(channelInitializer) .option(ChannelOption.SO_BACKLOG, 128) .option(ChannelOption.SO_REUSEADDR, true) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.TCP_NODELAY , true) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, DEFAULT_WRITE_HIGH_WATERMARK * 1024) .childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, DEFAULT_WRITE_LOW_WATERMARK * 1024) .childOption(ChannelOption.WRITE_SPIN_COUNT, DEFAULT_WRITE_SPIN_COUNT); //startupAddress默认是null if (startupAddress != null) { f = b.bind(startupAddress.getHostAddress(), port).sync(); } else { f = b.bind(port).sync(); } } catch (InterruptedException e) { LOGGER.error("Interrupted while binding port {}", port, e); return; } //后面是一些收尾工作,可以不用关心了 try { InetSocketAddress isa = (InetSocketAddress) f.channel().localAddress(); address = isa.getHostString(); // Update port, as it may have been specified as 0 this.port = isa.getPort(); LOGGER.debug("address from tcphandler: {}", address); isOnlineFuture.set(true); LOGGER.info("Switch listener started and ready to accept incoming tcp/tls connections on port: {}", port); f.channel().closeFuture().sync(); } catch (InterruptedException e) { LOGGER.error("Interrupted while waiting for port {} shutdown", port, e); } finally { shutdown(); } }
channelInitializer是由ChannelInitializerFactory的createPublishingChannelInitializer()方法创建的实例
/** * @return PublishingChannelInitializer that initializes new channels */ public TcpChannelInitializer createPublishingChannelInitializer() { TcpChannelInitializer initializer = new TcpChannelInitializer(); initializer.setSwitchIdleTimeout(switchIdleTimeOut); initializer.setDeserializationFactory(deserializationFactory); initializer.setSerializationFactory(serializationFactory); initializer.setTlsConfiguration(tlsConfig); initializer.setSwitchConnectionHandler(switchConnectionHandler); initializer.setOutboungQueueSize(outboundQueueSize); return initializer; }
可见channelInitializer就是TcpChannelInitializer,channelInitializer持有如下实例:
deserializationFactory(DeserializationFactory) 用于解码tcp的数据包为openflow消息
serializationFactory(SerializationFactory) 用于将openflow消息编码为tcp数据包
switchConnectionHandler(SwitchConnectionHandlerImpl)用于管理设备连接事件(后面会讲到)
接下来查看下TcpChannelInitializer的构造方法,
/** * default ctor */ public TcpChannelInitializer() { this( new DefaultChannelGroup("netty-receiver", null), new ConnectionAdapterFactoryImpl() ); } /** * Testing Constructor * */ protected TcpChannelInitializer( DefaultChannelGroup channelGroup, ConnectionAdapterFactory connAdaptorFactory ) { allChannels = channelGroup ; connectionAdapterFactory = connAdaptorFactory ; }
可见在构造TcpChannelInitializer时还创建了一个ConnectionAdapterFactoryImpl实例,这是一个工厂类,由其创建的ConnectionAdapterImpl实例,是整个openflowjava的核心类非常重要。
TcpChannelInitializer明显就是用来处理tcp连接的,当有一条新的tcp请求发送到指定的端口时(6653或6633)会触发
TcpChannelInitializer的initChannel()方法,(如何触发的我觉得可以不用关心,有兴趣的同学可以自习学习netty的类库),下面给一个netty的框架图直观的感受下:
Netty逻辑架构图
从网络(TCP/UDP层)读取的inbound消息,需要经过解码,将二进制的数据报转换成应用层协议消息或者业务消息(对于odl来说就是openflow消息),才能够被上层的应用逻辑识别和处理;同理,用户发送到网络的outbound业务消息,需要经过编码转换成二进制字节数组(对于Netty就是ByteBuf)才能够发送到网络对端。编码和解码功能是NIO框架的有机组成部分,无论是由业务定制扩展实现,还是NIO框架内置编解码能力,该功能是必不可少的。
接着看initChannel()方法,这里面的OFDecoder和OFEncoder就对应上图中的DecoderHandler和EncoderHandler。
@Override protected void initChannel(final SocketChannel ch) { InetAddress switchAddress = ch.remoteAddress().getAddress(); int port = ch.localAddress().getPort(); int remotePort = ch.remoteAddress().getPort(); LOGGER.info("Incoming connection from (remote address): " + switchAddress.toString() + ":" + remotePort + " --> :" + port); if (!getSwitchConnectionHandler().accept(switchAddress)) { ch.disconnect(); LOGGER.info("Incoming connection rejected"); return; } LOGGER.info("Incoming connection accepted - building pipeline"); allChannels.add(ch); ConnectionFacade connectionFacade = null; //请直接看这里 //下面这个语句是为每个tcp的channel创建一个ConnectionAdapterImpl实例 //也就是说有几个tcp连接就会有几个ConnectionAdapterImpl实例 connectionFacade = connectionAdapterFactory.createConnectionFacade(ch, null, outboundQueueSize); try { LOGGER.debug("calling plugin: " + getSwitchConnectionHandler()); //再看这里通知SwitchConnectionHandler与一个客户端建立tcp连接了 //对于sdn来说一个客户端就是一个设备 getSwitchConnectionHandler().onSwitchConnected(connectionFacade); connectionFacade.checkListeners(); //请注意下面添加handler的顺序 ch.pipeline().addLast(PipelineHandlers.IDLE_HANDLER.name(), new IdleHandler(getSwitchIdleTimeout(), TimeUnit.MILLISECONDS)); boolean tlsPresent = false; // If this channel is configured to support SSL it will only support SSL if (getTlsConfiguration() != null) { tlsPresent = true; SslContextFactory sslFactory = new SslContextFactory(getTlsConfiguration()); SSLEngine engine = sslFactory.getServerContext().createSSLEngine(); engine.setNeedClientAuth(true); engine.setUseClientMode(false); ch.pipeline().addLast(PipelineHandlers.SSL_HANDLER.name(), new SslHandler(engine)); } ch.pipeline().addLast(PipelineHandlers.OF_FRAME_DECODER.name(), new OFFrameDecoder(connectionFacade, tlsPresent)); //openflow版本检测 ch.pipeline().addLast(PipelineHandlers.OF_VERSION_DETECTOR.name(), new OFVersionDetector()); //这个OFDecoder用于解码 OFDecoder ofDecoder = new OFDecoder(); ofDecoder.setDeserializationFactory(getDeserializationFactory()); ch.pipeline().addLast(PipelineHandlers.OF_DECODER.name(), ofDecoder); //这个OFEncoder用于编码 OFEncoder ofEncoder = new OFEncoder(); ofEncoder.setSerializationFactory(getSerializationFactory()); ch.pipeline().addLast(PipelineHandlers.OF_ENCODER.name(), ofEncoder); //解码后的消息由它处理 ch.pipeline().addLast(PipelineHandlers.DELEGATING_INBOUND_HANDLER.name(), new DelegatingInboundHandler(connectionFacade)); if (!tlsPresent) { connectionFacade.fireConnectionReadyNotification(); } } catch (Exception e) { LOGGER.error("Failed to initialize channel", e); ch.close(); } }
注意下这句:
getSwitchConnectionHandler().onSwitchConnected(connectionFacade);
调用的是SwitchConnectionHandlerImpl的onSwitchConnected方法
@Override public void onSwitchConnected(ConnectionAdapter connectionAdapter) { ConnectionConductor conductor = ConnectionConductorFactory.createConductor( connectionAdapter, queueProcessor); conductor.setErrorHandler(errorHandler); }
可见每一个tcp连接对应一个ConnectionAdapter实例,每一个ConnectionAdapter对应一个ConnectionConductor实例(ConnectionConductorImpl),创建ConnectionConductor实例的过程如下:
public static ConnectionConductor createConductor(ConnectionAdapter connectionAdapter, QueueProcessor<OfHeader, DataObject> queueProcessor) { ConnectionConductor connectionConductor = new ConnectionConductorImpl(connectionAdapter); connectionConductor.setQueueProcessor(queueProcessor); connectionConductor.setId(conductorId.getAndIncrement()); connectionConductor.init(); return connectionConductor; }
ConnectionConductorImpl的构造方法和init方法:
public ConnectionConductorImpl(ConnectionAdapter connectionAdapter, int ingressMaxQueueSize) { this.connectionAdapter = connectionAdapter; this.ingressMaxQueueSize = ingressMaxQueueSize; conductorState = CONDUCTOR_STATE.HANDSHAKING; firstHelloProcessed = false; handshakeManager = new HandshakeManagerImpl(connectionAdapter, ConnectionConductor.versionOrder.get(0), ConnectionConductor.versionOrder); handshakeManager.setUseVersionBitmap(isBitmapNegotiationEnable); handshakeManager.setHandshakeListener(this); portFeaturesUtils = PortFeaturesUtil.getInstance(); } @Override public void init() { int handshakeThreadLimit = 1; hsPool = new ThreadPoolLoggingExecutor(handshakeThreadLimit, handshakeThreadLimit, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), "OFHandshake-" + conductorId); connectionAdapter.setMessageListener(this); connectionAdapter.setSystemListener(this); connectionAdapter.setConnectionReadyListener(this); queue = QueueKeeperFactory.createFairQueueKeeper(queueProcessor, ingressMaxQueueSize); }
可见ConnectionConductorImpl中持有ConnectionAdapterImpl的实例以及QueueProcessorLightImpl的实例,而ConnectionConductorImpl同时又作为一个观察者,存在于ConnectionAdapterImpl的实例中。
讲到这,客户端与openflow协议栈的tcp连接已经建立了,接下来就是设备与协议栈之间的通信过程了,放在下一弹吧。
相关文章推荐
- 承接游戏UI美术外包【厦门巨游网络科技有限公司】
- 湖北省政府发文提出建立基金 奖励有突出贡献乡村教师
- HTTP协议中POST、GET、HEAD、PUT等请求方法以及一些常见错误
- HDU 5029 Relief grain (2014年广州赛区网络赛H题)
- nine-patch 转载制作方式http://2528.iteye.com/blog/1326647
- Xcode7 使用NSURLSession发送HTTP请求报错
- .9.png 转载地址http://www.cnblogs.com/lwbqqyumidi/p/3373070.html
- TCP/UDP编程
- HTTPS那些事(二)SSL证书
- 多核心Linux内核路径优化的不二法门之-多核心平台TCP优化
- https://github.com/yrs244742688/GeneratePemWithMoAndEx RSA加密
- HTTPS连接的前几毫秒发生了什么——Amazon HTTPS案例分析
- 多核心Linux内核路径优化的不二法门之-多核心平台TCP优化
- Android网络状态的监听
- TCP/IP 抓包分析
- 网络请求封装2
- HDU 5437 Alisha’s Party(优先队列)(2015网络赛长春站)
- 网络请求的封装
- HTTP状态码
- 2015 ACM北京网络赛J题 hiho1236