您的位置:首页 > 理论基础 > 计算机网络

一步一步分析odl【第二弹】openflow协议栈的运行流程之建立tcp连接

2015-09-24 00:01 633 查看
接上一弹继续讲。
在上一弹最后讲到在SwitchConnectionProvider的startup()方法中创建了一个Tcp server,这个Server的实例为TcpHandler,
public class TcpHandler implements ServerFacade 


TcpHandler实现了ServerFacade接口,而ServerFacade接口又继承自如下接口,可见TcpHandler其实是实现了Runnable接口,是一个线程。

public interface ServerFacade extends ShutdownProvider, OnlineProvider, Runnable

在startup()方法最后启动了这个线程:

new Thread(serverFacade).start();
result = serverFacade.getIsOnlineFuture();

那接下来我们看看这个线程都干了什么,找到TcpHandleer的run()方法:

/**
* Starts server on selected port.
* 在指定的端口上启动server
*/
@Override
public void run() {
/*
* 下面基本上就是一个标准的Netty server的启动过程
*/
//首先是新建两个线程池,根据配置(threadConfig)来设置线程池的大小
//如果threadConfig为null则使用默认设置,目前odl中使用的是默认值
//默认值为:Runtime.getRuntime().availableProcessors() * 2
//可见默认值也不是个固定的数
//bossGroup线程池用来接受客户端的连接请求
//workerGroup线程池用来处理boss线程池里面的连接的数据
if (threadConfig != null) {
bossGroup = new NioEventLoopGroup(threadConfig.getBossThreadCount());
workerGroup = new NioEventLoopGroup(threadConfig.getWorkerThreadCount());
} else {
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
}

/*
* We generally do not perform IO-unrelated tasks, so we want to have
* all outstanding tasks completed before the executing thread goes
* back into select.
* 我们通常不执行IO无关的任务,所以我们想要把所有未完成的任务都完成后
* 再返回调用这个线程的地方
* Any other setting means netty will measure the time it spent selecting
* and spend roughly proportional time executing tasks.
*/
workerGroup.setIoRatio(100);

final ChannelFuture f;
try {
ServerBootstrap b = new ServerBootstrap();
//下面的一些配置主要关心channelInitializer就好了
//channelInitializer是用来处理连接的
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.DEBUG))
.childHandler(channelInitializer)
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY , true)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, DEFAULT_WRITE_HIGH_WATERMARK * 1024)
.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, DEFAULT_WRITE_LOW_WATERMARK * 1024)
.childOption(ChannelOption.WRITE_SPIN_COUNT, DEFAULT_WRITE_SPIN_COUNT);

//startupAddress默认是null
if (startupAddress != null) {
f = b.bind(startupAddress.getHostAddress(), port).sync();
} else {
f = b.bind(port).sync();
}
} catch (InterruptedException e) {
LOGGER.error("Interrupted while binding port {}", port, e);
return;
}
//后面是一些收尾工作,可以不用关心了
try {
InetSocketAddress isa = (InetSocketAddress) f.channel().localAddress();
address = isa.getHostString();

// Update port, as it may have been specified as 0
this.port = isa.getPort();

LOGGER.debug("address from tcphandler: {}", address);
isOnlineFuture.set(true);
LOGGER.info("Switch listener started and ready to accept incoming tcp/tls connections on port: {}", port);
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
LOGGER.error("Interrupted while waiting for port {} shutdown", port, e);
} finally {
shutdown();
}
}

channelInitializer是由ChannelInitializerFactory的createPublishingChannelInitializer()方法创建的实例

/**
* @return PublishingChannelInitializer that initializes new channels
*/
public TcpChannelInitializer createPublishingChannelInitializer() {
TcpChannelInitializer initializer = new TcpChannelInitializer();
initializer.setSwitchIdleTimeout(switchIdleTimeOut);
initializer.setDeserializationFactory(deserializationFactory);
initializer.setSerializationFactory(serializationFactory);
initializer.setTlsConfiguration(tlsConfig);
initializer.setSwitchConnectionHandler(switchConnectionHandler);
initializer.setOutboungQueueSize(outboundQueueSize);
return initializer;
}

可见channelInitializer就是TcpChannelInitializer,channelInitializer持有如下实例:
deserializationFactory(DeserializationFactory) 用于解码tcp的数据包为openflow消息
serializationFactory(SerializationFactory) 用于将openflow消息编码为tcp数据包
switchConnectionHandler(SwitchConnectionHandlerImpl)用于管理设备连接事件(后面会讲到)
接下来查看下TcpChannelInitializer的构造方法,

/**
* default ctor
*/
public TcpChannelInitializer() {
this( new DefaultChannelGroup("netty-receiver", null), new ConnectionAdapterFactoryImpl() );
}

/**
* Testing Constructor
*
*/
protected TcpChannelInitializer( DefaultChannelGroup channelGroup, ConnectionAdapterFactory connAdaptorFactory ) {
allChannels = channelGroup ;
connectionAdapterFactory = connAdaptorFactory ;
}

可见在构造TcpChannelInitializer时还创建了一个ConnectionAdapterFactoryImpl实例,这是一个工厂类,由其创建的ConnectionAdapterImpl实例,是整个openflowjava的核心类非常重要。
TcpChannelInitializer明显就是用来处理tcp连接的,当有一条新的tcp请求发送到指定的端口时(6653或6633)会触发

TcpChannelInitializer的initChannel()方法,(如何触发的我觉得可以不用关心,有兴趣的同学可以自习学习netty的类库),下面给一个netty的框架图直观的感受下:







Netty逻辑架构图

从网络(TCP/UDP层)读取的inbound消息,需要经过解码,将二进制的数据报转换成应用层协议消息或者业务消息(对于odl来说就是openflow消息),才能够被上层的应用逻辑识别和处理;同理,用户发送到网络的outbound业务消息,需要经过编码转换成二进制字节数组(对于Netty就是ByteBuf)才能够发送到网络对端。编码和解码功能是NIO框架的有机组成部分,无论是由业务定制扩展实现,还是NIO框架内置编解码能力,该功能是必不可少的。

接着看initChannel()方法,这里面的OFDecoder和OFEncoder就对应上图中的DecoderHandler和EncoderHandler。

@Override
protected void initChannel(final SocketChannel ch) {
InetAddress switchAddress = ch.remoteAddress().getAddress();
int port = ch.localAddress().getPort();
int remotePort = ch.remoteAddress().getPort();
LOGGER.info("Incoming connection from (remote address): " + switchAddress.toString()
+ ":" + remotePort + " --> :" + port);
if (!getSwitchConnectionHandler().accept(switchAddress)) {
ch.disconnect();
LOGGER.info("Incoming connection rejected");
return;
}
LOGGER.info("Incoming connection accepted - building pipeline");
allChannels.add(ch);
ConnectionFacade connectionFacade = null;
//请直接看这里
//下面这个语句是为每个tcp的channel创建一个ConnectionAdapterImpl实例
//也就是说有几个tcp连接就会有几个ConnectionAdapterImpl实例
connectionFacade = connectionAdapterFactory.createConnectionFacade(ch, null, outboundQueueSize);
try {
LOGGER.debug("calling plugin: " + getSwitchConnectionHandler());
//再看这里通知SwitchConnectionHandler与一个客户端建立tcp连接了
//对于sdn来说一个客户端就是一个设备
getSwitchConnectionHandler().onSwitchConnected(connectionFacade);
connectionFacade.checkListeners();
//请注意下面添加handler的顺序
ch.pipeline().addLast(PipelineHandlers.IDLE_HANDLER.name(), new IdleHandler(getSwitchIdleTimeout(), TimeUnit.MILLISECONDS));
boolean tlsPresent = false;

// If this channel is configured to support SSL it will only support SSL
if (getTlsConfiguration() != null) {
tlsPresent = true;
SslContextFactory sslFactory = new SslContextFactory(getTlsConfiguration());
SSLEngine engine = sslFactory.getServerContext().createSSLEngine();
engine.setNeedClientAuth(true);
engine.setUseClientMode(false);
ch.pipeline().addLast(PipelineHandlers.SSL_HANDLER.name(), new SslHandler(engine));
}
ch.pipeline().addLast(PipelineHandlers.OF_FRAME_DECODER.name(),
new OFFrameDecoder(connectionFacade, tlsPresent));
//openflow版本检测
ch.pipeline().addLast(PipelineHandlers.OF_VERSION_DETECTOR.name(), new OFVersionDetector());
//这个OFDecoder用于解码
OFDecoder ofDecoder = new OFDecoder();
ofDecoder.setDeserializationFactory(getDeserializationFactory());
ch.pipeline().addLast(PipelineHandlers.OF_DECODER.name(), ofDecoder);
//这个OFEncoder用于编码
OFEncoder ofEncoder = new OFEncoder();
ofEncoder.setSerializationFactory(getSerializationFactory());
ch.pipeline().addLast(PipelineHandlers.OF_ENCODER.name(), ofEncoder);
//解码后的消息由它处理
ch.pipeline().addLast(PipelineHandlers.DELEGATING_INBOUND_HANDLER.name(), new DelegatingInboundHandler(connectionFacade));
if (!tlsPresent) {
connectionFacade.fireConnectionReadyNotification();
}
} catch (Exception e) {
LOGGER.error("Failed to initialize channel", e);
ch.close();
}
}


