目录

有关监视和调整Linux网络堆栈的一般建议

总览

详细外观

协议族注册

通过套接字发送网络数据

sock_sendmsg,__sock_sendmsg和__sock_sendmsg_nosec

inet_sendmsg

UDP协议层

udp_sendmsg

udp_send_skb

监视:UDP协议层统计信息

调优:套接字发送队列内存

IP协议层

ip_send_skb

ip_local_out 和 __ip_local_out

netfilter和 nf_hook

目标缓存

ip_output

ip_finish_output

ip_finish_output2

dst_neigh_output

neigh_hh_output

n->output

监控:IP协议层

Linux网络设备子系统

Linux流量控制

dev_queue_xmit 和 __dev_queue_xmit

正在恢复 __dev_queue_xmit

__dev_xmit_skb

调优:传输数据包导向(XPS)

排队学科!

qdisc_run_begin 和 qdisc_run_end

__qdisc_run

qdisc_restart

提醒,同时循环 __qdisc_run

终于可以见到我们的朋友了 dev_hard_start_xmit

监控qdiscs

调优qdiscs

网络设备驱动程序

驱动注册

与传输数据 ndo_start_xmit

igb_tx_map

传输完成

监控网络设备

监视动态队列限制

调整网络设备

结束

附加功能

减少ARP流量(MSG_CONFIRM)

UDP瓶塞

时间戳记

结论

帮助Linux联网或其他系统

相关文章


这篇博客文章解释了运行Linux内核的计算机如何发送数据包,以及如何在数据包从用户程序流到网络硬件时监视和调整网络堆栈的每个组件。

这篇文章与我们之前的文章英文原文《监视和调整Linux网络堆栈:接收数据》或中文翻译《监视和调整Linux网络协议栈:接收数据》形成了一对。

如果不阅读内核的源代码并且对正在发生的事情有深刻的了解,就不可能调整或监视Linux网络堆栈。

希望该博客文章可以为希望这样做的任何人提供参考。

有关监视和调整Linux网络堆栈的一般建议


正如我们在上一篇文章中提到的,Linux网络堆栈很复杂,没有一种适合所有解决方案的监视或调整解决方案。如果您确实想调整网络堆栈,则别无选择,只能花费大量的时间,精力和金钱来了解网络系统各部分之间的交互方式。

本博客文章中提供的许多示例设置仅用于说明目的,不建议或反对某些配置或默认设置。在调整任何设置之前,您应该围绕需要监视的内容建立参考框架,以注意到有意义的更改。

通过网络连接到机器时调整网络设置很危险;您可以轻松地将自己锁定在外,或者完全断开网络连接。不要在生产机器上调整这些设置;取而代之的是,对新机器进行调整,然后将其轮流投入生产。

总览


作为参考,您可能希望随身携带一份器件数据手册。这篇文章将检查由igb设备驱动程序控制的Intel I350以太网控制器。您可以在此处找到该数据表(警告:大PDF)以供参考。

从用户程序到网络设备的高级路径网络数据如下:

  1. 数据是使用系统调用写入(如sendtosendmsg等人)。
  2. 数据通过套接字子系统传递到套接字的协议系列系统(在我们的示例中为AF_INET)。
  3. 协议族通过协议层(在许多情况下)将数据安排到数据包中传递数据。
  4. 数据通过路由层,沿途填充目标缓存和邻居缓存(如果它们很冷)。如果需要查找以太网地址,则可以生成ARP通信。
  5. 经过协议层后,数据包到达设备不可知层。
  6. 使用XPS(如果启用)或哈希函数选择输出队列
  7. 调用设备驱动程序的发送功能。
  8. 然后,数据将传递到连接到输出设备的队列规范(qdisc)。
  9. 如果可以,qdisc将直接传输数据,或者将其排队等待在NET_TXsoftirq 期间发送。
  10. 最终,数据从qdisc传递给驱动程序
  11. 驱动程序创建所需的DMA映射,以便设备可以从RAM读取数据。
  12. 驱动程序向设备发送信号,表明数据已准备好进行传输。
  13. 设备从RAM中获取数据并进行传输
  14. 传输完成后,设备会引发一个中断以信号传输完成
  15. 驱动程序为传输完成而注册的IRQ处理程序。对于许多设备,此处理程序仅触发NAPI轮询循环即可通过NET_RXsoftirq 开始运行。
  16. 轮询功能通过softIRQ运行,并调用驱动程序以取消映射DMA区域和释放数据包数据。

在以下各节中将详细检查整个流程。

下面检查的协议层是IP和UDP协议层。所提供的许多信息也将用作其他协议层的参考。

详细外观


本博客文章将研究Linux内核版本3.13.0链接到的代码在GitHub和代码段贯穿这篇文章,就像同伴后。

让我们开始研究协议系列如何在内核中注册并由套接字子系统使用,然后我们可以继续接收数据。

协议族注册

在用户程序中运行一段这样的代码以创建UDP套接字时会发生什么?

sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)

简而言之,Linux内核查找由UDP协议栈导出的一组函数,这些函数处理许多事情,包括发送和接收网络数据。要确切了解其工作原理,我们必须研究AF_INET地址族代码。

Linux内核会inet_init在内核初始化期间尽早执行该功能。此函数注册AF_INET协议家族,该家族中的各个协议栈(TCP,UDP,ICMP和RAW),并调用初始化例程以使协议栈准备就绪以处理网络数据。您可以inet_init在./net/ipv4/af_inet.c中找到该代码。

AF_INET协议族出口具有结构create功能。从用户程序创建套接字时,内核会调用此函数:

static const struct net_proto_family inet_family_ops = {.family = PF_INET,.create = inet_create,.owner  = THIS_MODULE,
};

inet_create函数接受传递给套接字系统调用的参数,并搜索已注册的协议,以找到一组链接到套接字的操作。看一看:

        /* Look for the requested type/protocol pair. */
lookup_protocol:err = -ESOCKTNOSUPPORT;rcu_read_lock();list_for_each_entry_rcu(answer, &inetsw[sock->type], list) {err = 0;/* Check the non-wild match. */if (protocol == answer->protocol) {if (protocol != IPPROTO_IP)break;} else {/* Check for the two wild cases. */if (IPPROTO_IP == protocol) {protocol = answer->protocol;break;}if (IPPROTO_IP == answer->protocol)break;}err = -EPROTONOSUPPORT;}

稍后,answer它保留对特定协议栈的引用,并将其ops字段复制到套接字结构中:

sock->ops = answer->ops;

您可以在中找到所有协议栈的结构定义af_inet.c。让我们看一下TCP和UDP协议结构:

/* Upon startup we insert all the elements in inetsw_array[] into* the linked list inetsw.*/
static struct inet_protosw inetsw_array[] =
{{.type =       SOCK_STREAM,.protocol =   IPPROTO_TCP,.prot =       &tcp_prot,.ops =        &inet_stream_ops,.no_check =   0,.flags =      INET_PROTOSW_PERMANENT |INET_PROTOSW_ICSK,},{.type =       SOCK_DGRAM,.protocol =   IPPROTO_UDP,.prot =       &udp_prot,.ops =        &inet_dgram_ops,.no_check =   UDP_CSUM_DEFAULT,.flags =      INET_PROTOSW_PERMANENT,},/* .... more protocols ... */
/* Upon startup we insert all the elements in inetsw_array[] into* the linked list inetsw.*/
static struct inet_protosw inetsw_array[] =
{{.type =       SOCK_STREAM,.protocol =   IPPROTO_TCP,.prot =       &tcp_prot,.ops =        &inet_stream_ops,.no_check =   0,.flags =      INET_PROTOSW_PERMANENT |INET_PROTOSW_ICSK,},{.type =       SOCK_DGRAM,.protocol =   IPPROTO_UDP,.prot =       &udp_prot,.ops =        &inet_dgram_ops,.no_check =   UDP_CSUM_DEFAULT,.flags =      INET_PROTOSW_PERMANENT,},{.type =       SOCK_DGRAM,.protocol =   IPPROTO_ICMP,.prot =       &ping_prot,.ops =        &inet_dgram_ops,.no_check =   UDP_CSUM_DEFAULT,.flags =      INET_PROTOSW_REUSE,},{.type =       SOCK_RAW,.protocol =   IPPROTO_IP,  /* wild card */.prot =       &raw_prot,.ops =        &inet_sockraw_ops,.no_check =   UDP_CSUM_DEFAULT,.flags =      INET_PROTOSW_REUSE,}
};

在的情况下IPPROTO_UDPops结构链接到位,该结构包含用于各种事物的功能,包括发送和接收数据:

const struct proto_ops inet_dgram_ops = {.family        = PF_INET,.owner          = THIS_MODULE,/* ... */.sendmsg      = inet_sendmsg,.recvmsg       = inet_recvmsg,/* ... */
};
EXPORT_SYMBOL(inet_dgram_ops);

以及特定于协议的结构prot,其中包含指向所有内部UDP协议栈功能的功能指针。对于UDP协议,此结构被调用udp_prot,并由./net/ipv4/udp.c导出:

struct proto udp_prot = {.name           = "UDP",.owner          = THIS_MODULE,/* ... */.sendmsg      = udp_sendmsg,.recvmsg    = udp_recvmsg,/* ... */
};
EXPORT_SYMBOL(udp_prot);

现在,我们来看一个发送UDP数据的用户程序,以了解udp_sendmsg内核中的调用方式!

通过套接字发送网络数据


用户程序想要发送UDP网络数据,因此它使用sendto系统调用,也许像这样:

ret = sendto(socket, buffer, buflen, 0, &dest, sizeof(dest));

这个系统调用通过Linux系统调用层和土地在这个函数中./net/socket.c

/**      Send a datagram to a given address. We move the address into kernel*      space and check the user space data area is readable before invoking*      the protocol.*/SYSCALL_DEFINE6(sendto, int, fd, void __user *, buff, size_t, len,unsigned int, flags, struct sockaddr __user *, addr,int, addr_len)
{/*  ... code ... */err = sock_sendmsg(sock, &msg, len);/* ... code  ... */
}

SYSCALL_DEFINE6宏观展现出来的变成了一堆宏,反过来,成立创建与6个参数(因此系统调用所需的基础设施DEFINE6)。结果之一是,在内核内部,系统调用函数名称已被sys_添加到它们前面。

以较低层将能够处理的方式安排数据之后,用于sendto呼叫的系统调用代码sock_sendmsg。特别是,它接收传入的目标地址sendto并将其安排到一个结构中,让我们看一下:

  iov.iov_base = buff;iov.iov_len = len;msg.msg_name = NULL;msg.msg_iov = &iov;msg.msg_iovlen = 1;msg.msg_control = NULL;msg.msg_controllen = 0;msg.msg_namelen = 0;if (addr) {err = move_addr_to_kernel(addr, addr_len, &address);if (err < 0)goto out_put;msg.msg_name = (struct sockaddr *)&address;msg.msg_namelen = addr_len;}

这段代码正在复制addr,通过用户程序传递到内核数据结构中address,然后嵌入到struct msghdr结构中msg_name。这类似于userland程序在调用sendmsg而不是时将执行的操作sendto。内核提供这种突变,因为两者sendtosendmsg做电到sock_sendmsg

sock_sendmsg__sock_sendmsg__sock_sendmsg_nosec


sock_sendmsg在调用之前执行一些错误检查,然后在调用之前进行__sock_sendmsg自己的错误检查__sock_sendmsg_nosec__sock_sendmsg_nosec将数据更深地传递到套接字子系统:

static inline int __sock_sendmsg_nosec(struct kiocb *iocb, struct socket *sock,struct msghdr *msg, size_t size)
{struct sock_iocb *si =  ..../* other code ... */return sock->ops->sendmsg(iocb, sock, msg, size);
}

从解释套接字创建的上一节中可以看到,sendmsg注册到此套接字操作结构的函数为inet_sendmsg

inet_sendmsg


正如您可能从名称中猜到的那样,这是AF_INET协议系列提供的通用功能。该函数通过调用sock_rps_record_flow来记录处理该流的最后一个CPU。这由接收数据包导向使用。接下来,此函数sendmsg在套接字的内部协议操作结构上查找该函数并调用它:

int inet_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,size_t size)
{struct sock *sk = sock->sk;sock_rps_record_flow(sk);/* We may need to bind the socket. */if (!inet_sk(sk)->inet_num && !sk->sk_prot->no_autobind &&inet_autobind(sk))return -EAGAIN;return sk->sk_prot->sendmsg(iocb, sk, msg, size);
}
EXPORT_SYMBOL(inet_sendmsg);

在处理UDP时,sk->sk_prot->sendmsg以上内容是udp_sendmsg通过udp_prot我们之前看到的结构由UDP协议层导出的。该函数调用从通用AF_INET协议系列过渡到UDP协议栈。

UDP协议层


udp_sendmsg

udp_sendmsg功能可以在./net/ipv4/udp.c中找到。整个功能很长,因此我们将在下面对其进行检查。如果您想完整阅读它,请遵循上一个链接。

UDP瓶塞

在变量声明和一些基本的错误检查之后,要做的第一件事udp_sendmsg就是检查套接字是否“塞住”。UDP插入是一项功能,它允许用户程序请求内核send在发送之前将多次调用中的数据累积到单个数据报中。有两种方法可以在用户程序中启用此选项:

  1. 使用setsockopt系统调用并将其UDP_CORK作为套接字选项传递。
  2. 通过MSG_MORE为一体的flags呼叫时sendsendtosendmsg从你的程序。

这些选项分别记录在UDP手册页和send / sendto / sendmsg手册页中。

来自的代码将udp_sendmsg检查up->pending以确定套接字当前是否已插入塞子,如果是,则直接进入附加数据。稍后我们将了解如何添加数据。

int udp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,size_t len)
{/* variables and error checking ... */fl4 = &inet->cork.fl.u.ip4;if (up->pending) {/** There are pending frames.* The socket lock must be held while it's corked.*/lock_sock(sk);if (likely(up->pending)) {if (unlikely(up->pending != AF_INET)) {release_sock(sk);return -EINVAL;}goto do_append_data;}release_sock(sk);}

获取UDP目的地址和端口

接下来,从两个可能的来源之一确定目标地址和端口:

  1. 套接字本身已存储了目标地址,因为套接字已连接到某个点。
  2. 该地址通过辅助结构传递,如我们在的内核代码中所见sendto

内核如何处理此问题:

  /**      Get and verify the address.*/if (msg->msg_name) {struct sockaddr_in *usin = (struct sockaddr_in *)msg->msg_name;if (msg->msg_namelen < sizeof(*usin))return -EINVAL;if (usin->sin_family != AF_INET) {if (usin->sin_family != AF_UNSPEC)return -EAFNOSUPPORT;}daddr = usin->sin_addr.s_addr;dport = usin->sin_port;if (dport == 0)return -EINVAL;} else {if (sk->sk_state != TCP_ESTABLISHED)return -EDESTADDRREQ;daddr = inet->inet_daddr;dport = inet->inet_dport;/* Open fast path for connected socket.Route will not be used, if at least one option is set.*/connected = 1;}

是的,TCP_ESTABLISHED在UDP协议层中!更好或更坏的套接字状态使用TCP状态描述。

回想一下前面的内容,我们看到了struct msghdr当用户程序调用时内核如何代表用户安排结构sendto。上面的代码显示了内核如何解析出该数据以便进行设置daddrdport

如果udp_sendmsg内核功能安排struct msghdr结构而达到了该功能,则将从套接字本身中检索目标地址和端口,并将该套接字标记为“已连接”。

无论哪种情况daddrdport都将被设置为目标地址和端口。

套接字传输簿记和时间戳

接着,将源地址,设备索引,并且其被设置在插座上的任何时间戳选项(例如SOCK_TIMESTAMPING_TX_HARDWARESOCK_TIMESTAMPING_TX_SOFTWARESOCK_WIFI_STATUS)被检索并存储:

ipc.addr = inet->inet_saddr;ipc.oif = sk->sk_bound_dev_if;sock_tx_timestamp(sk, &ipc.tx_flags);

辅助消息,通过 sendmsg

sendmsgrecvmsg系统调用允许用户组或在除了发送或接收数据包请求辅助数据。用户程序可以通过在struct msghdr其中嵌入请求的方式编写一个辅助数据来使用此辅助数据。IP的手册页中记录了许多辅助数据类型。

辅助数据的一个流行示例是IP_PKTINFO。在sendmsg这种数据类型的情况下,允许程序设置struct in_pktinfo要在发送数据时使用的a 。程序可以通过在struct in_pktinfo结构中填写字段来指定要在数据包上使用的源地址。如果程序是侦听多个IP地址的服务器程序,则此选项很有用。在这种情况下,服务器程序可能希望使用与客户端用来联系服务器的IP地址相同的IP地址回复客户端。IP_PKTINFO恰好启用了该用例。

类似地,当IP_TTLIP_TOS辅助消息与数据一起从用户程序传递到用户包时,用户消息和辅助消息使用户可以按包设置IP包TTL和TOS值sendmsg。请注意,可以使用来为所有传出数据包在套接字级别上同时设置IP_TTL和,而不是根据需要按每个数据包设置。Linux内核使用数组将指定的TOS值转换为优先级。优先级影响从队列规则传输数据包的方式和时间。稍后,我们将详细介绍其含义。IP_TOSsetsockopt

我们可以看到内核如何处理sendmsgUDP套接字上的辅助消息:

if (msg->msg_controllen) {err = ip_cmsg_send(sock_net(sk), msg, &ipc,sk->sk_family == AF_INET6);if (err)return err;if (ipc.opt)free = 1;connected = 0;
}

解析辅助消息的内部是通过处理ip_cmsg_send从./net/ipv4/ip_sockglue.c。请注意,仅提供任何辅助数据就将该套接字标记为未连接。

设置自定义IP选项

接下来,sendmsg将检查用户是否在附带消息中指定了任何自定义IP选项。如果设置了选项,将使用它们。如果没有,将使用此套接字已使用的选项:

if (!ipc.opt) {struct ip_options_rcu *inet_opt;rcu_read_lock();inet_opt = rcu_dereference(inet->inet_opt);if (inet_opt) {memcpy(&opt_copy, inet_opt,sizeof(*inet_opt) + inet_opt->opt.optlen);ipc.opt = &opt_copy.opt;}rcu_read_unlock();
}

接下来,该功能检查是否设置了源记录路由(SRR)IP选项。源记录路由有两种类型:松散和严格的源记录路由。如果设置了此选项,则将第一跳地址记录并存储为faddr,并将套接字标记为“未连接”。稍后将使用:

ipc.addr = faddr = daddr;if (ipc.opt && ipc.opt->opt.srr) {if (!daddr)return -EINVAL;faddr = ipc.opt->opt.faddr;connected = 0;
}

处理SRR选项后,可以从用户通过辅助消息设置的值或套接字当前正在使用的值中检索TOS IP标志。随后进行检查以确定是否:

  • SO_DONTROUTE设置在插座上(带有setsockopt),或
  • MSG_DONTROUTE在调用sendtosendmsg或时被指定为标志
  • is_strictroute被置位,表明严格源记录路由期望

然后,将toshas 0x1RTO_ONLINK)添加到其位集中,并且将套接字视为未“连接”:

tos = get_rttos(&ipc, inet);
if (sock_flag(sk, SOCK_LOCALROUTE) ||(msg->msg_flags & MSG_DONTROUTE) ||(ipc.opt && ipc.opt->opt.is_strictroute)) {tos |= RTO_ONLINK;connected = 0;
}

组播还是单播?

接下来,代码尝试处理多播。如前所述,这有点棘手,因为用户可以通过发送辅助IP_PKTINFO消息来指定从何处发送数据包的备用源地址或设备索引。

如果目标地址是多播地址:

  1. 将数据包写入位置的设备索引将设置为多播设备索引,并且
  2. 数据包上的源地址将设置为多播源地址。

除非用户没有通过发送IP_PKTINFO辅助消息来覆盖设备索引。让我们来看看:

if (ipv4_is_multicast(daddr)) {if (!ipc.oif)ipc.oif = inet->mc_index;if (!saddr)saddr = inet->mc_addr;connected = 0;
} else if (!ipc.oif)ipc.oif = inet->uc_index;

如果目标地址不是多播地址,则除非用户使用覆盖设备索引,否则将设置设备索引IP_PKTINFO

路由

现在该进行路由了!

UDP层中用于路由的代码以快速路径开头。如果套接字已连接,请尝试获取路由结构:

if (connected)rt = (struct rtable *)sk_dst_check(sk, 0);

如果套接字未连接,或者路由套接字帮助程序sk_dst_check确定路由已过时,则代码将移入慢速路径以生成路由结构。首先从调用flowi4_init_output构造描述此UDP流的结构开始:

if (rt == NULL) {struct net *net = sock_net(sk);fl4 = &fl4_stack;flowi4_init_output(fl4, ipc.oif, sk->sk_mark, tos,RT_SCOPE_UNIVERSE, sk->sk_protocol,inet_sk_flowi_flags(sk)|FLOWI_FLAG_CAN_SLEEP,faddr, saddr, dport, inet->inet_sport);

一旦构造了此流结构,便将套接字及其流结构传递到安全子系统,以便SELinux或SMACK之类的系统可以在流结构上设置安全性id值。接下来,ip_route_output_flow将调用IP路由代码以为此流生成路由结构:

security_sk_classify_flow(sk, flowi4_to_flowi(fl4));
rt = ip_route_output_flow(net, fl4, sk);

如果无法生成路由结构且错误为ENETUNREACH,则OUTNOROUTES统计计数器将增加。

if (IS_ERR(rt)) {err = PTR_ERR(rt);rt = NULL;if (err == -ENETUNREACH)IP_INC_STATS(net, IPSTATS_MIB_OUTNOROUTES);goto out;
}

包含这些统计信息计数器和其他可用计数器的文件的位置及其含义将在下面的UDP监视部分中讨论。

接下来,如果路由是广播的,但是未在套接字上SOCK_BROADCAST设置套接字选项,则代码终止。如果套接字被视为“已连接”(如本功能所述),则路由结构将缓存在套接字上:

err = -EACCES;
if ((rt->rt_flags & RTCF_BROADCAST) &&!sock_flag(sk, SOCK_BROADCAST))goto out;
if (connected)sk_dst_set(sk, dst_clone(&rt->dst));

防止ARP缓存过时 MSG_CONFIRM

如果用户MSG_CONFIRM在调用sendsendto或时指定了该标志sendmsg,则UDP协议层现在将处理该标志:

  if (msg->msg_flags&MSG_CONFIRM)goto do_confirm;
back_from_confirm:

该标志指示系统确认ARP缓存条目仍然有效,并防止对其进行垃圾回收。该dst_confirm函数只是在目标高速缓存条目上设置一个标志,当查询邻居高速缓存并找到条目时,将在以后检查该标志。我们稍后会再次看到。UDP网络应用程序中通常使用此功能,以减少不必要的ARP通信。该do_confirm标签靠近该函数结束时发现的,但它很简单:

do_confirm:dst_confirm(&rt->dst);if (!(msg->msg_flags&MSG_PROBE) || len)goto back_from_confirm;err = 0;goto out;

back_from_confirm如果不是探针,则此代码确认高速缓存条目并跳回到。

一旦do_confirm代码跳回到back_from_confirm(或首先没有发生跳变do_confirm),代码将尝试接下来处理UDP软木塞和未软木塞的情况。

未插塞UDP套接字的快速路径:准备要发送的数据

如果不要求UDP插入,则可以将数据打包到中struct sk_buff并传递到udp_send_skb堆栈中,并向下移至IP协议层。这是通过调用来完成的ip_make_skb。请注意,也将ip_route_output_flow传入先前通过调用生成的路由结构。它将附加到skb,稍后在IP协议层中使用。

/* Lockless fast path for the non-corking case. */
if (!corkreq) {skb = ip_make_skb(sk, fl4, getfrag, msg->msg_iov, ulen,sizeof(struct udphdr), &ipc, &rt,msg->msg_flags);err = PTR_ERR(skb);if (!IS_ERR_OR_NULL(skb))err = udp_send_skb(skb, fl4);goto out;
}

ip_make_skb函数将尝试考虑多种因素来构造skb,例如:

  • 该MTU。
  • UDP瓶塞(如果启用)。
  • UDP分片卸载(UFO)。
  • 如果不支持UFO,并且要传输的数据大小大于MTU,则为碎片。

大多数网络设备驱动程序不支持UFO,因为网络硬件本身不支持此功能。让我们看一下这段代码,记住禁用了软木塞。接下来,我们将看一下启用软木塞的路径。

ip_make_skb

ip_make_skb功能可以在./net/ipv4/ip_output.c中找到。此功能有点棘手。ip_make_skb为了构建skb而需要使用的较低级别的代码,需要一个软木塞结构和队列,在该队列中将传入skb。在未软木塞的情况下,将传递人造软木塞结构和空队列。作为假人。

让我们看一下如何设置人造软木塞结构和队列:

struct sk_buff *ip_make_skb(struct sock *sk, /* more args */)
{struct inet_cork cork;struct sk_buff_head queue;int err;if (flags & MSG_PROBE)return NULL;__skb_queue_head_init(&queue);cork.flags = 0;cork.addr = 0;cork.opt = NULL;err = ip_setup_cork(sk, &cork, /* more args */);if (err)return ERR_PTR(err);

如上所示,软木塞结构(cork)和队列(queue)都是堆栈分配的;ip_make_skb在完成之前都不需要。人造软木塞结构是通过调用来建立的,该调用ip_setup_cork分配内存并初始化该结构。接下来,__ip_append_data调用,并传入队列和阻塞化结构:

err = __ip_append_data(sk, fl4, &queue, &cork,&current->task_frag, getfrag,from, length, transhdrlen, flags);

稍后我们将看到此函数的工作方式,因为无论是否插入插座,这两种情况都将使用它。现在,我们需要知道的是__ip_append_data将创建一个skb,向其添加数据,并将该skb添加到传入的队列中。如果添加数据失败,__ip_flush_pending_frame则调用该操作将数据放在地板上,错误代码为向上返回:

if (err) {__ip_flush_pending_frames(sk, &queue, &cork);return ERR_PTR(err);
}

最后,如果没有发生错误,__ip_make_skb将使排队的skb出队,添加IP选项,并返回准备好传递到较低层以进行发送的skb:

return __ip_make_skb(sk, fl4, &queue, &cork);

传输数据!

如果没有发生错误,则将skb传递到udp_send_skb该skb ,它将把skb传递到网络协议栈的下一层,即IP协议栈:

err = PTR_ERR(skb);
if (!IS_ERR_OR_NULL(skb))err = udp_send_skb(skb, fl4);
goto out;

如果有错误,将在以后处理。有关更多信息,请参见UDP纠正情况下的“错误计费”部分。

没有预先存在的临时数据的临时UDP套接字的慢速路径

如果正在使用UDP阻塞,但没有阻塞先前存在的数据,则慢路径开始:

  1. 锁定插座。
  2. 检查应用程序错误:已“重新塞好”的塞住的插座。
  3. 此UDP流程的流程结构已准备好用于连接。
  4. 要发送的数据将附加到现有数据中。

您可以在下一段代码中继续下去udp_sendmsg

  lock_sock(sk);if (unlikely(up->pending)) {/* The socket is already corked while preparing it. *//* ... which is an evident application bug. --ANK */release_sock(sk);LIMIT_NETDEBUG(KERN_DEBUG pr_fmt("cork app bug 2\n"));err = -EINVAL;goto out;}/**      Now cork the socket to pend data.*/fl4 = &inet->cork.fl.u.ip4;fl4->daddr = daddr;fl4->saddr = saddr;fl4->fl4_dport = dport;fl4->fl4_sport = inet->inet_sport;up->pending = AF_INET;do_append_data:up->len += ulen;err = ip_append_data(sk, fl4, getfrag, msg->msg_iov, ulen,sizeof(struct udphdr), &ipc, &rt,corkreq ? msg->msg_flags|MSG_MORE : msg->msg_flags);

ip_append_data

ip_append_data是一个小包装函数之前调用下来做了两件大事__ip__append_data

  1. 检查MSG_PROBE标志是否从用户传入。此标志指示用户不想真正发送数据。应该探测路径(例如,确定PMTU)。
  2. 检查套接字的发送队列是否为空。如果是这样,则这意味着没有待处理的软木塞数据,因此ip_setup_cork被称为设置软木塞。

处理完上述条件后,将__ip_append_data调用该函数,其中包含用于将数据处理为数据包的大部分逻辑。

__ip_append_data

ip_append_data如果套接字是软木塞的,或者ip_make_skb不是套接字是软木塞的,则调用此函数。无论哪种情况,此函数都将分配一个新的缓冲区来存储传入的数据,或者将数据附加到现有数据之后。

此工作围绕套接字的发送队列进行的方式。等待发送的现有数据(例如,如果套接字已插入),队列中将有一个条目,可以附加其他数据。

这个功能很复杂;它执行了几轮计算,以确定如何构造将传递到较低级网络层的skb,并且对于了解网络数据的传输方式,严格检查缓冲区分配过程并非绝对必要。

此功能的重要亮点包括:

  1. 如果硬件支持,则处理UDP分段卸载(UFO)。绝大多数网络硬件不支持UFO。如果您的网卡驱动程序确实支持它,它将设置功能标记NETIF_F_UFO
  2. 处理支持分散/收集IO的网卡。许多卡都支持此功能,并且使用NETIF_F_SG功能标记进行宣传。此功能的可用性表明,网卡可以处理传输数据的数据包,其中数据已在一组缓冲区中分配;内核不需要花费时间将多个缓冲区合并为一个缓冲区。需要避免这种额外的复制,并且大多数网卡都支持这种复制。
  3. 通过调用跟踪发送队列的大小sock_wmalloc。分配新的skb时,该skb的大小将计入拥有它的套接字,并且为套接字的发送队列分配的字节将增加。如果发送队列中没有足够的空间,则不会分配skb,并且会返回并跟踪错误。我们将在下面的调整部分中了解如何设置套接字发送队列的大小。
  4. 错误统计信息递增。此功能中的任何错误都会增加“丢弃”。我们将在下面的监视部分中了解如何读取此值。

成功完成此功能后,0将返回并将要传输的数据组装为适合网络设备并正在发送队列中等待的skb。

在未塞住的情况下,保存skb的队列将传递到__ip_make_skb上面描述的队列,在该队列中将skb 出队并准备通过进行发送udp_send_skb

在软木塞的情况下,的返回值__ip_append_data向上传递。数据位于发送队列上,直到udp_sendmsg确定是时候进行呼叫了udp_push_pending_frames,这将最终确定skb和call udp_send_skb

冲洗软木塞

现在,udp_sendmsg将继续检查来自的返回值(err如下)__ip_append_skb

if (err)udp_flush_pending_frames(sk);
else if (!corkreq)err = udp_push_pending_frames(sk);
else if (unlikely(skb_queue_empty(&sk->sk_write_queue)))up->pending = 0;
release_sock(sk);

让我们看一下每种情况:

  1. 如果存在错误(err非零),则将udp_flush_pending_frames调用,这将取消栓塞并从套接字的发送队列中删除所有数据。
  2. 如果未MSG_MORE指定发送此数据,则调用udp_push_pending_frames将尝试将数据传递到较低的网络层。
  3. 如果发送队列为空,则将套接字标记为不再插入。

如果附加操作成功完成,并且有更多数据要塞去,那么代码将通过清理并返回附加数据的长度来继续:

ip_rt_put(rt);
if (free)kfree(ipc.opt);
if (!err)return len;

这就是内核如何处理软木塞UDP套接字的方法。

错误会计

如果:

  1. 无塞子的快速路径无法生成skb或udp_send_skb报告错误,或者
  2. ip_append_data 无法将数据附加到已塞好的UDP套接字,或者
  3. udp_push_pending_framesudp_send_skb尝试传输已塞好的skb时返回错误

SNDBUFERRORS仅当收到的错误是ENOBUFS(没有可用的内核内存)或套接字已SOCK_NOSPACE设置(发送队列已满)时,统计信息才会递增:

/** ENOBUFS = no kernel mem, SOCK_NOSPACE = no sndbuf space.  Reporting* ENOBUFS might not be good (it's not tunable per se), but otherwise* we don't have a good statistic (IpOutDiscards but it can be too many* things).  We could add another new stat but at least for now that* seems like overkill.*/
if (err == -ENOBUFS || test_bit(SOCK_NOSPACE, &sk->sk_socket->flags)) {UDP_INC_STATS_USER(sock_net(sk),UDP_MIB_SNDBUFERRORS, is_udplite);
}
return err;

我们将在下面的监视部分中了解如何读取这些计数器。

udp_send_skb


udp_send_skb功能是如何udp_sendmsg最终将推动一个SKB到网络堆栈的下一层,在这种情况下,IP协议层。此功能做一些重要的事情:

  1. 将UDP标头添加到skb。
  2. 处理校验和:软件校验和,硬件校验和,或不校验(如果禁用)。
  3. 尝试通过调用将skb发送到IP协议层ip_send_skb
  4. 用于成功或失败传输的增量统计计数器。

让我们来看看。首先,创建一个UDP头:

static int udp_send_skb(struct sk_buff *skb, struct flowi4 *fl4)
{/* useful variables ... *//** Create a UDP header*/uh = udp_hdr(skb);uh->source = inet->inet_sport;uh->dest = fl4->fl4_dport;uh->len = htons(len);uh->check = 0;

接下来,处理校验和。有几种情况:

  1. 首先处理UDP-Lite校验和。
  2. 接着,如果插座被设置为不产生在所有(通过校验和setsockoptSO_NO_CHECK),它将被标记为这样。
  3. 接下来,如果硬件支持UDP校验和,udp4_hwcsum则将调用它进行设置。请注意,如果数据包被分段,内核将在软件中生成校验和。您可以在的源代码中udp4_hwcsum看到它。
  4. 最后,通过调用来生成软件校验和udp_csum
if (is_udplite)                                  /*     UDP-Lite      */csum = udplite_csum(skb);else if (sk->sk_no_check == UDP_CSUM_NOXMIT) {   /* UDP csum disabled */skb->ip_summed = CHECKSUM_NONE;goto send;} else if (skb->ip_summed == CHECKSUM_PARTIAL) { /* UDP hardware csum */udp4_hwcsum(skb, fl4->saddr, fl4->daddr);goto send;} elsecsum = udp_csum(skb);

接下来,添加psuedo标头:

uh->check = csum_tcpudp_magic(fl4->saddr, fl4->daddr, len,sk->sk_protocol, csum);
if (uh->check == 0)uh->check = CSUM_MANGLED_0;

如果校验和为0,则根据RFC 768将等效补码设置为校验和。最后,将skb传递到IP协议栈,并递增统计信息:

send:err = ip_send_skb(sock_net(sk), skb);if (err) {if (err == -ENOBUFS && !inet->recverr) {UDP_INC_STATS_USER(sock_net(sk),UDP_MIB_SNDBUFERRORS, is_udplite);err = 0;}} elseUDP_INC_STATS_USER(sock_net(sk),UDP_MIB_OUTDATAGRAMS, is_udplite);return err;

如果ip_send_skb成功完成,则OUTDATAGRAMS统计量将递增。如果IP协议层报告错误,SNDBUFERRORS则递增该错误,但前提是该错误是ENOBUFS(缺少内核内存)并且没有启用错误队列。

在进入IP协议层之前,让我们看一下如何在Linux内核中监视和调整UDP协议层。

监视:UDP协议层统计信息


用于获取UDP协议统计信息的两个非常有用的文件是:

  • /proc/net/snmp
  • /proc/net/udp

/proc/net/snmp

通过阅读监视详细的UDP协议统计信息/proc/net/snmp

$ cat / proc / net / snmp | grep Udp \:
Udp:InDatagrams NoPorts InErrors OutDatagrams RcvbufErrors SndbufErrors
Udp:16314 0 0 17161 0 0

为了准确了解这些统计信息在何处递增,您需要仔细阅读内核源代码。在某些情况下,某些错误被统计在多个统计中。

  • InDatagramsrecvmsg由userland程序用来读取数据报的时间增加。当UDP数据包被封装并发回进行处理时,该值也会增加。
  • NoPorts:当UDP数据包到达没有程序正在侦听的端口时增加。
  • InErrors:在以下几种情况下增加:接收队列中没有内存,看到错误的校验和,以及sk_add_backlog添加数据报失败。
  • OutDatagrams:当将UDP数据包无误传递到要发送的IP协议层时增加。
  • RcvbufErrors:在sock_queue_rcv_skb报告没有可用内存时增加;如果sk->sk_rmem_alloc大于或等于,则会发生这种情况sk->sk_rcvbuf
  • SndbufErrors:如果IP协议层在尝试发送数据包时报告错误,并且未设置错误队列,则增加。如果没有发送队列空间或内核内存可用,也将增加。
  • InCsumErrors:当检测到UDP校验和失败时增加。请注意,在所有可以找到的情况下,InCsumErrors都与递增InErrors。因此,InErrorsInCsumErros应该在接收端产生与内存相关的错误计数。

请注意,UDP协议层发现的一些错误会在其他协议层的统计文件中报告。这样的一个例子:路由错误。发现的路由错误udp_sendmsg将导致IP协议层的OutNoRoutes统计信息增加。

/proc/net/udp

通过阅读监视UDP套接字统计信息 /proc/net/udp

$ cat / proc / net / udpsl local_address rem_address st tx_queue rx_queue tr tm->当retrnsmt uid超时inode ref指针掉落时515:00000000:B346 00000000:0000 07 00000000:00000000 00:00000000 00000000 104 0 7518 2 0000000000000000 0558:00000000:0371 00000000:0000 07 00000000:00000000 00:00000000 00000000 0 0 7408 2 0000000000000000 0588:0100007F:038F 00000000:0000 07 00000000:00000000 00:00000000 00000000 0 0 7511 2 0000000000000000 0769:00000000:0044 00000000:0000 07 00000000:00000000 00:00000000 00000000 0 0 7673 2 0000000000000000 0812:00000000:006F 00000000:0000 07 00000000:00000000 00:00000000 00000000 0 0 7407 2 0000000000000000 0

第一行描述了以下各行中的每个字段:

  • sl:套接字的内核哈希槽
  • local_address:套接字的十六进制本地地址和端口号,以分隔:
  • rem_address:套接字的十六进制远程地址和端口号,以分隔:
  • st:套接字的状态。奇怪的是,UDP协议层似乎使用了某些TCP套接字状态。在上面的示例中,7TCP_CLOSE
  • tx_queue:在内核中为传出UDP数据报分配的内存量。
  • rx_queue:内核中为传入的UDP数据报分配的内存量。
  • trtm->whenretrnsmt:这些字段是通过UDP协议层使用。
  • uid:创建此套接字的用户的有效用户ID。
  • timeout:UDP协议层未使用。
  • inode:与此套接字对应的索引节点号。您可以使用它来帮助您确定哪个用户进程打开了此套接字。检查/proc/[pid]/fd,其中将包含指向的符号链接socket[:inode]
  • ref:套接字的当前引用计数。
  • pointer:内核中的内存地址struct sock
  • drops:与此套接字关联的数据报丢弃数。请注意,这不包括与发送数据报有关的任何丢弃(在已塞好的UDP套接字上或以其他方式);从本博客文章所检查的内核版本开始,该值仅在接收路径中递增。

可以在中找到net/ipv4/udp.c输出此代码的代码。

调优:套接字发送队列内存


可以通过设置net.core.wmem_maxsysctl 来调整发送队列(也称为写队列)的最大大小。

通过设置来增加最大发送缓冲区的大小sysctl

$ sudo sysctl -w net.core.wmem_max = 8388608

sk->sk_write_queue从该net.core.wmem_default值开始,也可以通过设置sysctl进行调整,如下所示:

通过设置来调整默认的初始发送缓冲区大小sysctl

$ sudo sysctl -w net.core.wmem_default = 8388608

您还可以sk->sk_write_queue通过setsockopt从应用程序调用并传递来设置大小SO_SNDBUF。您可以设置的最大值setsockoptnet.core.wmem_max

但是,您可以net.core.wmem_max通过调用setsockopt和传递来覆盖限制SO_SNDBUFFORCE,但是运行应用程序的用户需要此CAP_NET_ADMIN功能。

sk->sk_wmem_alloc每次通过调用分配skb时,都会递增__ip_append_data。正如我们将看到的,UDP数据报的传输速度很快,并且通常不会在发送队列中花费很多时间。

IP协议层


UDP协议层只需调用即可将skbs转换为IP协议ip_send_skb,因此让我们从那里开始并映射IP协议层!

ip_send_skb

ip_send_skb函数位于./net/ipv4/ip_output.c中,并且非常简短。ip_local_out如果ip_local_out返回某种错误,它只会调用并增加错误统计信息。让我们来看看:

int ip_send_skb(struct net *net, struct sk_buff *skb)
{int err;err = ip_local_out(skb);if (err) {if (err > 0)err = net_xmit_errno(err);if (err)IP_INC_STATS(net, IPSTATS_MIB_OUTDISCARDS);}return err;
}

如上所示,ip_local_out将调用,然后再处理返回值。对的调用net_xmit_errno有助于将任何错误从较低级别“转换”为IP和UDP协议层可以理解的错误。如果发生任何错误,则IP协议统计信息“ OutDiscards”将增加。稍后我们将查看要获取该统计信息的文件。现在,让我们继续往下钻,看看哪里ip_local_out带我们。

ip_local_out 和 __ip_local_out


对我们来说幸运的是,两者ip_local_out__ip_local_out都很简单。ip_local_out只需调用__ip_local_out并基于返回值,将调用路由层以输出数据包:

int ip_local_out(struct sk_buff *skb)
{int err;err = __ip_local_out(skb);if (likely(err == 1))err = dst_output(skb);return err;
}

从源头上可以看出__ip_local_out,该函数首先执行两项重要操作:

  1. 设置IP数据包的长度
  2. 调用ip_send_check以计算要写入IP数据包头中的校验和。该ip_send_check函数将调用一个名为ip_fast_csum计算校验和的函数。在x86和x86_64体系结构上,此功能是通过汇编实现的。您可以在此处阅读64位实现,并在此处阅读32位实现。

接下来,IP协议层将通过调用进入netfilter nf_hooknf_hook函数的返回值将传递回ip_local_out。如果nf_hookreturn 1,则表明该数据包被允许通过,并且调用方应沿着自身传递它。正如我们在上面看到的,这就是发生的情况:ip_local_out检查的返回值1并通过调用dst_output自身将数据包传递给它。让我们看一下以下代码__ip_local_out

int __ip_local_out(struct sk_buff *skb)
{struct iphdr *iph = ip_hdr(skb);iph->tot_len = htons(skb->len);ip_send_check(iph);return nf_hook(NFPROTO_IPV4, NF_INET_LOCAL_OUT, skb, NULL,skb_dst(skb)->dev, dst_output);
}

netfilter和 nf_hook

为了简洁起见(和我的RSI),我决定跳过对Netfilter,iptables和conntrack的深入研究。您可以从此处和此处开始,深入探讨netfilter的源代码。

短版本是nf_hook是其中要求的包装nf_hook_thresh,如果任何过滤器(安装了指定的协议的家庭和钩类型首先检查NFPROTO_IPV4NF_INET_LOCAL_OUT分别在该情况下,)和尝试返回执行回IP协议层,以避免深入到netfilter以及钩在下面的任何东西,例如iptables和conntrack。

请记住:如果您有大量或非常复杂的netfilter或iptables规则,这些规则将在发起原始sendmsg调用的用户进程的CPU上下文中执行。如果您设置了CPU固定,以限制该过程只能在特定的CPU(或一组CPU)上执行,请注意,CPU将花费系统时间来处理出站iptables规则。根据系统的工作负载,如果您在此处测量性能下降,则可能需要小心地将进程固定到CPU或降低规则集的复杂性。

出于讨论的目的,我们假设nf_hook返回1值指示调用者(在本例中为IP协议层)应将数据包沿自身传递。

目标缓存


dst代码在Linux内核中实现了与协议无关的目标缓存。要了解如何dst设置条目以继续发送UDP数据报,我们需要简要检查dst条目和路由的生成方式。目标缓存,路由和邻居子系统都可以自己进行极其详细的检查。为了我们的目的,我们可以快速浏览一下这一切如何融合在一起。

我们在上面看到的代码调用dst_output(skb)。此函数只是查找dst附加到的条目skb并调用输出函数。让我们来看看:

/* Output packet to network from transport.  */
static inline int dst_output(struct sk_buff *skb)
{return skb_dst(skb)->output(skb);
}

似乎很简单,但是该输出函数如何dst首先附加到条目?

重要的是要了解以多种不同方式添加了目标缓存条目。到目前为止,我们一直在遵循的代码路径中看到的一种方法是对ip_route_output_flowfrom 的调用udp_sendmsg。该ip_route_output_flow函数调用__ip_route_output_key哪个调用__mkroute_output。该__mkroute_output函数创建路由和目标缓存条目。当这样做时,它确定哪个输出功能适合此目的地。大多数情况下,此功能为ip_output

ip_output

因此,dst_output执行output函数,在UDP IPv4情况下为ip_output。该ip_output函数很简单:

int ip_output(struct sk_buff *skb)
{struct net_device *dev = skb_dst(skb)->dev;IP_UPD_PO_STATS(dev_net(dev), IPSTATS_MIB_OUT, skb->len);skb->dev = dev;skb->protocol = htons(ETH_P_IP);return NF_HOOK_COND(NFPROTO_IPV4, NF_INET_POST_ROUTING, skb, NULL, dev,ip_finish_output,!(IPCB(skb)->flags & IPSKB_REROUTED));
}

首先,更新统计计数器IPSTATS_MIB_OUT。该IP_UPD_PO_STATS宏将增加字节数和数据包数。我们将在后面的部分中看到如何获取IP协议层统计信息以及它们各自的含义。接下来,skb按照协议设置要发送的设备。

最后,通过调用将控制权传递给netfilter NF_HOOK_COND。查看函数原型NF_HOOK_COND将有助于更清楚地解释其工作原理。从./include/linux/netfilter.h:

static inline int
NF_HOOK_COND(uint8_t pf, unsigned int hook, struct sk_buff *skb,struct net_device *in, struct net_device *out,int (*okfn)(struct sk_buff *), bool cond)

NF_HOOK_COND通过检查传入的条件来工作。在这种情况下,条件为!(IPCB(skb)->flags & IPSKB_REROUTED。如果此条件为true,则将skb传递到netfilter。如果netfilter允许数据包通过,okfn则调用。在这种情况下,okfnip_finish_output

ip_finish_output

ip_finish_output功能也简短明了。让我们来看看:

static int ip_finish_output(struct sk_buff *skb)
{
#if defined(CONFIG_NETFILTER) && defined(CONFIG_XFRM)
        /* Policy lookup after SNAT yielded a new policy */if (skb_dst(skb)->xfrm != NULL) {IPCB(skb)->flags |= IPSKB_REROUTED;return dst_output(skb);}
#endif
        if (skb->len > ip_skb_dst_mtu(skb) && !skb_is_gso(skb))return ip_fragment(skb, ip_finish_output2);elsereturn ip_finish_output2(skb);
}

如果在此内核中启用了netfilter和数据包转换,skb则会更新的标志并将其通过发送回dst_output。两种较常见的情况是:

  1. 如果数据包的长度大于MTU,并且数据包的分段不会被分流到设备,ip_fragment则在数据传输之前调用来帮助分段数据包。
  2. 否则,数据包将直接传递到ip_finish_output2

在继续通过内核之前,让我们绕道讨论一下Path MTU Discovery。

路径MTU发现

Linux提供了到目前为止我一直未提及的功能:Path MTU Discovery。此功能使内核可以自动确定特定路由的最大MTU。确定此值并发送小于或等于该路由的MTU的数据包意味着可以避免IP分段。这是首选设置,因为分片数据包会消耗系统资源,并且似乎很容易避免:只需发送足够小的数据包,就不需要分片。

您可以通过setsockopt使用SOL_IP级别和IP_MTU_DISCOVERoptname 调用应用程序来按套接字调整“路径MTU发现”设置。optval可以是IP协议手册页中描述的几个值之一。您可能要设置的值为:IP_PMTUDISC_DO表示“始终执行路径MTU发现”。更高级的网络应用程序或诊断工具可以选择自己实现RFC 4821,以在应用程序启动时为一条或多条特定路由确定PMTU。在这种情况下,您可以使用IP_PMTUDISC_PROBE告诉内核设置“ Do n't Fragment”位的选项,但允许您发送大于PMTU的数据。

您的应用程序可以通过getsockopt使用SOL_IPIP_MTUoptname 调用来检索PMTU 。您可以使用它来帮助指导您的应用在尝试传输之前将构建的UDP数据报的大小。

如果启用了PTMU发现,则任何尝试发送大于PMTU的UDP数据的尝试都将导致应用程序收到错误代码EMSGSIZE。然后,该应用程序可以重试,但是数据较少。

强烈建议启用PTMU发现,因此,我将避免详细描述IP分片代码路径。当我们看一下IP协议层统计信息时,我将解释所有统计信息,包括与碎片相关的统计信息。他们中的许多人增加了ip_fragment。在片段或非片段两种情况下都ip_finish_output2被调用,因此让我们继续。

ip_finish_output2

ip_finish_output2IP分片之后被调用,也可以直接从ip_finish_output。此功能在将数据包传递到邻居缓存之前处理各种统计计数器。让我们看看它是如何工作的:

static inline int ip_finish_output2(struct sk_buff *skb)
{/* variable declarations */if (rt->rt_type == RTN_MULTICAST) {IP_UPD_PO_STATS(dev_net(dev), IPSTATS_MIB_OUTMCAST, skb->len);} else if (rt->rt_type == RTN_BROADCAST)IP_UPD_PO_STATS(dev_net(dev), IPSTATS_MIB_OUTBCAST, skb->len);/* Be paranoid, rather than too clever. */if (unlikely(skb_headroom(skb) < hh_len && dev->header_ops)) {struct sk_buff *skb2;skb2 = skb_realloc_headroom(skb, LL_RESERVED_SPACE(dev));if (skb2 == NULL) {kfree_skb(skb);return -ENOMEM;}if (skb->sk)skb_set_owner_w(skb2, skb->sk);consume_skb(skb);skb = skb2;}

如果与此数据包关联的路由结构是多播类型,则使用宏会同时破坏OutMcastPktsOutMcastOctets计数器IP_UPD_PO_STATS。否则,如果路由类型是广播OutBcastPktsOutBcastOctets计数器碰撞。

接下来,执行检查以确保skb结构具有足够的空间容纳需要添加的任何链路层标头。否则,将分配一个额外的房间以进行呼叫,skb_realloc_headroom并且将新skb的费用计入关联的套接字。

        rcu_read_lock_bh();nexthop = (__force u32) rt_nexthop(rt, ip_hdr(skb)->daddr);neigh = __ipv4_neigh_lookup_noref(dev, nexthop);if (unlikely(!neigh))neigh = __neigh_create(&arp_tbl, &nexthop, dev, false);

继续,我们可以看到,下一跳是通过查询路由层,然后查询邻居缓存来计算的。如果未找到邻居,则通过调用创建邻居__neigh_create。例如,第一次将数据发送到另一台主机时可能就是这种情况。请注意,使用arp_tbl(在./net/ipv4/arp.c中定义)调用此函数以在ARP表中创建邻居条目。其他系统(例如IPv6或DECnet)维护自己的ARP表,并将不同的结构传递到__neigh_create。这篇文章的目的不是要详细介绍邻居缓存,但是如果必须创建邻居,那么创建缓存可能会导致缓存增长,这毫无意义。这篇文章将在以下各节中介绍有关邻居缓存的更多详细信息。无论如何,邻居缓存将导出自己的一组统计信息,以便可以测量这种增长。有关更多信息,请参见下面的监视部分。

        if (!IS_ERR(neigh)) {int res = dst_neigh_output(dst, neigh, skb);rcu_read_unlock_bh();return res;}rcu_read_unlock_bh();net_dbg_ratelimited("%s: No header cache and no neighbour!\n",__func__);kfree_skb(skb);return -EINVAL;
}

最后,如果没有错误返回,dst_neigh_output则调用该方法将skb传递给它以进行输出。否则,将释放skb并返回EINVAL。此处的错误会波动回来,并导致OutDiscards以增量方式增加到ip_send_skb。让我们继续dst_neigh_output并继续研究Linux内核的netdevice子系统。

dst_neigh_output

dst_neigh_output功能为我们做两件事。首先,回想一下本博客文章的前面部分,我们看到,如果MSG_CONFIRM通过功能的辅助消息指定了用户sendmsg,则会翻转标志以指示远程主机的目标缓存条目仍然有效,并且不应进行垃圾回收。该检查将在此处进行,并且confirmed邻居上的字段将设置为当前吉菲数。

static inline int dst_neigh_output(struct dst_entry *dst, struct neighbour *n,struct sk_buff *skb)
{const struct hh_cache *hh;if (dst->pending_confirm) {unsigned long now = jiffies;dst->pending_confirm = 0;/* avoid dirtying neighbour */if (n->confirmed != now)n->confirmed = now;}

其次,检查邻居的状态并调用适当的输出函数。让我们看一下条件并尝试了解发生了什么:

        hh = &n->hh;if ((n->nud_state & NUD_CONNECTED) && hh->hh_len)return neigh_hh_output(hh, skb);elsereturn n->output(n, skb);
}

如果考虑到邻居NUD_CONNECTED,则意味着它是以下一项或多项:

  • NUD_PERMANENT:静态路由。
  • NUD_NOARP:不需要ARP请求(例如,目标是多播或广播地址或回送设备)。
  • NUD_REACHABLE:邻居“可达”。每当成功处理目标的ARP请求时,该目标就会标记为可到达。

hh缓存了“硬件头”()(因为我们之前已经发送过数据并且之前已经生成过数据),请调用neigh_hh_output。否则,调用该output函数。这两个代码路径都dev_queue_xmit以skb 结尾,将skb传递到Linux net设备子系统,在到达设备驱动程序层之前,将对其进行更多处理。让我们遵循neigh_hh_outputn->output代码路径,直到到达dev_queue_xmit

neigh_hh_output

如果目的地是NUD_CONNECTED且硬件头已被缓存,neigh_hh_output将被调用,在将skb移交给之前进行少量处理dev_queue_xmit。让我们从./include/net/neighbour.h看一下:

static inline int neigh_hh_output(const struct hh_cache *hh, struct sk_buff *skb)
{unsigned int seq;int hh_len;do {seq = read_seqbegin(&hh->hh_lock);hh_len = hh->hh_len;if (likely(hh_len <= HH_DATA_MOD)) {/* this is inlined by gcc */memcpy(skb->data - HH_DATA_MOD, hh->hh_data, HH_DATA_MOD);} else {int hh_alen = HH_DATA_ALIGN(hh_len);memcpy(skb->data - hh_alen, hh->hh_data, hh_alen);}} while (read_seqretry(&hh->hh_lock, seq));skb_push(skb, hh_len);return dev_queue_xmit(skb);
}

理解该功能有些棘手,部分是由于使用了用于同步缓存的硬件标头上的读/写的锁定原语。这段代码使用了seqlock。您可以将do { } while()上面的循环想象为一个简单的重试机制,它将尝试在循环中执行操作,直到可以成功执行为止。

尝试循环本身来确定在复制之前是否需要对齐硬件标头的长度。这是必需的,因为某些硬件标头(如IEEE 802.11标头)大于HH_DATA_MOD(16个字节)。

一旦将数据复制到skb并使用来更新skb的跟踪数据的内部指针skb_push,则skb会dev_queue_xmit进入Linux网络设备子系统。

n->output

如果目的地不是NUD_CONNECTED或硬件头尚未缓存,则代码沿n->output路径继续。outputneigbour结构上的函数指针附加了什么?这要看情况。要了解如何设置,我们需要更多地了解邻居缓存的工作方式。

一个struct neighbour包含几个重要的领域。nud_state如上所示,该字段是一个output函数和一个ops结构。回想一下__neigh_createip_finish_output2如果在缓存中未找到现有条目,我们看到调用它的时间有多早。当__neigh_creaet被称为邻居被分配其output 功能的初始设置到neigh_blackhole。随着__neigh_create代码的进行,它将根据邻居的状态调整output指向适当的output函数的值。

例如,当代码确定要连接的邻居时,neigh_connect将用于设置output指针neigh->ops->connected_output。或者,neigh_suspect将用于将output指针设置为neigh->ops->output当代码怀疑邻居可能关闭时(例如,/proc/sys/net/ipv4/neigh/default/delay_first_probe_time自发送探测以来已超过秒)。

换句话说:neigh->output设置为另一个指针,neigh->ops_connected_output或者neigh->ops->output根据其状态。哪里neigh->ops来的?

分配邻居后,arp_constructor(从./net/ipv4/arp.c)将被调用以设置的某些字段struct neighbour。特别地,此函数检查与该邻居关联的设备,并且该设备是否公开了header_ops包含cache功能的结构(以太网设备可以)neigh->ops设置为./net/ipv4/arp.c中定义的以下结构:

static const struct neigh_ops arp_hh_ops = {.family =               AF_INET,.solicit =              arp_solicit,.error_report =         arp_error_report,.output =               neigh_resolve_output,.connected_output =     neigh_resolve_output,
};

因此,无论邻居缓存代码是否将邻居视为“已连接”或“可疑”,该neigh_resolve_output函数都将被附加neigh->output并在n->output上面被调用时被调用。

neigh_resolve_output

此功能的目的是尝试解析未连接的邻居或已连接但没有缓存的硬件标头的邻居。让我们看一下这个函数是如何工作的:

/* Slow and careful. */int neigh_resolve_output(struct neighbour *neigh, struct sk_buff *skb)
{struct dst_entry *dst = skb_dst(skb);int rc = 0;if (!dst)goto discard;if (!neigh_event_send(neigh, skb)) {int err;struct net_device *dev = neigh->dev;unsigned int seq;

该代码首先进行一些基本检查,然后进行调用neigh_event_send。该neigh_event_send函数是短包装程序__neigh_event_send,将在周围进行繁重的工作以解决邻居问题。您可以__neigh_event_send在./net/core/neighbour.c中阅读的源代码,但是从代码的高层可以看出,用户最感兴趣的是以下三种情况:

  1. 处于状态NUD_NONE(分配时为默认状态)的邻居将假定设置为,/proc/sys/net/ipv4/neigh/default/app_solicit并立即发送ARP请求,并/proc/sys/net/ipv4/neigh/default/mcast_solicit允许发送探测(如果未设置,则状态标记为NUD_FAILED)。邻居状态将被更新并设置为NUD_INCOMPLETE
  2. 状态NUD_STALE为Neighbor的邻居将更新为,NUD_DELAYED并设置计时器以稍后对其进行探测(现在是现在的时间+ /proc/sys/net/ipv4/neigh/default/delay_first_probe_time秒)。
  3. NUD_INCOMPLETE将检查中的所有邻居(包括上述情况1中的邻居),以确保未解决的邻居的排队数据包数量小于或等于/proc/sys/net/ipv4/neigh/default/unres_qlen。如果更多,则将数据包出队并丢弃,直到长度小于或等于proc中的值。对于所有此类情况,邻居缓存统计信息中的统计信息计数器都会增加。

如果需要立即ARP探测,将发送它。__neigh_event_send将返回要么0表明该邻居被视为“已连接”或“已延迟”,要么返回1。的返回值0允许neigh_resolve_output继续:

                if (dev->header_ops->cache && !neigh->hh.hh_len)neigh_hh_init(neigh, dst);

如果与邻居关联的设备的协议实现(在我们的情况下为以太网)支持缓存硬件标头,并且当前未缓存,则调用neigh_hh_init将对其进行缓存。

                do {__skb_pull(skb, skb_network_offset(skb));seq = read_seqbegin(&neigh->ha_lock);err = dev_hard_header(skb, dev, ntohs(skb->protocol),neigh->ha, NULL, skb->len);} while (read_seqretry(&neigh->ha_lock, seq));

接下来,使用seqlock同步对邻居结构的硬件地址的访问,dev_hard_header在尝试为skb创建以太网头时将读取该地址。一旦seqlock允许继续执行,就进行错误检查:

                if (err >= 0)rc = dev_queue_xmit(skb);elsegoto out_kfree_skb;}

如果写了以太网头而没有返回错误,则将skb dev_queue_xmit传递给Linux网络设备子系统以进行传输。如果出现错误,a goto将删除skb,设置返回码并返回错误:

out:return rc;
discard:neigh_dbg(1, "%s: dst=%p neigh=%p\n", __func__, dst, neigh);
out_kfree_skb:rc = -EINVAL;kfree_skb(skb);goto out;
}
EXPORT_SYMBOL(neigh_resolve_output);

在进入Linux网络设备子系统之前,让我们看一些用于监视和转换IP协议层的文件。

监控:IP协议层


/proc/net/snmp

通过阅读监视详细的IP协议统计信息/proc/net/snmp

$ cat / proc / net / snmp
Ip:转发默认TTL InReceives InHdrErrors InAddrErrors ForwDatagrams InUnknownProtos InDiscards InDelivers OutRequests OutDiscards OutNoRoutes ReasmTimeout ReasmReqds ReasmOKs ReasmFails FragOKs FragFails FragCreates
IP:1 64 25922988125 0 0 15771700 0 0 25898327616 22789396404 12987882 51 1 10129840 2196520 1 0 0 0
...

该文件包含几个协议层的统计信息。IP协议层首先出现。第一行包含下一行中每个对应值的空格分隔名称。

在IP协议层中,您会发现统计信息计数器被颠簸。这些计数器由C枚举引用。所有有效的枚举值及其对应的字段名称/proc/net/snmp都可以在include / uapi / linux / snmp.h中找到:

enum
{IPSTATS_MIB_NUM = 0,
/* frequently written fields in fast path, kept in same cache line */IPSTATS_MIB_INPKTS,     /* InReceives */IPSTATS_MIB_INOCTETS,     /* InOctets */IPSTATS_MIB_INDELIVERS,     /* InDelivers */IPSTATS_MIB_OUTFORWDATAGRAMS,   /* OutForwDatagrams */IPSTATS_MIB_OUTPKTS,      /* OutRequests */IPSTATS_MIB_OUTOCTETS,      /* OutOctets *//* ... */

一些有趣的统计数据:

  • OutRequests:每次尝试发送IP数据包时增加。看来,无论成功与否,此值都会随着每次发送而增加。
  • OutDiscards:每次丢弃IP数据包时增加。如果将数据附加到skb(对于已塞好的套接字)失败,或者IP之下的层返回错误,则可能发生这种情况。
  • OutNoRouteudp_sendmsg如果无法为给定目标生成路由,则在多个位置(例如,UDP协议层())中增加。当应用程序在UDP套接字上调用“连接”但找不到路由时,也会增加。
  • FragOKs:每个分段的数据包增加一次。例如,将数据包分成3个片段将使该计数器增加一次。
  • FragCreates:每个创建的片段增加一次。例如,将数据包分成3个片段将使该计数器增加三次。
  • FragFails:如果尝试分段但不允许分段则增加(因为设置了“不分段”位)。如果输出片段失败,则也增加。

其他统计信息记录在接收方博客文章中。

/proc/net/netstat

通过阅读监视扩展的IP协议统计信息/proc/net/netstat

$ cat / proc / net / netstat | grep IpExt
IpExt:InNoRoutes InTruncatedPkts InMcastPkts OutMcastPkts InBcastPkts OutBcastPkts InOctets OutOctets InMcastOctets OutMcastOctets InBcastOctets OutBcastOctets InCsumErrors InNoECTPkts InECT0Pktsu InCEPkts
ipExt:0 0 0 0 277959 0 14568040307695 32991309088496 0 0 58649349 0 0 0 0 0

格式类似于/proc/net/snmp,除了这两行以开头IpExt

一些有趣的统计数据:

  • OutMcastPkts:每次发送发往多播地址的数据包时增加。
  • OutBcastPkts:每次发送目的地为广播地址的数据包时增加。
  • OutOctects:输出的包字节数。
  • OutMcastOctets:输出的组播数据包字节数。
  • OutBcastOctets:输出的广播包字节数。

其他统计信息记录在接收方博客文章中。

请注意,在IP层中的特定位置,每个位置都会增加。代码会不时地四处移动,并且可能会出现重复计数错误或其他会计错误。如果这些统计信息对您很重要,则强烈建议您阅读IP协议层源代码以获取对您重要的指标,因此您了解何时增加(和不增加)。

Linux网络设备子系统


在开始使用包传输路径之前dev_queue_xmit,让我们花一点时间来讨论一些重要概念,这些概念将在接下来的部分中出现。

Linux流量控制

Linux支持一种称为流量控制的功能。此功能使系统管理员可以控制如何从计算机传输数据包。这篇博客文章不会深入探讨Linux流量控制的各个方面。本文档对系统,其控制和功能进行了深入的研究。值得一提的一些概念使接下来看到的代码更易于理解。

交通控制系统包含几套不同的排队系统,它们提供了不同的功能来控制交通流量。各个排队系统通常qdisc被称为排队学科。您可以将qdiscs视为调度程序。qdiscs决定何时以及如何传输数据包。

在Linux上,每个接口都有一个与之关联的默认qdisc。对于仅支持单个传输队列的网络硬件,将使用默认的qdisc pfifo_fast。支持多个传输队列的网络硬件使用默认的qdisc mq。您可以通过运行来检查系统tc qdisc

还需要注意的是,某些设备支持硬件中的流量控制,这可以使管理员将流量控制卸载到网络硬件上,并节省系统上的CPU资源。

现在已经介绍了这些想法,让我们dev_queue_xmit从./net/core/dev.c继续进行下去。

dev_queue_xmit 和 __dev_queue_xmit

dev_queue_xmit是一个简单的包装器__dev_queue_xmit

int dev_queue_xmit(struct sk_buff *skb)
{return __dev_queue_xmit(skb, NULL);
}
EXPORT_SYMBOL(dev_queue_xmit);

接下来就是__dev_queue_xmit完成繁重工作的地方。让我们看一看,一步一步地看一下这段代码。跟随:

static int __dev_queue_xmit(struct sk_buff *skb, void *accel_priv)
{struct net_device *dev = skb->dev;struct netdev_queue *txq;struct Qdisc *q;int rc = -ENOMEM;skb_reset_mac_header(skb);/* Disable soft irqs for various locks below. Also* stops preemption for RCU.*/rcu_read_lock_bh();skb_update_prio(skb);

上面的代码开始于:

  1. 声明变量。
  2. 通过调用准备要处理的skb skb_reset_mac_header。这将重置skb的内部指针,以便可以访问以太网标头。
  3. rcu_read_lock_bh被调用以准备读取下面代码中的RCU保护的数据结构。阅读有关安全使用RCU的更多信息。
  4. skb_update_prio如果使用网络优先级cgroup,则调用来设置skb的优先级。

现在,我们将讨论传输数据的更复杂部分;)

        txq = netdev_pick_tx(dev, skb, accel_priv);

这里的代码尝试确定要使用哪个传输队列。正如您将在本文后面看到的那样,某些网络设备公开了多个传输队列来传输数据。让我们看看它是如何工作的。

netdev_pick_tx

netdev_pick_tx代码位于./net/core/flow_dissector.c中。让我们来看看:

struct netdev_queue *netdev_pick_tx(struct net_device *dev,struct sk_buff *skb,void *accel_priv)
{int queue_index = 0;if (dev->real_num_tx_queues != 1) {const struct net_device_ops *ops = dev->netdev_ops;if (ops->ndo_select_queue)queue_index = ops->ndo_select_queue(dev, skb,accel_priv);elsequeue_index = __netdev_pick_tx(dev, skb);if (!accel_priv)queue_index = dev_cap_txqueue(dev, queue_index);}skb_set_queue_mapping(skb, queue_index);return netdev_get_tx_queue(dev, queue_index);
}

如上所示,如果网络设备仅支持单个TX队列,则跳过更复杂的代码,并返回该单个TX队列。高端服务器上使用的大多数设备将具有多个TX队列。具有多个TX队列的设备有两种情况:

  1. 驱动程序实现ndo_select_queue,可用于以硬件或功能特定的方式更智能地选择TX队列,或者
  2. 驱动程序未实现`ndo_select_queue,因此内核应自行选择设备。

从3.13内核开始,没有很多驱动程序实现ndo_select_queue。bnx2x和ixgbe驱动程序实现了此功能,但仅用于以太网光纤通道(FCoE)。鉴于此,我们假设网络设备未实现ndo_select_queue和/或未使用FCoE。在这种情况下,内核将使用选择tx队列__netdev_pick_tx

一旦__netdev_pick_tx确定了队列索引,skb_set_queue_mapping将缓存该值(稍后将在流量控制代码中使用该值),netdev_get_tx_queue并将查找并返回指向该队列的指针。__netdev_pick_tx在回到之前,让我们看一下工作原理__dev_queue_xmit

__netdev_pick_tx

让我们看一下内核如何选择TX队列用于传输数据。从./net/core/flow_dissector.c:

u16 __netdev_pick_tx(struct net_device *dev, struct sk_buff *skb)
{struct sock *sk = skb->sk;int queue_index = sk_tx_queue_get(sk);if (queue_index < 0 || skb->ooo_okay ||queue_index >= dev->real_num_tx_queues) {int new_index = get_xps_queue(dev, skb);if (new_index < 0)new_index = skb_tx_hash(dev, skb);if (queue_index != new_index && sk &&rcu_access_pointer(sk->sk_dst_cache))sk_tx_queue_set(sk, new_index);queue_index = new_index;}return queue_index;
}

该代码首先通过调用来检查传输队列是否已在套接字上进行缓存sk_tx_queue_get,如果尚未缓存,-1则返回该代码。

下一个if语句检查以下条件是否为真:

  • queue_index <0。如果尚未设置队列,则会发生这种情况。
  • 如果ooo_okay设置了标志。如果设置了此标志,则意味着现在允许乱序数据包。协议层必须适当设置此标志。当已确认流的所有未完成数据包时,TCP协议层将设置此标志。发生这种情况时,内核可以为此数据包选择其他TX队列。UDP协议层未设置此标志-因此UDP数据包永远不会ooo_okay设置为非零值。
  • 如果队列索引大于队列数。如果用户最近通过更改了设备上的队列计数,则会发生这种情况ethtool。稍后再详细介绍。

在任何一种情况下,代码都会下降到慢速路径中以获取发送队列。首先get_xps_queue,尝试使用用户配置的映射将传输队列链接到CPU。这称为“传输数据包导向”。我们将更仔细地研究什么是传输数据包导向(XPS)以及其工作原理。

如果由于该内核不支持XPS或系统管理员未配置XPS而get_xps_queue返回-1,或者配置的映射引用无效的队列,则代码将继续调用skb_tx_hash

一旦XPS或内核使用来自动选择skb_tx_hash了队列,便使用将该队列缓存在套接字对象上sk_tx_queue_set并返回。skb_tx_hash在继续之前,让我们看看XPS是如何工作的dev_queue_xmit

传输封包导向(XPS)

传输数据包导向(XPS)是一项功能,允许系统管理员确定哪些CPU可以处理设备支持的每个可用传输队列的传输操作。此功能的目的主要是在处理传输请求时避免锁争用。使用XPS时,还有望获得其他好处,例如减少缓存逐出和避免在NUMA计算机上进行远程内存访问。

通过查看 XPS 的内核文档,可以了解有关XPS如何工作的更多信息。我们将在下面研究如何为您的系统调整XPS,但是现在,您只需要了解配置XPS,系统管理员就可以定义将传输队列映射到CPU的位图。

上面代码中的函数调用get_xps_queue将参考该用户指定的映射,以确定应使用哪个传输队列。如果get_xps_queue返回-1skb_tx_hash将代替使用。

skb_tx_hash

如果XPS未包含在内核中,或者未配置,或者建议使用不可用的队列(因为可能是用户调整了队列数),skb_tx_hash则该队列将确定应将数据发送到哪个队列。准确了解skb_tx_hash工作原理非常重要,具体取决于您的传输工作量。请注意,此代码已随时间进行了调整,因此,如果您使用的内核版本与本文档不同,则应直接查阅内核源代码。

让我们从./include/linux/netdevice.h看一下它是如何工作的:

/** Returns a Tx hash for the given packet when dev->real_num_tx_queues is used* as a distribution range limit for the returned value.*/
static inline u16 skb_tx_hash(const struct net_device *dev,const struct sk_buff *skb)
{return __skb_tx_hash(dev, skb, dev->real_num_tx_queues);
}

该代码只是__skb_tx_hash从./net/core/flow_dissector.c调用到。此函数中有一些有趣的代码,让我们看一下:

/** Returns a Tx hash based on the given packet descriptor a Tx queues' number* to be used as a distribution range.*/
u16 __skb_tx_hash(const struct net_device *dev, const struct sk_buff *skb,unsigned int num_tx_queues)
{u32 hash;u16 qoffset = 0;u16 qcount = num_tx_queues;if (skb_rx_queue_recorded(skb)) {hash = skb_get_rx_queue(skb);while (unlikely(hash >= num_tx_queues))hash -= num_tx_queues;return hash;}

此函数中的第一个if节是一个有趣的短路。函数名称skb_rx_queue_recorded有点误导。skb具有一个queue_mapping用于rx和tx 的字段。无论如何,如果系统正在接收数据包并将其转发到其他地方,则此if语句可以为true。如果不是这种情况,代码将继续。

        if (dev->num_tc) {u8 tc = netdev_get_prio_tc_map(dev, skb->priority);qoffset = dev->tc_to_txq[tc].offset;qcount = dev->tc_to_txq[tc].count;}

为了理解这段代码,重要的是要提到程序可以设置套接字上发送的数据的优先级。这可以通过使用来完成setsockoptSOL_SOCKETSO_PRIORITY分别为水平和OPTNAME。有关的更多信息,请参见socket(7)手册页SO_PRIORITY

请注意,如果您已使用该setsockopt选项IP_TOS在应用程序中的特定套接字上发送的IP数据包上设置了TOS标志(或者,如果作为辅助消息传递给,则以每个数据包为单位sendmsg),则内核将转换TOS选项集由您优先完成于skb->priority

如前所述,某些网络设备支持基于硬件的流量控制系统。如果num_tc非零,则表示该设备支持基于硬件的流量控制。

如果该数字不为零,则表示该设备支持基于硬件的流量控制。将参考将数据包优先级映射到基于硬件的流量控制的优先级映射。将根据此映射为数据优先级选择适当的流量类别。

接下来,将为流量类别生成适当的传输队列范围。它们将用于确定传输队列。

如果num_tc为零(因为网络设备不支持基于硬件的流量控制),则将qcountqoffset变量分别设置为传输队列和的数量0

使用qcountqoffset,将计算发送队列的索引:

        if (skb->sk && skb->sk->sk_hash)hash = skb->sk->sk_hash;elsehash = (__force u16) skb->protocol;hash = __flow_hash_1word(hash);return (u16) (((u64) hash * qcount) >> 32) + qoffset;
}
EXPORT_SYMBOL(__skb_tx_hash);

最后,相应的队列索引返回到__netdev_pick_tx

正在恢复 __dev_queue_xmit


此时,已选择适当的传输队列。__dev_queue_xmit可以继续:

        q = rcu_dereference_bh(txq->qdisc);#ifdef CONFIG_NET_CLS_ACT
        skb->tc_verd = SET_TC_AT(skb->tc_verd, AT_EGRESS);
#endif
        trace_net_dev_queue(skb);if (q->enqueue) {rc = __dev_xmit_skb(skb, q, dev, txq);goto out;}

它首先获得对与此队列关联的排队规则的引用。回想一下,我们之前看到单个传输队列设备的默认值为pfifo_fastqdisc,而对于多队列设备的默认值为mqqdisc。

接下来,如果您的内核中已启用数据包分类API,则代码会将流量分类“判决”分配给传出数据。接下来,检查队列规则以查看是否存在将数据排队的方法。诸如noqueueqdisc之类的一些排队规则没有队列。如果有队列,代码将调低__dev_xmit_skb以继续处理数据以进行传输。之后,执行跳至该函数的结尾。我们将在__dev_xmit_skb短期内介绍一下。现在,让我们看看如果没有队列会发生什么,首先给出一个非常有用的注释:

        /* The device has no queue. Common case for software devices:loopback, all the sorts of tunnels...Really, it is unlikely that netif_tx_lock protection is necessaryhere.  (f.e. loopback and IP tunnels are clean ignoring statisticscounters.)However, it is possible, that they rely on protectionmade by us here.Check this and shot the lock. It is not prone from deadlocks.Either shot noqueue qdisc, it is even simpler 8)*/if (dev->flags & IFF_UP) {int cpu = smp_processor_id(); /* ok because BHs are off */

如评论所示,可能具有qdisc且没有队列的唯一设备是回送设备和隧道设备。如果设备当前已启动,则将保存当前的CPU。它用于下一个检查,有些棘手,让我们看一下:

                if (txq->xmit_lock_owner != cpu) {if (__this_cpu_read(xmit_recursion) > RECURSION_LIMIT)goto recursion_alert;

有两种情况:此设备队列上的发送锁是否由该CPU拥有。如果是这样,xmit_recursion则在此处检查按CPU分配的计数器变量,以确定计数是否超过RECURSION_LIMIT。一个程序可能会尝试发送数据并在代码中此位置附近被抢占。调度程序可以选择另一个程序来运行。如果第二个程序也尝试发送数据并到达此处。因此,使用xmit_recursion计数器可以防止RECURSION_LIMIT程序竞相传输数据。我们继续吧:

                        HARD_TX_LOCK(dev, txq, cpu);if (!netif_xmit_stopped(txq)) {__this_cpu_inc(xmit_recursion);rc = dev_hard_start_xmit(skb, dev, txq);__this_cpu_dec(xmit_recursion);if (dev_xmit_complete(rc)) {HARD_TX_UNLOCK(dev, txq);goto out;}}HARD_TX_UNLOCK(dev, txq);net_crit_ratelimited("Virtual device %s asks to queue packet!\n",dev->name);} else {/* Recursion is detected! It is possible,* unfortunately*/
recursion_alert:net_crit_ratelimited("Dead loop on virtual device %s, fix it urgently!\n",dev->name);}}

其余代码从尝试获取传输锁定开始。检查要使用的设备的传输队列,以查看传输是否已停止。如果不是,则xmit_recursion变量增加,并且数据向下传递到更靠近要发送的设备。我们将dev_hard_start_xmit在后面详细介绍。完成此操作后,将释放锁定并打印警告。

或者,如果当前CPU是发送锁定所有者,或者如果RECURSION_LIMIT命中了,则不执行任何发送,但会打印警告。函数中的其余代码设置错误代码并返回。

由于我们对真正的以太网设备感兴趣,因此,让我们继续通过早先的代码所采用的代码路径__dev_xmit_skb

__dev_xmit_skb

现在,我们__dev_xmit_skb从./net/core/dev.c进入队列规范,网络设备并传输队列参考:

static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,struct net_device *dev,struct netdev_queue *txq)
{spinlock_t *root_lock = qdisc_lock(q);bool contended;int rc;qdisc_pkt_len_init(skb);qdisc_calculate_pkt_len(skb, q);/** Heuristic to force contended enqueues to serialize on a* separate lock before trying to get qdisc main lock.* This permits __QDISC_STATE_RUNNING owner to get the lock more often* and dequeue packets faster.*/contended = qdisc_is_running(q);if (unlikely(contended))spin_lock(&q->busylock);

该代码首先使用qdisc_pkt_len_initqdisc_calculate_pkt_len计算出供qdisc稍后使用的数据的准确长度。对于将通过基于硬件的发送卸载(如我们前面看到的UDP分段卸载)传递的skb,这是必要的,因为需要考虑在发生分段时将添加的其他标头。

接下来,使用一个锁来帮助减少对qdisc主锁的争用(第二个锁,我们将在后面介绍)。如果qdisc当前正在运行,则其他尝试传输的程序将与qdisc争用busylock。这使正在运行的qdisc可以处理数据包并与较少数量的第二主锁程序竞争。随着竞争者数量的减少,此技巧可提高吞吐量。您可以在此处阅读描述此内容的原始提交消息。接下来,进行主锁:

        spin_lock(root_lock);

现在,我们处理一个处理3种可能情况的if语句:

  1. qdisc被禁用。
  2. qdisc允许数据包绕过排队系统,没有其他数据包可发送,并且qdisc当前未运行。qdisc允许数据包绕过以“节省工作”的qdisc-换句话说,qdisc不会出于流量整形目的而延迟数据包传输。
  3. 所有其他情况。

让我们看一下在每种情况下发生的情况,以禁用的qdisc开头:

        if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {kfree_skb(skb);rc = NET_XMIT_DROP;

这很简单。如果禁用了qdisc,则释放数据并将返回码设置为NET_XMIT_DROP。接下来,一个qdisc允许数据包绕过,没有其他未运行的数据包,当前未运行:

        } else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&qdisc_run_begin(q)) {/** This is a work-conserving queue; there are no old skbs* waiting to be sent out; and the qdisc is not running -* xmit the skb directly.*/if (!(dev->priv_flags & IFF_XMIT_DST_RELEASE))skb_dst_force(skb);qdisc_bstats_update(q, skb);if (sch_direct_xmit(skb, q, dev, txq, root_lock)) {if (unlikely(contended)) {spin_unlock(&q->busylock);contended = false;}__qdisc_run(q);} elseqdisc_run_end(q);rc = NET_XMIT_SUCCESS;

这个if语句有点棘手。整个语句的评估结果true似乎满足以下所有条件:

  1. q->flags & TCQ_F_CAN_BYPASS:qdisc允许数据包绕过排队系统。对于“节省工作量”的qdiscs来说,这是正确的。即,不出于流量整形目的而不延迟数据包传输的qdiscs被视为“节省工作”,并允许数据包绕过。该pfifo_fast队列规定允许的数据包绕过排队系统。
  2. !qdisc_qlen(q):qdisc的队列中没有等待发送的数据。
  3. qdisc_run_begin(p):此函数调用将把qdisc的状态设置为“正在运行”并返回true,或者如果qdisc已经在运行,则返回false。

如果以上所有条件都为真,则:

  • IFF_XMIT_DST_RELEASE标志的检验。如果启用,则此标志表示允许内核释放skb的目标缓存结构。该函数中的代码检查该标志是否被禁用,并对该结构强制执行引用计数。
  • qdisc_bstats_update 用于增加qdisc发送的字节数和数据包数。
  • sch_direct_xmit用于尝试传输数据包。稍后,我们还将深入研究sch_direct_xmit它,因为它也用于较慢的代码路径中。

sch_direct_xmit检查以下两种情况的返回值:

  1. 队列不为空(>0返回)。在这种情况下,将释放防止其他程序争用的锁定,__qdisc_run并调用该锁定以重新启动qdisc处理。
  2. 队列为空(0返回)。在这种情况下qdisc_run_end,用于关闭qdisc处理。

无论哪种情况,都将返回值NET_XMIT_SUCCESS设置为返回码。那还不错。让我们检查一下最后一种情况:全部捕获:

        } else {skb_dst_force(skb);rc = q->enqueue(skb, q) & NET_XMIT_MASK;if (qdisc_run_begin(q)) {if (unlikely(contended)) {spin_unlock(&q->busylock);contended = false;}__qdisc_run(q);}}

在所有其他情况下:

  1. 调用skb_dst_force以在skb的目标缓存引用上强制引用计数增加。
  2. 通过调用enqueue队列光盘的功能将数据排队到qdisc 。存储返回码。
  3. 调用qdisc_run_begin(p)以将qdisc标记为正在运行。如果尚未运行,则将busylock其释放并__qdisc_run(p)调用它以开始qdisc处理。

然后,该函数通过释放一些锁并返回返回代码来完成:

        spin_unlock(root_lock);if (unlikely(contended))spin_unlock(&q->busylock);return rc;

调优:传输数据包导向(XPS)


为了使XPS能够正常运行,必须在内核配置中启用它(在Ubuntu 3.13.0上为Ubuntu),并使用一个位掩码来描述哪些CPU应该处理给定接口和TX队列的数据包。

这些位掩码类似于RPS位掩码,你可以找到一些文件关于内核文档这些位掩码。

简而言之,可以在以下位置找到要修改的位掩码:

/sys/class/net/DEVICE_NAME/queues/QUEUE/xps_cpus

因此,对于eth0和传输队列0,您将修改文件:/sys/class/net/eth0/queues/tx-0/xps_cpus用一个十六进制数字指示哪些CPU应该处理来自eth0传输队列0的传输完成。正如文档指出的那样,在某些配置中XPS可能是不必要的。

排队学科!


为了遵循网络数据的路径,我们需要稍微移入qdisc代码。这篇文章并不打算涵盖每个不同的传输队列选项的具体细节。如果您对此感兴趣,请查看此优秀指南。

就本博客而言,我们将通过检查通用数据包调度程序代码的工作方式来继续执行代码路径。特别是,我们将探讨如何qdisc_run_beginqdisc_run_end__qdisc_run,和sch_direct_xmit工作,以移动网络的数据更接近驾驶员的发射。

让我们开始研究qdisc_run_begin工作原理并从那里开始。

qdisc_run_begin 和 qdisc_run_end

qdisc_run_begin函数可以在./include/net/sch_generic.h中找到:

static inline bool qdisc_run_begin(struct Qdisc *qdisc)
{if (qdisc_is_running(qdisc))return false;qdisc->__state |= __QDISC___STATE_RUNNING;return true;
}

此功能很简单:__state检查qdisc 标志。如果已经运行,false则返回。否则,__state将更新以启用该__QDISC___STATE_RUNNING位。

同样,qdisc_run_end 是抗高潮:

static inline void qdisc_run_end(struct Qdisc *qdisc)
{qdisc->__state &= ~__QDISC___STATE_RUNNING;
}

它只是禁用了__QDISC___STATE_RUNNINGqdisc __state字段中的位。重要的是要注意,这两个功能都只是将位翻转。都不会真正开始或停止自行处理。__qdisc_run另一方面,该功能实际上将开始处理。

__qdisc_run

的代码__qdisc_run看似简短:

void __qdisc_run(struct Qdisc *q)
{int quota = weight_p;while (qdisc_restart(q)) {/** Ordered by possible occurrence: Postpone processing if* 1. we've exceeded packet quota* 2. another process needs the CPU;*/if (--quota <= 0 || need_resched()) {__netif_schedule(q);break;}}qdisc_run_end(q);
}

此功能从获取weight_p值开始。通常,这是通过sysctl设置的,也可以在接收路径中使用。稍后我们将介绍如何调整此值。这个循环有两件事:

  1. qdisc_restart在忙循环中调用,直到返回false(或触发下面的中断)为止。
  2. 确定配额是下降到零以下还是need_resched()返回true。如果其中一个为true__netif_schedule则被调用,并且循环中断。

记住:到目前为止,内核仍在代表sendmsg用户程序对原始调用的执行。用户程序当前正在累积系统时间。如果用户程序已经用尽了其在内核中的时间配额,need_resched则将返回true。如果仍有可用配额,而用户程序尚未使用,则时间已到,qdisc_restart将再次调用。

让我们看看如何qdisc_restart(q)工作,然后我们将深入研究__netif_schedule(q)

qdisc_restart

让我们跳入以下代码qdisc_restart

/** NOTE: Called under qdisc_lock(q) with locally disabled BH.** __QDISC_STATE_RUNNING guarantees only one CPU can process* this qdisc at a time. qdisc_lock(q) serializes queue accesses for* this queue.**  netif_tx_lock serializes accesses to device driver.**  qdisc_lock(q) and netif_tx_lock are mutually exclusive,*  if one is grabbed, another must be free.** Note, that this procedure can be called by a watchdog timer** Returns to the caller:*                                0  - queue is empty or throttled.*                                >0 - queue is not empty.**/
static inline int qdisc_restart(struct Qdisc *q)
{struct netdev_queue *txq;struct net_device *dev;spinlock_t *root_lock;struct sk_buff *skb;/* Dequeue packet */skb = dequeue_skb(q);if (unlikely(!skb))return 0;WARN_ON_ONCE(skb_dst_is_noref(skb));root_lock = qdisc_lock(q);dev = qdisc_dev(q);txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));return sch_direct_xmit(skb, q, dev, txq, root_lock);
}

qdisc_restart函数以有用的注释开头,描述了一些用于调用此函数的锁定约束。该功能执行的第一个操作是尝试从qdisc出队。

该功能dequeue_skb将尝试获取要发送的下一个数据包。如果队列为空,qdisc_restart将返回false(导致__qdisc_run上面的循环保释)。

假设有要发送的数据,则代码将继续获取对qdisc队列锁,qdisc的关联设备和发送队列的引用。

所有这些都传递给sch_direct_xmit。让我们看一下dequeue_skb,然后我们再回来sch_direct_xmit

dequeue_skb

让我们dequeue_skb从./net/sched/sch_generic.c看一下。此函数处理两种主要情况:

  1. 使由于之前无法发送而被重新排队的数据出队,或者
  2. 将新数据从qdisc中出队以进行处理。

让我们看一下第一种情况:

static inline struct sk_buff *dequeue_skb(struct Qdisc *q)
{struct sk_buff *skb = q->gso_skb;const struct netdev_queue *txq = q->dev_queue;if (unlikely(skb)) {/* check the reason of requeuing without tx lock first */txq = netdev_get_tx_queue(txq->dev, skb_get_queue_mapping(skb));if (!netif_xmit_frozen_or_stopped(txq)) {q->gso_skb = NULL;q->q.qlen--;} elseskb = NULL;

请注意,该代码从引用gso_skbqdisc的字段开始。此字段包含对已重新排队的数据的引用。如果没有数据重新排队,则此字段为NULL。如果该字段不是NULL,则代码将通过获取数据的传输队列并检查队列是否已停止来继续执行。如果队列未停止,gso_skb则清除该字段,并减少队列长度计数器。如果队列已停止,则数据仍附加到gso_skb,但NULL将从此函数返回。

让我们检查下一种情况,其中没有重新排队的数据:

        } else {if (!(q->flags & TCQ_F_ONETXQUEUE) || !netif_xmit_frozen_or_stopped(txq))skb = q->dequeue(q);}return skb;
}

在没有数据重新排队的情况下,将评估另一个棘手的复合if语句。如果:

  1. qdisc没有单个传输队列,或者
  2. 传输队列未停止

然后,dequeue将调用qdisc的函数以获取新数据。的内部实现dequeue将取决于qdisc的实现和功能。

该函数通过返回要处理的数据来完成。

sch_direct_xmit

现在我们来sch_direct_xmit(在./net/sched/sch_generic.c中),它是将数据向下移动到网络设备的重要参与者。让我们一步一步地完成它:

/** Transmit one skb, and handle the return status as required. Holding the* __QDISC_STATE_RUNNING bit guarantees that only one CPU can execute this* function.** Returns to the caller:*                                0  - queue is empty or throttled.*                                >0 - queue is not empty.*/
int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,struct net_device *dev, struct netdev_queue *txq,spinlock_t *root_lock)
{int ret = NETDEV_TX_BUSY;/* And release qdisc */spin_unlock(root_lock);HARD_TX_LOCK(dev, txq, smp_processor_id());if (!netif_xmit_frozen_or_stopped(txq))ret = dev_hard_start_xmit(skb, dev, txq);HARD_TX_UNLOCK(dev, txq);

该代码首先解锁qdisc锁,然后锁定发送锁。请注意,这HARD_TX_LOCK是一个宏:

#define HARD_TX_LOCK(dev, txq, cpu) {                   \if ((dev->features & NETIF_F_LLTX) == 0) {      \__netif_tx_lock(txq, cpu);              \}                                               \
}

该宏正在检查设备的NETIF_F_LLTX功能标志中是否设置了标志。此标志已弃用,新设备驱动程序不应使用此标志。此内核版本中的大多数驱动程序都不使用此标志,因此此检查的评估结果为true,并且将获取此数据的传输队列的锁定。

接下来,检查传输队列以确保它没有停止然后dev_hard_start_xmit被调用。稍后我们将看到,dev_hard_start_xmit处理将网络数据从Linux内核的网络设备子系统转换到设备驱动程序本身进行传输。该函数的返回码将被存储,接下来将进行检查以确定传输是否成功。

一旦运行(或由于队列停止而被跳过),将释放队列的传输锁。让我们继续:

        spin_lock(root_lock);if (dev_xmit_complete(ret)) {/* Driver sent out skb successfully or skb was consumed */ret = qdisc_qlen(q);} else if (ret == NETDEV_TX_LOCKED) {/* Driver try lock failed */ret = handle_dev_cpu_collision(skb, txq, q);

接下来,再次对该qdisc进行锁定,然后dev_hard_start_xmit检查的返回值。通过调用检查第一种情况,该调用dev_xmit_complete仅检查返回值以确定数据是否已成功发送。如果是这样,则将qdisc队列长度设置为返回值。

如果dev_xmit_complete返回false,将检查返回值以查看是否从设备驱动程序dev_hard_start_xmit返回NETDEV_TX_LOCKED。当驱动程序尝试自己锁定传输队列并失败时,具有不赞成使用的NETIF_F_LLTX功能标志的设备可以返回NETDEV_TX_LOCKED。在这种情况下,handle_dev_cpu_collision被称为处理锁争用。我们将在handle_dev_cpu_collision短期内仔细研究一下,但是现在,让我们继续sch_direct_xmit看一下所有问题:

        } else {/* Driver returned NETDEV_TX_BUSY - requeue skb */if (unlikely(ret != NETDEV_TX_BUSY))net_warn_ratelimited("BUG %s code %d qlen %d\n",dev->name, ret, q->q.qlen);ret = dev_requeue_skb(skb, q);}

因此,如果驱动程序没有传输数据,并且不是由于传输锁定被保持,则可能是由于NETDEV_TX_BUSY(如果未打印警告)。NETDEV_TX_BUSY驱动程序可以返回“ 0”以指示设备或驱动程序“忙”,并且当前无法传输数据。在这种情况下,dev_requeue_skb用于将要重试的数据排队。

该函数通过(可能)调整返回值来结束:

        if (ret && netif_xmit_frozen_or_stopped(txq))ret = 0;return ret;

让我们深入了解handle_dev_cpu_collisiondev_requeue_skb

handle_dev_cpu_collision

handle_dev_cpu_collision来自./net/sched/sch_generic.c的的代码处理两种情况:

  1. 传输锁定由当前CPU保持。
  2. 发送锁定由其他CPU保持。

在第一种情况下,将其作为配置问题处理,因此会打印警告。在第二种情况下,统计计数器cpu_collision将增加,并且数据将通过dev_requeue_skb发送以重新排队以便稍后传输。回忆前面,我们看到其中的代码dequeue_skb专门处理了重新排队的skb。

的代码handle_dev_cpu_collision简短,值得快速阅读:

static inline int handle_dev_cpu_collision(struct sk_buff *skb,struct netdev_queue *dev_queue,struct Qdisc *q)
{int ret;if (unlikely(dev_queue->xmit_lock_owner == smp_processor_id())) {/** Same CPU holding the lock. It may be a transient* configuration error, when hard_start_xmit() recurses. We* detect it by checking xmit owner and drop the packet when* deadloop is detected. Return OK to try the next skb.*/kfree_skb(skb);net_warn_ratelimited("Dead loop on netdevice %s, fix it urgently!\n",dev_queue->dev->name);ret = qdisc_qlen(q);} else {/** Another cpu is holding lock, requeue & delay xmits for* some time.*/__this_cpu_inc(softnet_data.cpu_collision);ret = dev_requeue_skb(skb, q);}return ret;
}

让我们看一下dev_requeue_skb它的作用,因为我们将看到此函数from sch_direct_xmit

dev_requeue_skb

幸运的dev_requeue_skb是,来自./net/sched/sch_generic.c的源代码简短而直接:

/* Modifications to data participating in scheduling must be protected with* qdisc_lock(qdisc) spinlock.** The idea is the following:* - enqueue, dequeue are serialized via qdisc root lock* - ingress filtering is also serialized via qdisc root lock* - updates to tree and tree walking are only done under the rtnl mutex.*/static inline int dev_requeue_skb(struct sk_buff *skb, struct Qdisc *q)
{skb_dst_force(skb);q->gso_skb = skb;q->qstats.requeues++;q->q.qlen++;        /* it's still part of the queue */__netif_schedule(q);return 0;
}

此功能可做一些事情:

  1. 它在skb上强制引用计数。
  2. 它将skb附加到qdisc的gso_skb字段。回想一下,我们看到dequeue_skb在将数据从qdisc的队列中拉出之前,该字段已签入。
  3. 统计计数器被撞。
  4. 队列的大小增加。
  5. __netif_schedule 叫做。

简单明了。让我们刷新一下如何到达这里,然后进行检查__netif_schedule

提醒,同时循环 __qdisc_run


回想一下,我们是通过检查__qdisc_run包含以下代码的函数到达此处的:

void __qdisc_run(struct Qdisc *q)
{int quota = weight_p;while (qdisc_restart(q)) {/** Ordered by possible occurrence: Postpone processing if* 1. we've exceeded packet quota* 2. another process needs the CPU;*/if (--quota <= 0 || need_resched()) {__netif_schedule(q);break;}}qdisc_run_end(q);
}

此代码通过qdisc_restart在循环中反复调用来工作,该循环在内部使skb出队,并尝试通过调用来传输它们sch_direct_xmit,该调用dev_hard_start_xmit下降到驱动程序以进行实际传输。任何无法传输的内容都会重新排队以在NET_TXsoftirq中传输。

传输过程的下一步是检查dev_hard_start_xmit以查看如何调用驱动程序以发送数据。在这之前,我们应该研究__netif_schedule,充分了解双方__qdisc_rundev_requeue_skb工作。

__netif_schedule

让我们__netif_schedule从./net/core/dev.c跳入:

void __netif_schedule(struct Qdisc *q)
{if (!test_and_set_bit(__QDISC_STATE_SCHED, &q->state))__netif_reschedule(q);
}
EXPORT_SYMBOL(__netif_schedule);

此代码检查并设置__QDISC_STATE_SCHEDqdisc状态中的位。如果该位被翻转(这意味着它以前不在__QDISC_STATE_SCHED状态中),则代码将调用__netif_reschedule,这不会花费太多时间,但会产生非常有趣的副作用。让我们来看看:

static inline void __netif_reschedule(struct Qdisc *q)
{struct softnet_data *sd;unsigned long flags;local_irq_save(flags);sd = &__get_cpu_var(softnet_data);q->next_sched = NULL;*sd->output_queue_tailp = q;sd->output_queue_tailp = &q->next_sched;raise_softirq_irqoff(NET_TX_SOFTIRQ);local_irq_restore(flags);
}

此函数执行以下操作:

  1. 保存当前的本地IRQ状态,并通过调用来禁用IRQ local_irq_save
  2. 获取当前的CPU softnet_data结构。
  3. 将qdisc添加到softnet_data的输出队列中。
  4. 提起NET_TX_SOFTIRQsoftirq。
  5. 恢复IRQ状态并重新启用中断。

您可以通过阅读我们之前有关网络堆栈接收方的文章来了解有关数据结构初始化的softnet_data更多信息。

上面函数中的重要代码是:raise_softirq_irqoff触发NET_TX_SOFTIRQsoftirq。softirqs及其注册也包含在我们以前的文章中。简而言之,您可以想到softirq是内核线程,它们以很高的优先级执行并代表内核处理数据。它们用于处理传入的网络数据以及处理传出的数据。

从上NET_TX_SOFTIRQ一篇文章中您将看到,softirq已net_tx_action注册了功能。这意味着有一个正在执行的内核线程net_tx_action。该线程偶尔会暂停并raise_softirq_irqoff恢复它。让我们看一下net_tx_action它的作用,以便我们可以了解内核进程如何传输请求。

net_tx_action

./net/core/dev.c中的net_tx_action函数在运行时处理两个主要问题:

  1. softnet_data正在执行的CPU 的结构的完成队列。
  2. softnet_data正在执行的CPU 的结构的输出队列。

实际上,该函数的代码是两个大的if块。让我们一次带他们一个,同时记住这段代码一直在softirq上下文中作为独立的内核线程执行。目的net_tx_action是执行无法在整个网络堆栈的发送端的热路径中执行的代码;工作被推迟,稍后由执行线程处理net_tx_action

net_tx_action 完成队列

所述softnet_data的完成队列仅仅是等待被释放skbs队列。该功能dev_kfree_skb_irq可用于将skbs添加到队列中,以便稍后释放。设备驱动程序通常使用它来延迟释放消耗的skb。驱动程序希望推迟释放skb而不是简单地释放skb的原因是,释放内存可能会花费一些时间,并且在某些实例(如hardirq处理程序)中,代码需要尽快执行并返回。

看一下net_tx_action释放完成队列上的skbs 的代码:

        if (sd->completion_queue) {struct sk_buff *clist;local_irq_disable();clist = sd->completion_queue;sd->completion_queue = NULL;local_irq_enable();while (clist) {struct sk_buff *skb = clist;clist = clist->next;WARN_ON(atomic_read(&skb->users));trace_kfree_skb(skb, net_tx_action);__kfree_skb(skb);}}

如果完成队列中有条目,则while循环将遍历skb的链接列表,并调用__kfree_skb它们中的每一个以释放其内存。请记住,此代码在称为softirq的单独“线程”中运行–并非特别代表任何用户程序运行。

net_tx_action 输出队列

输出队列完全有不同的用途。如前所述,__netif_reschedule通常通过from 调用to将数据添加到输出队列中__netif_schedule__netif_schedule到目前为止,我们在两个实例中调用了该函数:

  • dev_requeue_skb:如我们所见,如果驱动程序报告错误代码NETDEV_TX_BUSY或发生CPU冲突,则可以调用此函数。
  • __qdisc_run:我们也较早地看到了此功能。__netif_schedule一旦超出配额或需要重新安排流程,它也会调用。

在任何一种情况下,__netif_schedule都会调用该函数,该函数会将qdisc添加到softnet_data的输出队列中进行处理。我将输出队列处理代码分为三个块。让我们看一下第一个:

        if (sd->output_queue) {struct Qdisc *head;local_irq_disable();head = sd->output_queue;sd->output_queue = NULL;sd->output_queue_tailp = &sd->output_queue;local_irq_enable();

此块仅确保输出队列上存在qdiscs,如果存在,则将其设置head为第一个条目并移动队列的尾指针。

接下来,while遍历qdsics列表的循环开始:

                while (head) {struct Qdisc *q = head;spinlock_t *root_lock;head = head->next_sched;root_lock = qdisc_lock(q);if (spin_trylock(root_lock)) {smp_mb__before_clear_bit();clear_bit(__QDISC_STATE_SCHED,&q->state);qdisc_run(q);spin_unlock(root_lock);

上面的代码部分将头指针向前移动,并获得对qdisc锁的引用。spin_trylock用于检查是否可以获取锁;请注意,此调用专门用于此调用,因为它不会阻塞。如果已持有该锁,spin_trylock则将立即返回,而不是等待获取该锁。

如果spin_trylock成功获得了锁,它将返回一个非零值。在这种情况下,qdisc的状态字段的__QDISC_STATE_SCHED位被翻转并被qdisc_run调用,翻转__QDISC___STATE_RUNNING位并开始执行踢__qdisc_run

这个很重要。这里发生的事情是我们之前代表用户进行的系统调用运行的处理循环现在再次运行,但是在softirq上下文中,因为此qdisc的skb传输无法传输。这种区别很重要,因为它会影响您监视发送大量数据的应用程序的CPU使用率的方式。让我用另一种方式陈述一下:

  • 程序的系统时间将包括调用驱动程序尝试发送数据所花费的时间,无论发送是否完成或驱动程序返回错误。
  • 如果该发送在驱动程序层不成功(例如,因为设备正忙于发送其他内容),则qdisc将被添加到输出队列中,稍后由softirq线程进行处理。在这种情况下,将花费softirq(si)时间来尝试传输数据。

因此,发送数据所花费的总时间是与发送相关的系统调用的系统时间和softirq的softirq时间的组合NET_TX

无论如何,以上代码通过释放qdisc锁来完成。如果spin_trylock上面的调用掉落以获取锁,则会执行以下代码:

                        } else {if (!test_bit(__QDISC_STATE_DEACTIVATED,&q->state)) {__netif_reschedule(q);} else {smp_mb__before_clear_bit();clear_bit(__QDISC_STATE_SCHED,&q->state);}}}}

该代码仅在无法获得qdisc锁的情况下执行,处理两种情况。要么:

  1. 没有禁用qdisc,但是无法获得执行锁qdisc_run。因此,致电__netif_reschedule。调用__netif_reschedule此处会将qdisc放回到该函数当前从其出队的队列中。这样可以在可能已放弃锁定的情况下稍后再次检查qdisc。
  2. qdisc标记为已停用,请确保__QDISC_STATE_SCHED也清除状态标志。

终于可以见到我们的朋友了 dev_hard_start_xmit


因此,我们遍历了整个网络堆栈dev_hard_start_xmit。也许您是通过sendmsg系统调用直接到达此处的,或者您是通过处理qdisc上的网络数据的softirq线程到达此处的。dev_hard_start_xmit将调用设备驱动程序以实际执行传输操作。

dev_hard_start_xmit函数处理两种主要情况:

  • 准备发送的网络数据,或
  • 具有分段卸载的网络数据需要处理。

我们将从准备发送的网络数据的情况开始,看看如何处理这两种情况。让我们看一下(在这里跟随:./net/code/dev.c:

int dev_hard_start_xmit(struct sk_buff *skb, struct net_device *dev,struct netdev_queue *txq)
{const struct net_device_ops *ops = dev->netdev_ops;int rc = NETDEV_TX_OK;unsigned int skb_len;if (likely(!skb->next)) {netdev_features_t features;/** If device doesn't need skb->dst, release it right now while* its hot in this cpu cache*/if (dev->priv_flags & IFF_XMIT_DST_RELEASE)skb_dst_drop(skb);features = netif_skb_features(skb);

该代码通过使用获得对设备驱动程序公开操作的引用开始ops。稍后,当需要让驱动程序进行一些传输数据的工作时,将使用此方法。该代码检查skb->next以确保此数据不属于已准备好进行分段的数据链的一部分,并继续执行以下两项操作:

  1. 首先,它检查IFF_XMIT_DST_RELEASE标志是否在设备上设置。该标志在此内核中的任何“真实”以太网设备中均未使用。但是,它由回送设备和其他一些软件设备使用。如果启用了此标志,则可以减少目标缓存条目上的引用计数,因为驱动程序将不需要它。
  2. 接下来,netif_skb_features用于从设备获取功能标记,并根据要发送数据的协议对其进行一些修改(dev->protocol)。例如,如果协议是设备可以校验和的协议,则skb将被标记为协议。VLAN标签(如果已设置)还将导致其他功能标志被翻转。

接下来,将检查vlan标记,如果设备无法卸载VLAN标记,__vlan_put_tag则将在软件中使用该标记:

                if (vlan_tx_tag_present(skb) &&!vlan_hw_offload_capable(features, skb->vlan_proto)) {skb = __vlan_put_tag(skb, skb->vlan_proto,vlan_tx_tag_get(skb));if (unlikely(!skb))goto out;skb->vlan_tci = 0;}

之后,将检查数据是否是封装卸载请求,例如对于GRE。在这种情况下,功能标志将更新为包括可用的任何特定于设备的硬件封装功能:

                /* If encapsulation offload request, verify we are testing* hardware encapsulation features instead of standard* features for the netdev*/if (skb->encapsulation)features &= dev->hw_enc_features;

接下来,netif_needs_gso用于确定skb本身是否完全需要分段。如果skb需要分段,但设备不支持该分段,netif_needs_gso则将返回true指示应该在软件中进行分段。在这种情况下,dev_gso_segment被称为进行分段,代码将跳下gso以传输数据包。稍后我们将看到GSO路径。

                if (netif_needs_gso(skb, features)) {if (unlikely(dev_gso_segment(skb, features)))goto out_kfree_skb;if (skb->next)goto gso;}

如果数据不需要分段,则将处理其他一些情况。第一:数据是否需要线性化?也就是说,如果数据分散在多个缓冲区中,还是首先需要将它们全部组合成一个线性缓冲区,设备是否可以支持发送网络数据?绝大多数网卡不需要在传输之前对数据进行线性化处理,因此在几乎所有情况下,该结果都将被评估为false并被跳过。

                                     else {if (skb_needs_linearize(skb, features) &&__skb_linearize(skb))goto out_kfree_skb;

接下来提供有用的注释,解释下一种情况。该数据包将被检查以确定是否仍然需要校验和。如果设备不支持校验和,将立即在软件中生成校验和:

                        /* If packet is not checksummed and device does not* support checksumming for this protocol, complete* checksumming here.*/if (skb->ip_summed == CHECKSUM_PARTIAL) {if (skb->encapsulation)skb_set_inner_transport_header(skb,skb_checksum_start_offset(skb));elseskb_set_transport_header(skb,skb_checksum_start_offset(skb));if (!(features & NETIF_F_ALL_CSUM) &&skb_checksum_help(skb))goto out_kfree_skb;}}

现在我们来看看水龙头!回想一下接收方博客文章,我们看到了数据包如何传递到数据包分路器(如PCAP)。此函数中的下一个代码块将要传输的数据包移交给数据包抽头(如果有的话)。

                if (!list_empty(&ptype_all))dev_queue_xmit_nit(skb, dev);

最后,驱动程序ops用于通过调用将数据向下传递到设备ndo_start_xmit

                skb_len = skb->len;rc = ops->ndo_start_xmit(skb, dev);trace_net_dev_xmit(skb, rc, dev, skb_len);if (rc == NETDEV_TX_OK)txq_trans_update(txq);return rc;}

返回的返回值ndo_start_xmit指示是否发送了数据包。我们看到了此返回值将如何影响上层:该函数上方的qdisc可能会将数据重新排队,以便稍后可以再次发送。

让我们看一下GSO案例。如果由于此功能中发生的分段或先前已分段但未能发送但已排队等待再次发送的数据包而已将skb拆分为数据包链,则此代码将运行。

gso:do {struct sk_buff *nskb = skb->next;skb->next = nskb->next;nskb->next = NULL;if (!list_empty(&ptype_all))dev_queue_xmit_nit(nskb, dev);skb_len = nskb->len;rc = ops->ndo_start_xmit(nskb, dev);trace_net_dev_xmit(nskb, rc, dev, skb_len);if (unlikely(rc != NETDEV_TX_OK)) {if (rc & ~NETDEV_TX_MASK)goto out_kfree_gso_skb;nskb->next = skb->next;skb->next = nskb;return rc;}txq_trans_update(txq);if (unlikely(netif_xmit_stopped(txq) && skb->next))return NETDEV_TX_BUSY;} while (skb->next);

您可能已经猜到了,这段代码是一个while循环,它循环访问分段数据时生成的skb列表。

每个数据包是:

  • 通过数据包水龙头(如果有的话)。
  • 通过传递给驾驶员ndo_start_xmit

通过调整需要发送的skb列表来处理传输数据包中的任何错误。错误将返回堆栈,未发送的skb可能会重新排队以便稍后再次发送。

此功能的最后一部分是在出现以上任何错误的情况下清理并释放数据:

out_kfree_gso_skb:if (likely(skb->next == NULL)) {skb->destructor = DEV_GSO_CB(skb)->destructor;consume_skb(skb);return rc;}
out_kfree_skb:kfree_skb(skb);
out:return rc;
}
EXPORT_SYMBOL_GPL(dev_hard_start_xmit);

在继续进行设备驱动程序之前,让我们看一下可以对我们刚刚遍历的代码进行的一些监视和调整。

监控qdiscs


使用tc命令行工具

通过使用监视您的qdisc统计信息 tc

$ tc -s qdisc show dev eth1
qdisc mq 0:根发送31973946891907字节2298757402 pkt(丢弃0,超出限制0重新排队1776429)积压0b 0p排队1776429

为了监视系统的数据包传输状况,至关重要的是检查与网络设备连接的队列规则的统计信息。您可以通过运行命令行工具来检查状态tc。上面的示例显示了如何检查eth1接口的统计信息。

  • bytes:被压低到驱动程序进行传输的字节数。
  • pkt:被推送到驱动程序进行传输的数据包数量。
  • dropped:qdisc丢弃的数据包数。如果传输队列长度不足以容纳正在排队的数据,则会发生这种情况。
  • overlimits:取决于排队原则,但可以是由于达到限制而无法排队的数据包数量,和/或可以是在出队时触发限制事件的数据包数量。
  • requeuesdev_requeue_skb已调用重新排队skb的次数。请注意,多次重新排队的skb每次重新排队都会使该计数器碰撞。
  • backlog:当前在qdisc队列中的字节数。通常,每次将数据包入队时,此数字都会增加。

一些qdsics可能会导出其他统计信息。每个qdisc都不同,并且可能在不同的时间碰撞这些计数器。您可能需要研究所使用的qdisc的来源,以准确了解何时可以在系统上增加这些值,以帮助了解对您造成的后果。

调优qdiscs

增加加工重量 __qdisc_run

您可以调整__qdisc_run前面看到的循环权重(quota上面看到的变量),这将导致__netif_schedule执行更多的调用。结果将是将当前qdisc output_queue多次添加到当前CPU 的列表中,这将导致对发送数据包进行更多处理。

示例:使用`sysctl`增加所有qdiscs的`__qdisc_run`配额。

$ sudo sysctl -w net.core.dev_weight = 600

增加传输队列长度

每个网络设备都有一个txqueuelen可以修改的调节旋钮。大多数qdisc txqueuelen在排队最终应由qdisc传输的数据时会检查设备是否具有足够的字节。您可以调整他的参数以增加qdisc可能排队的字节数。

示例:将eth0的txqueuelen增加到10000。

$ sudo ifconfig eth0 txqueuelen 10000

以太网设备的默认值为1000。您可以txqueuelen通过阅读的输出来检查网络设备ifconfig

网络设备驱动程序


我们的旅程即将结束。关于数据包传输,有一个重要的概念需要理解。大多数设备和驱动程序将数据包传输分为两个步骤:

  1. 正确安排数据,并触发设备以DMA将RAM中的数据写入DMA并将其写入网络
  2. 传输完成后,设备将引发中断,因此驱动程序可以取消映射缓冲区,释放内存或以其他方式清除其状态。

第二阶段通常称为“传输完成”阶段。我们将同时检查这两个方面,但我们将从第一个阶段开始:传输阶段。

我们看到dev_hard_start_xmit调用了ndo_start_xmit(持有锁)来传输数据,因此让我们首先检查驱动程序如何注册an ndo_start_xmit,然后我们将深入研究该函数的工作方式。

正如在以前的博客中,我们将检查igb驱动程序。

驱动注册


驱动程序为一系列操作实现了一系列功能,例如:

  • 传送资料(ndo_start_xmit
  • 获取统计信息(ndo_get_stats64
  • 装卸设备ioctlsndo_do_ioctl
  • 和更多。

这些函数将导出为排列在结构中的一系列函数指针。让我们看一下igb 驱动程序源中这些操作的结构定义:

static const struct net_device_ops igb_netdev_ops = {.ndo_open               = igb_open,.ndo_stop               = igb_close,.ndo_start_xmit         = igb_xmit_frame,.ndo_get_stats64        = igb_get_stats64,/* ... more fields ... */
};

该结构在igb_probe函数中注册:

static int igb_probe(struct pci_dev *pdev, const struct pci_device_id *ent)
{/* ... lots of other stuff ... */netdev->netdev_ops = &igb_netdev_ops;/* ... more code ... */
}

正如我们在上一节中看到的那样,更高级别的代码将获得对设备netdev_ops结构的引用,并调用适当的函数。如果您想了解更多有关如何准确启动PCI设备以及何时igb_probe调用何处的信息,请参阅我们另一篇博客文章中的驱动程序初始化部分。

与传输数据 ndo_start_xmit


网络堆栈的较高层使用该net_device_ops结构来调用驱动程序以执行各种操作。如前所述,qdisc代码调用ndo_start_xmit将数据向下传递到驱动程序进行传输。的ndo_start_xmit如上所示,对于大多数硬件设备,在持有锁的同时调用函数。

igb设备驱动程序中,注册到的函数ndo_start_xmit称为igb_xmit_frame,因此让我们开始igb_xmit_frame学习该驱动程序如何传输数据。按照./drivers/net/ethernet/intel/igb/igb_main.c的说明进行操作,并牢记在执行以下代码的整个过程中都将保持锁定:

netdev_tx_t igb_xmit_frame_ring(struct sk_buff *skb,struct igb_ring *tx_ring)
{struct igb_tx_buffer *first;int tso;u32 tx_flags = 0;u16 count = TXD_USE_COUNT(skb_headlen(skb));__be16 protocol = vlan_get_protocol(skb);u8 hdr_len = 0;/* need: 1 descriptor per page * PAGE_SIZE/IGB_MAX_DATA_PER_TXD,*       + 1 desc for skb_headlen/IGB_MAX_DATA_PER_TXD,*       + 2 desc gap to keep tail from touching head,*       + 1 desc for context descriptor,* otherwise try next time*/if (NETDEV_FRAG_PAGE_MAX_SIZE > IGB_MAX_DATA_PER_TXD) {unsigned short f;for (f = 0; f < skb_shinfo(skb)->nr_frags; f++)count += TXD_USE_COUNT(skb_shinfo(skb)->frags[f].size);} else {count += skb_shinfo(skb)->nr_frags;}

该函数首先确定使用TXD_USER_COUNT宏来确定需要多少个传输描述符来传输传入的数据。该count值在描述符数量处初始化以适合skb。然后调整它以解决需要传输的任何其他片段。

        if (igb_maybe_stop_tx(tx_ring, count + 3)) {/* this is a hard error */return NETDEV_TX_BUSY;}

然后,驱动程序调用一个内部函数igb_maybe_stop_tx,该函数将检查所需的描述符数量,以确保传输队列有足够的可用空间。如果不是,NETDEV_TX_BUSY则返回此处。正如我们在qdisc代码的前面所看到的,这将导致qdisc重新排队稍后要重试的数据。

        /* record the location of the first descriptor for this packet */first = &tx_ring->tx_buffer_info[tx_ring->next_to_use];first->skb = skb;first->bytecount = skb->len;first->gso_segs = 1;

然后,代码获得对发送队列中下一个可用缓冲区信息的重新定义。此结构将跟踪稍后设置缓冲区描述符所需的信息。对数据包的引用及其大小将复制到缓冲区信息结构中。

        skb_tx_timestamp(skb);

上面的代码从调用开始,该调用skb_tx_timestamp用于获取基于软件的传输时间戳。应用程序可以使用传输时间戳来确定数据包通过网络堆栈的传输路径所花费的时间。

一些设备还支持为在硬件中传输的数据包生成时间戳。这使系统可以将时间戳记卸载到设备上,并允许程序员获得更准确的时间戳,因为它将更接近硬件发生实际传输时的时间戳。我们现在将看到此代码:

        if (unlikely(skb_shinfo(skb)->tx_flags & SKBTX_HW_TSTAMP)) {struct igb_adapter *adapter = netdev_priv(tx_ring->netdev);if (!(adapter->ptp_tx_skb)) {skb_shinfo(skb)->tx_flags |= SKBTX_IN_PROGRESS;tx_flags |= IGB_TX_FLAGS_TSTAMP;adapter->ptp_tx_skb = skb_get(skb);adapter->ptp_tx_start = jiffies;if (adapter->hw.mac.type == e1000_82576)schedule_work(&adapter->ptp_tx_work);}}

某些网络设备可以使用“ 精确时间协议”在硬件中对数据包进行时间戳记。当用户请求硬件时间戳时,驱动程序代码将在此处进行处理。

if上面的语句检查SKBTX_HW_TSTAMP标志。该标志指示用户请求了硬件时间戳。如果用户请求硬件加盖时间戳,则该代码将接下来检查是否ptp_tx_skb已设置。一次可以对一个数据包进行时间戳记,因此此处引用了要时间戳记的数据包,并SKBTX_IN_PROGRESS在skb上设置了标志。将tx_flags更新以标记该IGB_TX_FLAGS_TSTAMP标志。该tx_flags变量稍后将被复制到缓冲区信息结构中。

将对skb进行引用,将当前的jiffies计数复制到ptp_tx_start。该值将由驱动程序中的其他代码使用,以确保TX硬件时间戳未挂起。最后,如果这是以太网硬件适配器,则该schedule_work函数用于启动工作队列82576

        if (vlan_tx_tag_present(skb)) {tx_flags |= IGB_TX_FLAGS_VLAN;tx_flags |= (vlan_tx_tag_get(skb) << IGB_TX_FLAGS_VLAN_SHIFT);}

上面的代码将检查vlan_tciskb 的字段是否已设置。如果已设置,则IGB_TX_FLAGS_VLAN启用该标志并存储VLAN ID。

        /* record initial flags and protocol */first->tx_flags = tx_flags;first->protocol = protocol;

标志和协议记录到缓冲区信息结构中。

        tso = igb_tso(tx_ring, first, &hdr_len);if (tso < 0)goto out_drop;else if (!tso)igb_tx_csum(tx_ring, first);

接下来,驱动程序调用其内部函数igb_tso。此功能将确定skb是否需要分段。如果是这样,则缓冲区信息引用(first)将更新其标志,以向硬件指示需要TSO。

igb_tso如果0不需要,1则返回TSO,否则返回。如果0返回,igb_tx_csum则将在需要且受此协议支持时调用以处理启用校验和卸载。该igb_tx_csum函数将检查skb的属性,并翻转缓冲区信息中的一些标志位,first以表明需要进行校验和卸载。

        igb_tx_map(tx_ring, first, hdr_len);

igb_tx_map调用该函数以准备要由设备消耗的数据以进行传输。接下来,我们将详细研究此功能。

        /* Make sure there is space in the ring for the next send. */igb_maybe_stop_tx(tx_ring, DESC_NEEDED);return NETDEV_TX_OK;

传输完成后,驱动程序将检查以确保有足够的空间可用于其他传输。如果不是,则关闭队列。无论哪种情况,NETDEV_TX_OK都将返回到较高层(qdisc代码)。

out_drop:igb_unmap_and_free_tx_resource(tx_ring, first);return NETDEV_TX_OK;
}

最后是一些错误处理代码。仅当igb_tso遇到某种错误时,此代码才会被击中。将igb_unmap_and_free_tx_resource被用于清理数据。NETDEV_TX_OK在这种情况下也将返回。传输不成功,但是驱动程序释放了相关的资源,没有任何事情要做。请注意,在这种情况下,该驱动程序不会增加丢包率,但可能应该增加。

igb_tx_map

igb_tx_map函数处理将skb数据映射到RAM的DMA可用区域的详细信息。它还会更新设备上传输队列的尾指针,这将触发设备“唤醒”,从RAM中获取数据并开始传输数据。

让我们简要地看一下此函数的工作方式:

static void igb_tx_map(struct igb_ring *tx_ring,struct igb_tx_buffer *first,const u8 hdr_len)
{struct sk_buff *skb = first->skb;/* ... other variables ... */u32 tx_flags = first->tx_flags;u32 cmd_type = igb_tx_cmd_type(skb, tx_flags);u16 i = tx_ring->next_to_use;tx_desc = IGB_TX_DESC(tx_ring, i);igb_tx_olinfo_status(tx_ring, tx_desc, tx_flags, skb->len - hdr_len);size = skb_headlen(skb);data_len = skb->data_len;dma = dma_map_single(tx_ring->dev, skb->data, size, DMA_TO_DEVICE);

上面的代码做了几件事:

  1. 声明一组变量并对其进行初始化。
  2. 使用IGB_TX_DESC宏确定获取对下一个可用描述符的引用。
  3. igb_tx_olinfo_status将更新tx_flags并将其复制到描述符(tx_desc)中。
  4. 捕获大小和数据长度,以便以后使用。
  5. dma_map_single用于构造为获取DMA可用地址所需的任何内存映射skb->data。这样做是为了使设备可以从内存中读取数据包数据。

接下来是驱动程序中非常密集的循环,用于为skb的每个片段生成有效的映射。究竟如何发生的细节并不特别重要,但值得一提:

  • 驱动程序遍历一组数据包片段。
  • 当前描述符已填充数据的DMA地址。
  • 如果片段的大小大于单个IGB描述符可以传输的大小,则构造多个描述符以指向DMA可用区域的块,直到整个片段被描述符指向为止。
  • 描述符迭代器被颠簸。
  • 剩余长度减少。
  • 循环在以下情况之一时终止:没有片段剩余或整个数据长度已耗尽。

下面提供了用于循环的代码,以供上面的描述参考。这应该向读者进一步说明,尽可能避免碎片是一个好主意。在堆栈的每个层(包括驱动程序)都需要运行许多其他代码来处理它。

        tx_buffer = first;for (frag = &skb_shinfo(skb)->frags[0];; frag++) {if (dma_mapping_error(tx_ring->dev, dma))goto dma_error;/* record length, and DMA address */dma_unmap_len_set(tx_buffer, len, size);dma_unmap_addr_set(tx_buffer, dma, dma);tx_desc->read.buffer_addr = cpu_to_le64(dma);while (unlikely(size > IGB_MAX_DATA_PER_TXD)) {tx_desc->read.cmd_type_len =cpu_to_le32(cmd_type ^ IGB_MAX_DATA_PER_TXD);i++;tx_desc++;if (i == tx_ring->count) {tx_desc = IGB_TX_DESC(tx_ring, 0);i = 0;}tx_desc->read.olinfo_status = 0;dma += IGB_MAX_DATA_PER_TXD;size -= IGB_MAX_DATA_PER_TXD;tx_desc->read.buffer_addr = cpu_to_le64(dma);}if (likely(!data_len))break;tx_desc->read.cmd_type_len = cpu_to_le32(cmd_type ^ size);i++;tx_desc++;if (i == tx_ring->count) {tx_desc = IGB_TX_DESC(tx_ring, 0);i = 0;}tx_desc->read.olinfo_status = 0;size = skb_frag_size(frag);data_len -= size;dma = skb_frag_dma_map(tx_ring->dev, frag, 0,size, DMA_TO_DEVICE);tx_buffer = &tx_ring->tx_buffer_info[i];}

一旦构造了所有必要的描述符,并且所有skb的数据都已映射到可DMA的地址,驱动程序便进入其最终步骤以触发传输:

        /* write last descriptor with RS and EOP bits */cmd_type |= size | IGB_TXD_DCMD;tx_desc->read.cmd_type_len = cpu_to_le32(cmd_type);

写入终止描述符以向设备指示它是最后一个描述符。

        netdev_tx_sent_queue(txring_txq(tx_ring), first->bytecount);/* set the timestamp */first->time_stamp = jiffies;

使用netdev_tx_sent_queue添加到此传输队列的字节数调用该函数。此功能是字节查询限制功能的一部分,我们将在稍后详细介绍。当前的抖动存储在第一个缓冲区信息结构中。

接下来,有些棘手的事情:

        /* Force memory writes to complete before letting h/w know there* are new descriptors to fetch.  (Only applicable for weak-ordered* memory model archs, such as IA-64).** We also need this memory barrier to make certain all of the* status bits have been updated before next_to_watch is written.*/wmb();/* set next_to_watch value indicating a packet is present */first->next_to_watch = tx_desc;i++;if (i == tx_ring->count)i = 0;tx_ring->next_to_use = i;writel(i, tx_ring->tail);/* we need this if more than one processor can write to our tail* at a time, it synchronizes IO on IA64/Altix systems*/mmiowb();return;

上面的代码正在做一些重要的事情:

1.Start通过使用该wmb函数被调用来强制存储器写入完成。这是作为适合CPU平台的特殊指令执行的,通常称为“写屏障”。这在某些CPU架构上很重要,因为如果我们触发设备启动DMA而未确保完成更新内部状态的所有内存写操作,则设备可能会从状态不一致的RAM中读取数据。本文和本讲座深入探讨了内存排序的细节。

  1. next_to_watch字段设置。它将在完成阶段稍后使用。
  2. 计数器next_to_use增加,传输队列的字段更新为下一个可用描述符。
  3. 传输队列的尾部用writel功能更新。writel向存储器映射的I / O地址写入“ long” 。在这种情况下,地址是tx_ring->tail(这是硬件地址),要写入的值是i。对设备的写操作会触发设备,让它知道已准备好将其他数据从RAM进行DMA并写入网络。
  4. 最后,调用该mmiowb函数。该函数将执行适用于CPU体系结构的适当指令,从而对内存映射的写操作进行排序。它也是一个写障碍,但是对于内存映射的I / O写。

如果您想了解有关,以及何时使用它们的更多信息,则可以阅读一些有关 Linux内核随附的内存屏障的出色文档。wmbmmiowb

最后,代码总结了一些错误处理。仅当尝试将skb数据地址映射到DMA可用地址时,如果DMA API返回了错误,则此代码才执行。

dma_error:dev_err(tx_ring->dev, "TX DMA map failed\n");/* clear dma mappings for failed tx_buffer_info map */for (;;) {tx_buffer = &tx_ring->tx_buffer_info[i];igb_unmap_and_free_tx_resource(tx_ring, tx_buffer);if (tx_buffer == first)break;if (i == 0)i = tx_ring->count;i--;}tx_ring->next_to_use = i;

在进行传输完成之前,让我们检查一下上面经过的内容:动态队列限制。

动态队列限制(DQL)

正如您在本博文中所看到的,随着网络数据越来越靠近设备进行传输,网络数据在各个阶段都要花很多时间坐在队列中。随着队列大小的增加,数据包将花费更长的时间坐在未传输的队列中,即,随着队列大小的增加,数据包的传输等待时间也会增加。

解决此问题的一种方法是施加背压。动态队列限制(DQL)系统是一种机制,设备驱动程序可以使用该机制向网络系统施加反压,以防止在设备无法传输时过多的数据排队等待传输,

要使用此系统,网络设备驱动程序需要在其传输和完成例程中进行一些简单的API调用。DQL系统在内部将使用一种算法来确定何时传输了足够的数据。一旦达到此限制,传输队列将被暂时禁用。禁用队列是对网络系统产生反压力的原因。当DQL系统确定足够的数据已完成传输时,将自动重新启用该队列。

请查看关于DQL系统的这组出色的幻灯片,以获取一些性能数据以及DQL中内部算法的说明。

netdev_tx_sent_queue我们刚刚看到的代码中调用的函数是DQL API的一部分。当数据排队到设备进行传输时,将调用此函数。传输完成后,驱动程序将调用netdev_tx_completed_queue。在内部,这两个函数都将调用DQL库(位于./lib/dynamic_queue_limits.c和./include/linux/dynamic_queue_limits.h中),以确定是否应该禁用,重新启用或保留发送队列。是

DQL导出sysfs中的统计信息和调整旋钮。无需调整DQL;该算法将随着时间调整其参数。但是,出于完整性考虑,我们稍后将看到如何监视和调整DQL。

传输完成


设备发送完数据后,将产生一个中断信号,表明发送已完成。然后,设备驱动程序可以安排一些长时间运行的工作来完成,例如取消映射内存区域和释放数据。具体如何工作取决于设备。对于igb驱动程序(及其关联的设备),将触发相同的IRQ以完成发送和接收数据包。这意味着,对于该igb驱动程序NET_RX用于处理两个传输完井和传入分组接收。

让我重申一下这一点,以强调这一点的重要性:您的设备可能会收到与接收到的数据包相同的中断,以表示信号已完成数据包传输。如果是这样,NET_RXsoftirq的运行处理传入的数据包和发送的完成。

由于两个操作共享相同的IRQ,因此只能注册一个IRQ处理函数,并且必须处理两种可能的情况。接收网络数据时,请回想以下流程:

  1. 接收到网络数据。
  2. 网络设备引发IRQ。
  3. 设备驱动程序的IRQ处理程序将执行,清除IRQ并确保计划运行softIRQ(如果尚未运行)。在此触发的这个softIRQ是NET_RXsoftIRQ。
  4. softIRQ本质上是作为单独的内核线程执行的。它运行并实现NAPI轮询循环。
  5. NAPI轮询循环只是一段代码,只要有足够的预算,它就会在循环收集数据包中执行。
  6. 每次处理数据包时,预算都会减少,直到没有更多数据包要处理,预算达到0或时间片到期为止。

igb驱动程序(和ixgbe驱动程序[greets,tyler])中的上述第5步在处理传入数据之前先处理TX完成。请记住,根据驱动程序的实现,用于TX完成和输入数据的处理功能可能共享相同的处理预算。的igbixgbe的驱动分别跟踪的TX完成和传入分组预算,所以处理TX完井不一定会耗尽RX预算。

也就是说,整个NAPI轮询循环在硬编码的时间片内运行。这意味着,如果您要处理大量的TX完成处理,则TX完成可能要比处理传入数据花费更多的时间片。对于那些在非常高的负载环境中运行网络硬件的人来说,这可能是一个重要的考虑因素。

让我们看看在实践中igb驾驶员如何发生这种情况。

发送完成IRQ

这篇文章不会重述Linux内核接收方网络博客文章中已经介绍的信息,而是按顺序列出步骤并链接到接收方博客文章中的相应部分,直到达到传输完成为止。

因此,让我们从头开始:

  1. 网络设备被启动。
  2. IRQ处理程序已注册。
  3. 用户程序将数据发送到网络套接字。数据在网络堆栈中传输,直到设备从内存中获取并传输数据为止。
  4. 设备完成数据传输,并提出一个IRQ信号传输完成。
  5. 驱动程序的IRQ处理程序执行以处理中断。
  6. IRQ处理程序调用napi_schedule以响应IRQ。
  7. 该NAPI代码触发NET_RX软中断执行。
  8. NET_RXsofitrq功能,net_rx_action 开始执行。
  9. net_rx_action函数调用驱动程序的注册NAPI poll函数。
  10. 该NAPI调查功能igb_poll,被执行。

轮询功能igb_poll是代码分离并处理传入数据包和传输完成的地方。让我们深入研究该函数的代码,看看发生在哪里。

igb_poll

让我们看一下igb_poll(从./drivers/net/ethernet/intel/igb/igb_main.c)的代码:

/***  igb_poll - NAPI Rx polling callback*  @napi: napi polling structure*  @budget: count of how many packets we should handle**/
static int igb_poll(struct napi_struct *napi, int budget)
{struct igb_q_vector *q_vector = container_of(napi,struct igb_q_vector,napi);bool clean_complete = true;#ifdef CONFIG_IGB_DCA
        if (q_vector->adapter->flags & IGB_FLAG_DCA_ENABLED)igb_update_dca(q_vector);
#endif
        if (q_vector->tx.ring)clean_complete = igb_clean_tx_irq(q_vector);if (q_vector->rx.ring)clean_complete &= igb_clean_rx_irq(q_vector, budget);/* If all work not completed, return budget and keep polling */if (!clean_complete)return budget;/* If not enough Rx work done, exit the polling mode */napi_complete(napi);igb_ring_irq_enable(q_vector);return 0;
}

该函数按顺序执行一些操作:

  1. 如果在内核中启用了直接缓存访问(DCA)支持,则会预热CPU缓存,以便对RX环的访问将达到CPU缓存。您可以在接收方网络文章的其他部分中阅读有关DCA的更多信息。
  2. igb_clean_tx_irq 被称为执行发送完成操作。
  3. igb_clean_rx_irq 接下来被称为执行传入数据包处理。
  4. 最后,clean_complete检查以确定是否还有更多可以完成的工作。如果是这样,budget则返回。如果发生这种情况,net_rx_action请将此NAPI结构移至轮询列表的末尾,以便稍后再次处理。

要了解有关igb_clean_rx_irq工作原理的更多信息,请阅读上一篇博客文章的这一部分。

这篇博文主要涉及发送方,因此我们将继续研究igb_clean_tx_irq以上内容。

igb_clean_tx_irq

在./drivers/net/ethernet/intel/igb/igb_main.c中查看此功能的源代码。

它有点长,所以我们将其分成大块并仔细研究:

static bool igb_clean_tx_irq(struct igb_q_vector *q_vector)
{struct igb_adapter *adapter = q_vector->adapter;struct igb_ring *tx_ring = q_vector->tx.ring;struct igb_tx_buffer *tx_buffer;union e1000_adv_tx_desc *tx_desc;unsigned int total_bytes = 0, total_packets = 0;unsigned int budget = q_vector->tx.work_limit;unsigned int i = tx_ring->next_to_clean;if (test_bit(__IGB_DOWN, &adapter->state))return true;

该函数从初始化一些有用的变量开始。值得一看的是budget。如您所见,budget已初始化为此队列的tx.work_limit。在igb驱动程序中,tx.work_limit被初始化为硬编码值IGB_DEFAULT_TX_WORK(128)。

重要的是要注意,尽管我们现在正在看的TX完成代码NET_RX与接收处理在相同的softirq中运行,但是TX和RX函数在igb驱动程序中彼此之间没有共享处理预算。由于整个轮询功能在同一时间段内运行,因此该igb_poll功能的单次运行不可能使传入的数据包处理不足或传输完成。只要igb_poll被调用,两者都会被处理。

继续,上面的代码片段通过检查网络设备是否关闭来完成。如果是这样,它将返回true并退出igb_clean_tx_irq

        tx_buffer = &tx_ring->tx_buffer_info[i];tx_desc = IGB_TX_DESC(tx_ring, i);i -= tx_ring->count;
  1. 将该tx_buffer变量初始化为在位置传输缓冲区信息结构tx_ring->next_to_clean(其本身被初始化为0)。
  2. 获取对关联描述符的引用,并将其存储在中tx_desc
  3. 计数器i减少传输队列的大小。该值可以调整(我们将在调整部分中看到),但将其初始化为IGB_DEFAULT_TXD(256)。

接下来,循环开始。它包含一些有用的注释,以解释每个步骤发生的情况:

        do {union e1000_adv_tx_desc *eop_desc = tx_buffer->next_to_watch;/* if next_to_watch is not set then there is no work pending */if (!eop_desc)break;/* prevent any other reads prior to eop_desc */read_barrier_depends();/* if DD is not set pending work has not been completed */if (!(eop_desc->wb.status & cpu_to_le32(E1000_TXD_STAT_DD)))break;/* clear next_to_watch to prevent false hangs */tx_buffer->next_to_watch = NULL;/* update the statistics for this packet */total_bytes += tx_buffer->bytecount;total_packets += tx_buffer->gso_segs;/* free the skb */dev_kfree_skb_any(tx_buffer->skb);/* unmap skb header data */dma_unmap_single(tx_ring->dev,dma_unmap_addr(tx_buffer, dma),dma_unmap_len(tx_buffer, len),DMA_TO_DEVICE);/* clear tx_buffer data */tx_buffer->skb = NULL;dma_unmap_len_set(tx_buffer, len, 0);
  1. 首先eop_desc设置为缓冲区的next_to_watch字段。这是在我们之前看到的传输代码中设置的。
  2. 如果eop_desc(eop =封包结尾)为NULL,则没有任何待处理的工作。
  3. read_barrier_depends调用该函数,该函数将针对该CPU体系结构执行适当的CPU指令,以防止在该屏障上对读取进行重新排序。
  4. 接下来,在数据包描述符末尾检查状态位eop_desc。如果E1000_TXD_STAT_DD未设置该位,则传输尚未完成,因此请中断循环。
  5. 清除tx_buffer->next_to_watch。驾驶员中的看门狗计时器将监视此字段,以确定是否挂起了发送。清除该字段将防止看门狗触发。
  6. 统计计数器针对发送的总字节和数据包进行更新。一旦所有描述符都处理完毕,这些将被复制到驱动程序读取的统计计数器中。
  7. skb已释放。
  8. dma_unmap_single 用于取消映射skb数据区域。
  9. tx_buffer->skb设置为NULL,将tx_buffer取消映射。

接下来,在上面的循环中开始另一个循环:

                /* clear last DMA location and unmap remaining buffers */while (tx_desc != eop_desc) {tx_buffer++;tx_desc++;i++;if (unlikely(!i)) {i -= tx_ring->count;tx_buffer = tx_ring->tx_buffer_info;tx_desc = IGB_TX_DESC(tx_ring, 0);}/* unmap any remaining paged data */if (dma_unmap_len(tx_buffer, len)) {dma_unmap_page(tx_ring->dev,dma_unmap_addr(tx_buffer, dma),dma_unmap_len(tx_buffer, len),DMA_TO_DEVICE);dma_unmap_len_set(tx_buffer, len, 0);}}

这个内部循环将遍历每个传输描述符,直到tx_desc到达eop_desc。此代码取消映射任何其他描述符引用的数据。

外循环继续:

                /* move us one more past the eop_desc for start of next pkt */tx_buffer++;tx_desc++;i++;if (unlikely(!i)) {i -= tx_ring->count;tx_buffer = tx_ring->tx_buffer_info;tx_desc = IGB_TX_DESC(tx_ring, 0);}/* issue prefetch for next Tx descriptor */prefetch(tx_desc);/* update budget accounting */budget--;} while (likely(budget));

外循环增加迭代器并减少该budget值。检查循环不变式以确定循环是否应继续。

        netdev_tx_completed_queue(txring_txq(tx_ring),total_packets, total_bytes);i += tx_ring->count;tx_ring->next_to_clean = i;u64_stats_update_begin(&tx_ring->tx_syncp);tx_ring->tx_stats.bytes += total_bytes;tx_ring->tx_stats.packets += total_packets;u64_stats_update_end(&tx_ring->tx_syncp);q_vector->tx.total_bytes += total_bytes;q_vector->tx.total_packets += total_packets;

这段代码:

  1. 调用netdev_tx_completed_queue,它是上述DQL API的一部分。如果处理了足够的完成,这将有可能重新启用传输队列。
  2. 统计信息将添加到它们的适当位置,以便用户可以访问它们,我们将在后面看到。

通过首先检查IGB_RING_FLAG_TX_DETECT_HANG标志是否设置来继续执行代码。每次运行计时器回调时,看门狗计时器都会设置此标志,以强制定期检查传输队列。如果该标志现在恰好处于打开状态,则代码将继续并检查传输队列是否已挂起:

        if (test_bit(IGB_RING_FLAG_TX_DETECT_HANG, &tx_ring->flags)) {struct e1000_hw *hw = &adapter->hw;/* Detect a transmit hang in hardware, this serializes the* check with the clearing of time_stamp and movement of i*/clear_bit(IGB_RING_FLAG_TX_DETECT_HANG, &tx_ring->flags);if (tx_buffer->next_to_watch &&time_after(jiffies, tx_buffer->time_stamp +(adapter->tx_timeout_factor * HZ)) &&!(rd32(E1000_STATUS) & E1000_STATUS_TXOFF)) {/* detected Tx unit hang */dev_err(tx_ring->dev,"Detected Tx Unit Hang\n""  Tx Queue             <%d>\n""  TDH                  <%x>\n""  TDT                  <%x>\n""  next_to_use          <%x>\n""  next_to_clean        <%x>\n""buffer_info[next_to_clean]\n""  time_stamp           <%lx>\n""  next_to_watch        <%p>\n""  jiffies              <%lx>\n""  desc.status          <%x>\n",tx_ring->queue_index,rd32(E1000_TDH(tx_ring->reg_idx)),readl(tx_ring->tail),tx_ring->next_to_use,tx_ring->next_to_clean,tx_buffer->time_stamp,tx_buffer->next_to_watch,jiffies,tx_buffer->next_to_watch->wb.status);netif_stop_subqueue(tx_ring->netdev,tx_ring->queue_index);/* we are about to reset, no point in enabling stuff */return true;}

if上面的语句检查:

  • tx_buffer->next_to_watch 设置好了
  • 电流jiffies大于time_stamp发送到的记录的电流tx_buffer,并加上了超时因子,并且
  • 设备的发送状态寄存器未设置为E1000_STATUS_TXOFF

如果这三个测试都为真,则打印出检测到挂起的错误。netif_stop_subqueue用于关闭队列并true返回。

让我们继续阅读代码,看看如果没有发送挂起检查,或者如果有,但是没有检测到挂起,将会发生什么:

#define TX_WAKE_THRESHOLD (DESC_NEEDED * 2)
        if (unlikely(total_packets &&netif_carrier_ok(tx_ring->netdev) &&igb_desc_unused(tx_ring) >= TX_WAKE_THRESHOLD)) {/* Make sure that anybody stopping the queue after this* sees the new next_to_clean.*/smp_mb();if (__netif_subqueue_stopped(tx_ring->netdev,tx_ring->queue_index) &&!(test_bit(__IGB_DOWN, &adapter->state))) {netif_wake_subqueue(tx_ring->netdev,tx_ring->queue_index);u64_stats_update_begin(&tx_ring->tx_syncp);tx_ring->tx_stats.restart_queue++;u64_stats_update_end(&tx_ring->tx_syncp);}}return !!budget;

在上面的代码中,驱动程序将重新启动传输队列(如果先前已禁用)。它首先检查是否:

  • 一些数据包已完成处理(total_packets非零),并且
  • netif_carrier_ok 确保没有关闭设备,并且
  • 传输队列中未使用的描述符的数量大于或等于TX_WAKE_THRESHOLD。此阈值似乎42在我的x86_64系统上。

如果满足所有条件,则使用写屏障(smp_mb)。接下来检查另一组条件:

  • 如果队列已停止,并且
  • 设备未关闭

然后netif_wake_subqueue调用以唤醒传输队列,并向高层发出信号,通知它们可以再次将数据排队。该restart_queue统计计数器增加。接下来,我们将了解如何读取该值。

最后,返回一个布尔值。如果还有剩余未使用的预算,true则返回false。检入此值igb_poll以确定要返回的内容net_rx_action

igb_poll 返回值

igb_poll函数具有以下代码来确定要返回的内容net_rx_action

        if (q_vector->tx.ring)clean_complete = igb_clean_tx_irq(q_vector);if (q_vector->rx.ring)clean_complete &= igb_clean_rx_irq(q_vector, budget);/* If all work not completed, return budget and keep polling */if (!clean_complete)return budget;

换句话说,如果:

  • igb_clean_tx_irq 在不耗尽其传输完成预算的情况下清除了所有传输完成,并且
  • igb_clean_rx_irq 清除所有传入数据包而不会耗尽其数据包处理预算

然后,将返回全部预算金额(64对于包括在内的大多数驱动程序,igb都将其硬编码为)。如果RX或TX处理中的任何一个都无法完成(因为还有更多工作要做),则通过调用napi_complete0返回NAPI来禁用它:

        /* If not enough Rx work done, exit the polling mode */napi_complete(napi);igb_ring_irq_enable(q_vector);return 0;
}

监控网络设备

有几种不同的方法来监视网络设备,以提供不同级别的粒度和复杂性。让我们从最细粒度开始,然后移到最小粒度。

使用 ethtool -S

您可以安装ethtool运行的Ubuntu系统上:sudo apt-get install ethtool

安装后,您可以通过传递-S标志以及要进行统计的网络设备的名称来访问统计信息。

使用`ethtool -S`监视详细的NIC设备统计信息(例如,传输错误)。

$ sudo ethtool -S eth0
NIC统计信息:rx_packets:597028087tx_packets:5924278060rx_bytes:112643393747tx_bytes:990080156714rx_broadcast:96tx_broadcast:116rx_multicast:20294528....

监视此数据可能很困难。它很容易获得,但是没有字段值的标准化。不同的驱动程序,甚至同一驱动程序的不同版本,可能会产生具有相同含义的不同字段名称。

您应该在标签中查找带有“ drop”,“ buffer”,“ miss”,“ errors”等的值。接下来,您将必须阅读驱动程序源。您将能够确定哪些值完全由软件计算(例如,当没有内存时增加),以及哪些值直接通过寄存器读取来自硬件。如果是寄存器值,则应查阅硬件的数据手册,以确定计数器的真正含义。通过给出的许多标签ethtool可能会产生误导。

使用sysfs

sysfs还提供了许多统计信息值,但是它们比提供的直接NIC级别统计信息略高一些。

您可以通过cat在文件上使用来找到丢失的传入网络数据帧的数量,例如eth0 。

使用sysfs监视更高级别的NIC统计信息。

$ cat / sys / class / net / eth0 / statistics / tx_aborted_errors
2

计数器值将被拆分后的文件一样tx_aborted_errorstx_carrier_errorstx_compressedtx_dropped,等。

不幸的是,由驱动程序决定每个字段的含义,从而决定何时递增它们以及值从何而来。您可能会注意到,某些驱动程序将某种错误情况视为掉线,而其他驱动程序可能将其视为未命中。

如果这些值对您很关键,则需要阅读驱动程序源和设备数据手册,以准确了解驱动程序对每个值的含义。

使用 /proc/net/dev

甚至更高级别的文件还/proc/net/dev为系统上的每个网络适配器提供了高级摘要式信息。

通过阅读监视高级NIC统计信息/proc/net/dev

$ cat / proc / net / dev
间| 接收| 发送面|字节数据包错误掉落fifo帧压缩多播|字节数据包错误掉落fifo colls载波压缩eth0:110346752214 597737500 0 2 0 0 0 20963860 990024805984 6066582604 0 0 0 0 0 0lo:428349463836 1579868535 0 0 0 0 0 0 428349463836 1579868535 0 0 0 0 0 0

该文件显示了您在上述sysfs文件中找到的值的子集,但它可以用作有用的常规参考。

上面提到的警告同样适用于此:如果这些值对您很重要,您仍然需要阅读驱动程序源,以准确了解它们在何时,何地以及为何递增,以确保您理解错误,掉落或fifo与您的驱动程序相同。

监视动态队列限制

您可以通过阅读位于下的文件来监视网络设备的动态队列限制/sys/class/net/NIC/queues/tx-QUEUE_NUMBER/byte_queue_limits/

更换NIC你的设备名(eth0eth1,等),并tx-QUEUE_NUMBER与发送队列数(tx-0tx-1tx-2等)。

其中一些文件是:

  • hold_time:初始化为HZ(单赫兹)。如果的队列已满hold_time,则减小最大大小。
  • inflight:此值等于(排队的数据包数-完成的数据包数)。它是当前正在传输的尚未处理完成的数据包数。
  • limit_max:一个硬编码的值,在我的x86_64系统上设置为DQL_MAX_LIMIT1879048192
  • limit_min:一个硬编码值,设置为0
  • limit:介于limit_min和之间的值limit_max,表示当前可以排队的最大对象数。

修改任何这些值之前,强烈建议您阅读这些演示幻灯片,以深入了解该算法。

监控包通过读取在飞行中传输/sys/class/net/eth0/queues/tx-0/byte_queue_limits/inflight

$ cat / sys / class / net / eth0 / queues / tx-0 / byte_queue_limits / inflight
350

调整网络设备


检查正在使用的TX队列数

如果您的NIC和系统上加载的设备驱动程序支持多个传输队列,通常可以使用来调整TX队列(也称为TX通道)的数量ethtool

使用以下命令检查NIC传输队列的数量 ethtool

$ sudo ethtool -l eth0
eth0的通道参数:
预设最大值:
接收:0
TX:0
其他:0
合计:8
当前的硬件设置:
接收:0
TX:0
其他:0
合计:4

此输出显示预设的最大值(由驱动程序和硬件强制执行)和当前设置。

注意:并非所有设备驱动程序都支持此操作。

如果您的NIC不支持此操作,则会显示错误。

$ sudo ethtool -l eth0
eth0的通道参数:
无法获取设备通道参数
:不支持操作

这意味着您的驱动程序尚未实现ethtool get_channels操作。这可能是因为NIC不支持调整队列数量,不支持多个传输队列,或者您的驱动程序尚未更新以处理此功能。

调整使用的TX队列数

找到当前和最大队列数后,您可以使用来调整值sudo ethtool -L

注意:某些设备及其驱动程序仅支持为发送和接收而配对的组合队列,如上节中的示例所示。

将合并的NIC发送和接收队列设置为8 ethtool -L

$ sudo ethtool -L eth0合并8

如果您的设备和驱动程序支持RX和TX的各个设置,并且您只想将TX队列数更改为8,则可以运行:

使用设置NIC传输队列的数量为8 ethtool -L

$ sudo ethtool -L eth0 tx 8

注意:对于大多数驱动程序,进行这些更改将关闭接口,然后将其恢复;与此接口的连接将中断。但是,对于一次更改而言,这可能无关紧要。

调整发送队列的大小

某些NIC及其驱动程序还支持调整TX队列的大小。确切的工作方式是特定于硬件的,但是幸运的是ethtool为用户提供了一种调整大小的通用方法。TX的大小增加可能不会产生太大的差异,因为DQL用于防止高层网络代码有时会排队更多数据。但是,您可能希望将TX队列增加到最大大小,并让DQL为您整理其他所有内容:

使用以下命令检查当前的NIC队列大小 ethtool -g

$ sudo ethtool -g eth0
eth0的环参数:
预设最大值:
接收:4096
迷你接收:0
接收超大:0
TX:4096
当前的硬件设置:
接收:512
迷你接收:0
接收超大:0
TX:512

以上输出表明该硬件最多支持4096个接收和发送描述符,但当前仅使用512个。

使用以下命令将每个TX队列的大小增加到4096 ethtool -G

$ sudo ethtool -G eth0发送4096

注意:对于大多数驱动程序,进行这些更改将关闭接口,然后将其恢复;与此接口的连接将中断。但是,对于一次更改而言,这可能无关紧要。

结束

结束!现在,您了解了有关数据包在Linux上如何工作的所有知识:从用户程序到设备驱动程序,再到返回。

附加功能

还有一些值得一提的其他值得一提的东西,这些其他地方似乎都不是很正确。

减少ARP流量(MSG_CONFIRM

sendsendto以及和sendmsg系统调用都需要一个flags参数。如果将MSG_CONFIRM标志传递给来自应用程序的这些系统调用,则将导致dst_neigh_output内核中发送路径上的函数更新邻居结构的时间戳。这样的结果是将不会垃圾收集邻居结构。由于邻居缓存条目将保持更暖和更长的时间,因此可以防止生成其他ARP流量。

UDP瓶塞

我们在整个UDP协议栈中广泛检查了UDP软木塞。如果要在应用程序中使用它,可以通过将setsockopt级别设置为IPPROTO_UDP,将optname设置为UDP_CORK并将optval设置为来启用UDP插入1

时间戳记

如以上博客文章所述,网络堆栈可以收集传出数据的时间戳。请参阅上述网络堆栈演练,以了解在软件中发生发送时间戳的位置。某些NIC甚至还支持在硬件上添加时间戳。

如果您想尝试确定内核网络堆栈为发送数据包增加了多少延迟,则此功能很有用。

关于时间戳的内核文档非常出色,甚至包括一个附带的示例程序和Makefile,您都可以签出!。

确定您的驱动程序和设备支持的时间戳模式ethtool -T

$ sudo ethtool -T eth0
eth0的时间戳参数:
能力:软件传输(SOF_TIMESTAMPING_TX_SOFTWARE)软件接收(SOF_TIMESTAMPING_RX_SOFTWARE)软件系统时钟(SOF_TIMESTAMPING_SOFTWARE)
PTP硬件时钟:无
硬件发送时间戳模式:无
硬件接收过滤器模式:无

不幸的是,此NIC不支持硬件传输时间戳,但是仍可以在该系统上使用软件时间戳,以帮助我确定内核添加到数据包传输路径的延迟时间。

结论

Linux网络堆栈很复杂。

正如我们在上面看到的,即使是最简单的事情NET_RX也无法保证能像我们期望的那样工作。即使RX名称中已标明,传输完成仍在此softIRQ中进行处理。

这突显了我认为是问题核心的内容:除非仔细阅读并了解其工作原理,否则不可能优化和监视网络堆栈。您无法深入监控不了解的代码。

帮助Linux联网或其他系统

在浏览网络堆栈时需要其他帮助吗?对本文中的任何内容或相关内容有疑问吗?给我们发送电子邮件,让我们知道我们将如何提供帮助。

相关文章

如果您喜欢此职位,则可以享受我们其他一些低级别的技术职位:

  • 监视和调整Linux网络协议栈:接收数据
  • 监控和调整Linux网络协议栈的图解指南:接收数据
  • Linux系统调用权威指南
  • strace工作如何?
  • ltrace工作如何?
  • APT哈希总和不匹配
  • HOWTO:GPG签署并验证Deb软件包和APT存储库
  • HOWTO:GPG签署并验证RPM软件包和yum存储库

监视和调整Linux网络协议栈:发送数据相关推荐

  1. 监视和调整Linux网络协议栈:接收数据

    Table of Contents 有关监视和调整Linux网络协议栈的建议 总览 详细外观 网络设备驱动程序 初始化 网络设备初始化 启动网络设备 监控网络设备 调整网络设备 SoftIRQ 什么是 ...

  2. 监控和调整Linux网络协议栈的图解指南:接收数据

    Table of Contents 入门 最初设定 数据到达 网络数据处理开始 网络数据处理继续 协议栈和用户态套接字 结论 监视和调整Linux网络协议栈:接收数据(图解):https://rtoa ...

  3. linux网络协议栈之数据包处理过程,Linux网络协议栈之数据包处理过程

    这篇文档是基于 x86 体系结构和转发 IP 分组的. 数据包在 Linux 内核链路层路径 接收分组 1 接收中断 如果网卡收到一个和自己 MAC 地址匹配或链路层广播的以太网帧,它就会产生一个中断 ...

  4. Linux网络协议栈:关闭一个还有没发送数据完的TCP连接

    <监视和调整Linux网络协议栈:接收数据> <监控和调整Linux网络协议栈的图解指南:接收数据> <Linux网络 - 数据包的接收过程> <Linux网 ...

  5. Linux网络协议栈:中断下半部处理

    <Linux中断处理:上半部和下半部> <Linux网络协议栈:中断下半部处理> 目录 数据包上送 网络中断下半部处理 总结 推荐阅读 在<Linux网络协议栈:网络包接 ...

  6. Linux网络协议栈:网络包接收过程

    目录 一 Linux网络收包总览 二 Linux启动 2.1 创建ksoftirqd内核线程 2.2 网络子系统初始化 2.3 协议栈注册 2.4 网卡驱动初始化 2.5 启动网卡 三 迎接数据的到来 ...

  7. Linux网络协议栈:一个TCP链接的耗时

    <一次系统调用开销到底有多大?strace.time.perf命令> 目录 一 正常TCP连接建立过程 二 TCP连接建立时的异常情况 1)客户端connect系统调用耗时失控 2)半/全 ...

  8. Linux网络协议栈:NAPI机制与处理流程分析(图解)

    Table of Contents NAPI机制 NAPI缺陷 使用 NAPI 先决条件 非NAPI帧的接收 netif_rx - 将网卡中收到的数据包放到系统中的接收队列中 enqueue_to_b ...

  9. linux 虚拟机大量udp请求失败_理解 Linux 网络栈:Linux 网络协议栈简单总结分析...

    1. Linux 网络路径 1.1 发送端 1.1.1 应用层 (1) Socket 应用层的各种网络应用程序基本上都是通过 Linux Socket 编程接口来和内核空间的网络协议栈通信的.Linu ...

最新文章

  1. ABP官方文档翻译 0.0 ABP官方文档翻译目录
  2. haproxy定义规则限制IP访问
  3. gettext()方法输出空白_如何将文档内容输出为无水印图片?超简单的操作方法看这里...
  4. 容器编排技术 -- Kubernetes kubectl create secret generic 命令详解
  5. 如何将div高度填满剩余高度
  6. Star UML指导手册
  7. 课堂测试2014.3.10
  8. eNSP------三层交换机配置(拓扑图+命令)
  9. 电信光猫 友华PT921G 研究
  10. Morgan Fairchild Makes the Most of It With 'The Graduate'
  11. Python解二元一次方程
  12. UE4 Mixamo使用教程
  13. Python tkinter的简单使用,在绘布上播放GIF和图片
  14. Vim 编辑器及其基本操作
  15. AAA与AAM指令代码实验
  16. ZYNQ开发系列——双串口打印以及串口波特率设置
  17. 是否为取变量名烦恼?中文生成英文变量的windows桌面端工具(vue+electron)
  18. M1 MacBook 续航太好,苹果以为电量指示器坏了
  19. Xilinx FPGA资源解析与使用系列——Transceiver(九)TX buffer使用和旁路
  20. Global Illumination_Voxel Global Illumintaion (VXGI)

热门文章

  1. 市场调研策划书_市场调查计划书模板
  2. Java并发编程-ThreadPool线程池
  3. scrap连接django
  4. appium启动APP配置参数:
  5. [14-01] 闭包
  6. delphi 一些知识文章地址记录(正则)
  7. PostMan入门使用教程
  8. Android利用Filter过滤数据
  9. eclipse中git的配置、提交代码、从远程导入代码
  10. 接收字节流_Java中的IO流之输入流|乐字节