private static class ClientPool {

TransportClient[] clients;

Object[] locks;

public ClientPool(int size) {

clients = new TransportClient[size];

locks = new Object[size];

for (int i = 0; i < size; i++) {

locks[i] = new Object();



public TransportClient createClient(String remoteHost, int remotePort) throws IOException {

// Get connection from the connection pool first.

// If it is not found or not active, create a new one.

final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);

// Create the ClientPool if we don't have it yet.

ClientPool clientPool = connectionPool.get(address);

if (clientPool == null) {

connectionPool.putIfAbsent(address, new ClientPool(numConnectionsPerPeer));

clientPool = connectionPool.get(address);


int clientIndex = rand.nextInt(numConnectionsPerPeer);

TransportClient cachedClient = clientPool.clients[clientIndex];

if (cachedClient != null && cachedClient.isActive()) {

logger.trace("Returning cached connection to {}: {}", address, cachedClient);

return cachedClient;


// If we reach here, we don't have an existing connection open. Let's create a new one.

// Multiple threads might race here to create new connections. Keep only one of them active.

synchronized (clientPool.locks[clientIndex]) {

cachedClient = clientPool.clients[clientIndex];

if (cachedClient != null) {

if (cachedClient.isActive()) {

    logger.trace("Returning cached connection to {}: {}", address, cachedClient);

    return cachedClient;

} else {

logger.info("Found inactive connection to {}, creating a new one.", address);



clientPool.clients[clientIndex] = createClient(address);

return clientPool.clients[clientIndex];




public TransportClient createUnmanagedClient(String remoteHost, int remotePort)

throws IOException {

final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);

return createClient(address);


/** Create a completely new {@link TransportClient} to the remote address. */

private TransportClient createClient(InetSocketAddress address) throws IOException {

logger.debug("Creating new connection to " + address);

Bootstrap bootstrap = new Bootstrap();



// Disable Nagle's Algorithm since we don't want packets to wait

.option(ChannelOption.TCP_NODELAY, true)

.option(ChannelOption.SO_KEEPALIVE, true)

.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs())

.option(ChannelOption.ALLOCATOR, pooledAllocator);

final AtomicReference<TransportClient> clientRef = new AtomicReference<TransportClient>();

final AtomicReference<Channel> channelRef = new AtomicReference<Channel>();

bootstrap.handler(new ChannelInitializer<SocketChannel>() {


public void initChannel(SocketChannel ch) {

TransportChannelHandler clientHandler = context.initializePipeline(ch);





// Connect to the remote server

long preConnect = System.nanoTime();

ChannelFuture cf = bootstrap.connect(address);

if (!cf.awaitUninterruptibly(conf.connectionTimeoutMs())) {

throw new IOException(

String.format("Connecting to %s timed out (%s ms)", address, conf.connectionTimeoutMs()));

} else if (cf.cause() != null) {

throw new IOException(String.format("Failed to connect to %s", address), cf.cause());


TransportClient client = clientRef.get();

Channel channel = channelRef.get();

assert client != null : "Channel future completed successfully with null client";

// Execute any client bootstraps synchronously before marking the Client as successful.

long preBootstrap = System.nanoTime();

logger.debug("Connection to {} successful, running bootstraps...", address);

try {

for (TransportClientBootstrap clientBootstrap : clientBootstraps) {

clientBootstrap.doBootstrap(client, channel);


} catch (Exception e) { // catch non-RuntimeExceptions too as bootstrap may be written in Scala

long bootstrapTimeMs = (System.nanoTime() - preBootstrap) / 1000000;

logger.error("Exception while bootstrapping client after " + bootstrapTimeMs + " ms", e);


throw Throwables.propagate(e);


long postBootstrap = System.nanoTime();

logger.debug("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)",

address, (postBootstrap - preConnect) / 1000000, (postBootstrap - preBootstrap) / 1000000);

return client;


下面看下TransportClient。该类有两个作用：获取数据和发送请求。获取数据是指从预先协商好的数据流中取数据，数据会被拆分成块（大小在KB到MB量级）以便于传输。当TransportClient要从流上获取数据时，流相关的配置并不是由TCP传输层完成的，而是需要先调用TransportClient的sendRPC执行一些配置。具体流程如下：
client.sendRPC(new OpenFile("/foo")) 返回一个 StreamId = 100
client.fetchChunk(streamId = 100, chunkIndex = 0, callback)
client.fetchChunk(streamId = 100, chunkIndex = 1, callback)
client.sendRPC(new CloseStream(100))
// Channel that requests are written to; see the writeAndFlush calls in fetchChunk and stream.
private final Channel channel;

// Response handler; fetchChunk registers each chunk callback with it before sending the request.
private final TransportResponseHandler handler;

// Optional client identifier; may be null.
// NOTE(review): nothing in this excerpt sets or reads it — confirm usage.
@Nullable private String clientId;

public void fetchChunk(

long streamId,

final int chunkIndex,

final ChunkReceivedCallback callback) {

final String serverAddr = NettyUtils.getRemoteAddress(channel);

final long startTime = System.currentTimeMillis();

logger.debug("Sending fetch chunk request {} to {}", chunkIndex, serverAddr);

final StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);

handler.addFetchRequest(streamChunkId, callback);

channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(

new ChannelFutureListener() {


public void operationComplete(ChannelFuture future) throws Exception {

if (future.isSuccess()) {

long timeTaken = System.currentTimeMillis() - startTime;

logger.trace("Sending request {} to {} took {} ms", streamChunkId, serverAddr,


} else {

String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,

serverAddr, future.cause());

logger.error(errorMsg, future.cause());



    try {

callback.onFailure(chunkIndex, new IOException(errorMsg, future.cause()));

} catch (Exception e) {

logger.error("Uncaught exception in RPC response callback handler!", e);






callback有两个方法，这里说明一下它们的回调机制。onFailure在channel的IO操作失败后调用，也就是在ChannelFuture失败时调用（ChannelFuture表示IO操作的结果）。onSuccess则是在channel的事件处理流程中被调用：context.initializePipeline(ch)给channel注册了一个TransportChannelHandler，TransportChannelHandler包含TransportResponseHandler对象，并把响应结果转发给TransportResponseHandler，由它处理服务器端的响应；handler.addFetchRequest(streamChunkId, callback)记录了每个响应对应的回调接口，在对应的响应到来时调用该回调接口。传给channel.writeAndFlush的对象需要实现Encodable接口，该接口的一些方法会被MessageDecoder和MessageEncoder调用。
public void stream(final String streamId, final StreamCallback callback) {

final String serverAddr = NettyUtils.getRemoteAddress(channel);

final long startTime = System.currentTimeMillis();

logger.debug("Sending stream request for {} to {}", streamId, serverAddr);

// Need to synchronize here so that the callback is added to the queue and the RPC is

// written to the socket atomically, so that callbacks are called in the right order

// when responses arrive.

synchronized (this) {


channel.writeAndFlush(new StreamRequest(streamId)).addListener(

  new ChannelFutureListener() {


  public void operationComplete(ChannelFuture future) throws Exception {

  if (future.isSuccess()) {

  long timeTaken = System.currentTimeMillis() - startTime;

logger.trace("Sending request for {} to {} took {} ms", streamId, serverAddr,


} else {

String errorMsg = String.format("Failed to send request for %s to %s: %s", streamId,

  serverAddr, future.cause());

  logger.error(errorMsg, future.cause());


      try {

callback.onFailure(streamId, new IOException(errorMsg, future.cause()));

} catch (Exception e) {

  logger.error("Uncaught exception in RPC response callback handler!", e);







public byte[] sendRpcSync(byte[] message, long timeoutMs) {

final SettableFuture<byte[]> result = SettableFuture.create();

sendRpc(message, new RpcResponseCallback() {


public void onSuccess(byte[] response) {




public void onFailure(Throwable e) {




try {

return result.get(timeoutMs, TimeUnit.MILLISECONDS);

} catch (ExecutionException e) {

throw Throwables.propagate(e.getCause());

} catch (Exception e) {

throw Throwables.propagate(e);




