您的位置:首页 > 产品设计 > UI/UE

BlockingQueue 1

2013-11-06 19:02 323 查看

BlockingQueue类图

-----------------------------------

-mutex_ : MutexLock //互斥锁 |

-notEmpty_ : Condition //条件变量(队列非空) |

-queue_: std::deque<T> //队列 |

--------------------------- |

<<create>>-BlockingQueue() |

+put(x:const T&):void |

+take():T |

+size():size_t |

-----------------------------------|

BlockingQueue的源文件

// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)

#ifndef MUDUO_BASE_BLOCKINGQUEUE_H
#define MUDUO_BASE_BLOCKINGQUEUE_H

#include <muduo/base/Condition.h>
#include <muduo/base/Mutex.h>

#include <boost/noncopyable.hpp>
#include <deque>
#include <assert.h>

/**
阻塞队列,无界限队列

**/
namespace muduo
{

/// Unbounded FIFO queue whose take() blocks while the queue is empty.
///
/// Every access to the underlying std::deque happens under a MutexLockGuard
/// on mutex_; notEmpty_ is the condition that take() waits on and put()
/// notifies. Instances cannot be copied (private boost::noncopyable base).
template<typename T>
class BlockingQueue : private boost::noncopyable
{
 public:
  /// Creates an empty queue; the condition is constructed from the
  /// queue's own mutex.
  BlockingQueue()
    : mutex_(),
      notEmpty_(mutex_),
      queue_()
  {
  }

  /// Appends a copy of x at the back of the queue, then notifies notEmpty_.
  void put(const T& x)
  {
    MutexLockGuard guard(mutex_);
    queue_.push_back(x);
    // The notification is issued while the guard is still in scope.
    notEmpty_.notify(); // TODO: move outside of lock
  }

  /// Removes the front element and returns it by value.
  /// Waits on notEmpty_ for as long as the queue is empty.
  T take()
  {
    MutexLockGuard guard(mutex_);
    // Emptiness is re-tested after every wake-up because a wake-up may be
    // spurious, so this has to stay a loop rather than a single `if`.
    while (queue_.empty())
      notEmpty_.wait();
    assert(!queue_.empty());
    T head(queue_.front());
    queue_.pop_front();
    return head;
  }

  /// Returns the number of elements currently queued.
  size_t size() const
  {
    MutexLockGuard guard(mutex_);
    return queue_.size();
  }

 private:
  mutable MutexLock mutex_;    // mutable so that the const size() can guard it
  Condition         notEmpty_; // waited on in take(), notified in put()
  std::deque<T>     queue_;    // pending elements, oldest at the front
};

}

#endif  // MUDUO_BASE_BLOCKINGQUEUE_H

[/code]

BlockingQueue的测试程序1

#include <muduo/base/BlockingQueue.h>
#include <muduo/base/CountDownLatch.h>
#include <muduo/base/Thread.h>
#include <muduo/base/Timestamp.h>

#include <boost/bind.hpp>
#include <boost/ptr_container/ptr_vector.hpp>
#include <map>
#include <string>
#include <stdio.h>
/*
BlockingQueue 的测试程序
**/
class Bench
{
public:
Bench(int numThreads)
: latch_(numThreads), //计算器为numThreads ,就是线程的个数
threads_(numThreads) //线程容器的大小numThreads
{
//加入numThreads个线程
for (int i = 0; i < numThreads; ++i)
{
char name[32];
snprintf(name, sizeof name, "work thread %d", i);
threads_.push_back(new muduo::Thread(
boost::bind(&Bench::threadFunc, this), muduo::string(name)));
}
//启动线程
for_each(threads_.begin(), threads_.end(), boost::bind(&muduo::Thread::start, _1));
}
/*主线程的Run函数*/
void run(int times)
{
printf("waiting for count down latch\n");
//等待计算器为0 ,就是所其他线程都准备好好,主线程才运行 !
latch_.wait();
printf("all threads started\n");
//想队列里面加入times个时间戳
for (int i = 0; i < times; ++i)
{
muduo::Timestamp now(muduo::Timestamp::now());
queue_.put(now);
usleep(1000);
}
}

void joinAll()
{
for (size_t i = 0; i < threads_.size(); ++i)
{
queue_.put(muduo::Timestamp::invalid());
}

for_each(threads_.begin(), threads_.end(), boost::bind(&muduo::Thread::join, _1));
}

private:
//线程的回调函数
void threadFunc()
{
printf("tid=%d, %s started\n",
muduo::CurrentThread::tid(),
muduo::CurrentThread::name());

std::map<int, int> delays;
//协程的计算器-1
latch_.countDown();
//如果时间无效,则false
bool running = true;
while (running)
{
//时间戳 t , now
muduo::Timestamp t(queue_.take());
muduo::Timestamp now(muduo::Timestamp::now());
if (t.valid())
{
int delay = static_cast<int>(timeDifference(now, t) * 1000000);
// printf("tid=%d, latency = %d us\n",
//        muduo::CurrentThread::tid(), delay);
++delays[delay];
}
running = t.valid();
}

printf("tid=%d, %s stopped\n",
muduo::CurrentThread::tid(),
muduo::CurrentThread::name());
for (std::map<int, int>::iterator it = delays.begin();
it != delays.end(); ++it)
{
printf("tid = %d, delay = %d, count = %d\n",
muduo::CurrentThread::tid(),
it->first, it->second);
}
}

muduo::BlockingQueue<muduo::Timestamp> queue_; // 无界限缓冲区
muduo::CountDownLatch latch_;   //线程同步类,协程
boost::ptr_vector<muduo::Thread> threads_;  //线程数目
};

