大多数的并行程序都需要在底层使用锁机制进行同步。简单来讲,锁无非是一套简单的原语,它们通过保证程序(或进程)对某一资源的互斥访问来维持数据的一致性。如果没有锁机制作为保证,多个线程可能同时访问某一资源,假设没有精心设计的(通常很复杂的)无锁算法保证程序正确执行,那么后果往往是非常严重的。无锁算法难于使用,所以一般而言都使用锁来保证程序的一致性。

如果更新某一数据结构的操作比较缓慢,那么互斥的锁是一个比较好的选择。此时如果某一进程或线程被阻塞,操作系统会重新接管控制权,并调度其他进程(或线程)继续执行,原先被阻塞的进程处于睡眠状态。控制权的转换伴随着进程上下文的切换,而这往往是一个昂贵而耗时的操作;所以如果等待锁的时间比较短,就应该使用其他更高效的方法。


自旋锁(spinlock)

自旋锁(Spinlock)是一种常用的互斥(Mutual Exclusion)同步原语(Synchronization Primitive),试图进入临界区(Critical Section)的线程使用忙等待(Busy Waiting)的方式检测锁的状态,若锁未被持有则尝试获取。与其他锁不同,自旋锁仅仅只是“自旋”,即不停地检查某一锁是否已经被解开。自旋锁是非常快的,所以加锁-解锁操作耗时很短。然而,自旋锁也不是万金油,当因互斥导致进程等待的时间很长时,使用自旋锁是不明智的选择。

下面我们考虑实现自己的自旋锁,首先我们需要一些原语,幸好GCC已经为我们提供了一些内置函数,

/*
 * Atomic primitives built on the GCC __sync builtins.
 * "fetch_and_add" returns the OLD value; "add_and_fetch" returns the NEW one.
 */
#define atomic_xadd(P, V)      __sync_fetch_and_add((P), (V))          /* returns old value */
#define cmpxchg(P, O, N)       __sync_val_compare_and_swap((P), (O), (N))
#define atomic_inc(P)          __sync_add_and_fetch((P), 1)
#define atomic_dec(P)          __sync_add_and_fetch((P), -1)
#define atomic_add(P, V)       __sync_add_and_fetch((P), (V))
#define atomic_set_bit(P, V)   __sync_or_and_fetch((P), 1<<(V))
#define atomic_clear_bit(P, V) __sync_and_and_fetch((P), ~(1<<(V)))

然而,我们也需要自己实现其他的几个原子操作,如下:

/* Compile read-write barrier */#define barrier() asm volatile("": : :"memory")

/* Pause instruction to prevent excess processor bus usage */ #define cpu_relax() asm volatile("pause\n": : :"memory")

/*
 * Atomically exchange the 64-bit value at *ptr with x; returns the old value.
 * xchg with a memory operand is implicitly locked on x86, so no "lock"
 * prefix is required.
 */
static inline void *xchg_64(void *ptr, void *x)
{
    /* Fixes vs. the original: the memory operand must be read-write ("+m")
     * because xchg both reads and writes it, and the output operand must be
     * the variable itself — a cast expression such as
     * "=r" ((unsigned long long) x) is not an lvalue and is rejected by
     * modern GCC. */
    __asm__ __volatile__("xchgq %0,%1"
                         : "+r" (x), "+m" (*(volatile long long *)ptr)
                         :
                         : "memory");

    return x;
}

/*
 * Atomically exchange the 32-bit value at *ptr with x; returns the old value.
 * Implicitly locked on x86.
 */
static inline unsigned xchg_32(void *ptr, unsigned x)
{
    /* Same constraint fixes as xchg_64: "+m" for the read-modify-write
     * memory operand, and a plain lvalue (no cast) as the register operand. */
    __asm__ __volatile__("xchgl %0,%1"
                         : "+r" (x), "+m" (*(volatile unsigned *)ptr)
                         :
                         : "memory");

    return x;
}

/*
 * Atomically exchange the 16-bit value at *ptr with x; returns the old value.
 * Implicitly locked on x86.
 */
static inline unsigned short xchg_16(void *ptr, unsigned short x)
{
    /* Same constraint fixes as xchg_64: "+m" for the read-modify-write
     * memory operand, and a plain lvalue (no cast) as the register operand. */
    __asm__ __volatile__("xchgw %0,%1"
                         : "+r" (x), "+m" (*(volatile unsigned short *)ptr)
                         :
                         : "memory");

    return x;
}

/*
 * Atomically set bit x of the word at ptr and return nonzero iff the bit
 * was already set.  "lock bts" sets the bit and leaves the old bit in the
 * carry flag; "sbb r,r" then materializes CF as 0 or -1 in the output.
 */
static inline char atomic_bitsetandtest(void *ptr, int x)
{
    char out;

    /* Fix vs. the original: bts is a read-modify-write on the memory
     * operand, so it must be declared "+m", not write-only "=m". */
    __asm__ __volatile__("lock; bts %2,%1\n"
                         "sbb %0,%0\n"
                         : "=r" (out), "+m" (*(volatile long long *)ptr)
                         : "Ir" (x)
                         : "memory");

    return out;
}

自旋锁可以使用交换原语实现,如下:

/* Value stored in the lock word while the lock is held. */
#define EBUSY 1

/* The simplest spinlock is a single word: 0 = free, EBUSY = taken. */
typedef unsigned spinlock;

/* Acquire: atomically swap EBUSY into the lock word; an old value of 0
 * means we got the lock.  Otherwise spin with plain (cache-local) reads
 * until it looks free, then retry the exchange — this avoids hammering the
 * bus with locked operations while waiting. */
static void spin_lock(spinlock *lock)
{
    while (1)
    {
        if (!xchg_32(lock, EBUSY)) return;

        while (*lock) cpu_relax();
    }
}

/* Release: the compiler barrier keeps critical-section stores from being
 * reordered past the unlock, then the lock word is simply cleared. */
static void spin_unlock(spinlock *lock)
{
    barrier();
    *lock = 0;
}

/* One-shot attempt: returns 0 on success, EBUSY if the lock was held. */
static int spin_trylock(spinlock *lock)
{
    return xchg_32(lock, EBUSY);
}

上面的自旋锁已经能够工作,但是也会产生问题:多个线程可能产生竞争——在锁释放的时候,其他的每个线程都想获得锁。这会导致处理器总线的负载增大,从而使性能降低。所以接下来我们将实现另外一种自旋锁,该自旋锁能够感知下一个获得锁的进程或线程,因此能够大大减轻处理器总线负载。

下面我们介绍另外一种自旋锁,MCS自旋锁,该锁使用链表维护申请者的请求序列,

/* MCS lock: waiters form a FIFO queue of per-thread nodes.  Each waiter
 * spins on its own 'spin' flag (cache-local), which its predecessor sets
 * when handing over the lock. */
typedef struct mcs_lock_t mcs_lock_t;
struct mcs_lock_t
{
    mcs_lock_t *next;   /* next waiter in the queue */
    int spin;           /* set to 1 by our predecessor when the lock is ours */
};

/* The lock itself is a pointer to the queue tail; NULL means free. */
typedef struct mcs_lock_t *mcs_lock;

/* Acquire the MCS lock: atomically append our node at the queue tail.  If
 * the queue was empty we own the lock immediately; otherwise link in behind
 * the old tail and spin on our private flag. */
static void lock_mcs(mcs_lock *m, mcs_lock_t *me)
{
    mcs_lock_t *tail;

    me->next = NULL;
    me->spin = 0;

    /* Swap ourselves in as the new queue tail. */
    tail = xchg_64(m, me);

    /* No one there?  The lock is ours. */
    if (!tail) return;

    /* Someone there — link in behind the old tail. */
    tail->next = me;

    /* Make sure the setting of 'next' above is not reordered. */
    barrier();

    /* Spin on our own flag until the predecessor hands the lock over. */
    while (!me->spin) cpu_relax();

    return;
}

/* Release the MCS lock: if no successor has linked in yet, try to swing the
 * tail back to NULL; if another thread is mid-enqueue, wait for its link to
 * appear, then hand over by setting its spin flag. */
static void unlock_mcs(mcs_lock *m, mcs_lock_t *me)
{
    /* No successor yet? */
    if (!me->next)
    {
        /* Still the tail — unlock atomically and leave. */
        if (cmpxchg(m, me, NULL) == me) return;

        /* A waiter swapped in but has not linked in yet — wait for it. */
        while (!me->next) cpu_relax();
    }

    /* Pass ownership to the next waiter. */
    me->next->spin = 1;
}

/*
 * One-shot MCS acquire: succeeds only when the queue is completely empty.
 * Returns 0 on success, EBUSY otherwise.
 */
static int trylock_mcs(mcs_lock *m, mcs_lock_t *me)
{
    mcs_lock_t *tail;

    me->next = NULL;
    me->spin = 0;

    /* Try to install our node as the whole queue.  Fix vs. the original:
     * the node pointer 'me' must be stored, not '&me' (the address of the
     * local parameter slot, an mcs_lock_t **), which would leave the lock
     * pointing at a dead stack location — lock_mcs correctly stores 'me'. */
    tail = cmpxchg(m, NULL, me);

    /* No one was there — we hold the lock. */
    if (!tail) return 0;

    return EBUSY;
}

当然,MCS锁也是有问题的,因为它的API除了需要传递锁的地址外,还需要传递另外一个结构,下面介绍另外一种自旋锁算法,K42锁算法,

/* K42 variant of the MCS lock: the lock head itself doubles as a queue
 * node, so the API needs only the lock address — no per-caller node. */
typedef struct k42lock k42lock;
struct k42lock
{
    k42lock *next;
    k42lock *tail;
};

/* Acquire the K42 lock.  A stack-allocated node is enqueued on l->tail;
 * before returning, the queue state held in the stack node is migrated
 * into the lock head so the node can safely go out of scope. */
static void k42_lock(k42lock *l)
{
    k42lock me;
    k42lock *pred, *succ;
    me.next = NULL;

    barrier();

    pred = xchg_64(&l->tail, &me);
    if (pred)
    {
        /* A nonzero 'tail' field marks us as still waiting. */
        me.tail = (void *) 1;

        barrier();
        pred->next = &me;
        barrier();

        while (me.tail) cpu_relax();
    }

    succ = me.next;

    if (!succ)
    {
        barrier();
        l->next = NULL;

        /* If we are still the queue tail, repoint it at the lock head;
         * otherwise wait for our successor to finish linking in. */
        if (cmpxchg(&l->tail, &me, &l->next) != &me)
        {
            while (!me.next) cpu_relax();

            l->next = me.next;
        }
    }
    else
    {
        l->next = succ;
    }
}

/* Release the K42 lock: wake the next queued waiter, or swing the tail
 * back to empty when the queue contains only ourselves. */
static void k42_unlock(k42lock *l)
{
    k42lock *succ = l->next;

    barrier();

    if (!succ)
    {
        /* Queue looked empty — try to release atomically. */
        if (cmpxchg(&l->tail, &l->next, NULL) == (void *) &l->next) return;

        /* A waiter is mid-enqueue; wait for its link to appear. */
        while (!l->next) cpu_relax();
        succ = l->next;
    }

    /* Clearing the successor's 'tail' flag grants it the lock. */
    succ->tail = NULL;
}

/* One-shot K42 acquire: returns 0 on success, EBUSY otherwise. */
static int k42_trylock(k42lock *l)
{
    if (!cmpxchg(&l->tail, NULL, &l->next)) return 0;

    return EBUSY;
}

K42和MCS锁都需要遍历链表才能找到下一个最可能获得锁的进程(或线程),有时查找可能比较费时,所以我们再次进行改进,改进后如下:

/* Wait-list lock: like MCS, but the lock word itself distinguishes
 * "held with no waiters" (LLOCK_FLAG) from "held with a wait list". */
typedef struct listlock_t listlock_t;
struct listlock_t
{
    listlock_t *next;
    int spin;
};
typedef struct listlock_t *listlock;

/* Sentinel stored in the lock word when held with no wait list; also used
 * in a node's 'next' field to mark a partially added waiter. */
#define LLOCK_FLAG    (void *)1

/* Acquire the wait-list lock.  Fast path grabs a free lock with a single
 * cmpxchg; otherwise we push a stack node onto the list and spin on it.
 * If we turn out to be the lock holder after the swap, we must repair the
 * 'next' pointer of the waiter that ended up behind us. */
static void listlock_lock(listlock *l)
{
    listlock_t me;
    listlock_t *tail;

    /* Fast path — no users. */
    if (!cmpxchg(l, NULL, LLOCK_FLAG)) return;

    /* LLOCK_FLAG in 'next' marks this node as partially added. */
    me.next = LLOCK_FLAG;
    me.spin = 0;

    /* Convert the lock into a wait list with ourselves at the head. */
    tail = xchg_64(l, &me);

    if (tail)
    {
        /* Add myself to the list of waiters. */
        if (tail == LLOCK_FLAG) tail = NULL;
        me.next = tail;

        /* Wait for being able to go. */
        while (!me.spin) cpu_relax();

        return;
    }

    /* Lock was free after all — try to convert back to an exclusive lock. */
    if (cmpxchg(l, &me, LLOCK_FLAG) == &me) return;

    /* Failed — there is now a wait list. */
    tail = *l;

    /* Scan to find who is after me. */
    while (1)
    {
        /* Wait for them to enter their next link. */
        while (tail->next == LLOCK_FLAG) cpu_relax();

        if (tail->next == &me)
        {
            /* Fix their next pointer: we hold the lock, not a node. */
            tail->next = NULL;

            return;
        }

        tail = tail->next;
    }
}

/* Release the wait-list lock.  Fast path clears a waiter-free lock; with a
 * single waiter we convert back to the simple held state and wake it; with
 * a longer list we scan to the oldest waiter (list is newest-first), unlink
 * it, and wake it — preserving FIFO order. */
static void listlock_unlock(listlock *l)
{
    listlock_t *tail;
    listlock_t *tp;

    while (1)
    {
        tail = *l;

        barrier();

        /* Fast path — no waiters. */
        if (tail == LLOCK_FLAG)
        {
            if (cmpxchg(l, LLOCK_FLAG, NULL) == LLOCK_FLAG) return;

            continue;
        }

        tp = NULL;

        /* Wait for a partially added waiter to finish linking in. */
        while (tail->next == LLOCK_FLAG) cpu_relax();

        /* More than one waiter — handle the long list below. */
        if (tail->next) break;

        /* Single waiter — try to convert to a "held, no waiters" lock. */
        if (cmpxchg(l, tail, LLOCK_FLAG) == tail)
        {
            /* Wake the waiter; it now owns the lock. */
            tail->spin = 1;

            return;
        }

        cpu_relax();
    }

    /* A long list: walk to the end (the oldest waiter). */
    tp = tail;
    tail = tail->next;

    /* Scan wait list. */
    while (1)
    {
        /* Wait for a partially added waiter to finish linking in. */
        while (tail->next == LLOCK_FLAG) cpu_relax();

        if (!tail->next) break;

        tp = tail;
        tail = tail->next;
    }

    /* Unlink the oldest waiter... */
    tp->next = NULL;

    barrier();

    /* ...and wake it. */
    tail->spin = 1;
}

/*
 * One-shot acquire: succeeds only when the lock is completely free.
 * Returns 0 on success, EBUSY otherwise.
 * Fix vs. the original text: the function body was missing its closing
 * brace, which made everything after it a syntax error.
 */
static int listlock_trylock(listlock *l)
{
    /* Simple part of a spin-lock: claim a free lock in one cmpxchg. */
    if (!cmpxchg(l, NULL, LLOCK_FLAG)) return 0;

    /* Failure! */
    return EBUSY;
}

等等,还可以改进,可以在自旋锁里面嵌套一层自旋锁,

/* Bit-list lock: bit 0 of the lock word is itself a tiny spinlock that
 * protects the wait list stored in the remaining bits — a lock nested
 * inside a lock. */
typedef struct bitlistlock_t bitlistlock_t;
struct bitlistlock_t
{
    bitlistlock_t *next;
    int spin;
};

typedef bitlistlock_t *bitlistlock;

/* Flag value meaning "held, no waiters"; -2 keeps bit 0 (the bit lock)
 * clear. */
#define BLL_USED    ((bitlistlock_t *) -2LL)

/* Acquire: take the bit lock (bit 0) to gain control of the wait list,
 * then either claim a free lock or push our stack node and spin on it.
 * Storing '&me' both publishes the node and drops the bit lock at once. */
static void bitlistlock_lock(bitlistlock *l)
{
    bitlistlock_t me;
    bitlistlock_t *tail;

    /* Grab control of the list via the bit lock. */
    while (atomic_bitsetandtest(l, 0)) cpu_relax();

    /* Remove the locked bit to recover the real list pointer. */
    tail = (bitlistlock_t *) ((uintptr_t) *l & ~1LL);

    /* Fast path, no waiters. */
    if (!tail)
    {
        /* Set to the "held, no waiters" flag value. */
        *l = BLL_USED;
        return;
    }

    if (tail == BLL_USED) tail = NULL;
    me.next = tail;
    me.spin = 0;

    barrier();

    /* Unlock the bit lock and add ourselves to the wait list in one store. */
    *l = &me;

    /* Wait for the go-ahead. */
    while (!me.spin) cpu_relax();
}

/* Release: fast path clears a waiter-free lock; otherwise take the bit
 * lock, then wake either the single waiter or — for a longer list — the
 * oldest waiter, unlinking it first. */
static void bitlistlock_unlock(bitlistlock *l)
{
    bitlistlock_t *tail;
    bitlistlock_t *tp;

    /* Fast path — no wait list. */
    if (cmpxchg(l, BLL_USED, NULL) == BLL_USED) return;

    /* Grab control of the list via the bit lock. */
    while (atomic_bitsetandtest(l, 0)) cpu_relax();

    tp = *l;

    barrier();

    /* Strip the bit-lock bit to get the list head. */
    tail = (bitlistlock_t *) ((uintptr_t) tp & ~1LL);

    /* Actually no users? */
    if (tail == BLL_USED)
    {
        barrier();
        *l = NULL;
        return;
    }

    /* Only one entry on the wait list? */
    if (!tail->next)
    {
        barrier();

        /* Unlock the bit lock, back to "held, no waiters". */
        *l = BLL_USED;

        barrier();

        /* Wake the lone waiter; it now owns the lock. */
        tail->spin = 1;

        return;
    }

    barrier();

    /* Unlock the bit lock, keeping the wait list in place. */
    *l = tail;

    barrier();

    /* Scan the wait list for its start (the oldest waiter). */
    do
    {
        tp = tail;
        tail = tail->next;
    }
    while (tail->next);

    tp->next = NULL;

    barrier();

    /* Wake the oldest waiter. */
    tail->spin = 1;
}

/* One-shot acquire: returns 0 on success, EBUSY otherwise.  The cheap
 * '!*l' read filters out obviously held locks before the cmpxchg. */
static int bitlistlock_trylock(bitlistlock *l)
{
    if (!*l && (cmpxchg(l, NULL, BLL_USED) == NULL)) return 0;

    return EBUSY;
}

还可以再次改进,如下

/* Bit-lock for editing the wait block chain. */
#define SLOCK_LOCK                 1
#define SLOCK_LOCK_BIT             0

/* Has an active user. */
#define SLOCK_USED                 2

/* All flag bits combined; lock-word values above this are wait-block
 * pointers (possibly with SLOCK_LOCK or'ed in). */
#define SLOCK_BITS                 3

/* The lock word: flag bits when there are no waiters, otherwise a pointer
 * to the first wait block. */
typedef struct slock slock;
struct slock
{
    uintptr_t p;
};

typedef struct slock_wb slock_wb;
struct slock_wb
{
    /*
     * last points to the last wait block in the chain.
     * The value is only valid when read from the first wait block.
     */
    slock_wb *last;

    /* next points to the next wait block in the chain. */
    slock_wb *next;

    /* Wake up? */
    int wake;
};

/* Take the wait-block bit lock and return the first wait block, or NULL if
 * the wait list vanished while we were acquiring the bit. */
static slock_wb *slockwb(slock *s)
{
    uintptr_t p;

    /* Spin on the wait block bit lock. */
    while (atomic_bitsetandtest(&s->p, SLOCK_LOCK_BIT))
    {
        cpu_relax();
    }

    p = s->p;

    if (p <= SLOCK_BITS)
    {
        /* Oops, looks like the wait block was removed; release the bit
         * lock again by subtracting it. */
        atomic_dec(&s->p);
        return NULL;
    }

    /* Strip the lock bit to recover the wait-block pointer. */
    return (slock_wb *)(p - SLOCK_LOCK);
}

/* Acquire: fast path claims a free lock with one cmpxchg.  Otherwise we
 * append a stack wait block — either as the first block (cmpxchg of the
 * whole word) or, under the bit lock, at the chain tail via the head
 * block's 'last' pointer — and spin on our 'wake' flag. */
static void slock_lock(slock *s)
{
    slock_wb swblock;

    /* Fastpath — no other readers or writers. */
    if (!s->p && (cmpxchg(&s->p, 0, SLOCK_USED) == 0)) return;

    /* Initialize the wait block. */
    swblock.next = NULL;
    swblock.last = &swblock;
    swblock.wake = 0;

    while (1)
    {
        uintptr_t p = s->p;

        cpu_relax();

        /* Fastpath — no other readers or writers. */
        if (!p)
        {
            if (cmpxchg(&s->p, 0, SLOCK_USED) == 0) return;
            continue;
        }

        if (p > SLOCK_BITS)
        {
            slock_wb *first_wb, *last;

            /* Lock the chain; retry from the top if it disappeared. */
            first_wb = slockwb(s);
            if (!first_wb) continue;

            /* Append ourselves at the tail via the head's 'last' cache. */
            last = first_wb->last;
            last->next = &swblock;
            first_wb->last = &swblock;

            /* Unlock the chain bit lock. */
            barrier();
            s->p &= ~SLOCK_LOCK;

            break;
        }

        /* Try to add the first wait block. */
        if (cmpxchg(&s->p, p, (uintptr_t)&swblock) == p) break;
    }

    /* Wait to acquire the exclusive lock. */
    while (!swblock.wake) cpu_relax();
}

/* Release: fast path clears SLOCK_USED when no one waits.  Otherwise lock
 * the chain, pop the head wait block (migrating its 'last' cache to the
 * new head or collapsing back to a simple lock), and wake it. */
static void slock_unlock(slock *s)
{
    slock_wb *next;
    slock_wb *wb;
    uintptr_t np;

    while (1)
    {
        uintptr_t p = s->p;

        /* This is the fast path: we can simply clear the SLOCK_USED bit. */
        if (p == SLOCK_USED)
        {
            if (cmpxchg(&s->p, SLOCK_USED, 0) == SLOCK_USED) return;
            continue;
        }

        /* There's a wait block; we need to wake the next pending user. */
        wb = slockwb(s);
        if (wb) break;

        cpu_relax();
    }

    next = wb->next;
    if (next)
    {
        /*
         * There are more blocks chained; we need to update the pointers
         * in the next wait block and make it the new head.
         */
        np = (uintptr_t) next;

        next->last = wb->last;
    }
    else
    {
        /* Convert the lock back to a simple held lock. */
        np = SLOCK_USED;
    }

    barrier();
    /* This store also releases the chain bit lock. */
    s->p = np;
    barrier();

    /* Notify the next waiter. */
    wb->wake = 1;

    /* We released the lock. */
}

/* One-shot acquire: returns 0 on success, EBUSY otherwise. */
static int slock_trylock(slock *s)
{
    /* No other readers or writers? */
    if (!s->p && (cmpxchg(&s->p, 0, SLOCK_USED) == 0)) return 0;

    return EBUSY;
}

下面是另外一种实现方式,称为stack-lock算法,

/* Stack lock: the lock word points at the newest waiter's stack node.  A
 * node's single 'next' field overlays the waiter's local spin variable, so
 * clearing a node's 'next' is what wakes its owner. */
typedef struct stlock_t stlock_t;
struct stlock_t
{
    stlock_t *next;
};

typedef struct stlock_t *stlock;

/* Acquire: swap the address of our stack node into the lock.  The local
 * 'me' is simultaneously the node's 'next' link and our spin flag — the
 * previous holder wakes us by writing NULL through it.  noinline keeps the
 * stack frame (and thus &me) stable for on_stack(). */
static __attribute__((noinline)) void stlock_lock(stlock *l)
{
    stlock_t *me = NULL;

    barrier();
    me = xchg_64(l, &me);

    /* Wait until we get the lock (predecessor clears 'me' via our node). */
    while (me) cpu_relax();
}

/* Assumed per-thread stack window used by the heuristic below. */
#define MAX_STACK_SIZE    (1<<12)

/* Heuristic: is p within +/- MAX_STACK_SIZE of the current stack frame?
 * Used to distinguish a live stack node from other pointer values.
 * noinline keeps &x anchored in this function's own frame. */
static __attribute__((noinline)) int on_stack(void *p)
{
    int x;

    uintptr_t u = (uintptr_t) &x;

    /* One unsigned comparison covers the whole +/- window. */
    return ((u - (uintptr_t)p + MAX_STACK_SIZE) < MAX_STACK_SIZE * 2);
}

/* Release: if the lock still points at a live stack node near our frame
 * (the common single-holder case), try to clear it outright.  Otherwise
 * scan the chain of waiter nodes and wake the oldest one by clearing its
 * 'next' — which is that waiter's spin variable. */
static __attribute__((noinline)) void stlock_unlock(stlock *l)
{
    stlock_t *tail = *l;
    barrier();

    /* Fast case. */
    if (on_stack(tail))
    {
        /* Try to remove the wait list. */
        if (cmpxchg(l, tail, NULL) == tail) return;

        tail = *l;
    }

    /* Scan wait list. */
    while (1)
    {
        /* Wait for a partially added waiter to link in. */
        while (!tail->next) cpu_relax();

        if (on_stack(tail->next)) break;

        tail = tail->next;
    }

    barrier();

    /* Unlock: clearing this link wakes the waiter spinning on it. */
    tail->next = NULL;
}

/* One-shot acquire: returns 0 on success, EBUSY otherwise.
 * NOTE(review): 'me' lives on this function's frame and dangles once we
 * return; this relies on stlock_unlock's on_stack() heuristic never
 * dereferencing it on the fast path — confirm against the original design. */
static int stlock_trylock(stlock *l)
{
    stlock_t me;

    if (!cmpxchg(l, NULL, &me)) return 0;

    return EBUSY;
}

改进后变成,

/* Pointer lock: stack-lock variant that caches the locker's neighbours
 * ('prev' and 'last') in the lock head so unlock can usually avoid
 * rescanning the wait list. */
typedef struct plock_t plock_t;
struct plock_t
{
    plock_t *next;
};

typedef struct plock plock;
struct plock
{
    plock_t *next;   /* newest waiter node (queue head) */
    plock_t *prev;   /* node just before the current holder, if known */
    plock_t *last;   /* the current holder's own node */
};

/* Acquire: swap our stack node (whose link field is the local 'me') into
 * the queue, then — while spinning on 'me' — opportunistically scan the
 * wait list for our predecessor so unlock can take the fast path later. */
static void plock_lock(plock *l)
{
    plock_t *me = NULL;
    plock_t *prev;

    barrier();
    me = xchg_64(l, &me);

    prev = NULL;

    /* Wait until we get the lock; the predecessor clears 'me'. */
    while (me)
    {
        /* Scan the wait list for the node before ours. */
        if (l->next != (plock_t *) &me)
        {
            plock_t *t = l->next;

            while (me)
            {
                if (t->next == (plock_t *) &me)
                {
                    prev = t;

                    while (me) cpu_relax();

                    goto done;
                }

                if (t->next) t = t->next;
                cpu_relax();
            }
        }
        cpu_relax();
    }

done:
    /* Cache our neighbours so plock_unlock can avoid a rescan. */
    l->prev = prev;
    l->last = (plock_t *) &me;
}

/* Release: if the holder's predecessor is cached, wake it directly.
 * Otherwise behave like stlock_unlock — clear a single-holder lock, or
 * scan for the node pointing at the holder's own node ('last') and clear
 * that link to wake its owner. */
static void plock_unlock(plock *l)
{
    plock_t *tail;

    /* Do I know my previous?  Then wake it with one store. */
    if (l->prev)
    {
        l->prev->next = NULL;
        return;
    }

    tail = l->next;
    barrier();

    /* Fast case: we are the only node in the queue. */
    if (tail == l->last)
    {
        /* Try to remove the wait list. */
        if (cmpxchg(&l->next, tail, NULL) == tail) return;

        tail = l->next;
    }

    /* Scan wait list. */
    while (1)
    {
        /* Wait for a partially added waiter to link in. */
        while (!tail->next) cpu_relax();

        if (tail->next == l->last) break;

        tail = tail->next;
    }

    barrier();

    /* Unlock: clearing this link wakes the next holder. */
    tail->next = NULL;
}

/*
 * One-shot acquire: returns 0 on success, EBUSY otherwise.
 * Fix vs. the original: on success it set l->last but left l->prev with
 * whatever a previous plock_lock stored, so a later plock_unlock could
 * write through a stale/dangling l->prev.  plock_lock sets prev = NULL
 * when the lock was free; do the same here.
 */
static int plock_trylock(plock *l)
{
    plock_t me;

    if (!cmpxchg(&l->next, NULL, &me))
    {
        l->prev = NULL;
        l->last = &me;
        return 0;
    }

    return EBUSY;
}

下面介绍另外一种算法,ticket lock算法,实际上,Linux内核正是采用了该算法,不过考虑到执行效率,人家是以汇编形式写的,

/* Ticket lock (the algorithm used by the Linux kernel's spinlocks):
 * 'users' counts tickets handed out, 'ticket' is the number now being
 * served.  The union overlays both halves on one word so trylock can
 * cmpxchg them together. */
typedef union ticketlock ticketlock;

union ticketlock
{
    unsigned u;
    struct
    {
        unsigned short ticket;
        unsigned short users;
    } s;
};

/* Acquire: take a ticket (atomic fetch-and-add on 'users'), then spin
 * until the now-serving counter reaches it.  FIFO-fair by construction. */
static void ticket_lock(ticketlock *t)
{
    unsigned short me = atomic_xadd(&t->s.users, 1);

    while (t->s.ticket != me) cpu_relax();
}

/* Release: bump the now-serving counter; the barrier keeps critical-
 * section stores from leaking past the unlock. */
static void ticket_unlock(ticketlock *t)
{
    barrier();
    t->s.ticket++;
}

/* One-shot acquire: succeeds only when ticket == users (lock free), by
 * atomically bumping 'users' while leaving 'ticket' unchanged in a single
 * cmpxchg over the whole word.  Returns 0 on success, EBUSY otherwise. */
static int ticket_trylock(ticketlock *t)
{
    unsigned short me = t->s.users;
    unsigned short menew = me + 1;
    unsigned cmp = ((unsigned) me << 16) + me;
    unsigned cmpnew = ((unsigned) menew << 16) + me;

    if (cmpxchg(&t->u, cmp, cmpnew) == cmp) return 0;

    return EBUSY;
}

/* Non-blocking probe: would ticket_lock() succeed right now?  Reads a
 * snapshot of the whole lock word and compares its two halves. */
static int ticket_lockable(ticketlock *t)
{
    ticketlock u = *t;
    barrier();
    return (u.s.ticket == u.s.users);
}

至此,自旋锁各种不同的实现介绍完毕,亲,你明白了吗?:)

(全文完)

