您的位置:首页 > 运维架构 > Linux

linux多线程服务器编程 muduo库学习笔记二

2016-07-14 15:55 525 查看
知识点:Linux多线程服务器端编程 8.2

接着上一篇,这里用定时器创建了一个事件。在TimerQueue的构造函数中可以看到,每当定时器到期时,Poller会响应并调用定时器channel的回调函数,这里绑定的是handleRead()函数。handleRead()函数处理所有已经超时的Timer:先执行各个Timer的回调函数,然后重置到期的Timer(重复的Timer重新插入队列,一次性的Timer被删除)。这里调用addTimer插入一个Timer事件和它的回调函数。定时器那里的具体用法没有去深入研究,大概流程和Poller管理所有channel事件差不多。

  代码在上一篇的基础上增加了一个文件。

// excerpts from http://code.google.com/p/muduo/ //
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)

#ifndef MUDUO_NET_CALLBACKS_H
#define MUDUO_NET_CALLBACKS_H

#include <boost/function.hpp>
#include <boost/shared_ptr.hpp>

#include "muduo/base/Timestamp.h"

namespace muduo
{

// All client visible callbacks go here.

/// Callback type stored by Timer and invoked through Timer::run();
/// it takes no arguments and returns nothing.
typedef boost::function<void()> TimerCallback;

}

#endif  // MUDUO_NET_CALLBACKS_H

// excerpts from http://code.google.com/p/muduo/ //
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)

#ifndef MUDUO_NET_TIMER_H
#define MUDUO_NET_TIMER_H

#include <boost/noncopyable.hpp>

#include "muduo/base/Timestamp.h"
#include "Callbacks.h"

namespace muduo
{

///
/// Internal class for a timer event: bundles a callback with the time at
/// which it is due and an optional repeat interval.
///
class Timer : boost::noncopyable
{
 public:
  /// @param cb        callback copied into the timer and invoked by run()
  /// @param when      initial expiration time
  /// @param interval  repeat interval; the timer is flagged as repeating
  ///                  only when this value is greater than zero
  ///                  (NOTE(review): presumably seconds -- confirm against Timestamp helpers)
  Timer(const TimerCallback& cb, Timestamp when, double interval)
    : callback_(cb),
      expiration_(when),
      interval_(interval),
      repeat_(interval > 0.0)
  {
  }

  /// Invokes the stored callback.
  void run() const { callback_(); }

  /// Returns the current expiration time.
  Timestamp expiration() const { return expiration_; }

  /// Returns true when the timer was created with a positive interval.
  bool repeat() const { return repeat_; }

  /// Declared only; the definition is not part of this header.
  void restart(Timestamp now);

 private:
  const TimerCallback callback_;  // user callback, fixed at construction
  Timestamp expiration_;          // time at which the timer is due
  const double interval_;         // repeat interval given at construction
  const bool repeat_;             // interval_ > 0.0, cached at construction
};

}
#endif  // MUDUO_NET_TIMER_H

// excerpts from http://code.google.com/p/muduo/ //
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)

#ifndef MUDUO_NET_TIMERID_H
#define MUDUO_NET_TIMERID_H

#include "muduo/base/copyable.h"

namespace muduo
{

class Timer;

///
/// An opaque identifier for a Timer, intended for canceling it.
/// The class only stores the pointer; it exposes no operations of its own.
///
class TimerId : public muduo::copyable
{
 public:
  /// Wraps the given timer pointer without taking any action on it.
  explicit TimerId(Timer* timer) : value_(timer) {}

  // The compiler-generated copy constructor, destructor and assignment
  // operator are sufficient for this type.

 private:
  Timer* value_;  // never dereferenced or deleted inside this class
};

}

#endif  // MUDUO_NET_TIMERID_H

#ifndef TIMERQUEUE_H
#define TIMERQUEUE_H
#include<set>
#include<vector>
#include <boost/noncopyable.hpp>

#include "muduo/base/Timestamp.h"
#include "muduo/base/Mutex.h"
#include "Callbacks.h"
#include "channel.h"
namespace muduo
{

class EventLoop;
class Timer;
class TimerId;

///
/// A queue of Timer objects driven by a single timerfd that is observed
/// through a Channel. Timers are kept in a std::set ordered by
/// (expiration, pointer), so the earliest timer is always at begin().
///
class TimerQueue : boost::noncopyable
{
 public:
  /// Creates the timerfd and registers its read callback on @p loop.
  /// Marked explicit so an EventLoop* is never implicitly converted into
  /// a TimerQueue.
  explicit TimerQueue(EventLoop* loop);

  /// Closes the timerfd and deletes every Timer still in the queue.
  ~TimerQueue();

  /// Creates a Timer from the arguments, inserts it into the queue and
  /// re-arms the timerfd when the new timer becomes the earliest one.
  /// @param cb        callback run when the timer expires
  /// @param when      first expiration time
  /// @param interval  repeat interval; values > 0 make the timer repeat
  /// @return an opaque id wrapping the created Timer
  TimerId addTimer(const TimerCallback& cb,
                   Timestamp when,
                   double interval);

 private:
  // The raw Timer* makes each entry unique even when two timers share
  // the same expiration time.
  typedef std::pair<Timestamp, Timer*> Entry;
  typedef std::set<Entry> TimerList;

  /// Read callback of the timerfd channel: runs and resets expired timers.
  void handleRead();

  /// Removes from the queue, and returns, every entry due at or before @p now.
  std::vector<Entry> getExpired(Timestamp now);

  /// Re-inserts repeating timers, deletes the others, then re-arms the timerfd.
  void reset(const std::vector<Entry>& expired, Timestamp now);

  /// Inserts @p timer; returns true when it became the earliest entry.
  bool insert(Timer* timer);

  // Declaration order below is also the initialization order used by the
  // constructor: timerfdChannel_ is built from loop_ and timerfd_.
  EventLoop* loop_;
  const int timerfd_;
  Channel timerfdChannel_;
  // Timer list sorted by expiration
  TimerList timers_;
};
}
#endif // TIMERQUEUE_H

#define __STDC_LIMIT_MACROS
#include "TimerQueue.h"

#include "muduo/base/Logging.h"
#include "eventloop.h"
#include "Timer.h"
#include "TimerId.h"
#include <stdio.h>
#include <boost/bind.hpp>

#include <sys/timerfd.h>

namespace muduo
{
namespace detail
{

// Creates a timerfd on CLOCK_MONOTONIC with the non-blocking and
// close-on-exec flags set. A negative result is reported through
// LOG_SYSFATAL; the descriptor value is returned in either case.
int createTimerfd()
{
  const int fd = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
  if (fd < 0)
  {
    LOG_SYSFATAL << "Failed in timerfd_create";
  }
  return fd;
}

// Converts the distance between Timestamp::now() and `when` into a
// relative timespec. Distances below 100 microseconds (including
// negative ones, i.e. `when` already in the past) are raised to 100.
struct timespec howMuchTimeFromNow(Timestamp when)
{
  int64_t delay = when.microSecondsSinceEpoch()
                  - Timestamp::now().microSecondsSinceEpoch();
  delay = (delay < 100) ? 100 : delay;

  struct timespec result;
  // Whole seconds go to tv_sec; the leftover microseconds become nanoseconds.
  result.tv_sec = static_cast<time_t>(
      delay / Timestamp::kMicroSecondsPerSecond);
  result.tv_nsec = static_cast<long>(
      (delay % Timestamp::kMicroSecondsPerSecond) * 1000);
  return result;
}

// Reads the 8-byte expiration counter from `timerfd` and traces it.
// `now` is only used in the trace message. A read that does not return
// exactly sizeof(uint64_t) bytes is reported with LOG_ERROR.
void readTimerfd(int timerfd, Timestamp now)
{
  // Start from zero: the trace line below is emitted before the result of
  // read() is checked, so without this a failed or short read would log an
  // indeterminate value.
  uint64_t howmany = 0;
  ssize_t n = ::read(timerfd, &howmany, sizeof howmany);
  LOG_TRACE << "TimerQueue::handleRead() " << howmany << " at " << now.toString();
  if (n != sizeof howmany)
  {
    LOG_ERROR << "TimerQueue::handleRead() reads " << n << " bytes instead of 8";
  }
}

// Arms `timerfd` so that it fires once at `expiration`.
// The delay is computed relative to the current time by
// howMuchTimeFromNow(); a failing timerfd_settime() is logged via LOG_SYSERR.
void resetTimerfd(int timerfd, Timestamp expiration)
{
  // wake up loop by timerfd_settime()
  // Aggregate initialization zeroes both structures; this replaces the
  // legacy bzero() calls and leaves it_interval at zero (no periodic re-arm
  // by the kernel).
  struct itimerspec newValue = {};
  struct itimerspec oldValue = {};
  newValue.it_value = howMuchTimeFromNow(expiration);
  // flags == 0: it_value is interpreted as a relative time.
  int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue);
  if (ret)
  {
    LOG_SYSERR << "timerfd_settime()";
  }
}

}
}
using namespace muduo;
using namespace muduo::detail;

// Creates the timerfd, wraps it in a Channel owned by `loop`, and routes
// the channel's read events to handleRead().
TimerQueue::TimerQueue(EventLoop* loop)
  : loop_(loop),
    timerfd_(createTimerfd()),
    timerfdChannel_(loop, timerfd_),
    timers_()
{
  timerfdChannel_.setReadCallback(boost::bind(&TimerQueue::handleRead, this));

  // we are always reading the timerfd, we disarm it with timerfd_settime.
  timerfdChannel_.enableReading();

  // Debug trace left by the author (no trailing newline).
  printf("timerqueue gouzaohanshu");
}

