Reading the Spark Source: network (1)
2015-11-22 12:09
Spark will replace Akka in 1.6, implementing the cluster-wide RPC framework on top of Netty instead. Netty's memory management and NIO support should noticeably improve the cluster's network transfer capability. To make sense of this code I found two books, "Netty in Action" and "Netty 权威指南", and read them alongside the Spark source, so I learned Netty while working through Spark's Netty-related code. That code is interleaved with a lot of Netty internals, so it is still fairly heavy reading. Below is the UML class diagram I drew: https://onedrive.live.com/redir?resid=C6D5F843CD40C4E8!364&authkey=!ABHmp7kWEm5n9Y4&ithint=file%2cEAP
The buffer module
The network project abstracts a ManagedBuffer interface, which represents a view over binary data (some portion of a larger data source). The concrete implementation depends on where the data comes from; currently three sources are supported: a file, an NIO ByteBuffer, and a Netty ByteBuf. Note that a concrete implementation may live outside the JVM garbage collector's control: NettyManagedBuffer, for example, is reference-counted, so when such a buffer is handed to another thread, retain()/release() must be called to increment or decrement the count. ManagedBuffer exposes its data in three forms — as a ByteBuffer, as an InputStream, and as a Netty object — and adds reference-count management and a size query; the ByteBuffer form is discouraged because it can be expensive (memory mapping or allocation).

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

public abstract class ManagedBuffer {
  /** Number of bytes of the data. */
  public abstract long size();

  /**
   * Exposes this buffer's data as an NIO ByteBuffer. Changing the position and limit of the
   * returned ByteBuffer should not affect the content of this buffer.
   */
  // TODO: Deprecate this, usage may require expensive memory mapping or allocation.
  public abstract ByteBuffer nioByteBuffer() throws IOException;

  /**
   * Exposes this buffer's data as an InputStream. The underlying implementation does not
   * necessarily check for the length of bytes read, so the caller is responsible for making sure
   * it does not go over the limit.
   */
  public abstract InputStream createInputStream() throws IOException;

  /**
   * Increment the reference count by one if applicable.
   */
  public abstract ManagedBuffer retain();

  /**
   * If applicable, decrement the reference count by one and deallocates the buffer if the
   * reference count reaches zero.
   */
  public abstract ManagedBuffer release();

  /**
   * Convert the buffer into a Netty object, used to write the data out.
   */
  public abstract Object convertToNetty() throws IOException;
}
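The retain/release contract above can be sketched with a minimal reference-counted buffer. CountedBuffer is a hypothetical stand-in for illustration, not a Spark class; it only shows the rule that the buffer is deallocated when the last reference is released:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch of the retain/release contract described above.
// A new buffer starts with one reference; each retain() adds one,
// each release() removes one, and the storage is freed at zero.
class CountedBuffer {
    private final AtomicInteger refCnt = new AtomicInteger(1);
    private byte[] data = new byte[16];

    CountedBuffer retain() {
        refCnt.incrementAndGet();
        return this;
    }

    CountedBuffer release() {
        // Deallocate only when the last reference is gone.
        if (refCnt.decrementAndGet() == 0) {
            data = null;
        }
        return this;
    }

    boolean isFreed() { return data == null; }
    int refCnt() { return refCnt.get(); }
}
```

This is why a buffer passed to another thread must be retained first: otherwise the original owner's release() could free it while the other thread is still reading.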
Each data source has its own ManagedBuffer implementation. Let's look first at the file-backed one.
import com.google.common.base.Objects;
import com.google.common.io.ByteStreams;
import io.netty.channel.DefaultFileRegion;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.LimitedInputStream;
import org.apache.spark.network.util.TransportConf;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public final class FileSegmentManagedBuffer extends ManagedBuffer {
  private final TransportConf conf;
  private final File file;
  private final long offset;
  private final long length;

  public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) {
    this.conf = conf;
    this.file = file;
    this.offset = offset;
    this.length = length;
  }

  @Override
  public long size() {
    return length;
  }

  @Override
  public ByteBuffer nioByteBuffer() throws IOException {
    FileChannel channel = null;
    try {
      channel = new RandomAccessFile(file, "r").getChannel();
      // Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.
      if (length < conf.memoryMapBytes()) {
        ByteBuffer buf = ByteBuffer.allocate((int) length);
        channel.position(offset);
        while (buf.remaining() != 0) {
          if (channel.read(buf) == -1) {
            throw new IOException(String.format("Reached EOF before filling buffer\n" +
              "offset=%s\nfile=%s\nbuf.remaining=%s",
              offset, file.getAbsoluteFile(), buf.remaining()));
          }
        }
        buf.flip();
        return buf;
      } else {
        return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
      }
    } catch (IOException e) {
      try {
        if (channel != null) {
          long size = channel.size();
          throw new IOException("Error in reading " + this + " (actual file length " + size + ")",
            e);
        }
      } catch (IOException ignored) {
        // ignore
      }
      throw new IOException("Error in opening " + this, e);
    } finally {
      JavaUtils.closeQuietly(channel);
    }
  }

  @Override
  public InputStream createInputStream() throws IOException {
    FileInputStream is = null;
    try {
      is = new FileInputStream(file);
      ByteStreams.skipFully(is, offset);
      return new LimitedInputStream(is, length);
    } catch (IOException e) {
      try {
        if (is != null) {
          long size = file.length();
          throw new IOException("Error in reading " + this + " (actual file length " + size + ")",
            e);
        }
      } catch (IOException ignored) {
        // ignore
      } finally {
        JavaUtils.closeQuietly(is);
      }
      throw new IOException("Error in opening " + this, e);
    } catch (RuntimeException e) {
      JavaUtils.closeQuietly(is);
      throw e;
    }
  }

  @Override
  public ManagedBuffer retain() {
    return this;
  }

  @Override
  public ManagedBuffer release() {
    return this;
  }

  @Override
  public Object convertToNetty() throws IOException {
    if (conf.lazyFileDescriptor()) {
      return new LazyFileRegion(file, offset, length);
    } else {
      FileChannel fileChannel = new FileInputStream(file).getChannel();
      return new DefaultFileRegion(fileChannel, offset, length);
    }
  }

  public File getFile() { return file; }

  public long getOffset() { return offset; }

  public long getLength() { return length; }

  @Override
  public String toString() {
    return Objects.toStringHelper(this)
      .add("file", file)
      .add("offset", offset)
      .add("length", length)
      .toString();
  }
}
In nioByteBuffer(), if the segment is smaller than spark.storage.memoryMapThreshold, the data is read from the channel into a plain ByteBuffer; at or above that threshold, the file is memory-mapped instead. createInputStream() returns a LimitedInputStream that caps how many bytes may be read, using Guava's ByteStreams to skip to the offset. convertToNetty() returns a FileRegion: if spark.shuffle.io.lazyFD is set to true it uses LazyFileRegion, otherwise DefaultFileRegion. LazyFileRegion defers creating the FileChannel until transfer time; a comment in the source notes that LazyFileRegion cannot be used when Netty runs in epoll mode.
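The copy-vs-mmap decision in nioByteBuffer() can be distilled into a small standalone sketch. The threshold is hardcoded here for illustration; Spark actually reads it from spark.storage.memoryMapThreshold via TransportConf.memoryMapBytes(), and SegmentReader is a hypothetical name:

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

// Sketch of FileSegmentManagedBuffer.nioByteBuffer()'s strategy:
// copy small segments into a heap buffer, memory-map large ones.
class SegmentReader {
    static final long MMAP_THRESHOLD = 2 * 1024 * 1024; // assumed value for illustration

    static ByteBuffer read(String path, long offset, long length) throws IOException {
        try (FileChannel channel = new RandomAccessFile(path, "r").getChannel()) {
            if (length < MMAP_THRESHOLD) {
                // Small segment: plain copy, since mmap has a high fixed overhead.
                ByteBuffer buf = ByteBuffer.allocate((int) length);
                channel.position(offset);
                while (buf.remaining() > 0) {
                    if (channel.read(buf) == -1) {
                        throw new IOException("Reached EOF before filling buffer");
                    }
                }
                buf.flip();
                return buf;
            } else {
                // Large segment: memory-map the range, avoiding the copy.
                return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
            }
        }
    }
}
```

The read loop is needed because FileChannel.read() may return fewer bytes than requested in a single call.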
Next is the implementation whose data source is a Netty ByteBuf; it stores its data in the ByteBuf itself.
import com.google.common.base.Objects;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

public final class NettyManagedBuffer extends ManagedBuffer {
  private final ByteBuf buf;

  public NettyManagedBuffer(ByteBuf buf) {
    this.buf = buf;
  }

  @Override
  public long size() {
    return buf.readableBytes();
  }

  @Override
  public ByteBuffer nioByteBuffer() throws IOException {
    return buf.nioBuffer();
  }

  @Override
  public InputStream createInputStream() throws IOException {
    return new ByteBufInputStream(buf);
  }

  @Override
  public ManagedBuffer retain() {
    buf.retain();
    return this;
  }

  @Override
  public ManagedBuffer release() {
    buf.release();
    return this;
  }

  @Override
  public Object convertToNetty() throws IOException {
    return buf.duplicate();
  }

  @Override
  public String toString() {
    return Objects.toStringHelper(this)
      .add("buf", buf)
      .toString();
  }
}
Converting a ByteBuf into an InputStream is done with a ByteBufInputStream. Also note ByteBuf.duplicate(): it returns a ByteBuf that is a view over the same data, so a modification through either one is visible through the other — pay attention to the reference count. See http://www.maljob.com/pages/newsDetail.html?id=394
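The shared-content behavior of duplicate() can be demonstrated with the JDK alone: java.nio.ByteBuffer.duplicate() shares content the same way Netty's ByteBuf.duplicate() does (Netty's version additionally shares the original's reference count). DuplicateDemo is a stand-in example, not Spark or Netty code:

```java
import java.nio.ByteBuffer;

// Shows that duplicate() creates a second view over the SAME bytes:
// a write through the duplicate is visible through the original,
// while positions/indexes remain independent per view.
class DuplicateDemo {
    static byte writeThroughDuplicate() {
        ByteBuffer original = ByteBuffer.allocate(4);
        ByteBuffer dup = original.duplicate();
        dup.put(0, (byte) 42);   // write via the duplicate view
        return original.get(0);  // read via the original view
    }
}
```

This is exactly why reference counting matters for a duplicated ByteBuf: releasing the original to zero frees the memory the duplicate still points at.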
Finally, there is an implementation whose data source is an NIO ByteBuffer.
import com.google.common.base.Objects;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

public final class NioManagedBuffer extends ManagedBuffer {
  private final ByteBuffer buf;

  public NioManagedBuffer(ByteBuffer buf) {
    this.buf = buf;
  }

  @Override
  public long size() {
    return buf.remaining();
  }

  @Override
  public ByteBuffer nioByteBuffer() throws IOException {
    return buf.duplicate();
  }

  @Override
  public InputStream createInputStream() throws IOException {
    return new ByteBufInputStream(Unpooled.wrappedBuffer(buf));
  }

  @Override
  public ManagedBuffer retain() {
    return this;
  }

  @Override
  public ManagedBuffer release() {
    return this;
  }

  @Override
  public Object convertToNetty() throws IOException {
    return Unpooled.wrappedBuffer(buf);
  }

  @Override
  public String toString() {
    return Objects.toStringHelper(this)
      .add("buf", buf)
      .toString();
  }
}
An interesting detail here is that converting a ByteBuffer into a ByteBuf is done with Netty's Unpooled.wrappedBuffer().
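The point of Unpooled.wrappedBuffer() is that it is zero-copy: the resulting ByteBuf is a view over the same bytes, not a fresh copy. The same wrapping idea exists in the JDK as ByteBuffer.wrap(byte[]), used here as a stand-in since it is testable without Netty on the classpath (WrapDemo is a hypothetical example class):

```java
import java.nio.ByteBuffer;

// ByteBuffer.wrap() creates a view over the array without copying --
// the same zero-copy idea behind Netty's Unpooled.wrappedBuffer(ByteBuffer).
class WrapDemo {
    static byte mutateBackingArray() {
        byte[] backing = new byte[] {1, 2, 3};
        ByteBuffer view = ByteBuffer.wrap(backing);
        backing[0] = 9;     // mutate the array directly...
        return view.get(0); // ...and the change shows through the wrapping view
    }
}
```

The flip side of zero-copy wrapping is aliasing: anyone still holding the original ByteBuffer can observe (or cause) changes in the wrapped ByteBuf.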