您的位置:首页 > 运维架构

libevent实现多线程,one loop per thread,多线程通信

2017-09-12 10:46 941 查看
libevent实现多线程

LibEvent代码阅读---线程间通信、信号处理

libevent并不是线程安全的,但这不代表libevent不支持多线程模式。

前几天在微博上看到ruanyf发了条微博说到apache和nginx的并发模型,看到评论很多人都说不对

于是自己又查了下,总结一下我所学过的网络库或者网络服务器的并发模型

1、muduo:one loop per thread,主线程注册listen事件,通过某种负载均衡机制(round robin)将连接的事件注册到子线程的Reactor上,据说也是Netty的方案,最近也正好在学netty,刚好可以验证一下。

另外,muduo还提到了一个
runInLoop()
的功能:如果用户在当前线程调用,则回调functor会同步进行,如果在其他线程调用,则IO线程会被唤醒执行这个functor。这种跨线程调用是如何实现的?因为其他线程很可能阻塞在Reactor上。传统方法是用pipe(后面将提到memcache的多线程),在muduo里面是用
eventfde(2)
,将回调放入线程的任务队列,并发送一个uint64大小的消息来唤醒Reactor。

2、nginx:master + worker的工作模式,ruanyf的微博说是master接受连接分配给worker,事实上不是这样的,master只是通过信号管理worker进程,worker之间通过accept_mutex来决定是否将监听套接字加入到loop中。所谓的one loop per process

3、libevent:这是在网上找的资料,libevent并不是线程安全,但不代表其不支持多线程。memcache的网络部分使用libevent,有一个经典的图描述了其多线程实现: 



这种消息通知+同步层的机制,通过pipe和一个加锁的任务队列(CQ)实现。与muduo的eventfd效果类似。

简要的对最近的代码学习进行一下汇总:
    在高性能的网络程序中,使用得最多的是非阻塞IO和多IO复用的模型,通常称为Reactor模式。还有另一种模式称为Proactor。目前大部分的实现都是基于Reactor模式的实现。
    在Reactor的模式中,基本结构是事件循环,以事件驱动和事件回调的方式实现业务逻辑。其中的事件可以包含读写socket、连接的建立都可以采用非阻塞的方式进行,这种方式能够提高并发度和吞吐量,对于IO密集型的应用比较合适。但是回调的方式也导致了业务逻辑的割裂。Reactor模式通常是一种非阻塞IO和单线程循环相结合的方式,称为non-blocking 
IO + one loop per thread模式。
    其中的loop是指线程的主循环,通常将Timer或IO注册到线程的loop中。对于实时性要求较高的情况,可以采用一个单一的线程完成连接操作,而将数据处理的任务分摊给一个线程池处理。实现了一种1+N的方式。非阻塞的IO和超时实现了这个循环的执行。
    在面向对象的编程中(JAVA和C++)都支持BlockingQueued的方式实现从一个线程往另一个线程丢数据的方式,在C语言中有Mqueue的方式。在UNP2中有关于mqueue的相关介绍。
    在linux中Socket和Pipe都是基于fd的操作,因此都支持select、poll等。在性能方面做得较好的框架中的实现方法中采用了如下的实现方式:多个单线程的进程(nginx,从接口看可支持多线程,采用主进程和多个工作进程的方式)或运行多线程的进程。通常调用fork的进程不采用多线程。原因:调用fork的目的:1、执行其他的业务进程。2、通过父子进程之间共享文件描述符的方式进行父子间进程通信,协同完成任务。

简要的阅读了Libevent的sample,基本的实现过程如下:
首先分配一个base,后面所有的操作都会与这个base进行关联:
base = event_base_new();
然后设置一系列的事件控制块,实际上是完成对应事件的处理函数注册过程。
signal_event = evsignal_new(base, SIGINT, signal_cb, (void *)base);
event_add(signal_event, NULL);
最后进入循环中,等待对应的事件的产生和执行:
event_base_dispatch(base);
后面再详细的分析libevent的基本框架和逻辑。

在Libevent中实现不同线程之间的通告采用了pipe的方式:
具体的实现在函数evthread_make_base_notifiable()中。其中使用了socketpair,该函数会创建一对socket Pipe去实现不同线程间的通告。
evutil_socketpair(LOCAL_SOCKETPAIR_AF, SOCK_STREAM, 0,
   base->th_notify_fd);
首先定义了一个通告函数和回调函数,通告函数用于其他线程往libevent所在的线程通告。回调函数用于libevent所在的线程进行响应。
 /* 回调函数,在使用函数中接收到另一个线程的通告 */
 void (*cb)(evutil_socket_t, short, void *) = evthread_notify_drain_default;
 /* 设置通告函数,在其他的线程中通告 */
 int (*notify)(struct event_base *) = evthread_notify_base_default;
在Event_Base中创建了一个notify的函数,实际的执行函数为evthread_notify_base_default().通过socket pipe[1]发送数据到pipe[0]中。
base->th_notify_fn = notify;
static int
evthread_notify_base_default(struct event_base *base)
{
 char buf[1];
 int r;
 buf[0] = (char) 0;
 r = write(base->th_notify_fd[1], buf, 1);
 return (r < 0 && errno != EAGAIN) ? -1 : 0;
}
因此在Libevent所在的线程中需要监听pipe[0]的接收事件,因此需要监听Pipe[0],并将回调函数作为该事件的响应,实际就是读取发送的内容。在Libevent中添加了一个基于pipe[0]读事件的处理。
 /* 设置pipe[0]用于读数据,也就是获取通告,因此pipe[1]在其他线程中调用 */
 event_assign(&base->th_notify, base, base->th_notify_fd[0],
     EV_READ|EV_PERSIST, cb, base);
其中的cb就是定义的回调函数:
static void
evthread_notify_drain_default(evutil_socket_t fd, short what, void *arg)
{
 unsigned char buf[1024];
 struct event_base *base = arg;
 while (read(fd, (char*)buf, sizeof(buf)) > 0)
  ;
 EVBASE_ACQUIRE_LOCK(base, th_base_lock);
 base->is_notify_pending = 0;
 EVBASE_RELEASE_LOCK(base, th_base_lock);
}
因此通过pipe[0,1]较好的实现了跨线程的事件通告, 通过Event_base将不同线程间的通告函数进行了较好的包装。当然都采用了非阻塞的IO方式。

Signal的实现方式:
在IO复用模型中经常采用了都是LoopEvent的实现方式,在初始化IO复用模型的过程中也需要考虑信号的实现,在linux中,信号和程序的正常运行实际上是两个不同的控制流,为了在两个流之间进行通信,在LibEvent中采用了跨线程的通告方式,也是采用了socketPair的方式,具体的实现如下所示:
在Select的初始化函数中初始化信号evsig_init(base);
static void *
select_init(struct event_base *base)
{
 struct selectop *sop;
...
 evsig_init(base);
...
 return (sop);
}

evsig_init的实现如下:

int
evsig_init(struct event_base *base)
{
  /* 这个signal实际上也是采用了socket的方式实现 */
 evutil_socketpair( AF_UNIX, SOCK_STREAM, 0, base->sig.ev_signal_pair);
...
 evutil_make_socket_nonblocking(base->sig.ev_signal_pair[0]);
 evutil_make_socket_nonblocking(base->sig.ev_signal_pair[1]);
/* 添加读事件到ev_signal_pair[1],也就是在Signal的处理函数中可以采用ev_signal_pair[0]发送数据,触发ev_signal_pair[1]的读事件,函数evsig_cb中完成读操作 */
 event_assign(&base->sig.ev_signal, base, base->sig.ev_signal_pair[1],
  EV_READ | EV_PERSIST, evsig_cb, base);
 ...
/* 这边是实际信号发生时应该处理的操作结构体eventop */
base->evsigsel = &evsigops;
}
evsig_cb:
static void
evsig_cb(evutil_socket_t fd, short what, void *arg)
{
...
 while (1) {
  /* 从socket中读数据,同时为非阻塞,因此多次读数据会退出,不会死循环 */
  n = recv(fd, signals, sizeof(signals), 0);
  if (n == -1) {
    ...
   break;
  } else if (n == 0) {
   break;
  }
/* 统计对应信号触发的次数 */
  for (i = 0; i < n; ++i) {
   ev_uint8_t sig = signals[i];
   if (sig < NSIG)
    ncaught[sig]++;
  }
 }

 EVBASE_ACQUIRE_LOCK(base, th_base_lock);
 for (i = 0; i < NSIG; ++i) {
  if (ncaught[i])
/* 激活对应的信号 */

   evmap_signal_active(base, i, ncaught[i]);
 }
 EVBASE_RELEASE_LOCK(base, th_base_lock);
}

结构体evsigops:
static const struct eventop evsigops = {
 "signal",
 NULL,
 evsig_add,  /* 添加操作函数 */
 evsig_del,
 NULL,
 NULL,
 0, 0, 0
};

添加的操作函数为evsig_add:
static int
evsig_add(struct event_base *base, evutil_socket_t evsignal, short old, short events, void *p)
{
...
 EVSIGBASE_LOCK();
 
 evsig_base = base;
/* 信号添加 */
 evsig_base_n_signals_added = ++sig->ev_n_signals_added;
/* 发送信号事件的socket,也就是未使用的pipe[0] */
 evsig_base_fd = base->sig.ev_signal_pair[0];
 EVSIGBASE_UNLOCK();

...
/* 设置新号的处理函数为evsig_handler */
_evsig_set_handler(base, (int)evsignal, evsig_handler);
/* 添加事件 */
event_add(&sig->ev_signal, NULL);

 return (0);

}

信号处理函数evsig_handler():
该函数实际就是完成了信号的发送操作:
evsig_handler(int sig) {
 msg = sig;
 send(evsig_base_fd, (char*)&msg, 1, 0);
}

继续自己的C语言知识学习,丰富自己考虑、设计思路。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  linux epoll libevent