点这里排版好一点

本来这篇应该是上周发的,拖延症又犯了

上一篇主要讨论了Linux线程的调度算法

这篇来谈谈线程间的同步问题,暂时不包括IPC(InterProcess Communication)问题,IPC还是很有趣的。

有趣的事情就要慢慢品对吧,留到下次再来谈 (主要准备不过来 hhh 太真实了)

PS: 以下解析的Linux kernel版本号为4.19.25

Thread synchronization

Motivation

为什么线程之间需要同步?

一个原因,同一个父进程下的所有子线程共享同一个PC,同一个寄存器,同一个共享库(同一片天空)

所以当多个子线程同时对同一个变量进行操作的时候,就很有可能出现热点,甚至错误情况,这就是同步问题。

另外一个原因,很多时候线程之间执行情况实际上是有一定顺序的,下一个线程需要知道上一个线程有没有完成执行任务。

当然线程权限没有那么大,这些事情都是调度程序来做的,但线程有感知上一个线程完成与否的需求,这就是互斥问题。

所以,总的而言,线程同步主要解决的是同步互斥问题。

至于怎么解决,常见的套路主要是在栈中设立一个原子变量,通过抢占这个全部变量实现同步互斥。

具体而言,有互斥量mutex, 锁Lock, 读写锁rwlock, 条件变量Condition, 屏障Barrier etc.

Souce code

这一部分代码比较多,有些还比较晦涩,Linux kernel4以后的代码相较于2.×版本还有比较大的改动
然后在工程中,这一部分还是很有用的,比如所有线程安全都是基于互斥量这个概念实现的,再比如说写个redis锁 etc.
掌握这部分会对你维护多线程问题有所帮助!!!

Linux的线程同步机制和Nachos中使用的机制(信号量,锁,条件变量)基本一致。采用了互斥量mutex,条件变量,信号量,读写锁。

Mutex

Linux 下通过声明一个Mutex的类来实现互斥量的实现。另外还声明了一个ww_mutex(wound/wait)来避免死锁

Linux kerenal 中关于Mutex struct的代码在<include/linux/mutex.h>

struct mutex {atomic_long_t           owner;     // mutex 获得的owner ID// 若==0, 则mutex未被占用;// 若> 0, 则mutex被此ownerId占用,// 只能由当前owner解mutexspinlock_t              wait_lock; // 自旋锁类型
#ifdef CONFIG_MUTEX_SPIN_ON_OWNERstruct optimistic_spin_queue osq;  /* Spinner MCS lock */
#endifstruct list_head        wait_list; // 等待队列
#ifdef CONFIG_DEBUG_MUTEXESvoid                    *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOCstruct lockdep_map      dep_map;
#endif
};

上面的struct用一个原子变量owner来实现mutex的互斥效果, 这里已经和kernel 2.×版本不一样了。

当owner为0时,表示这个mutex还未被占用。当mutex不为零的时候,只能由id == owner的线程解除占用

另外定义了一个wait_list用于存储被sleep的thread

这部分代码和nachos中Semaphore的设计基本一致

而具体实现mutex的代码位于<kernel/locking/mutex.c>

__mutex_init函数主要做一些变量声明和初始化的工作。

void
__mutex_init(struct mutex *lock, const char *name, struct lock_class_key *key)
{atomic_long_set(&lock->owner, 0);   // init atomic 变量 ownerspin_lock_init(&lock->wait_lock);   // init 自旋锁类型变量INIT_LIST_HEAD(&lock->wait_list);   // init 等待队列变量
#ifdef CONFIG_MUTEX_SPIN_ON_OWNERosq_lock_init(&lock->osq);
#endifdebug_mutex_init(lock, name, key);
}

以加锁为例,调用的的是mutex_lock函数。

void __sched mutex_lock(struct mutex *lock)
{might_sleep();                       // 打印堆栈 debug sleepif (!__mutex_trylock_fast(lock))     // atomic获得owner, 如果能__mutex_lock_slowpath(lock); //
}
EXPORT_SYMBOL(mutex_lock);
#endif

其中,might_sleep()是一个全局Linux API,主要用于在中断时候,debug打印context堆栈,这个API在后面被广泛使用。

__mutex_trylock_fast(lock) 是一个去获取lock的owner的函数,如果能获取则返回true

static __always_inline bool __mutex_trylock_fast(struct mutex *lock)
{ww_acquire_ctxunsigned long curr = (unsigned long)current;unsigned long zero = 0UL;if (atomic_long_try_cmpxchg_acquire(&lock->owner, &zero, curr))  // 获取ownerreturn true;return false;
}

如果有权限获取owner则

static noinline void __sched
__mutex_lock_slowpath(struct mutex *lock)
{__mutex_lock(lock, TASK_UNINTERRUPTIBLE, 0, NULL, _RET_IP_);  // 调用__mutex_lock
}

然后再嵌套调用,不知道是为了什么,写了那么多层(可能是有别的地方 复用到了)

static int __sched
__mutex_lock(struct mutex *lock, long state, unsigned int subclass,struct lockdep_map *nest_lock, unsigned long ip)
{// 调用__mutex_lock_commonreturn __mutex_lock_common(lock, state, subclass, nest_lock, ip, NULL, false);
}

然后就到了Linux真正处理mock_lock的地方

static __always_inline int __schedw
__mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,// lock TASK_UNINTERRUPTIBLE 0struct lockdep_map *nest_lock, unsigned long ip,      // NULL _RET_IP_struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx) // NULL false
{struct mutex_waiter waiter;bool first = false;struct ww_mutex *ww;     // ww = wound/wait mutex 用于死锁检验int ret;might_sleep(); // 一样的去打印context的堆栈ww = container_of(lock, struct ww_mutex, base); // 获得ww_mutexif (use_ww_ctx && ww_ctx) {                     // mutet_lock进不到这个,ww_mutex_lock有可能进if (unlikely(ww_ctx == READ_ONCE(ww->ctx))) // ww_mutex获得的ctx和需要的ctx对比return -EALREADY;/** Reset the wounded flag after a kill. No other process can* race and wound us here since they can't have a valid owner* pointer if we don't have any locks held.*/if (ww_ctx->acquired == 0)   // 如果ww_ctx没有被获得 则重设wounded 位ww_ctx->wounded = 0;}preempt_disable();  // 设置不可抢占mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip); // 检查mutex 需要的条件if (__mutex_trylock(lock) ||                                  // 尝试上lockmutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, NULL)) {  // 尝试上乐观锁/* got the lock, yay! */lock_acquired(&lock->dep_map, ip);                    // 上lockif (use_ww_ctx && ww_ctx)                             // ww_mutex_lock时ww_mutex_set_context_fastpath(ww, ww_ctx);    // 设置上下文pathpreempt_enable();                                     // 解除不可抢占return 0;}spin_lock(&lock->wait_lock); // 对等待队列上自旋锁/** After waiting to acquire the wait_lock, try again.*/if (__mutex_trylock(lock)) {                                 // 那再试试呗 hhhif (use_ww_ctx && ww_ctx)__ww_mutex_check_waiters(lock, ww_ctx);goto skip_wait;}debug_mutex_lock_common(lock, &waiter); // 掉一下debug模式下mutet_lock_commonlock_contended(&lock->dep_map, ip);     // 去等锁if (!use_ww_ctx) {                      // mutex_lock时候/* add waiting tasks to the end of the waitqueue (FIFO): */__mutex_add_waiter(lock, &waiter, &lock->wait_list); // 加到wait_queue#ifdef CONFIG_DEBUG_MUTEXESwaiter.ww_ctx = MUTEX_POISON_WW_CTX;
#endif} else {/** Add in stamp order, waking up waiters that must kill* themselves.*/ret = __ww_mutex_add_waiter(&waiter, lock, ww_ctx); // 加到ww_mutex的wait_queueif (ret)goto err_early_kill;waiter.ww_ctx = ww_ctx;}waiter.task = current;set_current_state(state);  // 设置statefor (;;) {                // 做了一个自旋操作 retry lock/** Once we hold wait_lock, we're serialized against* mutex_unlock() handing the lock off to us, do a trylock* before testing the error conditions to make sure we pick up* the handoff.*/if (__mutex_trylock(lock))  // 等到了goto acquired;/** Check for signals and kill conditions while holding* wait_lock. This ensures the lock cancellation is ordered* against mutex_unlock() and wake-ups do not go missing.*/if (unlikely(signal_pending_state(state, current))) { // if state不对ret = -EINTR;goto err;}if (use_ww_ctx && ww_ctx) {  // 如果是ww_mutex 且 wait_queue 有需要被kill掉的ret = __ww_mutex_check_kill(lock, &waiter, ww_ctx);if (ret)goto err;}spin_unlock(&lock->wait_lock); // 解自旋锁schedule_preempt_disabled();   // 解除不可抢占/** ww_mutex needs to always recheck its position since its waiter* list is not FIFO ordered.*/if ((use_ww_ctx && ww_ctx) || !first) {first = __mutex_waiter_is_first(lock, &waiter);if (first)__mutex_set_flag(lock, MUTEX_FLAG_HANDOFF);}set_current_state(state); // update state/** Here we order against unlock; we must either see it change* state back to RUNNING and fall through the next schedule(),* or we must see its unlock and acquire.*/if (__mutex_trylock(lock) || // 再试一次(first && mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, &waiter)))break;spin_lock(&lock->wait_lock);}spin_lock(&lock->wait_lock);
acquired:__set_current_state(TASK_RUNNING);if (use_ww_ctx && ww_ctx) {/** Wound-Wait; we stole the lock (!first_waiter), check the* waiters as anyone might want to wound us.*/if (!ww_ctx->is_wait_die &&!__mutex_waiter_is_first(lock, &waiter))__ww_mutex_check_waiters(lock, ww_ctx);}mutex_remove_waiter(lock, &waiter, current); // 从等待队列中removeif (likely(list_empty(&lock->wait_list)))__mutex_clear_flag(lock, MUTEX_FLAGS); // 清除flagdebug_mutex_free_waiter(&waiter);skip_wait:/* got the lock - cleanup and rejoice! */lock_acquired(&lock->dep_map, ip);if (use_ww_ctx && ww_ctx)ww_mutex_lock_acquired(ww, ww_ctx);spin_unlock(&lock->wait_lock); // cleanuppreempt_enable();return 0;err:__set_current_state(TASK_RUNNING);mutex_remove_waiter(lock, &waiter, current);
err_early_kill:spin_unlock(&lock->wait_lock);debug_mutex_free_waiter(&waiter);mutex_release(&lock->dep_map, 1, ip);preempt_enable();return ret;
}

上面的__mutex_commonmutex_lockww_mutex_lock两个函数复用

use_ww_ctx && ww_ctx这两个变量就是用来判断到底是被哪个函数复用了

然后函数很多逻辑都是为了减少等待时间,用了多次自旋锁进行等待,直到多次尝试之后还不能上锁的时候才真正去sleep等待

这样的操作虽然可能会增大单次上锁时间,但相比交换上下文Context的代价肯定是很省了

自旋锁 spinlock

自旋锁,就是一种反复重试的锁,因为实际生产过程中,经常会有稍微等一等这个互斥量就解除的情况

所以自旋锁在工程中用处还是很大的,很多java程序都要写spinlock

Spinlock相关代码在<include/linux/spinlock_api_smp.h>

static inline int __raw_spin_trylock(raw_spinlock_t *lock)
{preempt_disable(); // 设置不可抢占if (do_raw_spin_trylock(lock)) {  // 尝试获得自旋锁spin_acquire(&lock->dep_map, 0, 1, _RET_IP_); // 获得自旋锁return 1;}preempt_enable(); // 接触不可抢占return 0;
}

其中spin_acquire定义在<include/linux/lockdep.h>

#define spin_acquire(l, s, t, i)                lock_acquire_exclusive(l, s, t, NULL, i)
#define lock_acquire_exclusive(l, s, t, n, i)           lock_acquire(l, s, t, 0, 1, n, i)

而lock_acquire()实现的代码在<kernel/locking/lockdep.c>

void lock_acquire(struct lockdep_map *lock, unsigned int subclass,int trylock, int read, int check,struct lockdep_map *nest_lock, unsigned long ip)
{unsigned long flags;if (unlikely(current->lockdep_recursion)) // 如果锁的递归深度标志位!=0return;raw_local_irq_save(flags); // 刷一下flags到diskcheck_flags(flags); // 检查flagcurrent->lockdep_recursion = 1; // 互斥trace_lock_acquire(lock, subclass, trylock, read, check, nest_lock, ip); // 追踪锁获得 打印日志__lock_acquire(lock, subclass, trylock, read, check,irqs_disabled_flags(flags), nest_lock, ip, 0, 0);  // lock acquirecurrent->lockdep_recursion = 0; // 解除互斥raw_local_irq_restore(flags); // 再刷一下flags
}
EXPORT_SYMBOL_GPL(lock_acquire);

然后具体实现的时候,调用到__lock_acquire()

static int __lock_acquire(struct lockdep_map *lock, unsigned int subclass,int trylock, int read, int check, int hardirqs_off,struct lockdep_map *nest_lock, unsigned long ip,int references, int pin_count)
{struct task_struct *curr = current;struct lock_class *class = NULL;struct held_lock *hlock;unsigned int depth;int chain_head = 0;int class_idx;u64 chain_key;if (subclass < NR_LOCKDEP_CACHING_CLASSES)class = lock->class_cache[subclass]; // 找到cache/** Not cached?*/if (unlikely(!class)) {class = register_lock_class(lock, subclass, 0); // 注册lockif (!class)return 0;}atomic_inc((atomic_t *)&class->ops); // 原子操作获得class 操作符if (very_verbose(class)) {printk("nacquire class [%px] %s", class->key, class->name);if (class->name_version > 1)printk(KERN_CONT "#%d", class->name_version);printk(KERN_CONT "n");dump_stack();}depth = curr->lockdep_depth;  // init depthif (DEBUG_LOCKS_WARN_ON(depth >= MAX_LOCK_DEPTH)) // stack深度溢出return 0;class_idx = class - lock_classes + 1;if (depth) {hlock = curr->held_locks + depth - 1;if (hlock->class_idx == class_idx && nest_lock) {if (hlock->references) {/** Check: unsigned int references:12, overflow.*/if (DEBUG_LOCKS_WARN_ON(hlock->references == (1 << 12)-1)) // 2^12 - 1return 0;hlock->references++;} else {hlock->references = 2;}return 1;}}hlock = curr->held_locks + depth;if (DEBUG_LOCKS_WARN_ON(!class))return 0;hlock->class_idx = class_idx; // 记录hlock信息hlock->acquire_ip = ip;hlock->instance = lock;hlock->nest_lock = nest_lock;hlock->irq_context = task_irq_context(curr);hlock->trylock = trylock;hlock->read = read;hlock->check = check;hlock->hardirqs_off = !!hardirqs_off;hlock->references = references;
#ifdef CONFIG_LOCK_STAThlock->waittime_stamp = 0;hlock->holdtime_stamp = lockstat_clock();
#endifhlock->pin_count = pin_count;if (check && !mark_irqflags(curr, hlock))return 0;/* mark it as used: */if (!mark_lock(curr, hlock, LOCK_USED))return 0;if (DEBUG_LOCKS_WARN_ON(class_idx > MAX_LOCKDEP_KEYS)) // 又溢出了return 0;chain_key = curr->curr_chain_key;if (!depth) {/** How can we have a chain hash when we ain't got no keys?!*/if (DEBUG_LOCKS_WARN_ON(chain_key != 0))return 0;chain_head = 1;}hlock->prev_chain_key = chain_key;if (separate_irq_context(curr, hlock)) {chain_key = 0;chain_head = 1;}chain_key = iterate_chain_key(chain_key, class_idx);curr->curr_chain_key = chain_key;curr->lockdep_depth++;check_chain_key(curr);return 1;
}

__lock_acquire()spin_lockmutex_lock两个class调用

实际上它的操作对象不是对单一class加锁,是对一个锁类的加锁

这里为了降低lockdep的搜索消耗,用了一个cache

对于那些反复加放锁的部分有不小的性能上的提升

  • 读写锁rwlock

读写锁的主要目的就是实现某一种状态的并发性

  • 条件变量 Condition

条件变量则是为了实现线程的批处理,一个个batch执行,定义了单个唤醒 & 广播唤醒两种方式

  • 屏障 barrier

屏障的作用就很像两阶段锁协议,第一阶段只能等待,第二阶段只能运行

当未达到屏障约定的上限时,通过条件变量实现进入wait_queue

当达到屏障上限的时候,通过广播一次性唤醒

c++ pipe 同步 互斥_数一数Linux中有多少种线程同步策略-『Linux 源码解析(二)』...相关推荐

  1. SEAL全同态加密开源库(七) rns剩余数系统-源码解析

    SEAL全同态加密开源库(七) rns剩余数系统-源码解析 2021SC@SDUSC 2021-11-14 前言 这是SEAL开源库代码分析报告第六篇,本篇将分析util文件夹中的rns.h和rns. ...

  2. mysql 线程池源码模块_易语言Mysql线程池2.0模块源码

    易语言Mysql线程池2.0模块源码 易语言Mysql线程池2.0模块源码 系统结构:GetThis,初始化,关闭类线程,线程_测试,其他_附加文本,连接池初始化,取mysql句柄,释放mysql句柄 ...

  3. btcd源码解析——peer节点之间的区块数据同步 (3) —— 非headersFirstMode模式

    文章目录 1. 写在前面 2. 非headersFirstMode模式下的数据同步过程 2.1 peer A 发送"获取区块哈希"的请求 2.2 peer B 响应"获取 ...

  4. 死磕 java同步系列之ReentrantReadWriteLock源码解析

    问题 (1)读写锁是什么? (2)读写锁具有哪些特性? (3)ReentrantReadWriteLock是怎么实现读写锁的? (4)如何使用ReentrantReadWriteLock实现高效安全的 ...

  5. Spring5源码 - 12 Spring事件监听机制_异步事件监听应用及源码解析

    文章目录 Pre 实现原理 应用 配置类 Event事件 事件监听 EventListener 发布事件 publishEvent 源码解析 (反推) Spring默认的事件广播器 SimpleApp ...

  6. Java熔断框架有哪些_降级熔断框架 Hystrix 源码解析:滑动窗口统计

    降级熔断框架 Hystrix 源码解析:滑动窗口统计 概述 Hystrix 是一个开源的降级熔断框架,用于提高服务可靠性,适用于依赖大量外部服务的业务系统.什么是降级熔断呢? 降级 业务降级,是指牺牲 ...

  7. 增加数组下标_数组以及ArrayList源码解析

    点击上方"码之初"关注,···选择"设为星标" 与精品技术文章不期而遇 前言 前一篇我们对数据结构有了个整体的概念上的了解,没看过的小伙伴们可以看我的上篇文章: ...

  8. 关于互斥锁,条件变量的内核源码解析

    一.解决问题和适用范围 主要是用来等待一个条件,这个条件可能需要另一个线程来满足这个条件.这个和我们平常适用的pthread_mutex_lock的最大不同在于后者保护的一般是一个代码段(也就是关键区 ...

  9. Linux系统开发9 线程同步

    [本文谢绝转载原文来自http://990487026.blog.51cto.com] <大纲> Linux系统编程8 线程同步多线程共享资源,不加锁,同步互斥演示多线程共享资源,加锁,同 ...

最新文章

  1. linux 集群配置ssh无密码访问
  2. 纯css3鼠标经过出现文字或图片鼠标移走消失
  3. 应用在机器学习中的聚类数据集产生方法
  4. C运行时库和标准C++库
  5. 博士申请 | 加拿大Mila实验室唐建教授招收深度学习方向博士生和实习生
  6. Learn Python the first day.
  7. springBoot整合Listener
  8. ECC密钥结构和密码学基础
  9. LeetCode(7) - Reverse Integer
  10. CodeDom系列--事件(event)定义和反射调用
  11. Windows Phone 更改datePicker的显示格式
  12. 营销推广的好工具:博客营销
  13. 德州农工大学计算机硕士申请,2019至领留学获德州农工大学TAMU电子工程硕士ECE录取...
  14. mysql中将数字转化成汉字 基础
  15. 01超精美渐变色动态背景完整示例【CSS动效实战(纯CSS与JS动效)】
  16. 深入浅出WPF——XAML的优点
  17. 【正本清源】Synchronized 源码全解之偏向锁的获取(基于 Openjdk12)
  18. android相机保存文件为空,android 调用系统相机拍照,返回的data为null
  19. 【亲测可用】经验分享,开发在线考试平台(类似问卷星、考试星、阿里巴巴在线考试、易考通、考试云、移动网大、移动网校、中石化网络学院、宝武微学院、iTEST等)如何用程序代码防止学生作弊?
  20. 6-2 找出大于num的最小素数 (16 分)

热门文章

  1. 进程与线程 thread (二)——线程概念
  2. SpringCloud-Hytrix
  3. 多层AOP 解决AOP执行顺序
  4. DJango_生命周期
  5. 《SAP HANA平台应用开发》—第3章3.1节信息建模
  6. Android 线程管理
  7. golang 结构体 map 转化为 json
  8. python 获取当前时间 时间差
  9. mysql 时间差函数 timestampdiff datediff 简介
  10. linux 内核 获取随机数