您的位置:首页 > 编程语言 > C语言/C++

c++之初级的消息队列及线程池模型

2017-03-10 10:09 323 查看
1.最近项目不是很忙,结合之前看的一些开源代码(skynet及其他github代码)及项目代码,抽空写了一个简单的任务队列当做练习。

2.介绍:

  1)全局队列中锁的使用:多线程下,全局队列需要加锁,本例中封装了MutexGuard。操作全局队列之前,先在栈上创建一个临时锁对象,调用构造函数时加锁,对象销毁时调用析构函数从而解锁,减少了我们手动加锁,解锁的过程。

  2)信号的使用:本例可以说是为了使用信号而使用信号,仅仅是为了熟悉信号机一些特性。 当程序以后台模式 跑起来以后,输入kill -USR1 %1 向程序发送SIGUSR1信号,从而使生产者生产一定数量的job,供消费者使用;消费者线程,在处理完全局队列以后sleep,等待生产者产生新任务; 输入 kill -USR2 %1, 改变变量状态,向信号监听线程发送结束通知,结束线程。

  3)简单的线程池模型。

  4)简单的线程间通信和同步方式示例。

  5)简单的类模板的使用。

3.编译: 文件不多,偷懒没有写makefile文件,可自行加上。编译指令 : g++ -g -Wall -o test main.cpp mutex.cpp List.h mutex.h -lpthread

4:执行流程:

  1)编译成功后,输入 ./mytest &。 以后台模式运行程序

  2)此时所有consumer线程阻塞,等待生产者生产job; 一个producer线程阻塞在select处,等待读管道内的消息;一个signal_handler线程调用 pthread_sigwait( ... ) 等待 SIGUSR1 和 SIGUSR2 信号的到来。

可通过在控制台输入: kill -USR1 %1(ps: kill 指令用来产生信号 当以后台模式运行该进程时, %1用来获得该进程 id,因此该命令表示向 该进程发送 SIGUSR1 信号)进程发送SIGUSR1信号,被signal_handler捕捉到以后,生产job,唤醒consumer线程处理job,此流程可重复执行;当在控制台输入 kill -USR2 %1 时, 改变quit变量值,从而使得各个线程退出,进程结束。还有一个 spoliling 轮询线程,在全局队列不为空的情况下,及时唤醒consumer线程处理任务。可通过调整wakeup中的参数,调整唤醒consumer的频率。

5.参考:

  1)UNIX环境高级编程。

  2)https://github.com/idispatch/signaltest

  3)https:github.com/cloudwu/skynet/skynet-src/skynet_start.c

水平有限,仅供参考,希望能对读者有所帮助。以上描述及以下源码有任何漏洞与不足,欢迎及时指正与交流。

6:源码:

main.cpp:

#include "List.h"
#include "mutex.h"

template<typename T>
List<T>::List()
:m_init(false)
{

}

template<typename T>
List<T>::List(const list<T> &l)
:m_init(false)
{

}

template<typename T>
List<T>::~List()
{
destroy();
}

template<typename T>
void List<T>::Push(const T t)
{
MyMutexGuard g(mm);
my_list.push_back(t);
}

template<typename T>
T List<T>::Pop()
{
MyMutexGuard g(mm);

if(my_list.empty())
{
return NULL;
}
else
{
T tt = my_list.front();
my_list.pop_front();

return tt;
}
}

template<typename T>
bool List<T>::Empty()
{
MyMutexGuard g(mm);
return my_list.empty();
}

template<typename T>
void List<T>::init()
{
if(!m_init)
{
m_init = (pthread_mutex_init(&mm, NULL) == 0);
}

return m_init;
}

template<typename T>
void List<T>::destroy()
{
pthread_mutex_destroy(&mm);
}

template<typename T>
int List<T>::get_job_num()
{
MyMutexGuard g(mm);
return my_list.size();
}


View Code
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: