https://blog.csdn.net/jqh9804/article/details/53066676

VPP使用者几乎都会使用dpdk node作为收包驱动,本文将分析其源码。

基本概念

vlib_buffer_t 
dpdk收到的数据包用rte_mbuf结构描述。vpp为了兼容其它收包node(netmap,pcap等)改为使用vlib_buffer_t来描述数据包。 
 
vlib_buffer_t紧跟在rte_mbuf后面,headroom空间中。 
vlib_buffer_from_rte_mbuf和rte_mbuf_from_vlib_buffer完成相互转化 
值得注意的是,该结构使用了三个CLIB_CACHE_LINE_ALIGN_MARK宏,把结构分成了3段,确保每段都是CLIB_CACHE_LINE_BYTES对齐。作为一个被频繁使用的结构,这样设计可以更好的更具需要进行内存预取。 
vlib_get_next_frame,vlib_put_next_frame 
几乎每个node中必定出现的一对好基友。vlib_get_next_frame获取传递给下一个node的数据包将驻留的内存结构。vlib_put_next_frame把传递给下一个node的数据包写入特定位置。这样下一个node将正式可以被调度框架调度,并处理传给他的数据包 
EFD (early-fast-discard) 
没有找到相关资料,估计也是一个性能优化特性,之后再补个人对作者意图的猜测。

- 核心函数
dpdk_device_input//use_efd指定是否启用early-fast-discard特性。
/** This function is used when there are no worker threads.* The main thread performs IO and forwards the packets.*/
static inline u32
dpdk_device_input (dpdk_main_t * dm,dpdk_device_t * xd,vlib_node_runtime_t * node,u32 cpu_index, u16 queue_id, int use_efd)
{u32 n_buffers;/*缺省情况下一跳是ethernet node,之后可以最终改变下一条,博主认为这里使用next_index = node->cached_next_index会更合理*/u32 next_index = DPDK_RX_NEXT_ETHERNET_INPUT;u32 n_left_to_next, *to_next;u32 mb_index;vlib_main_t *vm = vlib_get_main ();uword n_rx_bytes = 0;u32 n_trace, trace_cnt __attribute__ ((unused));vlib_buffer_free_list_t *fl;u8 efd_discard_burst = 0;u32 buffer_flags_template;//网卡必须up了if (xd->admin_up == 0)return 0;//指定网卡的指定队列上收一批包,返回收到的包个数,数据包地址存在xd->rx_vectors[queue_id]中。n_buffers = dpdk_rx_burst (dm, xd, queue_id);if (n_buffers == 0){/* check if EFD (dpdk) is enabled *///看来EFD是默认不使用EFD。if (PREDICT_FALSE (use_efd && dm->efd.enabled)){/* reset a few stats */xd->efd_agent.last_poll_time = 0;xd->efd_agent.last_burst_sz = 0;}return 0;}buffer_flags_template = dm->buffer_flags_template;vec_reset_length (xd->d_trace_buffers);trace_cnt = n_trace = vlib_get_trace_count (vm, node);/*vlib_buffer有多个pool,可以用在自己构建的数据包,这里的fl仅仅用来作为模板初始化ret_mbuf后面的vlib_buffer结构。*/fl = vlib_buffer_get_free_list (vm, VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX);/** DAW-FIXME: VMXNET3 device stop/start doesn't work,* therefore fake the stop in the dpdk driver by* silently dropping all of the incoming pkts instead of* stopping the driver / hardware.*/if (PREDICT_FALSE (xd->admin_up != 1)){for (mb_index = 0; mb_index < n_buffers; mb_index++)rte_pktmbuf_free (xd->rx_vectors[queue_id][mb_index]);return 0;}/* Check for congestion if EFD (Early-Fast-Discard) is enabled* in any mode (e.g. dpdk, monitor, or drop_all)*/if (PREDICT_FALSE (use_efd && dm->efd.enabled)){/* update EFD counters */dpdk_efd_update_counters (xd, n_buffers, dm->efd.enabled);if (PREDICT_FALSE (dm->efd.enabled & DPDK_EFD_DROPALL_ENABLED)){/* discard all received packets */for (mb_index = 0; mb_index < n_buffers; mb_index++)rte_pktmbuf_free (xd->rx_vectors[queue_id][mb_index]);xd->efd_agent.discard_cnt += n_buffers;increment_efd_drop_counter (vm,DPDK_ERROR_VLAN_EFD_DROP_PKTS,n_buffers);return 0;}/*一次收包到了VLIB_FRAME_SIZE个,如果这种情况次数很多,说明发生了收包端拥塞,可以在后面直接丢弃数据包*/if (PREDICT_FALSE (xd->efd_agent.consec_full_frames_cnt >=dm->efd.consec_full_frames_hi_thresh)){u32 device_queue_sz = rte_eth_rx_queue_count (xd->device_index,queue_id);if (device_queue_sz >= dm->efd.queue_hi_thresh){/* dpdk device queue has reached the critical threshold */xd->efd_agent.congestion_cnt++;/* apply EFD to packets from the burst */efd_discard_burst = 1;}}}mb_index = 0;while (n_buffers > 0){u32 bi0;u8 next0, error0;u32 l3_offset0;vlib_buffer_t *b0, *b_seg, *b_chain = 0;u32 cntr_type;//to_next指向的内存用来保存数据包地址,n_left_to_next保存数据包地址的内存最大剩余空间vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);while (n_buffers > 0 && n_left_to_next > 0){u8 nb_seg = 1;struct rte_mbuf *mb = xd->rx_vectors[queue_id][mb_index];struct rte_mbuf *mb_seg = mb->next;if (PREDICT_TRUE (n_buffers > 2)){/*把之后需要使用的数据包内存预取一下。为什么预取2个包以后的而不是下一个包?疑惑。内存预取所花费的cpu周期,博主有时间在定量分析下。*/struct rte_mbuf *pfmb = xd->rx_vectors[queue_id][mb_index + 2];vlib_buffer_t *bp = vlib_buffer_from_rte_mbuf (pfmb);CLIB_PREFETCH (pfmb, CLIB_CACHE_LINE_BYTES, STORE);CLIB_PREFETCH (bp, CLIB_CACHE_LINE_BYTES, STORE);}ASSERT (mb);b0 = vlib_buffer_from_rte_mbuf (mb);/* check whether EFD is looking for packets to discard *///拥塞发生了,丢包if (PREDICT_FALSE (efd_discard_burst)){vlib_thread_main_t *tm = vlib_get_thread_main ();if (PREDICT_TRUE (cntr_type = is_efd_discardable (tm, b0, mb))){rte_pktmbuf_free (mb);xd->efd_agent.discard_cnt++;increment_efd_drop_counter (vm, cntr_type, 1);n_buffers--;mb_index++;continue;}}/* Prefetch one next segment if it exists. */if (PREDICT_FALSE (mb->nb_segs > 1)){//又是内存预取struct rte_mbuf *pfmb = mb->next;vlib_buffer_t *bp = vlib_buffer_from_rte_mbuf (pfmb);CLIB_PREFETCH (pfmb, CLIB_CACHE_LINE_BYTES, LOAD);CLIB_PREFETCH (bp, CLIB_CACHE_LINE_BYTES, STORE);b_chain = b0;}/*取用fl中的模板给b0初始化,这个函数在赋值时,如果cpu支持128bit向量,则可以每次赋值拷贝16个字节(u8x16类型支持)*/vlib_buffer_init_for_free_list (b0, fl);//vlib_buffer转换为偏移量,注意地址都是1 << CLIB_LOG2_CACHE_LINE_BYTES对齐bi0 = vlib_get_buffer_index (vm, b0);//数据包写入缓存,下一跳node将会使用to_next[0] = bi0;to_next++;n_left_to_next--;//决定下一跳node,默认情况下会根据3层协议来决定下一跳。如果自己hook了,则条转给指定的nodedpdk_rx_next_and_error_from_mb_flags_x1 (xd, mb, b0,&next0, &error0);
#ifdef RTE_LIBRTE_MBUF_EXT_RX_OLFLAGS/** Clear overloaded TX offload flags when a DPDK driver* is using them for RX flags (e.g. Cisco VIC Ethernet driver)*/if (PREDICT_TRUE (trace_cnt == 0))mb->ol_flags &= PKT_EXT_RX_CLR_TX_FLAGS_MASK;elsetrace_cnt--;
#endif /* RTE_LIBRTE_MBUF_EXT_RX_OLFLAGS */b0->error = node->errors[error0];l3_offset0 = ((next0 == DPDK_RX_NEXT_IP4_INPUT ||next0 == DPDK_RX_NEXT_IP6_INPUT ||next0 == DPDK_RX_NEXT_MPLS_INPUT) ?sizeof (ethernet_header_t) : 0);b0->current_data = l3_offset0;/* Some drivers like fm10k receive frames withmb->data_off > RTE_PKTMBUF_HEADROOM *//*博主认为这里是个bug,应该是b0->current_data += mb->data_off - sizeof(struct vlib_buffer_t)*/b0->current_data += mb->data_off - RTE_PKTMBUF_HEADROOM;b0->current_length = mb->data_len - l3_offset0;b0->flags = buffer_flags_template;if (VMWARE_LENGTH_BUG_WORKAROUND)b0->current_length -= 4;vnet_buffer (b0)->sw_if_index[VLIB_RX] = xd->vlib_sw_if_index;vnet_buffer (b0)->sw_if_index[VLIB_TX] = (u32) ~ 0;n_rx_bytes += mb->pkt_len;/* Process subsequent segments of multi-segment packets */while ((mb->nb_segs > 1) && (nb_seg < mb->nb_segs)){ASSERT (mb_seg != 0);b_seg = vlib_buffer_from_rte_mbuf (mb_seg);vlib_buffer_init_for_free_list (b_seg, fl);ASSERT ((b_seg->flags & VLIB_BUFFER_NEXT_PRESENT) == 0);ASSERT (b_seg->current_data == 0);/** The driver (e.g. virtio) may not put the packet data at the start* of the segment, so don't assume b_seg->current_data == 0 is correct.*/b_seg->current_data =(mb_seg->buf_addr + mb_seg->data_off) - (void *) b_seg->data;b_seg->current_length = mb_seg->data_len;b0->total_length_not_including_first_buffer += mb_seg->data_len;b_chain->flags |= VLIB_BUFFER_NEXT_PRESENT;b_chain->next_buffer = vlib_get_buffer_index (vm, b_seg);b_chain = b_seg;mb_seg = mb_seg->next;nb_seg++;}/** Turn this on if you run into* "bad monkey" contexts, and you want to know exactly* which nodes they've visited... See main.c...*/VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b0);vlib_validate_buffer_enqueue_x1 (vm, node, next_index,to_next, n_left_to_next,bi0, next0);if (PREDICT_FALSE (n_trace > mb_index))vec_add1 (xd->d_trace_buffers, bi0);n_buffers--;mb_index++;}//调度框架之后就可以调度下一跳node处理上文传入的数据包集合了。vlib_put_next_frame (vm, node, next_index, n_left_to_next);}if (PREDICT_FALSE (vec_len (xd->d_trace_buffers) > 0)){dpdk_rx_trace (dm, node, xd, queue_id, xd->d_trace_buffers,vec_len (xd->d_trace_buffers));vlib_set_trace_count (vm, node,n_trace - vec_len (xd->d_trace_buffers));}vlib_increment_combined_counter(vnet_get_main ()->interface_main.combined_sw_if_counters+ VNET_INTERFACE_COUNTER_RX,cpu_index, xd->vlib_sw_if_index, mb_index, n_rx_bytes);//多线程模型下,可以使用多个dpdk线程收包。dpdk_worker_t *dw = vec_elt_at_index (dm->workers, cpu_index);dw->aggregate_rx_packets += mb_index;return mb_index;
}//观察这个注册函数,可见它有5个下一跳。
VLIB_REGISTER_NODE (dpdk_input_node) = {.function = dpdk_input,.type = VLIB_NODE_TYPE_INPUT,.name = "dpdk-input",/* Will be enabled if/when hardware is detected. */.state = VLIB_NODE_STATE_DISABLED,.format_buffer = format_ethernet_header_with_length,.format_trace = format_dpdk_rx_dma_trace,.n_errors = DPDK_N_ERROR,.error_strings = dpdk_error_strings,.n_next_nodes = DPDK_RX_N_NEXT,.next_nodes = {[DPDK_RX_NEXT_DROP] = "error-drop",[DPDK_RX_NEXT_ETHERNET_INPUT] = "ethernet-input",[DPDK_RX_NEXT_IP4_INPUT] = "ip4-input-no-checksum",[DPDK_RX_NEXT_IP6_INPUT] = "ip6-input",[DPDK_RX_NEXT_MPLS_INPUT] = "mpls-gre-input",},
};

(转)思科VPP源码分析(dpdk node分析)相关推荐

  1. Colly源码解析——结合例子分析底层实现

    通过<Colly源码解析--框架>分析,我们可以知道Colly执行的主要流程.本文将结合http://go-colly.org上的例子分析一些高级设置的底层实现.(转载请指明出于break ...

  2. 直播源码app,node博客之路由搭建

    直播源码app,node博客之路由搭建实现的相关代码 www.js const http = require('http') const PORT = 9000 const serverHandle ...

  3. vpp源码框架的rpm打包、安装、启动1

    给vpp框架代码 rpm 打包,可参考: vpp rpm 官网 可能由于天朝网络的原因墙内很多人可能看不到,即使能看到也是长长的一页英文,个人感觉很多步骤都时比较废柴的.故此直接总结一下,如果大家是初 ...

  4. lodash源码中debounce函数分析

    lodash源码中debounce函数分析 一.使用 在lodash中我们可以使用debounce函数来进行防抖和截流,之前我并未仔细注意过,但是不可思议的是,lodash中的防抖节流函数是一个函数两 ...

  5. Linux内核学习(五):linux kernel源码结构以及makefile分析

    Linux内核学习(五):linux kernel源码结构以及makefile分析 前面我们知道了linux内核镜像的生成.加载以及加载工具uboot. 这里我们来看看linux内核的源码的宏观东西, ...

  6. JUC之CountDownLatch的源码和使用场景分析

    最近工作不饱和,写写文章充充电.何以解忧,唯有Coding.后续更新的文章涉及的方向有:ThreadPoolExecutor.Spring.MyBatis.ReentrantLock.CyclicBa ...

  7. d3-force 力导图 源码解读与原理分析【一】

    首先先推荐一下某呆翻译的d3-force的中文文档:https://github.com/xswei/D3-V... . 在我们解读源码前还请读者先熟悉一下force相关的API,以及es6语法 . ...

  8. (附源码)nodejs+mysql+node基于vue框架的游戏商城设计及开发 -《夜幕》毕业设计262127

    Node.js<夜幕>游戏商城的开发 摘 要 现今人们的生活方式逐渐丰富,电脑和网络已经融入了人们生活中的滴滴点点,无时不刻的影响着我们的日常生活,网络游戏已经进入到了大多数人的生活之中. ...

  9. (附源码)基于Node.js校园志愿者管理系统-计算机毕设 78452

    Node.js校园志愿者管理系统 摘 要 图3-1 用户用例图 管理员用例图如下所示. 图3-2 管理员用例图 前台用户功能 前台用户可分为未注册用户需求和已注册用户需求. 未注册用户的功能如下: 注 ...

  10. 刨根问底---cocos2d源码的理解与分析

    主要看的是cocos2d的2D部分C++源码(不包含3D的或者是creator相关的),某些比较特殊的方法和变量会比较详细的展开分析讨论. cocos2dx_3.1.7版本 CCRef:[已看完] c ...

最新文章

  1. 贝佐斯旗下媒体爆料:亚马逊因言废人,不管啥岗位,抗议就走人
  2. 深度学习笔记第二门课 改善深层神经网络 第一周:深度学习的实践层面
  3. CSS3:linear-gradient,线性渐变的使用方法
  4. RabbitMQ自学之路(九)——RabbitMQ实现延时队列的两种方式
  5. 对 Oracle 备份与恢复 的补充说明
  6. MySQL--更新自增列的潜在风险
  7. Unity3D 场景与C# Control进行结合
  8. Vagrant 快速入门
  9. java 字符串 1_java 字符串操作大全1
  10. 一步步创建第一个Docker App —— 4. 部署应用
  11. 人生路上必须明白的七个哲理[转]
  12. php显示几个字符串,比较php中的两个字符串并显示字符差异
  13. 二分图最大匹配(匈牙利算法) URAL 1721 Two Sides of the Same Coin
  14. Telegram纸飞机最大的电报中文搜索引擎Telegram中文交流社区
  15. 零中频发射机设计与实现
  16. 无刷直流电机matlab建模,基于MATLAB的无刷直流电机建模方法_郭丹蕊
  17. 微信小程序点击分享功能
  18. HDU 2544 最短路 最短路入门
  19. Bootstrap可以这样学-曹领雄-专题视频课程
  20. ES Search流程 与GET/MGET

热门文章

  1. javascript string对象方法总结
  2. Mobicents记录1:如何搭建和运行mobicents3.0环境(基于jboss7.2)
  3. gbdt 算法比随机森林容易_数据挖掘面试准备(1)|常见算法(logistic回归,随机森林,GBDT和xgboost)...
  4. linux之 sed命令
  5. Go 标准库介绍五: io
  6. openstack nova后端使用ceph rbd(增加在线迁移live_migrate和快照snapshot功能)
  7. .net面试问答(大汇总)(转)
  8. matlab用mkdir在指定的文件夹下创建新的文件夹,并把图像保存在该文件夹内
  9. C语言里面的 malloc 函数
  10. js和jquery给iframe src赋值的3种方法