自己动手实现自旋锁(spinlock)相关推荐

  1. 自旋锁SpinLock小案例

    自旋锁(spinlock) 是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗CPU

  2. 【转】自旋锁-SpinLock(.NET 4.0+)

    短时间锁定的情况下,自旋锁(spinlock)更快.(因为自旋锁本质上不会让线程休眠,而是一直循环尝试对资源访问,直到可用.所以自旋锁线程被阻塞时,不进行线程上下文切换,而是空转等待.对于多核CPU而 ...

  3. C#线程锁(自旋锁SpinLock、互斥锁Mutex、混合锁Monitor | lock)

    一.自旋锁 自旋锁是指当一个线程在获取锁对象的时候,如果锁已经被其它线程获取,那么这个线程将会循环等待,不断的去获取锁,直到获取到了锁.适合于原子操作时间非常短的场景 优点:避免了线程上下文切换.性能 ...

  4. linux驱动22:自旋锁spinlock

    自旋锁: 和信号量不同的是自旋锁可在不能休眠的代码中使用,如中断处理例程.在正确使用时,自旋锁通常比信号量具有更高的性能. 如果锁可用,则锁定位被设置,代码继续进入临界区:相反则代码进入忙循环并重复检 ...

  5. 自旋锁spinlock解析

    1 基础概念 自旋锁与相互排斥锁有点类似,仅仅是自旋锁不会引起调用者睡眠.假设自旋锁已经被别的运行单元保持.调用者就一直循环在那里看是否该自旋锁的保持者已经释放了锁."自旋"一词就 ...

  6. .net 4.0新特性-自旋锁(SpinLock)

    概念: http://baike.baidu.com/view/1250961.htm?fr=ala0_1_1 http://blog.csdn.net/wzhwho/archive/2009/05/ ...

  7. 嵌入式 自旋锁、互斥锁、读写锁、递归锁

    互斥锁(mutexlock): 最常使用于线程同步的锁:标记用来保证在任一时刻,只能有一个线程访问该对象,同一线程多次加锁操作会造成死锁:临界区和互斥量都可用来实现此锁,通常情况下锁操作失败会将该线程 ...

  8. Linux内核自旋锁使用笔记

    Reference: spin_lock_bh()与spin_unlock_bh() Linux内核自旋锁 Linux自旋锁 Spinlock - Wikipedia, the free encycl ...

  9. Linux内核之内核同步(三)——自旋锁

    自旋锁 上回,我们说到为了避免并发,防止竞争,内核提供了一些方法来实现对内核共享数据的保护.如果临界区只是一个变量,那么使用原子操作即可,但实际上临界区大多是一些数据操作的集合,这时候使用原子操作不太 ...

最新文章

  1. C# FileSystemWatcher 在监控文件夹和文件时的用法
  2. linux webapi测试,Webapi管理和性能测试工具WebBenchmark
  3. Oracle_高级功能(3) synonym和database link
  4. python文件读取每一行操作
  5. ssh 远程登录_C.4 彻底解决-新版本Sentaurus TCAD的SSH远程登录问题!!!
  6. Java垃圾回收jconsole分析
  7. 兮米安装包制作工具图文教程集锦电子书
  8. Qt Creator 2.8.1,qt4.8.5 需要含gcc4.4 的mingw
  9. matlab2c基础使用教程(实矩阵、复矩阵)
  10. VirtualBox虚拟机压缩减少体积
  11. 【WLAN】【基础知识】WIFI那些事儿之Beamforming
  12. jsp简介lamitry_[提拉米苏] 找人一起玩,今晚刚开的号
  13. matlab如何读取一个图片,怎么用Matlab读入并显示图片文件
  14. 1ppi等于多少dpi_Android开发之显示篇(弄懂ppi、dpi、pt、px、dp、dip、sp之间的关系只需这一篇)...
  15. 山东科技大学第二届ACM校赛解题报告
  16. Java笔试面试题三(编程算法)
  17. vgg16_weights_tf_dim_ordering_tf_kernels_notop.h5
  18. Radware:防御现代鱼叉式网络钓鱼攻击的方法
  19. 解决Windows 无法打开文件夹 找不到应用程序
  20. 投资-理财书籍免费阅读

热门文章

  1. Docker Swarm服务发现和负载均衡原理
  2. hdu5040 不错的广搜旋转的摄像头
  3. hdu4662 简单搜索打表
  4. C语言经典例14-将一个正整数分解质因数
  5. 【错误记录】IntelliJ IDEA 编译 Groovy 报错 ( Could not open/create prefs root node Software\JavaSoft\Prefs )
  6. 【Android 逆向】Android 逆向通用工具开发 ( Android 逆向通用工具组成部分 | 各模块间的关联 )
  7. 【Flutter】底部导航栏实现 ( BottomNavigationBar 底部导航栏 | BottomNavigationBarItem 导航栏条目 | PageView )
  8. 【错误记录】Flutter 组件报错 ( No Directionality widget found. | RichText widgets require a Directionality )
  9. 【设计模式】享元模式 实现 ( 实现流程 | 抽象享元类 | 具体享元类 | 享元工厂 | 用户调用 | 代码模板 )
  10. jQuery 轮播图