您的位置:首页 > 理论基础 > 计算机网络

【OVS2.5.0源码分析】sFlow实现分析(3)

2016-09-18 18:36 851 查看
本篇分析 sFlow 的消息处理流程,其处理入口是 process_upcall 函数。

1、process_upcall函数

/* Excerpt from process_upcall(): handling of an sFlow sample upcall.
 * The enclosing switch and the declarations of 'upcall', 'userdata',
 * 'flow', 'packet' and 'udpif' are outside this excerpt. */
case SFLOW_UPCALL:
    if (upcall->sflow) {
        union user_action_cookie cookie;
        const struct nlattr *actions;
        size_t actions_len = 0;
        struct dpif_sflow_actions sflow_actions;
        memset(&sflow_actions, 0, sizeof sflow_actions);
        memset(&cookie, 0, sizeof cookie);
        /* Copy the sFlow part of the userdata cookie.
         * NOTE(review): assumes the userdata attribute carries at least
         * 'sizeof cookie.sflow' bytes; presumably validated when the upcall
         * was classified - TODO confirm. */
        memcpy(&cookie, nl_attr_get(userdata), sizeof cookie.sflow);
        if (upcall->actions) {
            /* Actions were passed up from datapath. */
            actions = nl_attr_get(upcall->actions);
            actions_len = nl_attr_get_size(upcall->actions);
            if (actions && actions_len) {
                /* Decode the actions into 'sflow_actions', whose tunnel and
                 * MPLS fields dpif_sflow_received() reads. */
                dpif_sflow_read_actions(flow, actions, actions_len,
                                        &sflow_actions);
            }
        }
        if (actions_len == 0) {
            /* Lookup actions in userspace cache (the ukey found by the
             * upcall's ufid), used as a faster fallback. */
            struct udpif_key *ukey = ukey_lookup(udpif, upcall->ufid);
            if (ukey) {
                ukey_get_actions(ukey, &actions, &actions_len);
                dpif_sflow_read_actions(flow, actions, actions_len,
                                        &sflow_actions);
            }
        }
        /* Build the sFlow sample and hand it to the sampler.  Action
         * information is passed only if some actions were found. */
        dpif_sflow_received(upcall->sflow, packet, flow,
                            flow->in_port.odp_port, &cookie,
                            actions_len > 0 ? &sflow_actions : NULL);
    }
    break;
2、dpif_sflow_received函数
/* Builds an sFlow flow sample describing 'packet' and submits it to the
 * first sampler of 'ds''s agent, which encodes it into the next sFlow
 * datagram.  Nothing is recorded if the agent has no sampler.
 *
 * 'flow' supplies the input VLAN and ingress tunnel metadata, 'odp_in_port'
 * is the datapath ingress port, 'cookie' carries the sFlow userdata (output
 * VLAN TCI and output port) and 'sflow_actions', which may be NULL, describes
 * the decoded egress tunnel and MPLS actions. */
