BlockingQueue 1
2013-11-06 19:02
323 查看
BlockingQueue类图
-----------------------------------|
-mutex_ : MutexLock       // 互斥锁 |
-notEmpty_ : Condition    // 条件变量,put() 入队后用它通知等待的 take() |
-queue_ : std::deque<T>   // 双端队列,按先进先出保存元素 |
-----------------------------------|
<<create>>+BlockingQueue() |
+put(x : const T&) : void |
+take() : T |
+size() : size_t |
-----------------------------------|
BlockingQueue的源文件
// Use of this source code is governed by a BSD-style license // that can be found in the License file. // // Author: Shuo Chen (chenshuo at chenshuo dot com) #ifndef MUDUO_BASE_BLOCKINGQUEUE_H #define MUDUO_BASE_BLOCKINGQUEUE_H #include <muduo/base/Condition.h> #include <muduo/base/Mutex.h> #include <boost/noncopyable.hpp> #include <deque> #include <assert.h> /** 阻塞队列,无界限队列 **/ namespace muduo { template<typename T> //模板 class BlockingQueue : boost::noncopyable //不可复制的 { public: BlockingQueue() //构造函数 : mutex_(), //互斥量 notEmpty_(mutex_), // 测试队列是否为空,在测试直线先加锁 queue_() // 队列 { } //人队列 void put(const T& x) { //先加锁 MutexLockGuard lock(mutex_); //入栈 queue_.push_back(x); //发送信号 notEmpty_.notify(); // TODO: move outside of lock } //出队列 T take() { //出栈前先加锁,防止出错 MutexLockGuard lock(mutex_); // always use a while-loop, due to spurious wakeup //如果队列为空,一直等待。。。。 while (queue_.empty()) { notEmpty_.wait(); } //断言队列是否为空 assert(!queue_.empty()); //第一个元素出队列 T front(queue_.front()); queue_.pop_front(); return front; } //队列的大小 size_t size() const { MutexLockGuard lock(mutex_); return queue_.size(); } private: //互斥量,主要是赋给 MutexLock--->MutexLockGuard --->&MutexLock mutable MutexLock mutex_; Condition notEmpty_; std::deque<T> queue_; }; } #endif // MUDUO_BASE_BLOCKINGQUEUE_H
[/code]
BlockingQueue的测试程序1
#include <muduo/base/BlockingQueue.h>
#include <muduo/base/CountDownLatch.h>
#include <muduo/base/Thread.h>
#include <muduo/base/Timestamp.h>

#include <boost/bind.hpp>
#include <boost/ptr_container/ptr_vector.hpp>
#include <algorithm>
#include <map>
#include <string>
#include <stdio.h>
#include <stdlib.h>  // atoi
#include <unistd.h>  // usleep

/// Latency benchmark for muduo::BlockingQueue.
///
/// The main thread enqueues timestamps; each worker thread dequeues them and
/// counts how often each queueing delay was observed.
class Bench
{
 public:
  /// Creates and starts @p numThreads worker threads.
  ///
  /// @param numThreads number of workers; also the initial latch count, so
  ///        run() proceeds only after every worker has called countDown().
  explicit Bench(int numThreads)
    : latch_(numThreads),
      threads_(numThreads)  // NOTE(review): ptr_vector(size_type) is expected
                            // to reserve capacity, not create elements - confirm
  {
    for (int i = 0; i < numThreads; ++i)
    {
      char name[32];
      snprintf(name, sizeof name, "work thread %d", i);
      threads_.push_back(new muduo::Thread(
            boost::bind(&Bench::threadFunc, this), muduo::string(name)));
    }
    std::for_each(threads_.begin(), threads_.end(),
                  boost::bind(&muduo::Thread::start, _1));
  }

  /// Waits on the latch, then enqueues @p times timestamps, sleeping 1000
  /// microseconds after each put.
  void run(int times)
  {
    printf("waiting for count down latch\n");
    latch_.wait();
    printf("all threads started\n");
    for (int i = 0; i < times; ++i)
    {
      muduo::Timestamp now(muduo::Timestamp::now());
      queue_.put(now);
      usleep(1000);
    }
  }

  /// Enqueues one invalid timestamp per worker as a stop marker, then joins
  /// every worker thread.
  void joinAll()
  {
    for (size_t i = 0; i < threads_.size(); ++i)
    {
      queue_.put(muduo::Timestamp::invalid());
    }

    std::for_each(threads_.begin(), threads_.end(),
                  boost::bind(&muduo::Thread::join, _1));
  }

 private:
  /// Worker body: counts down the latch, then consumes timestamps until an
  /// invalid one arrives, and finally prints the per-thread delay histogram.
  void threadFunc()
  {
    printf("tid=%d, %s started\n",
           muduo::CurrentThread::tid(),
           muduo::CurrentThread::name());

    // delay value -> number of times that delay was observed
    std::map<int, int> delays;
    latch_.countDown();
    bool running = true;
    while (running)
    {
      muduo::Timestamp t(queue_.take());
      muduo::Timestamp now(muduo::Timestamp::now());
      if (t.valid())
      {
        // Presumably timeDifference() yields seconds, making this
        // microseconds (the disabled printf below labels it "us") - confirm.
        int delay = static_cast<int>(timeDifference(now, t) * 1000000);
        // printf("tid=%d, latency = %d us\n",
        //        muduo::CurrentThread::tid(), delay);
        ++delays[delay];
      }
      // An invalid timestamp is the stop marker queued by joinAll().
      running = t.valid();
    }

    printf("tid=%d, %s stopped\n",
           muduo::CurrentThread::tid(),
           muduo::CurrentThread::name());
    for (std::map<int, int>::iterator it = delays.begin();
         it != delays.end(); ++it)
    {
      printf("tid = %d, delay = %d, count = %d\n",
             muduo::CurrentThread::tid(),
             it->first, it->second);
    }
  }

  muduo::BlockingQueue<muduo::Timestamp> queue_;  // timestamps awaiting a worker
  muduo::CountDownLatch latch_;                   // counted down once per worker
  boost::ptr_vector<muduo::Thread> threads_;      // owns the worker threads
};

/// Usage: prog [num_threads]. Defaults to one worker thread.
int main(int argc, char* argv[])
{
  const int threads = argc > 1 ? atoi(argv[1]) : 1;
  if (threads < 0)
  {
    // A negative count would otherwise be handed to the latch and converted
    // to a huge unsigned size for the thread container.
    fprintf(stderr, "Usage: %s [num_threads >= 0]\n", argv[0]);
    return 1;
  }

  Bench t(threads);
  t.run(10000);
  t.joinAll();
}
BlockingQueue的测试程序2
#include <muduo/base/BlockingQueue.h>
#include <muduo/base/CountDownLatch.h>
#include <muduo/base/Thread.h>

#include <boost/bind.hpp>
#include <boost/ptr_container/ptr_vector.hpp>
#include <algorithm>
#include <string>
#include <stdio.h>
#include <unistd.h>  // getpid

/// Functional test for muduo::BlockingQueue<std::string>.
///
/// The main thread enqueues "hello N" strings; worker threads dequeue and
/// print them until each receives a "stop" string.
class Test
{
 public:
  /// Creates and starts @p numThreads worker threads.
  ///
  /// @param numThreads number of workers; also the initial latch count, so
  ///        run() proceeds only after every worker has called countDown().
  explicit Test(int numThreads)
    : latch_(numThreads),
      threads_(numThreads)  // NOTE(review): ptr_vector(size_type) is expected
                            // to reserve capacity, not create elements - confirm
  {
    for (int i = 0; i < numThreads; ++i)
    {
      char name[32];
      snprintf(name, sizeof name, "work thread %d", i);
      threads_.push_back(new muduo::Thread(
            boost::bind(&Test::threadFunc, this), muduo::string(name)));
    }
    std::for_each(threads_.begin(), threads_.end(),
                  boost::bind(&muduo::Thread::start, _1));
  }

  /// Waits on the latch, then enqueues @p times strings "hello 0",
  /// "hello 1", ... and prints the queue size after each put.
  void run(int times)
  {
    printf("waiting for count down latch\n");
    latch_.wait();
    printf("all threads started\n");
    for (int i = 0; i < times; ++i)
    {
      char buf[32];
      snprintf(buf, sizeof buf, "hello %d", i);
      queue_.put(buf);
      // size() returns size_t, which is unsigned: %zu, not %zd.
      printf("tid=%d, put data = %s, size = %zu\n",
             muduo::CurrentThread::tid(), buf, queue_.size());
    }
  }

  /// Enqueues one "stop" string per worker, then joins every worker thread.
  void joinAll()
  {
    for (size_t i = 0; i < threads_.size(); ++i)
    {
      queue_.put("stop");
    }

    // Wait for every worker to finish.
    std::for_each(threads_.begin(), threads_.end(),
                  boost::bind(&muduo::Thread::join, _1));
  }

 private:
  /// Worker body: counts down the latch, then dequeues and prints strings
  /// until it takes one equal to "stop".
  void threadFunc()
  {
    printf("tid=%d, %s started\n",
           muduo::CurrentThread::tid(),
           muduo::CurrentThread::name());

    latch_.countDown();
    bool running = true;
    while (running)
    {
      std::string d(queue_.take());
      printf("tid=%d, get data = %s, size = %zu\n",
             muduo::CurrentThread::tid(), d.c_str(), queue_.size());
      running = (d != "stop");
    }

    printf("tid=%d, %s stopped\n",
           muduo::CurrentThread::tid(),
           muduo::CurrentThread::name());
  }

  muduo::BlockingQueue<std::string> queue_;   // strings awaiting a worker
  muduo::CountDownLatch latch_;               // counted down once per worker
  boost::ptr_vector<muduo::Thread> threads_;  // owns the worker threads
};

int main()
{
  printf("pid=%d, tid=%d\n", ::getpid(), muduo::CurrentThread::tid());
  Test t(5);   // five worker threads
  t.run(100);  // enqueue 100 strings (the queue itself is unbounded)
  t.joinAll();

  printf("number of created threads %d\n", muduo::Thread::numCreated());
}
[/code]
相关文章推荐
- 一个C++的BlockingQueue实现
- 多线程下的生产者和消费者 - BlockingQueue
- java多线程学习-java.util.concurrent详解(四) BlockingQueue
- BlockingQueue的一个小例子
- BlockingQueue详解
- java concurent之BlockingQueue
- BlockingQueue的异常Queue full
- java并发之BlockingQueue和Lock以及synchronized
- 线程之BlockingQueue
- 并发包中的BlockingQueue这个类的介绍及使用
- 生产者-消费者模型的3种Java实现:synchronized,signal/notifyAll及BlockingQueue
- Java多线程-工具篇-BlockingQueue
- 多线程之9——BlockingQueue (阻塞队列)
- 项目积累——Blockingqueue,ConcurrentLinkedQueue,Executors
- 使用BlockingQueue进行多线程间的异步通信
- Java多线程-工具篇-BlockingQueue
- BlockingQueue
- Java多线程-工具篇-BlockingQueue
- 阻塞队列BlockingQueue
- Java多线程-工具篇-BlockingQueue