您的位置:首页 > 数据库 > Redis

【Redis源码剖析】 - Redis IO操作之rio

2016-05-17 10:37 543 查看
原创作品,转载请标明:/article/9431508.html

Reids内部封装了一个I/O层,称之为rio。今天我们就来简单介绍一下rio模块的具体实现。

本文主要涉及rio.h和rio.c两个文件。

1、rio结构体

关于文件读写操作和buffer的操作主要基于rio对象进行操作,我们先来看看rio结构体的定义,如下:

/* 系统IO操作的封装 */
struct _rio {
/* Backend functions.
* Since this functions do not tolerate short writes or reads the return
* value is simplified to: zero on error, non zero on complete success. */
/*  后端方法:函数的返回值为0表示发生错误,返回值为非0表示操作成功。 */

// 数据流读操作
size_t (*read)(struct _rio *, void *buf, size_t len);
// 数据流写操作
size_t (*write)(struct _rio *, const void *buf, size_t len);
// 读或写操作的当前偏移量
off_t (*tell)(struct _rio *);
// flush操作
int (*flush)(struct _rio *);
/* The update_cksum method if not NULL is used to compute the checksum of
* all the data that was read or written so far. The method should be
* designed so that can be called with the current checksum, and the buf
* and len fields pointing to the new block of data to add to the checksum
* computation. */
// 更新校验和
void (*update_cksum)(struct _rio *, const void *buf, size_t len);

/* The current checksum */
// 当前校验和
uint64_t cksum;

/* number of bytes read or written */
// 已读或已写的字节数
size_t processed_bytes;

/* maximum single read or write chunk size */
// 每次读或写操作的最大字节数
size_t max_processing_chunk;

/* Backend-specific vars. */
// io变量
union {
/* In-memory buffer target. */
// 内存缓冲区buffer结构体
struct {
// buffer中的内容,实际就是char数组
sds ptr;
// 偏移量
off_t pos;
} buffer;
/* Stdio file pointer target. */
// 文件结构体
struct {
// 打开的文件句柄
FILE *fp;
// 最后一个fsync后写入的字节数
off_t buffered; /* Bytes written since last fsync. */
// 多少字节进行一次fsync操作
off_t autosync; /* fsync after 'autosync' bytes written. */
} file;
/* Multiple FDs target (used to write to N sockets). */
// 封装了多个文件描述符结构体(用于写多个socket)
struct {
// 文件描述符数组
int *fds;       /* File descriptors. */
// 状态位,与fds对应
int *state;     /* Error state of each fd. 0 (if ok) or errno. */
// 文件描述符的个数
int numfds;
// 偏移量
off_t pos;
// 缓冲区
sds buf;
} fdset;
} io;
};


我们可以看到rio结构体主要包含以下三方面内容:

读写操作、获取偏移量操作等相关的回调函数。rio可以处理buffer、file、socket三种不同类型的I/O对象,不同的rio对象底层使用相应的系统调用完成read、write、tell、flush操作。比如,对于file rio对象,底层通过fwrite函数完成写操作,通过fread函数完成读操作(具体见源码)。

校验和操作。rio使用了RCR64算法计算校验和,具体实现可以参看crc64.h和crc64.c文件。

IO变量。_rio中的io成员是一个联合体,针对不同的I/O情况进行不同的处理:当执行内存buffer的I/O操作时,使用rio.buffer结构体;当执行文件I/O操作时,使用rio.file结构体;当执行socket的I/O操作时,使用rio.fdset结构体。

介绍完rio结构体后,我们来看看buffer rio对象的实现(file rio对象和socket rio对象的现实大家可以参考注释版源码,这里就不一一讲解)。

buffer的写操作实际上就是将数据存入r->io.buffer中,由
rioBufferWrite
函数实现:

/* 将buf中指定长度len的内容追加到rio对象的缓冲区中,操作成功返回1,否则返回0。*/
static size_t rioBufferWrite(rio *r, const void *buf, size_t len) {
// 调用sdscatlen实现append操作
r->io.buffer.ptr = sdscatlen(r->io.buffer.ptr,(char*)buf,len);
// 更新长度信息
r->io.buffer.pos += len;
return 1;
}


buffer的写操作实际上就是将数据从r->io.buffer中读出,由
rioBufferRead
函数实现:

/* 从rio对象的缓冲区中读取长度为len的内容到buf中,操作成功返回1,否则返回0。*/
static size_t rioBufferRead(rio *r, void *buf, size_t len) {
// 如果rio对象的缓冲区中内容的长度小于len,读取失败
if (sdslen(r->io.buffer.ptr)-r->io.buffer.pos < len)
return 0; /* not enough buffer to return len bytes. */
// 将缓冲区中的内容复制到buf
memcpy(buf,r->io.buffer.ptr+r->io.buffer.pos,len);
// 更新偏移量
r->io.buffer.pos += len;
return 1;
}


buffer的tell操作和flush操作如下:

/*  返回rio对象缓冲区的偏移量 */
static off_t rioBufferTell(rio *r) {
return r->io.buffer.pos;
}

/*  该函数什么事也没有做,直接返回1。*/
static int rioBufferFlush(rio *r) {
// REDIS_NOTUSED定义在redis.h文件中:#define REDIS_NOTUSED(V) ((void) V)
REDIS_NOTUSED(r);
return 1; /* Nothing to do, our write just appends to the buffer. */
}


根据上面定义的函数,我们就可以创建buffer rio对象:

/* 根据上面的方法定义的流为内存时使用的buffer rio对象 */
static const rio rioBufferIO = {
rioBufferRead,
rioBufferWrite,
rioBufferTell,
rioBufferFlush,
NULL,           /* update_checksum */
0,              /* current checksum */
0,              /* bytes read or written */
0,              /* read/write chunk size */
{ { NULL, 0 } } /* union for io-specific vars */
};


rioInitWithBuffer
函数负责初始化buffer rio对象,创建io.buffer缓冲区。

/* 初始化buffer io对象 */
void rioInitWithBuffer(rio *r, sds s) {
*r = rioBufferIO;
r->io.buffer.ptr = s;
r->io.buffer.pos = 0;
}


2、统一的读写操作接口

rio模块封装了Redis对buffer读写操作、file读写操作、socket读写操作,同时也定义了统一的read、write、tell、flus操作接口。

执行rio的写操作:

/*  将buf数组中的长度为len的字符写入rio对象中,写入成功返回1,写入失败返回0。*/
static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
while (len) {
// 判断当前要求写入的字节数是否操作了max_processing_chunk规定的最大长度
size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
// 写入新的数据时,更新校验和字段
if (r->update_cksum) r->update_cksum(r,buf,bytes_to_write);
// 调用write方法执行写入操作
if (r->write(r,buf,bytes_to_write) == 0)
return 0;
// 更新buf下次写入的位置
buf = (char*)buf + bytes_to_write;
len -= bytes_to_write;
// 更新已写入的字节数
r->processed_bytes += bytes_to_write;
}
return 1;
}


执行rio的读操作:

/* 从rio对象中读出长度为len字节的数据并保存到buf数组中。读取成功返回1,读取失败返回0。*/
static inline size_t rioRead(rio *r, void *buf, size_t len) {
while (len) {
// 判断当前要求读出的字节数是否操作了max_processing_chunk规定的最大长度
size_t bytes_to_read = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
// 调用read方法读出数据到buf中
if (r->read(r,buf,bytes_to_read) == 0)
return 0;
// 更新buf下次写入的位置
if (r->update_cksum) r->update_cksum(r,buf,bytes_to_read);
buf = (char*)buf + bytes_to_read;
len -= bytes_to_read;
// 更新已读出的字节数
r->processed_bytes += bytes_to_read;
}
return 1;
}


执行rio的tell操作:

/* 返回当前偏移量 */
static inline off_t rioTell(rio *r) {
return r->tell(r);
}


执行rio的flush操作:

/* flush操作 */
static inline int rioFlush(rio *r) {
return r->flush(r);
}


从上面四个函数的实现中可以看出它们都是调用rio对象内部的回调函数来执行执行相应的write、read、tell、flush操作的,这些回调函数具体完成的操作又跟rio对象是buffer rio对象还是file rio对象,亦或是socket rio对象有关。

3、其它

Redis中的rio模块还封装了一些辅助生成AOF协议的函数,这些函数主要包括:

// 以"*<count>\r\n"的形式将count以字符串的格式写入rio对象中,返回写入的字节数。
size_t rioWriteBulkCount(rio *r, char prefix, int count);
// 以"$<count>\r\n<payload>\r\n"格式往rio对象中写入二进制安全字符串。
size_t rioWriteBulkString(rio *r, const char *buf, size_t len);
// 以"$<count>\r\n<payload>\r\n"的格式往rio对象中写入long long类型的值。
size_t rioWriteBulkLongLong(rio *r, long long l);
// 以"$<count>\r\n<payload>\r\n"的格式往rio对象中写入double类型的值。
size_t rioWriteBulkDouble(rio *r, double d);


Redis中的rio模块对系统I/O函数进行封装。在Redis内部实现中,RDB、AOF等都使用该模块的功能,所以这里简要介绍了rio模块的实现原理。

源码见:

rio.h:https://github.com/xiejingfa/the-annotated-redis-3.0/blob/master/rio.h

rio.c:https://github.com/xiejingfa/the-annotated-redis-3.0/blob/master/rio.c
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: