您的位置:首页 > 其它

netty5笔记-总体流程分析5-客户端连接过程

2016-01-26 18:00 453 查看

前面几篇文章,我们从服务端的视角,分析了从启动到接收连接,到连接的read-write-close。现在我们开始切换到客户端的视角,看看客户端连接服务端的一些实现细节。

还是从snoop的example代码开始,见HttpSnoopClient(稍有修改):

 

public static void main(String[] args) throws Exception {
// 配制client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new HttpSnoopClientInitializer(null));

// 创建连接.
Channel ch = b.connect("127.0.0.1", 80).sync().channel();

// 构造HTTP请求.
HttpRequest request = new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getRawPath());
request.headers().set(HttpHeaderNames.HOST, host);
request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
request.headers().set(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP);

// 发送HTTP请求.
ch.writeAndFlush(request);

// 等待服务端关闭连接
ch.closeFuture().sync();
} finally {
// 关闭group.
group.shutdownGracefully();
}
}

有了前面ServerBootstrap的铺垫,这里就比较简单了,ServerBootstrap和Bootstrap都继承自AbstractBootstrap。但需要注意的是,在ServerBootstrap中group、channel、handler都是针对NioServerSocketChannel的设置,而切到Bootstrap后,group、channel、handler则是针对NioSocketChannel的设置了。相应的在ServerBootstrap中设置NioSocketChannel的属性和选项使用childAttr、childOption,而Bootstrap中设置NioSocketChannel则是直接使用attr、option。

 

下面直接进入主题,b.connect("127.0.0.1", 80)创建了一个客户端的链接,调用的方法:

 

public ChannelFuture connect(String inetHost, int inetPort) {
return connect(InetSocketAddress.createUnresolved(inetHost, inetPort));
}

/**
* 创建一个连接.
*/
public ChannelFuture connect(SocketAddress remoteAddress) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
// 主要验证必要的参数是否设置,如group何channelFactory(通过channel方法设置)
validate();
// 解析并连接
return doResolveAndConnect(remoteAddress, localAddress());
}

private ChannelFuture doResolveAndConnect(SocketAddress remoteAddress, final SocketAddress localAddress) {
// 初始化并注册到EventLoop(注册这块过程与server端类似,不再细讲)
final ChannelFuture regFuture = initAndRegister();
if (regFuture.cause() != null) {
return regFuture;
}

final Channel channel = regFuture.channel();
final EventLoop eventLoop = channel.eventLoop();
final NameResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);

if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
// Resolver has no idea about what to do with the specified remote address or it's resolved already.
return doConnect(remoteAddress, localAddress, regFuture, channel.newPromise());
}
// 对remoteAddress进行解析,如果地址为ip(127.0.0.1)则直接返回,如地址为域名(www.baidu.com)则需要解析为ip(180.97.33.107)
final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);
final Throwable resolveFailureCause = resolveFuture.cause();

if (resolveFailureCause != null) {
// Failed to resolve immediately
channel.close();
return channel.newFailedFuture(resolveFailureCause);
}

// 在解析完成后调用doConnect(最终调用doConnect0)连接远程的端口
if (resolveFuture.isDone()) {
// Succeeded to resolve immediately; cached? (or did a blocking lookup)
return doConnect(resolveFuture.getNow(), localAddress, regFuture, channel.newPromise());
}

// Wait until the name resolution is finished.
final ChannelPromise connectPromise = channel.newPromise();
resolveFuture.addListener(new FutureListener<SocketAddress>() {
@Override
public void operationComplete(Future<SocketAddress> future) throws Exception {
if (future.cause() != null) {
channel.close();
connectPromise.setFailure(future.cause());
} else {
doConnect(future.getNow(), localAddress, regFuture, connectPromise);
}
}
});

return connectPromise;
}

// 初始化pipeline、各个option和attr
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(handler());

final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
for (Entry<ChannelOption<?>, Object> e: options.entrySet()) {
try {
if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + channel, t);
}
}
}

final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
}
}

private static void doConnect0(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelFuture regFuture,
final ChannelPromise connectPromise) {

// 这个方法在 channelRegistered()之前调用. 这样就给handlers机会在channelRegistered()方法中设置pipeline
final Channel channel = connectPromise.channel();
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
// 执行channel的connect方法
if (localAddress == null) {
channel.connect(remoteAddress, connectPromise);
} else {
channel.connect(remoteAddress, localAddress, connectPromise);
}
connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
connectPromise.setFailure(regFuture.cause());
}
}
});
}

这个过程比较简单,1、创建一个NioSocketChannel实例,并用Bootstrap中的参数初始化该实例;2、将创建的channel注册到EventLoop中;3、创建一个解析host的任务(如果是ip,任务直接完成,如果是域名需要将域名解析为ip);4、解析完成后,在EventLoop中调用channel.connect,连接到远程端口。

channel.connect将connect任务交给pipeline去处理,最终调用到TailContext中的connect方法,该方法调用代码:unsafe.connect(...)。

 

// AbstractNioUnsafe
public final void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}

try {
if (connectPromise != null) {
throw new IllegalStateException("connection attempt already made");
}

boolean wasActive = isActive();
// 创建连接,如果连接立即成功了则返回true, 否则会返回false
if (doConnect(remoteAddress, localAddress)) {
// 完成连接过程
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;

// 如果设置了连接超时时间,则创建一个超时检测任务,如果超时未连接成功则关闭连接
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new OneTimeTask() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
// 如果连接被取消,则关闭前面创建的超时检测任务并关闭连接
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}

private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
if (promise == null) {
// Closed via cancellation and the promise has been notified already.
return;
}

// 如果取消了连接则此处会返回false
boolean promiseSet = promise.trySuccess();

// 只要是连接确实打开了,则无论是否被取消channelActive都会触发
if (!wasActive && isActive()) {
pipeline().fireChannelActive();
}

// 如果用户取消了连接,则会调用close方法,此方法会触发channelInactive
if (!promiseSet) {
close(voidPromise());
}
}

// doConnect来自NioSocketChannel
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
javaChannel().socket().bind(localAddress);
}

boolean success = false;
try {
// 发起连接请求,由于是非阻塞模式,因此主要是两种情况:1、连接本地,可能会立即完成,此时返回true;2、其他情况,没有立即完成连接,返回false
boolean connected = javaChannel().connect(remoteAddress);
if (!connected) {
// 注册OP_CONNECT事件,如果连接成功则会进入NioEventLoop中的processSelectedKey方法
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}

// NioEventLoop中的processSelectedKey方法片段
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// 移除OP_CONNECT否则会导致cpu 100%
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
// 此调用最终也会调用上面的fulfillConnectPromise,进而触发后面的channelActive
unsafe.finishConnect();
}

上面的连接过程也是比较简单:

1、发起连接请求。1.1如果立即成功则执行连接成功的后续处理,如channelActive方法的调用;1.2如果未立即成功,则将连接事件注册到Selector;

2、Selector检测到连接事件后触发unsafe.finishConnect,该方法最终也执行连接成功的后续处理(同1.1);

3、1.2的分支会创建一个超时检测任务,如果超过指定时间未连接成功,则直接关闭此次连接请求。

 

 

 

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: