基于ACE_Message_Queue的生产者消费者模式
2014-04-16 22:53
357 查看
1.生产者尽可能生产; 2.消费者按照自身需求执行消费行为; code: class CMediaSendBlock : public ACE_Message_Block { public: CMediaSendBlock( const char* pData, const unsigned long ulDataLen) : ACE_Message_Block((size_t)ulDataLen, MB_DATA, 0, 0) { if (NULL != pData) { (void)this->copy(pData, (size_t) ulDataLen); } } virtual ~CMediaSendBlock() {} }; class CMediaSendQueue : public ACE_Message_Queue<ACE_MT_SYNCH> { public: CMediaSendQueue() {} virtual ~CMediaSendQueue() {} int enqueue_tail_ex(CMediaSendBlock* pBlock) { ACE_Time_Value timeValue = ACE_OS::gettimeofday() + ACE_Time_Value(0, 10); int nRetVal = ACE_Message_Queue<ACE_MT_SYNCH>::enqueue_tail(pBlock, &timeValue); if (0 >= nRetVal) { return -1; } return nRetVal; } }; CMediaSendQueue m_sendQueue; bool m_bRunning = true; //生产者 //生产者尽可能快的生产 void* produce(void *arg) { static int iThreadIndex = -1; ++iThreadIndex; std::cout << "this is produce thread num " << iThreadIndex << std::endl; int iSize = 1024 * 1024; char* pBuff = new char[iSize]; memset(pBuff, 0x0, iSize); while(m_bRunning) { CMediaSendBlock* pBlock = new CMediaSendBlock( pBuff, iSize); int iRet = m_sendQueue.enqueue_tail(pBlock, NULL); //等待到达低水位 if (0 >= iRet) { delete pBlock; pBlock = NULL; std::cout << "center is full...." << std::endl; } else { std::cout << "push success...." << std::endl; } } std::cout << "produce task finished...." << std::endl; return NULL; } //消费者 //消费者按照自身需要的速度进行消费 void* consume(void *arg) { static int iThreadIndex = -1; ++iThreadIndex; std::cout << "this is consume thread num " << iThreadIndex << std::endl; while(m_bRunning) { ACE_Message_Block* pBlock = NULL; if (-1 == m_sendQueue.dequeue_head(pBlock)) continue; //发送 CMediaSendBlock* pSendBlock = dynamic_cast<CMediaSendBlock*>(pBlock); if(NULL == pSendBlock) continue; std::cout << "I am consuming.... " << std::endl; //释放block delete pSendBlock; pSendBlock = NULL; std::cout << "message_bytes" << std::dec << m_sendQueue.message_bytes() << std::endl; std::cout << "message_length" << std::dec << m_sendQueue.message_length() << std::endl; ACE_OS::sleep(2); } std::cout << "consume over......" << std::endl; return NULL; } int main(int argc, char* argv[]) { ACE::init(); m_sendQueue.high_water_mark(10 * 1024 * 1024); m_sendQueue.low_water_mark(2 * 1024 * 1024); m_sendQueue.activate(); //m个生产者,n个消费者 //产生生产者线程 ACE_Thread_Manager::instance()->spawn_n ( 5, (ACE_THR_FUNC) produce ); ACE_OS::sleep(2); //让生产者填满仓库 ////产生消费者线程 ACE_Thread_Manager::instance()->spawn_n ( 2, (ACE_THR_FUNC) consume ); int iData; std::cin >> iData; //close m_bRunning = false; m_sendQueue.deactivate(); //wait ACE_OS::sleep(2); ACE::fini(); return 0; }
相关文章推荐
- 基于ACE_Message_Queue的生产者消费者模式
- ACE_Message_Queue和spawn实现(生产者/消费者)(V2.00)
- ACE_Message_Queue和spawn实现(生产者/消费者)(V2.00)
- Queue、BlockingQueue以及用BlockingQueue实现生产者/消费者模式
- 用ACE实现生产者-消费者模式
- 基于wait和nofity的生产者和消费者模式版本
- 基于并发包同步机制实现生产者-消费者模式
- LinkedBlockingQueue实现生产者-消费者模式
- 用ACE实现生产者与消费者模式
- 基于Java自带同步机制实现生产者-消费者模式
- 基于ArrayBlockingQueue的生产者和消费者
- 基于Java阻塞队列实现生产者与消费者模式
- 用ACE实现的生产者和消费者模式
- 关于生产者/消费者/订阅者模式的那些事
- 【Java多线程】生产者消费者模式
- 描述消费者和生产者模式的代码(有些公司面试的时候会让你直接写一段代码)
- 利用ArrayBlockingQueue实现生产者-消费者
- 生产者消费者线程在Queue<T>中实现多线程同步
- 设计模式-生产者消费者模式
- java线程通信 生产者与消费者模式