c++并发抽象
2016-07-14 21:55
363 查看
在线程的基础上,以任务的形式将并发抽象。设计task类,用于完成一个任务或者说动作。process类用于存储task队列,可以向process投递一个task。再设计一个consumer类,它包含一个线程以及多个process,它的proce函数循环从所有process取出一个task并执行。如果执行返回false则将该process移除。consumer有一个负载均衡的函数balancing可以在繁忙时将process转移给其它闲consumer。从用户层面来看,每个process都是一个逻辑线程,可以以较小的代价创建大量的process。在同一个process里面的操作都是同步的,不同的process可能是异步的,所以如果多个process处理共享数据则需要加锁。
#ifndef PROCESS_20160503_H #define PROCESS_20160503_H #include <functional> #include <vector> #include <thread> #include <string> #include <memory> #include <atomic> #include "safe_queue.h" #include "log.h" struct task { task(){} task(const std::function<bool()>& t, const std::string task_name) : job(t) , name(task_name) { } task(const task& t) { job = t.job; name = t.name; } std::function<bool()> job; std::string name; }; typedef safe_queue<task> taskqueue; class process; typedef std::shared_ptr<process> ptr_process; class consumer { public: const static int max_busy = 10000; const static int min_busy = -10000; public: consumer(); ~consumer(); public: void profile(); std::string name() { return _name; } void push(ptr_process ptr); int busy() { return _busy; } private: void proc(); void proc_i(); void balancing(); bool init_busy(); private: std::string _name; std::vector<ptr_process> _processes; ptr_process _chan; std::atomic_int _busy; bool _idle; task _tmptask; std::thread _thread; }; typedef std::shared_ptr<consumer> ptr_consumer; class process { public: friend class consumer; friend class process_manager; public: process(const std::string& process_name); private: process(const process&); process& operator = (const process&); public: void profile(); public: const std::string& process_name() { return _process_name; } void run(const std::function<bool()>& t, const std::string& name); void stop(); private: taskqueue _tasks; std::string _process_name; }; class process_manager { public: friend class consumer; private: process_manager(); process_manager(const process_manager&); process_manager& operator = (const process_manager&); public: static process_manager& instance(); ptr_process create_process(const std::string name); public: void profile(); private: ptr_consumer get_consumer(); std::vector<ptr_consumer>& get_consumers(); private: std::vector<ptr_consumer> _consumers; unsigned int _no; }; #endif
#include "process.h" #include <chrono> #include "util.h" #include <sstream> #if defined(WIN32) #include <windows.h> #else #include<unistd.h> #endif int cpu_num() { #if defined(WIN32) SYSTEM_INFO info; ::GetSystemInfo(&info); return info.dwNumberOfProcessors; #else return (int)sysconf(_SC_NPROCESSORS_ONLN); #endif } bool consumer::init_busy() { _busy = 0; return true; } consumer::consumer() : _idle(init_busy()) , _chan(new process("chan")) , _thread([this]{ this->proc(); }) { std::ostringstream ss; ss<<std::this_thread::get_id(); _name = ss.str(); logdebug("consumer:%s constructor", _name.c_str()); } consumer::~consumer() { _thread.join(); } void consumer::push(ptr_process ptr) { _chan->run([ptr, this]{ _processes.push_back(ptr); return true; }, std::string("push process:") + ptr->process_name() ); } void consumer::proc() { while (!global_exit::is_exit()) { proc_i(); if(_idle) { _busy = (_busy<min_busy)?min_busy:(_busy-1); std::this_thread::sleep_for(std::chrono::microseconds(10)); } else { _busy = (_busy>max_busy)?max_busy:(_busy+1); } if(_busy == max_busy) { balancing(); } } } // 每一个process都执行一次 void consumer::proc_i() { _idle = true; if(_chan->_tasks.try_pop(_tmptask)) { _tmptask.job(); } for(unsigned int i = 0; i<_processes.size(); ++i) { if(_processes[i]->_tasks.try_pop(_tmptask) == taskqueue::SUCCESS) { if(!_tmptask.job()) { logdebug("process task:'%s' return false", _tmptask.name.c_str()); if(i == _processes.size()-1) { _processes.pop_back(); } else { _processes[i] = _processes[_processes.size()-1]; _processes.pop_back(); } break; } _idle = false; } } } // 负载均衡 取出最忙process投递给最闲线程 void consumer::balancing() { unsigned int max = 0; unsigned int tmp = 0; unsigned int index = -1; for(unsigned int i = 0; i<_processes.size(); ++i) { tmp = _processes[i]->_tasks.size(); if(max<tmp) { max = tmp; index = i; } } ptr_process ptr = _processes[index]; if(index != _processes.size()-1) { _processes[index] = _processes[_processes.size()-1]; } std::vector<ptr_consumer>& css = process_manager::instance().get_consumers(); index = -1; int minbusy = consumer::max_busy - 1; int tmpbusy = 0; for(unsigned int i = 0; i<css.size(); ++i) { tmpbusy = css[i]->busy(); if(minbusy>tmpbusy) { index = i; minbusy = tmpbusy; } } if(css[index].get()!=this) { _processes.pop_back(); css[index]->push(ptr); } } process::process(const std::string& process_name) : _process_name(process_name) , _tasks(1024*1024, 0) { } void process::run(const std::function<bool()>& t, const std::string& name) { if(_tasks.try_push(task(t, name)) == taskqueue::QUEUE_FULL) { logerror("taskqueue full.process_name:%s task_name:%s", _process_name.c_str(), name.c_str()); } } void process::profile() { loginfo("process name:%s, task size:%d", _process_name.c_str(), _tasks.size()); } process_manager::process_manager() { int num = cpu_num() + 4; for(int i = 0; i<num; ++i) { _consumers.push_back(ptr_consumer(new consumer)); } _no = 0; } process_manager& process_manager::instance() { static process_manager cm; return cm; } ptr_consumer process_manager::get_consumer() { return _consumers[(_no++)%_consumers.size()]; } std::vector<ptr_consumer>& process_manager::get_consumers() { return _consumers; } void process_manager::profile() { for(unsigned int i = 0; i<_consumers.size(); ++i) { _consumers[i]->profile(); } } ptr_process process_manager::create_process(const std::string name) { ptr_process ptr(new process(name)); process_manager::instance().get_consumer()->push(ptr); return ptr; }
相关文章推荐
- [转载]C++中处理XML文件
- 【C++】Winsock套接字编程,简单的socket通信代码(客户端、服务端)
- C++线程安全的单例模式
- Remove Element
- 数组指针与指针数组
- C++实现最长公共子序列和最长公共子串
- java引用传递值传递的"深入"解析与c++中的值传递
- 链表的C++代码
- Remove Duplicates from Sorted Array
- C++11 std::bind笔记
- July 14th 模拟赛C T2 数码问题 Solution
- C++11 容器Array
- C++Primer课后练习exe6.33
- MFC中程序的延时
- Emacs下编译C++/C程序<转>
- Binary Tree Level Order Traversal II
- C++ 模板类的实现为何放在.h中
- 洛谷 P1969 [NOIP2013 D2T1] 积木大赛
- c语言中long long的用法
- c/c++中的extern