libcouchbase源码解析
2015-08-19 19:15
531 查看
libcouchbase是couchbase的客户端库,有很多的语言版本,现在看的是c语言2.0.6版本。
libcouchbase支持使用libevent或者libev作为事件循环,并且在默认情况下,优先选择libevent,如果不存在libevent,再使用libev,这样的选择逻辑在函数栈
#0 create_v1 (io=0x7fffffffd5d8, options=0x7fffffffd500) at src/iofactory.c:218
#1 0x00002aaaaaacd34c in create_v0 (options=0x7fffffffd520, io=0x7fffffffd5d8) at src/iofactory.c:202
#2 lcb_create_io_ops (io=0x7fffffffd5d8, io_opts=<optimized out>) at src/iofactory.c:172
#3 0x00002aaaaaac7a9e in lcb_create (instance=0x2aaaace8b220, options=<optimized out>) at src/instance.c:311
中能够体现出来。
作为一个异步的网络程序,需要下面三个关键部件:
1 读写事件循环,更新接口
2 读写缓冲
3 读写事件回调
对应libcouchbase,为:
1 lcb_io_run_event_loop,这是个事件循环接口,因为选择的libev或者libevent不同而具体实现不同。lcb_io_update_event,这是事件更新接口,用于开始或者结束事件监测。
2 ringbuffer,作为读写的缓冲层,在可读事件发生后,会从socket上读数据到buffer中,当可写事件发生后,会把buffer中的数据写到socket上
3 lcb_server_event_handler,在读写事件发生后,调用具体的读写函数:do_send_data和do_read_data
真正进行写操作的栈如下:
#0 0x0000003267c0ed20 in sendmsg () from /lib64/libpthread.so.0
#1 0x00002aee4385128c in lcb_io_sendv (iops=0x2aee435dc540, sock=<optimized out>, iov=<optimized out>, niov=<optimized out>) at plugins/io/libev/plugin-libev.c:113
#2 0x00002aee411f0f6a in do_send_data (c=<optimized out>) at src/event.c:318
#3 lcb_server_event_handler (sock=<optimized out>, which=4, arg=0x2aee437b6108) at src/event.c:377
#4 0x00002aee423d23cd in ev_invoke_pending (loop=0x2aee435d7200) at ev.c:2994
#5 0x00002aee423d32bf in ev_run (loop=0x2aee435d7200, flags=0) at ev.c:3394
#6 0x00002aee43850de6 in lcb_io_run_event_loop (iops=<optimized out>) at plugins/io/libev/plugin-libev.c:337
#7 0x00002aee411fcce3 in lcb_wait (instance=0x2aee437ab800) at src/wait.c:67
作为客户端需要一个结构体描述对端的server,如下:
/**
* The structure representing each couchbase server.
*
* Groups everything the client keeps per server: identity strings,
* the socket and its address information, the ring buffers used for
* outgoing, already-sent, pending and incoming data, and a pointer
* back to the owning instance.
*/
struct lcb_server_st {
/** The server index in the list */
int index;
/** Non-zero if this node is being used for configuration */
int is_config_node;
/** The name of the server */
char *hostname;
/** The server's port */
char *port;
/** The server endpoint as hostname:port */
char *authority;
/** The Couchbase Views API endpoint base */
char *couch_api_base;
/** The REST API server as hostname:port */
char *rest_api_server;
/** The socket to the server */
lcb_socket_t sock;
/** The address information for this server (the one to release) */
struct addrinfo *root_ai;
/** The address information for this server (the one we're trying) */
struct addrinfo *curr_ai;
/** The output buffer for this server */
ringbuffer_t output;
/** The sent buffer for this server, kept so that we can resend the
* command to another server if the bucket is moved... */
ringbuffer_t cmd_log;
/* NOTE(review): undocumented in the original; presumably the cookies
* that accompany the commands queued in `output` -- confirm. */
ringbuffer_t output_cookies;
/**
* The pending buffer where we write data until we're in a
* connected state.
*/
ringbuffer_t pending;
/* NOTE(review): undocumented in the original; presumably the cookies
* that accompany the commands queued in `pending` -- confirm. */
ringbuffer_t pending_cookies;
/** The input buffer for this server */
ringbuffer_t input;
/** The set of the pointers to HTTP requests to couchbase (Views and
* Management API) */
hashset_t http_requests;
/** The SASL object used for this server */
sasl_conn_t *sasl_conn;
/** The event item representing _this_ object */
void *event;
/** The timer bound to the socket to implement timeouts */
void *timer;
/** Is this server in a connected state (done with sasl auth) */
int connected;
/** Pointer back to the instance */
lcb_t instance;
};
其中ringbuffer_t 用来做缓存。
其中lcb_t实际是一个指针。指向的结构体如下:
/**
* The per-handle state of a libcouchbase instance; `lcb_t` is a pointer
* to this structure.
*/
struct lcb_st {
/**
* The type of the connection:
* * LCB_TYPE_BUCKET
* NULL for bucket means "default" bucket
* * LCB_TYPE_CLUSTER
* the bucket argument is ignored and all data commands will
* return LCB_EBADHANDLE
*/
lcb_type_t type;
/** The couchbase host */
char host[NI_MAXHOST + 1];
/** The port of the couchbase server */
char port[NI_MAXSERV + 1];
/** The URL request to send to the server */
char *http_uri;
/* NOTE(review): presumably how much of http_uri has been sent so far
* -- confirm. */
lcb_size_t n_http_uri_sent;
/** The event item representing _this_ object */
void *event;
/** The current vbucket config handle */
VBUCKET_CONFIG_HANDLE vbucket_config;
/* NOTE(review): judging by the names, parsing state (header, input
* buffer, current chunk) for the streamed vbucket configuration
* -- confirm. */
struct {
char *header;
buffer_t input;
lcb_size_t chunk_size;
buffer_t chunk;
} vbucket_stream;
/** The I/O operations table; event-loop calls are made through it,
* e.g. io->v.v0.update_event */
struct lcb_io_opt_st *io;
/** The number of weird things that happened with the config node.
* This counter reflects events on the memcached port (default 11210),
* but is used to make decisions about the healthiness of the
* configuration port (default 8091).
*/
lcb_size_t weird_things;
/* NOTE(review): presumably the limit weird_things is compared
* against -- confirm. */
lcb_size_t weird_things_threshold;
/** The current synchronous mode */
lcb_syncmode_t syncmode;
/* NOTE(review): socket and address information owned by the instance
* itself (as opposed to a per-server socket); presumably the
* configuration connection -- confirm. */
lcb_socket_t sock;
struct addrinfo *ai;
struct addrinfo *curr_ai;
/** The number of couchbase servers in the configuration */
lcb_size_t nservers;
/** The array of the couchbase servers */
lcb_server_t *servers;
/** If non-zero, backup_nodes entries should be freed before
freeing the pointer itself */
int should_free_backup_nodes;
/** The array of last known nodes as hostname:port */
char **backup_nodes;
/** The current connect index */
int backup_idx;
/** The type of the key distribution */
VBUCKET_DISTRIBUTION_TYPE dist_type;
/** The number of replicas */
lcb_uint16_t nreplicas;
/** The number of vbuckets */
lcb_uint16_t nvbuckets;
/** A map from the vbucket to the server hosting the vbucket */
lcb_vbucket_t *vb_server_map;
vbucket_state_listener_t vbucket_state_listener;
/** Credentials needed to operate the cluster via the REST API */
char *username;
char *password;
/* NOTE(review): SASL authentication data; the union lets the password
* be viewed either as a sasl_secret_t or as a raw 256-byte buffer. */
struct {
const char *name;
union {
sasl_secret_t secret;
char buffer[256];
} password;
sasl_callback_t callbacks[4];
} sasl;
/** The set of the timers */
hashset_t timers;
/** The set of the pointers to HTTP requests to Cluster */
hashset_t http_requests;
struct lcb_callback_st callbacks;
struct lcb_histogram_st *histogram;
lcb_uint32_t seqno;
int wait;
int connected;
/** Is IPv6 enabled */
lcb_ipv6_t ipv6;
const void *cookie;
lcb_error_t last_error;
/* NOTE(review): timeout bookkeeping; field meanings (next deadline,
* timer event, interval in microseconds) are inferred from the names
* -- confirm. */
struct {
hrtime_t next;
void *event;
lcb_uint32_t usec;
} timeout;
/* NOTE(review): state for a compatibility mode selected by `type`;
* the only variant visible here is `cached` -- confirm semantics. */
struct {
lcb_compat_t type;
union {
struct {
time_t mtime;
char *cachefile;
int updating;
int needs_update;
} cached;
} value;
} compat;
/* The debug member exists only when LCB_DEBUG is defined */
#ifdef LCB_DEBUG
lcb_debug_st debug;
#endif
};
所以和io相关的操作是这样调用的:
server->instance->io->v.v0.update_event
couchbase client是通过长连接的方式和server连接的,这样每一个client只需要在创建的时候进行一次事件循环初始化,后续的事件更新都是在回调中进行的。
libcouchbase支持使用libevent或者libev作为事件循环,并且在默认情况下,优先选择libevent,如果不存在libevent,再使用libev,这样的选择逻辑在函数栈
#0 create_v1 (io=0x7fffffffd5d8, options=0x7fffffffd500) at src/iofactory.c:218
#1 0x00002aaaaaacd34c in create_v0 (options=0x7fffffffd520, io=0x7fffffffd5d8) at src/iofactory.c:202
#2 lcb_create_io_ops (io=0x7fffffffd5d8, io_opts=<optimized out>) at src/iofactory.c:172
#3 0x00002aaaaaac7a9e in lcb_create (instance=0x2aaaace8b220, options=<optimized out>) at src/instance.c:311
中能够体现出来。
作为一个异步的网络程序,需要下面三个关键部件:
1 读写事件循环,更新接口
2 读写缓冲
3 读写事件回调
对应libcouchbase,为:
1 lcb_io_run_event_loop,这是个事件循环接口,因为选择的libev或者libevent不同而具体实现不同。lcb_io_update_event,这是事件更新接口,用于开始或者结束事件监测。
2 ringbuffer,作为读写的缓冲层,在可读事件发生后,会从socket上读数据到buffer中,当可写事件发生后,会把buffer中的数据写到socket上
3 lcb_server_event_handler,在读写事件发生后,调用具体的读写函数:do_send_data和do_read_data
真正进行写操作的栈如下:
#0 0x0000003267c0ed20 in sendmsg () from /lib64/libpthread.so.0
#1 0x00002aee4385128c in lcb_io_sendv (iops=0x2aee435dc540, sock=<optimized out>, iov=<optimized out>, niov=<optimized out>) at plugins/io/libev/plugin-libev.c:113
#2 0x00002aee411f0f6a in do_send_data (c=<optimized out>) at src/event.c:318
#3 lcb_server_event_handler (sock=<optimized out>, which=4, arg=0x2aee437b6108) at src/event.c:377
#4 0x00002aee423d23cd in ev_invoke_pending (loop=0x2aee435d7200) at ev.c:2994
#5 0x00002aee423d32bf in ev_run (loop=0x2aee435d7200, flags=0) at ev.c:3394
#6 0x00002aee43850de6 in lcb_io_run_event_loop (iops=<optimized out>) at plugins/io/libev/plugin-libev.c:337
#7 0x00002aee411fcce3 in lcb_wait (instance=0x2aee437ab800) at src/wait.c:67
作为客户端需要一个结构体描述对端的server,如下:
/**
* The structure representing each couchbase server.
*
* Groups everything the client keeps per server: identity strings,
* the socket and its address information, the ring buffers used for
* outgoing, already-sent, pending and incoming data, and a pointer
* back to the owning instance.
*/
struct lcb_server_st {
/** The server index in the list */
int index;
/** Non-zero if this node is being used for configuration */
int is_config_node;
/** The name of the server */
char *hostname;
/** The server's port */
char *port;
/** The server endpoint as hostname:port */
char *authority;
/** The Couchbase Views API endpoint base */
char *couch_api_base;
/** The REST API server as hostname:port */
char *rest_api_server;
/** The socket to the server */
lcb_socket_t sock;
/** The address information for this server (the one to release) */
struct addrinfo *root_ai;
/** The address information for this server (the one we're trying) */
struct addrinfo *curr_ai;
/** The output buffer for this server */
ringbuffer_t output;
/** The sent buffer for this server, kept so that we can resend the
* command to another server if the bucket is moved... */
ringbuffer_t cmd_log;
/* NOTE(review): undocumented in the original; presumably the cookies
* that accompany the commands queued in `output` -- confirm. */
ringbuffer_t output_cookies;
/**
* The pending buffer where we write data until we're in a
* connected state.
*/
ringbuffer_t pending;
/* NOTE(review): undocumented in the original; presumably the cookies
* that accompany the commands queued in `pending` -- confirm. */
ringbuffer_t pending_cookies;
/** The input buffer for this server */
ringbuffer_t input;
/** The set of the pointers to HTTP requests to couchbase (Views and
* Management API) */
hashset_t http_requests;
/** The SASL object used for this server */
sasl_conn_t *sasl_conn;
/** The event item representing _this_ object */
void *event;
/** The timer bound to the socket to implement timeouts */
void *timer;
/** Is this server in a connected state (done with sasl auth) */
int connected;
/** Pointer back to the instance */
lcb_t instance;
};
其中ringbuffer_t 用来做缓存。
其中lcb_t实际是一个指针。指向的结构体如下:
/**
* The per-handle state of a libcouchbase instance; `lcb_t` is a pointer
* to this structure.
*/
struct lcb_st {
/**
* The type of the connection:
* * LCB_TYPE_BUCKET
* NULL for bucket means "default" bucket
* * LCB_TYPE_CLUSTER
* the bucket argument is ignored and all data commands will
* return LCB_EBADHANDLE
*/
lcb_type_t type;
/** The couchbase host */
char host[NI_MAXHOST + 1];
/** The port of the couchbase server */
char port[NI_MAXSERV + 1];
/** The URL request to send to the server */
char *http_uri;
/* NOTE(review): presumably how much of http_uri has been sent so far
* -- confirm. */
lcb_size_t n_http_uri_sent;
/** The event item representing _this_ object */
void *event;
/** The current vbucket config handle */
VBUCKET_CONFIG_HANDLE vbucket_config;
/* NOTE(review): judging by the names, parsing state (header, input
* buffer, current chunk) for the streamed vbucket configuration
* -- confirm. */
struct {
char *header;
buffer_t input;
lcb_size_t chunk_size;
buffer_t chunk;
} vbucket_stream;
/** The I/O operations table; event-loop calls are made through it,
* e.g. io->v.v0.update_event */
struct lcb_io_opt_st *io;
/** The number of weird things that happened with the config node.
* This counter reflects events on the memcached port (default 11210),
* but is used to make decisions about the healthiness of the
* configuration port (default 8091).
*/
lcb_size_t weird_things;
/* NOTE(review): presumably the limit weird_things is compared
* against -- confirm. */
lcb_size_t weird_things_threshold;
/** The current synchronous mode */
lcb_syncmode_t syncmode;
/* NOTE(review): socket and address information owned by the instance
* itself (as opposed to a per-server socket); presumably the
* configuration connection -- confirm. */
lcb_socket_t sock;
struct addrinfo *ai;
struct addrinfo *curr_ai;
/** The number of couchbase servers in the configuration */
lcb_size_t nservers;
/** The array of the couchbase servers */
lcb_server_t *servers;
/** If non-zero, backup_nodes entries should be freed before
freeing the pointer itself */
int should_free_backup_nodes;
/** The array of last known nodes as hostname:port */
char **backup_nodes;
/** The current connect index */
int backup_idx;
/** The type of the key distribution */
VBUCKET_DISTRIBUTION_TYPE dist_type;
/** The number of replicas */
lcb_uint16_t nreplicas;
/** The number of vbuckets */
lcb_uint16_t nvbuckets;
/** A map from the vbucket to the server hosting the vbucket */
lcb_vbucket_t *vb_server_map;
vbucket_state_listener_t vbucket_state_listener;
/** Credentials needed to operate the cluster via the REST API */
char *username;
char *password;
/* NOTE(review): SASL authentication data; the union lets the password
* be viewed either as a sasl_secret_t or as a raw 256-byte buffer. */
struct {
const char *name;
union {
sasl_secret_t secret;
char buffer[256];
} password;
sasl_callback_t callbacks[4];
} sasl;
/** The set of the timers */
hashset_t timers;
/** The set of the pointers to HTTP requests to Cluster */
hashset_t http_requests;
struct lcb_callback_st callbacks;
struct lcb_histogram_st *histogram;
lcb_uint32_t seqno;
int wait;
int connected;
/** Is IPv6 enabled */
lcb_ipv6_t ipv6;
const void *cookie;
lcb_error_t last_error;
/* NOTE(review): timeout bookkeeping; field meanings (next deadline,
* timer event, interval in microseconds) are inferred from the names
* -- confirm. */
struct {
hrtime_t next;
void *event;
lcb_uint32_t usec;
} timeout;
/* NOTE(review): state for a compatibility mode selected by `type`;
* the only variant visible here is `cached` -- confirm semantics. */
struct {
lcb_compat_t type;
union {
struct {
time_t mtime;
char *cachefile;
int updating;
int needs_update;
} cached;
} value;
} compat;
/* The debug member exists only when LCB_DEBUG is defined */
#ifdef LCB_DEBUG
lcb_debug_st debug;
#endif
};
所以和io相关的操作是这样调用的:
server->instance->io->v.v0.update_event
couchbase client是通过长连接的方式和server连接的,这样每一个client只需要在创建的时候进行一次事件循环初始化,后续的事件更新都是在回调中进行的。