Ceph学习——客户端读写操作分析
2017-12-04 18:42
686 查看
客户端读写操作分析
本节涉及 Librados 以及 Osdc 等模块的操作,相关类(如 RadosClient、Objecter、IoCtxImpl 等)的介绍见上一节:Ceph学习——Librados与Osdc实现源码解析
1)调用 rados_create() 创建一个 RadosClient 对象。
2)调用 rados_conf_read_file() 读取配置文件。
3)调用 rados_connect() 函数,它最终会调用 RadosClient::connect() 来完成初始化。
4)调用 rados_ioctx_create(),它最终调用 RadosClient::create_ioctx() 创建与该 pool 相关的 IoCtxImpl 对象。
5)调用 rados_write() 函数向该 pool 中写入对象,它最终调用 IoCtxImpl::write()。
写操作消息封装
int librados::IoCtxImpl::write(const object_t& oid, bufferlist& bl, size_t len, uint64_t off) { if (len > UINT_MAX/2) return -E2BIG; ::ObjectOperation op;//创建ObjectOperation对象 prepare_assert_ops(&op);//封装相关写操作 bufferlist mybl; mybl.substr_of(bl, 0, len); op.write(off, mybl); return operate(oid, &op, NULL);//调用operate处理 }
IoCtxImpl::write() 中调用 operate():
int librados::IoCtxImpl::operate(const object_t& oid, ::ObjectOperation *o, ceph::real_time *pmtime, int flags) { ceph::real_time ut = (pmtime ? *pmtime : ceph::real_clock::now()); /* can't write to a snapshot */ if (snap_seq != CEPH_NOSNAP) return -EROFS; if (!o->size()) return 0; Mutex mylock("IoCtxImpl::operate::mylock"); Cond cond; bool done; int r; version_t ver; Context *oncommit = new C_SafeCond(&mylock, &cond, &done, &r); int op = o->ops[0].op.op; ldout(client->cct, 10) << ceph_osd_op_name(op) << " oid=" << oid << " nspace=" << oloc.nspace << dendl; //调用objecter->prepare_mutate_op把ObjectOperation封装为Op类型 Objecter::Op *objecter_op = objecter->prepare_mutate_op(oid, oloc, *o, snapc, ut, flags, oncommit, &ver); objecter->op_submit(objecter_op);//发送消息 mylock.Lock(); while (!done) cond.Wait(mylock); mylock.Unlock(); ldout(client->cct, 10) << "Objecter returned from " << ceph_osd_op_name(op) << " r=" << r << dendl; set_sync_op_version(ver); return r; }
发送消息 op_submit
op_submit() 将封装好的 Op 操作通过网络发送出去。op_submit() 调用 _op_submit_with_budget(),后者负责 Throttle 相关的流量控制以及超时处理,最后再调用 _op_submit() 完成目标地址的计算(寻址)和消息发送工作。相关函数如下:
oid Objecter::op_submit(Op *op, ceph_tid_t *ptid, int *ctx_budget) { shunique_lock rl(rwlock, ceph::acquire_shared); ceph_tid_t tid = 0; if (!ptid) ptid = &tid; op->trace.event("op submit"); _op_submit_with_budget(op, rl, ptid, ctx_budget);//调用_op_submit_with_budget } void Op *op, shunique_lock& sul, ceph_tid_t *ptid, int *ctx_budget) { assert(initialized); assert(op->ops.size() == op->out_bl.size()); assert(op->ops.size() == op->out_rval.size()); assert(op->ops.size() == op->out_handler.size()); // throttle. before we look at any state, because // _take_op_budget() may drop our lock while it blocks. if (!op->ctx_budgeted || (ctx_budget && (*ctx_budget == -1))) { int op_budget = _take_op_budget(op, sul); // take and pass out the budget for the first OP // in the context session if (ctx_budget && (*ctx_budget == -1)) { *ctx_budget = op_budget; } } if (osd_timeout > timespan(0)) { if (op->tid == 0) op->tid = ++last_tid; auto tid = op->tid; op->ontimeout = timer.add_event(osd_timeout, [this, tid]() { op_cancel(tid, -ETIMEDOUT); }); } _op_submit(op, sul, ptid); } void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid) { // rwlock is locked ldout(cct, 10) << __func__ << " op " << op << dendl; // pick target assert(op->session == NULL); OSDSession *s = NULL; //调用_calc_target 来计算目标OSD bool check_for_latest_map = _calc_target(&op->target, nullptr) == RECALC_OP_TARGET_POOL_DNE; // Try to get a session, including a retry if we need to take write lock //调用函数 _get_session 获取目标OSD的连接,如果返回-EAGAIN,就升级为写锁,重新获取 int r = _get_session(op->target.osd, &s, sul); if (r == -EAGAIN || (check_for_latest_map && sul.owns_lock_shared())) { epoch_t orig_epoch = osdmap->get_epoch(); sul.unlock(); if (cct->_conf->objecter_debug_inject_relock_delay) { sleep(1); } sul.lock(); if (orig 4000 _epoch != osdmap->get_epoch()) { // map changed; recalculate mapping ldout(cct, 10) << __func__ << " relock raced with osdmap, recalc target" << dendl; //调用_calc_target 来计算目标OSD 
check_for_latest_map = _calc_target(&op->target, nullptr) == RECALC_OP_TARGET_POOL_DNE; if (s) { put_session(s); s = NULL; r = -EAGAIN; } } } if (r == -EAGAIN) { assert(s == NULL); r = _get_session(op->target.osd, &s, sul); } assert(r == 0); assert(s); // may be homeless _send_op_account(op); // send? assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)); if (osdmap_full_try) { op->target.flags |= CEPH_OSD_FLAG_FULL_TRY; } bool need_send = false; //判断当前状态,如果可以发送请求就调用 函数_prepare_osd_op 准备请求消息,调用函数_send_op发送消息 if (osdmap->get_epoch() < epoch_barrier) { ldout(cct, 10) << " barrier, paused " << op << " tid " << op->tid << dendl; op->target.paused = true; _maybe_request_map(); } else if ((op->target.flags & CEPH_OSD_FLAG_WRITE) && osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) { ldout(cct, 10) << " paused modify " << op << " tid " << op->tid << dendl; op->target.paused = true; _maybe_request_map(); } else if ((op->target.flags & CEPH_OSD_FLAG_READ) && osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) { ldout(cct, 10) << " paused read " << op << " tid " << op->tid << dendl; op->target.paused = true; _maybe_request_map(); } else if (op->respects_full() && (_osdmap_full_flag() || _osdmap_pool_full(op->target.base_oloc.pool))) { ldout(cct, 0) << " FULL, paused modify " << op << " tid " << op->tid << dendl; op->target.paused = true; _maybe_request_map(); } else if (!s->is_homeless()) { need_send = true; } else { _maybe_request_map(); } //如果可以发送请求就调用 函数_prepare_osd_op 准备请求消息 MOSDOp *m = NULL; if (need_send) { m = _prepare_osd_op(op); } OSDSession::unique_lock sl(s->lock); if (op->tid == 0) op->tid = ++last_tid; ldout(cct, 10) << "_op_submit oid " << op->target.base_oid << " '" << op->target.base_oloc << "' '" << op->target.target_oloc << "' " << op->ops << " tid " << op->tid << " osd." << (!s->is_homeless() ? 
s->osd : -1) << dendl; _session_op_assign(s, op); //如果可以发送请求就调用,调用函数_send_op发送消息 if (need_send) { _send_op(op, m); } // Last chance to touch Op here, after giving up session lock it can // be freed at any time by response handler. ceph_tid_t tid = op->tid; if (check_for_latest_map) { _send_op_map_check(op); } if (ptid) *ptid = tid; op = NULL; sl.unlock(); put_session(s); ldout(cct, 5) << num_in_flight << " in flight" << dendl; }
对象寻址 _calc_target
int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change) { .... .... .... const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool); if (!pi) { t->osd = -1; return RECALC_OP_TARGET_POOL_DNE; } ldout(cct,30) << __func__ << " base pi " << pi << " pg_num " << pi->get_pg_num() << dendl; bool force_resend = false; if (osdmap->get_epoch() == pi->last_force_op_resend) { if (t->last_force_resend < pi->last_force_op_resend) { t->last_force_resend = pi->last_force_op_resend; force_resend = true; } else if (t->last_force_resend == 0) { force_resend = true; } } // apply tiering t->target_oid = t->base_oid; t->target_oloc = t->base_oloc; if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) { if (is_read && pi->has_read_tier()) t->target_oloc.pool = pi->read_tier; if (is_write && pi->has_write_tier()) t->target_oloc.pool = pi->write_tier; pi = osdmap->get_pg_pool(t->target_oloc.pool); if (!pi) { t->osd = -1; return RECALC_OP_TARGET_POOL_DNE; } } pg_t pgid; if (t->precalc_pgid) { assert(t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY); assert(t->base_oid.name.empty()); // make sure this is a pg op assert(t->base_oloc.pool == (int64_t)t->base_pgid.pool()); pgid = t->base_pgid; } else { //!!!!!!!!!!!!!!!!获取目标对象所在的PG!!!!!!!!!!!!!!!!!!!!!!!!!!! int ret = osdmap->object_locator_to_pg(t->target_oid, t->target_oloc, pgid); if (ret == -ENOENT) { t->osd = -1; return RECALC_OP_TARGET_POOL_DNE; } } ldout(cct,20) << __func__ << " target " << t->target_oid << " " << t->target_oloc << " -> pgid " << pgid << dendl; ldout(cct,30) << __func__ << " target pi " << pi << " pg_num " << pi->get_pg_num() << dendl; t->pool_ever_existed = true; int size = pi->size; int min_size = pi->min_size; unsigned pg_num = pi->get_pg_num(); int up_primary, acting_primary; vector<int> up, acting; //!!!!!!!!!!!!!!!通过CRUSH算法,获取该PG对应的OSD列表!!!!!!!!!!!!!!!!! 
osdmap->pg_to_up_acting_osds(pgid, &up, &up_primary, &acting, &acting_primary); bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE); bool recovery_deletes = osdmap->test_flag(CEPH_OSDMAP_RECOVERY_DELETES); unsigned prev_seed = ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask); pg_t prev_pgid(prev_seed, pgid.pool()); .... .... .... return RECALC_OP_TARGET_NO_ACTION; }
相关文章推荐
- ceph源码分析之读写操作流程(1)
- Ceph学习——Librbd块存储库与RBD读写流程源码分析
- ceph源码分析之读写操作流程(2)
- Ceph 学习——OSD读写流程与源码分析(一)
- 一个非常非常非常基础的程序,写的不好,但是一般的文件读写操作及字符处理函数都涉及到了..新手学习用的
- 【学习日记】文件的读写操作
- Qt实现GUI的二进制文件读写操作(源码分析+工程打包+测试例子)
- QT sqlite3数据库读取、容器操作、文件读写——学习笔记。
- C++ Primer 学习笔记_29_STL实践与分析(3) --顺序容器的操作(下)
- BT源代码学习心得(十三):客户端源代码分析(对等客户的连接建立及其握手协议) 转自CSDN:gushenghua的专栏
- Ceph 学习——CRUSH算法及源码分析(二)
- Ceph OS模块介绍及读写流程分析
- Snail—OC学习之文件操作(非读写)
- ASP.NET MVC WebApi 返回数据类型序列化控制(json,xml) 用javascript在客户端删除某一个cookie键值对 input点击链接另一个页面,各种操作。 C# 往线程里传参数的方法总结 TCP/IP 协议 用C#+Selenium+ChromeDriver 生成我的咕咚跑步路线地图 (转)值得学习百度开源70+项目
- Java并发学习(六)-深入分析CAS操作
- 【python学习笔记】pthon3.x中的文件读写操作
- 学习笔记--- S3C2440 对NANDFLASH操作原理与测试代码分析
- opencv 学习笔记2—XML读写操作
- Windows 8 学习笔记(二十三)--WritableBitmap的读写操作C++
- BT源代码学习心得(九):客户端源代码分析(图形界面浅析) -- 转贴自 wolfenstein (NeverSayNever)