您的位置:首页 > 编程语言 > C语言/C++

c++并发抽象

2016-07-14 21:55 363 查看
在线程的基础上,以任务的形式将并发抽象。设计task类,用于完成一个任务或者说动作。process类用于存储task队列,可以向process投递一个task。再设计一个consumer类,它包含一个线程以及多个process,它的proce函数循环从所有process取出一个task并执行。如果执行返回false则将该process移除。consumer有一个负载均衡的函数balancing可以在繁忙时将process转移给其它闲consumer。从用户层面来看,每个process都是一个逻辑线程,可以以较小的代价创建大量的process。在同一个process里面的操作都是同步的,不同的process可能是异步的,所以如果多个process处理共享数据则需要加锁。

#ifndef PROCESS_20160503_H
#define PROCESS_20160503_H

#include <functional>
#include <vector>
#include <thread>
#include <string>
#include <memory>
#include <atomic>
#include "safe_queue.h"
#include "log.h"

struct task
{
task(){}
task(const std::function<bool()>& t, const std::string task_name)
: job(t)
, name(task_name)
{

}
task(const task& t)
{
job = t.job;
name = t.name;
}

std::function<bool()> job;
std::string name;
};

typedef safe_queue<task> taskqueue;
class process;
typedef std::shared_ptr<process>    ptr_process;

class consumer
{
public:
const static int max_busy = 10000;
const static int min_busy = -10000;
public:
consumer();
~consumer();
public:
void profile();
std::string name() { return _name; }
void push(ptr_process ptr);
int busy() { return _busy; }

private:
void proc();
void proc_i();
void balancing();
bool init_busy();
private:
std::string _name;
std::vector<ptr_process> _processes;
ptr_process _chan;
std::atomic_int _busy;
bool _idle;
task _tmptask;
std::thread _thread;

};

typedef std::shared_ptr<consumer>    ptr_consumer;

class process
{
public:
friend class consumer;
friend class process_manager;
public:
process(const std::string& process_name);

private:
process(const process&);
process& operator = (const process&);

public:
void profile();

public:
const std::string& process_name() { return _process_name; }
void run(const std::function<bool()>& t, const std::string& name);
void stop();

private:
taskqueue _tasks;
std::string _process_name;
};

class process_manager
{
public:
friend class consumer;
private:
process_manager();
process_manager(const process_manager&);
process_manager& operator = (const process_manager&);

public:
static process_manager& instance();
ptr_process create_process(const std::string name);

public:
void profile();

private:
ptr_consumer get_consumer();
std::vector<ptr_consumer>& get_consumers();

private:
std::vector<ptr_consumer> _consumers;
unsigned int _no;
};

#endif


#include "process.h"
#include <chrono>
#include "util.h"
#include <sstream>

#if defined(WIN32)
#include <windows.h>
#else
#include<unistd.h>
#endif

int cpu_num()
{
#if defined(WIN32)
SYSTEM_INFO info;
::GetSystemInfo(&info);
return info.dwNumberOfProcessors;
#else
return (int)sysconf(_SC_NPROCESSORS_ONLN);
#endif
}

bool consumer::init_busy()
{
_busy = 0;
return true;
}

consumer::consumer()
: _idle(init_busy())
, _chan(new process("chan"))
, _thread([this]{
this->proc();
})
{
std::ostringstream ss;
ss<<std::this_thread::get_id();
_name = ss.str();
logdebug("consumer:%s constructor", _name.c_str());
}

consumer::~consumer()
{
_thread.join();
}

void consumer::push(ptr_process ptr)
{
_chan->run([ptr, this]{
_processes.push_back(ptr);
return true;
}, std::string("push process:") + ptr->process_name()
);
}

void consumer::proc()
{
while (!global_exit::is_exit())
{
proc_i();
if(_idle)
{
_busy = (_busy<min_busy)?min_busy:(_busy-1);
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
else
{
_busy = (_busy>max_busy)?max_busy:(_busy+1);
}
if(_busy == max_busy)
{
balancing();
}
}
}

// 每一个process都执行一次
void consumer::proc_i()
{
_idle = true;
if(_chan->_tasks.try_pop(_tmptask))
{
_tmptask.job();
}
for(unsigned int i = 0; i<_processes.size(); ++i)
{
if(_processes[i]->_tasks.try_pop(_tmptask) == taskqueue::SUCCESS)
{
if(!_tmptask.job())
{
logdebug("process task:'%s' return false", _tmptask.name.c_str());
if(i == _processes.size()-1)
{
_processes.pop_back();
}
else
{
_processes[i] = _processes[_processes.size()-1];
_processes.pop_back();
}
break;
}
_idle = false;
}
}
}

// 负载均衡 取出最忙process投递给最闲线程
void consumer::balancing()
{
unsigned int max = 0;
unsigned int tmp = 0;
unsigned int index = -1;
for(unsigned int i = 0; i<_processes.size(); ++i)
{
tmp = _processes[i]->_tasks.size();
if(max<tmp)
{
max = tmp;
index = i;
}
}
ptr_process ptr = _processes[index];
if(index != _processes.size()-1)
{
_processes[index] = _processes[_processes.size()-1];
}
std::vector<ptr_consumer>& css = process_manager::instance().get_consumers();
index = -1;
int minbusy = consumer::max_busy - 1;
int tmpbusy = 0;
for(unsigned int i = 0; i<css.size(); ++i)
{
tmpbusy = css[i]->busy();
if(minbusy>tmpbusy)
{
index = i;
minbusy = tmpbusy;
}
}
if(css[index].get()!=this)
{
_processes.pop_back();
css[index]->push(ptr);
}
}

process::process(const std::string& process_name)
: _process_name(process_name)
, _tasks(1024*1024, 0)
{

}

void process::run(const std::function<bool()>& t, const std::string& name)
{
if(_tasks.try_push(task(t, name)) == taskqueue::QUEUE_FULL)
{
logerror("taskqueue full.process_name:%s task_name:%s", _process_name.c_str(), name.c_str());
}
}

void process::profile()
{
loginfo("process name:%s, task size:%d", _process_name.c_str(), _tasks.size());
}

process_manager::process_manager()
{
int num = cpu_num() + 4;
for(int i = 0; i<num; ++i)
{
_consumers.push_back(ptr_consumer(new consumer));
}
_no = 0;
}

process_manager& process_manager::instance()
{
static process_manager cm;
return cm;
}

ptr_consumer process_manager::get_consumer()
{
return _consumers[(_no++)%_consumers.size()];
}

std::vector<ptr_consumer>& process_manager::get_consumers()
{
return _consumers;
}

void process_manager::profile()
{
for(unsigned int i = 0; i<_consumers.size(); ++i)
{
_consumers[i]->profile();
}
}

ptr_process process_manager::create_process(const std::string name)
{
ptr_process ptr(new process(name));
process_manager::instance().get_consumer()->push(ptr);
return ptr;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: