ACE_Message_Queue和spawn实现(生产者/消费者)(V2.00)
2014-05-07 17:32
405 查看
参考这里用到了线程管理,参考:http://blog.csdn.net/calmreason/article/details/36399697
下面的两个线程共享一个消息队列,一个用来放整数到队列,一个从队列里取消息出来。此程序在控制台不停的输出递增数字,主要是内存不会泄露
用到了多线程、ACE_Message_Queue、ACE_Message_Block、ACE_Thread_Manager::instance()->spawn等
下面的程序实现:多个线程将连续整数分批放到ACE_Message_Queue中,一个消费者线程负责从中取出,并验证数据是否完整无误
total_number:1000000
Generator_Number(const int 0)
Generator_Number(const int 1)
Generator_Number(0)::svc()
Generator_Number(1now sleep 1
)::svc()
now sleep 1
now sleep 1
now sleep 1
now sleep 1
now sleep 1
now sleep 1
now sleep 1
now sleep 1
now sleep 1
check result:1
0.944s
~Generator_Number(0)
~Generator_Number(1)
请按任意键继续. . .
http://blog.163.com/ecy_fu/blog/static/4445126200964115620862/
注意事项
http://blog.chinaunix.net/uid-20453737-id-37118.html
下面的两个线程共享一个消息队列,一个用来放整数到队列,一个从队列里取消息出来。此程序在控制台不停的输出递增数字,主要是内存不会泄露
用到了多线程、ACE_Message_Queue、ACE_Message_Block、ACE_Thread_Manager::instance()->spawn等
#include <iostream> using namespace std; #include "boost/lexical_cast.hpp" using namespace boost; #include "ace/Thread_Manager.h" #include "ace/Message_Queue.h" void* create_vairous_record(void* ace_message_queue); void* get_vairous_record(void* ace_message_queue); int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) { ACE_Message_Queue<ACE_MT_SYNCH>* various_record_queue = new ACE_Message_Queue<ACE_MT_SYNCH>; ACE_Thread_Manager::instance()->spawn( ACE_THR_FUNC(create_vairous_record), various_record_queue, THR_NEW_LWP | THR_DETACHED); ACE_Thread_Manager::instance()->spawn( ACE_THR_FUNC(get_vairous_record), various_record_queue, THR_NEW_LWP | THR_DETACHED); ACE_Thread_Manager::instance()->wait(); return 0; } void* create_vairous_record(void* ace_message_queue) { ACE_Message_Queue<ACE_MT_SYNCH>* p_queue = (ACE_Message_Queue<ACE_MT_SYNCH>*)ace_message_queue; int i=0; while (i<10000000) { ACE_Message_Block* mbl = new ACE_Message_Block(10);//在这里创建消息 string temp = lexical_cast<string>(++i); mbl->copy(temp.c_str()); p_queue->enqueue_tail(mbl);//消息被放到队列中(用指针引用消息实体) } return nullptr; } void* get_vairous_record(void* ace_message_queue) { ACE_Message_Queue<ACE_MT_SYNCH>* p_queue = (ACE_Message_Queue<ACE_MT_SYNCH>*)ace_message_queue; while (true) { ACE_Message_Block* mbl =nullptr; p_queue->dequeue_head(mbl);//消息出队,出队的消息应该在用完之后被释放 if (mbl) { cout<<mbl->rd_ptr()<<endl; mbl->release();//消息已经用完,释放消息 } } return nullptr; }
下面的程序实现:多个线程将连续整数分批放到ACE_Message_Queue中,一个消费者线程负责从中取出,并验证数据是否完整无误
#include <iostream> #include <bitset> #include <vector> #include <memory> using namespace std; #include "ace/Thread_Manager.h" #include "ace/Message_Queue.h" #include "ace/Message_Block.h" #include "ace/Task.h" #include "ace/OS.h" namespace global { const int total_number = 1000000; int task_number = 2; typedef int number_type; } class Generator_Number : public ACE_Task<ACE_MT_SYNCH> { public: Generator_Number(ACE_Message_Queue<ACE_MT_SYNCH>* msgq,const int i); virtual int open(void *args = 0 ); ~Generator_Number(void); protected: Generator_Number(const Generator_Number&); Generator_Number& operator=(const Generator_Number&); private: int svc(void); int mod_i_; }; Generator_Number::Generator_Number(ACE_Message_Queue<ACE_MT_SYNCH>* msgq,const int i):mod_i_(i) { this->msg_queue(msgq); std::cout<<"Generator_Number(const int "<<i<<")"<<std::endl; } int Generator_Number::open(void *args ) { return this->activate(THR_NEW_LWP | THR_DETACHED); } int Generator_Number::svc(void) { std::cout<<"Generator_Number("<<this->mod_i_<<")::svc()"<<std::endl; for (size_t i = this->mod_i_ ; i<global::total_number;i+=global::task_number) { ACE_Message_Block * blk = new ACE_Message_Block(20); blk->copy(reinterpret_cast<const char*>(&i),sizeof(global::number_type)); this->msg_queue()->enqueue_tail(blk); } return 0; } Generator_Number::~Generator_Number(void) { std::cout<<"~Generator_Number("<<this->mod_i_<<")"<<std::endl; } void* out_put_queue(void* all_numbers_queue1) { ACE_Message_Queue<ACE_MT_SYNCH>* all_numbers_queue = (ACE_Message_Queue<ACE_MT_SYNCH>*)all_numbers_queue1; bitset<global::total_number> all_number_bitset; size_t count_got_message=0; while(true) { if(!all_numbers_queue->is_empty()) { ACE_Message_Block* blk = 0; all_numbers_queue->dequeue_head(blk); all_number_bitset.set(*reinterpret_cast<global::number_type*>(blk->rd_ptr())); blk->release(); if(++count_got_message == global::total_number) { break; } } else { std::cout<<"now sleep 1"<<std::endl; ACE_Time_Value t(0,3000); ACE_OS::sleep(t); } } global::number_type check =0; bool wright_flag = true; for (size_t j=0; j!= global::total_number;++j) { if (0 == all_number_bitset[j]) { wright_flag = false; break; } } std::cout<<std::endl; std::cout<<"check result:"<<wright_flag<<std::endl; return 0; } #include "boost/timer.hpp" using namespace boost; int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) { cout<<"total_number:"<<global::total_number<<endl; timer t; ACE_Message_Queue<ACE_MT_SYNCH>* all_numbers_queue = new ACE_Message_Queue<ACE_MT_SYNCH>; vector<shared_ptr<Generator_Number>> gener_array; for (int i=0;i<global::task_number;++i) { gener_array.push_back(shared_ptr<Generator_Number>(new Generator_Number(all_numbers_queue,i))); } for (vector<shared_ptr<Generator_Number>>::const_iterator citer = gener_array.cbegin(); citer!=gener_array.cend(); ++citer) { (*citer)->open(); } ACE_Thread_Manager::instance()->spawn( ACE_THR_FUNC(out_put_queue), all_numbers_queue, THR_NEW_LWP | THR_DETACHED); ACE_Thread_Manager::instance()->wait(); cout<<t.elapsed()<<"s"<<endl; return 0; }输出如下:
total_number:1000000
Generator_Number(const int 0)
Generator_Number(const int 1)
Generator_Number(0)::svc()
Generator_Number(1now sleep 1
)::svc()
now sleep 1
now sleep 1
now sleep 1
now sleep 1
now sleep 1
now sleep 1
now sleep 1
now sleep 1
now sleep 1
check result:1
0.944s
~Generator_Number(0)
~Generator_Number(1)
请按任意键继续. . .
ACE_Message_Queue
高水位低水位http://blog.163.com/ecy_fu/blog/static/4445126200964115620862/
注意事项
http://blog.chinaunix.net/uid-20453737-id-37118.html
相关文章推荐
- ACE_Message_Queue和spawn实现(生产者/消费者)(V2.00)
- 基于ACE_Message_Queue的生产者消费者模式
- 基于ACE_Message_Queue的生产者消费者模式
- LinkedBlockingQueue实现的生产者消费者
- Python3之线程Queue实现生产者消费者模型
- python使用queue队列实现生产者消费者
- Chapter 7 生产者消费者之ArrayBlockingQueue实现
- 生产者消费者线程在Queue<T>中实现多线程同步
- 用ACE实现生产者与消费者模式
- 利用ArrayBlockingQueue实现生产者-消费者
- Queue、BlockingQueue以及用BlockingQueue实现生产者/消费者模式
- Queue 实现生产者消费者模型(实例讲解)
- 生产者消费者线程在Queue<T>中实现多线程同步
- Java多线程系列-Queue、BlockingQueue以及利用BlockingQueue实现生产者/消费者模型
- Java多线程15:Queue、BlockingQueue以及利用BlockingQueue实现生产者/消费者模型
- 用ACE实现生产者-消费者模式
- 生产者与消费者的快速实现——并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue用法
- 用BlockBoundQueue和c++11实现多线程生产者消费者问题
- 多线程中同步-异步---生产者与消费者-Queue---ThreadLocal实现局部变量保存
- 在python中实现生产者和消费者的例子(二):使用multiprocessing和queue