Reading Spark source code: network (1)

2015-11-22 12:09, 465 views
Spark will replace Akka in 1.6 and use Netty to implement the RPC framework for the whole cluster. Netty's memory management and NIO support should noticeably improve a Spark cluster's network transfer capability. To understand this code, I found two books, "Netty in Action" and 《Netty权威指南》, and read them alongside the Spark source, so I learned Netty while working through Spark's Netty-related code. This part of the source mixes in quite a lot of Netty internals, so it is still somewhat tiring to read. Below is the UML class diagram I drew: https://onedrive.live.com/redir?resid=C6D5F843CD40C4E8!364&authkey=!ABHmp7kWEm5n9Y4&ithint=file%2cEAP

The buffer module

The network project abstracts a ManagedBuffer interface, which represents a view over binary data (i.e. a portion of the data). The concrete implementation depends on where the data comes from; currently three sources are supported: a file, an NIO ByteBuffer, and a Netty ByteBuf. Note that a concrete implementation may manage memory outside of JVM GC: for example, NettyManagedBuffer is reference-counted, so when such a buffer is passed to another thread, retain/release must be called to increment or decrement the reference count. ManagedBuffer exposes the data in three forms: ByteBuffer, InputStream, and a Netty object. The ByteBuffer form is discouraged because it can be expensive (it may require memory mapping or allocation). The interface also provides reference-count management and a size query.
public abstract class ManagedBuffer {

  /** Number of bytes of the data. */
  public abstract long size();

  /**
   * Exposes this buffer's data as an NIO ByteBuffer. Changing the position and limit of the
   * returned ByteBuffer should not affect the content of this buffer.
   */
  // TODO: Deprecate this, usage may require expensive memory mapping or allocation.
  public abstract ByteBuffer nioByteBuffer() throws IOException;

  /**
   * Exposes this buffer's data as an InputStream. The underlying implementation does not
   * necessarily check for the length of bytes read, so the caller is responsible for making sure
   * it does not go over the limit.
   */
  public abstract InputStream createInputStream() throws IOException;

  /**
   * Increment the reference count by one if applicable.
   */
  public abstract ManagedBuffer retain();

  /**
   * If applicable, decrement the reference count by one and deallocates the buffer if the
   * reference count reaches zero.
   */
  public abstract ManagedBuffer release();

  /**
   * Convert the buffer into a Netty object, used to write the data out.
   */
  public abstract Object convertToNetty() throws IOException;
}
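The retain/release contract above matters because some implementations (such as NettyManagedBuffer) manage memory outside the GC. As a rough illustration of the contract only, not Spark's actual code, a minimal reference-counted buffer might look like this:

```java
import java.util.concurrent.atomic.AtomicInteger;

// A toy sketch (not Spark's code) of the retain/release contract the
// ManagedBuffer Javadoc describes: every retain() increments the count,
// every release() decrements it, and the resource is freed at zero.
class RefCountedBuffer {
    private final AtomicInteger refCount = new AtomicInteger(1); // creator holds one reference
    private volatile boolean deallocated = false;

    RefCountedBuffer retain() {
        refCount.incrementAndGet();
        return this;
    }

    RefCountedBuffer release() {
        if (refCount.decrementAndGet() == 0) {
            deallocated = true; // a real implementation would free native memory here
        }
        return this;
    }

    int refCount() { return refCount.get(); }
    boolean isDeallocated() { return deallocated; }
}
```

A thread handing the buffer to another thread would call retain() before the hand-off, and the receiver would call release() when done, mirroring Netty's ByteBuf semantics.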

Each data source of ManagedBuffer has one implementation class. Let's first look at the file-backed one.
public final class FileSegmentManagedBuffer extends ManagedBuffer {
  private final TransportConf conf;
  private final File file;
  private final long offset;
  private final long length;

  public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) {
    this.conf = conf;
    this.file = file;
    this.offset = offset;
    this.length = length;
  }

  @Override
  public long size() {
    return length;
  }

  @Override
  public ByteBuffer nioByteBuffer() throws IOException {
    FileChannel channel = null;
    try {
      channel = new RandomAccessFile(file, "r").getChannel();
      // Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.
      if (length < conf.memoryMapBytes()) {
        ByteBuffer buf = ByteBuffer.allocate((int) length);
        channel.position(offset);
        while (buf.remaining() != 0) {
          if (channel.read(buf) == -1) {
            throw new IOException(String.format("Reached EOF before filling buffer\n" +
              "offset=%s\nfile=%s\nbuf.remaining=%s",
              offset, file.getAbsoluteFile(), buf.remaining()));
          }
        }
        buf.flip();
        return buf;
      } else {
        return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
      }
    } catch (IOException e) {
      try {
        if (channel != null) {
          long size = channel.size();
          throw new IOException("Error in reading " + this + " (actual file length " + size + ")",
            e);
        }
      } catch (IOException ignored) {
        // ignore
      }
      throw new IOException("Error in opening " + this, e);
    } finally {
      JavaUtils.closeQuietly(channel);
    }
  }

  @Override
  public InputStream createInputStream() throws IOException {
    FileInputStream is = null;
    try {
      is = new FileInputStream(file);
      ByteStreams.skipFully(is, offset);
      return new LimitedInputStream(is, length);
    } catch (IOException e) {
      try {
        if (is != null) {
          long size = file.length();
          throw new IOException("Error in reading " + this + " (actual file length " + size + ")",
            e);
        }
      } catch (IOException ignored) {
        // ignore
      } finally {
        JavaUtils.closeQuietly(is);
      }
      throw new IOException("Error in opening " + this, e);
    } catch (RuntimeException e) {
      JavaUtils.closeQuietly(is);
      throw e;
    }
  }

  @Override
  public ManagedBuffer retain() {
    return this;
  }

  @Override
  public ManagedBuffer release() {
    return this;
  }

  @Override
  public Object convertToNetty() throws IOException {
    if (conf.lazyFileDescriptor()) {
      return new LazyFileRegion(file, offset, length);
    } else {
      FileChannel fileChannel = new FileInputStream(file).getChannel();
      return new DefaultFileRegion(fileChannel, offset, length);
    }
  }

  public File getFile() { return file; }

  public long getOffset() { return offset; }

  public long getLength() { return length; }

  @Override
  public String toString() {
    return Objects.toStringHelper(this)
      .add("file", file)
      .add("offset", offset)
      .add("length", length)
      .toString();
  }
}

In nioByteBuffer, if the data size is smaller than spark.storage.memoryMapThreshold, the channel's data is copied into a ByteBuffer; if it is greater than or equal to that threshold, the data is read via a memory-mapped file. createInputStream returns a LimitedInputStream that caps the number of bytes read, using Guava's ByteStreams. convertToNetty returns a FileRegion: if spark.shuffle.io.lazyFD is set to true, a LazyFileRegion is used; if false, a DefaultFileRegion. LazyFileRegion creates its FileChannel only at transfer time; a comment in the source notes that LazyFileRegion cannot be used when Netty uses epoll.
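The copy-vs-map decision in nioByteBuffer can be sketched with plain JDK classes. This is a simplified re-creation, not Spark's class; the threshold constant here stands in for conf.memoryMapBytes() (spark.storage.memoryMapThreshold):

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

// Simplified sketch of FileSegmentManagedBuffer.nioByteBuffer(): small
// segments are copied into a heap ByteBuffer; large ones are memory-mapped
// to avoid the copy, since mapping has a high fixed overhead per call.
class SegmentReader {
    // Stands in for conf.memoryMapBytes(); Spark's default is 2 MB.
    static final long MEMORY_MAP_THRESHOLD = 2 * 1024 * 1024;

    static ByteBuffer readSegment(File file, long offset, long length) throws IOException {
        try (FileChannel channel = new RandomAccessFile(file, "r").getChannel()) {
            if (length < MEMORY_MAP_THRESHOLD) {
                ByteBuffer buf = ByteBuffer.allocate((int) length);
                channel.position(offset);
                while (buf.remaining() != 0) {
                    if (channel.read(buf) == -1) {
                        throw new IOException("Reached EOF before filling buffer");
                    }
                }
                buf.flip(); // make the buffer readable from the start
                return buf;
            } else {
                // the mapped buffer stays valid after the channel is closed
                return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
            }
        }
    }
}
```

Note the flip() after filling: the read loop advances the buffer's position, so the buffer must be flipped before it is handed to a reader.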
Next is the implementation backed by a Netty ByteBuf, which stores its data in the ByteBuf.
public final class NettyManagedBuffer extends ManagedBuffer {
  private final ByteBuf buf;

  public NettyManagedBuffer(ByteBuf buf) {
    this.buf = buf;
  }

  @Override
  public long size() {
    return buf.readableBytes();
  }

  @Override
  public ByteBuffer nioByteBuffer() throws IOException {
    return buf.nioBuffer();
  }

  @Override
  public InputStream createInputStream() throws IOException {
    return new ByteBufInputStream(buf);
  }

  @Override
  public ManagedBuffer retain() {
    buf.retain();
    return this;
  }

  @Override
  public ManagedBuffer release() {
    buf.release();
    return this;
  }

  @Override
  public Object convertToNetty() throws IOException {
    return buf.duplicate();
  }

  @Override
  public String toString() {
    return Objects.toStringHelper(this)
      .add("buf", buf)
      .toString();
  }
}

Converting a ByteBuf into an InputStream is done with ByteBufInputStream. Also note that ByteBuf's duplicate() returns a new ByteBuf mapped onto the same underlying data, so a modification through either one is visible through the other; be careful with the reference count. See http://www.maljob.com/pages/newsDetail.html?id=394
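The sharing semantics of duplicate() can be demonstrated with the JDK's ByteBuffer.duplicate(), which behaves analogously to Netty's ByteBuf.duplicate(): shared content, but independent position/limit indices. (This is a JDK analogy only; Netty's version additionally shares the reference count with the original.)

```java
import java.nio.ByteBuffer;

// ByteBuffer.duplicate(): the copy has its own position/limit, but both
// buffers view the same backing storage, so writes through one are
// visible through the other.
class DuplicateDemo {
    static String demo() {
        ByteBuffer original = ByteBuffer.allocate(4);
        ByteBuffer dup = original.duplicate();

        dup.put(0, (byte) 42);             // write through the duplicate...
        byte seen = original.get(0);       // ...is visible through the original

        dup.position(2);                   // moving the duplicate's position...
        int origPos = original.position(); // ...does not move the original's

        return seen + ":" + origPos;
    }
}
```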
Finally, there is the implementation backed by an NIO ByteBuffer.
public final class NioManagedBuffer extends ManagedBuffer {
  private final ByteBuffer buf;

  public NioManagedBuffer(ByteBuffer buf) {
    this.buf = buf;
  }

  @Override
  public long size() {
    return buf.remaining();
  }

  @Override
  public ByteBuffer nioByteBuffer() throws IOException {
    return buf.duplicate();
  }

  @Override
  public InputStream createInputStream() throws IOException {
    return new ByteBufInputStream(Unpooled.wrappedBuffer(buf));
  }

  @Override
  public ManagedBuffer retain() {
    return this;
  }

  @Override
  public ManagedBuffer release() {
    return this;
  }

  @Override
  public Object convertToNetty() throws IOException {
    return Unpooled.wrappedBuffer(buf);
  }

  @Override
  public String toString() {
    return Objects.toStringHelper(this)
      .add("buf", buf)
      .toString();
  }
}

An interesting detail here is that converting a ByteBuffer into a ByteBuf is done with Netty's Unpooled.wrappedBuffer().
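The zero-copy idea behind Unpooled.wrappedBuffer() has a direct JDK analogue in ByteBuffer.wrap(); the sketch below uses the JDK form, since the Netty call itself requires the netty-buffer dependency:

```java
import java.nio.ByteBuffer;

// Unpooled.wrappedBuffer(buf) wraps existing storage without copying it.
// The same zero-copy idea exists in the JDK: ByteBuffer.wrap(array) creates
// a buffer view over the array, so changes to the array show through the
// buffer. (JDK analogy; the Netty call needs the netty-buffer jar.)
class WrapDemo {
    static byte firstByteAfterMutation() {
        byte[] data = {1, 2, 3};
        ByteBuffer wrapped = ByteBuffer.wrap(data); // no copy: shares 'data'
        data[0] = 9;                                // mutate the array...
        return wrapped.get(0);                      // ...the buffer sees it
    }
}
```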
