您的位置:首页 > 产品设计 > UI/UE

基于ACE_Message_Queue的生产者消费者模式

2013-05-26 23:45 381 查看
1.生产者尽可能生产;

2.消费者按照自身需求执行消费行为;

code:

class CMediaSendBlock : public ACE_Message_Block
{
public:
CMediaSendBlock( const char* pData, const unsigned long ulDataLen)
: ACE_Message_Block((size_t)ulDataLen, MB_DATA, 0, 0)
{
if (NULL != pData)
{
(void)this->copy(pData, (size_t) ulDataLen);
}
}

virtual ~CMediaSendBlock() {}
};

class CMediaSendQueue : public ACE_Message_Queue<ACE_MT_SYNCH>
{
public:
CMediaSendQueue() {}
virtual ~CMediaSendQueue() {}

int enqueue_tail_ex(CMediaSendBlock* pBlock)
{
ACE_Time_Value timeValue = ACE_OS::gettimeofday() + ACE_Time_Value(0, 10);
int nRetVal = ACE_Message_Queue<ACE_MT_SYNCH>::enqueue_tail(pBlock, &timeValue);

if (0 >= nRetVal)
{
return -1;
}

return nRetVal;
}
};

CMediaSendQueue m_sendQueue;
bool m_bRunning = true;

//生产者
//生产者尽可能快的生产
void* produce(void *arg)
{
static int iThreadIndex = -1;
++iThreadIndex;
std::cout << "this is produce thread num " << iThreadIndex << std::endl;

int iSize = 1024 * 1024;
char* pBuff = new char[iSize];
memset(pBuff, 0x0, iSize);

while(m_bRunning)
{
CMediaSendBlock* pBlock = new CMediaSendBlock( pBuff, iSize);
int iRet = m_sendQueue.enqueue_tail(pBlock, NULL); //等待到达低水位

if (0 >= iRet)
{
delete pBlock;
pBlock = NULL;

std::cout << "center is full...." << std::endl;
}
else
{
std::cout << "push success...." << std::endl;
}
}

std::cout << "produce task finished...." << std::endl;
return NULL;
}

//消费者
//消费者按照自身需要的速度进行消费
void* consume(void *arg)
{
static int iThreadIndex = -1;
++iThreadIndex;
std::cout << "this is consume thread num " << iThreadIndex << std::endl;

while(m_bRunning)
{
ACE_Message_Block* pBlock = NULL;
if (-1 == m_sendQueue.dequeue_head(pBlock))
continue;

//发送
CMediaSendBlock* pSendBlock = dynamic_cast<CMediaSendBlock*>(pBlock);
if(NULL == pSendBlock)
continue;

std::cout << "I am consuming.... " << std::endl;

//释放block
delete pSendBlock;
pSendBlock = NULL;

std::cout << "message_bytes" << std::dec << m_sendQueue.message_bytes() << std::endl;
std::cout << "message_length" << std::dec << m_sendQueue.message_length() << std::endl;

ACE_OS::sleep(2);
}

std::cout << "consume over......" << std::endl;
return NULL;
}

int main(int argc, char* argv[])
{
ACE::init();

m_sendQueue.high_water_mark(10 * 1024 * 1024);
m_sendQueue.low_water_mark(2 * 1024 * 1024);
m_sendQueue.activate();

//m个生产者,n个消费者
//产生生产者线程
ACE_Thread_Manager::instance()->spawn_n
(
5,
(ACE_THR_FUNC) produce
);

ACE_OS::sleep(2); //让生产者填满仓库

////产生消费者线程
ACE_Thread_Manager::instance()->spawn_n
(
2,
(ACE_THR_FUNC) consume
);

int iData;
std::cin >> iData;

//close
m_bRunning = false;
m_sendQueue.deactivate();

//wait
ACE_OS::sleep(2);

ACE::fini();
return 0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: