
Reading the Spark source code: network (2)

2015-11-22 12:09
In the previous installment we saw that the Spark source makes heavy use of Netty's buffer APIs. This installment looks at some of Netty's core APIs, starting with Channel: in Netty, a Channel is the carrier of communication (a network socket, or a component capable of I/O), while a ChannelHandler holds the logic applied to that Channel. A Channel supports reading, writing, binding a local port, connecting to a remote peer, and so on, and every operation on a Channel is asynchronous: an I/O operation immediately returns a ChannelFuture, on which you can register handlers for success, failure, and cancellation. With this background we can read the client side of the source.
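As a quick illustration of this asynchronous style, here is a minimal standalone sketch (not Spark code; the localhost:9999 address is made up): a client registers a listener on the ChannelFuture returned by connect() and reacts to the outcome there instead of blocking.

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class ChannelFutureDemo {
  public static void main(String[] args) throws Exception {
    EventLoopGroup group = new NioEventLoopGroup();
    try {
      Bootstrap bootstrap = new Bootstrap()
        .group(group)
        .channel(NioSocketChannel.class)
        .handler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) {
            // pipeline handlers would be added here
          }
        });

      // connect() returns immediately; the result is reported later via the ChannelFuture
      ChannelFuture cf = bootstrap.connect("localhost", 9999);
      cf.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
          if (future.isSuccess()) {
            System.out.println("Connected: " + future.channel());
          } else {
            System.err.println("Connect failed: " + future.cause());
          }
        }
      });
      cf.channel().closeFuture().sync();  // block until the channel is closed
    } finally {
      group.shutdownGracefully();
    }
  }
}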
TransportClientFactory is a factory class that creates TransportClients. It keeps one connection pool per remote address, so requests to the same host reuse the same TransportClient. All TransportClients share a single EventLoopGroup, which processes the events on their channels.
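For reference, here is a rough sketch of the factory state that the methods below rely on (the names follow the Spark source, but the exact declarations may differ slightly):

// Sketch of the TransportClientFactory fields used below (not a verbatim copy of the source).
private final ConcurrentHashMap<SocketAddress, ClientPool> connectionPool; // one ClientPool per remote address
private final int numConnectionsPerPeer;  // spark.shuffle.io.numConnectionsPerPeer
private final Random rand;                // picks a slot inside a ClientPool
private final EventLoopGroup workerGroup; // shared by every client this factory creates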
private static class ClientPool {
  TransportClient[] clients;
  // One lock per slot, so different slots can be (re)created concurrently.
  Object[] locks;

  public ClientPool(int size) {
    clients = new TransportClient[size];
    locks = new Object[size];
    for (int i = 0; i < size; i++) {
      locks[i] = new Object();
    }
  }
}

ClientPool represents a connection pool; there is one pool per remote address, and its size is set by spark.shuffle.io.numConnectionsPerPeer. How is it used? The caller passes in an address, which serves as the key into connectionPool to look up that address's pool, creating one if it does not exist yet. A connection is then taken from the pool at a random index, and that is where the per-slot locks come into play.
public TransportClient createClient(String remoteHost, int remotePort) throws IOException {
  // Get connection from the connection pool first.
  // If it is not found or not active, create a new one.
  final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);

  // Create the ClientPool if we don't have it yet.
  ClientPool clientPool = connectionPool.get(address);
  if (clientPool == null) {
    connectionPool.putIfAbsent(address, new ClientPool(numConnectionsPerPeer));
    clientPool = connectionPool.get(address);
  }

  int clientIndex = rand.nextInt(numConnectionsPerPeer);
  TransportClient cachedClient = clientPool.clients[clientIndex];

  if (cachedClient != null && cachedClient.isActive()) {
    logger.trace("Returning cached connection to {}: {}", address, cachedClient);
    return cachedClient;
  }

  // If we reach here, we don't have an existing connection open. Let's create a new one.
  // Multiple threads might race here to create new connections. Keep only one of them active.
  synchronized (clientPool.locks[clientIndex]) {
    cachedClient = clientPool.clients[clientIndex];

    if (cachedClient != null) {
      if (cachedClient.isActive()) {
        logger.trace("Returning cached connection to {}: {}", address, cachedClient);
        return cachedClient;
      } else {
        logger.info("Found inactive connection to {}, creating a new one.", address);
      }
    }
    clientPool.clients[clientIndex] = createClient(address);
    return clientPool.clients[clientIndex];
  }
}

How is a brand-new TransportClient created? This is where Netty's Bootstrap class comes in. The work is mainly Bootstrap configuration: the buffer allocator is a pooled allocator, TCP_NODELAY and SO_KEEPALIVE are enabled, and a connect timeout is set. Calling bootstrap.handler(...) registers a ChannelInitializer; once the connection is established, its initChannel callback receives the SocketChannel, and TransportContext.initializePipeline initializes the channel, i.e. installs the handlers on its pipeline. Inside that callback we obtain both the TransportClient and the Channel.
Why use AtomicReference to hold these references?
My understanding: an anonymous inner class can only use (effectively) final local variables of the enclosing method, and a final local variable must be initialized where it is declared; without AtomicReference there would be no way to hand the objects created inside the inner class back out to the enclosing method.
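Here is a minimal, self-contained sketch of that pattern (not Spark code; HolderDemo and runAsync are made up for illustration). The anonymous inner class cannot assign to a captured local variable, so a final AtomicReference serves as a mutable holder that the callback fills in.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public class HolderDemo {
  interface Callback { void done(String value); }

  // Pretend this is an asynchronous API that reports its result via a callback.
  static void runAsync(final Callback cb) {
    new Thread(new Runnable() {
      public void run() { cb.done("hello"); }
    }).start();
  }

  public static void main(String[] args) throws InterruptedException {
    // A captured local must be (effectively) final, so we cannot write
    // "String result; ... result = value;" inside the anonymous class.
    final AtomicReference<String> resultRef = new AtomicReference<String>();
    final CountDownLatch latch = new CountDownLatch(1);

    runAsync(new Callback() {
      @Override
      public void done(String value) {
        resultRef.set(value);  // allowed: we mutate the holder, not the captured variable
        latch.countDown();
      }
    });

    latch.await();  // wait for the callback, then read the value out of the holder
    System.out.println(resultRef.get());
  }
}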

public TransportClient createUnmanagedClient(String remoteHost, int remotePort)
    throws IOException {
  final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);
  return createClient(address);
}


/** Create a completely new {@link TransportClient} to the remote address. */
private TransportClient createClient(InetSocketAddress address) throws IOException {
  logger.debug("Creating new connection to " + address);

  Bootstrap bootstrap = new Bootstrap();
  bootstrap.group(workerGroup)
    .channel(socketChannelClass)
    // Disable Nagle's Algorithm since we don't want packets to wait
    .option(ChannelOption.TCP_NODELAY, true)
    .option(ChannelOption.SO_KEEPALIVE, true)
    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs())
    .option(ChannelOption.ALLOCATOR, pooledAllocator);

  final AtomicReference<TransportClient> clientRef = new AtomicReference<TransportClient>();
  final AtomicReference<Channel> channelRef = new AtomicReference<Channel>();

  bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) {
      TransportChannelHandler clientHandler = context.initializePipeline(ch);
      clientRef.set(clientHandler.getClient());
      channelRef.set(ch);
    }
  });

  // Connect to the remote server
  long preConnect = System.nanoTime();
  ChannelFuture cf = bootstrap.connect(address);
  if (!cf.awaitUninterruptibly(conf.connectionTimeoutMs())) {
    throw new IOException(
      String.format("Connecting to %s timed out (%s ms)", address, conf.connectionTimeoutMs()));
  } else if (cf.cause() != null) {
    throw new IOException(String.format("Failed to connect to %s", address), cf.cause());
  }

  TransportClient client = clientRef.get();
  Channel channel = channelRef.get();
  assert client != null : "Channel future completed successfully with null client";

  // Execute any client bootstraps synchronously before marking the Client as successful.
  long preBootstrap = System.nanoTime();
  logger.debug("Connection to {} successful, running bootstraps...", address);
  try {
    for (TransportClientBootstrap clientBootstrap : clientBootstraps) {
      clientBootstrap.doBootstrap(client, channel);
    }
  } catch (Exception e) { // catch non-RuntimeExceptions too as bootstrap may be written in Scala
    long bootstrapTimeMs = (System.nanoTime() - preBootstrap) / 1000000;
    logger.error("Exception while bootstrapping client after " + bootstrapTimeMs + " ms", e);
    client.close();
    throw Throwables.propagate(e);
  }
  long postBootstrap = System.nanoTime();

  logger.debug("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)",
    address, (postBootstrap - preConnect) / 1000000, (postBootstrap - preBootstrap) / 1000000);

  return client;
}

Now look at TransportClient. The class does two things: it fetches data and it sends control requests. Data fetches pull chunks from a stream that was negotiated in advance; the data is split into chunks (on the order of KBs to MBs) for transfer. The stream setup is not handled by the TCP transport layer: before fetching, the client issues an RPC via sendRPC to do the configuration. A typical workflow looks like:

client.sendRPC(new OpenFile("/foo")) --> returns streamId = 100
client.fetchChunk(streamId = 100, chunkIndex = 0, callback)
client.fetchChunk(streamId = 100, chunkIndex = 1, callback)
...
client.sendRPC(new CloseStream(100))

A single TransportClient may serve multiple streams, but a given stream must be bound to exactly one client, otherwise responses could get out of order. A client has three member fields: a channel used for writes (sending requests to the server), a TransportResponseHandler that handles the server's responses, and a clientId that identifies the client.
private final Channel channel;
private final TransportResponseHandler handler;
@Nullable private String clientId;
The client exposes three request methods: fetchChunk requests a single chunk of a stream (data transfer), stream requests an entire stream (data transfer), and sendRpc sends a control message. It is a bit like FTP: one path for control, one for data.
public void fetchChunk(
    long streamId,
    final int chunkIndex,
    final ChunkReceivedCallback callback) {
  final String serverAddr = NettyUtils.getRemoteAddress(channel);
  final long startTime = System.currentTimeMillis();
  logger.debug("Sending fetch chunk request {} to {}", chunkIndex, serverAddr);

  final StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
  handler.addFetchRequest(streamChunkId, callback);

  channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(
    new ChannelFutureListener() {
      @Override
      public void operationComplete(ChannelFuture future) throws Exception {
        if (future.isSuccess()) {
          long timeTaken = System.currentTimeMillis() - startTime;
          logger.trace("Sending request {} to {} took {} ms", streamChunkId, serverAddr,
            timeTaken);
        } else {
          String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,
            serverAddr, future.cause());
          logger.error(errorMsg, future.cause());
          handler.removeFetchRequest(streamChunkId);
          channel.close();
          try {
            callback.onFailure(chunkIndex, new IOException(errorMsg, future.cause()));
          } catch (Exception e) {
            logger.error("Uncaught exception in RPC response callback handler!", e);
          }
        }
      }
    });
}

The callback has two methods, and their invocation paths are worth spelling out. onFailure is called when the channel's I/O operation fails, i.e. when the ChannelFuture (the result of the I/O operation) completes with a failure. onSuccess, by contrast, is invoked from the channel's inbound event processing: context.initializePipeline(ch) registered a TransportChannelHandler on the channel, which wraps a TransportResponseHandler and forwards server responses to it; handler.addFetchRequest(streamChunkId, callback) records which callback belongs to which outstanding request, and when the matching response arrives the corresponding callback is invoked. The objects passed to channel.writeAndFlush must implement the Encodable interface, whose methods are used by MessageEncoder and MessageDecoder. A sketch of a caller-side callback follows.
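As a rough illustration (not taken from the Spark source), a caller-side callback might look like this; it assumes the ChunkReceivedCallback interface's onSuccess(int, ManagedBuffer) / onFailure(int, Throwable) signatures and a TransportClient obtained elsewhere.

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.ChunkReceivedCallback;
import org.apache.spark.network.client.TransportClient;

public class FetchExample {
  // Fetch the first chunk of a stream and log what happens.
  static void fetchFirstChunk(TransportClient client, long streamId) {
    client.fetchChunk(streamId, 0, new ChunkReceivedCallback() {
      @Override
      public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
        // Invoked from the Netty event loop once TransportResponseHandler matches
        // a successful chunk response to this request.
        System.out.println("chunk " + chunkIndex + " received, " + buffer.size() + " bytes");
      }

      @Override
      public void onFailure(int chunkIndex, Throwable e) {
        // Invoked when the write itself fails (see fetchChunk above) or when the
        // server reports a failure for this chunk.
        System.err.println("chunk " + chunkIndex + " failed: " + e);
      }
    });
  }
}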
sendRpc follows the same pattern as the method above, so I won't walk through it; a rough sketch of its shape is given below. After that, let's look at the stream method.
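Roughly, sendRpc looks like the sketch below (a simplified reconstruction of the method's shape inside TransportClient, not the exact Spark source): generate a request id, register the RpcResponseCallback with the response handler under that id, then write an RpcRequest; the failure branch mirrors fetchChunk.

// Simplified sketch of sendRpc; logging and timing are omitted.
public void sendRpc(byte[] message, final RpcResponseCallback callback) {
  final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
  handler.addRpcRequest(requestId, callback);  // remember which callback owns this request id

  channel.writeAndFlush(new RpcRequest(requestId, message)).addListener(
    new ChannelFutureListener() {
      @Override
      public void operationComplete(ChannelFuture future) {
        if (!future.isSuccess()) {
          handler.removeRpcRequest(requestId);  // undo the registration
          channel.close();
          callback.onFailure(new IOException("Failed to send RPC to " +
            NettyUtils.getRemoteAddress(channel), future.cause()));
        }
      }
    });
}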
public void stream(final String streamId, final StreamCallback callback) {
  final String serverAddr = NettyUtils.getRemoteAddress(channel);
  final long startTime = System.currentTimeMillis();
  logger.debug("Sending stream request for {} to {}", streamId, serverAddr);

  // Need to synchronize here so that the callback is added to the queue and the RPC is
  // written to the socket atomically, so that callbacks are called in the right order
  // when responses arrive.
  synchronized (this) {
    handler.addStreamCallback(callback);
    channel.writeAndFlush(new StreamRequest(streamId)).addListener(
      new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
          if (future.isSuccess()) {
            long timeTaken = System.currentTimeMillis() - startTime;
            logger.trace("Sending request for {} to {} took {} ms", streamId, serverAddr,
              timeTaken);
          } else {
            String errorMsg = String.format("Failed to send request for %s to %s: %s", streamId,
              serverAddr, future.cause());
            logger.error(errorMsg, future.cause());
            channel.close();
            try {
              callback.onFailure(streamId, new IOException(errorMsg, future.cause()));
            } catch (Exception e) {
              logger.error("Uncaught exception in RPC response callback handler!", e);
            }
          }
        }
      });
  }
}

A synchronized block is used here so that the callbacks are registered in the same order as the requests are written, and thus get invoked in that order when responses arrive. One thing I am still unclear about: if two requests are sent back to back, the second one might get its response faster than the first, so how is the ordering preserved?
sendRpcSync is an interesting method; it is a nice lesson in how to use a Future.
public byte[] sendRpcSync(byte[] message, long timeoutMs) {
  final SettableFuture<byte[]> result = SettableFuture.create();

  sendRpc(message, new RpcResponseCallback() {
    @Override
    public void onSuccess(byte[] response) {
      result.set(response);
    }

    @Override
    public void onFailure(Throwable e) {
      result.setException(e);
    }
  });

  try {
    return result.get(timeoutMs, TimeUnit.MILLISECONDS);
  } catch (ExecutionException e) {
    throw Throwables.propagate(e.getCause());
  } catch (Exception e) {
    throw Throwables.propagate(e);
  }
}

An anonymous inner class can only capture (effectively) final variables of the enclosing scope, so to get the asynchronous result back out, the method hands the callback a SettableFuture and then blocks on it.
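For example, a caller that needs a blocking request/response round trip could use it as below (the message bytes and the 5-second timeout are made up for illustration; client is a TransportClient obtained from the factory):

// Hypothetical caller: send a control message and wait up to 5 seconds for the reply.
byte[] request = "open /foo".getBytes(java.nio.charset.StandardCharsets.UTF_8);
byte[] response = client.sendRpcSync(request, 5000);
System.out.println("got " + response.length + " bytes back");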