/// Entry point of the BlockingQueue latency benchmark.
///
/// Usage: <program> [numThreads]
/// numThreads is the number of worker threads and defaults to 1 when no
/// argument is given. Returns 0 on success and 1 when the argument is not a
/// positive integer.
int main(int argc, char* argv[])
{
  // Number of worker threads.
  int threads = argc > 1 ? atoi(argv[1]) : 1;
  // atoi() returns 0 when the argument is not a number, and a count below 1
  // leaves no worker thread to drain the queue, so reject it up front.
  // argv[0] is only read here when argc > 1, because otherwise threads == 1.
  if (threads <= 0)
  {
    fprintf(stderr, "Usage: %s [numThreads]  (numThreads must be a positive integer)\n",
            argv[0]);
    return 1;
  }

  Bench t(threads);
  t.run(10000);
  t.joinAll();
  return 0;
}
[/code]

BlockingQueue的测试程序2

#include <muduo/base/BlockingQueue.h>
#include <muduo/base/CountDownLatch.h>
#include <muduo/base/Thread.h>

#include <boost/bind.hpp>
#include <boost/ptr_container/ptr_vector.hpp>
#include <string>
#include <stdio.h>

/// Functional test for muduo::BlockingQueue with string payloads.
///
/// The thread that calls run() enqueues "hello N" messages; each worker
/// thread prints whatever it takes from the queue and stops once it takes the
/// literal "stop".
class Test
{
 public:
  /// Creates numThreads worker threads that execute threadFunc() and starts
  /// every one of them before returning.
  Test(int numThreads)
    : latch_(numThreads),    // one countDown() is expected per worker
      threads_(numThreads)   // NOTE(review): presumably reserves capacity only -- confirm against Boost docs
  {
    for (int i = 0; i < numThreads; ++i)
    {
      char name[32];
      snprintf(name, sizeof name, "work thread %d", i);
      threads_.push_back(new muduo::Thread(
            boost::bind(&Test::threadFunc, this), muduo::string(name)));
    }
    for (size_t i = 0; i < threads_.size(); ++i)
    {
      threads_[i].start();
    }
  }

  /// Waits on the latch, then enqueues `times` messages "hello 0" ...
  /// "hello <times-1>", printing the queue size after each put.
  void run(int times)
  {
    printf("waiting for count down latch\n");
    latch_.wait();
    printf("all threads started\n");
    for (int i = 0; i < times; ++i)
    {
      char buf[32];
      snprintf(buf, sizeof buf, "hello %d", i);
      queue_.put(buf);
      // queue_.size() is a size_t, so the matching conversion is %zu.
      printf("tid=%d, put data = %s, size = %zu\n", muduo::CurrentThread::tid(), buf, queue_.size());
    }
  }

  /// Enqueues one "stop" marker per worker and then joins every worker thread.
  void joinAll()
  {
    for (size_t i = 0; i < threads_.size(); ++i)
    {
      // Termination marker recognised by threadFunc().
      queue_.put("stop");
    }
    // Wait for every worker to finish.
    for (size_t i = 0; i < threads_.size(); ++i)
    {
      threads_[i].join();
    }
  }

 private:

  /// Worker body: counts the latch down, then prints every message taken from
  /// the queue until the message equals "stop".
  void threadFunc()
  {
    printf("tid=%d, %s started\n",
           muduo::CurrentThread::tid(),
           muduo::CurrentThread::name());

    latch_.countDown();
    bool running = true;
    while (running)
    {
      std::string d(queue_.take());
      // queue_.size() is a size_t, so the matching conversion is %zu.
      printf("tid=%d, get data = %s, size = %zu\n", muduo::CurrentThread::tid(), d.c_str(), queue_.size());
      running = (d != "stop");
    }

    printf("tid=%d, %s stopped\n",
           muduo::CurrentThread::tid(),
           muduo::CurrentThread::name());
  }

  muduo::BlockingQueue<std::string> queue_;   // queue under test
  muduo::CountDownLatch latch_;               // counted down once by each worker
  boost::ptr_vector<muduo::Thread> threads_;  // owns the worker threads
};

int main()
{
printf("pid=%d, tid=%d\n", ::getpid(), muduo::CurrentThread::tid());
//5个子线程
Test t(5);
//100容量的queue
t.run(100);
t.joinAll();

printf("number of created threads %d\n", muduo::Thread::numCreated());
}

[/code]
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: