您的位置:首页 > 其它

Ceph学习——客户端读写操作分析

2017-12-04 18:42 686 查看

客户端读写操作分析

本节设计到了Librados以及Osdc等操作,相关类如 RadosClient、Objecter、IoCtxImpl等介绍见上一节:

Ceph学习——Librados与Osdc实现源码解析



1)调用rados_create()创建一个RadosClient对象。

2) 调用rados_config_read()读取配置文件。

3)调用rados_connect()函数,最终他会调用RadosClient::connect()来完成初始化

4)调用rados_ioctx_create(),它最终调用RadosClient::create_ioctx()穿件pool相关的IoCtxImpl类。

5)调用 rados_write 函数 想该pool中写入对象。调用了IoCtxImpl::write()。



写操作消息封装

int librados::IoCtxImpl::write(const object_t& oid, bufferlist& bl,
size_t len, uint64_t off)
{
if (len > UINT_MAX/2)
return -E2BIG;
::ObjectOperation op;//创建ObjectOperation对象
prepare_assert_ops(&op);//封装相关写操作
bufferlist mybl;
mybl.substr_of(bl, 0, len);
op.write(off, mybl);
return operate(oid, &op, NULL);//调用operate处理
}


write中调用operate

int librados::IoCtxImpl::operate(const object_t& oid, ::ObjectOperation *o,
ceph::real_time *pmtime, int flags)
{
ceph::real_time ut = (pmtime ? *pmtime :
ceph::real_clock::now());

/* can't write to a snapshot */
if (snap_seq != CEPH_NOSNAP)
return -EROFS;

if (!o->size())
return 0;

Mutex mylock("IoCtxImpl::operate::mylock");
Cond cond;
bool done;
int r;
version_t ver;

Context *oncommit = new C_SafeCond(&mylock, &cond, &done, &r);

int op = o->ops[0].op.op;
ldout(client->cct, 10) << ceph_osd_op_name(op) << " oid=" << oid
<< " nspace=" << oloc.nspace << dendl;
//调用objecter->prepare_mutate_op把ObjectOperation封装为Op类型
Objecter::Op *objecter_op = objecter->prepare_mutate_op(oid, oloc,
*o, snapc, ut, flags,
oncommit, &ver);
objecter->op_submit(objecter_op);//发送消息

mylock.Lock();
while (!done)
cond.Wait(mylock);
mylock.Unlock();
ldout(client->cct, 10) << "Objecter returned from "
<< ceph_osd_op_name(op) << " r=" << r << dendl;

set_sync_op_version(ver);

return r;
}


发送消息 op_submit

该函数将封装好的Op操作通过网络发送出去。在op_submit中调用了_op_submit_with_budget用来处理Throttle相关的流量信息以及超时处理,最后该函数调用 _op_submit用来完成关键地址寻址和发送工作。

函数

oid Objecter::op_submit(Op *op, ceph_tid_t *ptid, int *ctx_budget)
{
shunique_lock rl(rwlock, ceph::acquire_shared);
ceph_tid_t tid = 0;
if (!ptid)
ptid = &tid;
op->trace.event("op submit");
_op_submit_with_budget(op, rl, ptid, ctx_budget);//调用_op_submit_with_budget
}

void Op *op, shunique_lock& sul,
ceph_tid_t *ptid,
int *ctx_budget)
{
assert(initialized);

assert(op->ops.size() == op->out_bl.size());
assert(op->ops.size() == op->out_rval.size());
assert(op->ops.size() == op->out_handler.size());

// throttle.  before we look at any state, because
// _take_op_budget() may drop our lock while it blocks.
if (!op->ctx_budgeted || (ctx_budget && (*ctx_budget == -1))) {
int op_budget = _take_op_budget(op, sul);
// take and pass out the budget for the first OP
// in the context session
if (ctx_budget && (*ctx_budget == -1)) {
*ctx_budget = op_budget;
}
}

if (osd_timeout > timespan(0)) {
if (op->tid == 0)
op->tid = ++last_tid;
auto tid = op->tid;
op->ontimeout = timer.add_event(osd_timeout,
[this, tid]() {
op_cancel(tid, -ETIMEDOUT); });
}

_op_submit(op, sul, ptid);
}

void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid)
{
// rwlock is locked

ldout(cct, 10) << __func__ << " op " << op << dendl;

// pick target
assert(op->session == NULL);
OSDSession *s = NULL;
//调用_calc_target 来计算目标OSD
bool check_for_latest_map = _calc_target(&op->target, nullptr)
== RECALC_OP_TARGET_POOL_DNE;

// Try to get a session, including a retry if we need to take write lock
//调用函数 _get_session 获取目标OSD的连接,如果返回-EAGAIN,就升级为写锁,重新获取
int r = _get_session(op->target.osd, &s, sul);
if (r == -EAGAIN ||
(check_for_latest_map && sul.owns_lock_shared())) {
epoch_t orig_epoch = osdmap->get_epoch();
sul.unlock();
if (cct->_conf->objecter_debug_inject_relock_delay) {
sleep(1);
}
sul.lock();
if (orig
4000
_epoch != osdmap->get_epoch()) {
// map changed; recalculate mapping
ldout(cct, 10) << __func__ << " relock raced with osdmap, recalc target"
<< dendl;
//调用_calc_target 来计算目标OSD
check_for_latest_map = _calc_target(&op->target, nullptr)
== RECALC_OP_TARGET_POOL_DNE;
if (s) {
put_session(s);
s = NULL;
r = -EAGAIN;
}
}
}
if (r == -EAGAIN) {
assert(s == NULL);
r = _get_session(op->target.osd, &s, sul);
}
assert(r == 0);
assert(s);  // may be homeless

_send_op_account(op);

// send?

assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));

if (osdmap_full_try) {
op->target.flags |= CEPH_OSD_FLAG_FULL_TRY;
}

bool need_send = false;
//判断当前状态,如果可以发送请求就调用 函数_prepare_osd_op 准备请求消息,调用函数_send_op发送消息
if (osdmap->get_epoch() < epoch_barrier) {
ldout(cct, 10) << " barrier, paused " << op << " tid " << op->tid
<< dendl;
op->target.paused = true;
_maybe_request_map();
} else if ((op->target.flags & CEPH_OSD_FLAG_WRITE) &&
osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
ldout(cct, 10) << " paused modify " << op << " tid " << op->tid
<< dendl;
op->target.paused = true;
_maybe_request_map();
} else if ((op->target.flags & CEPH_OSD_FLAG_READ) &&
osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
ldout(cct, 10) << " paused read " << op << " tid " << op->tid
<< dendl;
op->target.paused = true;
_maybe_request_map();
} else if (op->respects_full() &&
(_osdmap_full_flag() ||
_osdmap_pool_full(op->target.base_oloc.pool))) {
ldout(cct, 0) << " FULL, paused modify " << op << " tid "
<< op->tid << dendl;
op->target.paused = true;
_maybe_request_map();
} else if (!s->is_homeless()) {
need_send = true;
} else {
_maybe_request_map();
}
//如果可以发送请求就调用 函数_prepare_osd_op 准备请求消息
MOSDOp *m = NULL;
if (need_send) {
m = _prepare_osd_op(op);
}

OSDSession::unique_lock sl(s->lock);
if (op->tid == 0)
op->tid = ++last_tid;

ldout(cct, 10) << "_op_submit oid " << op->target.base_oid
<< " '" << op->target.base_oloc << "' '"
<< op->target.target_oloc << "' " << op->ops << " tid "
<< op->tid << " osd." << (!s->is_homeless() ? s->osd : -1)
<< dendl;

_session_op_assign(s, op);
//如果可以发送请求就调用,调用函数_send_op发送消息
if (need_send) {
_send_op(op, m);
}

// Last chance to touch Op here, after giving up session lock it can
// be freed at any time by response handler.
ceph_tid_t tid = op->tid;
if (check_for_latest_map) {
_send_op_map_check(op);
}
if (ptid)
*ptid = tid;
op = NULL;

sl.unlock();
put_session(s);

ldout(cct, 5) << num_in_flight << " in flight" << dendl;
}


对象寻址 _calc_target

int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
{
....
....
....
const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
if (!pi) {
t->osd = -1;
return RECALC_OP_TARGET_POOL_DNE;
}
ldout(cct,30) << __func__ << "  base pi " << pi
<< " pg_num " << pi->get_pg_num() << dendl;

bool force_resend = false;
if (osdmap->get_epoch() == pi->last_force_op_resend) {
if (t->last_force_resend < pi->last_force_op_resend) {
t->last_force_resend = pi->last_force_op_resend;
force_resend = true;
} else if (t->last_force_resend == 0) {
force_resend = true;
}
}

// apply tiering
t->target_oid = t->base_oid;
t->target_oloc = t->base_oloc;
if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
if (is_read && pi->has_read_tier())
t->target_oloc.pool = pi->read_tier;
if (is_write && pi->has_write_tier())
t->target_oloc.pool = pi->write_tier;
pi = osdmap->get_pg_pool(t->target_oloc.pool);
if (!pi) {
t->osd = -1;
return RECALC_OP_TARGET_POOL_DNE;
}
}

pg_t pgid;
if (t->precalc_pgid) {
assert(t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY);
assert(t->base_oid.name.empty()); // make sure this is a pg op
assert(t->base_oloc.pool == (int64_t)t->base_pgid.pool());
pgid = t->base_pgid;
} else {
//!!!!!!!!!!!!!!!!获取目标对象所在的PG!!!!!!!!!!!!!!!!!!!!!!!!!!!
int ret = osdmap->object_locator_to_pg(t->target_oid, t->target_oloc,
pgid);
if (ret == -ENOENT) {
t->osd = -1;
return RECALC_OP_TARGET_POOL_DNE;
}
}
ldout(cct,20) << __func__ << " target " << t->target_oid << " "
<< t->target_oloc << " -> pgid " << pgid << dendl;
ldout(cct,30) << __func__ << "  target pi " << pi
<< " pg_num " << pi->get_pg_num() << dendl;
t->pool_ever_existed = true;

int size = pi->size;
int min_size = pi->min_size;
unsigned pg_num = pi->get_pg_num();
int up_primary, acting_primary;
vector<int> up, acting;
//!!!!!!!!!!!!!!!通过CRUSH算法,获取该PG对应的OSD列表!!!!!!!!!!!!!!!!!
osdmap->pg_to_up_acting_osds(pgid, &up, &up_primary,
&acting, &acting_primary);
bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
bool recovery_deletes = osdmap->test_flag(CEPH_OSDMAP_RECOVERY_DELETES);
unsigned prev_seed = ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask);
pg_t prev_pgid(prev_seed, pgid.pool());
....
....
....

return RECALC_OP_TARGET_NO_ACTION;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
相关文章推荐