您的位置:首页 > 产品设计 > UI/UE

ACE_Message_Queue和spawn实现(生产者/消费者)(V2.00)

2014-05-07 17:32 405 查看
参考这里用到了线程管理,参考:http://blog.csdn.net/calmreason/article/details/36399697

下面的两个线程共享一个消息队列,一个用来放整数到队列,一个从队列里取消息出来。此程序在控制台不停的输出递增数字,主要是内存不会泄露

用到了多线程、ACE_Message_Queue、ACE_Message_Block、ACE_Thread_Manager::instance()->spawn等

#include <iostream>
using namespace std;
#include "boost/lexical_cast.hpp"
using namespace boost;
#include "ace/Thread_Manager.h"
#include "ace/Message_Queue.h"

void* create_vairous_record(void* ace_message_queue);

void* get_vairous_record(void* ace_message_queue);

int ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{

ACE_Message_Queue<ACE_MT_SYNCH>* various_record_queue = new ACE_Message_Queue<ACE_MT_SYNCH>;

ACE_Thread_Manager::instance()->spawn(
ACE_THR_FUNC(create_vairous_record),
various_record_queue,
THR_NEW_LWP | THR_DETACHED);

ACE_Thread_Manager::instance()->spawn(
ACE_THR_FUNC(get_vairous_record),
various_record_queue,
THR_NEW_LWP | THR_DETACHED);

ACE_Thread_Manager::instance()->wait();

return 0;
}

void* create_vairous_record(void* ace_message_queue)
{

ACE_Message_Queue<ACE_MT_SYNCH>* p_queue = (ACE_Message_Queue<ACE_MT_SYNCH>*)ace_message_queue;
int i=0;
while (i<10000000)
{
ACE_Message_Block* mbl = new ACE_Message_Block(10);//在这里创建消息
string temp = lexical_cast<string>(++i);
mbl->copy(temp.c_str());
p_queue->enqueue_tail(mbl);//消息被放到队列中(用指针引用消息实体)
}
return nullptr;
}

void* get_vairous_record(void* ace_message_queue)
{

ACE_Message_Queue<ACE_MT_SYNCH>* p_queue = (ACE_Message_Queue<ACE_MT_SYNCH>*)ace_message_queue;
while (true)
{
ACE_Message_Block* mbl =nullptr;
p_queue->dequeue_head(mbl);//消息出队,出队的消息应该在用完之后被释放
if (mbl)
{
cout<<mbl->rd_ptr()<<endl;
mbl->release();//消息已经用完,释放消息
}
}
return nullptr;

}

下面的程序实现:多个线程将连续整数分批放到ACE_Message_Queue中,一个消费者线程负责从中取出,并验证数据是否完整无误

#include <iostream>
#include <bitset>
#include <vector>
#include <memory>
using namespace std;

#include "ace/Thread_Manager.h"
#include "ace/Message_Queue.h"
#include "ace/Message_Block.h"
#include "ace/Task.h"
#include "ace/OS.h"

namespace global
{
const int total_number = 1000000;
int task_number = 2;
typedef int number_type;
}

class Generator_Number : public ACE_Task<ACE_MT_SYNCH>
{
public:
Generator_Number(ACE_Message_Queue<ACE_MT_SYNCH>* msgq,const int i);
virtual int open(void *args  = 0 );
~Generator_Number(void);
protected:
Generator_Number(const Generator_Number&);
Generator_Number& operator=(const Generator_Number&);
private:
int svc(void);
int mod_i_;
};

Generator_Number::Generator_Number(ACE_Message_Queue<ACE_MT_SYNCH>* msgq,const int i):mod_i_(i)
{
this->msg_queue(msgq);
std::cout<<"Generator_Number(const int "<<i<<")"<<std::endl;
}

int Generator_Number::open(void *args )
{
return this->activate(THR_NEW_LWP | THR_DETACHED);
}

int Generator_Number::svc(void)
{
std::cout<<"Generator_Number("<<this->mod_i_<<")::svc()"<<std::endl;
for (size_t i = this->mod_i_ ; i<global::total_number;i+=global::task_number)
{
ACE_Message_Block * blk = new ACE_Message_Block(20);
blk->copy(reinterpret_cast<const char*>(&i),sizeof(global::number_type));
this->msg_queue()->enqueue_tail(blk);
}
return 0;
}

Generator_Number::~Generator_Number(void)
{
std::cout<<"~Generator_Number("<<this->mod_i_<<")"<<std::endl;
}

void* out_put_queue(void* all_numbers_queue1)
{
ACE_Message_Queue<ACE_MT_SYNCH>* all_numbers_queue = (ACE_Message_Queue<ACE_MT_SYNCH>*)all_numbers_queue1;
bitset<global::total_number> all_number_bitset;
size_t count_got_message=0;
while(true)
{
if(!all_numbers_queue->is_empty())
{
ACE_Message_Block* blk = 0;
all_numbers_queue->dequeue_head(blk);
all_number_bitset.set(*reinterpret_cast<global::number_type*>(blk->rd_ptr()));
blk->release();
if(++count_got_message == global::total_number)
{
break;
}
}
else
{
std::cout<<"now sleep 1"<<std::endl;
ACE_Time_Value t(0,3000);
ACE_OS::sleep(t);
}
}
global::number_type check =0;
bool wright_flag = true;
for (size_t j=0; j!= global::total_number;++j)
{
if (0 == all_number_bitset[j])
{
wright_flag = false;
break;
}
}
std::cout<<std::endl;
std::cout<<"check result:"<<wright_flag<<std::endl;
return 0;
}
#include "boost/timer.hpp"
using namespace boost;

int ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
cout<<"total_number:"<<global::total_number<<endl;
timer t;
ACE_Message_Queue<ACE_MT_SYNCH>* all_numbers_queue = new ACE_Message_Queue<ACE_MT_SYNCH>;

vector<shared_ptr<Generator_Number>> gener_array;

for (int i=0;i<global::task_number;++i)
{
gener_array.push_back(shared_ptr<Generator_Number>(new Generator_Number(all_numbers_queue,i)));
}
for (vector<shared_ptr<Generator_Number>>::const_iterator citer = gener_array.cbegin();
citer!=gener_array.cend();
++citer)
{
(*citer)->open();
}

ACE_Thread_Manager::instance()->spawn(
ACE_THR_FUNC(out_put_queue),
all_numbers_queue,
THR_NEW_LWP | THR_DETACHED);

ACE_Thread_Manager::instance()->wait();
cout<<t.elapsed()<<"s"<<endl;
return 0;
}
输出如下:

total_number:1000000

Generator_Number(const int 0)

Generator_Number(const int 1)

Generator_Number(0)::svc()

Generator_Number(1now sleep 1

)::svc()

now sleep 1

now sleep 1

now sleep 1

now sleep 1

now sleep 1

now sleep 1

now sleep 1

now sleep 1

now sleep 1

check result:1

0.944s

~Generator_Number(0)

~Generator_Number(1)

请按任意键继续. . .

ACE_Message_Queue

高水位低水位

http://blog.163.com/ecy_fu/blog/static/4445126200964115620862/

注意事项

http://blog.chinaunix.net/uid-20453737-id-37118.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: