女主宣言

select, 你可以不用它,但你不能不了解它。
通过阅读本文,可以帮你理清select的来龙去脉, 让你从中了解到:我们常说的select的1024限制指的是什么 ?怎么会有这样的限制?都说select效率不高,是这样吗?为什么 ?select使用中有坑吗?

注:本文的所有内容均指针对 Linux Kernel, 当前使用的源码版本是 5.3.0。

PS:丰富的一线技术、多元化的表现形式,尽在“360云计算”,点关注哦!

1

原型

int select (int __nfds, fd_set *__restrict __readfds,fd_set *__restrict __writefds,fd_set *__restrict __exceptfds,struct timeval *__restrict __timeout);

如你所知,select是IO多种复用的一种实现,它将需要监控的fd分为读,写,异常三类,使用fd_set表示,当其返回时要么是超时,要么是有至少一种读,写或异常事件发生。

2

相关数据结构

FD_SET

FD_SET是select最重要的数据结构了,其在内核中的定义如下:

typedef __kernel_fd_set     fd_set;
#undef __FD_SETSIZE
#define __FD_SETSIZE    1024typedef struct {unsigned long fds_bits[__FD_SETSIZE / (8 * sizeof(long))];
} __kernel_fd_set;

我们来简化下,fd_set是一个struct, 其内部只有一个 由16个元素组成的unsigned long数组,这个数组一共可以表示16 × 64 = 1024位, 每一位用来表示一个 fd, 这也就是 select针对读,定或异常每一类最多只能有 1024个fd 限制的由来。

3

相关宏

下面这些宏定义在内核代码fs/select.c中:

  • FDS_BITPERLONG: 返回每个long有多少位,通常是64bits

#define FDS_BITPERLONG    (8*sizeof(long))
  • FDS_LONGS(nr): 获取 nr 个fd 需要用几个long来表示

#define FDS_LONGS(nr) (((nr)+FDS_BITPERLONG-1)/FDS_BITPERLONG)
  • FD_BYTES(nr): 获取 nr 个fd 需要用 多少个字节来表示

#define FDS_BYTES(nr) (FDS_LONGS(nr)*sizeof(long))

下面这些宏可以在gcc源码中找到:

  • FD_ZERO: 初始化一个fd_set

#define __FD_ZERO(s) \do {                                          \unsigned int __i;                               \fd_set *__arr = (s);                            \for (__i = 0; __i < sizeof (fd_set) / sizeof (__fd_mask); ++__i)        \__FDS_BITS (__arr)[__i] = 0;                          \} while (0)

将上面所说的由16个元素组成的unsigned long数组每一个元素都设为 0;

  • __FD_SET(d, s): 将一个fd 赋值到 一个 fd_set

#define __FD_SET(d, s) \((void) (__FDS_BITS (s)[__FD_ELT(d)] |= __FD_MASK(d)))

分三步:

a. __FD_ELT(d): 确定赋值到数组的哪一个元素

 #define   __FD_ELT(d) \__extension__                                   \({ long int __d = (d);                          \(__builtin_constant_p (__d)                      \? (0 <= __d && __d < __FD_SETSIZE                       \? (__d / __NFDBITS)                            \: __fdelt_warn (__d))                          \: __fdelt_chk (__d)); })

其中 #define __NFDBITS (8 * (int) sizeof (__fd_mask)) , 即__NFDBITS = 64

这里实现使用了__builtin_constant_p针对常量作了优化,我也没有太理解常量与非常量实现方案有什么不同,我们暂时忽略这个细节看本质。

本质就是 一个 unsigned long有64位,直接 __d / __NFDBITS取模就可以确定用数组的哪一个元素了;

b. __FD_MASK(d): 确定赋值到一个 unsigned long的哪一位

#define   __FD_MASK(d)    ((__fd_mask) (1UL << ((d) % __NFDBITS)))

直接 (d) % __NFDBITS)取余后作为 1 左移的位数即可

c. |= :用 位或 赋值即可;

4

在内核中的实现

调用层级

系统调用入口位置

位于fs/select.c中

SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp,fd_set __user *, exp, struct timeval __user *, tvp)
{return kern_select(n, inp, outp, exp, tvp);
}

入口函数 kern_select

static int kern_select(int n, fd_set __user *inp, fd_set __user *outp,fd_set __user *exp, struct timeval __user *tvp)
{struct timespec64 end_time, *to = NULL;struct timeval tv;int ret;if (tvp) {if (copy_from_user(&tv, tvp, sizeof(tv)))return -EFAULT;to = &end_time;if (poll_select_set_timeout(to,tv.tv_sec + (tv.tv_usec / USEC_PER_SEC),(tv.tv_usec % USEC_PER_SEC) * NSEC_PER_USEC))return -EINVAL;}ret = core_sys_select(n, inp, outp, exp, to);return poll_select_finish(&end_time, tvp, PT_TIMEVAL, ret);
}

做三件事:

a. 如果设置了超时,首先准备时间戳 timespec64;

b. 调用 core_sys_select,这个是具体的实现,我们下面会重点介绍

c. poll_select_finish:作的主要工作就是更新用户调用select时传进来的 超时参数tvp,我列一下关键代码:

    ktime_get_ts64(&rts);rts = timespec64_sub(*end_time, rts);if (rts.tv_sec < 0)rts.tv_sec = rts.tv_nsec = 0;...struct timeval rtv;if (sizeof(rtv) > sizeof(rtv.tv_sec) + sizeof(rtv.tv_usec))memset(&rtv, 0, sizeof(rtv));rtv.tv_sec = rts.tv_sec;rtv.tv_usec = rts.tv_nsec / NSEC_PER_USEC;if (!copy_to_user(p, &rtv, sizeof(rtv)))return ret;

可以看到先获取当前的时间戳,然后通过timespec64_sub和传入的时间戳(接中传入的是超时时间,实现时会转化为时间戳)求出差值,将此差值传回给用户,即返回了剩余的超时时间。所以这个地方是个小陷阱,用户在调用select时,需要每次重新初始化这个超时时间。

通过core_sys_select实现

这个函数主要功能是在实现真正的select功能前,准备好 fd_set ,即从用户空间将所需的三类 fd_set 复制到内核空间。从下面的代码中你会看到对于每次的 select系统调用,都需要从用户空间将所需的三类 fd_set 复制到内核空间,这里存在性能上的损耗。

int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp,fd_set __user *exp, struct timespec64 *end_time)
{fd_set_bits fds;void *bits;int ret, max_fds;size_t size, alloc_size;struct fdtable *fdt;/* Allocate small arguments on the stack to save memory and be faster */long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];ret = -EINVAL;if (n < 0)goto out_nofds;/* max_fds can increase, so grab it once to avoid race */rcu_read_lock();fdt = files_fdtable(current->files);max_fds = fdt->max_fds;rcu_read_unlock();if (n > max_fds)n = max_fds;/** We need 6 bitmaps (in/out/ex for both incoming and outgoing),* since we used fdset we need to allocate memory in units of* long-words. */size = FDS_BYTES(n);bits = stack_fds;if (size > sizeof(stack_fds) / 6) {/* Not enough space in on-stack array; must use kmalloc */ret = -ENOMEM;if (size > (SIZE_MAX / 6))goto out_nofds;alloc_size = 6 * size;bits = kvmalloc(alloc_size, GFP_KERNEL);if (!bits)goto out_nofds;}fds.in      = bits;fds.out     = bits +   size;fds.ex      = bits + 2*size;fds.res_in  = bits + 3*size;fds.res_out = bits + 4*size;fds.res_ex  = bits + 5*size;if ((ret = get_fd_set(n, inp, fds.in)) ||(ret = get_fd_set(n, outp, fds.out)) ||(ret = get_fd_set(n, exp, fds.ex)))goto out;zero_fd_set(n, fds.res_in);zero_fd_set(n, fds.res_out);zero_fd_set(n, fds.res_ex);ret = do_select(n, &fds, end_time);if (ret < 0)goto out;if (!ret) {ret = -ERESTARTNOHAND;if (signal_pending(current))goto out;ret = 0;}if (set_fd_set(n, inp, fds.res_in) ||set_fd_set(n, outp, fds.res_out) ||set_fd_set(n, exp, fds.res_ex))ret = -EFAULT;out:if (bits != stack_fds)kvfree(bits);
out_nofds:return ret;
}

代码中的注释很清晰,我们这里简单过一下,分五步:

  • 规范化select系统调用传入的第一个参数 n

/* max_fds can increase, so grab it once to avoid race */rcu_read_lock();fdt = files_fdtable(current->files);max_fds = fdt->max_fds;rcu_read_unlock();if (n > max_fds)n = max_fds;

这个n是三类不同的fd_set中所包括的fd数值的最大值 + 1, linux task打开句柄从0开始,不加1的话可能会少监控fd.

用户在使用时可以有个偷懒的,就是将这个n设置 为 FD_SETSIZE,通常 是1024,  这将监控的范围扩大到了上限,但实际上远没有这么多fd需要监控,浪费资源。

linux man中的解释如下:

nfds should be set to the highest-numbered file descriptor in any of the three sets, plus 1.  The  indicated file descriptors in each set are checked, up to this limit (but see BUGS).

  • 计算内核空间所需要的fd_set的空间, 内核态需要三个fd_set来容纳用户态传递过来的参数,还需要三个fd_set来容纳select调用返回后生成的三个fd_set, 即一共是6个fd_set

   long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];. . .size = FDS_BYTES(n);bits = stack_fds;if (size > sizeof(stack_fds) / 6) {/* Not enough space in on-stack array; must use kmalloc */ret = -ENOMEM;if (size > (SIZE_MAX / 6))goto out_nofds;alloc_size = 6 * size;bits = kvmalloc(alloc_size, GFP_KERNEL);if (!bits)goto out_nofds;}fds.in      = bits;fds.out     = bits +   size;fds.ex      = bits + 2*size;fds.res_in  = bits + 3*size;fds.res_out = bits + 4*size;fds.res_ex  = bits + 5*size;

这里有个小技巧,先从内核栈上分配空间,如果不够用,才使用 kvmalloc分配。

通过 size = FDS_BYTES(n);计算出单一一种fd_set所需字节数,

再能过 alloc_size = 6 * size; 即可计算出所需的全部字节数。

  • 初始化用作参数的和用作返回值的两类fd_set

if ((ret = get_fd_set(n, inp, fds.in)) ||(ret = get_fd_set(n, outp, fds.out)) ||(ret = get_fd_set(n, exp, fds.ex)))goto out;zero_fd_set(n, fds.res_in);zero_fd_set(n, fds.res_out);zero_fd_set(n, fds.res_ex);

主要使用 copy_from_user和 memset来实现。

  • 真正实现部分 do_select, 我们在下面详讲

  • 返回结果复制回用户空间

if (set_fd_set(n, inp, fds.res_in) ||set_fd_set(n, outp, fds.res_out) ||set_fd_set(n, exp, fds.res_ex))ret = -EFAULT;

这里又多了一次内核空间到用户空间的copy, 而且我们看到返回值也是用fd_set结构来表示,这意味着我们在用户空 间处理里也需要遍历每一位。

精华所在do_select

这里用到了Linux里一个很重要的数据结构 wait queue, 我们暂不打算展开来讲,先简单来说下其用法,比如我们在进程中read时经常要等待数据准备好,我们用伪码来写些流程:

// Read 代码
for (true) {if 数据准备好 {拷贝数据到用户空间bufferreturn} else {创建一个 wait_queue_entry_t  wait_entry;wait_entry.func = 自定义函数,被唤醒时会调用wait_entry.private = 自定义的数据结构将此 wait_entry 加入要读取数据的 设备的等待队列set_current_state(TASK_INTERRUPTIBLE) // 将当前进程状态设置为 TASK_INTERRUPTIBLEschedule() // 将当前进程调度走,进行进程切换}
}// 设备驱动端代码
if 设备有数据可读 {for ( 遍历其wait_queue ) {唤醒 每一个 wait_queue_entry调用 wait_entry.func {将上面读取进程状态设置为 TASK_RUNNING,并加入CPU核的运行队列,被再次调度后,将读取到数据}}
}

360私有云平台(HULK平台)管理着360公司90%以上的业务线,面对如此众多的服务器,如何进行管理?当然需要一套完善的工具来自动化。HULK平台的命令系统可以对批量机器执行脚本,命令系统的底层是基于SaltStack开发当然最大的问题在于机器部署的机房多。

do_select源码走读

  • 获取当前三类fd_set中最大的fd

    rcu_read_lock();retval = max_select_fd(n, fds);rcu_read_unlock();n = retval;

上面 n = retval中的 n, 即为三类fd_set中最大的fd, 也是下面要介绍的循环体的上限

  • 初始化用作wait queue中的 wait entry 的private数据结构

poll_initwait(&table);
  • 核心循环体

    我们讲注释写在这个代码块里

// 最外面是一个无限循环,它只有在poll到有效的事件,或者超时,或者有中断发生时,才会退出
for (;;) {unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;bool can_busy_loop = false;// 首先获取需要监控的三类fd_set, 其实是 unsigned long 数组inp = fds->in; outp = fds->out; exp = fds->ex;// 初始化用于保存返回值的三类 fd_set对应的unsigned long 数组rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;// 开始循环遍历覆盖的所有fd, 以上面得到的 n 上限for (i = 0; i < n; ++rinp, ++routp, ++rexp) {unsigned long in, out, ex, all_bits, bit = 1, j;unsigned long res_in = 0, res_out = 0, res_ex = 0;__poll_t mask;in = *inp++; out = *outp++; ex = *exp++;all_bits = in | out | ex;if (all_bits == 0) {//  如果走到这里,说明在三类fd_set的数组中,与当前下标对应的三个unsigned long的每一位均为0, 即当前// 不存在任何监控的fd, 开始下一次循环i += BITS_PER_LONG;continue;}// 前面介绍过 fd_set的数组元素是 unsigned long, 即一个元素可表示64个fd, 这里依次遍历这64个bitsfor (j = 0; j < BITS_PER_LONG; ++j, ++i, bit <<= 1) {struct fd f;if (i >= n)break;if (!(bit & all_bits))continue;// 走到这里,说明当前bit位上有监控的fdf = fdget(i);if (f.file) {// 针对当前fd, 设置其需要监控的事件wait_key_set(wait, in, out, bit,busy_flag);// vfs_poll 是重中之重,我们下一节会单独讲解,这里先说明它所作的事// 1.  初始化wait entry, 将其加入到这个fd对应的socket的等待队列中// 2.  获取当前socket是否有读,写,异常等事件并返回mask = vfs_poll(f.file, wait);fdput(f);// 按位与,看是否有相关事件if ((mask & POLLIN_SET) && (in & bit)) {res_in |= bit;retval++;wait->_qproc = NULL;}if ((mask & POLLOUT_SET) && (out & bit)) {res_out |= bit;retval++;wait->_qproc = NULL;}if ((mask & POLLEX_SET) && (ex & bit)) {res_ex |= bit;retval++;wait->_qproc = NULL;}/* got something, stop busy polling */if (retval) {can_busy_loop = false;busy_flag = 0;/** only remember a returned* POLL_BUSY_LOOP if we asked for it*/} else if (busy_flag & mask)can_busy_loop = true;}}// 按unsigned long赋值给返回值数组元素if (res_in)*rinp = res_in;if (res_out)*routp = res_out;if (res_ex)*rexp = res_ex;// 这里有两层for循环,这里主动出让CPU, 进行一次调度cond_resched();}wait->_qproc = NULL;// 四种情况下会返回// 1.  任意监控的fd上有事件发生//  2. 超时//  3. 有中断发生//  4. wait queue相关操作发生错误if (retval || timed_out || signal_pending(current))break;if (table.error) {retval = table.error;break;}......//  当前监控的fd上没有事件发生,也没有超时或中断发生,//   将当前进程设置为 TASK_INTERRUPTIBLE, 并调用 schedule//   等待事件发生时,对应的socket将当前进程唤醒后,从 这里//   继续运行if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE,to, slack))timed_out = 1;}

简单总结如下:

a. 循环遍历每一个监控的fd;

b. 有下列情况之一则返回:

任意监控的fd上有事件发生
超时
有中断发生
wait queue相关操作发生错误

c. 如查无上述情况,将当前进程设置为 TASK_INTERRUPTIBLE, 并调用 schedule作进程切换;

d. 等待socket 事件发生,对应的socket将当前进程唤醒后,当前进程被再次调度切换回来,继续运行;

细心的你可能已经发现,这个有个影响效率的问题:即使只有一个监控中的fd有事件发生,当前进程就会被唤醒,然后要将所有监控的fd都遍历一边,依次调用vfs_poll来获取其有效事件,好麻烦啊~~~

5

vfs_poll 讲解

  • 调用层级

  • 作用

初始化wait entry, 将其加入到这个fd对应的socket的等待队列中
获取当前socket是否有读,写,异常等事件并返回
  • 加入等待队列时,最终会调用 fs_select.c中的 __pollwait

static void __pollwait(struct file *filp, wait_queue_head_t *wait_address,poll_table *p)
{struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt);struct poll_table_entry *entry = poll_get_entry(pwq);if (!entry)return;entry->filp = get_file(filp);entry->wait_address = wait_address;entry->key = p->_key;init_waitqueue_func_entry(&entry->wait, pollwake);entry->wait.private = pwq;// 加入到socket的等待队列add_wait_queue(wait_address, &entry->wait);
}

6

总结

  • select调用中每类fd_set中最多容纳1024个fd;

  • 每次调用select都需要将三类fd_set从用户空间复制到内核空间;

  • wait queue是个好东西,select会被当前进程task加入到每一个监控的socket的等待队列;

  • select进程被唤醒后即使只有一个被监控的fd有事件发生,也会再次将所有的监控fd遍历一次;

  • 在遍历fd的过程中会调用cond_resched()来主动出让CPU, 作进程切换;

Linux select 一网打尽相关推荐

  1. linux—select具体解释

    linux-select具体解释 select系统调用时用来让我们的程序监视多个文件句柄的状态变化的.程序会停在select这里等待,直到被监视的文件句柄有一个或多个发生了状态改变. 关于文件句柄,事 ...

  2. Linux select函数用法和原理

    select函数的用法和原理 Linux上的select函数 select函数用于检测一组socket中是否有事件就绪.这里的事件为以下三类: 读事件就绪 在socket内核中,接收缓冲区中的字节数大 ...

  3. Linux Select

    Linux Select 在Linux中,我们可以使用select函数实现I/O端口的复用,传递给 select函数的参数会告诉内核: •我们所关心的文件描述符       •对每个描述符,我们所关心 ...

  4. Linux中断一网打尽(1) — 中断及其初始化

    女主宣言 通过本文您可以了解到:Linux 中断是什么,如何分类,能干什么?Linux 中断在计算机启动各阶段是如何初始化的? PS:丰富的一线技术.多元化的表现形式,尽在"360云计算&q ...

  5. linux select系统调用函数分析,Linux select系统调用

    linux系统提供了系统调用select,它允许程序挂起,并等待从不止一个文件描述符的输入.原理很简单: 1. 获取所需要的文件描述符列表: 2. 将此列表传给select: 3. select挂起直 ...

  6. linux select使用

    select系统调用是用来让我们的程序监视多个文件句柄(file descriptor)的状态变化的.程序会停在select这里等待,直到被监视的文件句柄有某一个或多个发生了状态改变. 文 件在句柄在 ...

  7. OS / Linux / Select 调用流程

    Linux下select调用的过程: 1.用户层应用程序调用 select(),底层调用 poll() . 2.核心层调用 sys_select() ------> do_select() . ...

  8. linux select read阻塞_linux下的IO模型详解

    开门见山,Linux下的如中IO模型:阻塞IO模型,非阻塞IO模型,IO复用模型,信号驱动IO模型,异步IO模型,见下图 接下来一一讲解这5种模型 阻塞型IO:最简单的一种IO模型,简单理解就是死等, ...

  9. Linux select TCP并发服务器与客户端编程

    介绍:运行在ubuntu linux系统,需要先打开一个终端运行服务端代码,这时,可以打开多个终端同时运行多个客户端代码(注意客户端数目要小于MAX_FD);在客户端输入数据后回车,可以看见服务器收到 ...

最新文章

  1. 基于uFUN开发板的心率计(一)DMA方式获取传感器数据
  2. UE capability与 双连接相关的参数。
  3. BufferedInputStream与BufferedOutputStream用法简介
  4. ABP 重写主键ID
  5. 检测xcode工程中配置信息是否正确
  6. 防止入侵者嗅探web密码
  7. Python 操作 redis
  8. Bootstrap3 折叠插件的调用方式
  9. 推荐95个极富创意的单页网站设计实例欣赏
  10. 五周第三次课(4月20日)shell介绍、命令历史、命令补全和别名、通配符 、输入输出重定向...
  11. 【转】C/C++中宏使用总结
  12. flutter-可拖动悬浮按钮
  13. eBay运营模式有哪些
  14. 服务器蓝屏的原因及解决办法
  15. Windows 11正式版来了!一文带你免费升级、镜像下载、最低系统要求
  16. ChatGPT所有插件详细教程
  17. 摸个鱼(算最大捕捞量)
  18. 校招linux基础知识,校招笔试整理 牛客网 2020小米校招(1)
  19. java3d立方体_java3d 立方体 加载纹理
  20. 在python将字符串中的空格转换为下划线_如何将下划线替换为空格,反之亦然?...

热门文章

  1. 自适应网页设计(转)
  2. 往事历历在目--我的2009年工作总结
  3. 世界公认的第一商人——成就犹太商人的15个信条
  4. Python学习笔记之列表(三)
  5. Android之Intent传递数据
  6. 【狂神JAVA】MyBatis笔记
  7. 调试技术: Linux core dump file
  8. iOS面试中经常问的点 - RunTime
  9. web服务器集群(多台web服务器)后session如何同步和共享
  10. 国内规模最大的商业WiFi运营商百米生活挂牌新三板