您的位置:首页 > 其它

Clucene构建索引的辅助工具类剖析

2010-07-05 15:22 351 查看
下面,我们将对Clucene构建索引的辅助工具类进行剖析,并且对关键代码进行介绍.
一.首先,我们来看一看,辅助工具类图,清楚一下这几个类的关系:
(1).类图1描述了基类Directory, 文件目录类FSDirectory,内存文件目录类RAMDirectory
类之间的关系图

(2).类图2描述了索引读入工具类IndexInput, 索引缓冲读入工具类BufferedIndexInput,
文件目录索引读入工具FSIndexInput,内存文件目录索引读入工具RAMIndexInput,
文件映射索引读入工具MMapIndexInput 类之间的关系图:

(3).类图3描述了索引写入工具类IndexOutput, 索引缓冲写入工具类BufferedIndexOutput,
文件目录索引写入工具FSIndexOutput,内存文件目录索引读入工具RAMIndexOutput
类之间的关系图:

二.接下来,我们来对关键的代码进行剖析;
(1). class IndexInput:
class IndexInput: LUCENE_BASE{
private:
void skipChars(const int32_t count);
protected:
IndexInput();
IndexInput(const IndexInput& clone);
public:
virtual ~IndexInput(){}
virtual IndexInput* clone() const =0;
virtual uint8_t readByte() =0; //纯虚函数,派生类实现
virtual void readBytes(uint8_t* b, const int32_t len) =0; //纯虚函数,派生类实现
int32_t readInt();
}
(2). class BufferedIndexInput:
class BufferedIndexInput: public IndexInput{
private:
uint8_t* buffer; //buffer字节数组
void refill(); //重新填充
protected:
int32_t bufferSize; //buffer的大小
int64_t bufferStart; //buffer起始位置
int32_t bufferLength; //buffer的长度
int32_t bufferPosition; //读buffer的位置
public:
inline uint8_t readByte()
{
if (bufferPosition >= bufferLength)
{
refill();
}
return buffer[bufferPosition++]; //读取字节时是从buffer来取的信息
}
void readBytes(uint8_t* b, const int32_t len);
int64_t getFilePointer() const;
void seek(const int64_t pos);
protected:
virtual void readInternal(uint8_t* b, const int32_t len) = 0;
//实现文件定位操作,readInternal的读操作从这个位置开始,需子类实现
virtual void seekInternal(const int64_t pos) = 0;
}
readByte()方法剖析:
(a).首先在类的构造函数:
BufferedIndexInput::BufferedIndexInput(int32_t _bufferSize)
里面对变量进行了初始化:
buffer=NULL bufferStart=0 bufferLength=0 bufferPosition=0
(a.1).开始读入一个字节时, bufferPosition >= bufferLength 0=0,调用refill()方法;
(a.2). void BufferedIndexInput::refill()方法
void BufferedIndexInput::refill()
{
int64_t start = bufferStart + bufferPosition; // bufferPosition:指针读取的计数器
int64_t end = start + bufferSize; // bufferSize:buffer的缓冲值大小
if (end > length()) // don't read past EOF
end = length();
bufferLength = (int32_t)(end - start); //初始是bufferSize=1024大小
if (bufferLength == 0)
_CLTHROWA(CL_ERR_IO, "IndexInput read past EOF");
if (buffer == NULL){
buffer = _CL_NEWARRAY(uint8_t,bufferSize); // allocate buffer lazily
}
readInternal(buffer, bufferLength);
bufferStart = start;
bufferPosition = 0;
}
刚开始时start=0 end=1024,通过比较长度大小,得到end修正后的值,在确定bufferLength的值,初始buffer == NULL,然后申请bufferSize=1024 大小的buffer,
在读入bufferLength大小字节到buffer缓冲区.这个时候bufferStart=0 bufferPosition=0; readByte()的时候,直接从buffer缓冲取值,并且bufferPosition++;
(a.3).void BufferedIndexInput::readBytes(uint8_t* b, const int32_t len) 方法剖析:
void BufferedIndexInput::readBytes(uint8_t* b, const int32_t len)
{
if (len<bufferSize)
{
for (int32_t i = 0; i < len; ++i)
{
b[i] = readByte();
}
} else { // read all-at-once
int64_t start = getFilePointer(); //获得当前位置
seekInternal(start); //实现定位操作,readInternal的读操作从这个位置开始,需子类实现
readInternal(b, len); //实际读取操作
bufferStart = start + len; //调整bufferStart位置
bufferPosition = 0; //当前bufferPosition重置为,计数器的作用
bufferLength = 0; //bufferLength重置为
}
}
具体的读取方法是如果要读取的长度len<bufferSize,那么就做for循环执行,每个子节的去读取, 如果(bufferPosition >= bufferLength) 又需要重新去填充buffer, bufferLength是本次读取的字节长度, bufferStart是一个指针计数器,指向读取的总共的字节数;如果实际读取的长度超过bufferSize,len>=bufferSize,那么start= bufferStart + bufferPosition,即为到达当前指针读取位置,然后seekInternal(start) 指针移到当前指针位置, readInternal(b, len) 从当前指针位置读取len长度到b中, bufferStart = start + len文件指针设置到读取len的位置上,bufferPosition = 0,当前bufferPosition重置为0,bufferLength=0 bufferLength重置为0,目的是触发下一次填充缓冲区的动作;
(b). void BufferedIndexInput::seek(const int64_t pos)
//设置文件的当前读位置,下一次读从这个位置开始
void BufferedIndexInput::seek(const int64_t pos)
{
if (pos<0)
{
_CLTHROWA(CL_ERR_IO, "IO Argument Error. Value must be a positive value.");
}
if (pos >= bufferStart && pos < (bufferStart + bufferLength))
{
bufferPosition = (int32_t)(pos - bufferStart); // seek within buffer
}
else
{
bufferStart = pos;
bufferPosition = 0;
bufferLength = 0; //触发重新填充缓冲的作用
seekInternal(pos); //实现文件定位操作,readInternal的读操作从这个位置开始
}
}
Seek方法是设置当前读的位置,首先进行判断,如果设置的位置大于当前指针位置并且小于当前指针位置加上buffer的长度,那么设置bufferPosition = (int32_t)(pos - bufferStart),否则的话bufferStart = pos bufferPosition = 0 bufferLength = 0
然后在调用派生类的seekInternal(pos)方法,实现文件定位操作.
(3).class FSIndexInput:public BufferedIndexInput:
class FSIndexInput:public BufferedIndexInput {
SharedHandle* handle; //文件句柄指针
int64_t _pos; //记录文件内部指针
protected:
FSIndexInput(const FSIndexInput& clone);
public:
FSIndexInput(const char* path, int32_t bufferSize=CL_NS(store)::BufferedIndexOutput::BUFFER_SIZE);
~FSIndexInput();
IndexInput* clone() const;
void close();
int64_t length(){ return handle->_length; }
protected:
//随机访问文件方法
void seekInternal(const int64_t position);
//读取方法
void readInternal(uint8_t* b, const int32_t len);
};
(a).void FSDirectory::FSIndexInput::seekInternal方法:
//实现文件定位操作,readInternal的读操作从这个位置开始,需子类实现
void FSDirectory::FSIndexInput::seekInternal(const int64_t position)
{
_pos = position;
}
(b).void FSDirectory::FSIndexInput::readInternal方法:

void FSDirectory::FSIndexInput::readInternal(uint8_t* b, const int32_t len)
{
SCOPED_LOCK_MUTEX(handle->THIS_LOCK)
CND_PRECONDITION(handle!=NULL,"shared file handle has closed");
CND_PRECONDITION(handle->fhandle>=0,"file is not open");
if ( handle->_fpos != _pos ){
if ( fileSeek(handle->fhandle,_pos,SEEK_SET) != _pos ){
_CLTHROWA( CL_ERR_IO, "File IO Seek error");
}
handle->_fpos = _pos;
}
//读取的实际长度
bufferLength = _read(handle->fhandle,b,len);
if (bufferLength == 0){
_CLTHROWA(CL_ERR_IO, "read past EOF");
}
if (bufferLength == -1){
_CLTHROWA(CL_ERR_IO, "read error");
}
_pos+=bufferLength; //累计字节数
handle->_fpos=_pos;
}

代码含义是如果文件句柄内部_fpos不等于_pos,那么首先就设置文件当前位置到_pos,
在去读取bufferLength长度字节的内容,在移动文件指针位置,设置文件句柄的_fpos值
为_pos.
(4). class RAMIndexInput:public BufferedIndexInput:
class RAMIndexInput:public BufferedIndexInput {
private:
RAMFile* file;
int32_t pointer;
int64_t _length;
protected:
RAMIndexInput(const RAMIndexInput& clone);
void readInternal(uint8_t *dest, const int32_t len);
void seekInternal(const int64_t pos);
public:
RAMIndexInput(RAMFile* f);
~RAMIndexInput();
IndexInput* clone() const;
void close();
int64_t length();
const char* getDirectoryType() const;
};
(a).void RAMIndexInput::seekInternal方法:
void RAMIndexInput::seekInternal(const int64_t pos)
{
CND_PRECONDITION(pos>=0 &&pos<this->_length,"Seeking out of range")
pointer = (int32_t)pos;
}
设置内存文件指针pointer值为设置的pos值;
(c).void RAMIndexInput::readInternal方法:
void RAMIndexInput::readInternal(uint8_t* dest, const int32_t len)
{
const int64_t bytesAvailable = file->length - pointer; //初始时pointer=0
int64_t remainder = len <= bytesAvailable ? len : bytesAvailable;
int32_t start = pointer;
int32_t destOffset = 0;
while (remainder != 0) {
int32_t bufferNumber = start / CL_NS(store)::BufferedIndexOutput::BUFFER_SIZE;
int32_t bufferOffset = start % CL_NS(store)::BufferedIndexOutput::BUFFER_SIZE;
int32_t bytesInBuffer = CL_NS(store)::BufferedIndexOutput::BUFFER_SIZE - bufferOffset;
int32_t bytesToCopy = bytesInBuffer >= remainder ? static_cast<int32_t>(remainder) : bytesInBuffer;
uint8_t* b = file->buffers[bufferNumber];
memcpy(dest+destOffset,b+bufferOffset,bytesToCopy * sizeof(uint8_t)); //从内存文件中读取字节到dest中
destOffset += bytesToCopy;
start += bytesToCopy;
remainder -= bytesToCopy;
pointer += bytesToCopy;
}
}
具体的读取过程是:初始pointer=0, bytesAvailable就是整个的内存文件长度,remainder的值remainder = len <= bytesAvailable ? len : bytesAvailable,如果小于剩余字节数那么就取len本身,否则的话就取剩余的字节数. Start设置为当前内存指针的位置,目标便宜量destOffset设置为0,进入while循环处理, bufferNumber是缓冲序号, bufferOffset是缓冲区的偏移量, bytesInBuffer是缓冲区还有多少字节数,
bytesToCopy=bytesInBuffer>=remainder?remainder: bytesInBuffer 可以拷贝的字节数根据剩余字节数与缓冲里面的字节数来进行判断,然后在执行拷贝的操作, uint8_t* b = file->buffers[bufferNumber] 首先取得buffer缓冲区的序号,然后在从这个缓冲区进行拷贝到目标数据里面,在循环处理过程中更新pointer.
(5).class IndexOutput
class IndexOutput:LUCENE_BASE{
bool isclosed;
public:
IndexOutput();
virtual ~IndexOutput();
virtual void writeByte(const uint8_t b) = 0; //纯虚函数
virtual void writeBytes(const uint8_t* b, const int32_t length) = 0; //纯虚函数
void writeInt(const int32_t i);
virtual int64_t getFilePointer() const = 0; //纯虚函数
virtual void seek(const int64_t pos) = 0; //纯虚函数
virtual int64_t length() = 0;
virtual void flush() = 0; //纯虚函数
};
(6). class BufferedIndexOutput : public IndexOutput:
class BufferedIndexOutput : public IndexOutput{
public:
LUCENE_STATIC_CONSTANT(int32_t, BUFFER_SIZE=LUCENE_STREAM_BUFFER_SIZE);
private:
uint8_t* buffer;
int64_t bufferStart; //在构造函数里起始位置设置为0
int32_t bufferPosition; //在构造函数里写入字节位置为0
public:
BufferedIndexOutput();
virtual ~BufferedIndexOutput();
virtual void writeByte(const uint8_t b);
virtual void writeBytes(const uint8_t* b, const int32_t length);
virtual void close();
int64_t getFilePointer() const;
virtual void seek(const int64_t pos);
virtual int64_t length() = 0;
void flush(); //刷新
protected:
virtual void flushBuffer(const uint8_t* b, const int32_t len)=0; //纯虚函数:刷新缓冲buffer
};
(a). void BufferedIndexOutput::writeByte 写入单个字节
void BufferedIndexOutput::writeByte(const uint8_t b)
{
CND_PRECONDITION(buffer!=NULL,"IndexOutput is closed")
if (bufferPosition >= BUFFER_SIZE)
{
flush(); //由继承的类完成刷新操作
}
buffer[bufferPosition++] = b;
}
(b). void BufferedIndexOutput::flush()
void BufferedIndexOutput::flush()
{
flushBuffer(buffer, bufferPosition); //刷新:buffer头指针,字符写位置,直接写入到文件中
bufferStart += bufferPosition; //相当于是得到总的字节数
bufferPosition = 0;
}
调用派生类的flushBuffer方法,刷新buffer里面的内容,然后bufferStart += bufferPosition,作用是相当于是得到写入的总的字节数.
(c).写入多个字节的方法: void BufferedIndexOutput::writeBytes
void BufferedIndexOutput::writeBytes(const uint8_t* b, const int32_t length)
{
if ( length < 0 )
{
_CLTHROWA(CL_ERR_IllegalArgument, "IO Argument Error. Value must be a positive value.");
}
//看一下buffer缓冲还有多少剩余空间
int32_t bytesLeft = BUFFER_SIZE - bufferPosition;
//如果缓冲区剩余大于要写入的长度
if (bytesLeft >= length) {
//直接拷贝到缓冲区里面
memcpy(buffer + bufferPosition, b, length);
//缓冲位置加上写入的长度
bufferPosition += length;
//如果缓冲填满,需要刷新缓冲
if (BUFFER_SIZE - bufferPosition == 0)
flush();
} else {
if (length > BUFFER_SIZE) {
//如果要写入的数据比缓冲BUFFER_SIZE大
if (bufferPosition > 0)
flush(); //如果缓冲区里面有数据,先刷新
flushBuffer(b, length); //然后在调用派生类的flushBuffer
bufferStart += length; //写入的总的字节数
} else {
// 否则的话,在分片写入
int64_t pos = 0; // position in the input data
int32_t pieceLength;
while (pos < length) {
if ( length - pos < bytesLeft ) //如果要写入的字节小于buffer剩余的字节
{
pieceLength = length - pos; //要写入的长度就是自己本身
}
else
{
//否则的话(写入的字节数大于剩余的字节数)
pieceLength = bytesLeft; //如果要写入的字节大于buffer剩余的字节,先把buffer填满
}
memcpy(buffer + bufferPosition, b + pos, pieceLength); //
pos += pieceLength;
bufferPosition += pieceLength;
// if the buffer is full, flush it
bytesLeft = BUFFER_SIZE - bufferPosition;
if (bytesLeft == 0) {
flush();
bytesLeft = BUFFER_SIZE; //刷新缓冲后,是一个新的缓冲
}
}
}
}
}
(7). class FSIndexOutput: public BufferedIndexOutput
class FSIndexOutput: public BufferedIndexOutput {
private:
int32_t fhandle;
protected:
void flushBuffer(const uint8_t* b, const int32_t size);
public:
FSIndexOutput(const char* path);
~FSIndexOutput();
void close();
void seek(const int64_t pos);
int64_t length();
};
(a). void FSDirectory::FSIndexOutput::flushBuffer
//直接写入到文件中去了
void FSDirectory::FSIndexOutput::flushBuffer(const uint8_t* b, const int32_t size)
{
CND_PRECONDITION(fhandle>=0,"file is not open");
if ( size > 0 && _write(fhandle,b,size) != size )
_CLTHROWA(CL_ERR_IO, "File IO Write error");
}
(b). void FSDirectory::FSIndexOutput::seek
void FSDirectory::FSIndexOutput::seek(const int64_t pos)
{
CND_PRECONDITION(fhandle>=0,"file is not open");
BufferedIndexOutput::seek(pos); //先调用基类的查找定位
int64_t ret = fileSeek(fhandle,pos,SEEK_SET);
if ( ret != pos ){
_CLTHROWA(CL_ERR_IO, "File IO Seek error");
}
}
(8). class RAMIndexOutput: public BufferedIndexOutput
(a).void RAMIndexOutput::flushBuffer
//刷新缓冲区方法:得到内存文件长度,src会出现长度大于设定的缓冲的情况
void RAMIndexOutput::flushBuffer(const uint8_t* src, const int32_t len)
{
uint8_t* b = NULL;
int32_t bufferPos = 0;
while (bufferPos != len)
{
uint32_t bufferNumber = pointer/CL_NS(store)::BufferedIndexOutput::BUFFER_SIZE; //取整
int32_t bufferOffset = pointer%CL_NS(store)::BufferedIndexOutput::BUFFER_SIZE; //取余:剩余的缓冲的偏移量,写完后,还留有一部分,供下次在写入
int32_t bytesInBuffer = CL_NS(store)::BufferedIndexOutput::BUFFER_SIZE - bufferOffset;
int32_t remainInSrcBuffer = len - bufferPos;
int32_t bytesToCopy = bytesInBuffer >= remainInSrcBuffer ? remainInSrcBuffer : bytesInBuffer;
if (bufferNumber == file->buffers.size()){ //
b = _CL_NEWARRAY(uint8_t, CL_NS(store)::BufferedIndexOutput::BUFFER_SIZE);
file->buffers.push_back( b );
}else{
b = file->buffers[bufferNumber]; //如果上次缓冲还没有写完的话,利用上次剩余的缓冲区
}
memcpy(b+bufferOffset, src+bufferPos, bytesToCopy * sizeof(uint8_t));
bufferPos += bytesToCopy;
pointer += bytesToCopy;
}
if (pointer > file->length)
{
file->length = pointer;
}
file->lastModified = Misc::currentTimeMillis();
}
刷新缓冲的过程是对于内存文件,首先确定缓冲区序号, bufferNumber,在初始时pointer=0 bufferOffset缓冲偏移量也是0, bytesInBuffer是缓冲中还有多少可以写入的空间,然后得到缓冲区里面拷贝的自己大小bytesToCopy = bytesInBuffer >= remainInSrcBuffer ? remainInSrcBuffer : bytesInBuffer;如果缓冲区序号等于内存文件file->buffers.size()那么需要重新生成要写入的缓冲,并且加入到file->buffers.push_back( b );内存文件缓存buffers的数组中,否则的话, 如果上次缓冲还没有写完的话,利用上次剩余的缓冲区.这里可以试想一下,如果写入三次,前两次都有写满,第三次没有写满,那么可以得到第三个缓冲的偏移量, bufferNumber=2 是第三个缓冲区的序号,也就是这次写入缓冲的开始位置.在做循环,每次更新pointer的值,以及内存文件的长度值.
(b). void RAMIndexOutput::writeTo方法:
void RAMIndexOutput::writeTo(IndexOutput* out)
{
//先刷新,flush会调用flushBuffer的方法
flush();
int64_t end = file->length; //虚拟文件长度
int64_t pos = 0;
int32_t p = 0;
while (pos < end)
{
//每次写入长度设置为:BUFFER_SIZE
int32_t length = CL_NS(store)::BufferedIndexOutput::BUFFER_SIZE;
int64_t nextPos = pos + length;
if (nextPos > end)
{
//at the last buffer
length = (int32_t)(end - pos);
}
//写入字节到内存文件缓冲数组中的buffers里面
out->writeBytes((uint8_t*)file->buffers[p++], length);
pos = nextPos;
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: