文章目录

  • 1. 基本原理
  • 2. 代码实现
    • 2.1 使用链表实现无锁队列
    • 2.2 使用数组实现环形无锁队列
  • 3. ABA 问题及解决
  • 4. 参考资料


1. 基本原理

源于1994年10月发表在国际并行与分布式会议上的论文【无锁队列的实现.pdf】。CAS(Compare And Swap,CAS维基百科)指令。CAS的实现可参考下面的代码:

bool CAS(int* pAddr, int nExpected, int nNew) atomically {if(*pAddr == nExpected) {*pAddr = nNew;return true;} else return false;
}
//CAS返回bool告知原子性交换是否成功

CAS 根据字面意思即可理解,就是对数据进行交换的一种原子操作。对应到 CPU 指令的话就是cmpchg
无锁队列的内部实现实际上也是原子操作,可以避免多线程调用出现的不可预知的情况,也即进行线程的并发同步。主要的核心就是函数__sync_bool_compare_and_swap,返回类型是 bool 型,原子交换成功返回 true,失败返回 false。



2. 代码实现


2.1 使用链表实现无锁队列

入队操作

Enqueue(EleType x) {Node* q = new Node();q->x = x;q->next = nullptr;do {Node* p = tail;} while(!CAS(p->next, nullptr, q));CAS(tail, p, q);
}

这样实现的话会导致若是某个线程在执行入队操作时,在CAS(tail, p, q)执行之前,也即将尾节点替换为新加入的入队节点前挂掉了,那么将导致其它的线程在执行入队操作CAS(p->next, nullptr, q)时一直返回 false,即无限循环于此,因为 p->next 已经更改为 q,但是tail并没有更新为 q 仍然是 p 。

基于此,可以对以上的入队操作进行改进,在do while();当中找到无锁队列的实际尾节点,即p->next==nullptr,再进行 tail 节点的替换,实现代码如下:

Enqueue(EleType x) {Node* q= new Node();q->x = x;q->next = nullptr;Node* p = tail;Node* oldP = tail;do {while(p->next != nullptr)p = p->next;} while(!CAS(p->next, nullptr, q));CAS(tail, oldP, q);
}

这样的话即使出现上文中讲述的更新 tail 前线程挂掉的情况,在进入到 do while(); 循环时,p 将会指定为无锁队列的实际尾部节点,从而CAS(p->next, nullptr, q)返回 true,结束循环,更新 tail 节点值。

出队操作

Dequeue() {do {Node* p = head->next;if(p == nullptr)return EMPTY;} while(CAS(head->next, p, p->next));return p->val;
}
//这里的head是一个哑头节点

模板无锁队列类的实现

有了如上的思想,那么就可以实现一个模板无锁队列类,代码如下:

// 定义一个链表实现队列
template <typename ElemType>
struct qnode // 链表节点
{struct qnode *_next;ElemType _data;
};template <typename ElemType>
class Queue
{private:struct qnode<ElemType> *volatile _head = NULL;  // 随着pop后指向的位置是不一样的, head不是固定的struct qnode<ElemType> *volatile _tail = NULL;public:Queue() {_head = _tail = new qnode<ElemType>;_head->_next = NULL;_tail->_next = NULL;printf("Queue _head:%p\n", _head);}void push_list(const ElemType& e) {struct qnode<ElemType>* p = new qnode<ElemType>;if (!p) return ; //p生成失败p->next = NULL;p->data = e;struct qnode<ElemType>* t = _tail;struct qnode<ElemType>* old_t = _tail;do {// 当非NULL的时候说明不是尾部,因此需要指向下一个节点while (t->next != NULL) {  t = t->next;}// 如果t->next为则null换为p} while (!__sync_bool_compare_and_swap(&t->next, NULL, p));// 如果尾部和原来的尾部相等,则换为p。__sync_bool_compare_and_swap(&_tail, old_t, p);}bool pop_list(ElemType& e) { //e作为传出参数,记录对头的元素值struct qnode<ElemType>* p = NULL;do {p = _head;if (p->next == NULL) return false;// 如果头部等于p,那么就指向p的下一个} while (!__sync_bool_compare_and_swap(&_head, p, p->next));e = p->data; //p为最初的head值,返回旧队头值delete p;p = NULL;return true;}
};

该模板类的实现,也即是记录队头和队尾节点,链表的头节点没有存放数据(同样是哑节点?):

  • push 到队尾的时候,先判断当前指针是否是队列的实际尾部,即使用CAS进行判断是不是,如果是,则 p->next == nullptr,将 p->next 替换为 q, 同时更新 tail 尾节点的值;如果不是,则在 do while(); 当中进行寻找实际队尾节点;
  • pop 出队头的时候,同样也是在 do while(); 循环当中判断队列是否非空,非空则将之前的节点值返回,并将其删除,更新 head 头节点的值;

2.2 使用数组实现环形无锁队列

此外,还可以使用数组实现,因为环形数组一经内存申请后,不会再涉及内存请求和释放:

  • 队列实现的形式是环形数组的形式;
  • 队列的元素的值,初始的时候是三种可能的值。HEAD、TAIL、EMPTY;
  • 数组一开始所有的元素都初始化为 EMPTY。有两个相邻的元素初始化为 HEAD 与 TAIL,代表着空队列;
  • 入队操作。假设数据 x 要入队列,定位 TAIL 的位置,使用 double-CAS 方法把 (TAIL, EMPTY) 更新成 (x, TAIL)。需要注意,如果找不到 (TAIL, EMPTY),则说明队列满了。
  • 出队操作。定位 HEAD 的位置,把 (HEAD, x) 更新成 (EMPTY, HEAD),并把 x 返回。同样需要注意,如果 x 是 TAIL,则说明队列为空。


3. ABA 问题及解决

简单的说就是线程A将当前值修改为10,此时线程B将值改为11,然后又有一个线程C把值又改为10,这样的话对于线程A来说取到的内存值和当前值是没变的,所以可以更新,但实际上是经过变化的,所以不符合实际逻辑的。

注意到CAS比较的是指针取内容得到的值,那么,假定某个线程准备出队操作,首先声明一个指向p指针head结点,接着要进行CAS操作,CAS(head,p,p->next)。假定在执行CAS操作之前,有个线程进行了入队操作,此时,head!=p,正常情形CAS(head,p,p->next)应该返回为false。但是,在CAS(head,p,p->next)之前,又有线程进行了入队操作,而入队的这个结点占用的内存恰恰是最开始的时候p所指向的内存,再恰恰经过一系列出队操作,使得当前头指针刚好指向刚刚入队操作的那块结点,最后,才开始,进行CAS操作。我们会发现原本应该返回为false的CAS操作,返回了true!(CAS比较的是内存地址所存放的值,==)。

解决ABA问题,可加入版本号这一控制信息,Java中有AtomicStampedReference类可以添加版本在比对内存值的时候加以区分。



4. 参考资料

  1. 基于CAS实现的无锁队列(多生产者多消费者)

  2. CAS无锁队列的原理及实现(附代码)

CAS无锁队列的实现相关推荐

  1. 无锁CAS/无锁队列

    高并发,读写十分频繁,会使用CAS 1 互斥锁 自旋锁 原子操作 锁住的代码耗时短:counter++操作,自旋锁有优势 锁住的代码耗时长:for_add操作,自旋锁无优势[因为在等待的时候消耗的CP ...

  2. CAS操作与无锁队列

    在多线程编程中,为了保证内存的可见性,我们加入了一些锁的机制,例如信号量,互斥锁,条件变量等等,但是锁的机制不是一个简单的机制,需要加入很多的控制,所以在使用中又有了一些轻量级的同步机制,例如vola ...

  3. .net 延时操作_锁、CAS操作和无锁队列的实现

    (给算法爱好者加星标,修炼编程内功) 来源:yishizuofei blog.csdn.net/yishizuofei/article/details/78353722 锁的机制 锁和人很像,有的人乐 ...

  4. 锁、CAS操作和无锁队列的实现

    锁的机制 锁和人很像,有的人乐观,总会想到好的一方面,所以只要越努力,就会越幸运:有的人悲观,总会想到不好的一方面,患得患失,所以经常会做不好事.我一直把前一个当作为我前进的动力和方向,快乐充实的过好 ...

  5. 无锁CAS及无锁队列实现

    CAS ⽐较并交换(compare and swap, CAS),是原⼦操作的⼀种,可⽤于在多线程编程中实现不被打断的数据 交换操作,从⽽避免多线程同时改写某⼀数据时由于执⾏顺序不确定性以及中断的不可 ...

  6. 你应该知道的高性能无锁队列Disruptor

    1.何为队列 听到队列相信大家对其并不陌生,在我们现实生活中队列随处可见,去超市结账,你会看见大家都会一排排的站得好好的,等待结账,为什么要站得一排排的,你想象一下大家都没有素质,一窝蜂的上去结账,不 ...

  7. 基于数组的无锁队列(译)

    2019独角兽企业重金招聘Python工程师标准>>> 1 引言 最近对于注重性能的应用程序,我们有了一种能显著提高程序性能的选择:多线程.线程的概念实际上已经存在了很长时间.在过去 ...

  8. linux 无锁队列覆盖问题,无锁队列杂谈

    质量最大vczh粉(402740419) 10:13:17 nobody(1575393351)  10:10:09 无锁队列,怎么可能 ? 质量最大vczh粉(402740419) 10:13:23 ...

  9. gitclone 一个tag的地址_一个无锁队列和FreeList实现

    代码实现了这篇文章中的无锁队列. fangcun:简单,高效,实用的非阻塞(无锁)和阻塞并行队列算法​zhuanlan.zhihu.com 无锁队列需要实现一个FreeList来避免一个线程释放了结点 ...

最新文章

  1. Socket通信原理探讨(C++为例)
  2. keepalived 多个应用_Keepalived与LVS部署多个服务
  3. 比较字符串a和b的大小
  4. python编写一个登陆验证程序_python项目实战:实现验证码登录网址实例
  5. strace 哇,好多系统调用
  6. 小程序中利用Moment.js格式时间
  7. 【硬核课】最新《图卷积神经网络GCN》2020概述,76页ppt,NTU-Xavier Bresson,纽约大学深度学习课程...
  8. CVPR2021 | 视频超分辨率中时空蒸馏方案
  9. mysql sql语句优化面试题_SQL面试题之SQL优化
  10. 服装行业电子商务的概述
  11. Qt 编程使用Sapera LT API 实现盖革雪崩焦平面相机数据采集
  12. 不重装系统改硬盘模式: RAID ON 改成 AHCI
  13. 小米4 miui6 android,小米4怎么刷miui6?小米4刷miui6三种方法详解
  14. TCP/IP协议连接状态详解
  15. Windows下QT设置应用程序(exe)图标、任务栏托盘图标、任务栏窗口图标
  16. 如何使用MacVim呢?
  17. Visual Studio Code插件整理大全
  18. 2012年7月 逐鹿反APT
  19. 《菲利普·迪克的电子梦》——阐述人类深处的困惑与迷茫
  20. mysql 查找相似数据_MySQL性能优化做得好的人,都懂的索引绝技

热门文章

  1. 小孩写字慢又丑,怎么治?来看这里
  2. PandoraBox版本及已安装软件包
  3. Python编程思想【系列文章】
  4. 放弃哪吒造车增资,360的“智能汽车网络安全”牌不好打
  5. CarSim联合simulink仿真横向控制
  6. 鼠标侧键能改为ctrl吗_垂直鼠标真的能告别鼠标手吗?
  7. 淘宝、拼多多模式的集合体——CRMEB商城
  8. HSV中不同颜色对应的灰度范围
  9. 实战SPECjvm2008
  10. 12V系统汽车电子保护及TVS管选型