您的位置:首页 > 运维架构

【OVS2.5.0源码分析】openflow连接实现分析(3)

2016-08-28 08:07 597 查看
上两篇分析了controller连接建立的完整流程,本篇分析下接收openflow报文的过程,openflow报文的处理在后续文章中分析。 整个调用过程,可以根据上一篇的对象关系图得出,本篇只是把调用关系细化到函数。

1、connmgr_run函数

    /* Excerpt from the body of connmgr_run(). */

    /* Run in-band control; discard it once in_band_run() returns false. */
    if (mgr->in_band) {
        if (!in_band_run(mgr->in_band)) {
            in_band_destroy(mgr->in_band);
            mgr->in_band = NULL;
        }
    }

    LIST_FOR_EACH_SAFE (ofconn, next_ofconn, node, &mgr->all_conns) {
        ofconn_run(ofconn, handle_openflow); // all_conns is made up of two parts: controller connections and ofservice connections
    }
    ofmonitor_run(mgr);

    /* Fail-open maintenance.  Do this after processing the ofconns since
     * fail-open checks the status of the controller rconn. */
    if (mgr->fail_open) {
        fail_open_run(mgr->fail_open);
    }

2、ofconn_run函数
static void
ofconn_run(struct ofconn *ofconn,
void (*handle_openflow)(struct ofconn *,
const struct ofpbuf *ofp_msg))
{
struct connmgr *mgr = ofconn->connmgr;
size_t i;

for (i = 0; i < N_SCHEDULERS; i++) {
struct ovs_list txq;

pinsched_run(ofconn->schedulers[i], &txq);
do_send_packet_ins(ofconn, &txq);
}

rconn_run(ofconn->rconn); //该调用链为 vconn_run->stream_run, 对于tcp的连接,主要行为是判断是否有发送报文,有则发送之

/* Limit the number of iterations to avoid starving other tasks. */
for (i = 0; i < 50 && ofconn_may_recv(ofconn); i++) {
struct ofpbuf *of_msg = rconn_recv(ofconn->rconn); //接收报文
if (!of_msg) {
break;
}

if (mgr->fail_open) {
fail_open_maybe_recover(mgr->fail_open);
}

handle_openflow(ofconn, of_msg);
ofpbuf_delete(of_msg);
}

if (time_msec() >= ofconn->next_op_report) {
ofconn_log_flow_mods(ofconn);
}

ovs_mutex_lock(&ofproto_mutex);
if (!rconn_is_alive(ofconn->rconn)) {
ofconn_destroy(ofconn);
} else if (!rconn_is_connected(ofconn->rconn)) {
ofconn_flush(ofconn);
}
ovs_mutex_unlock(&ofproto_mutex);
}

3、rconn_recv函数
/* Attempts to receive one message from 'rc'.
 *
 * Reception is only tried while 'rc' is in state S_ACTIVE or S_IDLE.  On
 * success the message is copied to the monitors, the admission and activity
 * timestamps are refreshed, an idle connection is moved back to S_ACTIVE, and
 * the message is returned.  On any error other than EAGAIN the error is
 * reported and the connection is disconnected.
 *
 * Returns the received message, or NULL if none was received. */
struct ofpbuf *
rconn_recv(struct rconn *rc)
    OVS_EXCLUDED(rc->mutex)
{
    struct ofpbuf *msg = NULL;

    ovs_mutex_lock(&rc->mutex);
    if (rc->state & (S_ACTIVE | S_IDLE)) {
        /* Delegate to the vconn layer's receive function. */
        int err = vconn_recv(rc->vconn, &msg);

        if (err) {
            /* EAGAIN only means "nothing to read yet"; anything else is a
             * real failure. */
            if (err != EAGAIN) {
                report_error(rc, err);
                disconnect(rc, err);
            }
        } else {
            copy_to_monitor(rc, msg);
            if (rc->probably_admitted || is_admitted_msg(msg)
                || time_now() - rc->last_connected >= 30) {
                rc->probably_admitted = true;
                rc->last_admitted = time_now();
            }
            rc->last_activity = time_now();
            if (rc->state == S_IDLE) {
                state_transition(rc, S_ACTIVE);
            }
        }
    }
    ovs_mutex_unlock(&rc->mutex);

    return msg;
}

4、vconn_recv函数
int
vconn_recv(struct vconn *vconn, struct ofpbuf **msgp)
{
struct ofpbuf *msg;
int retval;

retval = vconn_connect(vconn);
if (!retval) {
retval = do_recv(vconn, &msg); //调用
}
if (!retval && !vconn->recv_any_version) {
const struct ofp_header *oh = msg->data;
if (oh->version != vconn->version) {
enum ofptype type;

if (ofptype_decode(&type, msg->data)
|| (type != OFPTYPE_HELLO &&
type != OFPTYPE_ERROR &&
type != OFPTYPE_ECHO_REQUEST &&
type != OFPTYPE_ECHO_REPLY)) {
struct ofpbuf *reply;

VLOG_ERR_RL(&bad_ofmsg_rl, "%s: received OpenFlow version "
"0x%02"PRIx8" != expected %02x",
vconn->name, oh->version, vconn->version);

/* Send a "bad version" reply, if we can. */
reply = ofperr_encode_reply(OFPERR_OFPBRC_BAD_VERSION, oh);
retval = vconn_send(vconn, reply);
if (retval) {
VLOG_INFO_RL(&bad_ofmsg_rl,
"%s: failed to queue error reply (%s)",
vconn->name, ovs_strerror(retval));
ofpbuf_delete(reply);
}

/* Suppress the received message, as if it had not arrived. */
retval = EAGAIN;
ofpbuf_delete(msg);
}
}
}

*msgp = retval ? NULL : msg;
return retval;
}

5、do_recv函数
static int
do_recv(struct vconn *vconn, struct ofpbuf **msgp)
{
int retval = (vconn->vclass->recv)(vconn, msgp); //实际调用vconn_stream_recv函数
if (!retval) {
COVERAGE_INC(vconn_received);
if (VLOG_IS_DBG_ENABLED()) {
char *s = ofp_to_string((*msgp)->data, (*msgp)->size, 1);
VLOG_DBG_RL(&ofmsg_rl, "%s: received: %s", vconn->name, s);
free(s);
}
}
return retval;
}

6、vconn_stream_recv函数
/* Receive callback for stream-based vconns.
 *
 * Assembles one complete OpenFlow message in 's->rxbuf': first the fixed
 * ofp_header, then the rest of the message up to the length announced in the
 * header.  The partially filled buffer stays in 's->rxbuf' whenever this
 * function returns nonzero, so a later call continues where this one stopped.
 *
 * Returns 0 and transfers ownership of the buffer to '*bufferp' once a whole
 * message is available.  Returns EPROTO if the header announces a length
 * shorter than the header itself, or the nonzero value returned by
 * vconn_stream_recv__() if the required bytes could not be read. */
static int
vconn_stream_recv(struct vconn *vconn, struct ofpbuf **bufferp)
{
    struct vconn_stream *s = vconn_stream_cast(vconn);
    const struct ofp_header *oh;
    int rx_len;

    /* Allocate new receive buffer if we don't have one. */
    if (s->rxbuf == NULL) {
        s->rxbuf = ofpbuf_new(1564);
    }

    /* Read ofp_header. */
    if (s->rxbuf->size < sizeof(struct ofp_header)) {
        int retval = vconn_stream_recv__(s, sizeof(struct ofp_header));
        if (retval) {
            return retval;
        }
    }

    /* Read payload. */
    oh = s->rxbuf->data;
    rx_len = ntohs(oh->length);
    if (rx_len < sizeof(struct ofp_header)) {
        VLOG_ERR_RL(&rl, "received too-short ofp_header (%d bytes)", rx_len);
        return EPROTO;
    } else if (s->rxbuf->size < rx_len) {
        int retval = vconn_stream_recv__(s, rx_len);
        if (retval) {
            return retval;
        }
    }

    /* A full message is buffered: hand it over and start afresh next time. */
    s->n_packets++;
    *bufferp = s->rxbuf;
    s->rxbuf = NULL;
    return 0;
}

7、vconn_stream_recv__函数
/* Tries to grow 's->rxbuf' until it holds 'rx_len' bytes by reading the
 * missing bytes from 's->stream'.
 *
 * Returns 0 if the buffer now holds 'rx_len' bytes, EAGAIN if only part of
 * the missing data was read, EOF if the stream returned 0 bytes while the
 * buffer was empty, EPROTO if it returned 0 bytes while the buffer already
 * held data, and otherwise the positive form of the negative value returned
 * by stream_recv(). */
static int
vconn_stream_recv__(struct vconn_stream *s, int rx_len)
{
    struct ofpbuf *rx = s->rxbuf;
    int missing;
    int n_read;

    missing = rx_len - rx->size;
    ofpbuf_prealloc_tailroom(rx, missing);
    n_read = stream_recv(s->stream, ofpbuf_tail(rx), missing);

    if (n_read < 0) {
        return -n_read;
    }

    if (n_read == 0) {
        if (!rx->size) {
            return EOF;
        }
        VLOG_ERR_RL(&rl, "connection dropped mid-packet");
        return EPROTO;
    }

    rx->size += n_read;
    if (n_read != missing) {
        return EAGAIN;
    }
    return 0;
}

8、stream_recv函数
/* Tries to receive up to 'n' bytes from 'stream' into 'buffer', and returns:
*
* - If successful, the number of bytes received (between 1 and 'n').
*
* - On error, a negative errno value.
*
* - 0, if the connection has been closed in the normal fashion, or if 'n'
* is zero.
*
* The recv function will not block waiting for a packet to arrive. If no
* data have been received, it returns -EAGAIN immediately. */
int
stream_recv(struct stream *stream, void *buffer, size_t n)
{
int retval = stream_connect(stream);
return (retval ? -retval
: n == 0 ? 0
: (stream->class->recv)(stream, buffer, n)); //以tcp为例,实际调用fd_recv函数
}

9、fd_recv函数
/* Receive callback for file-descriptor based streams: reads at most 'n' bytes
 * from the stream's socket into 'buffer' with recv().
 *
 * Returns the non-negative byte count reported by recv(), or the negated
 * socket error code on failure.  On Windows, WSAEWOULDBLOCK is mapped to
 * EAGAIN.  Errors other than EAGAIN are logged at debug level. */
static ssize_t
fd_recv(struct stream *stream, void *buffer, size_t n)
{
    struct stream_fd *sfd = stream_fd_cast(stream);
    ssize_t n_read;
    int err;

    /* Plain socket-layer recv() call. */
    n_read = recv(sfd->fd, buffer, n, 0);
    if (n_read >= 0) {
        return n_read;
    }

    err = sock_errno();
#ifdef _WIN32
    if (err == WSAEWOULDBLOCK) {
        err = EAGAIN;
    }
#endif
    if (err != EAGAIN) {
        VLOG_DBG_RL(&rl, "recv: %s", sock_strerror(err));
    }
    return -err;
}

到此整个调用过程基本完成,从ofconn -> rconn -> vconn -> stream -> linux socket,整个过程和上一篇的对象关系图是一致的。 通过此流程分析,可以看到vconn是在stream的基础上增加了发送和接收缓冲区。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  OVS openvswitch 网络