void
dpif_sflow_received(struct dpif_sflow *ds, const struct dp_packet *packet,
                    const struct flow *flow, odp_port_t odp_in_port,
                    const union user_action_cookie *cookie,
                    const struct dpif_sflow_actions *sflow_actions)
    OVS_EXCLUDED(mutex)
{
    SFL_FLOW_SAMPLE_TYPE fs;
    SFLFlow_sample_element hdrElem;
    SFLSampled_header *header;
    SFLFlow_sample_element switchElem;
    uint8_t tnlInProto, tnlOutProto;
    SFLFlow_sample_element tnlInElem, tnlOutElem;
    SFLFlow_sample_element vniInElem, vniOutElem;
    SFLFlow_sample_element mplsElem;
    uint32_t mpls_lse_buf[FLOW_MAX_MPLS_LABELS];
    SFLSampler *sampler;
    struct dpif_sflow_port *in_dsp;
    struct dpif_sflow_port *out_dsp;
    ovs_be16 vlan_tci;

    ovs_mutex_lock(&mutex);
    /* Only the first sampler in the agent's list is used. */
    sampler = ds->sflow_agent->samplers;
    if (!sampler) {
        goto out;
    }

    /* Build a flow sample. */
    memset(&fs, 0, sizeof fs);

    /* Look up the input ifIndex if this port has one. Otherwise just
     * leave it as 0 (meaning 'unknown') and continue.  Note that 'in_dsp'
     * may therefore be NULL for the rest of this function. */
    in_dsp = dpif_sflow_find_port(ds, odp_in_port);
    if (in_dsp) {
        fs.input = SFL_DS_INDEX(in_dsp->dsi);
    }

    /* Make the assumption that the random number generator in the datapath converges
     * to the configured mean, and just increment the samplePool by the configured
     * sampling rate every time. */
    sampler->samplePool += sfl_sampler_get_sFlowFsPacketSamplingRate(sampler);

    /* Sampled header. */
    memset(&hdrElem, 0, sizeof hdrElem);
    hdrElem.tag = SFLFLOW_HEADER;
    header = &hdrElem.flowType.header;
    header->header_protocol = SFLHEADER_ETHERNET_ISO8023;
    /* The frame_length should include the Ethernet FCS (4 bytes),
     * but it has already been stripped, so we need to add 4 here. */
    header->frame_length = dp_packet_size(packet) + 4;
    /* Ethernet FCS stripped off. */
    header->stripped = 4;
    header->header_length = MIN(dp_packet_size(packet),
                                sampler->sFlowFsMaximumHeaderSize);
    header->header_bytes = dp_packet_data(packet);

    /* Add extended switch element. */
    memset(&switchElem, 0, sizeof(switchElem));
    switchElem.tag = SFLFLOW_EX_SWITCH;
    switchElem.flowType.sw.src_vlan = vlan_tci_to_vid(flow->vlan_tci);
    switchElem.flowType.sw.src_priority = vlan_tci_to_pcp(flow->vlan_tci);

    /* Retrieve data from user_action_cookie. */
    vlan_tci = cookie->sflow.vlan_tci;
    switchElem.flowType.sw.dst_vlan = vlan_tci_to_vid(vlan_tci);
    switchElem.flowType.sw.dst_priority = vlan_tci_to_pcp(vlan_tci);

    fs.output = cookie->sflow.output;

    /* Input tunnel. */
    if (flow->tunnel.ip_dst) {
        memset(&tnlInElem, 0, sizeof(tnlInElem));
        tnlInElem.tag = SFLFLOW_EX_IPV4_TUNNEL_INGRESS;
        /* The ingress port may be unknown to sFlow ('in_dsp' == NULL); report
         * protocol 0 (unknown, as for the output tunnel below) instead of
         * dereferencing a null pointer. */
        tnlInProto = in_dsp ? dpif_sflow_tunnel_proto(in_dsp->tunnel_type) : 0;
        dpif_sflow_tunnel_v4(tnlInProto,
                             &flow->tunnel,
                             &tnlInElem.flowType.ipv4);
        SFLADD_ELEMENT(&fs, &tnlInElem);
        if (flow->tunnel.tun_id) {
            memset(&vniInElem, 0, sizeof(vniInElem));
            vniInElem.tag = SFLFLOW_EX_VNI_INGRESS;
            vniInElem.flowType.tunnel_vni.vni
                = ntohll(flow->tunnel.tun_id);
            SFLADD_ELEMENT(&fs, &vniInElem);
        }
    }

    /* Output tunnel: only reported for a single output with exactly one
     * level of encapsulation and no decoding error. */
    if (sflow_actions
        && sflow_actions->encap_depth == 1
        && !sflow_actions->tunnel_err
        && dpif_sflow_cookie_num_outputs(cookie) == 1) {
        tnlOutProto = sflow_actions->tunnel_ipproto;
        if (tnlOutProto == 0) {
            /* Try to infer the ip-protocol from the output port. */
            if (sflow_actions->out_port != ODPP_NONE) {
                out_dsp = dpif_sflow_find_port(ds, sflow_actions->out_port);
                if (out_dsp) {
                    tnlOutProto = dpif_sflow_tunnel_proto(out_dsp->tunnel_type);
                }
            }
        }
        memset(&tnlOutElem, 0, sizeof(tnlOutElem));
        tnlOutElem.tag = SFLFLOW_EX_IPV4_TUNNEL_EGRESS;
        dpif_sflow_tunnel_v4(tnlOutProto,
                             &sflow_actions->tunnel,
                             &tnlOutElem.flowType.ipv4);
        SFLADD_ELEMENT(&fs, &tnlOutElem);
        if (sflow_actions->tunnel.tun_id) {
            memset(&vniOutElem, 0, sizeof(vniOutElem));
            vniOutElem.tag = SFLFLOW_EX_VNI_EGRESS;
            vniOutElem.flowType.tunnel_vni.vni
                = ntohll(sflow_actions->tunnel.tun_id);
            SFLADD_ELEMENT(&fs, &vniOutElem);
        }
    }

    /* MPLS output label stack. */
    if (sflow_actions
        && sflow_actions->mpls_stack_depth > 0
        && !sflow_actions->mpls_err
        && dpif_sflow_cookie_num_outputs(cookie) == 1) {
        memset(&mplsElem, 0, sizeof(mplsElem));
        mplsElem.tag = SFLFLOW_EX_MPLS;
        dpif_sflow_encode_mpls_stack(&mplsElem.flowType.mpls.out_stack,
                                     mpls_lse_buf,
                                     sflow_actions);
        SFLADD_ELEMENT(&fs, &mplsElem);
    }

    /* Submit the flow sample to be encoded into the next datagram. */
    SFLADD_ELEMENT(&fs, &hdrElem);
    SFLADD_ELEMENT(&fs, &switchElem);
    /* Encodes the sample; the datagram is sent once it is full. */
    sfl_sampler_writeFlowSample(sampler, &fs);

out:
    ovs_mutex_unlock(&mutex);
}
3、sfl_sampler_writeFlowSample函数
/* Stamps flow sample 'fs' with this sampler's bookkeeping and forwards it to
 * the sampler's receiver, if it has one.
 *
 * The sampler's per-tick sample counter and flow-sample sequence number are
 * incremented, and the new sequence number plus the data-source identity are
 * copied into 'fs'.  The sampling rate and sample pool are filled in only when
 * the caller left them at 0.  A NULL 'fs' is ignored. */
void sfl_sampler_writeFlowSample(SFLSampler *sampler, SFL_FLOW_SAMPLE_TYPE *fs)
{
    if (fs != NULL) {
        sampler->samplesThisTick += 1;

        sampler->flowSampleSeqNo += 1;
        fs->sequence_number = sampler->flowSampleSeqNo;

        /* Data-source identity: split class/index, or one combined id. */
#ifdef SFL_USE_32BIT_INDEX
        fs->ds_class = SFL_DS_CLASS(sampler->dsi);
        fs->ds_index = SFL_DS_INDEX(sampler->dsi);
#else
        fs->source_id = SFL_DS_DATASOURCE(sampler->dsi);
#endif

        /* Fields the caller may already have provided. */
        if (!fs->sampling_rate) {
            fs->sampling_rate = sampler->sFlowFsPacketSamplingRate;
        }
        if (!fs->sample_pool) {
            fs->sample_pool = sampler->samplePool;
        }

        /* Hand the sample to the receiver for encoding/sending. */
        if (sampler->myReceiver) {
            sfl_receiver_writeFlowSample(sampler->myReceiver, fs);
        }
    }
}
4、sfl_receiver_writeFlowSample函数
/* Appends flow sample 'fs' to the receiver's pending datagram, first
 * flushing that datagram with sendSample() if adding the sample would reach
 * the maximum datagram size.
 *
 * Returns the encoded size of the sample in bytes, or -1 in any of these cases:
 *   - 'fs' is NULL;
 *   - computeFlowSampleSize() fails;
 *   - the sample exceeds half the maximum datagram size;
 *   - an element carries an unexpected tag.
 * In the last case the partially encoded sample is discarded, so the pending
 * datagram stays consistent. */
int sfl_receiver_writeFlowSample(SFLReceiver *receiver, SFL_FLOW_SAMPLE_TYPE *fs)
{
    int packedSize;
    u_int32_t *sampleStart;

    if(fs == NULL) return -1;
    if((packedSize = computeFlowSampleSize(receiver, fs)) == -1) return -1;

    // check in case this one sample alone is too big for the datagram
    // in fact - if it is even half as big then we should ditch it. Very
    // important to avoid overrunning the packet buffer.
    if(packedSize > (int)(receiver->sFlowRcvrMaximumDatagramSize / 2)) {
        sflError(receiver, "flow sample too big for datagram");
        return -1;
    }

    // if the sample pkt is full enough so that this sample might put
    // it over the limit, then we should send it now before going on.
    if((receiver->sampleCollector.pktlen + packedSize) >= receiver->sFlowRcvrMaximumDatagramSize)
        sendSample(receiver); // flush the pending datagram before appending

    // remember where this sample starts so a failed encoding can be undone
    sampleStart = receiver->sampleCollector.datap;
    receiver->sampleCollector.numSamples++;

#ifdef SFL_USE_32BIT_INDEX
    putNet32(receiver, SFLFLOW_SAMPLE_EXPANDED);
#else
    putNet32(receiver, SFLFLOW_SAMPLE);
#endif

    putNet32(receiver, packedSize - 8); // don't include tag and len
    putNet32(receiver, fs->sequence_number);

#ifdef SFL_USE_32BIT_INDEX
    putNet32(receiver, fs->ds_class);
    putNet32(receiver, fs->ds_index);
#else
    putNet32(receiver, fs->source_id);
#endif

    putNet32(receiver, fs->sampling_rate);
    putNet32(receiver, fs->sample_pool);
    putNet32(receiver, fs->drops);

#ifdef SFL_USE_32BIT_INDEX
    putNet32(receiver, fs->inputFormat);
    putNet32(receiver, fs->input);
    putNet32(receiver, fs->outputFormat);
    putNet32(receiver, fs->output);
#else
    putNet32(receiver, fs->input);
    putNet32(receiver, fs->output);
#endif

    putNet32(receiver, fs->num_elements);

    {
        SFLFlow_sample_element *elem = fs->elements;
        for(; elem != NULL; elem = elem->nxt) {

            putNet32(receiver, elem->tag);
            putNet32(receiver, elem->length); // length cached in computeFlowSampleSize()

            switch(elem->tag) {
            case SFLFLOW_HEADER: {
                u_int32_t hdrLen = elem->flowType.header.header_length;
                /* round up to multiple of 4 to preserve alignment */
                u_int32_t paddedLen = (hdrLen + 3) & ~3U;
                u_char *hdrDst;

                putNet32(receiver, elem->flowType.header.header_protocol);
                putNet32(receiver, elem->flowType.header.frame_length);
                putNet32(receiver, elem->flowType.header.stripped);
                putNet32(receiver, hdrLen);
                /* the header bytes, followed by zeroed padding (XDR opaque
                 * data); without the memset the padding bytes may carry
                 * whatever the reused buffer held before */
                hdrDst = (u_char *)receiver->sampleCollector.datap;
                memcpy(hdrDst, elem->flowType.header.header_bytes, hdrLen);
                memset(hdrDst + hdrLen, 0, paddedLen - hdrLen);
                receiver->sampleCollector.datap += paddedLen / 4;
                break;
            }
            case SFLFLOW_ETHERNET:
                putNet32(receiver, elem->flowType.ethernet.eth_len);
                putMACAddress(receiver, elem->flowType.ethernet.src_mac);
                putMACAddress(receiver, elem->flowType.ethernet.dst_mac);
                putNet32(receiver, elem->flowType.ethernet.eth_type);
                break;
            case SFLFLOW_IPV4:
            case SFLFLOW_EX_IPV4_TUNNEL_EGRESS:
            case SFLFLOW_EX_IPV4_TUNNEL_INGRESS:
                putNet32(receiver, elem->flowType.ipv4.length);
                putNet32(receiver, elem->flowType.ipv4.protocol);
                put32(receiver, elem->flowType.ipv4.src_ip.addr);
                put32(receiver, elem->flowType.ipv4.dst_ip.addr);
                putNet32(receiver, elem->flowType.ipv4.src_port);
                putNet32(receiver, elem->flowType.ipv4.dst_port);
                putNet32(receiver, elem->flowType.ipv4.tcp_flags);
                putNet32(receiver, elem->flowType.ipv4.tos);
                break;
            case SFLFLOW_IPV6:
                putNet32(receiver, elem->flowType.ipv6.length);
                putNet32(receiver, elem->flowType.ipv6.protocol);
                put128(receiver, elem->flowType.ipv6.src_ip.addr);
                put128(receiver, elem->flowType.ipv6.dst_ip.addr);
                putNet32(receiver, elem->flowType.ipv6.src_port);
                putNet32(receiver, elem->flowType.ipv6.dst_port);
                putNet32(receiver, elem->flowType.ipv6.tcp_flags);
                putNet32(receiver, elem->flowType.ipv6.priority);
                break;
            case SFLFLOW_EX_SWITCH: putSwitch(receiver, &elem->flowType.sw); break;
            case SFLFLOW_EX_ROUTER: putRouter(receiver, &elem->flowType.router); break;
            case SFLFLOW_EX_GATEWAY: putGateway(receiver, &elem->flowType.gateway); break;
            case SFLFLOW_EX_USER: putUser(receiver, &elem->flowType.user); break;
            case SFLFLOW_EX_URL: putUrl(receiver, &elem->flowType.url); break;
            case SFLFLOW_EX_MPLS: putMpls(receiver, &elem->flowType.mpls); break;
            case SFLFLOW_EX_NAT: putNat(receiver, &elem->flowType.nat); break;
            case SFLFLOW_EX_MPLS_TUNNEL: putMplsTunnel(receiver, &elem->flowType.mpls_tunnel); break;
            case SFLFLOW_EX_MPLS_VC: putMplsVc(receiver, &elem->flowType.mpls_vc); break;
            case SFLFLOW_EX_MPLS_FTN: putMplsFtn(receiver, &elem->flowType.mpls_ftn); break;
            case SFLFLOW_EX_MPLS_LDP_FEC: putMplsLdpFec(receiver, &elem->flowType.mpls_ldp_fec); break;
            case SFLFLOW_EX_VLAN_TUNNEL: putVlanTunnel(receiver, &elem->flowType.vlan_tunnel); break;
            case SFLFLOW_EX_VNI_EGRESS:
            case SFLFLOW_EX_VNI_INGRESS:
                putNet32(receiver, elem->flowType.tunnel_vni.vni);
                break;

            default:
                sflError(receiver, "unexpected packet_data_tag");
                // discard the partially encoded sample (pktlen has not been
                // updated yet, so restoring datap and numSamples suffices)
                receiver->sampleCollector.datap = sampleStart;
                receiver->sampleCollector.numSamples--;
                return -1;
            }
        }
    }

    // sanity check
    assert(((u_char *)receiver->sampleCollector.datap
            - (u_char *)receiver->sampleCollector.data
            - receiver->sampleCollector.pktlen) == (u_int32_t)packedSize);

    // update the pktlen
    receiver->sampleCollector.pktlen = (u_char *)receiver->sampleCollector.datap - (u_char *)receiver->sampleCollector.data;
    return packedSize;
}
5、sendSample函数
/* Finalizes and transmits the pending sFlow datagram, then resets the
 * collector for the next one.
 *
 * The leading header words (version, agent_address, sub_agent_id) were
 * pre-set: 4 words with an IPv4 agent address, 7 with an IPv6 one.  This
 * routine fills in the three words that follow them:
 *   - the incremented datagram sequence number;
 *   - the agent uptime, computed as (now - bootTime) * 1000;
 *   - the sample count.
 * The datagram is passed to the agent's sendFn callback when one is
 * registered (in OVS this is sflow_agent_send_packet_cb()).  Otherwise it is
 * sent with sendto() on the agent's IPv4/IPv6 socket, but only if
 * SFLOW_DO_SOCKET is defined. */
static void sendSample(SFLReceiver *receiver)
{
    u_int32_t *hdr = receiver->sampleCollector.data
        + ((receiver->agent->myIP.type == SFLADDRESSTYPE_IP_V6) ? 7 : 4);

    receiver->sampleCollector.packetSeqNo++;
    hdr[0] = htonl(receiver->sampleCollector.packetSeqNo);                     /* seq no */
    hdr[1] = htonl((receiver->agent->now - receiver->agent->bootTime) * 1000); /* uptime */
    hdr[2] = htonl(receiver->sampleCollector.numSamples);                      /* num samples */

    if (!receiver->agent->sendFn) {
#ifdef SFLOW_DO_SOCKET
        /* No callback registered: send it myself. */
        int sent;

        if (receiver->sFlowRcvrAddress.type == SFLADDRESSTYPE_IP_V6) {
            sent = sendto(receiver->agent->receiverSocket6,
                          receiver->sampleCollector.data,
                          receiver->sampleCollector.pktlen,
                          0,
                          (struct sockaddr *)&receiver->receiver6,
                          sizeof(struct sockaddr_in6));
            if (sent == -1 && errno != EINTR) {
                sfl_agent_sysError(receiver->agent, "receiver", "IPv6 socket sendto error");
            }
            if (sent == 0) {
                sfl_agent_error(receiver->agent, "receiver", "IPv6 socket sendto returned 0");
            }
        } else {
            sent = sendto(receiver->agent->receiverSocket4,
                          receiver->sampleCollector.data,
                          receiver->sampleCollector.pktlen,
                          0,
                          (struct sockaddr *)&receiver->receiver4,
                          sizeof(struct sockaddr_in));
            if (sent == -1 && errno != EINTR) {
                sfl_agent_sysError(receiver->agent, "receiver", "socket sendto error");
            }
            if (sent == 0) {
                sfl_agent_error(receiver->agent, "receiver", "socket sendto returned 0");
            }
        }
#endif
    } else {
        (*receiver->agent->sendFn)(receiver->agent->magic,
                                   receiver->agent,
                                   receiver,
                                   (u_char *)receiver->sampleCollector.data,
                                   receiver->sampleCollector.pktlen);
    }

    /* reset for the next time */
    resetSampleCollector(receiver);
}
6、sflow_agent_send_packet_cb函数
/* sFlow library callback to send datagram.
 *
 * 'ds_' is the opaque pointer the library passes back (the agent's 'magic'
 * value), which here is the owning struct dpif_sflow.  The encoded datagram
 * 'pkt' of 'pktLen' bytes is forwarded to every configured collector. */
static void
sflow_agent_send_packet_cb(void *ds_, SFLAgent *agent OVS_UNUSED,
                           SFLReceiver *receiver OVS_UNUSED, u_char *pkt,
                           uint32_t pktLen)
{
    const struct dpif_sflow *sflow = ds_;

    collectors_send(sflow->collectors, pkt, pktLen);
}
7、collectors_send函数
/* Sends the 'n'-byte 'payload' to each of the collectors in 'c'. */
void
collectors_send(const struct collectors *c, const void *payload, size_t n)
{
if (c) {
size_t i;

for (i = 0; i < c->n_fds; i++) {
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
if (send(c->fds[i], payload, n, 0) == -1) { //调用linux标准发包函数
char *s = describe_fd(c->fds[i]);
VLOG_WARN_RL(&rl, "%s: sending to collector failed (%s)",
s, ovs_strerror(errno));
free(s);
}
}
}
}
概括地讲,处理 sFlow 消息就是构造 sFlow 报文,然后发送给所有的 collector。中间涉及多层数据对象的封装;从代码实现来看,这些封装的作用不大,个人感觉反而增加了代码的复杂性。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息