注意下这句:

getSwitchConnectionHandler().onSwitchConnected(connectionFacade);

调用的是SwitchConnectionHandlerImpl的onSwitchConnected方法

@Override
public void onSwitchConnected(ConnectionAdapter connectionAdapter) {
ConnectionConductor conductor = ConnectionConductorFactory.createConductor(
connectionAdapter, queueProcessor);
conductor.setErrorHandler(errorHandler);
}

可见每一个tcp连接对应一个ConnectionAdapter实例,每一个ConnectionAdapter对应一个ConnectionConductor实例(ConnectionConductorImpl),创建ConnectionConductor实例的过程如下:

public static ConnectionConductor createConductor(ConnectionAdapter connectionAdapter,
QueueProcessor<OfHeader, DataObject> queueProcessor) {
ConnectionConductor connectionConductor = new ConnectionConductorImpl(connectionAdapter);
connectionConductor.setQueueProcessor(queueProcessor);
connectionConductor.setId(conductorId.getAndIncrement());
connectionConductor.init();
return connectionConductor;
}

ConnectionConductorImpl的构造方法和init方法:

public ConnectionConductorImpl(ConnectionAdapter connectionAdapter, int ingressMaxQueueSize) {
this.connectionAdapter = connectionAdapter;
this.ingressMaxQueueSize = ingressMaxQueueSize;
conductorState = CONDUCTOR_STATE.HANDSHAKING;
firstHelloProcessed = false;
handshakeManager = new HandshakeManagerImpl(connectionAdapter,
ConnectionConductor.versionOrder.get(0), ConnectionConductor.versionOrder);
handshakeManager.setUseVersionBitmap(isBitmapNegotiationEnable);
handshakeManager.setHandshakeListener(this);
portFeaturesUtils = PortFeaturesUtil.getInstance();
}

@Override
public void init() {
int handshakeThreadLimit = 1;
hsPool = new ThreadPoolLoggingExecutor(handshakeThreadLimit, handshakeThreadLimit, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
"OFHandshake-" + conductorId);

connectionAdapter.setMessageListener(this);
connectionAdapter.setSystemListener(this);
connectionAdapter.setConnectionReadyListener(this);
queue = QueueKeeperFactory.createFairQueueKeeper(queueProcessor, ingressMaxQueueSize);
}

可见ConnectionConductorImpl中持有ConnectionAdapterImpl的实例以及QueueProcessorLightImpl的实例,而ConnectionConductorImpl同时又作为一个观察者,存在于ConnectionAdapterImpl的实例中。
讲到这,客户端与openflow协议栈的tcp连接已经建立了,接下来就是设备与协议栈之间的通信过程了,放在下一弹吧。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: