今天讨论下内核常见锁的机制与实现分析. 第一个问题内核何时会发生临界资源的竞争访问? 对于非抢占UP(uni processor)内核只有一种情况会发生竞争, 即高优先级异常/中断处理函数抢占内核线程的cpu并访问了临界资源. 如果内核开启抢占还会引发另一种可能, 即在中断返回或系统调用返回时另一线程抢占当前线程的cpu并访问了临界资源. 在SMP架构中又增加一种情况: 多线程在多核上运行访问临界资源. 对于前两种情况内核对临界资源的保护的实质是对cpu的互斥访问(禁止中断/抢占), 而对于后一种情况仅仅保证对cpu的互斥访问就不够充分了, 现代的cpu通常通过对总线的互斥访问(对内存独占访问与同步刷新cache指令)来实现对临界资源的保护.
在本文中我们将基于32bit ARMv7 SMP架构分析几种内核锁的实现原理并说明它们的应用场景.

1. 自旋锁

自旋锁是最基础的内核锁, 许多其它锁都是基于自旋锁实现的, 自旋锁spinlock_t(defined in include/linux/spinlock_types.h)定义见下.

 1 typedef struct spinlock {
 2     union {
 3         struct raw_spinlock rlock;
 4 #ifdef CONFIG_DEBUG_LOCK_ALLOC
 5 #define LOCK_PADSIZE (offsetof(struct raw_spinlock, dep_map))
 6         struct {
 7             u8 __padding[LOCK_PADSIZE];
 8             struct lockdep_map dep_map;
 9         };
10 #endif
11     };
12 } spinlock_t;
13 typedef struct raw_spinlock {
14     arch_spinlock_t raw_lock;
15 #ifdef CONFIG_GENERIC_LOCKBREAK
16     unsigned int break_lock;
17 #endif
18 #ifdef CONFIG_DEBUG_SPINLOCK
19     unsigned int magic, owner_cpu;
20     void *owner;
21 #endif
22 #ifdef CONFIG_DEBUG_LOCK_ALLOC
23     struct lockdep_map dep_map;
24 #endif
25 } raw_spinlock_t;

作为内核锁的基础, 自旋锁的一共封装了三层(有点复杂), 最外层名字是spinlock_t, 其包含一个名为raw_spinlock_t(原生的自旋锁)的结构, 而该结构又包含了一个名为arch_spinlock_t的结构. arch_spinlock_t(defined in arch/arm/include/asm/spinlock_types.h)才是基于架构的真正的自旋锁结构, 其在ARM架构上定义如下.

 1 typedef struct {
 2     union {
 3         u32 slock;
 4         struct __raw_tickets {
 5 #ifdef __ARMEB__
 6             u16 next;
 7             u16 owner;
 8 #else
 9             u16 owner;
10             u16 next;
11 #endif
12         } tickets;
13     };
14 } arch_spinlock_t;

由于自旋锁的接口较多在, 我们先从最常见的spin_lock()/spin_unlock()(defined in include/linux/spinlock.h)开始看起, 并且假定所有调试宏均关闭.

 1 #define raw_spin_lock(lock) _raw_spin_lock(lock)
 2 static inline void spin_lock(spinlock_t *lock)
 3 {
 4     raw_spin_lock(&lock->rlock);
 5 }
 6 #define raw_spin_unlock(lock) _raw_spin_unlock(lock)
 7 static inline void spin_unlock(spinlock_t *lock)
 8 {
 9     raw_spin_unlock(&lock->rlock);
10 }

_raw_spin_lock()/_raw_spin_unlock()的实现区分SMP/UP架构, 我们简要说下UP下的实现即关抢占(如果内核不开抢占即什么都不做). 然后看下SMP架构的实现, 其定义见kernel/spinlock.c(先忽略DEBUG宏与其它扩展功能宏).

 1 #ifndef CONFIG_INLINE_SPIN_LOCK
 2 void __lockfunc _raw_spin_lock(raw_spinlock_t *lock)
 3 {
 4     __raw_spin_lock(lock);
 5 }
 6 EXPORT_SYMBOL(_raw_spin_lock);
 7 #endif
 8 #ifndef CONFIG_UNINLINE_SPIN_UNLOCK
 9 #define _raw_spin_unlock(lock) __raw_spin_unlock(lock)
10 #endif
11 #if !defined(CONFIG_GENERIC_LOCKBREAK) || defined(CONFIG_DEBUG_LOCK_ALLOC)
12 static inline void __raw_spin_lock(raw_spinlock_t *lock)
13 {
14     preempt_disable();
15     spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
16     LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
17 }
18 #endif
19 static inline void __raw_spin_unlock(raw_spinlock_t *lock)
20 {
21     spin_release(&lock->dep_map, 1, _RET_IP_);
22     do_raw_spin_unlock(lock);
23     preempt_enable();
24 }

这里注意的一点是内核加锁的函数默认不内联, 解锁函数默认内联(为什么我也不知道). 我们先来看看__raw_spin_lock(), 首先关抢占, spin_acquire()是用于debug不正常释放spinlock的调试接口(需开启DEBUG_LOCK_ALLOC)忽略不计, 最后调用LOCK_CONTENDED()(defined in include/linux/lockdep.h)宏, 因此加锁的关键步骤在于LOCK_CONTENDED()宏.

#define LOCK_CONTENDED(_lock, try, lock) lock(_lock)

即__raw_spin_lock()最终调用do_raw_spin_lock()(defined in include/linux/spinlock.h). 该接口中的__acquire()宏是内核sparse检测宏, 用于检测该锁是否未成对使用. 最终调用的是arch_spin_lock()(defined in arch/arm/include/asm/spinlock.h). 该接口包含内嵌汇编, 为方便理解将反汇编代码也贴上来.

 1 static inline void do_raw_spin_lock(raw_spinlock_t *lock) __acquires(lock)
 2 {
 3     __acquire(lock);
 4     arch_spin_lock(&lock->raw_lock);
 5 }
 6 static inline void arch_spin_lock(arch_spinlock_t *lock)
 7 {
 8     unsigned long tmp;
 9     u32 newval;
10     arch_spinlock_t lockval;
11     __asm__ __volatile__(
12 "1: ldrex %0, [%3]\n"
13 "   add %1, %0, %4\n"
14 "   strex %2, %1, [%3]\n"
15 "   teq %2, #0\n"
16 "   bne 1b"
17     : "=&r" (lockval), "=&r" (newval), "=&r" (tmp)
18     : "r" (&lock->slock), "I" (1 << TICKET_SHIFT)
19     : "cc");
20     while (lockval.tickets.next != lockval.tickets.owner) {
21         wfe();
22         lockval.tickets.owner = ACCESS_ONCE(lock->tickets.owner);
23     }
24     smp_mb();
25 }
26 c03b0e90 <_raw_spin_lock>:
27 c03b0e90:   e1902f9f    ldrex r2, [r0]
28 c03b0e94:   e2823801    add r3, r2, #65536 ; 0x10000
29 c03b0e98:   e1801f93    strex r1, r3, [r0]
30 c03b0e9c:   e3310000    teq r1, #0
31 c03b0ea0:   1afffffa    bne c03b0e90 <_raw_spin_lock>
32 c03b0ea4:   e6ff3072    uxth r3, r2
33 c03b0ea8:   e7ef2852    ubfx r2, r2, #16, #16
34 c03b0eac:   e1520003    cmp r2, r3
35 c03b0eb0:   0a000003    beq c03b0ec4 <_raw_spin_lock+0x34>
36 c03b0eb4:   e320f002    wfe
37 c03b0eb8:   e1d030b0    ldrh r3, [r0]
38 c03b0ebc:   e6ff3073    uxth r3, r3
39 c03b0ec0:   eafffff9    b c03b0eac <_raw_spin_lock+0x1c>
40 c03b0ec4:   f57ff05f    dmb sy
41 c03b0ec8:   e12fff1e    bx lr

有了反汇编指令理解代码容就易理解多了. 这里需要首先补充一下ARM架构相关基础知识(见ARM ref manual ARMv7-A & ARMv7-R edition markup). 首先要明确何种操作才属于原子操作, 这里摘抄一下ARM ref manual中对原子操作的说明.

A3.5.3
In ARMv7, the single-copy atomic processor accesses are:
all byte accesses
all halfword accesses to halfword-aligned locations
all word accesses to word-aligned locations

memory accesses caused by LDREXD and STREXD instructions to doubleword-aligned locations. LDM, LDC, LDC2, LDRD, STM, STC, STC2, STRD, PUSH, POP, RFE, SRS, VLDM, VLDR, VSTM, and VSTR instructions are executed as a sequence of word-aligned word accesses. Each 32-bit word access is guaranteed to be single-copy atomic. A subsequence of two or more word accesses from the sequence might not exhibit single-copy atomicity.

In a multiprocessing system, writes to a memory location are multi-copy atomic if the following conditions are both true:
All writes to the same location are serialized, meaning they are observed in the same order by all observers, although some observers might not observe all of the writes.
A read of a location does not return the value of a write until all observers observe that write.
Writes to Normal memory are not multi-copy atomic.

即在单核cpu上对按字节, 半字与字对齐的字节, 半字与字访问都是原子的, 使用LDREXD与STREXD指令访问按双字对齐的双字也是原子的. 但在多核cpu上只有满足以下条件的向内存地址的访问才是原子的: 所有向该地址写操作都是串行的, 即它们被所有观察者按同一顺序观察, 所有向该地址读操作都不会返回被改写的值知道所有观察者都观察到那次写操作. 在多核cpu上向normal内存的写操作都不是原子的. 所以ARMv6开始引入了同步原语(synchronization primitives).

Synchronization primitives must ensure correct operation of system semaphores in the memory order model. The synchronization primitive instructions are defined as those instructions that are used to ensure memory synchronization:
LDREX, STREX, LDREXB, STREXB, LDREXD, STREXD, LDREXH, STREXH.
SWP, SWPB. Use of these instructions is deprecated from ARMv6.
Before ARMv6, support consisted of the SWP and SWPB instructions. ARMv6 introduced new Load-Exclusive and Store-Exclusive instructions LDREX and STREX, and deprecated using the SWP and SWPB instructions.
ARMv7 introduces:
additional Load-Exclusive and Store-Exclusive instructions, LDREXB, LDREXD, LDREXH, STREXB, STREXD, and STREXH
the Clear-Exclusive instruction CLREX
the Load-Exclusive, Store-Exclusive and Clear-Exclusive instructions in the Thumb instruction set.

同步原语必须保证正确的(内存模型中的)系统信号量操作, 同步原语指令用于保证内存同步操作. 它们包括ldrex与strex(及其访问单字节, 半字与双字的同类指令), swp与swpb用于ARMv6之前的架构. ARMv7引入ltrex, strex与clrex(及它们的thumb版本).
此外ARM还引入了内存屏障指令与事件指令. 内存屏障指令包含三条, 分别为dmb, dsb与isb(其中dmb是内存访问屏障, 用于同步cache中数据, dsb与isb分别为数据与指令同步屏障, 用于修改内存数据后等待其它cpu的cache同步). 事件指令包含两条, 分别为wfe(wait for event)与sev(send event).

让我们回到arch_spin_lock(), 该接口首先使用ldrex对lock->tickets独占访问(结果保存在R2中), 并对该结构体的next成员加1, 并使用strex独占更新该值(更新结果返回在R1中), 如果该值非0即更新失败(可能是该cpu cache与内存不一致或被其它cpu修改), 重新执行以上步骤直至返回0. 以上流程可以见ARM ref manual中对ldrex与strex指令使用的说明.
至此自旋锁代码与原子计数代码类似, 早期的内核即通过对unsigned int自增自减实现加锁. 既然如此为何还需要后面的代码呢? 在没有读过自旋锁实现之前我一直有这个问题: 直接使用原子计数替代自旋锁不行吗? 在看了源码后才理解两者的区别, 内核之所以设计next与owner两个变量是为了保证竞争锁的公平性. 我们可以看到每个cpu加锁时都是在拷贝一个锁的结构体到栈(寄存器)中, 因此最近获取并释放锁的cpu的cache中更有可能存在备份, 也更有可能重新获取锁, 这对其它获取锁的任务不公平. 因此内核使用fifo方式来解决不公平竞争, 每个任务获取的task仅仅是该任务的排序号, 在获取排序号后还需要不断读取owner值, 只有当owner与next相等时才表示当前任务获取该锁, 否则该任务只能继续等待. 在等待过程中会使用wfe指令, 该指令为ARM为防止程序重复读取锁而降低cpu性能而设计的指令, 用于降低功耗与减少总线竞争, 直到被sev指令唤醒. 再获取自旋锁后(next等于owner)还需执行dmb指令通知其它获取锁的cpu刷新cache(这是同步原语必须遵循的内存模型, 即获取锁到使用锁之间必须执行dmb指令, 释放锁前必须执行dmb指令, 详见ARM ref manual中对spin lock的描述).

让我们回头再看下解锁流程, 由于解锁代码都是内联函数(需定义UNINLINE_SPIN_UNLOCK才能解内联)此处就不贴出汇编代码了. 值得注意的是解锁流程不仅做dmb, 再修改lock->tickets.owner后还要做dsb(内存同步屏障, In addition, no instruction that appears in program order after the DSB instruction can execute until the DSB completes), 最后再执行sev通知其它阻塞的线程.

 1 static inline void do_raw_spin_unlock(raw_spinlock_t *lock) __releases(lock)
 2 {
 3     arch_spin_unlock(&lock->raw_lock);
 4     __release(lock);
 5 }
 6 static inline void arch_spin_unlock(arch_spinlock_t *lock)
 7 {
 8     smp_mb();
 9     lock->tickets.owner++;
10     dsb_sev();
11 }

自旋锁的其它接口与其类似, 这里就不详细分析了, 如想更多了解相关api可以看下include/linux/spinlock.h头文件首部的注释, 这里简要说下几个常用的api. spin_lock_irq()/spin_unlock_irq()多了个开关中断的步骤, spin_lock_irqsave()/spin_unlock_irqrestore()多了开关中断并保存中断状态的步骤. 各个接口的应用场景, 如果临界资源不会在中断中被访问使用最基础的版本, 否则来自中断的访问需要保存中断状态(可能存在高优先级中断抢占低优先级中断), 而非中断的访问仅关闭中断即可.

2. 信号量

自旋锁在获取锁失败时会不断重复, 这对系统而言是低效的(尽管使用wfe指令降低功耗, 但是cpu仍然没有工作), 因此内核在此基础上引入了信号量. 我们先来看下信号量semaphore(defined in include/linux/semaphore.h)的定义.

1 struct semaphore {
2     raw_spinlock_t lock;
3     unsigned int count;
4     struct list_head wait_list;
5 };

semaphore中的lock用于内部加锁(用于保护结构体内其它成员), count为使用计数(即最多有多少个任务可以同时获取该锁), wait_list为等待队列(因获取不到信号量而阻塞的任务的队列). 通常我们使用DEFINE_SEMAPHORE()(defined in include/linux/semaphore.h)定义并初始化一个全局信号量, 使用sema_init()(defined in include/linux/semaphore.h)初始化一个信号量, 使用down()/down_trylock()(defined in kernel/semaphore.c)获取锁, 使用up()(defined in kernel/semaphore.c)释放锁. 我们先来看看获取锁的实现.

 1 void down(struct semaphore *sem)
 2 {
 3     unsigned long flags;
 4     raw_spin_lock_irqsave(&sem->lock, flags);
 5     if (likely(sem->count > 0))
 6         sem->count--;
 7     else
 8         __down(sem);
 9     raw_spin_unlock_irqrestore(&sem->lock, flags);
10 }
11 EXPORT_SYMBOL(down);
12 static noinline void __sched __down(struct semaphore *sem)
13 {
14     __down_common(sem, TASK_UNINTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);
15 }
16 static inline int __sched __down_common(struct semaphore *sem, long state, long timeout)
17 {
18     struct task_struct *task = current;
19     struct semaphore_waiter waiter;
20     list_add_tail(&waiter.list, &sem->wait_list);
21     waiter.task = task;
22     waiter.up = false;
23     for (;;) {
24         if (signal_pending_state(state, task))
25             goto interrupted;
26         if (unlikely(timeout <= 0))
27             goto timed_out;
28         __set_task_state(task, state);
29         raw_spin_unlock_irq(&sem->lock);
30         timeout = schedule_timeout(timeout);
31         raw_spin_lock_irq(&sem->lock);
32         if (waiter.up)
33             return 0;
34     }
35  timed_out:
36     list_del(&waiter.list);
37     return -ETIME;
38  interrupted:
39     list_del(&waiter.list);
40     return -EINTR;
41 }

down()首先对自旋锁加锁, 如果使用计数非零则自减后解锁并返回. 如果无剩余使用计数则调用__down_common()(defined in kernel/semaphore.c)将该任务加入信号量的等待队列并开启调度, 在for循环中会将task状态置为state(__down()调用时传参为TASK_UNINTERRUPTIBLE, 即不可中断睡眠), 因为之后该task将调度出去所以还要先解锁sem->lock, 然后调用schedule_timeout()(defined in kernel/timer.c)执行调度.

 1 signed long __sched schedule_timeout(signed long timeout)
 2 {
 3     struct timer_list timer;
 4     unsigned long expire;
 5     switch (timeout)
 6     {
 7     case MAX_SCHEDULE_TIMEOUT:
 8         schedule();
 9         goto out;
10     default:
11         if (timeout < 0) {
12             printk(KERN_ERR "schedule_timeout: wrong timeout value %lx\n", timeout);
13             dump_stack();
14             current->state = TASK_RUNNING;
15             goto out;
16         }
17     }
18     expire = timeout + jiffies;
19     setup_timer_on_stack(&timer, process_timeout, (unsigned long)current);
20     __mod_timer(&timer, expire, false, TIMER_NOT_PINNED);
21     schedule();
22     del_singleshot_timer_sync(&timer);
23     destroy_timer_on_stack(&timer);
24     timeout = expire - jiffies;
25  out:
26     return timeout < 0  0 : timeout;
27 }
28 EXPORT_SYMBOL(schedule_timeout);

schedule_timeout()中会先判断两种特殊情况: timeout为MAX_SCHEDULE_TIMEOUT(__down()的情况)及timeout<0但非MAX_SCHEDULE_TIMEOUT情况. 前者即无超时的调度(即该task的唤醒仅可能因另一task释放信号量唤醒等待队列, 见下文描述), 后者则认为定时器已到期直接唤醒task. 否则开启一个定时器并执行调度, 当调度返回时会销毁定时器.
当调度返回时需重新获取sem->lock, 然后再判断waiter.up, 为true即返回. 否则说明仍未竞争到锁, 重新进行调度, 当然此时state已可能发生变化, task也可能会有信号挂起, 需优先判断.
让我们回头看看up()是如何释放信号量并唤醒等待队列的.

 1 void up(struct semaphore *sem)
 2 {
 3     unsigned long flags;
 4     raw_spin_lock_irqsave(&sem->lock, flags);
 5     if (likely(list_empty(&sem->wait_list)))
 6         sem->count++;
 7     else
 8         __up(sem);
 9     raw_spin_unlock_irqrestore(&sem->lock, flags);
10 }
11 EXPORT_SYMBOL(up);
12 static noinline void __sched __up(struct semaphore *sem)
13 {
14     struct semaphore_waiter *waiter = \
15         list_first_entry(&sem->wait_list, struct semaphore_waiter, list);
16     list_del(&waiter->list);
17     waiter->up = true;
18     wake_up_process(waiter->task);
19 }

首先还是加锁保护, 如果等待队列为空即说明没有更多的竞争者, 直接返回使用计数即可(不像down()中那样用使用计数判断是因为可能使用计数为0同时等待队列为空的情况, 对于初始化count为1时情况尤其常见). 否则需要调用__up()唤醒等待队列(队列非空就无需增加使用计数, 反正会被唤醒的task取走), 在__up()中会寻找等待队列中第一个节点, 将其从链表中删除并标记waiter->up为true, 最后唤醒对应的task, 被唤醒的task检查到waiter->up为true则结束睡眠, 否则(由于其它原因唤醒)则开始新一轮调度.

最后我们看下down_trylock()/down_interruptible()与down()的区别. 普通的down()没有返回值且会因获取不到信号量而阻塞, 如果我们需要非阻塞的获取一个信号量就需要使用down_trylock(), 该接口只会获取一次信号量, 返回0表示获取成功, 返回1表示获取失败, 因此该接口可以用于中断上下文. 而down_interruptible()又是另一种作用, 该接口允许睡眠被信号中断(在调用__down_common()时标记state为TASK_INTERRUPTIBLE), 发生中断后返回-EINTR, 而正常获取锁的返回值是0. down_timeout()/down_killable()与此类似, 就不赘述了.

 1 int down_trylock(struct semaphore *sem)
 2 {
 3     unsigned long flags;
 4     int count;
 5     raw_spin_lock_irqsave(&sem->lock, flags);
 6     count = sem->count - 1;
 7     if (likely(count >= 0))
 8         sem->count = count;
 9     raw_spin_unlock_irqrestore(&sem->lock, flags);
10     return (count < 0);
11 }
12 EXPORT_SYMBOL(down_trylock);
13 int down_interruptible(struct semaphore *sem)
14 {
15     unsigned long flags;
16     int result = 0;
17     raw_spin_lock_irqsave(&sem->lock, flags);
18     if (likely(sem->count > 0))
19         sem->count--;
20     else
21         result = __down_interruptible(sem);
22     raw_spin_unlock_irqrestore(&sem->lock, flags);
23     return result;
24 }
25 EXPORT_SYMBOL(down_interruptible);

关于信号量的小结: 信号量是对自旋锁的封装, 信号量可以最多被count个任务持有(count值由信号量初始化时确定). 当计数耗尽后剩余任务只能(通过调度自身的方式)阻塞等待, 因此信号量一般不能用于中断上下文. 但是对于无需等待的操作(down_trylock()/up())是可以在中断上下文中使用的. 由于以上的特性, 信号量比自旋锁更适合在逻辑代码中用作保护. 另外被阻塞的任务是按请求的先后顺序依次唤醒的.

3. 原子计数

对单个32位(64位需使用64位对应的接口)变量进行原子操作. 常用接口有atomic_inc(v)/atomic_dec_and_test(v)(defined in arch/arm/include/asm/atomic.h), 具体实现如下(64位版本略).

 1 #if __LINUX_ARM_ARCH__ >= 6
 2 static inline void atomic_add(int i, atomic_t *v)
 3 {
 4     unsigned long tmp;
 5     int result;
 6     __asm__ __volatile__("@ atomic_add\n"
 7 "1: ldrex %0, [%3]\n"
 8 "   add %0, %0, %4\n"
 9 "   strex %1, %0, [%3]\n"
10 "   teq %1, #0\n"
11 "   bne 1b"
12     : "=&r" (result), "=&r" (tmp), "+Qo" (v->counter)
13     : "r" (&v->counter), "Ir" (i)
14     : "cc");
15 }
16 static inline int atomic_sub_return(int i, atomic_t *v)
17 {
18     unsigned long flags;
19     int val;
20     raw_local_irq_save(flags);
21     val = v->counter;
22     v->counter = val -= i;
23     raw_local_irq_restore(flags);
24     return val;
25 }
26 #else
27 #ifdef CONFIG_SMP
28 #error SMP not supported on pre-ARMv6 CPUs
29 #endif
30 static inline int atomic_add_return(int i, atomic_t *v)
31 {
32     unsigned long flags;
33     int val;
34     raw_local_irq_save(flags);
35     val = v->counter;
36     v->counter = val += i;
37     raw_local_irq_restore(flags);
38     return val;
39 }
40 #define atomic_add(i, v) (void) atomic_add_return(i, v)
41 static inline int atomic_sub_return(int i, atomic_t *v)
42 {
43     unsigned long flags;
44     int val;
45     raw_local_irq_save(flags);
46     val = v->counter;
47     v->counter = val -= i;
48     raw_local_irq_restore(flags);
49     return val;
50 }
51 #endif
52 #define atomic_inc(v) atomic_add(1, v)
53 #define atomic_dec_and_test(v) (atomic_sub_return(1, v) == 0)

原子操作在ARMv6前后架构上实现不同, 原因是ARMv6引入SMP. 对于单处理器架构引发竞争的可能只有中断(调度的话只要不是主动让出cpu, 时间片到期仍需要使能中断), 所以仅需关中断, 修改内存数据再使能中断即可. 对于SMP架构仅仅关中断肯定是不够的(因为可能同时有两个线程分别在两个核上访问一个内存数据), 所以对独占资源的操作从对cpu的互斥转变为对内存的互斥访问. strex与ldrex指令是str与ldr指令的exclusive版本, 内核会尝试独占访问该内存, 修改数据并独占更新该内存, 如果更新失败则重复以上步骤(类似cas指令).

4. 原子位操作

与原子操作类似, 不同的是修改特定位(因此需要考虑字节序问题). 常用的有test_and_set_bit(nr, p)/test_and_clear_bit(nr, p)(defined in arch/arm/include/asm/bitops.h), 由于CP15可以设置字节序, 此处仅列举原生字节序下接口.

#ifndef CONFIG_SMP
#define ATOMIC_BITOP(name,nr,p) \
    (__builtin_constant_p(nr)  ____atomic_##name(nr, p) : _##name(nr,p))
#else
#define ATOMIC_BITOP(name, nr, p) _##name(nr, p)
#endif

#define test_and_set_bit(nr, p) ATOMIC_BITOP(test_and_set_bit, nr, p)
#define test_and_clear_bit(nr, p) ATOMIC_BITOP(test_and_clear_bit, nr, p)

我们先来看下非SMP架构下的接口, __builtin_constant_p()用于判断一个变量在编译期间是否为常量, 如果是则使用____atomic_*开头的函数, 否则与SMP架构处理一致.
我们以____atomic_test_and_set_bit()(defined in arch/arm/include/asm/bitops.h)为例看下该类接口的实现, 该函数实质仍然是关中断, 修改指定位再使能中断, 返回值指明test是否成功.

static inline int ____atomic_test_and_set_bit(unsigned int bit, volatile unsigned long *p)
{
    unsigned long flags;
    unsigned int res;
    unsigned long mask = 1UL << (bit & 31);

p += bit >> 5;

raw_local_irq_save(flags);
    res = *p;
    *p = res | mask;
    raw_local_irq_restore(flags);

return (res & mask) != 0;
}

我们再来看下SMP架构下的接口, 以_test_and_clear_bit()(defined in arch/arm/lib/testclearbit.S)为例.

1 #include <linux/linkage.h>
2 #include <asm/assembler.h>
3 #include "bitops.h"
4 .text
5 testop _test_and_clear_bit, bicne, strne

testop(defined in arch/arm/lib/bitops.h)宏作用是生成一个名为name的汇编函数, 和____atomic_test_and_set_bit()实现类似, 区别在于关中断的指令被替换成dmb指令, dmb之间的str与ldr指令被替换为独占访问, 独占更新后会判断返回值, 失败则重复以上步骤.

 1 #if __LINUX_ARM_ARCH__ >= 5
 2 .macro testop, name, instr, store
 3     ENTRY(\name)
 4     UNWIND(.fnstart)
 5     ands ip, r1, #3
 6     strneb r1, [ip]
 7     mov r2, #1
 8     and r3, r0, #31
 9     mov r0, r0, lsr #5
10     add r1, r1, r0, lsl #2
11     mov r3, r2, lsl r3
12     smp_dmb
13 1:  ldrex r2, [r1]
14     ands r0, r2, r3
15     \instr r2, r2, r3
16     strex ip, r2, [r1]
17     cmp ip, #0
18     bne 1b
19     smp_dmb
20     cmp r0, #0
21     movne r0, #1
22 2:  bx lr
23     UNWIND(.fnend)
24     ENDPROC(\name)
25 .endm
26 #else
27 .macro testop, name, instr, store
28     ENTRY(\name)
29     UNWIND(.fnstart)
30     ands ip, r1, #3
31     strneb r1, [ip]
32     and r3, r0, #31
33     mov r0, r0, lsr #5
34     save_and_disable_irqs ip
35     ldr r2, [r1, r0, lsl #2]!
36     mov r0, #1
37     tst r2, r0, lsl r3
38     \instr r2, r2, r0, lsl r3
39     \store r2, [r1]
40     moveq r0, #0
41     restore_irqs ip
42     mov pc, lr
43     UNWIND(.fnend)
44     ENDPROC(\name)
45 .endm
46 #endif

转载于:https://www.cnblogs.com/Five100Miles/p/8783545.html

内核常见锁的机制与实现分析1相关推荐

  1. Linux内核中锁机制之完成量、互斥量

    在上一篇博文中笔者分析了关于信号量.读写信号量的使用及源码实现,接下来本篇博文将讨论有关完成量和互斥量的使用和一些经典问题. 八.完成量 下面讨论完成量的内容,首先需明确完成量表示为一个执行单元需要等 ...

  2. linux 信号量锁 内核,Linux内核中锁机制之信号量、读写信号量

    在上一篇博文中笔者分析了关于内存屏障.读写自旋锁以及顺序锁的相关内容,本篇博文将着重讨论有关信号量.读写信号量的内容. 六.信号量 关于信号量的内容,实际上它是与自旋锁类似的概念,只有得到信号量的进程 ...

  3. 大话Linux内核中锁机制之原子操作、自旋锁【转】

    转自:http://blog.sina.com.cn/s/blog_6d7fa49b01014q7p.html 多人会问这样的问题,Linux内核中提供了各式各样的同步锁机制到底有何作用?追根到底其实 ...

  4. 常见的锁策略、synchronized中的锁优化机制

    一.常见的锁策略 锁策略,和普通程序猿基本没啥关系,和 "实现锁" 的人才有关系的 这里所提到的锁策略,和 Java 本身没关系,适用于所有和 "锁" 相关的情 ...

  5. Android锁屏机制原理分析

    转载自:http://www.2cto.com/kf/201401/273898.html 春节前最后几天了,工作上几乎没有什么要做.大致整理下之前工作中写的文档,PPT,手册. 由于去年一年完全转到 ...

  6. mysql更新锁机制_mysql查询更新时的锁表机制分析

    欢迎进入Linux社区论坛,与200万技术人员互动交流 >>进入 为了给高并发情况下的mysql进行更好的优化,有必要了解一下mysql查询更新时的锁表机制. 一.概述 MySQL有三种锁 ...

  7. mysql查询更新时的锁表机制分析

    为了给高并发情况下的mysql进行更好的优化,有必要了解一下mysql查询更新时的锁表机制. 一.概述 MySQL有三种锁的级别:页级.表级.行级. MyISAM和MEMORY存储引擎采用的是表级锁( ...

  8. Linux内核OOM机制的详细分析

    Linux 内核有个机制叫OOM killer(Out-Of-Memory killer),该机制会监控那些占用内存过大,尤其是瞬间很快消耗大量内存的进程,为了防止内存耗尽而内核会把该进程杀掉.典型的 ...

  9. Linux内核OOM机制的详细分析和防止进程被OOM杀死的方法

    转载自:http://blog.chinaunix.net/uid-29242873-id-3942763.html Linux 内核有个机制叫OOM killer(Out-Of-Memory kil ...

最新文章

  1. c语言中变量有什么作用是什么,C语言里面局部变量和临时变量有什么区别?
  2. ?????nested exception is org.apache.ibatis.reflection.ReflectionException: There is no getter for pr
  3. 幼儿园 php,input.php
  4. 设计PNG免抠素材|提高调性!透明液态气泡免扣素材
  5. 硬盘属于计算机主机吗,电脑主机换硬盘后还是不是原来的主机?
  6. 设计模式—23种设计模式总览
  7. 文件浏览器及数码相框 -2.3.2-freetype_arm-1
  8. 虚幻4引擎实现自动开门蓝图
  9. 网状结构(图)图的存储(邻接矩阵、邻接表)、图的遍历(深度DFS、广度BFS)、图的最短路径
  10. 数字转换成中文汉字数字
  11. 戴尔笔记本插入耳机没有反应
  12. Hadoop2.x和3.x版本区别
  13. 别具一格的沙漠星空跨年,COLMO与百位超级个体揭露未来营养生活图景
  14. 页面截图导出为PDF,以及PDF强行截断分页问题的处理
  15. LYL程序员小白的理解之简单易懂的Arduino的串口通讯
  16. 2021上半年全国计算机二级报名江苏,江苏2021年3月全国计算机等级考试报名公告...
  17. 旋转矩阵(侧重推理)
  18. 计算机程序和系统股票趋势,CTA投资与程序化交易:从经典CTA到现代研究方法
  19. idea打包war包方法,以及将war包部署到tomcat详细步骤
  20. 中国医科大学806计算机应用基础,中国医科大学2021年硕士研究生入学考试自命题考试科目...

热门文章

  1. Python数据分析:pandas玩转Excel(二)
  2. linux二进制数据16进制数据转换,[轉]16进制字符文本/二进制文件迷你互转器
  3. mysql delete 标记_MySQL删除操作其实是假删除
  4. webpack打包流程_了不起的 Webpack 构建流程学习指南
  5. python3.4和3.5的区别_在3.4和3.5之间的python中的协同程序,我如何保持支持的兼容性?...
  6. 山东大学青岛计算机学院贺平,计算机学院学子在全国数学建模竞赛中再获佳绩...
  7. 无心剑随感《译诗但求达意传神》
  8. 安卓学习笔记13:安卓触摸事件
  9. 【BZOJ1854】【codevs3358】游戏,二分图最大匹配
  10. opencv mat赋值_【3】OpenCV图像处理模块(18)重映射