您的位置:首页 > 其它

ACE_Task的生产者和消费者

2014-12-17 15:57 176 查看
#include "ace/Task.h"
#include "ace/Message_Block.h"
#include "ace/OS.h"

// Largest payload value the Producer sends (it counts from 0 up to N).
// The Consumer treats a payload equal to N as the final message and any
// payload greater than N as the signal to stop.
const int N = 10;

// The Consumer Task.
//
// Drains its own message queue with three service threads. Each message's
// first payload byte is interpreted as a counter:
//   * value <  N : log it and wait for the next message;
//   * value == N : log it, bump the payload to N + 1 and push the message back
//                  so the remaining threads can see the end marker, then exit;
//   * value >  N : push the message back unchanged and exit.
// The message blocks are never released here; the last one is left in the
// queue on purpose.
class Consumer : public ACE_Task<ACE_MT_SYNCH> {
public:
    // Logs that the task was opened and spawns the three service threads.
    // Returns 0 on success, -1 if the threads could not be started.
    int open(void*) {
        ACE_DEBUG((LM_DEBUG, "(%t)Consumer task opened \n"));
        // Activate the Task
        if (activate(THR_NEW_LWP, 3) == -1) {
            ACE_ERROR_RETURN(
                (LM_ERROR, "(%t)Consumer failed to activate its threads\n"), -1);
        }
        return 0;
    }

    // The Service Processing routine, run by every consumer thread.
    // Returns 0 after seeing the end marker, -1 if the queue reports an error.
    int svc(void) {
        for (;;) {
            ACE_Message_Block* mb = 0;
            // Get message from underlying queue. On failure mb is not set, so
            // it must not be dereferenced.
            if (getq(mb) == -1 || mb == 0) {
                ACE_ERROR_RETURN(
                    (LM_ERROR, "(%t)Consumer failed to dequeue a message\n"), -1);
            }

            // Read the payload exactly once: the byte may be shared with the
            // sender, so repeated reads could observe different values.
            char* const payload = mb->rd_ptr();
            const int value = *payload;

            if (value < N) {
                ACE_DEBUG(
                    (LM_DEBUG, "(%t)Got message: %d from remote task\n", value));
                continue;
            }

            if (value == N) {
                ACE_DEBUG(
                    (LM_DEBUG, "(%t)Got message: %d from remote task\n", value));
                ++*payload; // payload is now N + 1, i.e. the end marker
            }

            // value >= N: put the message back for the following threads to
            // see, then stop this thread.
            if (ungetq(mb) == -1) {
                ACE_ERROR_RETURN(
                    (LM_ERROR, "(%t)Consumer failed to requeue the end marker\n"), -1);
            }
            break;
        }
        return 0;
    }

    // Logs that a consumer thread is shutting down. Always returns 0.
    int close(u_long) {
        ACE_DEBUG((LM_DEBUG, "(%t)Consumer closes down \n"));
        return 0;
    }
};

class Producer : public ACE_Task_Base {
public:
Producer(Consumer * consumer) :
data_(0), consumer_(consumer) {
mb_ = new ACE_Message_Block((char*)&data_, sizeof(data_));
}
int open(void*) {
ACE_DEBUG((LM_DEBUG, "(%t)Producer task opened \n"));
//Activate the Task
activate(THR_NEW_LWP, 1);
return 0;
}
//The Service Processing routine
int svc(void) {
while (data_ <= N) {
//Send message to consumer
ACE_DEBUG(
(LM_DEBUG, "(%t)Sending message: %d to remote task\n", data_));
consumer_->putq(mb_);
//Go to sleep for a sec.
ACE_OS::sleep(1);
++data_;
}
return 0;
}
int close(u_long) {
ACE_DEBUG((LM_DEBUG, "(%t)Producer closes down \n"));
return 0;
}
private:
char data_;
Consumer * consumer_;
ACE_Message_Block * mb_;
};

// Program entry point: starts one Producer and one Consumer task and blocks
// until every thread they spawned has exited.
// Returns 0 on success, 1 if either task could not be opened.
int main(int argc, char * argv[]){
    // Automatic storage so both tasks are destroyed on every return path.
    // The consumer is declared first because the producer keeps a pointer to it.
    Consumer consumer;
    Producer producer(&consumer);

    // Nothing is running yet, so a failure here can return immediately.
    if (producer.open(0) == -1) {
        return 1;
    }

    int status = 0;
    if (consumer.open(0) == -1) {
        // The producer thread is already running and uses both objects, so
        // fall through to the wait below instead of returning now.
        status = 1;
    }

    //Wait for all the tasks to exit.
    ACE_Thread_Manager::instance()->wait();
    return status;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  ACE_Task