kafka客户端源码解读(client-network包)
2017-07-24 18:37
435 查看
kafka-network层 send类源码解读
概述send类:实现将消息(byte),通过channel发送到目标broker集群。
主要的类:接口send类,实现类ByteBufferSend implements Send ,继承类NetworkSend extends ByteBufferSend。
主要关联类:channel类。
源码分析
send接口类: public interface Send { String destination(); boolean completed(); long writeTo(GatheringByteChannel channel) throws IOException; long size(); }
destination()–发送的目标集群地址。
completed()–判断是否完成发送。
writeTo(GatheringByteChannel channel)–将字节流写入到channel中。这里channel是生产者端-broker端的链路。将在channle篇章中分析。
size()—用来计算的buffer的字节大小。为什么需要计算发送字节大小,在继承类中NetworkSend中会解释下kafka消息体设计模式。
实现类ByteBufferSend
首先看构造方法:
public ByteBufferSend(String destination, ByteBuffer... buffers) { this.destination = destination; this.buffers = buffers; for (ByteBuffer buffer : buffers) remaining += buffer.remaining(); this.size = remaining; }
构造方法完成参数初始化,可以看到size是bytebuffer的大小。
public long writeTo(GatheringByteChannel channel) throws IOException { long written = channel.write(buffers); if (written < 0) throw new EOFException("Wrote negative bytes to channel. This shouldn't happen."); remaining -= written; pending = TransportLayers.hasPendingWrites(channel); return written; } }
从源码可以看出kafka消息最底层发送源码逻辑很简单。传送一个channel,写入buffer。这里kafka进行了解耦,send类和channel类,通过接口在writeTo()方法关联。具体实现类如何,相互不干扰。
pending = TransportLayers.hasPendingWrites(channel);这段源码从官方文档解释是用在阻塞channel中。具体判断条件是 return ((TransportLayer) channel).hasPendingWrites();
看方法的实现, @Override public boolean hasPendingWrites() { return netWriteBuffer.hasRemaining() },从中可以看出,有个buffer对象会记录传输的数据。在channel篇章中,研究其细节。
继承类:NetworkSend
看构造方法:
public NetworkSend(String destination, ByteBuffer buffer) {
super(destination, sizeDelimit(buffer));
}
参数destination是broker地址。buffer是需要传输的消息。构造方法中调用了sizeDelimit()方法。
private static ByteBuffer[] sizeDelimit(ByteBuffer buffer) {
return new ByteBuffer[] {sizeBuffer(buffer.remaining()), buffer};
}
sizeDelimit()方法中调用了sizeBuffer(),生成了一个新的buffer,buffer内容是参数buffer大小。源码如下:
private static ByteBuffer sizeBuffer(int size) {
ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
sizeBuffer.putInt(size);
sizeBuffer.rewind();
return sizeBuffer;
}
从中可以看出构造方法最终传进去的super(buffer1+buffer2,)其中buffer2是我们发送到broker端消息,buffer1是一个4字节的大小的数字,其含义是消息buffer2的大小。从这里可以看出,kafka字节流的设计原则包含两个部分:
buffer=buffer1+buffer2
buffer1内容为buffer2.size
buffer2为消息。
最终在broker读取字节流时候,先去取出头四个字节,感知接下来需要读取多少字节。然后读取。
这三个类构成了send基本组成。高级应用通过创建NetworkSend方法,传入消息byte,调用其writeTo()方法,写入对应目标channel中。底层实现细节不用关心。
总结:send接口及其实现,一个目的是将消息buffer进行传输前进行处理,给消息加上四个字节的buffer,存储消息大小。也说明了kafka一次最大传输字节是有限定的。
同时与channel组合,松耦合,减少代码重复。直接传入一个channel对象即可完成消息发送。
相关文章推荐
- KClient——kafka消息中间件源码解读
- KClient——kafka消息中间件源码解读
- SSO单点登录系列1:cas客户端源码分析cas-client-java-2.1.1.jar
- Kafka 0.11客户端集群管理工具AdminClient
- 第37课:Kafka源码解读Consumer内幕解密
- EasyDarwin EasyClient开源流媒体客户端源码功能框架解析
- SSO单点登录系列1:cas客户端源码分析cas-client-java-2.1.1.jar
- kafka源码解析之十五客户端如何创建topic
- XE8-indy10中关于TIdTCPClientCustom.Connect的源码和解读
- Kafka读取__consumer_offsets和Kafka 0.11客户端管理工具AdminClient
- Kafka Eagle 源码解读
- 客户端计时器控件(clientTimer)的c#源码
- CSDN助手源码剖析(三)--自制Web Service客户端组件SoapClient
- spark源码二:yarn.client 客户端的调用流程
- Spark定制班第34课:Kafka源码解读概述和入口类经典解读
- 35:Kafka源码解读中分区数、Consumer并行度等
- Apache HttpAsyncClient源码解读 (如何实现异步IO)
- Kafka Eagle 源码解读
- 第35课:Kafka源码解读中分区数、Consumer并行度等
- 36:Kafka源码解读SocketServer下的Acceptor、Processor、Handler