Analysis of epoll Performance on the CentOS 7 Linux Real-Time Kernel

rtoax, March 4, 2021

1. Introducing the Problem

Some reference links are listed at the end of this article.

1.1. Test and Debug Environment

Non-real-time environment:

3.10.0-1062.el7.x86_64
CentOS Linux release 7.7.1908 (Core)

Real-time environment:

3.10.0-1127.rt56.1093.el7.x86_64
CentOS Linux release 7.4.1708 (Core)

1.2. Problem Description

epoll needs no introduction: redis, memcached, and nginx all rely on it. While recently measuring epoll latency on the real-time kernel, I found that epoll performs poorly there.

Test scenario (a minimal sketch of the test program follows the list):

  1. The test uses epoll plus eventfd;
  2. During initialization, create one epoll fd and three eventfds, and add the three eventfds to the epoll instance;
  3. Create four threads whose callback functions run at full load, with no sleeping;
  4. Thread 1 waits on epoll_wait and reads with eventfd_read;
  5. Threads 2, 3, and 4 write with eventfd_write.
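
To make the scenario concrete, here is a minimal sketch of such a test program. It is an illustration only, assuming plain pthreads and the eventfd_read()/eventfd_write() helpers, with error handling omitted; it is not the original test code, and the names used here are invented:

/* Sketch of the test described above: one epoll fd, three eventfds,
 * one "Dequeue" thread spinning on epoll_wait() + eventfd_read(), and
 * three "Enqueue" threads spinning on eventfd_write(). */
#include <pthread.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>

#define MAX_EVENTS  8
#define NR_EVENTFDS 3

static int epollfd;
static int efd[NR_EVENTFDS];

/* Thread 1: busy loop on epoll_wait + eventfd_read. */
static void *dequeue_thread(void *arg)
{
        struct epoll_event events[MAX_EVENTS];

        for (;;) {
                int nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);

                for (int i = 0; i < nfds; i++) {
                        eventfd_t val;

                        eventfd_read(events[i].data.fd, &val);
                }
        }
        return NULL;
}

/* Threads 2-4: busy loop on eventfd_write. */
static void *enqueue_thread(void *arg)
{
        int fd = *(int *)arg;

        for (;;)
                eventfd_write(fd, 1);
        return NULL;
}

int main(void)
{
        pthread_t tid[4];

        epollfd = epoll_create1(0);
        for (int i = 0; i < NR_EVENTFDS; i++) {
                struct epoll_event ev = { .events = EPOLLIN };

                efd[i] = eventfd(0, 0);
                ev.data.fd = efd[i];
                epoll_ctl(epollfd, EPOLL_CTL_ADD, efd[i], &ev);
        }

        pthread_create(&tid[0], NULL, dequeue_thread, NULL);
        for (int i = 0; i < NR_EVENTFDS; i++)
                pthread_create(&tid[i + 1], NULL, enqueue_thread, &efd[i]);

        for (int i = 0; i < 4; i++)
                pthread_join(tid[i], NULL);
        return 0;
}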

Running the program above on the non-real-time kernel, total CPU utilization reaches an impressive 400%:

   PID  %CPU COMMAND           P
 88866 398.3 Main              0

Looking at the four threads individually, each one reaches roughly 100%:

   PID %CPU COMMAND            P
 88868 99.9 Enqueue1           1
 88870 99.9 Enqueue3           3
 88869 99.7 Enqueue2           2
 88867 99.3 Dequeue            0
 88866  0.0 Main               0

On the real-time kernel, however, total CPU utilization only reaches about 206%:

   PID  %CPU COMMAND           P
 31785 205.9 Main              2

Looking at the four threads individually, none of them reaches 100%:

   PID %CPU COMMAND            P
 31786 53.8 Dequeue            0
 31787 53.8 Enqueue1           1
 31788 53.8 Enqueue2           2
 31789 53.8 Enqueue3           3
 31785  0.0 Main               2

1.3. Tracking Down the Problem

Why? At first I was baffled. Digging through real-time kernel material and Red Hat's official manuals (see the reference links), I found that both power management and interrupt affinity need to be tuned, but a round of such tuning got me nowhere. While inspecting system interrupts, however, I noticed that running the application above on the real-time kernel makes the Rescheduling interrupts surge.

We monitor system interrupts in real time with the following command:

watch -n1 cat /proc/interrupts

On the non-real-time kernel the output is (keeping only the interrupts that change noticeably):

            CPU0       CPU1       CPU2       CPU3
...LOC:   12977669    9081580    1821991    1819625   Local timer interrupts
...RES:    1791064    2128812        748        726   Rescheduling interrupts
...

Apart from the local timer interrupts (LOC), these counters barely change. Trying the same thing on the real-time kernel gives:

            CPU0       CPU1       CPU2       CPU3
...RES: 3785763488 2687470633 2751775083 2766930787   Rescheduling interrupts
...

The Rescheduling interrupt counters climb by on the order of 100,000 per second. Why? For anyone familiar with process scheduling and the kernel source, rescheduling interrupts go hand in hand with schedule(): for example, when a thread calls sleep(), it enters the kernel through a system call and voluntarily calls schedule() to give up the CPU so that the kernel can run other tasks. So why is epoll taking this path here?

Note:
perf-tools is a set of kernel tracing tools that Brendan Gregg has open-sourced on GitHub.

We use the funccount tool to count kernel function calls. Its usage is as follows:

./funccount
USAGE: funccount [-hT] [-i secs] [-d secs] [-t top] funcstring
                 -d seconds      # total duration of trace
                 -h              # this usage message
                 -i seconds      # interval summary
                 -t top          # show top num entries only
                 -T              # include timestamp (for -i)
  eg,
       funccount 'vfs*'            # trace all funcs that match "vfs*"
       funccount -d 5 'tcp*'       # trace "tcp*" funcs for 5 seconds
       funccount -t 10 'ext3*'     # show top 10 "ext3*" funcs
       funccount -i 1 'ext3*'      # summary every 1 second
       funccount -i 1 -d 5 'ext3*' # 5 x 1 second summaries
See the man page and example file for more info.
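
The wildcard shown in the trace header below suggests the tool was pointed at the epoll_wait entry points with an invocation along these lines (an assumption, not copied from the original session):

./funccount '*epoll_wait*'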

We trace and count epoll_wait calls; on the non-real-time kernel the result is:

Tracing "*epoll_wait*"... Ctrl-C to end.
^C
FUNC                              COUNT
SyS_epoll_wait                   979621

On the real-time kernel, by contrast, the number of epoll_wait calls grows in step with the Rescheduling interrupts. Why?

Next, we use the funcgraph command to monitor epoll_wait.
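
A plausible invocation would look roughly like this (an assumption; funcgraph prints the ftrace call graph of the named kernel function):

./funcgraph 'SyS_epoll_wait'

On the non-real-time kernel, the overall call stack of a single epoll_wait call looks like the following: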

SyS_epoll_wait() {
  fget_light();
  ep_poll() {
    _raw_spin_lock_irqsave();
    _raw_spin_unlock_irqrestore();
    ep_scan_ready_list.isra.7() {
      mutex_lock() {
        _cond_resched();
      }
      _raw_spin_lock_irqsave();
      _raw_spin_unlock_irqrestore();
      ep_send_events_proc() {
        eventfd_poll();
        eventfd_poll();
        eventfd_poll();
      }
      _raw_spin_lock_irqsave();
      __pm_relax();
      _raw_spin_unlock_irqrestore();
      mutex_unlock();
    }
  }
  fput();
}

On the real-time kernel (with most of the output omitted), schedule() is clearly being called from inside epoll_wait:

SyS_epoll_wait() {
  fget_light() {
    __rcu_read_lock();
    __rcu_read_unlock();
  }
  ep_poll() {
    migrate_disable() {
      pin_current_cpu();
    }
    rt_spin_lock() {
      rt_spin_lock_slowlock() {
        _raw_spin_lock_irqsave();
        rt_spin_lock_slowlock_locked() {
          schedule() {
            __schedule() {
              ...
            }
          }
          ...
    rt_spin_lock() {
      rt_spin_lock_slowlock() {
        ...
        schedule() {
          __schedule() {
            rcu_note_context_switch();
            _raw_spin_lock_irq();
            ...
    rt_spin_lock() {
      rt_spin_lock_slowlock() {
        _raw_spin_lock_irqsave();
        rt_spin_lock_slowlock_locked() {
          ...
          schedule() {
            __schedule() {
              rcu_note_context_switch();
              _raw_spin_lock_irq();
              deactivate_task() {
                ...
  fput();
}

1.4. Real-Time Kernel Source Code Path Analysis

The analysis below is based on the source package kernel-rt-3.10.0-1127.rt56.1093.el7.src.

1.4.1. epoll_wait

In fs/eventpoll.c:


/*
 * Implement the event wait interface for the eventpoll file. It is the kernel
 * part of the user space epoll_wait(2).
 */
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
                int, maxevents, int, timeout)
{
        ...
        /* Time to fish for events ... */
        error = ep_poll(ep, events, maxevents, timeout);
        ...
}

1.4.2. ep_poll

First, locate the spinlock:

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
                   int maxevents, long timeout)
{
        ...
        spin_lock_irqsave(&ep->lock, flags);
        ...
}

In the real-time kernel, include/linux/spinlock.h has:

#ifdef CONFIG_PREEMPT_RT_FULL
# include <linux/spinlock_rt.h>
#else /* PREEMPT_RT_FULL */

1.4.3. spin_lock_irqsave

include/linux/spinlock_rt.h has:

#define spin_lock_irqsave(lock, flags)           \
        do {                                     \
                typecheck(unsigned long, flags); \
                flags = 0;                       \
                spin_lock(lock);                 \
        } while (0)

where spin_lock is defined as:

#define spin_lock(lock)                  \
        do {                             \
                migrate_disable();       \
                rt_spin_lock(lock);      \
        } while (0)

Here we find migrate_disable; next, follow rt_spin_lock:

void __lockfunc rt_spin_lock(spinlock_t *lock)
{
        rt_spin_lock_fastlock(&lock->lock, rt_spin_lock_slowlock);
        spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
}
EXPORT_SYMBOL(rt_spin_lock);

rt_spin_lock_fastlock is as follows:

/*
 * preemptible spin_lock functions:
 */
static inline void rt_spin_lock_fastlock(struct rt_mutex *lock,
                                         void (*slowfn)(struct rt_mutex *lock))
{
        might_sleep();

        if (likely(rt_mutex_cmpxchg(lock, NULL, current)))
                rt_mutex_deadlock_account_lock(lock, current);
        else
                slowfn(lock);
}

Here slowfn is rt_spin_lock_slowlock:

static void noinline __sched rt_spin_lock_slowlock(struct rt_mutex *lock)
{
        struct rt_mutex_waiter waiter;
        unsigned long flags;

        rt_mutex_init_waiter(&waiter, true);

        raw_spin_lock_irqsave(&lock->wait_lock, flags);
        rt_spin_lock_slowlock_locked(lock, &waiter, flags);
        raw_spin_unlock_irqrestore(&lock->wait_lock, flags);
        debug_rt_mutex_free_waiter(&waiter);
}

And rt_spin_lock_slowlock_locked is as follows:

void __sched rt_spin_lock_slowlock_locked(struct rt_mutex *lock,
                                          struct rt_mutex_waiter *waiter,
                                          unsigned long flags)
{
        ...
        for (;;) {
                /* Try to acquire the lock again. */
                if (__try_to_take_rt_mutex(lock, self, waiter, STEAL_LATERAL))
                        break;

                top_waiter = rt_mutex_top_waiter(lock);
                lock_owner = rt_mutex_owner(lock);
                ...
                if (top_waiter != waiter || adaptive_wait(lock, lock_owner))
                        schedule_rt_mutex(lock);
                ...
        }
        ...
}

While the lock cannot be taken, the for (;;) loop repeatedly calls schedule_rt_mutex:

# define schedule_rt_mutex(_lock)            schedule()

Here, then, is the root cause of the surge in Rescheduling interrupts.

1.5. The spin_lock_irqsave Path in the Non-Real-Time Kernel

Now let's look at the kernel path that does not use the real-time spinlock. In the non-real-time case, spin_lock_irqsave is defined as follows:

#define spin_lock_irqsave(lock, flags)               \
do {                                                            \
        raw_spin_lock_irqsave(spinlock_check(lock), flags);     \
} while (0)

raw_spin_lock_irqsave is defined as follows:

#define raw_spin_lock_irqsave(lock, flags)               \
        do {                                             \
                typecheck(unsigned long, flags);         \
                flags = _raw_spin_lock_irqsave(lock);    \
        } while (0)

Continuing with _raw_spin_lock_irqsave:

unsigned long __lockfunc _raw_spin_lock_irqsave(raw_spinlock_t *lock)
{
        return __raw_spin_lock_irqsave(lock);
}
EXPORT_SYMBOL(_raw_spin_lock_irqsave);

__raw_spin_lock_irqsave is as follows:

static inline unsigned long __raw_spin_lock_irqsave(raw_spinlock_t *lock)
{
        ...
#ifdef CONFIG_LOCKDEP
        LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
#else
        do_raw_spin_lock_flags(lock, &flags);
#endif
        return flags;
}

Whether do_raw_spin_lock or do_raw_spin_lock_flags is used, both end up calling arch_spin_lock:

static inline void
do_raw_spin_lock_flags(raw_spinlock_t *lock, unsigned long *flags) __acquires(lock)
{
        __acquire(lock);
        arch_spin_lock_flags(&lock->raw_lock, *flags);
}

void do_raw_spin_lock(raw_spinlock_t *lock)
{
        debug_spin_lock_before(lock);
        arch_spin_lock(&lock->raw_lock);
        debug_spin_lock_after(lock);
}

static __always_inline void arch_spin_lock_flags(arch_spinlock_t *lock,
                                                 unsigned long flags)
{
        arch_spin_lock(lock);
}

arch_spin_lock is as follows:

static __always_inline void arch_spin_lock(arch_spinlock_t *lock)
{
        register struct __raw_tickets inc = { .tail = TICKET_LOCK_INC };

        inc = xadd(&lock->tickets, inc);
        if (likely(inc.head == inc.tail))
                goto out;

        inc.tail &= ~TICKET_SLOWPATH_FLAG;
        for (;;) {
                unsigned count = SPIN_THRESHOLD;

                do {
                        if (ACCESS_ONCE(lock->tickets.head) == inc.tail)
                                goto out;
                        cpu_relax();
                } while (--count);
                __ticket_lock_spinning(lock, inc.tail);
        }
out:
        barrier();      /* make sure nothing creeps before the lock is taken */
}

Clearly, the non-real-time kernel's spin_lock_irqsave simply spins; it never gives up the CPU via schedule() while waiting.

2. Analysis of rt_spin_lock_slowlock_locked

In the non-real-time kernel, the spinlock structures are as follows:

typedef struct qspinlock {
        atomic_t        val;
} arch_spinlock_t;

typedef struct raw_spinlock {
        arch_spinlock_t raw_lock;
} raw_spinlock_t;

/*
 * The non RT version maps spinlocks to raw_spinlocks
 */
typedef struct spinlock {
        struct raw_spinlock rlock;
} spinlock_t;

In the real-time kernel, the spinlock is defined as:

/**
 * The rt_mutex structure
 *
 * @wait_lock:          spinlock to protect the structure
 * @waiters:            rbtree root to enqueue waiters in priority order
 * @waiters_leftmost:   top waiter
 * @owner:              the mutex owner
 */
struct rt_mutex {
        raw_spinlock_t          wait_lock;
#ifdef __GENKSYMS__
        struct plist_head       wait_list;
#else
        struct rb_root          waiters;
        struct rb_node          *waiters_leftmost;
#endif
        struct task_struct      *owner;
        int                     save_state;
};

/*
 * PREEMPT_RT: spinlocks - an RT mutex plus lock-break field:
 */
typedef struct spinlock {
        struct rt_mutex         lock;
        unsigned int            break_lock;
} spinlock_t;

The rt_spin_lock_slowlock_locked function that causes the rescheduling is as follows:

void __sched rt_spin_lock_slowlock_locked(struct rt_mutex *lock,
                                          struct rt_mutex_waiter *waiter,
                                          unsigned long flags)
{
        struct task_struct *lock_owner, *self = current;
        struct rt_mutex_waiter *top_waiter;
        int ret;

        if (__try_to_take_rt_mutex(lock, self, NULL, STEAL_LATERAL))
                return;

        BUG_ON(rt_mutex_owner(lock) == self);

        /*
         * We save whatever state the task is in and we'll restore it
         * after acquiring the lock taking real wakeups into account
         * as well. We are serialized via pi_lock against wakeups. See
         * try_to_wake_up().
         */
        raw_spin_lock(&self->pi_lock);
        self->saved_state = self->state;
        __set_current_state(TASK_UNINTERRUPTIBLE);
        raw_spin_unlock(&self->pi_lock);

        ret = task_blocks_on_rt_mutex(lock, waiter, self, 0);
        BUG_ON(ret);

        for (;;) {
                /* Try to acquire the lock again. */
                if (__try_to_take_rt_mutex(lock, self, waiter, STEAL_LATERAL))
                        break;

                top_waiter = rt_mutex_top_waiter(lock);
                lock_owner = rt_mutex_owner(lock);

                raw_spin_unlock_irqrestore(&lock->wait_lock, flags);

                debug_rt_mutex_print_deadlock(waiter);

                if (top_waiter != waiter || adaptive_wait(lock, lock_owner))
                        schedule_rt_mutex(lock);

                raw_spin_lock_irqsave(&lock->wait_lock, flags);

                raw_spin_lock(&self->pi_lock);
                __set_current_state(TASK_UNINTERRUPTIBLE);
                raw_spin_unlock(&self->pi_lock);
        }

        /*
         * Restore the task state to current->saved_state. We set it
         * to the original state above and the try_to_wake_up() code
         * has possibly updated it when a real (non-rtmutex) wakeup
         * happened while we were blocked. Clear saved_state so
         * try_to_wakeup() does not get confused.
         */
        raw_spin_lock(&self->pi_lock);
        __set_current_state(self->saved_state);
        self->saved_state = TASK_RUNNING;
        raw_spin_unlock(&self->pi_lock);

        /*
         * try_to_take_rt_mutex() sets the waiter bit
         * unconditionally. We might have to fix that up:
         */
        fixup_rt_mutex_waiters(lock);

        BUG_ON(rt_mutex_has_waiters(lock) && waiter == rt_mutex_top_waiter(lock));
        BUG_ON(!RB_EMPTY_NODE(&waiter->tree_entry));
}

Meanwhile, in the ep_poll function:

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
                   int maxevents, long timeout)
{
        ...
        spin_lock_irqsave(&ep->lock, flags);

        if (!ep_events_available(ep)) {
                for (;;) {
                        set_current_state(TASK_INTERRUPTIBLE);
                        if (ep_events_available(ep) || timed_out)
                                break;
                        if (signal_pending(current)) {
                                res = -EINTR;
                                break;
                        }

                        spin_unlock_irqrestore(&ep->lock, flags);
                        if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
                                timed_out = 1;

                        spin_lock_irqsave(&ep->lock, flags);
                }
        }
        ...
}

The for (;;) loop only terminates when events become available (ep_events_available) or the task is interrupted by a signal (signal_pending). While it runs, every acquisition and release of the spinlock on the real-time kernel can trigger a reschedule, whereas on the non-real-time kernel it cannot. One might object that schedule_hrtimeout_range is also called on the non-real-time kernel; take a look at its implementation:

if (expires && !expires->tv64) {
        __set_current_state(TASK_RUNNING);
        return 0;
}

if (!expires) {
        schedule();
        return -EINTR;
}

The user-space call looks like this:

nfds = epoll_wait(ectx->epollfd, ectx->events, MAX_EVENTS, -1);

That is, ep_poll's timeout value is -1, so the to pointer it hands to schedule_hrtimeout_range is NULL, and the following branch is taken:

if (!expires) {
        schedule();
        return -EINTR;
}

It calls schedule() directly, and the next time the process is scheduled it goes back to polling the epoll fd.
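
For reference, the way ep_poll derives to from the user-supplied timeout looks roughly like the sketch below. This is a paraphrase based on the 3.10 sources rather than a verbatim quote, and helper names such as ep_set_mstimeout are taken from that code base:

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
                   int maxevents, long timeout)
{
        int res = 0, eavail, timed_out = 0;
        unsigned long flags;
        long slack = 0;
        wait_queue_t wait;
        ktime_t expires, *to = NULL;    /* stays NULL when timeout < 0 */

        if (timeout > 0) {
                struct timespec end_time = ep_set_mstimeout(timeout);

                slack = select_estimate_accuracy(&end_time);
                to = &expires;
                *to = timespec_to_ktime(end_time);
        } else if (timeout == 0) {
                /* Non-blocking call: check for events once and return. */
                timed_out = 1;
        }
        /* timeout < 0 (our case): to == NULL is later passed to
         * schedule_hrtimeout_range(), which then calls schedule(). */
        ...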

3. Conclusion

Our preliminary conclusion is that on the real-time kernel, the spinlock used by the epoll_wait system call is replaced by the one defined in spinlock_rt.h, which performs process scheduling internally and thereby triggers Rescheduling interrupts. On the non-real-time kernel, rescheduling happens only inside ep_poll itself, and kernel tracing shows that this scheduling path is taken only very rarely.

4. Reference Links

  • Real-time patch binary RPMs
  • Real-time patch source RPMs
  • 3.10.0-1127.rt56.1093.el7.x86_64
  • Product Documentation for Red Hat Enterprise Linux for Real Time 7
