ceph源码分析--monitor的lease机制
2018-01-18 00:33
567 查看
概述
在monitor节点中,存在着Leader和Peon两种角色。monitor采用了一种lease机制,保证了副本在一定时间内可读写。同时lease机制也保证了整个集群中的monitor当前都是可用状态。Leader节点会向所有的Peon节点定时地发送lease消息,延长各个节点的lease时间,同时收集所有节点的ack消息。只要有一个节点在超时时间内没有回复ack消息,就会重新发起选举。
同理,Peon节点一直在等待Leader向自己发送lease消息。超时之后,也会重新发起选举。
这样就保证了整个monitor集群的可用性。
源码分析
从lease的发起者extend_lease()开始讲void Paxos::extend_lease() { //断言lease是由Leader节点发起 assert(mon->is_leader()); //assert(is_active()); //当前时间+5s作为租期 lease_expire = ceph_clock_now(); lease_expire += g_conf->mon_lease; //已经收到的lease回复集合清空。将leader节点加入集合 acked_lease.clear(); acked_lease.insert(mon->rank); dout(7) << "extend_lease now+" << g_conf->mon_lease << " (" << lease_expire << ")" << dendl; // bcast for (set<int>::const_iterator p = mon->get_quorum().begin(); p != mon->get_quorum().end(); ++p) { //向quorum中的所有peon节点发送lease消息 if (*p == mon->rank) continue; MMonPaxos *lease = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE, ceph_clock_now()); lease->last_committed = last_committed; lease->lease_timestamp = lease_expire; lease->first_committed = first_committed; mon->messenger->send_message(lease, mon->monmap->get_inst(*p)); } // set timeout event. // if old timeout is still in place, leave it. if (!lease_ack_timeout_event) { lease_ack_timeout_event = mon->timer.add_event_after( //2*5=10超时时间为10s g_conf->mon_lease_ack_timeout_factor * g_conf->mon_lease, new C_MonContext(mon, [this](int r) { if (r == -ECANCELED) return; //超时后发起选举 lease_ack_timeout(); })); } // set renew event //继续发起下一轮extend_lease utime_t at = lease_expire; at -= g_conf->mon_lease; at += g_conf->mon_lease_renew_interval_factor * g_conf->mon_lease; lease_renew_event = mon->timer.add_event_at( at, new C_MonContext(mon, [this](int r) { if (r == -ECANCELED) return; lease_renew_timeout(); })); }
注册一个dispatch来接收ack消息
void Paxos::dispatch(MonOpRequestRef op) { assert(op->is_type_paxos()); op->mark_paxos_event("dispatch"); PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req()); // election in progress? if (!mon->is_leader() && !mon->is_peon()) { dout(5) << "election in progress, dropping " << *m << dendl; return; } // check sanity assert(mon->is_leader() || (mon->is_peon() && m->get_source().num() == mon->get_leader())); switch (m->get_type()) { case MSG_MON_PAXOS: { MMonPaxos *pm = reinterpret_cast<MMonPaxos*>(m); // NOTE: these ops are defined in messages/MMonPaxos.h switch (pm->op) { // learner case MMonPaxos::OP_COLLECT: handle_collect(op); break; case MMonPaxos::OP_LAST: handle_last(op); break; case MMonPaxos::OP_BEGIN: handle_begin(op); break; case MMonPaxos::OP_ACCEPT: handle_accept(op); break; case MMonPaxos::OP_COMMIT: handle_commit(op); break; case MMonPaxos::OP_LEASE: handle_lease(op); break; //处理lease_ack case MMonPaxos::OP_LEASE_ACK: handle_lease_ack(op); break;
handle_lease_ack
// Leader side: process an OP_LEASE_ACK message.
//
// Three cases, keyed on the lease-ack timer and the set of ranks that have
// already acked:
//   - no ack timer pending        -> the ack is stray; only log it;
//   - first ack from this rank    -> record it, decode its feature map if
//                                    present, and cancel the ack timer once
//                                    the acked set equals the quorum;
//   - rank already in acked_lease -> duplicate; only log it.
// In every case the sender's timestamp is passed to warn_on_future_time().
void Paxos::handle_lease_ack(MonOpRequestRef op)
{
  op->mark_paxos_event("handle_lease_ack");
  MMonPaxos *ack = static_cast<MMonPaxos*>(op->get_req());
  int from = ack->get_source().num();

  if (!lease_ack_timeout_event) {
    dout(10) << "handle_lease_ack from " << ack->get_source()
             << " -- stray (probably since revoked)" << dendl;

  } else if (acked_lease.count(from) == 0) {
    acked_lease.insert(from);
    if (ack->feature_map.length()) {
      // The ack carries an encoded feature map: decode it into the
      // per-rank slot of the monitor's quorum feature map.
      auto p = ack->feature_map.begin();
      FeatureMap& t = mon->quorum_feature_map[from];
      ::decode(t, p);
    }
    if (acked_lease == mon->get_quorum()) {
      // yay!
      // Every quorum member has acked: cancel the lease-ack timeout event.
      dout(10) << "handle_lease_ack from " << ack->get_source()
               << " -- got everyone" << dendl;
      mon->timer.cancel_event(lease_ack_timeout_event);
      lease_ack_timeout_event = 0;
    } else {
      dout(10) << "handle_lease_ack from " << ack->get_source()
               << " -- still need "
               << mon->get_quorum().size() - acked_lease.size()
               << " more" << dendl;
    }
  } else {
    // This rank is already in acked_lease: duplicate ack, ignore it.
    dout(10) << "handle_lease_ack from " << ack->get_source()
             << " dup (lagging!), ignoring" << dendl;
  }

  warn_on_future_time(ack->sent_timestamp, ack->get_source());
}
如果超时时间内没有收集到所有的ack消息
那么lease_ack_timeout()会被调用
// Timer registration quoted from Paxos::extend_lease(): schedules an event at
// absolute time `at`. The callback returns immediately when the event was
// cancelled (r == -ECANCELED) and otherwise calls lease_renew_timeout().
// NOTE(review): the lease-ack timeout in extend_lease() is armed with the same
// pattern, with lease_ack_timeout() as the callback body.
lease_renew_event = mon->timer.add_event_at(
  at, new C_MonContext(mon, [this](int r) {
      if (r == -ECANCELED)
        return;
      lease_renew_timeout();
    }));

// Callback object used for the timer events above: it keeps a pointer to the
// monitor it was created for and hands the callable to FunctionContext.
class C_MonContext final : public FunctionContext {
  const Monitor *mon;  // monitor supplied at construction
public:
  // Moves `callback` into the FunctionContext base and stores `m`.
  explicit C_MonContext(Monitor *m, boost::function<void(int)>&& callback)
    : FunctionContext(std::move(callback)), mon(m) {}
  // Declared here only; the definition is not part of this snippet.
  void finish(int r) override;
};
void Paxos::lease_ack_timeout(),发起选举
// Invoked by the lease-ack timer callback armed in extend_lease().
// Logs the timeout, checks that this monitor is the leader and that paxos is
// active, bumps the l_paxos_lease_ack_timeout counter, clears the timer handle
// and calls mon->bootstrap() (the log line describes this as calling a new
// election).
void Paxos::lease_ack_timeout()
{
  dout(1) << "lease_ack_timeout -- calling new election" << dendl;
  assert(mon->is_leader());
  assert(is_active());
  logger->inc(l_paxos_lease_ack_timeout);
  // The event has fired, so the stored handle is reset before restarting.
  lease_ack_timeout_event = 0;
  mon->bootstrap();
}
关于Leader节点的部分差不多到这里就结束了
下面是peon节点的lease处理
那么peon节点在收到leader节点发出的lease消息做了哪些处理呢?
// Peon side: process an OP_LEASE message.
//
// The message is dropped unless this monitor is a peon and its last_committed
// equals the one carried by the lease. Otherwise the local lease expiry is
// moved forward (never backward), the state becomes STATE_ACTIVE, an
// OP_LEASE_ACK is sent back on the connection the lease arrived on, the lease
// timeout is re-armed and contexts waiting for active/readable are completed.
void Paxos::handle_lease(MonOpRequestRef op)
{
  op->mark_paxos_event("handle_lease");
  MMonPaxos *lease = static_cast<MMonPaxos*>(op->get_req());

  // sanity
  if (!mon->is_peon() ||
      last_committed != lease->last_committed) {
    dout(10) << "handle_lease i'm not a peon, or they're not the leader,"
             << " or the last_committed doesn't match, dropping" << dendl;
    op->mark_paxos_event("invalid lease, ignore");
    return;
  }

  warn_on_future_time(lease->sent_timestamp, lease->get_source());

  // extend lease
  // Adopt the expiry from the lease message only if it is later than the one
  // already held.
  if (lease_expire < lease->lease_timestamp) {
    lease_expire = lease->lease_timestamp;

    utime_t now = ceph_clock_now();
    // The extended lease is already in the past: report it. Per the message
    // below, the monitors are probably laggy or their clocks are skewed.
    if (lease_expire < now) {
      utime_t diff = now - lease_expire;
      derr << "lease_expire from " << lease->get_source_inst() << " is " << diff << " seconds in the past; mons are probably laggy (or possibly clocks are too skewed)" << dendl;
    }
  }

  state = STATE_ACTIVE;

  dout(10) << "handle_lease on " << lease->last_committed
           << " now " << lease_expire << dendl;

  // ack
  // Reply to the sender with an OP_LEASE_ACK carrying our commit range and
  // the encoded session feature map.
  MMonPaxos *ack = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE_ACK,
                                 ceph_clock_now());
  ack->last_committed = last_committed;
  ack->first_committed = first_committed;
  ack->lease_timestamp = ceph_clock_now();
  ::encode(mon->session_map.feature_map, ack->feature_map);
  lease->get_connection()->send_message(ack);

  // (re)set timeout event.
  reset_lease_timeout();

  // kick waiters
  finish_contexts(g_ceph_context, waiting_for_active);
  if (is_readable())
    finish_contexts(g_ceph_context, waiting_for_readable);
}
那么当leader节点不给peon节点lease消息的时候,peon是如何触发选举的呢
关键就是void Paxos::reset_lease_timeout(),当超时时间内一直未收到leader的消息,则进行重新选举
void Paxos::reset_lease_timeout() { dout(20) << "reset_lease_timeout - setting timeout event" << dendl; if (lease_timeout_event) mon->timer.cancel_event(lease_timeout_event); lease_timeout_event = mon->timer.add_event_after( g_conf->mon_lease_ack_timeout_factor * g_conf->mon_lease, new C_MonContext(mon, [this](int r) { if (r == -ECANCELED) return; lease_timeout(); })); } void Paxos::lease_timeout() { dout(1) << "lease_timeout -- calling new election" << dendl; assert(mon->is_peon()); logger->inc(l_paxos_lease_timeout); lease_timeout_event = 0; mon->bootstrap(); }
行文至此,lease的主要工作机理已经清楚。
但还遗留着几个问题。
1.extend_lease由谁来调用呢?
2.消息传递的整个流程是怎么样的呢?
2.消息传递博客还在写。
对于问题1,首先可以看到extend_lease会注册定时事件lease_renew_event,该事件触发时调用lease_renew_timeout()函数,在其中又调用了extend_lease,以进行不间断的lease消息发送。
// Invoked by the renew timer callback armed in extend_lease().
// Clears the renew-event handle, then calls extend_lease() again, which
// schedules the next renew event; this is what keeps leases being re-issued.
void Paxos::lease_renew_timeout()
{
  // The event has fired; extend_lease() assigns a fresh handle below.
  lease_renew_event = 0;
  extend_lease();
}
可以看到
// leader void Paxos::handle_last(MonOpRequestRef op)
这个函数中调用了extend_lease()
那关于这个函数的作用呢,留在ceph paxos博客中再进行探讨
相关文章推荐
- ceph源码分析--monitor的lease机制
- Ceph Monitor源码机制分析(三)—— 选举
- Ceph Monitor源码机制分析(一)—— 概述
- Ceph Monitor源码机制分析(二)—— 初始化
- ceph源码分析--monitor leader选举
- ceph源码分析--Monitor对osd report进行报down处理
- 小伙伴们的ceph源码分析二——monitor启动流程
- ceph源码分析--Monitor对osd report进行报down处
- ceph源码分析--monitor leader选举
- ceph源码分析之消息通信机制
- Ceph学习——Ceph网络通信机制与源码分析
- ceph 数据校验机制 scrub源码分析
- 小伙伴们的ceph源码分析三——monitor消息处理流程
- Android消息机制源码分析
- android的消息处理机制(图+源码分析)——Looper,Handler,Message
- Handler消息机制的源码分析
- Android事件分发机制源码分析
- 分布式消息队列RocketMQ源码分析之3 -- Consumer负载均衡机制 -- Rebalance
- Android消息机制源码分析
- dubbo源码分析系列——dubbo的SPI机制源码分析