muduo网络库源码学习————无界队列和有界队列
2015-08-29 17:22
627 查看
muduo库里实现了两个队列模板类:无界队列为BlockingQueue.h,有界队列为BoundedBlockingQueue.h,两个测试程序实现了生产者和消费者模型。(这里以无界队列为例,有界队列和无界的差不多)代码如下:
BlockingQueue.h
测试代码有两个:
BlockingQueue_test.cc
单独编译后运行结果如下:
![](http://img.blog.csdn.net/20150829171957864)
BlockingQueue_bench.cc
单独编译后运行结构如下:(输出过长,时间也太长,截图时中断了程序)
BlockingQueue.h
[code]#include <muduo/base/Condition.h> #include <muduo/base/Mutex.h> #include <boost/noncopyable.hpp> #include <deque> #include <assert.h> namespace muduo { template<typename T>//队列模板 class BlockingQueue : boost::noncopyable { public: BlockingQueue(): mutex_(), notEmpty_(mutex_),queue_() {//构造函数对3个成员进行初始化 } void put(const T& x)//生产产品 { MutexLockGuard lock(mutex_);//先加上锁对队列进行保护,构造函数中调用lock,析构函数会自动调用unlock queue_.push_back(x);//产品放进队列 //队列不为空,通知消费者可以进行消费 notEmpty_.notify(); // TODO: move outside of lock } T take()//消费产品 { MutexLockGuard lock(mutex_);//加锁保护队列 // always use a while-loop, due to spurious wakeup while (queue_.empty())//如果队列为空,则一直等待 { notEmpty_.wait(); } assert(!queue_.empty());//断言队列非空 T front(queue_.front());//取出队首元素 queue_.pop_front();//将队首元素弹出 return front;//返回队首元素 } size_t size() const//队列大小 { MutexLockGuard lock(mutex_);//加锁保护 return queue_.size();//返回队列大小 } private: mutable MutexLock mutex_;//互斥锁 Condition notEmpty_;//条件变量 std::deque<T> queue_;//队列使用stl中的deque }; } #endif // MUDUO_BASE_BLOCKINGQUEUE_H
测试代码有两个:
BlockingQueue_test.cc
[code]#include <muduo/base/BlockingQueue.h> #include <muduo/base/CountDownLatch.h> #include <muduo/base/Thread.h> #include <boost/bind.hpp> #include <boost/ptr_container/ptr_vector.hpp> #include <string> #include <stdio.h> class Test { public://numThreads初始化为5,条件变量count初始化为5,线程个数也为5 Test(int numThreads) : latch_(numThreads), threads_(numThreads) { for (int i = 0; i < numThreads; ++i) {//线程名称 char name[32]; snprintf(name, sizeof name, "work thread %d", i); //创建5个线程,threadFunc为线程回调函数 threads_.push_back(new muduo::Thread(boost::bind(&Test::threadFunc, this), muduo::string(name))); } //启动线程 for_each(threads_.begin(), threads_.end(), boost::bind(&muduo::Thread::start, _1)); } void run(int times) { printf("waiting for count down latch\n"); latch_.wait();//等待count被减为0 printf("all threads started\n"); for (int i = 0; i < times; ++i)//100次 { char buf[32]; snprintf(buf, sizeof buf, "hello %d", i); queue_.put(buf);//往队列中添加100个产品 //打印信息 printf("tid=%d, put data = %s, size = %zd\n", muduo::CurrentThread::tid(), buf, queue_.size()); } } void joinAll() { for (size_t i = 0; i < threads_.size(); ++i) {//往5个线程添加stop queue_.put("stop"); } //执行join for_each(threads_.begin(), threads_.end(), boost::bind(&muduo::Thread::join, _1)); } private: //线程回调函数 void threadFunc() {//输出线程id和名称 printf("tid=%d, %s started\n", muduo::CurrentThread::tid(),muduo::CurrentThread::name()); //计数值减一 latch_.countDown();//count减为0时将通知所有等待线程 bool running = true; while (running) { std::string d(queue_.take());//消费产品 //打印取出的值 printf("tid=%d, get data = %s, size = %zd\n", muduo::CurrentThread::tid(), d.c_str(), queue_.size()); //直到产品的名称==stop,跳出循环 running = (d != "stop"); } //打印停止信息 printf("tid=%d, %s stopped\n",muduo::CurrentThread::tid(),muduo::CurrentThread::name()); } muduo::BlockingQueue<std::string> queue_;//队列 muduo::CountDownLatch latch_;//条件变量 boost::ptr_vector<muduo::Thread> threads_;//线程数组 }; int main() {//打印进程,线程id printf("pid=%d, tid=%d\n", ::getpid(), muduo::CurrentThread::tid()); Test t(5);//定义test类 t.run(100); t.joinAll(); printf("number of created threads %d\n", muduo::Thread::numCreated()); }
单独编译后运行结果如下:
BlockingQueue_bench.cc
[code]#include <muduo/base/BlockingQueue.h> #include <muduo/base/CountDownLatch.h> #include <muduo/base/Thread.h> #include <muduo/base/Timestamp.h> #include <boost/bind.hpp> #include <boost/ptr_container/ptr_vector.hpp> #include <map> #include <string> #include <stdio.h> class Bench//Bench是用来度量时间的一个类 { public://count初始化为numThreads,创建numThreads个线程 Bench(int numThreads) : latch_(numThreads),threads_(numThreads) { for (int i = 0; i < numThreads; ++i) { char name[32]; snprintf(name, sizeof name, "work thread %d", i); //创建线程,设置回调 threads_.push_back(new muduo::Thread( boost::bind(&Bench::threadFunc, this), muduo::string(name))); } //线程start for_each(threads_.begin(), threads_.end(), boost::bind(&muduo::Thread::start, _1)); } void run(int times)//生产产品 {//10000个 printf("waiting for count down latch\n"); latch_.wait();//等待count降为0 printf("all threads started\n"); for (int i = 0; i < times; ++i) { muduo::Timestamp now(muduo::Timestamp::now()); queue_.put(now);//当前时间戳进队 usleep(1000);//1000微秒一次 } } void joinAll() { for (size_t i = 0; i < threads_.size(); ++i) { queue_.put(muduo::Timestamp::invalid());//产生非法时间,即产生跳出循环的条件 } for_each(threads_.begin(), threads_.end(), boost::bind(&muduo::Thread::join, _1)); } private: void threadFunc()//用于消费产品 { printf("tid=%d, %s started\n",muduo::CurrentThread::tid(),muduo::CurrentThread::name()); //Map是STL[1] 的一个关联容器,它提供一对一 //(其中第一个可以称为关键字,每个关键字只能在map中出现一次,第二个可能称为该关键字的值)的数据处理能力 //第一个是delay值,第二个是相同delay的次数 std::map<int, int> delays;//map容器 latch_.countDown(); bool running = true; while (running) { muduo::Timestamp t(queue_.take());//取出队列中的时间戳 muduo::Timestamp now(muduo::Timestamp::now());//建立当前时间戳对象 if (t.valid())//如果是一个合法的时间 {//计算差值 int delay = static_cast<int>(timeDifference(now, t) * 1000000);//以微秒为单位 // printf("tid=%d, latency = %d us\n", muduo::CurrentThread::tid(), delay); ++delays[delay];//?? } running = t.valid();//t为非法的时间则跳出循环 } printf("tid=%d, %s stopped\n", muduo::CurrentThread::tid(),muduo::CurrentThread::name()); //使用迭代器遍历map容器 for (std::map<int, int>::iterator it = delays.begin(); it != delays.end(); ++it) { printf("tid = %d, delay = %d, count = %d\n",muduo::CurrentThread::tid(),it->first, it->second); } } muduo::BlockingQueue<muduo::Timestamp> queue_; muduo::CountDownLatch latch_; boost::ptr_vector<muduo::Thread> threads_; }; int main(int argc, char* argv[]) {//若参数大于1则是传入的参数,否则设为1 int threads = argc > 1 ? atoi(argv[1]) : 1; Bench t(threads);//建立Bench对象 t.run(10000); t.joinAll(); }
单独编译后运行结构如下:(输出过长,时间也太长,截图时中断了程序)
相关文章推荐
- 多客户端服务器网络编程
- RMI * Hessian * Burlap * Httpinvoker * WebService
- 网络号和主机号的计算(转载)
- bp神经网络算法中的权值修改问题
- hdu 4739 2013杭州赛区网络赛 寻找平行坐标轴的四边形 **
- http://www.gisinternals.com/aboutgisinternals.html
- .net学习笔记---HttpHandle与HttpModule
- 网络技术杂技
- SOCKET 编程TCP/IP、UDP
- SOCKET 编程TCP/IP、UDP
- SOCKET 编程TCP/IP、UDP
- SOCKET 编程TCP/IP、UDP
- 计算机网络面试题
- MVC HTTP 错误 403.14 - Forbidden
- muduo网络库源码学习————条件变量
- ubuntu/centos网络配置
- hdu 4738 2013杭州赛区网络赛 桥+重边+连通判断 ***
- Linux C 网络编程之初探
- Delphi 调用极光推送服务端HTTP API实例
- 非action层获取HttpServletRequest