// Closes the timerfd and frees every Timer that is still queued.
TimerQueue::~TimerQueue()
{
  ::close(timerfd_);

  // Original author's note: the channel is not removed here because this
  // destructor is expected to run from EventLoop's destructor.
  TimerList::iterator cur = timers_.begin();
  const TimerList::iterator last = timers_.end();
  while (cur != last)
  {
    delete cur->second;
    ++cur;
  }
}

void TimerQueue::handleRead()
{
loop_->assertInLoopThread();
Timestamp now(Timestamp::now());
readTimerfd(timerfd_, now);

std::vector<Entry> expired = getExpired(now);

// safe to callback outside critical section
for (std::vector<Entry>::iterator it = expired.begin();
it != expired.end(); ++it)
{
it->second->run();
}

reset(expired, now);
}

// Removes from timers_, and returns in order, every entry whose
// expiration is not later than `now`.
std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)
{
  // The sentry pairs `now` with the largest possible pointer value, so it
  // compares greater than any real entry that expires exactly at `now`;
  // lower_bound therefore lands on the first entry strictly after `now`.
  Entry sentry(now, reinterpret_cast<Timer*>(UINTPTR_MAX));
  TimerList::iterator firstPending = timers_.lower_bound(sentry);
  assert(firstPending == timers_.end() || now < firstPending->first);

  std::vector<Entry> expired(timers_.begin(), firstPending);
  timers_.erase(timers_.begin(), firstPending);
  return expired;
}
void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now)
{
Timestamp nextExpire;

for (std::vector<Entry>::const_iterator it = expired.begin();
it != expired.end(); ++it)
{
if (it->second->repeat())
{
it->second->restart(now);
insert(it->second);
}
else
{
// FIXME move to a free list
delete it->second;
}
}

if (!timers_.empty())
{
nextExpire = timers_.begin()->second->expiration();
}

if (nextExpire.valid())
{
resetTimerfd(timerfd_, nextExpire);
}
}

// Adds `timer` to timers_ keyed by its expiration.
// Returns true when the queue was empty or the new timer expires before
// the previously earliest entry, i.e. when the timerfd must be re-armed.
bool TimerQueue::insert(Timer* timer)
{
  Timestamp when = timer->expiration();

  // Must be evaluated before the insertion changes begin().
  const bool earliestChanged =
      timers_.empty() || when < timers_.begin()->first;

  const bool inserted = timers_.insert(Entry(when, timer)).second;
  assert(inserted);
  (void)inserted;  // keeps the variable "used" when assert() is compiled out

  return earliestChanged;
}
// Allocates a Timer for the given callback, first expiration and
// interval, queues it, and re-arms the timerfd when the new timer is now
// the earliest one. Returns a TimerId wrapping the new Timer.
TimerId TimerQueue::addTimer(const TimerCallback& cb,
                             Timestamp when,
                             double interval)
{
  Timer* const timer = new Timer(cb, when, interval);
  loop_->assertInLoopThread();

  if (insert(timer))
  {
    resetTimerfd(timerfd_, timer->expiration());
  }
  return TimerId(timer);
}
这里与书上的test4不同:没有把TimerQueue封装在EventLoop中,而是在main()里直接建立一个TimerQueue对象,有助于理解。
#include "eventloop.h"
#include "TimerId.h"
#include <boost/bind.hpp>
#include <stdio.h>
#include "TimerQueue.h"
using namespace muduo;
// Number of times print() has been called; print() asks the loop to quit
// when this reaches 20.
int cnt = 0;
// Event loop that print() stops via quit(); assigned in main() before the
// loop is started.
muduo::EventLoop* g_loop;

// Prints the process id, the calling thread's id and the current
// timestamp to stdout.
void printTid()
{
  printf("pid = %d, tid = %d\n",
         getpid(),
         muduo::CurrentThread::tid());
  printf("now %s\n",
         muduo::Timestamp::now().toString().c_str());
}

// Prints `msg` prefixed with the current timestamp and counts the call;
// on the 20th call it asks the global loop to quit.
void print(const char* msg)
{
  printf("msg %s %s\n", muduo::Timestamp::now().toString().c_str(), msg);

  ++cnt;
  if (cnt == 20)
  {
    g_loop->quit();
  }
}

int main()
{
printTid();
muduo::EventLoop loop;
g_loop = &loop;
muduo::TimerQueue timerqueue(&loop);
Timestamp time(addTime(Timestamp::now(), 1.0));
timerqueue.addTimer( boost::bind(print,"once"), time, 5);
print("main");
//loop.runAfter(1, boost::bind(print, "once1"));
// loop.runAfter(1.5, boost::bind(print, "once1.5"));
// loop.runAfter(2.5, boost::bind(print, "once2.5"));
// loop.runAfter(3.5, boost::bind(print, "once3.5"));
// loop.runEvery(2, boost::bind(print, "every2"));
// loop.runEvery(3, boost::bind(print, "every3"));

loop.loop();
print("main loop exits");
sleep(1);
return 0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: