队列作为最常用的基础数据结构之一,相信大家都已经非常非常熟悉了,这里省略关于队列的介绍。在平时开发中队列的出现频率非常非常高,因此我们也会很关心队列的性能问题。当并发访问队列时,队列的性能往往受到同步手段的制约,最简单的方式是使用互斥锁对整个队列加锁,但其并发性能却惨不忍睹。

因此,有了各式各样的无锁队列实现,本文介绍其中的一种实现。还是老样子,实现基于x86体系结构,Linux环境。

初始化

和大部分无锁数据结构的实现一样,这个无锁队列的实现也是基于链表的,声明如下:

#ifndef _QUEUE_H_

#define _QUEUE_H_

#define CACHE_ALIGN_SIZE 64

#define CACHE_ALIGNED __attribute__((aligned(CACHE_ALIGN_SIZE)))

template

class Queue

{

public:

Queue() : head_(NULL), tail_(NULL)

{

head_ = tail_ = new qnode();

head_->next = NULL;

}

virtual ~Queue()

{

T tmp;

while (dequeue(tmp)) {

}

delete head_;

}

void enqueue(const T &data);

bool dequeue(T &data);

private:

class qnode

{

public:

T data;

qnode *next;

};

qnode * head_ CACHE_ALIGNED;

qnode * tail_ CACHE_ALIGNED;

};

#endif /* _QUEUE_H_ */

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

#ifndef _QUEUE_H_

#define _QUEUE_H_

#define CACHE_ALIGN_SIZE 64

#define CACHE_ALIGNED __attribute__((aligned(CACHE_ALIGN_SIZE)))

template

classQueue

{

public:

Queue():head_(NULL),tail_(NULL)

{

head_=tail_=newqnode();

head_->next=NULL;

}

virtual~Queue()

{

Ttmp;

while(dequeue(tmp)){

}

deletehead_;

}

voidenqueue(constT&data);

booldequeue(T&data);

private:

classqnode

{

public:

Tdata;

qnode*next;

};

qnode*head_CACHE_ALIGNED;

qnode*tail_CACHE_ALIGNED;

};

#endif /* _QUEUE_H_ */

初始化后队列的头尾都指向一个dummy节点,并且头尾指针都按照cache line对齐,以避免产生false sharing问题。

进队(enqueue)

进队操作分三步:

创建新节点

将当前尾节点的

next指针指向新节点

修改尾指针指向新创建的节点

其中第二步和第三步在常规的队列实现中往往需要借助于锁来保证原子性,如果不使用锁会怎么样?

除了第一步是没有竞争的,第二步和第三步在并发执行的时候都会有竞争(都是对共享变量的read-modify-write操作),这要求我们需要使用原子操作来实现这两步。然而仅仅这样也还是不够的,这不能保证第二步和第三步的原子性,因此需要一些特殊的处理,看代码:

template

void Queue::enqueue(const T &data)

{

qnode *node = new qnode();

node->data = data;

node->next = NULL;

qnode *t = NULL;

qnode *next = NULL;

while (true) {

t = tail_;

next = t->next;

asm volatile("" ::: "memory");

if (tail_ != t) {

continue;

}

if (next) {

__sync_bool_compare_and_swap(&tail_, t, next);

continue;

}

if (__sync_bool_compare_and_swap(&t->next, NULL, node)) {

break;

}

}

__sync_bool_compare_and_swap(&tail_, t, node);

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

template

voidQueue::enqueue(constT&data)

{

qnode*node=newqnode();

node->data=data;

node->next=NULL;

qnode*t=NULL;

qnode*next=NULL;

while(true){

t=tail_;

next=t->next;

asmvolatile("":::"memory");

if(tail_!=t){

continue;

}

if(next){

__sync_bool_compare_and_swap(&tail_,t,next);

continue;

}

if(__sync_bool_compare_and_swap(&t->next,NULL,node)){

break;

}

}

__sync_bool_compare_and_swap(&tail_,t,node);

}

注意代码17-20行,这段逻辑就是处理两个操作不原子的关键所在。当并发执行进队时,通过循环检查并向前推进尾指针来保证拿到最新的尾指针。

另一段有意思的代码是13-16行,这一行对变量

t进行检查,目的在于确认

t和

next的值是一致的,但是在这个特定的情景中我理解实际上可以删去而不影响正确性,因为后续的CAS操作同样会检查这一条件。

出队(dequeue)

出队操作相对入队要简单不少,只要把头指针向后移并拿出数据就可以了。但是在无锁并发的情况下仍有不少细节需要考虑,直接看代码:

template

bool Queue::dequeue(T &data)

{

qnode *t = NULL;

qnode *h = NULL;

qnode *next = NULL;

while (true) {

h = head_;

t = tail_;

next = h->next;

asm volatile("" ::: "memory");

if (head_ != h) {

continue;

}

if (!next) {

return false;

}

if (h == t) {

__sync_bool_compare_and_swap(&tail_, t, next);

continue;

}

data = next->data;

if (__sync_bool_compare_and_swap(&head_, h, next)) {

break;

}

}

h->next = (qnode *)1; // bad address, It's a trap!

/* delete h; */

return true;

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

template

boolQueue::dequeue(T&data)

{

qnode*t=NULL;

qnode*h=NULL;

qnode*next=NULL;

while(true){

h=head_;

t=tail_;

next=h->next;

asmvolatile("":::"memory");

if(head_!=h){

continue;

}

if(!next){

returnfalse;

}

if(h==t){

__sync_bool_compare_and_swap(&tail_,t,next);

continue;

}

data=next->data;

if(__sync_bool_compare_and_swap(&head_,h,next)){

break;

}

}

h->next=(qnode*)1;// bad address, It's a trap!

/* delete h; */

returntrue;

}

需要解释的地方不多,只有一个比较有意思的地方,在代码的12-15行有一个检查,这个检查和之前提到的一样,是用来保证

h和

next变量是一致的。和进队过程中的那个不同的是,这段逻辑不是可有可无的,因为在23行对

next指针的访问发生在CAS操作之前,如果访问了已经出队的节点则会导致程序崩溃。

What’s next?

到这里看上去这个无锁队列已经完成了,并且可以正确的运行。但你一定会发现一个严重的问题:这个队列中的内存从来没有被释放所以在不断的泄露。

没错,这就是无锁数据结构普遍存在的问题,因为没有使用互斥的同步机制,所以很难找到一个安全的释放内存的时机。你可能会认为在完成出队后释放掉内存不就可以了么?但是由于没有使用同步机制,我们无法保证这个已经出队的节点没有被其他并发线程持有,如果此时释放内存,Boom…如果之后这个内存又被重用,那又可能会遇到著名的ABA问题。

那这个无锁队列岂不是没用了么?当然不是,只是我们需要一种与之配套的内存生命周期管理机制,比如Linux内核中广泛使用的RCU,又比如我之后会介绍的Hazard Pointer。

一点解释

你可能注意到了我在出队逻辑的29-30行写了两行略有些奇怪的代码,这里我来做一些解释。实际上这里本应该执行的操作是释放

h指向的节点占用的内存,但是我在论文

我之所以做这样的修改,并不是因为论文中提出的算法不好,恰恰相反,论文中的算法非常精妙,但通用性不足,我只是单纯的为了引出一种更为通用的内存生命周期管理机制:Hazard Pointer。

参考资料

Michael M M, Scott M L. Simple, fast, and practical non-blocking and blocking concurrent queue algorithms[C]//Proceedings of the fifteenth annual ACM symposium on Principles of distributed computing. ACM, 1996: 267-275. ↩

浏览:

661

linux无锁队列性能对比,无锁队列的一种实现相关推荐

  1. 最新 iOS开发中的11种锁以及性能对比

    在平时开发中我们经常会使用多线程,多线程为我们带来了很大便利,也提高了程序的执行效率,但同时也带来了Data race,Data race的定义很简单:当至少有两个线程同时访问同一个变量,而且至少其中 ...

  2. linux 内核 性能,Linux内核十个版本性能对比

    [IT168 评论]从2008年1月底至今,Linux Kernel系统内核已经先后升级了十次,版本号也从2.6.24上升到2.6.33,并且下个版本2.6.34也已进入开发阶段.今天我们就看看过去两 ...

  3. 无锁队列的几种实现及其性能对比

    一.无锁队列用在什么样的场景? 当需要处理的数据非常多,比如行情数据,一秒处理非常多的数据的时候,可以考虑用无锁队列.但是如果一秒只需要处理几百或者几千的数据,是没有必要考虑用无锁队列的.用互斥锁就能 ...

  4. iOS开发中自旋和互斥锁的理解以及所有锁的性能比较

    补充: 可以看到除了 OSSpinLock 外,dispatch_semaphore 和 pthread_mutex 性能是最高的.苹果在新系统中已经优化了 pthread_mutex 的性能,所以它 ...

  5. 图解:为什么非公平锁的性能更高?

    作者 | 王磊 来源 | Java中文社群(ID:javacn666) 转载请联系授权(微信ID:GG_Stone) 在 Java 中 synchronized 和 ReentrantLock 默认使 ...

  6. Java中的锁机制 -- 乐观锁、悲观锁、自旋锁、可重入锁、读写锁、公平锁、非公平锁、共享锁、独占锁、重量级锁、轻量级锁、偏向锁、分段锁、互斥锁、同步锁、死锁、锁粗化、锁消除

    文章目录 1. Java中的锁机制 1.1 乐观锁 1.2 悲观锁 1.3 自旋锁 1.4 可重入锁(递归锁) 1.5 读写锁 1.6 公平锁 1.7 非公平锁 1.8 共享锁 1.9 独占锁 1.1 ...

  7. 消息队列MQ与微消息队列MQTT

    文章目录 参考文章 什么是消息队列,什么是RPC 为什么要使用MQ消息队列 1. 解耦(可用性) 2. 流量削峰 3. 数据分发 消息队列的缺点 多种主流传统消息队列MQ对比 传统消息队列Rocket ...

  8. redis5 stream php队列,使用redis stream实现队列服务

    1. stream类型 Redis5.0引入了Stream类型.该Stream类型的出现,几乎满足了消息队列具备的全部内容,包括但不限于: 消息ID的序列化生成 消息遍历 消息的阻塞和非阻塞读取 消息 ...

  9. 【高并发】多线程之无锁队列|性能优化

    队列操作模型 (1)单生产者--单消费者 (2)多生产者--单消费者 (3)单生产者--多消费者 (4)多生产者--多消费者 3.队列数据定长与变长 (1)队列数据定长 (2)队列数据变长 并发无锁处 ...

最新文章

  1. dataTables-使用详细说明整理
  2. 【压缩率3000%】上交大ICCV:精度保证下的新型深度网络压缩框架
  3. H5页面获取原生APP的登录状态
  4. C++ 实现分块查找(顺序存储结构)(完整代码)
  5. 现代软件工程 教学计划 中国科学技术大学-微软亚洲研究院联合培养班
  6. cdh-5.10.0搭建安装
  7. Springboot2 自定义异常处理
  8. 关于C#项目开发梳理
  9. html 背景图片旋转,CSS3只让背景图片旋转180度的实现示例
  10. coreldraw橙子怎么画_cdr怎么画一杯橙汁?CorelDRAW简单绘制的一杯满满的橙汁教程...
  11. 打开outlook显示服务器内存不足,Outlook2013无法打开邮箱,报错提示可用内存不足...
  12. java sort 没法用,$ group无法使用Spring聚合类后的$ sort管道
  13. android手机投影电视软件,Type-C手机投屏电视/投影仪超简单,快看你的可以吗?...
  14. ESP32片外PSRAM
  15. Python:语音处理,实现在线朗读RFC文档或本地文本文件
  16. MyEclipse如何配置Tomcat
  17. 上古卷轴3晨风详尽指引攻略
  18. UMLChina建模竞赛第3赛季第10轮:汽车、EA
  19. python里ipo是什么意思呢_IPO 指的是什么?公开募股和所谓的上市之间有什么区别?...
  20. 响铃:救市之作三星S6 edge+能否完成使命?

热门文章

  1. iconv java_libiconv之iconv函数的使用方法
  2. QML官方Demo学习之Scene Graph - Painted Item
  3. css3绘制环形_css画圆,如何用纯css实现一个动态画圆环效
  4. Vivo x9s设置deviceOwner后,无法使用应用分身
  5. 【博客473】Docker的health健康状态检查
  6. 【MISC】 米哈游中的架空文字
  7. SQL注入之MYSQL注入总结
  8. 程序员须知:面试中最容易被问到的18个算法题(附答案!)
  9. 爆料,华为重回深圳,深圳第二个硅谷来了-龙华九龙山未来可期
  10. ug在哪看服务器运行,ug的服务器怎么打开?