本篇分析sFlow消息(采样报文)的处理流程,处理入口位于process_upcall函数中。

1、process_upcall函数

    case SFLOW_UPCALL:if (upcall->sflow) {union user_action_cookie cookie;const struct nlattr *actions;size_t actions_len = 0;struct dpif_sflow_actions sflow_actions;memset(&sflow_actions, 0, sizeof sflow_actions);memset(&cookie, 0, sizeof cookie);memcpy(&cookie, nl_attr_get(userdata), sizeof cookie.sflow); //读取user data信息if (upcall->actions) {/* Actions were passed up from datapath. */actions = nl_attr_get(upcall->actions);actions_len = nl_attr_get_size(upcall->actions);if (actions && actions_len) {dpif_sflow_read_actions(flow, actions, actions_len,      //获取sflow action&sflow_actions);}}if (actions_len == 0) {/* Lookup actions in userspace cache. */struct udpif_key *ukey = ukey_lookup(udpif, upcall->ufid);   //用户态cache,加速if (ukey) {ukey_get_actions(ukey, &actions, &actions_len);dpif_sflow_read_actions(flow, actions, actions_len,&sflow_actions);}}dpif_sflow_received(upcall->sflow, packet, flow,          //sflow消息处理flow->in_port.odp_port, &cookie,actions_len > 0 ? &sflow_actions : NULL);}break;

2、dpif_sflow_received函数

/* Builds an sFlow flow sample describing 'packet' and hands it to the first
 * sampler of 'ds' for encoding into the next datagram.
 *
 * 'flow' supplies the ingress VLAN and input tunnel metadata, 'odp_in_port'
 * is used to look up the input ifIndex, and 'cookie' supplies the egress
 * VLAN TCI and the output value.  'sflow_actions' may be NULL, in which case
 * no output-tunnel or MPLS elements are added.
 *
 * Takes and releases 'mutex' internally.  Does nothing if the agent has no
 * sampler. */
void
dpif_sflow_received(struct dpif_sflow *ds, const struct dp_packet *packet,
                    const struct flow *flow, odp_port_t odp_in_port,
                    const union user_action_cookie *cookie,
                    const struct dpif_sflow_actions *sflow_actions)
    OVS_EXCLUDED(mutex)
{
    SFL_FLOW_SAMPLE_TYPE fs;
    SFLFlow_sample_element hdrElem;
    SFLSampled_header *header;
    SFLFlow_sample_element switchElem;
    uint8_t tnlInProto, tnlOutProto;
    SFLFlow_sample_element tnlInElem, tnlOutElem;
    SFLFlow_sample_element vniInElem, vniOutElem;
    SFLFlow_sample_element mplsElem;
    uint32_t mpls_lse_buf[FLOW_MAX_MPLS_LABELS];
    SFLSampler *sampler;
    struct dpif_sflow_port *in_dsp;
    struct dpif_sflow_port *out_dsp;
    ovs_be16 vlan_tci;

    ovs_mutex_lock(&mutex);

    /* Use the first sampler of the agent. */
    sampler = ds->sflow_agent->samplers;
    if (!sampler) {
        goto out;
    }

    /* Build a flow sample. */
    memset(&fs, 0, sizeof fs);

    /* Look up the input ifIndex if this port has one. Otherwise just
     * leave it as 0 (meaning 'unknown') and continue. */
    in_dsp = dpif_sflow_find_port(ds, odp_in_port);
    if (in_dsp) {
        fs.input = SFL_DS_INDEX(in_dsp->dsi);
    }

    /* Make the assumption that the random number generator in the datapath
     * converges to the configured mean, and just increment the samplePool by
     * the configured sampling rate every time. */
    sampler->samplePool += sfl_sampler_get_sFlowFsPacketSamplingRate(sampler);

    /* Sampled header. */
    memset(&hdrElem, 0, sizeof hdrElem);
    hdrElem.tag = SFLFLOW_HEADER;
    header = &hdrElem.flowType.header;
    header->header_protocol = SFLHEADER_ETHERNET_ISO8023;
    /* The frame_length should include the Ethernet FCS (4 bytes),
     * but it has already been stripped, so we need to add 4 here. */
    header->frame_length = dp_packet_size(packet) + 4;
    /* Ethernet FCS stripped off. */
    header->stripped = 4;
    header->header_length = MIN(dp_packet_size(packet),
                                sampler->sFlowFsMaximumHeaderSize);
    header->header_bytes = dp_packet_data(packet);

    /* Add extended switch element. */
    memset(&switchElem, 0, sizeof(switchElem));
    switchElem.tag = SFLFLOW_EX_SWITCH;
    switchElem.flowType.sw.src_vlan = vlan_tci_to_vid(flow->vlan_tci);
    switchElem.flowType.sw.src_priority = vlan_tci_to_pcp(flow->vlan_tci);

    /* Retrieve data from user_action_cookie. */
    vlan_tci = cookie->sflow.vlan_tci;
    switchElem.flowType.sw.dst_vlan = vlan_tci_to_vid(vlan_tci);
    switchElem.flowType.sw.dst_priority = vlan_tci_to_pcp(vlan_tci);

    fs.output = cookie->sflow.output;

    /* Input tunnel. */
    if (flow->tunnel.ip_dst) {
        memset(&tnlInElem, 0, sizeof(tnlInElem));
        tnlInElem.tag = SFLFLOW_EX_IPV4_TUNNEL_INGRESS;
        /* 'in_dsp' is NULL when the input port is not known to sFlow (see
         * the lookup above), so it must not be dereferenced blindly.  Fall
         * back to protocol 0, the same "unknown" value the output-tunnel
         * path below starts from. */
        tnlInProto = in_dsp
                     ? dpif_sflow_tunnel_proto(in_dsp->tunnel_type)
                     : 0;
        dpif_sflow_tunnel_v4(tnlInProto,
                             &flow->tunnel,
                             &tnlInElem.flowType.ipv4);
        SFLADD_ELEMENT(&fs, &tnlInElem);
        if (flow->tunnel.tun_id) {
            memset(&vniInElem, 0, sizeof(vniInElem));
            vniInElem.tag = SFLFLOW_EX_VNI_INGRESS;
            vniInElem.flowType.tunnel_vni.vni
                = ntohll(flow->tunnel.tun_id);
            SFLADD_ELEMENT(&fs, &vniInElem);
        }
    }

    /* Output tunnel. */
    if (sflow_actions
        && sflow_actions->encap_depth == 1
        && !sflow_actions->tunnel_err
        && dpif_sflow_cookie_num_outputs(cookie) == 1) {
        tnlOutProto = sflow_actions->tunnel_ipproto;
        if (tnlOutProto == 0) {
            /* Try to infer the ip-protocol from the output port. */
            if (sflow_actions->out_port != ODPP_NONE) {
                out_dsp = dpif_sflow_find_port(ds, sflow_actions->out_port);
                if (out_dsp) {
                    tnlOutProto
                        = dpif_sflow_tunnel_proto(out_dsp->tunnel_type);
                }
            }
        }
        memset(&tnlOutElem, 0, sizeof(tnlOutElem));
        tnlOutElem.tag = SFLFLOW_EX_IPV4_TUNNEL_EGRESS;
        dpif_sflow_tunnel_v4(tnlOutProto,
                             &sflow_actions->tunnel,
                             &tnlOutElem.flowType.ipv4);
        SFLADD_ELEMENT(&fs, &tnlOutElem);
        if (sflow_actions->tunnel.tun_id) {
            memset(&vniOutElem, 0, sizeof(vniOutElem));
            vniOutElem.tag = SFLFLOW_EX_VNI_EGRESS;
            vniOutElem.flowType.tunnel_vni.vni
                = ntohll(sflow_actions->tunnel.tun_id);
            SFLADD_ELEMENT(&fs, &vniOutElem);
        }
    }

    /* MPLS output label stack. */
    if (sflow_actions
        && sflow_actions->mpls_stack_depth > 0
        && !sflow_actions->mpls_err
        && dpif_sflow_cookie_num_outputs(cookie) == 1) {
        memset(&mplsElem, 0, sizeof(mplsElem));
        mplsElem.tag = SFLFLOW_EX_MPLS;
        dpif_sflow_encode_mpls_stack(&mplsElem.flowType.mpls.out_stack,
                                     mpls_lse_buf,
                                     sflow_actions);
        SFLADD_ELEMENT(&fs, &mplsElem);
    }

    /* Submit the flow sample to be encoded into the next datagram. */
    SFLADD_ELEMENT(&fs, &hdrElem);
    SFLADD_ELEMENT(&fs, &switchElem);
    sfl_sampler_writeFlowSample(sampler, &fs);

out:
    ovs_mutex_unlock(&mutex);
}

3、sfl_sampler_writeFlowSample函数

/* Completes the header fields of flow sample 'fs' from 'sampler' (sequence
 * number, data source, and, where still zero, sampling rate and sample
 * pool) and forwards it to the sampler's receiver, if it has one.
 * Does nothing when 'fs' is NULL. */
void sfl_sampler_writeFlowSample(SFLSampler *sampler, SFL_FLOW_SAMPLE_TYPE *fs)
{
    if (!fs) {
        return;
    }

    sampler->samplesThisTick += 1;

    /* Advance the per-sampler sequence number and stamp the sample with it. */
    sampler->flowSampleSeqNo += 1;
    fs->sequence_number = sampler->flowSampleSeqNo;

    /* Fill in the data-source identification. */
#ifdef SFL_USE_32BIT_INDEX
    fs->ds_class = SFL_DS_CLASS(sampler->dsi);
    fs->ds_index = SFL_DS_INDEX(sampler->dsi);
#else
    fs->source_id = SFL_DS_DATASOURCE(sampler->dsi);
#endif

    /* The caller may already have set the sampling rate; only default it. */
    if (fs->sampling_rate == 0) {
        fs->sampling_rate = sampler->sFlowFsPacketSamplingRate;
    }

    /* Likewise the sample pool may be maintained upstream. */
    if (fs->sample_pool == 0) {
        fs->sample_pool = sampler->samplePool;
    }

    /* Pass the sample on to the receiver, which encodes and sends it. */
    if (sampler->myReceiver) {
        sfl_receiver_writeFlowSample(sampler->myReceiver, fs);
    }
}

4、sfl_receiver_writeFlowSample函数

/* Encodes flow sample 'fs' into the datagram buffer of 'receiver'.
 *
 * If adding the sample could push the pending datagram to or past
 * sFlowRcvrMaximumDatagramSize, the pending datagram is sent first.
 *
 * Returns the packed size of the sample on success.  Returns -1 if 'fs' is
 * NULL, if its size cannot be computed, if the sample alone is larger than
 * half the maximum datagram size, or if an element carries an unexpected
 * tag. */
int sfl_receiver_writeFlowSample(SFLReceiver *receiver, SFL_FLOW_SAMPLE_TYPE *fs)
{
    SFLFlow_sample_element *el;
    int packed_len;

    if (fs == NULL) {
        return -1;
    }

    packed_len = computeFlowSampleSize(receiver, fs);
    if (packed_len == -1) {
        return -1;
    }

    /* Check in case this one sample alone is too big for the datagram.
     * In fact, if it is even half as big then we should ditch it.  Very
     * important to avoid overrunning the packet buffer. */
    if (packed_len > (int)(receiver->sFlowRcvrMaximumDatagramSize / 2)) {
        sflError(receiver, "flow sample too big for datagram");
        return -1;
    }

    /* If the pending datagram is full enough that this sample might put it
     * over the limit, send it now before going on. */
    if ((receiver->sampleCollector.pktlen + packed_len)
        >= receiver->sFlowRcvrMaximumDatagramSize) {
        sendSample(receiver);
    }

    receiver->sampleCollector.numSamples++;

    /* Sample record header. */
#ifdef SFL_USE_32BIT_INDEX
    putNet32(receiver, SFLFLOW_SAMPLE_EXPANDED);
#else
    putNet32(receiver, SFLFLOW_SAMPLE);
#endif
    putNet32(receiver, packed_len - 8); /* don't include tag and len */
    putNet32(receiver, fs->sequence_number);

#ifdef SFL_USE_32BIT_INDEX
    putNet32(receiver, fs->ds_class);
    putNet32(receiver, fs->ds_index);
#else
    putNet32(receiver, fs->source_id);
#endif

    putNet32(receiver, fs->sampling_rate);
    putNet32(receiver, fs->sample_pool);
    putNet32(receiver, fs->drops);

#ifdef SFL_USE_32BIT_INDEX
    putNet32(receiver, fs->inputFormat);
    putNet32(receiver, fs->input);
    putNet32(receiver, fs->outputFormat);
    putNet32(receiver, fs->output);
#else
    putNet32(receiver, fs->input);
    putNet32(receiver, fs->output);
#endif

    putNet32(receiver, fs->num_elements);

    /* Encode every element chained onto the sample. */
    for (el = fs->elements; el != NULL; el = el->nxt) {
        putNet32(receiver, el->tag);
        putNet32(receiver, el->length); /* length cached in computeFlowSampleSize() */

        switch (el->tag) {
        case SFLFLOW_HEADER:
            putNet32(receiver, el->flowType.header.header_protocol);
            putNet32(receiver, el->flowType.header.frame_length);
            putNet32(receiver, el->flowType.header.stripped);
            putNet32(receiver, el->flowType.header.header_length);
            /* the header */
            memcpy(receiver->sampleCollector.datap,
                   el->flowType.header.header_bytes,
                   el->flowType.header.header_length);
            /* round up to multiple of 4 to preserve alignment */
            receiver->sampleCollector.datap +=
                ((el->flowType.header.header_length + 3) / 4);
            break;

        case SFLFLOW_ETHERNET:
            putNet32(receiver, el->flowType.ethernet.eth_len);
            putMACAddress(receiver, el->flowType.ethernet.src_mac);
            putMACAddress(receiver, el->flowType.ethernet.dst_mac);
            putNet32(receiver, el->flowType.ethernet.eth_type);
            break;

        case SFLFLOW_IPV4:
        case SFLFLOW_EX_IPV4_TUNNEL_EGRESS:
        case SFLFLOW_EX_IPV4_TUNNEL_INGRESS:
            putNet32(receiver, el->flowType.ipv4.length);
            putNet32(receiver, el->flowType.ipv4.protocol);
            put32(receiver, el->flowType.ipv4.src_ip.addr);
            put32(receiver, el->flowType.ipv4.dst_ip.addr);
            putNet32(receiver, el->flowType.ipv4.src_port);
            putNet32(receiver, el->flowType.ipv4.dst_port);
            putNet32(receiver, el->flowType.ipv4.tcp_flags);
            putNet32(receiver, el->flowType.ipv4.tos);
            break;

        case SFLFLOW_IPV6:
            putNet32(receiver, el->flowType.ipv6.length);
            putNet32(receiver, el->flowType.ipv6.protocol);
            put128(receiver, el->flowType.ipv6.src_ip.addr);
            put128(receiver, el->flowType.ipv6.dst_ip.addr);
            putNet32(receiver, el->flowType.ipv6.src_port);
            putNet32(receiver, el->flowType.ipv6.dst_port);
            putNet32(receiver, el->flowType.ipv6.tcp_flags);
            putNet32(receiver, el->flowType.ipv6.priority);
            break;

        case SFLFLOW_EX_SWITCH:
            putSwitch(receiver, &el->flowType.sw);
            break;

        case SFLFLOW_EX_ROUTER:
            putRouter(receiver, &el->flowType.router);
            break;

        case SFLFLOW_EX_GATEWAY:
            putGateway(receiver, &el->flowType.gateway);
            break;

        case SFLFLOW_EX_USER:
            putUser(receiver, &el->flowType.user);
            break;

        case SFLFLOW_EX_URL:
            putUrl(receiver, &el->flowType.url);
            break;

        case SFLFLOW_EX_MPLS:
            putMpls(receiver, &el->flowType.mpls);
            break;

        case SFLFLOW_EX_NAT:
            putNat(receiver, &el->flowType.nat);
            break;

        case SFLFLOW_EX_MPLS_TUNNEL:
            putMplsTunnel(receiver, &el->flowType.mpls_tunnel);
            break;

        case SFLFLOW_EX_MPLS_VC:
            putMplsVc(receiver, &el->flowType.mpls_vc);
            break;

        case SFLFLOW_EX_MPLS_FTN:
            putMplsFtn(receiver, &el->flowType.mpls_ftn);
            break;

        case SFLFLOW_EX_MPLS_LDP_FEC:
            putMplsLdpFec(receiver, &el->flowType.mpls_ldp_fec);
            break;

        case SFLFLOW_EX_VLAN_TUNNEL:
            putVlanTunnel(receiver, &el->flowType.vlan_tunnel);
            break;

        case SFLFLOW_EX_VNI_EGRESS:
        case SFLFLOW_EX_VNI_INGRESS:
            putNet32(receiver, el->flowType.tunnel_vni.vni);
            break;

        default:
            sflError(receiver, "unexpected packet_data_tag");
            return -1;
        }
    }

    /* Sanity check: the bytes written must match the precomputed size. */
    assert(((u_char *)receiver->sampleCollector.datap
            - (u_char *)receiver->sampleCollector.data
            - receiver->sampleCollector.pktlen) == (u_int32_t)packed_len);

    /* Update the pktlen to cover everything written so far. */
    receiver->sampleCollector.pktlen =
        (u_char *)receiver->sampleCollector.datap
        - (u_char *)receiver->sampleCollector.data;

    return packed_len;
}

5、sendSample函数

/* Finalizes the pending datagram of 'receiver' (sequence number, uptime and
 * sample count in the header), sends it, and resets the sample collector
 * for the next datagram.
 *
 * The datagram goes through the agent's sendFn callback when one is set;
 * otherwise, and only when built with SFLOW_DO_SOCKET, it is sent directly
 * on the agent's IPv4 or IPv6 socket. */
static void sendSample(SFLReceiver *receiver)
{
    /* Version, agent_address and sub_agent_id were pre-set; the fields to
     * refresh start at word 7 for an IPv6 agent address, word 4 otherwise. */
    u_int32_t base;

    if (receiver->agent->myIP.type == SFLADDRESSTYPE_IP_V6) {
        base = 7;
    } else {
        base = 4;
    }

    /* seq no */
    receiver->sampleCollector.data[base] =
        htonl(++receiver->sampleCollector.packetSeqNo);
    /* uptime */
    receiver->sampleCollector.data[base + 1] =
        htonl((receiver->agent->now - receiver->agent->bootTime) * 1000);
    /* num samples */
    receiver->sampleCollector.data[base + 2] =
        htonl(receiver->sampleCollector.numSamples);

    /* send */
    if (receiver->agent->sendFn) {
        /* Per the original walkthrough, OVS takes this branch and the
         * callback is sflow_agent_send_packet_cb(). */
        (*receiver->agent->sendFn)(receiver->agent->magic,
                                   receiver->agent,
                                   receiver,
                                   (u_char *)receiver->sampleCollector.data,
                                   receiver->sampleCollector.pktlen);
    } else {
#ifdef SFLOW_DO_SOCKET
        /* send it myself */
        if (receiver->sFlowRcvrAddress.type == SFLADDRESSTYPE_IP_V6) {
            u_int32_t addr_len = sizeof(struct sockaddr_in6);
            int sent = sendto(receiver->agent->receiverSocket6,
                              receiver->sampleCollector.data,
                              receiver->sampleCollector.pktlen,
                              0,
                              (struct sockaddr *)&receiver->receiver6,
                              addr_len);

            if (sent == -1 && errno != EINTR) {
                sfl_agent_sysError(receiver->agent, "receiver",
                                   "IPv6 socket sendto error");
            }
            if (sent == 0) {
                sfl_agent_error(receiver->agent, "receiver",
                                "IPv6 socket sendto returned 0");
            }
        } else {
            u_int32_t addr_len = sizeof(struct sockaddr_in);
            int sent = sendto(receiver->agent->receiverSocket4,
                              receiver->sampleCollector.data,
                              receiver->sampleCollector.pktlen,
                              0,
                              (struct sockaddr *)&receiver->receiver4,
                              addr_len);

            if (sent == -1 && errno != EINTR) {
                sfl_agent_sysError(receiver->agent, "receiver",
                                   "socket sendto error");
            }
            if (sent == 0) {
                sfl_agent_error(receiver->agent, "receiver",
                                "socket sendto returned 0");
            }
        }
#endif
    }

    /* reset for the next time */
    resetSampleCollector(receiver);
}

6、sflow_agent_send_packet_cb函数

/* sFlow library callback to send datagram.
 *
 * 'ds_' is the struct dpif_sflow the datagram belongs to; the 'pktLen'-byte
 * datagram 'pkt' is passed on to all of its collectors. */
static void
sflow_agent_send_packet_cb(void *ds_, SFLAgent *agent OVS_UNUSED,
                           SFLReceiver *receiver OVS_UNUSED, u_char *pkt,
                           uint32_t pktLen)
{
    collectors_send(((struct dpif_sflow *) ds_)->collectors, pkt, pktLen);
}

7、collectors_send函数

/* Sends the 'n'-byte 'payload' to each of the collectors in 'c'. */
void
collectors_send(const struct collectors *c, const void *payload, size_t n)
{if (c) {size_t i;for (i = 0; i < c->n_fds; i++) {static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);if (send(c->fds[i], payload, n, 0) == -1) {      //调用linux标准发包函数char *s = describe_fd(c->fds[i]);VLOG_WARN_RL(&rl, "%s: sending to collector failed (%s)",s, ovs_strerror(errno));free(s);}}}
}

处理sFlow消息,概括地讲就是构造sFlow消息体,然后发送给所有的collector。中间还包含对数据对象的多层封装;从代码实现来看,这些封装的作用不大,个人感觉反而增加了代码的复杂性。

【OVS2.5.0源码分析】sFlow实现分析(3)相关推荐

  1. Android4.0源码Launcher启动流程分析【android源码Launcher系列一】

    最近研究ICS4.0的Launcher,发现4.0和2.3有稍微点区别,但是区别不是特别大,所以我就先整理一下Launcher启动的大致流程. Launcher其实是贯彻于手机的整个系统的,时时刻刻都 ...

  2. Android中ICS4.0源码Launcher启动流程分析【android源码Launcher系列一】

    最近研究ICS4.0的Launcher,发现4.0和2.3有稍微点区别,但是区别不是特别大,所以我就先整理一下Launcher启动的大致流程.Launcher其实是贯彻于手机的整个系统的,时时刻刻都在 ...

  3. 【OVS2.5.0源码分析】mirror实现原理(1)

    端口镜像是交换机的标准功能之一,针对某个端口的报文拷贝到除真实目的之外的另外一个目的地(output),这一篇我们先分析配置mirror之后,如何生成流表,在什么阶段生成流表. 1.xlate_act ...

  4. Android 7.0 源码分析项目一期竣工啦

    从 Android 入行开始,因为工作需求和解决疑难bug的原因陆陆续续的看过一些源码,但都不成系统,从2016年年底开始,在Github上建了一个Android Open Source Projec ...

  5. spring boot 2.0 源码分析(二)

    在上一章学习了spring boot 2.0启动的大概流程以后,今天我们来深挖一下SpringApplication实例变量的run函数. 先把这段run函数的代码贴出来: /*** Run the ...

  6. 《MapReduce 2.0源码分析与编程实战》一第1章 HBase介绍

    本节书摘来异步社区<MapReduce 2.0源码分析与编程实战>一书中的第1章,作者: 王晓华 责编: 陈冀康,更多章节内容可以访问云栖社区"异步社区"公众号查看. ...

  7. Tomcat7.0源码分析——Session管理分析(下)

    版权声明:本文为博主原创文章,未经博主允许不得转载. https://blog.csdn.net/beliefer/article/details/52451061 前言 在<Tomcat7.0 ...

  8. vue-cli 3.0 源码分析

    写在前面 其实最开始不是特意来研究 vue-cli 的源码,只是想了解下 node 的命令,如果想要了解 node 命令的话,那么绕不开 tj 写的 commander.js.在学习 commande ...

  9. Tomcat7.0源码分析——Session管理分析(上)

    版权声明:本文为博主原创文章,未经博主允许不得转载. https://blog.csdn.net/beliefer/article/details/52450268 前言 对于广大java开发者而言, ...

最新文章

  1. SQL脚本--有关压缩数据库日志
  2. python【蓝桥杯vip练习题库】ADV-92求最大公约数(递归)
  3. java web分层的思想
  4. PAT (Basic Level) 1073 多选题常见计分法(恶心模拟)
  5. 算法训练 最长字符串 java
  6. R语言之离群点检验(part3)--利用聚类检测离群点
  7. 你碰到过的最难调试的 Bug 是什么样的?
  8. 瑞幸咖啡股价再创新低,App 反冲 TOP 1
  9. 大话重构连载9:大布局你伤不起
  10. 产品经理适合当项目经理吗?
  11. ArcGIS 查看运行结果
  12. 没有电脑却想运行代码?有手机就够了
  13. psp2000 M33 自制固件---恢复模式说明(基本所有版本都适用)
  14. 如何导出魔兽3模型到3Dmax里
  15. lfs库下载_Git上传大文件夹LFS
  16. 宝塔面板远程登录连接FTP空间详细教程
  17. cyj等于什么英语单词_英语参考单词读写规律大全.doc
  18. Matplotlib 箱线图
  19. springboot设置session超时和session监听
  20. oracle的当前日期,Oracle 获取当前日期及日期格式

热门文章

  1. 【angular学习】自定义实现双向绑定
  2. windows10下文件被占用,不能删除
  3. NC65 查询信用余额——客户信用联查、销售订单信用联查等
  4. 统计学习方法chapter1
  5. linux hwclock -r显示的HWC TIME(硬件时钟时间)与timedatectl结果中的RTC TIME(实时时钟时间)有什么区别?BIOS时钟源
  6. 如何使用KMS激活win10和office
  7. Intranet/Internet
  8. 如何在web端登录企业邮箱? 163企业邮箱怎么登陆?
  9. leaflet 画扇形
  10. iOS 如何连接打印机