1、概述

GRO(Generic Receive Offload)是针对报文接收方向的机制,是指设备链路层在接收报文时,将属于同一条流的多个小包合并成一个大包再上送协议栈,从而减少数据包在协议栈各层之间的处理次数。可以通过ethtool -K eth0 gro on/off来打开或关闭GRO功能。GRO虽然可以提升吞吐,但同时也会带来一定的时延增加。GRO需要网卡驱动支持NAPI:驱动通过NAPI收上来包后,如果判断启用了GRO功能,则将包按流的方式先存放在napi->gro_list链表里;等NAPI收完包、GRO链表里的skb超时,或者GRO合并过程中判断需要上送协议栈处理时,再将gro链表里对应的skb上送协议栈。

struct napi_struct {/* The poll_list must only be managed by the entity which* changes the state of the NAPI_STATE_SCHED bit.  This means* whoever atomically sets that bit can add this napi_struct* to the per-cpu poll_list, and whoever clears that bit* can remove from the list right before clearing the bit.*/struct list_head   poll_list;unsigned long     state;int           weight;//gro链表流的个数,最多不超过8个unsigned int       gro_count;int           (*poll)(struct napi_struct *, int);
#ifdef CONFIG_NETPOLLspinlock_t     poll_lock;int           poll_owner;
#endifstruct net_device *dev;//gro链表struct sk_buff      *gro_list;struct sk_buff        *skb;struct list_head   dev_list;struct hlist_node  napi_hash_node;unsigned int     napi_id;RH_KABI_EXTEND(size_t   size)RH_KABI_EXTEND(struct hrtimer  timer)
};

2、流程分析

ixgbe_rx_skb

网卡驱动从rx ring里收到包后,调用ixgbe_rx_skb上送协议栈,ixgbe_rx_skb判断上层socket是否有在对队列polling,如果没有,则进入gro合并入口函数napi_gro_receive;

/*
 * Hand a received skb to the stack.
 *
 * While the queue vector is being busy-polled the skb bypasses GRO and
 * goes straight to netif_receive_skb(); otherwise it enters the GRO path
 * through napi_gro_receive().
 */
static void ixgbe_rx_skb(struct ixgbe_q_vector *q_vector,
			 struct sk_buff *skb)
{
	skb_mark_napi_id(skb, &q_vector->napi);

	if (ixgbe_qv_busy_polling(q_vector))
		netif_receive_skb(skb);
	else
		napi_gro_receive(&q_vector->napi, skb);
}

dev_gro_receive

gro入口函数进一步调用dev_gro_receive。在dev_gro_receive里,先重置skb的mac层信息,然后调用ip层提供的GRO回调函数;上层回调函数判断napi->gro_list链表里是否有跟skb属于同一条流的skb,如果存在,则将skb合并到对应的skb里;如果不存在,返回到dev_gro_receive函数后,将新的skb插入到napi->gro_list的头部(skb->next = napi->gro_list; napi->gro_list = skb),作为这条流的首包。因此链表尾部的skb是最老的那条流。

static enum gro_result dev_gro_receive(struct napi_struct *napi, struct sk_buff *skb)
{struct sk_buff **pp = NULL;struct packet_offload *ptype;__be16 type = skb->protocol;struct list_head *head = &offload_base;int same_flow;enum gro_result ret;int grow;if (!(skb->dev->features & NETIF_F_GRO))goto normal;if (skb_is_gso(skb) || skb_has_frag_list(skb) || skb->csum_bad)goto normal;gro_list_prepare(napi, skb);rcu_read_lock();list_for_each_entry_rcu(ptype, head, list) {if (ptype->type != type || !ptype->callbacks.gro_receive)continue;skb_set_network_header(skb, skb_gro_offset(skb));skb_reset_mac_len(skb);//先将same_flow清零NAPI_GRO_CB(skb)->same_flow = 0;NAPI_GRO_CB(skb)->flush = 0;NAPI_GRO_CB(skb)->free = 0;NAPI_GRO_CB(skb)->encap_mark = 0;NAPI_GRO_CB(skb)->recursion_counter = 0;NAPI_GRO_CB(skb)->is_atomic = 1;NAPI_GRO_CB(skb)->gro_remcsum_start = 0;/* Setup for GRO checksum validation */switch (skb->ip_summed) {case CHECKSUM_COMPLETE:NAPI_GRO_CB(skb)->csum = skb->csum;NAPI_GRO_CB(skb)->csum_valid = 1;NAPI_GRO_CB(skb)->csum_cnt = 0;break;case CHECKSUM_UNNECESSARY:NAPI_GRO_CB(skb)->csum_cnt = skb->csum_level + 1;NAPI_GRO_CB(skb)->csum_valid = 0;break;default:NAPI_GRO_CB(skb)->csum_cnt = 0;NAPI_GRO_CB(skb)->csum_valid = 0;}pp = ptype->callbacks.gro_receive(&napi->gro_list, skb);break;}rcu_read_unlock();if (&ptype->list == head)goto normal;//在回调网络层、传输层的gro合并回调函数时,会判断已有的gro链表是否存在相同流的//如果存在,same_flow为置1,因此这里判断same_flow的值,如果为0,说明是流首包//如果非0,说明skb已经被合并到gro_list里了same_flow = NAPI_GRO_CB(skb)->same_flow;ret = NAPI_GRO_CB(skb)->free ? 
GRO_MERGED_FREE : GRO_MERGED;//pp为非空,说明需要flushif (pp) {struct sk_buff *nskb = *pp;*pp = nskb->next;nskb->next = NULL;napi_gro_complete(nskb);napi->gro_count--;}//如果存在同一条流的, 说明在gro_receive流程里已经将skb合入到gro_list里了,因此这里不需要再处理了if (same_flow)goto ok;//这个skb需要直接上送协议栈,不能添加到gro_listif (NAPI_GRO_CB(skb)->flush)goto normal;//gro链表上一共有8条流了,则再添加新的一条流前,把链表里最老的那条流的skb先发送出去if (unlikely(napi->gro_count >= MAX_GRO_SKBS)) {struct sk_buff *nskb = napi->gro_list;/* locate the end of the list to select the 'oldest' flow */while (nskb->next) {pp = &nskb->next;nskb = *pp;}*pp = NULL;nskb->next = NULL;napi_gro_complete(nskb);} else {napi->gro_count++;}//走到这里说明,待合入的skb是这条流的首包,因此将其挂到gro_list里,//并将NAPI_GRO_CB(skb)->last指向自己//并等待后续同一条流的skb到来NAPI_GRO_CB(skb)->count = 1;NAPI_GRO_CB(skb)->age = jiffies;NAPI_GRO_CB(skb)->last = skb;skb_shinfo(skb)->gso_size = skb_gro_len(skb);skb->next = napi->gro_list;napi->gro_list = skb;ret = GRO_HELD;pull:grow = skb_gro_offset(skb) - skb_headlen(skb);if (grow > 0)gro_pull_from_frag0(skb, grow);
ok:return ret;normal:ret = GRO_NORMAL;goto pull;
}

inet_gro_receive

GRO合并流程进入到ip层后,首先根据ip头的信息(协议号、源ip、目的ip)在gro_list里进一步筛选出相同的流,然后判断待GRO合并的skb是否是分片数据包(分片数据包不能做GRO),最后重置待GRO合并的skb的网络层信息后,进一步调用传输层的GRO回调函数;

static struct sk_buff **inet_gro_receive(struct sk_buff **head,struct sk_buff *skb)
{const struct net_offload *ops;struct sk_buff **pp = NULL;struct sk_buff *p;const struct iphdr *iph;unsigned int hlen;unsigned int off;unsigned int id;int flush = 1;int proto;off = skb_gro_offset(skb);hlen = off + sizeof(*iph);iph = skb_gro_header_fast(skb, off);if (skb_gro_header_hard(skb, hlen)) {iph = skb_gro_header_slow(skb, hlen, off);if (unlikely(!iph))goto out;}proto = iph->protocol;rcu_read_lock();ops = rcu_dereference(inet_offloads[proto]);if (!ops || !ops->callbacks.gro_receive)goto out_unlock;if (*(u8 *)iph != 0x45)goto out_unlock;if (unlikely(ip_fast_csum((u8 *)iph, 5)))goto out_unlock;id = ntohl(*(__be32 *)&iph->id);flush = (u16)((ntohl(*(__be32 *)iph) ^ skb_gro_len(skb)) | (id & ~IP_DF));id >>= 16;for (p = *head; p; p = p->next) {struct iphdr *iph2;u16 flush_id;//不是相同流的,跳过if (!NAPI_GRO_CB(p)->same_flow)continue;//off为skb的data偏移,因为驱动就已经把mac头剥离了,所以这里的p->data是指向ip头iph2 = (struct iphdr *)(p->data + off);/* The above works because, with the exception of the top* (inner most) layer, we only aggregate pkts with the same* hdr length so all the hdrs we'll need to verify will start* at the same offset.*///再次判断ip头,确认是同一条流if ((iph->protocol ^ iph2->protocol) |((__force u32)iph->saddr ^ (__force u32)iph2->saddr) |((__force u32)iph->daddr ^ (__force u32)iph2->daddr)) {NAPI_GRO_CB(p)->same_flow = 0;continue;}/* All fields must match except length and checksum. *///分片数据包不能groNAPI_GRO_CB(p)->flush |=(iph->ttl ^ iph2->ttl) |(iph->tos ^ iph2->tos) |(__force int)((iph->frag_off ^ iph2->frag_off) & htons(IP_DF));NAPI_GRO_CB(p)->flush |= flush;/* We need to store of the IP ID check to be included later* when we can verify that this packet does in fact belong* to a given flow.*/flush_id = (u16)(id - ntohs(iph2->id));/* This bit of code makes it much easier for us to identify* the cases where we are doing atomic vs non-atomic IP ID* checks.  
Specifically an atomic check can return IP ID* values 0 - 0xFFFF, while a non-atomic check can only* return 0 or 0xFFFF.*/if (!NAPI_GRO_CB(p)->is_atomic ||!(iph->frag_off & htons(IP_DF))) {flush_id ^= NAPI_GRO_CB(p)->count;flush_id = flush_id ? 0xFFFF : 0;}/* If the previous IP ID value was based on an atomic* datagram we can overwrite the value and ignore it.*/if (NAPI_GRO_CB(skb)->is_atomic)NAPI_GRO_CB(p)->flush_id = flush_id;elseNAPI_GRO_CB(p)->flush_id |= flush_id;}NAPI_GRO_CB(skb)->is_atomic = !!(iph->frag_off & htons(IP_DF));NAPI_GRO_CB(skb)->flush |= flush;//设置ip头信息skb_set_network_header(skb, off);/* The above will be needed by the transport layer if there is one* immediately following this IP hdr.*///data_offset偏移增加ip头偏移skb_gro_pull(skb, sizeof(*iph));//设置传输层信息skb_set_transport_header(skb, skb_gro_offset(skb));pp = call_gro_receive(ops->callbacks.gro_receive, head, skb);out_unlock:rcu_read_unlock();out:NAPI_GRO_CB(skb)->flush |= flush;return pp;
}

tcp4_gro_receive

进入到传输层的GRO处理函数后,首先对待合并的skb做checksum校验;

static struct sk_buff **tcp4_gro_receive(struct sk_buff **head, struct sk_buff *skb)
{/* Don't bother verifying checksum if we're going to flush anyway. *///先对skb做checksum校验,检验通过后csum_validif (!NAPI_GRO_CB(skb)->flush &&skb_gro_checksum_validate(skb, IPPROTO_TCP,inet_gro_compute_pseudo)) {NAPI_GRO_CB(skb)->flush = 1;return NULL;}return tcp_gro_receive(head, skb);
}

校验通过后进一步调用tcp_gro_receive,在tcp_gro_receive里进一步根据tcp头部信息找到gro_list里相同的流,然后调用skb_gro_receive。skb_gro_receive是真正做GRO合并的处理函数:它将新的skb的线性区或非线性区合入到gro_skb的非线性区(frags),合并完成后,同步更新gro_skb的data_len、len和truesize。如果合并过程中发现gro_skb的frags个数将超过最大值(MAX_SKB_FRAGS),则将skb作为一个完整的数据包挂到gro_skb的frag_list链表里。

int skb_gro_receive(struct sk_buff **head, struct sk_buff *skb)
{//走到这里说明head的skb与待合并的skb是同一条流struct skb_shared_info *pinfo, *skbinfo = skb_shinfo(skb);//skb->data基于skb->head的偏移(此时skb->data指向tcp头)unsigned int offset = skb_gro_offset(skb);//线性区长度unsigned int headlen = skb_headlen(skb);//skb的data数据长度(包括线性区和非线性区)unsigned int len = skb_gro_len(skb);struct sk_buff *lp, *p = *head;unsigned int delta_truesize;if (unlikely(p->len + len >= 65536))return -E2BIG;lp = NAPI_GRO_CB(p)->last;pinfo = skb_shinfo(lp);//skb的线性区长度不超过offset,说明skb的线性区没有data数据,因此从skb的非线性区拷贝数据//拷贝的数据放到gro_skb->last的非线性区if (headlen <= offset) {skb_frag_t *frag;skb_frag_t *frag2;int i = skbinfo->nr_frags;int nr_frags = pinfo->nr_frags + i;//如果这个gro_skb->last的frags已经超标,则将新加入的skb挂到gro_skb->last里if (nr_frags > MAX_SKB_FRAGS)goto merge;offset -= headlen;pinfo->nr_frags = nr_frags;skbinfo->nr_frags = 0;frag = pinfo->frags + nr_frags;frag2 = skbinfo->frags + i;do {*--frag = *--frag2;} while (--i);frag->page_offset += offset;skb_frag_size_sub(frag, offset);/* all fragments truesize : remove (head size + sk_buff) */delta_truesize = skb->truesize -SKB_TRUESIZE(skb_end_offset(skb));skb->truesize -= skb->data_len;skb->len -= skb->data_len;skb->data_len = 0;NAPI_GRO_CB(skb)->free = NAPI_GRO_FREE;goto done;} else if (skb->head_frag) {//将skb的线性区拷贝到拷贝到gro_skb->last的非线性区int nr_frags = pinfo->nr_frags;skb_frag_t *frag = pinfo->frags + nr_frags;struct page *page = virt_to_head_page(skb->head);unsigned int first_size = headlen - offset;unsigned int first_offset;if (nr_frags + 1 + skbinfo->nr_frags > MAX_SKB_FRAGS)goto merge;first_offset = skb->data -(unsigned char *)page_address(page) +offset;pinfo->nr_frags = nr_frags + 1 + skbinfo->nr_frags;frag->page.p     = page;frag->page_offset = first_offset;skb_frag_size_set(frag, first_size);memcpy(frag + 1, skbinfo->frags, sizeof(*frag) * skbinfo->nr_frags);/* We dont need to clear skbinfo->nr_frags here */delta_truesize = skb->truesize - SKB_DATA_ALIGN(sizeof(struct sk_buff));NAPI_GRO_CB(skb)->free = NAPI_GRO_FREE_STOLEN_HEAD;goto 
done;}merge://gro->last的空间已满(frags个数已经达到最多的16个),将待合并的skb挂到gro_skb->last里delta_truesize = skb->truesize;if (offset > headlen) {unsigned int eat = offset - headlen;skbinfo->frags[0].page_offset += eat;skb_frag_size_sub(&skbinfo->frags[0], eat);skb->data_len -= eat;skb->len -= eat;offset = headlen;}__skb_pull(skb, offset);if (NAPI_GRO_CB(p)->last == p)skb_shinfo(p)->frag_list = skb;elseNAPI_GRO_CB(p)->last->next = skb;NAPI_GRO_CB(p)->last = skb;__skb_header_release(skb);lp = p;done://合并完一个skb后,count计数加1NAPI_GRO_CB(p)->count++;//data_len长度加len,len为新合并的skb的长度,因为新合并的skb都是放在p的非线性区,所以data_len要增加p->data_len += len;p->truesize += delta_truesize;//整个skb长度增加lenp->len += len;if (lp != p) {lp->data_len += len;lp->truesize += delta_truesize;lp->len += len;}NAPI_GRO_CB(skb)->same_flow = 1;return 0;
}
EXPORT_SYMBOL_GPL(skb_gro_receive);

napi_gro_complete

当GRO合并过程中判断需要刷新gro_list或者gro_list的流个数超过8个,再或者napi_poll过程判断需要刷新gro_list时,会调用napi_gro_complete处理函数,然后进一步调用ip层的complete处理函数inet_gro_complete;

inet_gro_complete

在ip层回调函数里,根据最新的skb->len更新ip头的tot_len和checksum,然后进一步调用传输层的complete函数tcp4_gro_complete;在tcp4_gro_complete里更新tcp的伪头部checksum,然后最终调用netif_receive_skb_internal将gro skb上送协议栈。

static int inet_gro_complete(struct sk_buff *skb, int nhoff)
{__be16 newlen = htons(skb->len - nhoff);struct iphdr *iph = (struct iphdr *)(skb->data + nhoff);const struct net_offload *ops;int proto = iph->protocol;int err = -ENOSYS;if (skb->encapsulation) {skb_set_inner_protocol(skb, cpu_to_be16(ETH_P_IP));skb_set_inner_network_header(skb, nhoff);}//更新ip头的checksum,newlen为skb做gro合并后的新长度 csum_replace2(&iph->check, iph->tot_len, newlen);iph->tot_len = newlen;rcu_read_lock();ops = rcu_dereference(inet_offloads[proto]);if (WARN_ON(!ops || !ops->callbacks.gro_complete))goto out_unlock;/* Only need to add sizeof(*iph) to get to the next hdr below* because any hdr with option will have been flushed in* inet_gro_receive().*/err = ops->callbacks.gro_complete(skb, nhoff + sizeof(*iph));out_unlock:rcu_read_unlock();return err;
}

netif_receive_skb_internal

在netif_receive_skb_internal里,判断是否有开启rps:如果有,则通过enqueue_to_backlog将报文放入对应cpu的softnet_data的input_pkt_queue队列;如果不需要rps,则通过__netif_receive_skb进一步上送协议栈,最后通过ip层注册的回调函数ip_rcv进入ip层。

/*
 * Deliver an skb to the protocol stack, optionally steering it to
 * another CPU through RPS.
 *
 * Returns NET_RX_SUCCESS when RX timestamping defers the skb, the result
 * of enqueue_to_backlog() when RPS selects a CPU, and otherwise the
 * result of __netif_receive_skb().
 */
static int netif_receive_skb_internal(struct sk_buff *skb)
{
	int ret;

	net_timestamp_check(netdev_tstamp_prequeue, skb);

	if (skb_defer_rx_timestamp(skb))
		return NET_RX_SUCCESS;

	rcu_read_lock();

	/*
	 * If RPS applies, queue the packet on the chosen CPU's backlog.
	 * Per the article, this raises a softirq whose handler eventually
	 * runs process_backlog to dequeue the packet and pass it up the
	 * stack (not visible in this function).
	 */
#ifdef CONFIG_RPS
	if (static_key_false(&rps_needed)) {
		struct rps_dev_flow voidflow, *rflow = &voidflow;
		int cpu = get_rps_cpu(skb->dev, skb, &rflow);

		if (cpu >= 0) {
			ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);
			rcu_read_unlock();
			return ret;
		}
	}
#endif
	/* No RPS steering: hand the packet to the stack directly */
	ret = __netif_receive_skb(skb);
	rcu_read_unlock();
	return ret;
}

Linux GRO流程分析相关推荐

  1. RISC-V Linux 启动流程分析

    " Author:  通天塔 985400330@qq.com Date:    2022/05/15 Revisor: lzufalcon falcon@tinylab.org Proje ...

  2. 全志linux关机键,全志平台linux启动流程分析

    转载:全志平台linux启动流程分析 一.BROM阶段 机器上电之后会执行固化在BROM里面的一段引导程序,这个程序会依次遍历所有支持的启动介质,直到找到第一个支持的.目前支持的启动介质是sd/mmc ...

  3. Linux TSO流程分析

    1.TSO(transimit segment offload)是针对tcp而言的,是指协议栈可以将tcp 分段的操作offload到硬件的能力,本身需要硬件的支持.当网卡具有TSO能力时,上层协议栈 ...

  4. uboot启动linux内核流程分析(三)

    uboot bootz命令流程图 Uboot启动linux内核是使用bootz命令,bootz是如何启动linux内核?uboot的生命周期是怎么终止的?linux是如何启动? 启动linux内核的时 ...

  5. Linux中断流程分析

    裸机中断: 1.中断流入口 2.事先注册中断处理程序 3.根据中断源编号,调取处理程序 irq_svc:1.等到产生中断源的编号(每一个中断号都有一个描述结构) 2. 转载于:https://www. ...

  6. Linux网络协议栈:NAPI机制与处理流程分析(图解)

    Table of Contents NAPI机制 NAPI缺陷 使用 NAPI 先决条件 非NAPI帧的接收 netif_rx - 将网卡中收到的数据包放到系统中的接收队列中 enqueue_to_b ...

  7. Linux clock子系统【3】-i2c控制器打开时钟的流程分析(devm_clk_get)(consumer侧)

    文章目录 前言 一.硬件流程图 二.晶振设备树描述 三. I2CX时钟设备树描述 四.驱动中获得/使能时钟 4.1 流程源码分析 4.1.1 devm_clk_get(struct device *d ...

  8. Linux开机启动流程分析

    Linux开机启动十步骤 收藏分享2012-2-6 11:15| 发布者: 红黑魂| 查看数: 1366| 评论数: 0|来自: 比特网 摘要: 开机过程指的是从打开计算机电源直到LINUX显示用户登 ...

  9. LINUX 路由子系统流程分析

    title: LINUX 路由子系统流程分析 date: 2020-11-28 categories: Linux tags: Linux Routing-Subsystem 上次分析了Linux协议 ...

最新文章

  1. Spring MVC 原理探秘 - 一个请求的旅行过程
  2. SQL链表查询 数据库为空
  3. 阿里团队高效沟通的秘密,全在这5点!
  4. ACID中C与CAP定理中C的区别
  5. 初学servlet之使用web.xml配置
  6. 基础学习——C语言递归解决分鱼问题
  7. java多线程多态_Java学习之多线程
  8. 如何在恢复模式下启动 Mac?
  9. 叩丁狼java培训:LinkedList的原理介绍
  10. 计算一个字符串里面特定字符的个数
  11. Android bug日志/错误收集
  12. iOS开发:判断iPhone是否是刘海屏iPhoneX、iPhoneXR、iPhoneXs、iPhoneXs Max等
  13. 公众号快速涨粉方法汇总
  14. java hdms_网盘预研 - ZICK_ZEON的个人空间 - OSCHINA - 中文开源技术交流社区
  15. 运动世界校园一直显示服务器开小差,运动世界校园跑步异常 运动世界跑步成绩异常怎么办...
  16. OpenNI2的下载与安装
  17. SAP WM 上架策略R的几个幺蛾子
  18. PMP考试费用要多少?(含pmp资料)
  19. VOC和COCO数据集的介绍和转换
  20. 基于SSM的考试项目管理系统

热门文章

  1. 找2021考研资料?这些超强资源网站必须知道!
  2. Cortex-M3处理器的舞台
  3. 微信公众平台开发(15)--群发消息
  4. TorontoCity:众生观天下
  5. 协作通信-af df的matlab仿真,协作通信三种协作方式(AF+DF+CC)的matlab仿真程序
  6. html中如何写div中div的位置,position设置div的位置
  7. Linux命令之查看磁盘空间
  8. Protobuf3 使用..
  9. 浅谈Linux就业前景
  10. “第二届openGauss每日一练打卡活动” 获奖名单公布!