您的位置:首页 > 其它

ceph源码分析之消息通信机制

2015-01-20 10:33 591 查看
在介绍ceph的读写流程时,我们流程的开始于OSD::_ms_dispatcher,这个函数的意义表示,osd拿到了消息,并要根据不同的表示对消息进行dispatcher,那么消息究竟是如何来的,这个之前没有进行任何介绍,本章主要就是对此进行梳理。本文引用参考了《解析ceph:网络层的处理》这篇文章

现在大多的网络编程中,都会使用基于事件通知的异步网络IO方式来实现,比如Epoll和Kqueue。Ceph项目开始的比较早,使用的是poll。在终端采用了两个读写线程pipe::
reader_thread和pipe::writer_thread来处理读写事件。读线程在得到请求后会解析网络流并重构消息,然后派发给后端的dispatcher队列,后端的dispatcher线程负责将队列中的消息进行分发。写线程等到有模块调用send_message时会被wakeup然后处理。

Ceph在目前的网络层面上有三个重要概念,分别是
Messenger,Pipe,Connection。Messenger实际上可以理解为一个监听地址和多个连接的集合。比如每个OSD
中会有cluster_messenger
和public_messenger,顾名思义cluster_messenger
负责给OSD
与其他OSD
和Monitor
的通信并提供了一个监听地址,public_messenger负责与客户端的通信并提供了一个面向客户端的监听地址。因此cluster_messenger
中负责的连接会全部是面向其他OSD
或者Monitor
的连接。Pipe实际上是一个
Session 的载体,为了解决网络连接不稳定或者临时闪断连接的问题,Pipe会一直维护面向一个终端地址的会话状态,如类似
TCP 包序号的消息序号和发送队列。Connection
就是一个 socket
的 wrapper,它从属于某一个
Pipe。(引用)

下图是麦子迈文章中的逻辑图



上图是一个OSD端的网络逻辑。OSD继承自Dispatcher类,它其中有SimpleMessenger类的成员变量cluster_messenger和client_messenger。
SimpleMessenger类中有Accepter接收类,DispatchQueue派发类和成员为Pipe类的set。
Accepter中线程负责将图中Listen送入Pipe
set中。
DispatchQueue类中线程负责将Pipe
set中的数据拿出来处理,调用它中的Dispatcher类的成员函数ms_dispatcher将数据交给后端(Dispatcher类就是OSD,在创建SimpleMessenger就把自己传入了)
每个Pipe类中又各有一个writer_thread和reader_thread线程,负责创建Connection与外界进行会话。
以下分别对应了socket函数对应实现地点:
::socket------Accepter::bind
::bind------Accepter::bind
::listen------Accepter::bind
::accept------Accepter::entry
::send------Pipe::tcp_write
::recv------Pipe::tcp_read_nonblocking
::close------Accepter::stop
先看Accepter类,
class Accepter : public Thread {
SimpleMessenger *msgr;
bool done;
int listen_sd;
uint64_t nonce;
public:
Accepter(SimpleMessenger *r, uint64_t n) : msgr(r), done(false), listen_sd(-1), nonce(n) {}
void *entry();//入口函数,主要处理socket文件的accept操作
void stop();//关闭socket
int bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports);//创建socket文件,bind,listen
int rebind(const set<int>& avoid_port);
int start();//调用Thread::create函数创建线程
};
int Accepter::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
{
......
/* socket creation */
listen_sd = ::socket(family, SOCK_STREAM, 0);

/* bind to port */
int rc = -1;
if (listen_addr.get_port()) {
// specific port

// reuse addr+port when possible
int on = 1;
rc = ::setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
.....

rc = ::bind(listen_sd, (struct sockaddr *) &listen_addr.ss_addr(), listen_addr.addr_size());//bind socket文件
......
} else {
// try a range of ports
for (int port = msgr->cct->_conf->ms_bind_port_min; port <= msgr->cct->_conf->ms_bind_port_max; port++) {
if (avoid_ports.count(port))
continue;
listen_addr.set_port(port);
rc = ::bind(listen_sd, (struct sockaddr *) &listen_addr.ss_addr(), listen_addr.addr_size());//bind socket文件
}
......
}
......

// listen!
rc = ::listen(listen_sd, 128);
......
msgr->set_myaddr(bind_addr);
entity_addr_t addr = msgr->get_myaddr();
addr.nonce = nonce;
msgr->set_myaddr(addr);
msgr->init_local_connection();
return 0;
}
void *Accepter::entry()
{
......
struct pollfd pfd;
pfd.fd = listen_sd;
pfd.events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
while (!done) {
int r = poll(&pfd, 1, -1);//调用poll函数开始监控
if (r < 0)
break;

if (pfd.revents & (POLLERR | POLLNVAL | POLLHUP))
break;

if (done) break;

// accept
entity_addr_t addr;
socklen_t slen = sizeof(addr.ss_addr());
int sd = ::accept(listen_sd, (sockaddr*)&addr.ss_addr(), &slen);
if (sd >= 0) {
errors = 0;
ldout(msgr->cct,10) << "accepted incoming on sd " << sd << dendl;
msgr->add_accept_pipe(sd);//accept成功之后,将消息加入到管道之中,等待处理
} else {
ldout(msgr->cct,0) << "accepter no incoming connection?  sd = " << sd
<< " errno " << errno << " " << strerror_r(errno, buf, sizeof(buf)) << dendl;
if (++errors > 4)
break;
}
}

ldout(msgr->cct,20) << "accepter closing" << dendl;
// don't close socket, in case we start up again?  blech.
if (listen_sd >= 0) {
::close(listen_sd);
listen_sd = -1;
}
ldout(msgr->cct,10) << "accepter stopping" << dendl;
return 0;
}


由代码我们可以看出,在进程开始时会调用Accepter::start来创建线程,之后进程会调用Accepter::bind用来初始化出listen_sd,之后线程通过entry函数发现listen_sd有活动,便会调用::accept函数将socket接管下来,并调用SimpleMessenger::add_accetp_pipe,此函数会通过Pipe::start_reader函数开始读线程,来处理acceptsocket文件,并送入SimpleMessenger的Pipe活跃队列中。
Pipe *SimpleMessenger::add_accept_pipe(int sd)
{
lock.Lock();
Pipe *p = new Pipe(this, Pipe::STATE_ACCEPTING, NULL);
p->sd = sd;
p->pipe_lock.Lock();
p->start_reader();    //启动读线程用来读socket
p->pipe_lock.Unlock();
pipes.insert(p);
accepting_pipes.insert(p);
lock.Unlock();
return p;
}


下面我们来看Pipe类,该类的主要功能就是作为前端Connetion和后端Dispatcher的中间层,其中拥有读写线程pipe::
reader_thread和pipe::writer_thread,他们的入口函数分别为Pipe::reader和Pipe::writer函数。
先说读线程,上面提到调用了SimpleMessenger调用了Pipe::start_reader函数来启动reader_pthread,该函数主要调用Thread::create创建了线程,入口函数为Pipe::reader函数。该函数首先调用Pipe::accept做处理。之后进入消息主循环。在循环中判定tag类型,当是一个CEPH_MSGR_TAG_MSG时,认为是ceph内部消息,创建一个connection。当是一个CEPH_MSGR_TAG_ACK
时,就调用Pipe::tcp_read函数来读取信息处理。
void Pipe::reader()
{
if (state == STATE_ACCEPTING) {
accept();//ywy:读线程首先响应socket,在做循环等待读消息
assert(pipe_lock.is_locked());
}
// loop.
while (state != STATE_CLOSED &&
state != STATE_CONNECTING) {
assert(pipe_lock.is_locked());
......

// open ...
if (tag == CEPH_MSGR_TAG_ACK) {
......
int rc = tcp_read((char*)&seq, sizeof(seq));
......
continue;
}

else if (tag == CEPH_MSGR_TAG_MSG) {
...
Message *m = 0;
int r = read_message(&m);
......
m->set_connection(connection_state.get());//创建一个connection
......
}

else if (tag == CEPH_MSGR_TAG_CLOSE) {
......
}
}
......
}

再说写线程,之前说到Pipe::accept函数会处理,之中走到一些条件分支时就会调用Pipe::start_writer函数来启动writer_pthread。该函数主要调用Thread::create创建了线程,入口函数为Pipe::writer函数。该函数调用后,写线程进入了主循环。

至此Accepter类介绍完毕了,下面我们来介绍SimpleMessenger中的另外一个重要类成员DispatchQueue。DispatchQueue类中也拥有一个线程dispatch_thread,他是专门用来进行消息分发的,入口函数为DispatchQueue::entry,该线程进入主循环后,将在调用SimpleMessenger::ms_ms_deliver_dispatch,此函数又会调用Dispatcher::ms_dispatch,此时就会依次从队列中拿出消息分发调用。用的队列是SimpleMessenger::dispatchers,类型为list<Dispatcher*>。

介绍完SimpleMessenger,Pipe。最后我们来介绍一下连接Connection.
Connection定义了一个抽象的连接,用来维持每个连接之间的状态。它其中包含了后端的Pipe,包括了对面连接的Connection,对面的地址等等参数。每一个Pipe在初始化的时候,便会同时new出一个Connection,相当于Pipe的一个身份标示,有了它,就知道对面连接的是谁,在发送消息的时候可以指定消息体Message和消息身份表示Connection。
这里提到了另一个结构体Message,这个是所有消息结构的父类,这个结构体比较好理解,就是包含了消息的头和消息等。所有的消息都是继承这个类。

我们知道了当一个实体想要主动给别人发消息时,一般会调用SimpleMessenger::send_message,传入参数为消息体Message和一个connection,这个connection是调用SimpleMessenger::get_connection函数,此函数会先查找有没有已经存在的对方地址为指定地址的pipe,如果有,直接返回pipe中的connection,如果没有的话,就new一个connection。

ConnectionRef SimpleMessenger::get_connection(const entity_inst_t& dest)
{
Mutex::Locker l(lock);
if (my_inst.addr == dest.addr) {
// local
return local_connection;
}

// remote
while (true) {
Pipe *pipe = _lookup_pipe(dest.addr);
if (pipe) {
ldout(cct, 10) << "get_connection " << dest << " existing " << pipe << dendl;
} else {
pipe = connect_rank(dest.addr, dest.name.type(), NULL, NULL);
ldout(cct, 10) << "get_connection " << dest << " new " << pipe << dendl;
}
Mutex::Locker l(pipe->pipe_lock);
if (pipe->connection_state)
return pipe->connection_state;
// we failed too quickly!  retry.  FIXME.
}
}

总结来看SimpleMessenger是一个有本实体开始到别处实体的发送消息的集合,它里面包含了许多个pipe,每一个对方不同的地址就对应一个pipe,pipe创建了一个connection,拥有它就相当于有了一个可以操作的标示符。同时message是一个包含消息内容的实体。通过SimpleMessenger::send_message就可以把消息发出,而对面就会送入pipe中由pipe交给DispatchQueue然后交给了后端。全部的这些东西合起来,各实体之间就可以进行读写沟通了。
具体的关于消息机制的外部接口实例可以查看src/test/Testmsgr.cc文件。

最后,在SimpleMessenger中有一个收割者线程reaper_thread,顾名思义这个线程不停的主循环,找到无效的pipe,然后干掉它,回收线程资源。



缺点:每增加一个实体,对应就要出现一个pipe,而一个pipe就要创建好几个线程。线程的增加会导致严重的
Context Switch 损耗,线程级的Context Switch
大概在 us
级别,会影响延迟敏感性应用的性能并且对系统造成资源压力。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