您的位置:首页 > 其它

netty5笔记-总体流程分析5-客户端连接过程

2016-01-26 18:00 453 查看
前面几篇文章,我们从服务端的视角,分析了从启动到接收连接,到连接的read-write-close。现在我们开始切换到客户端的视角,看看客户端连接服务端的一些实现细节。

还是从snoop的example代码开始,见HttpSnoopClient(稍有修改):

public static void main(String[] args) throws Exception {
// 配制client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new HttpSnoopClientInitializer(null));

// 创建连接.
Channel ch = b.connect("127.0.0.1", 80).sync().channel();

// 构造HTTP请求.
HttpRequest request = new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getRawPath());
request.headers().set(HttpHeaderNames.HOST, host);
request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
request.headers().set(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP);

// 发送HTTP请求.
ch.writeAndFlush(request);

// 等待服务端关闭连接
ch.closeFuture().sync();
} finally {
// 关闭group.
group.shutdownGracefully();
}
}
有了前面ServerBootstrap的铺垫,这里就比较简单了,ServerBootstrap和Bootstrap都继承自AbstractBootstrap。但需要注意的是,在ServerBootstrap中group、channel、handler都是针对NioServerSocketChannel的设置,而切到Bootstrap后,group、channel、handler则是针对NioSocketChannel的设置了。相应的在ServerBootstrap中设置NioSocketChannel的属性和选项使用childAttr、childOption,而Bootstrap中设置NioSocketChannel则是直接使用attr、option。

下面直接进入主题,b.connect("127.0.0.1", 80)创建了一个客户端的链接,调用的方法:

public ChannelFuture connect(String inetHost, int inetPort) {
return connect(InetSocketAddress.createUnresolved(inetHost, inetPort));
}

/**
* 创建一个连接.
*/
public ChannelFuture connect(SocketAddress remoteAddress) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
// 主要验证必要的参数是否设置,如group何channelFactory(通过channel方法设置)
validate();
// 解析并连接
 return doResolveAndConnect(remoteAddress, localAddress());
}

    private ChannelFuture doResolveAndConnect(SocketAddress remoteAddress, final SocketAddress localAddress) {
// 初始化并注册到EventLoop(注册这块过程与server端类似,不再细讲)
        final ChannelFuture regFuture = initAndRegister();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        final Channel channel = regFuture.channel();
        final EventLoop eventLoop = channel.eventLoop();
        final NameResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);

        if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
            // Resolver has no idea about what to do with the specified remote address or it's resolved already.
            return doConnect(remoteAddress, localAddress, regFuture, channel.newPromise());
        }
// 对remoteAddress进行解析,如果地址为ip(127.0.0.1)则直接返回,如地址为域名(www.baidu.com)则需要解析为ip(180.97.33.107)
        final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);
        final Throwable resolveFailureCause = resolveFuture.cause();

        if (resolveFailureCause != null) {
            // Failed to resolve immediately
            channel.close();
            return channel.newFailedFuture(resolveFailureCause);
        }

// 在解析完成后调用doConnect(最终调用doConnect0)连接远程的端口
        if (resolveFuture.isDone()) {
            // Succeeded to resolve immediately; cached? (or did a blocking lookup)
            return doConnect(resolveFuture.getNow(), localAddress, regFuture, channel.newPromise());
        }

        // Wait until the name resolution is finished.
        final ChannelPromise connectPromise = channel.newPromise();
        resolveFuture.addListener(new FutureListener<SocketAddress>() {
            @Override
            public void operationComplete(Future<SocketAddress> future) throws Exception {
                if (future.cause() != null) {
                    channel.close();
                    connectPromise.setFailure(future.cause());
                } else {
                    doConnect(future.getNow(), localAddress, regFuture, connectPromise);
                }
            }
        });

        return connectPromise;
    }

