Libevent源码分析(六)--- bufferevent
2016-04-01 11:48
513 查看
上一节说过,libevent提供六种bufferevent类型,后面会详细分析其中的两个:bufferevent_sock和bufferevent_async.下面是bufferevent的详细定义:
其中ev_base指向bufferevent所属的event_base ,ev_read,ev_write是读写事件,timeout_read和timeout_write是读写超时时间,readcb,writecb和errorcb分别是读回调,写回调和事件回调,因为版本兼容问题使用errorcb命名。input和output是evbuffer类型的读写缓存。enabled表示支持的事件,目前只有EV_READ 和EV_WRITE。wm_read和wm_write代表读写缓存的伐值,be_ops是bufferevent_ops类型变量,它的定义如下:
bufferevent_ops 的实现和eventop比较类似,都是定义了一组指针,不同的bufferevent实现可以自定义这些具体操作。比如bufferevent_sock的对应定义:
接下来分析一下buffevent_sock的实现方式。
bufferevent_socket_new方法用于创建一个bufferevent_socket类型的bufferevent。如果是在win32环境下则会默认调用iocp的对应实现,即bufferevent_async_new。该方法还会创建一个bufferevent_private变量,下面是他的定义:
bufferevent_private结构的第一个变量是bufferevent类型的,其实每一个bufferevent都对应一个bufferevent_private变量,只是对用户来说是透明的,用户只需要了解bufferevent即可。bufferevent_private的变量基本都是用于记录状态。
继续分析bufferevent_socket_new函数,创建bufferevent_private之后调用bufferevent_init_common函数,这个函数定义在bufferevent.c文件中,该文件中的函数都是各个bufferevent类型通用的函数。
bufferevent_init_common比较简单,需要注意 bufferEvent默认开启EV_WRITE,EV_READ需要手动打开,如果需要支持多线程则为bufferevent创建锁,另外在目前的版本中,UNLOCK_CALLBACKS需要DEFER_CALLBACKS作为前置条件。
继续分析bufferevent_socket_new函数:
调用bufferevent_init_common之后会将bufev->output添加EVBUFFER_FLAG_DRAINS_TO_FD标记,这样output就支持从文件读取数据了,接下来设置读写事件回掉,最后添加bufferevent_socket_outbuf_cb作为output的call,当output中有数据时调用bufferevent_socket_outbuf_cb,这样就可以不必一直监听写事件,bufferevent_writecb在数据全部写出之后可以移除写事件, 然后在bufferevent_socket_outbuf_cb中判断是否需要添加事件。
初始化bufferevent之后需要调用be_socket_setfd设置网络套接字来监听读写事件:
be_socket_setfd中设置的套接字必须是已经建立好连接的socket。如果是client端,bufferevent_sock提供了bufferevent_socket_connect用于连接套接字,但是如果是server端,libevent没有提供listen的相关函数,不过libevent提供了listener.c文件可以用于监听连接。
win32的connect使用iocp实现,将在下一节iocp章节详细分析,这里只看其他平台的实现方式。evutil_socket_connect是libevent提供的工具函数,它的返回值有三种情况:0代表正在连接,此时需要调用be_socket_enable激活写事件。当连接成功时会调用bufferevent_writecb回掉。1代表已经连接成功,此时调用event_active直接手动触发bufferevent_writecb事件处理连接。-1代表连接被拒绝,此时同样调用event_active处理连接失败情况。
下面是bufferevent_writecb的具体实现:
bufferevent_writecb需要处理两件事,一个是连接逻辑,一个是发送数据逻辑,注意当处理连接逻辑时,如果当前没有激活EV_WRITE或者处于write_suspended状态,需要删除写监听,因为当前的写监听事件只用于处理连接。另外当处理数据逻辑时如果output的数据长度为0,同样需要删除写事件,因为output的callback在有数据之后会重新添加写事件:
bufferevent_writecb和bufferevent_readcb还需要调用用户自定义的读写事件:
如果bufferevent的选项包含BEV_OPT_DEFER_CALLBACKS则需要使用延迟调用方式,延迟调用是在event_base的loop循环统一处理的。p->deferred.queued如果没有被设置过,则加入到ev_base的延迟调用队列。bufferevent_init_common中设置过defered对应的回掉:
这两个回掉只是使用锁的方式不同:
以上就是bufferevent_sock的实现方式,下一节我们将分析IOCP的实现方式。
struct bufferevent { /** Event base for which this bufferevent was created. */ struct event_base *ev_base; /** Pointer to a table of function pointers to set up how this bufferevent behaves. */ const struct bufferevent_ops *be_ops; /** A read event that triggers when a timeout has happened or a socket is ready to read data. Only used by some subtypes of bufferevent. */ struct event ev_read; /** A write event that triggers when a timeout has happened or a socket is ready to write data. Only used by some subtypes of bufferevent. */ struct event ev_write; /** An input buffer. Only the bufferevent is allowed to add data to this buffer, though the user is allowed to drain it. */ struct evbuffer *input; /** An input buffer. Only the bufferevent is allowed to drain data from this buffer, though the user is allowed to add it. */ struct evbuffer *output; struct event_watermark wm_read; struct event_watermark wm_write; bufferevent_data_cb readcb; bufferevent_data_cb writecb; /* This should be called 'eventcb', but renaming it would break * backward compatibility */ bufferevent_event_cb errorcb; void *cbarg; struct timeval timeout_read; struct timeval timeout_write; /** Events that are currently enabled: currently EV_READ and EV_WRITE are supported. */ short enabled; };
其中ev_base指向bufferevent所属的event_base ,ev_read,ev_write是读写事件,timeout_read和timeout_write是读写超时时间,readcb,writecb和errorcb分别是读回调,写回调和事件回调,因为版本兼容问题使用errorcb命名。input和output是evbuffer类型的读写缓存。enabled表示支持的事件,目前只有EV_READ 和EV_WRITE。wm_read和wm_write代表读写缓存的伐值,be_ops是bufferevent_ops类型变量,它的定义如下:
struct bufferevent_ops { /** The name of the bufferevent's type. */ const char *type; /** At what offset into the implementation type will we find a bufferevent structure? Example: if the type is implemented as struct bufferevent_x { int extra_data; struct bufferevent bev; } then mem_offset should be offsetof(struct bufferevent_x, bev) */ off_t mem_offset; /** Enables one or more of EV_READ|EV_WRITE on a bufferevent. Does not need to adjust the 'enabled' field. Returns 0 on success, -1 on failure. */ int (*enable)(struct bufferevent *, short); /** Disables one or more of EV_READ|EV_WRITE on a bufferevent. Does not need to adjust the 'enabled' field. Returns 0 on success, -1 on failure. */ int (*disable)(struct bufferevent *, short); /** Free any storage and deallocate any extra data or structures used in this implementation. */ void (*destruct)(struct bufferevent *); /** Called when the timeouts on the bufferevent have changed.*/ int (*adj_timeouts)(struct bufferevent *); /** Called to flush data. */ int (*flush)(struct bufferevent *, short, enum bufferevent_flush_mode); /** Called to access miscellaneous fields. */ int (*ctrl)(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *); };
bufferevent_ops 的实现和eventop比较类似,都是定义了一组指针,不同的bufferevent实现可以自定义这些具体操作。比如bufferevent_sock的对应定义:
const struct bufferevent_ops bufferevent_ops_socket = { "socket", evutil_offsetof(struct bufferevent_private, bev), be_socket_enable, be_socket_disable, be_socket_destruct, be_socket_adj_timeouts, be_socket_flush, be_socket_ctrl, };
接下来分析一下buffevent_sock的实现方式。
struct bufferevent * bufferevent_new(evutil_socket_t fd, bufferevent_data_cb readcb, bufferevent_data_cb writecb, bufferevent_event_cb eventcb, void *cbarg) { struct bufferevent *bufev; if (!(bufev = bufferevent_socket_new(NULL, fd, 0))) return NULL; bufferevent_setcb(bufev, readcb, writecb, eventcb, cbarg); return bufev; } struct bufferevent * bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, int options) { struct bufferevent_private *bufev_p; struct bufferevent *bufev; #ifdef WIN32 if (base && event_base_get_iocp(base)) return bufferevent_async_new(base, fd, options); #endif if ((bufev_p = mm_calloc(1, sizeof(struct bufferevent_private)))== NULL) return NULL; if (bufferevent_init_common(bufev_p, base, &bufferevent_ops_socket, options) < 0) { mm_free(bufev_p); return NULL; } bufev = &bufev_p->bev; evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD); event_assign(&bufev->ev_read, bufev->ev_base, fd, EV_READ|EV_PERSIST, bufferevent_readcb, bufev); event_assign(&bufev->ev_write, bufev->ev_base, fd, EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev); evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev); evbuffer_freeze(bufev->input, 0); evbuffer_freeze(bufev->output, 1); return bufev; }
bufferevent_socket_new方法用于创建一个bufferevent_socket类型的bufferevent。如果是在win32环境下则会默认调用iocp的对应实现,即bufferevent_async_new。该方法还会创建一个bufferevent_private变量,下面是他的定义:
struct bufferevent_private { /** The underlying bufferevent structure. */ struct bufferevent bev; /** Evbuffer callback to enforce watermarks on input. */ struct evbuffer_cb_entry *read_watermarks_cb; /** If set, we should free the lock when we free the bufferevent. */ unsigned own_lock : 1; /** Flag: set if we have deferred callbacks and a read callback is * pending. */ unsigned readcb_pending : 1; /** Flag: set if we have deferred callbacks and a write callback is * pending. */ unsigned writecb_pending : 1; /** Flag: set if we are currently busy connecting. */ unsigned connecting : 1; /** Flag: set if a connect failed prematurely; this is a hack for * getting around the bufferevent abstraction. */ unsigned connection_refused : 1; /** Set to the events pending if we have deferred callbacks and * an events callback is pending. */ short eventcb_pending; /** If set, read is suspended until one or more conditions are over. * The actual value here is a bitfield of those conditions; see the * BEV_SUSPEND_* flags above. */ bufferevent_suspend_flags read_suspended; /** If set, writing is suspended until one or more conditions are over. * The actual value here is a bitfield of those conditions; see the * BEV_SUSPEND_* flags above. */ bufferevent_suspend_flags write_suspended; /** Set to the current socket errno if we have deferred callbacks and * an events callback is pending. */ int errno_pending; /** The DNS error code for bufferevent_socket_connect_hostname */ int dns_error; /** Used to implement deferred callbacks */ struct deferred_cb deferred; /** The options this bufferevent was constructed with */ enum bufferevent_options options; /** Current reference count for this bufferevent. */ int refcnt; /** Lock for this bufferevent. Shared by the inbuf and the outbuf. * If NULL, locking is disabled. */ void *lock; /** Rate-limiting information for this bufferevent */ struct bufferevent_rate_limit *rate_limiting; };
bufferevent_private结构的第一个变量是bufferevent类型的,其实每一个bufferevent都对应一个bufferevent_private变量,只是对用户来说是透明的,用户只需要了解bufferevent即可。bufferevent_private的变量基本都是用于记录状态。
继续分析bufferevent_socket_new函数,创建bufferevent_private之后调用bufferevent_init_common函数,这个函数定义在bufferevent.c文件中,该文件中的函数都是各个bufferevent类型通用的函数。
int bufferevent_init_common(struct bufferevent_private *bufev_private, struct event_base *base, const struct bufferevent_ops *ops, enum bufferevent_options options) { struct bufferevent *bufev = &bufev_private->bev; if (!bufev->input) { if ((bufev->input = evbuffer_new()) == NULL) return -1; } if (!bufev->output) { if ((bufev->output = evbuffer_new()) == NULL) { evbuffer_free(bufev->input); return -1; } } bufev_private->refcnt = 1; bufev->ev_base = base; /* Disable timeouts. */ evutil_timerclear(&bufev->timeout_read); evutil_timerclear(&bufev->timeout_write); bufev->be_ops = ops; /* * Set to EV_WRITE so that using bufferevent_write is going to * trigger a callback. Reading needs to be explicitly enabled * because otherwise no data will be available. */ bufev->enabled = EV_WRITE; #ifndef _EVENT_DISABLE_THREAD_SUPPORT if (options & BEV_OPT_THREADSAFE) { if (bufferevent_enable_locking(bufev, NULL) < 0) { /* cleanup */ evbuffer_free(bufev->input); evbuffer_free(bufev->output); bufev->input = NULL; bufev->output = NULL; return -1; } } #endif if ((options & (BEV_OPT_DEFER_CALLBACKS|BEV_OPT_UNLOCK_CALLBACKS)) == BEV_OPT_UNLOCK_CALLBACKS) { event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS"); return -1; } if (options & BEV_OPT_DEFER_CALLBACKS) { if (options & BEV_OPT_UNLOCK_CALLBACKS) event_deferred_cb_init(&bufev_private->deferred, bufferevent_run_deferred_callbacks_unlocked, bufev_private); else event_deferred_cb_init(&bufev_private->deferred, bufferevent_run_deferred_callbacks_locked, bufev_private); } bufev_private->options = options; evbuffer_set_parent(bufev->input, bufev); evbuffer_set_parent(bufev->output, bufev); return 0; }
bufferevent_init_common比较简单,需要注意 bufferEvent默认开启EV_WRITE,EV_READ需要手动打开,如果需要支持多线程则为bufferevent创建锁,另外在目前的版本中,UNLOCK_CALLBACKS需要DEFER_CALLBACKS作为前置条件。
继续分析bufferevent_socket_new函数:
evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD); event_assign(&bufev->ev_read, bufev->ev_base, fd, EV_READ|EV_PERSIST, bufferevent_readcb, bufev); event_assign(&bufev->ev_write, bufev->ev_base, fd, EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev); evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);
调用bufferevent_init_common之后会将bufev->output添加EVBUFFER_FLAG_DRAINS_TO_FD标记,这样output就支持从文件读取数据了,接下来设置读写事件回掉,最后添加bufferevent_socket_outbuf_cb作为output的call,当output中有数据时调用bufferevent_socket_outbuf_cb,这样就可以不必一直监听写事件,bufferevent_writecb在数据全部写出之后可以移除写事件, 然后在bufferevent_socket_outbuf_cb中判断是否需要添加事件。
初始化bufferevent之后需要调用be_socket_setfd设置网络套接字来监听读写事件:
static void be_socket_setfd(struct bufferevent *bufev, evutil_socket_t fd) { BEV_LOCK(bufev); EVUTIL_ASSERT(bufev->be_ops == &bufferevent_ops_socket); event_del(&bufev->ev_read); event_del(&bufev->ev_write); event_assign(&bufev->ev_read, bufev->ev_base, fd, EV_READ|EV_PERSIST, bufferevent_readcb, bufev); event_assign(&bufev->ev_write, bufev->ev_base, fd, EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev); if (fd >= 0) bufferevent_enable(bufev, bufev->enabled); BEV_UNLOCK(bufev); } int bufferevent_enable(struct bufferevent *bufev, short event) { struct bufferevent_private *bufev_private = EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); short impl_events = event; int r = 0; _bufferevent_incref_and_lock(bufev); if (bufev_private->read_suspended) impl_events &= ~EV_READ; if (bufev_private->write_suspended) impl_events &= ~EV_WRITE; bufev->enabled |= event; if (impl_events && bufev->be_ops->enable(bufev, impl_events) < 0) r = -1; _bufferevent_decref_and_unlock(bufev); return r; }
be_socket_setfd中设置的套接字必须是已经建立好连接的socket。如果是client端,bufferevent_sock提供了bufferevent_socket_connect用于连接套接字,但是如果是server端,libevent没有提供listen的相关函数,不过libevent提供了listener.c文件可以用于监听连接。
int bufferevent_socket_connect(struct bufferevent *bev, struct sockaddr *sa, int socklen) { struct bufferevent_private *bufev_p = EVUTIL_UPCAST(bev, struct bufferevent_private, bev); evutil_socket_t fd; int r = 0; int result=-1; int ownfd = 0; _bufferevent_incref_and_lock(bev); if (!bufev_p) goto done; fd = bufferevent_getfd(bev); if (fd < 0) { if (!sa) goto done; fd = socket(sa->sa_family, SOCK_STREAM, 0); if (fd < 0) goto done; if (evutil_make_socket_nonblocking(fd)<0) goto done; ownfd = 1; } if (sa) { #ifdef WIN32 if (bufferevent_async_can_connect(bev)) { bufferevent_setfd(bev, fd); r = bufferevent_async_connect(bev, fd, sa, socklen); if (r < 0) goto freesock; bufev_p->connecting = 1; result = 0; goto done; } else #endif r = evutil_socket_connect(&fd, sa, socklen); if (r < 0) goto freesock; } #ifdef WIN32 /* ConnectEx() isn't always around, even when IOCP is enabled. * Here, we borrow the socket object's write handler to fall back * on a non-blocking connect() when ConnectEx() is unavailable. */ if (BEV_IS_ASYNC(bev)) { event_assign(&bev->ev_write, bev->ev_base, fd, EV_WRITE|EV_PERSIST, bufferevent_writecb, bev); } #endif bufferevent_setfd(bev, fd); if (r == 0) { if (! be_socket_enable(bev, EV_WRITE)) { bufev_p->connecting = 1; result = 0; goto done; } } else if (r == 1) { /* The connect succeeded already. How very BSD of it. */ result = 0; bufev_p->connecting = 1; event_active(&bev->ev_write, EV_WRITE, 1); } else { /* The connect failed already. How very BSD of it. */ bufev_p->connection_refused = 1; bufev_p->connecting = 1; result = 0; event_active(&bev->ev_write, EV_WRITE, 1); } goto done; freesock: _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR); if (ownfd) evutil_closesocket(fd); /* do something about the error? */ done: _bufferevent_decref_and_unlock(bev); return result; }
win32的connect使用iocp实现,将在下一节iocp章节详细分析,这里只看其他平台的实现方式。evutil_socket_connect是libevent提供的工具函数,它的返回值有三种情况:0代表正在连接,此时需要调用be_socket_enable激活写事件。当连接成功时会调用bufferevent_writecb回掉。1代表已经连接成功,此时调用event_active直接手动触发bufferevent_writecb事件处理连接。-1代表连接被拒绝,此时同样调用event_active处理连接失败情况。
下面是bufferevent_writecb的具体实现:
static void bufferevent_writecb(evutil_socket_t fd, short event, void *arg) { struct bufferevent *bufev = arg; struct bufferevent_private *bufev_p = EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); int res = 0; short what = BEV_EVENT_WRITING; int connected = 0; ev_ssize_t atmost = -1; _bufferevent_incref_and_lock(bufev); if (event == EV_TIMEOUT) { /* Note that we only check for event==EV_TIMEOUT. If * event==EV_TIMEOUT|EV_WRITE, we can safely ignore the * timeout, since a read has occurred */ what |= BEV_EVENT_TIMEOUT; goto error; } if (bufev_p->connecting) { int c = evutil_socket_finished_connecting(fd); /* we need to fake the error if the connection was refused * immediately - usually connection to localhost on BSD */ if (bufev_p->connection_refused) { bufev_p->connection_refused = 0; c = -1; } if (c == 0) goto done; bufev_p->connecting = 0; if (c < 0) { event_del(&bufev->ev_write); event_del(&bufev->ev_read); _bufferevent_run_eventcb(bufev, BEV_EVENT_ERROR); goto done; } else { connected = 1; #ifdef WIN32 if (BEV_IS_ASYNC(bufev)) { event_del(&bufev->ev_write); bufferevent_async_set_connected(bufev); _bufferevent_run_eventcb(bufev, BEV_EVENT_CONNECTED); goto done; } #endif _bufferevent_run_eventcb(bufev, BEV_EVENT_CONNECTED); if (!(bufev->enabled & EV_WRITE) || bufev_p->write_suspended) { event_del(&bufev->ev_write); goto done; } } } atmost = _bufferevent_get_write_max(bufev_p); if (bufev_p->write_suspended) goto done; if (evbuffer_get_length(bufev->output)) { evbuffer_unfreeze(bufev->output, 1); res = evbuffer_write_atmost(bufev->output, fd, atmost); evbuffer_freeze(bufev->output, 1); if (res == -1) { int err = evutil_socket_geterror(fd); if (EVUTIL_ERR_RW_RETRIABLE(err)) goto reschedule; what |= BEV_EVENT_ERROR; } else if (res == 0) { /* eof case XXXX Actually, a 0 on write doesn't indicate an EOF. An ECONNRESET might be more typical. */ what |= BEV_EVENT_EOF; } if (res <= 0) goto error; _bufferevent_decrement_write_buckets(bufev_p, res); } if (evbuffer_get_length(bufev->output) == 0) { event_del(&bufev->ev_write); } /* * Invoke the user callback if our buffer is drained or below the * low watermark. */ if ((res || !connected) && evbuffer_get_length(bufev->output) <= bufev->wm_write.low) { _bufferevent_run_writecb(bufev); } goto done; reschedule: if (evbuffer_get_length(bufev->output) == 0) { event_del(&bufev->ev_write); } goto done; error: bufferevent_disable(bufev, EV_WRITE); _bufferevent_run_eventcb(bufev, what); done: _bufferevent_decref_and_unlock(bufev); }
bufferevent_writecb需要处理两件事,一个是连接逻辑,一个是发送数据逻辑,注意当处理连接逻辑时,如果当前没有激活EV_WRITE或者处于write_suspended状态,需要删除写监听,因为当前的写监听事件只用于处理连接。另外当处理数据逻辑时如果output的数据长度为0,同样需要删除写事件,因为output的callback在有数据之后会重新添加写事件:
static void bufferevent_socket_outbuf_cb(struct evbuffer *buf, const struct evbuffer_cb_info *cbinfo, void *arg) { struct bufferevent *bufev = arg; struct bufferevent_private *bufev_p = EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); if (cbinfo->n_added && (bufev->enabled & EV_WRITE) && !event_pending(&bufev->ev_write, EV_WRITE, NULL) && !bufev_p->write_suspended) { /* Somebody added data to the buffer, and we would like to * write, and we were not writing. So, start writing. */ if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) == -1) { /* Should we log this? */ } } }
bufferevent_writecb和bufferevent_readcb还需要调用用户自定义的读写事件:
void _bufferevent_run_writecb(struct bufferevent *bufev) { /* Requires that we hold the lock and a reference */ struct bufferevent_private *p = EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); if (bufev->writecb == NULL) return; if (p->options & BEV_OPT_DEFER_CALLBACKS) { p->writecb_pending = 1; if (!p->deferred.queued) SCHEDULE_DEFERRED(p); } else { bufev->writecb(bufev, bufev->cbarg); } } #define SCHEDULE_DEFERRED(bevp) \ do { \ bufferevent_incref(&(bevp)->bev); \ event_deferred_cb_schedule( \ event_base_get_deferred_cb_queue((bevp)->bev.ev_base), \ &(bevp)->deferred); \ } while (0)
如果bufferevent的选项包含BEV_OPT_DEFER_CALLBACKS则需要使用延迟调用方式,延迟调用是在event_base的loop循环统一处理的。p->deferred.queued如果没有被设置过,则加入到ev_base的延迟调用队列。bufferevent_init_common中设置过defered对应的回掉:
if (options & BEV_OPT_DEFER_CALLBACKS) { if (options & BEV_OPT_UNLOCK_CALLBACKS) event_deferred_cb_init(&bufev_private->deferred, bufferevent_run_deferred_callbacks_unlocked, bufev_private); else event_deferred_cb_init(&bufev_private->deferred, bufferevent_run_deferred_callbacks_locked, bufev_private); }
这两个回掉只是使用锁的方式不同:
static void bufferevent_run_deferred_callbacks_locked(struct deferred_cb *_, void *arg) { struct bufferevent_private *bufev_private = arg; struct bufferevent *bufev = &bufev_private->bev; BEV_LOCK(bufev); if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) && bufev->errorcb) { /* The "connected" happened before any reads or writes, so send it first. */ bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED; bufev->errorcb(bufev, BEV_EVENT_CONNECTED, bufev->cbarg); } if (bufev_private->readcb_pending && bufev->readcb) { bufev_private->readcb_pending = 0; bufev->readcb(bufev, bufev->cbarg); } if (bufev_private->writecb_pending && bufev->writecb) { bufev_private->writecb_pending = 0; bufev->writecb(bufev, bufev->cbarg); } if (bufev_private->eventcb_pending && bufev->errorcb) { short what = bufev_private->eventcb_pending; int err = bufev_private->errno_pending; bufev_private->eventcb_pending = 0; bufev_private->errno_pending = 0; EVUTIL_SET_SOCKET_ERROR(err); bufev->errorcb(bufev, what, bufev->cbarg); } _bufferevent_decref_and_unlock(bufev); } static void bufferevent_run_deferred_callbacks_unlocked(struct deferred_cb *_, void *arg) { struct bufferevent_private *bufev_private = arg; struct bufferevent *bufev = &bufev_private->bev; BEV_LOCK(bufev); #define UNLOCKED(stmt) \ do { BEV_UNLOCK(bufev); stmt; BEV_LOCK(bufev); } while(0) if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) && bufev->errorcb) { /* The "connected" happened before any reads or writes, so send it first. */ bufferevent_event_cb errorcb = bufev->errorcb; void *cbarg = bufev->cbarg; bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED; UNLOCKED(errorcb(bufev, BEV_EVENT_CONNECTED, cbarg)); } if (bufev_private->readcb_pending && bufev->readcb) { bufferevent_data_cb readcb = bufev->readcb; void *cbarg = bufev->cbarg; bufev_private->readcb_pending = 0; UNLOCKED(readcb(bufev, cbarg)); } if (bufev_private->writecb_pending && bufev->writecb) { bufferevent_data_cb writecb = bufev->writecb; void *cbarg = bufev->cbarg; bufev_private->writecb_pending = 0; UNLOCKED(writecb(bufev, cbarg)); } if (bufev_private->eventcb_pending && bufev->errorcb) { bufferevent_event_cb errorcb = bufev->errorcb; void *cbarg = bufev->cbarg; short what = bufev_private->eventcb_pending; int err = bufev_private->errno_pending; bufev_private->eventcb_pending = 0; bufev_private->errno_pending = 0; EVUTIL_SET_SOCKET_ERROR(err); UNLOCKED(errorcb(bufev,what,cbarg)); } _bufferevent_decref_and_unlock(bufev); #undef UNLOCKED }
以上就是bufferevent_sock的实现方式,下一节我们将分析IOCP的实现方式。
相关文章推荐
- JavaScript 10分钟入门
- 关于VS打开cshtml出现 未能完成该操作。无效指针
- 实例讲解jQuery EasyUI tree中state属性慎用
- jQuery插件infinitescroll参数【无限翻页】
- 【笔记】 《js权威指南》- 第3章 类型、值和变量 - 3.5 全局对象
- JavaScript 获取当前时间戳 (3种方式)
- extjs表单提交combobox提交值问题
- 关于图标和文字对齐的方法整理。
- webpack window 安装loader
- 数组型Json解析之细节
- HTML5 解决表单输入提示 placeholder 属性
- getClientRects 和 getBoundingClientRect 的用法和区别
- js 排序
- jquery中attr和prop的区别
- Extjs实现下拉树
- HTML学习(六)——表单
- VB6下简易的JSON解析器
- jQuery选择器总结
- infinitescroll 通过无限制分页(json方式完整代码)
- 将HTML从JavaScript中抽离(源自:编写可维护的JavaScript)