spark core 2.0 ChunkedByteBufferOutputStream
2017-01-16 15:46
351 查看
ChunkedByteBufferOutputStream 将写入的数据按固定大小（chunkSize）分块存储到一组 ByteBuffer 中，从而避免为大块数据分配单个连续的超大字节数组。
/**
 * An OutputStream that writes to fixed-size chunks of byte arrays.
 *
 * @param chunkSize size of each chunk, in bytes.
 * @param allocator allocates a ByteBuffer of the requested capacity for each new chunk
 *                  (e.g. heap or direct allocation).
 */
private[spark] class ChunkedByteBufferOutputStream(
    chunkSize: Int,
    allocator: Int => ByteBuffer)
  extends OutputStream {

  private[this] var toChunkedByteBufferWasCalled = false

  private val chunks = new ArrayBuffer[ByteBuffer]

  /** Index of the last chunk. Starting with -1 when the chunks array is empty. */
  private[this] var lastChunkIndex = -1

  /**
   * Next position to write in the last chunk.
   *
   * If this equals chunkSize, it means for next write we need to allocate a new chunk.
   * This can also never be 0.
   */
  private[this] var position = chunkSize

  // Total number of bytes written. Must be a Long: an Int would silently overflow once
  // more than 2 GB has been written, making `size` report a wrong (negative) value.
  // (This is the bug fixed upstream in SPARK-24257.)
  private[this] var _size: Long = 0

  private[this] var closed: Boolean = false

  /** Total number of bytes written to this stream so far. */
  def size: Long = _size

  override def close(): Unit = {
    if (!closed) {
      super.close()
      closed = true
    }
  }

  override def write(b: Int): Unit = {
    require(!closed, "cannot write to a closed ChunkedByteBufferOutputStream")
    allocateNewChunkIfNeeded()
    chunks(lastChunkIndex).put(b.toByte)
    position += 1
    _size += 1
  }

  override def write(bytes: Array[Byte], off: Int, len: Int): Unit = {
    require(!closed, "cannot write to a closed ChunkedByteBufferOutputStream")
    var written = 0
    while (written < len) {
      allocateNewChunkIfNeeded()
      // Copy as much as fits in the current chunk with a single bulk put().
      val thisBatch = math.min(chunkSize - position, len - written)
      chunks(lastChunkIndex).put(bytes, written + off, thisBatch)
      written += thisBatch
      position += thisBatch
    }
    _size += len
  }

  /** Allocates a fresh chunk when the current one (if any) is full. */
  @inline
  private def allocateNewChunkIfNeeded(): Unit = {
    if (position == chunkSize) {
      chunks += allocator(chunkSize)
      lastChunkIndex += 1
      position = 0
    }
  }

  /**
   * Consumes this stream and returns its contents as a ChunkedByteBuffer.
   *
   * May only be called once, and only after close() has been called.
   */
  def toChunkedByteBuffer: ChunkedByteBuffer = {
    require(closed, "cannot call toChunkedByteBuffer() unless close() has been called")
    require(!toChunkedByteBufferWasCalled, "toChunkedByteBuffer() can only be called once")
    toChunkedByteBufferWasCalled = true
    if (lastChunkIndex == -1) {
      // Nothing was ever written.
      new ChunkedByteBuffer(Array.empty[ByteBuffer])
    } else {
      // Copy the first n-1 chunks to the output, and then create an array that fits the last chunk.
      // An alternative would have been returning an array of ByteBuffers, with the last buffer
      // bounded to only the last chunk's position. However, given our use case in Spark (to put
      // the chunks in block manager), only limiting the view bound of the buffer would still
      // require the block manager to store the whole chunk.
      val ret = new Array[ByteBuffer](chunks.size)
      for (i <- 0 until chunks.size - 1) {
        ret(i) = chunks(i)
        ret(i).flip()
      }
      if (position == chunkSize) {
        // Last chunk is exactly full: hand it over as-is.
        ret(lastChunkIndex) = chunks(lastChunkIndex)
        ret(lastChunkIndex).flip()
      } else {
        // Last chunk is partially filled: copy its contents into a right-sized buffer
        // and dispose the oversized original so its memory can be reclaimed promptly.
        ret(lastChunkIndex) = allocator(position)
        chunks(lastChunkIndex).flip()
        ret(lastChunkIndex).put(chunks(lastChunkIndex))
        ret(lastChunkIndex).flip()
        StorageUtils.dispose(chunks(lastChunkIndex))
      }
      new ChunkedByteBuffer(ret)
    }
  }
}
相关文章推荐
- Java NIO 2.0 : Memory-Mapped Files | MappedByteBuffer Tutorial
- MappedByteBuffer的使用
- MappedByteBuffer的映射内存的释放
- MappedByteBuffer的使用(转)
- MappedByteBuffer的使用
- 深入浅出 MappedByteBuffer
- Java RandomAccessFile与MappedByteBuffer
- 为何要在Java中使用内存映射文件(Memory Mapped File)或者MappedByteBuffer
- MappedByteBuffer基本使用与优点
- MappedByteBuffer
- RandomAccessFile和MappedByteBuffer
- 内存映射文件:MappedByteBuffer
- MappedByteBuffer的使用
- java NIO之MappedByteBuffer
- java nio 之MappedByteBuffer
- (转) Java RandomAccessFile与MappedByteBuffer
- Java--文件内存映射--NIO--MapedByteBuffer
- Java NIO教程 MappedByteBuffer
- MappedByteBuffer的使用
- MappedByteBuffer读取