// 初始化pipeline、各个option和attr
void init(Channel channel) throws Exception {
        ChannelPipeline p = channel.pipeline();
        p.addLast(handler());

        final Map<ChannelOption<?>, Object> options = options();
        synchronized (options) {
            for (Entry<ChannelOption<?>, Object> e: options.entrySet()) {
                try {
                    if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                        logger.warn("Unknown channel option: " + e);
                    }
                } catch (Throwable t) {
                    logger.warn("Failed to set a channel option: " + channel, t);
                }
            }
        }

        final Map<AttributeKey<?>, Object> attrs = attrs();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }
        }
    }

private static void doConnect0(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelFuture regFuture,
            final ChannelPromise connectPromise) {

        // 这个方法在 channelRegistered()之前调用. 这样就给handlers机会在channelRegistered()方法中设置pipeline
        final Channel channel = connectPromise.channel();
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
// 执行channel的connect方法
                    if (localAddress == null) {
                        channel.connect(remoteAddress, connectPromise);
                    } else {
                        channel.connect(remoteAddress, localAddress, connectPromise);
                    }
                    connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    connectPromise.setFailure(regFuture.cause());
                }
            }
        });
    }

这个过程比较简单,1、创建一个NioSocketChannel实例,并用Bootstrap中的参数初始化该实例;2、将创建的channel注册到EventLoop中;3、创建一个解析host的任务(如果是ip,任务直接完成,如果是域名需要将域名解析为ip);4、解析完成后,在EventLoop中调用channel.connect,连接到远程端口。

channel.connect将connect任务交给pipeline去处理,最终调用到TailContext中的connect方法,该方法调用代码:unsafe.connect(...)。

// AbstractNioUnsafe
public final void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}

try {
if (connectPromise != null) {
throw new IllegalStateException("connection attempt already made");
}

boolean wasActive = isActive();
// 创建连接,如果连接立即成功了则返回true, 否则会返回false
if (doConnect(remoteAddress, localAddress)) {
// 完成连接过程
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;

// 如果设置了连接超时时间,则创建一个超时检测任务,如果超时未连接成功则关闭连接
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new OneTimeTask() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
// 如果连接被取消,则关闭前面创建的超时检测任务并关闭连接
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}

private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
            if (promise == null) {
                // Closed via cancellation and the promise has been notified already.
                return;
            }

            // 如果取消了连接则此处会返回false
            boolean promiseSet = promise.trySuccess();

            // 只要是连接确实打开了,则无论是否被取消channelActive都会触发
            if (!wasActive && isActive()) {
                pipeline().fireChannelActive();
            }

            // 如果用户取消了连接,则会调用close方法,此方法会触发channelInactive
            if (!promiseSet) {
                close(voidPromise());
            }
        }

// doConnect来自NioSocketChannel
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        if (localAddress != null) {
            javaChannel().socket().bind(localAddress);
        }

        boolean success = false;
        try {
// 发起连接请求,由于是非阻塞模式,因此主要是两种情况:1、连接本地,可能会立即完成,此时返回true;2、其他情况,没有立即完成连接,返回false
            boolean connected = javaChannel().connect(remoteAddress);
            if (!connected) {
// 注册OP_CONNECT事件,如果连接成功则会进入NioEventLoop中的processSelectedKey方法
                selectionKey().interestOps(SelectionKey.OP_CONNECT);
            }
            success = true;
            return connected;
        } finally {
            if (!success) {
                doClose();
            }
        }
    }

// NioEventLoop中的processSelectedKey方法片段       
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // 移除OP_CONNECT否则会导致cpu 100%
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
// 此调用最终也会调用上面的fulfillConnectPromise,进而触发后面的channelActive
                unsafe.finishConnect();
            }
上面的连接过程也是比较简单:

1、发起连接请求。1.1如果立即成功则执行连接成功的后续处理,如channelActive方法的调用;1.2如果未立即成功,则将连接事件注册到Selector;

2、Selector检测到连接事件后触发unsafe.finishConnect,该方法最终也执行连接成功的后续处理(同1.1);

3、1.2的分支会创建一个超时检测任务,如果超过指定时间未连接成功,则直接关闭此次连接请求。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: