可以考虑一个单向链表实现的队列(Queue)结构,在多线程环境下,多个线程同时对这个队列添加元素和取出元素的时候

势必要考虑采用锁的机制来进行同步以防止链表结构被破坏。

一般的做法是不管是读取还是写入的时候都使用同一把锁来进行互斥,这样实现起来比较简单但是却很低效。

本篇文章主要讲述的是对于一个链表实现的队列,通过采用头部锁和尾部锁的方式来分别对添加元素和取出元素进行互斥,

在多线程的环境中可以并行实现同时添加和取出操作(同时添加或者是同时取出必须要互斥)来达到队列数据高效读取的简单算法。

采用的语言是JAVA。

package twolocksample;
 
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
 
/* 这个类是用来放入单项链表队列中的元素类
 * 
 */
class Node<E> {
    // 保存节点的数值
    private E value;
    
    // 单项链表的指向下一个节点的引用
    private Node<E> next;
    
    // 一个带值的构造函数
    public Node(E e) {
        value = e;
    }
    
    public void setValue(E value) {
        this.value = value;
    }
    public E getValue() {
        return value;
    }
    public void setNext(Node<E> next) {
        this.next = next;
    }
    public Node<E> getNext() {
        return next;
    }
}
 
/* 这个类实现了一个简单的带两个同步锁单向队列,
 * 通过头部锁(只对headNode节点的读取进行互斥)和尾部锁
 * (只对tailNode的节点读取进行互斥)分别对offer和poll进行互斥
 * 从而可以让offer和poll同时并行进行,大幅度提高存储效率。
 * 
 * 一般的两个同步锁队列里在队列为空的情况下加入offer第一个元素和
 * poll第一个元素的处理会发生头部锁尾部锁发生冲突而导致比较难处理的情况,
 * 而本实现通过在初始化队列里添加一个不起实际作用的哨兵节点来保证
 * 队列的任何时候都不会出现空的情况从而避免了,头部锁和尾部锁出现冲突的问题。
 * 也就是说这个队列里任何之后都至少有一个哨兵节点的存在。
 * 
 */
public class TwoLockLinkQueue<E> {
    
    // 头部锁用来互斥多个poll的线程
    private Lock headLock = new ReentrantLock();
    // 尾部锁用来互斥多个offer的线程
    private Lock tailLock = new ReentrantLock();
    // 指向头部节点的引用
    private Node<E> headNode;
    // 指向尾部节点的引用
    private Node<E> tailNode;
    
    public TwoLockLinkQueue() {
        // 初始化时生成一个哨兵节点,让头和尾的引用都
        // 指向这个哨兵节点
        Node<E> sentinelNode = new Node<E>(null);
        headNode = sentinelNode;
        tailNode = sentinelNode;
    }
    
    // 放入一个元素到队列的最末端
    public boolean offer( E e ) {
        if(e == null) {
            return false;
        }
        // 【注意3】这里生成节点的时候直接将值赋到了节点里 
        Node<E> newNode = new Node<E>(e);
        tailLock.lock();
        try {
            // 【注意1】
            // 此处在队列为空(只有哨兵节点)的情况下与【注意2】
            // 的处理会发生冲突,冲突内容是此处要把哨兵节点的next设
            // 为新追加出来的节点,而【注意2】的处理要取出哨兵节点的next节点
            // 好在冲突的处理都是单次的内存读与写,实际上在CPU层面是原子
            // 操作,所以不会破坏链表的结构。
            // 最后,如果【注意1】的线程先执行【注意2】的线程后执行的话,【注意2】
            // 的poll处理会正常获取到【注意1】里offer进去的新节点,反之【注意2】
            // 会取到一个Null节点,这都是合情合理的。
            // 唯一一点要注意的是如果没有按照【注意3】的处理进行而是先把节点加入连表里,
            // 再给节点赋值的话,就会导致poll出来的节点里的值不正确的问题出现。
            tailNode.setNext(newNode);
            tailNode = newNode;
        } finally {
            tailLock.unlock();
        }
        return true;
    }
    
    // 从头部取出第一个元素
    public E pool() {
        headLock.lock();
        try {
            // 【注意2】
            // 请参考【注意1】的描述
            Node<E> newHeadNode = headNode.getNext();
            if( newHeadNode == null ) {
                return null;
            } else {
                // 这个算法巧妙在poll操作取出的实际不是第一个哨兵节点
                // 而且哨兵节点的Next节点的值,
                // 老的哨兵节点已经完成了它的任务被抛弃,现在将
                // 被poll出来的节点作为哨兵节点继续工作。
                headNode = newHeadNode;
                return newHeadNode.getValue();
            }
        }finally {
            headLock.unlock();
        }
    }
    
    // 简单起见我们用一个嵌套类来简单测试一下这个队列的正确性
    static class TestClass {
        // 用一个原子类来获取Offer到队列中的元素个数
        static AtomicInteger count = new AtomicInteger(0);
        // 主角登场
        static TwoLockLinkQueue<Integer> queue = new TwoLockLinkQueue<Integer>();
        
        public static void main(String[] args) {
            
            final int offerCount = 5;
            final int threadCount = 5;
            
            // 这个类是专门用来Offer数据的Runnable类
            class task1 implements Runnable {
                private TwoLockLinkQueue<Integer> queue;
                public task1(TwoLockLinkQueue<Integer> queue) {
                    this.queue = queue;
                }
                public void run() {
                    for(int i =0;i<offerCount;i++) {
                        int value = count.addAndGet(1);
                        queue.offer(value);
                        System.out.println("["+Thread.currentThread()+
                            "]"+"offer:"+value);
                    }
                }
            }
            
            // 这个类是专门用来poll数据的Runnable类
            class task2 implements Runnable {
                private TwoLockLinkQueue<Integer> queue;
                public task2(TwoLockLinkQueue<Integer> queue) {
                    this.queue = queue;
                }
                public void run() {
                    while(!Thread.interrupted()) {
                        Integer value = queue.pool();
                        if(value != null) {
                            System.out.println("["+Thread.currentThread()+
                                "]"+"poll:"+value);
                        }
                    }
                }
            }
            // 使用一个线程池来管理所有线程
            ExecutorService pool = Executors.newCachedThreadPool();
            // 分别开启了offer的线程和poll的线程
            for(int i=0;i<threadCount;i++) {
                pool.execute(new task1(queue));
                pool.execute(new task2(queue));
            }
            pool.shutdown();
            
            // 等待一段时间之后结束所有的线程
            try{
                pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e1){
            }
            pool.shutdownNow();  
        }
    }
}

参考的执行结果为
[Thread[pool-1-thread-1,5,main]]offer:1
[Thread[pool-1-thread-6,5,main]]poll:3
[Thread[pool-1-thread-4,5,main]]poll:2
[Thread[pool-1-thread-1,5,main]]offer:3
[Thread[pool-1-thread-4,5,main]]poll:4
[Thread[pool-1-thread-1,5,main]]offer:4
[Thread[pool-1-thread-1,5,main]]offer:5
[Thread[pool-1-thread-1,5,main]]offer:6
[Thread[pool-1-thread-4,5,main]]poll:5
[Thread[pool-1-thread-4,5,main]]poll:6
[Thread[pool-1-thread-5,5,main]]offer:7
[Thread[pool-1-thread-5,5,main]]offer:8
[Thread[pool-1-thread-4,5,main]]poll:7
[Thread[pool-1-thread-2,5,main]]poll:1
[Thread[pool-1-thread-5,5,main]]offer:9
[Thread[pool-1-thread-5,5,main]]offer:10
[Thread[pool-1-thread-6,5,main]]poll:8
[Thread[pool-1-thread-8,5,main]]poll:11
[Thread[pool-1-thread-5,5,main]]offer:11
[Thread[pool-1-thread-2,5,main]]poll:10
[Thread[pool-1-thread-9,5,main]]offer:12
[Thread[pool-1-thread-9,5,main]]offer:13
[Thread[pool-1-thread-9,5,main]]offer:14
[Thread[pool-1-thread-9,5,main]]offer:15
[Thread[pool-1-thread-4,5,main]]poll:9
[Thread[pool-1-thread-3,5,main]]offer:2
[Thread[pool-1-thread-9,5,main]]offer:17
[Thread[pool-1-thread-2,5,main]]poll:15
[Thread[pool-1-thread-6,5,main]]poll:14
[Thread[pool-1-thread-10,5,main]]poll:13
[Thread[pool-1-thread-8,5,main]]poll:12
[Thread[pool-1-thread-6,5,main]]poll:16
[Thread[pool-1-thread-2,5,main]]poll:18
[Thread[pool-1-thread-7,5,main]]offer:16
[Thread[pool-1-thread-4,5,main]]poll:17
[Thread[pool-1-thread-3,5,main]]offer:18
[Thread[pool-1-thread-4,5,main]]poll:19
[Thread[pool-1-thread-7,5,main]]offer:19
[Thread[pool-1-thread-3,5,main]]offer:20
[Thread[pool-1-thread-3,5,main]]offer:22
[Thread[pool-1-thread-3,5,main]]offer:23
[Thread[pool-1-thread-10,5,main]]poll:20
[Thread[pool-1-thread-4,5,main]]poll:21
[Thread[pool-1-thread-7,5,main]]offer:21
[Thread[pool-1-thread-7,5,main]]offer:24
[Thread[pool-1-thread-7,5,main]]offer:25
[Thread[pool-1-thread-2,5,main]]poll:23
[Thread[pool-1-thread-8,5,main]]poll:22
[Thread[pool-1-thread-6,5,main]]poll:25
[Thread[pool-1-thread-4,5,main]]poll:24

※最后再说几点
1.考虑到篇幅原因,测试代码中offerCount和threadCount的设值都比较小,大家可以进行调整以测试更多线程和更多次数读取时的动作

2.上面的执行结果看起来会有乱序的情况,那只是线程在打印结果的时候出现了乱序(因为在打印结果处理的地方没有做线程互斥)

实际队列里数据的添加和取出都是按照顺序进行的,链表也没有出现被破坏的情况

3.在JAVA中已经有了更加高效和功能强大的BlockingQueue的各种实现了,所以实际的编程中请使用Java的标准并行库

3.上面的代码也并不是没有现实意义,在一些底层C语言的嵌入式开发中可能没有像JAVA一样好用的并行库,所以可以参照上面的算法实现一个高效

线程安全的C语言的队列来,同样可以应用得很好。
其实只要实现 reentry

package com.nowcoder;import org.apache.ibatis.jdbc.Null;
import org.springframework.expression.spel.ast.NullLiteral;
import org.junit.Test;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.junit.runner.RunWith;
class Node<E>extends  ReentrantLock{private  E value;private  Node<E> next;public Node(E e){value = e;}public void setValue(E value) {this.value = value;}public void setNext(Node<E> next) {this.next = next;}public E getValue() {return value;}public Node<E> getNext() {return next;}
}class  TwoLockLinkQueue<E> {private Lock headLock = new ReentrantLock();private Lock tailLock = new ReentrantLock();private Node<E> headNode;private Node<E> tailNode;public TwoLockLinkQueue() {Node<E> temp = new Node<E>(null);headNode = temp;tailNode = temp;}public boolean offer(E e) {if (e == null)return false;Node<E> newNode = new Node<E>(e);tailLock.lock();try {tailNode.setNext(newNode);tailNode = tailNode.getNext();} finally {tailLock.unlock();}return true;}public E pool() {headLock.lock();try {Node<E> newHaedNode = headNode.getNext();if (newHaedNode == null) {return null;}headNode = newHaedNode;return newHaedNode.getValue();} finally {headLock.unlock();}}}
public class TestClass {private static AtomicInteger count= new AtomicInteger(0);;TwoLockLinkQueue<Integer> queue = new TwoLockLinkQueue<Integer>();public  TwoLockLinkQueue<Integer> getQueue() {return queue;}@Testpublic void main(String[] arg){final  int offerCount = 5;final  int threadCount = 5;class task2 implements Runnable {private TwoLockLinkQueue<Integer> queue;public task2(TwoLockLinkQueue<Integer> queue) {this.queue = queue;}public void run() {while(!Thread.interrupted()) {Integer value = queue.pool();if(value != null) {System.out.println("["+Thread.currentThread()+"]"+"poll:"+value);}}}}class task1 implements Runnable {private TwoLockLinkQueue<Integer> queue;public task1(TwoLockLinkQueue<Integer> queue) {this.queue = queue;}public void run() {for (int i = 0;i<offerCount;i++){int value = count.addAndGet(1);queue.offer(value);System.out.println("["+Thread.currentThread()+"]"+"offer:"+value);}}}ExecutorService pool = Executors.newCachedThreadPool();for (int i=0;i<threadCount ;i++){pool.execute(new task1(this.getQueue()));pool.execute(new task1(this.getQueue()));}pool.shutdown();}}

实现一个多线程安全的单向有序链表,add单个结点、与其他链表合并相关推荐

  1. 数据结构:假设有一个带头结点的单链表L,每个结点值由单个数字、小写字母和大写字母构成。设计一个算法将其拆分成3个带头结点的单链表L1、L2和L3,L1包含L中的所有数字结点,L2包含L中的所有小写字母

    假设有一个带头结点的单链表L,每个结点值由单个数字.小写字母和大写字母构成.设计一个算法将其拆分成3个带头结点的单链表L1.L2和L3,L1包含L中的所有数字结点,L2包含L中的所有小写字母结点,L3 ...

  2. java不带头结点单链表,java带头结点的单链表

    JAVA 循环双链表的建立 import java.util.Scanner; //循环双向链表的结点类 class DuLNode { private Object data;// 存放结点值 前驱 ...

  3. 单链表-在带头结点的单链表L中删除一个最小值结点(四指针)

    单链表的存储结构: typedef struct LinkList{int data;LinkList * next;} 分析: 要删除一个链表的最小值节点,首先想到的是肯定是要定义两个指针,但是 , ...

  4. Java设计链表(不带头结点的单链表)

    设计链表的实现.您可以选择使用单链表或双链表.单链表中的节点应该具有两个属性:val 和 next.val 是当前节点的值,next 是指向下一个节点的指针/引用.如果要使用双向链表,则还需要一个属性 ...

  5. M19.单链表删除指定结点--类型总结(链表)

    题目链接 题目解析 tip: 使用双指针,使得两指针之间始终保持n个结点,这样当前一个指针到达链表末端时,第二个指针在要删除结点的前一个结点处. 相似题目: 判断链表是否有环 环链表的入口 solut ...

  6. 反转链表:输入一个链表的头结点,反转该链表并输出反转后的链表的头结点。...

    2019独角兽企业重金招聘Python工程师标准>>> 题目:定义一个函数,输入一个链表的头结点,反转该链表并输出反转后的链表的头结点.     为了正确的反转一个链表,需要调整链表 ...

  7. C语言实现了一个具有头结点的单链表(附完整源码)

    实现了一个具有头结点的单链表 有头结点的单链表 实现了一个具有头结点的单链表完整源码 有头结点的单链表 线性表的顺序存储结构的特点是逻辑关系上相邻的两个元素在物理位置上也相邻,因此可以随机存取表中的任 ...

  8. python中什么是链表_python中的数据结构-链表

    一.什么是链表 链表是由一系列节点构成,每个节点由一个值域和指针域构成,值域中存储着用户数据,指针域中存储这指向下一个节点的指针.根据结构的不同,链表可以分为单向链表.单向循环链表.双向链表.双向循环 ...

  9. 有头结点和没有头结点的单链表

    在我们遇到的大多数题中,一般都是有头结点的单链表.这里我们可以把没有头结点的单链表看成我们的特殊情况,接下来,我们将从有头结点的单链表和没有头结点的单链表的定义,插入一个元素,删除一个元素等方面进行对 ...

  10. 单链表:头结点和头指针的实现方式

    链式存储是什么样的结构? 链式存储结构的特点使用任意的存储单元存储线性表的数据元素,存储单元可以使连续的也可以是不连续的,因此,为了表示每个数据元素和下一个元素的关系,除了存储本身的信息之外,还需要存 ...

最新文章

  1. 微软开源的自动机器学习工具上新了:NNI概览及新功能详解
  2. pat天梯赛练习 L2-006
  3. HALCON关于显示的函数与介绍(持续更新)
  4. linux postgresql默认安装目录,postgresql - 三种安装方式(示例代码)
  5. php旧物交易开源代码_仿互站PHP源码 虚拟物品在线交易网站源码 附14套风格
  6. Python面试必备!最全面的重点知识汇总,建议收藏!
  7. dell服务器重装win10,戴尔dell重装win10系统后无法引导的解决方法(原创)
  8. 14 ABSOLUTE评估肿瘤纯度
  9. 简单工厂模式、工厂模式、抽象工厂模式
  10. 王文京:纵横30年,阵阵桂花香
  11. 分享三大外汇日内交易策略
  12. 如何理解工程测量中的各种误差
  13. 大疆网上测评题库_大疆笔试题
  14. Linux系统查看不到IP地址的解决方法(虚拟机)
  15. 【Python 骚操作】使用 Gitbook + Typora 打造一个属于自己的电子书网站
  16. 成功解决502 Bad Gateway错误。
  17. 删除桌面计算机,电脑设置小技巧(多余壁纸、屏保删除方法)
  18. 数据分析与数据仓库平台Panoply.io获700万美元A轮融资
  19. Effective Java (3rd Editin) 读书笔记:1 创建和销毁对象
  20. 一点也不流氓的搜狗输入法皮肤

热门文章

  1. endnote使用方法大全,endnote教程
  2. 完美解决pytorch多线程问题:Cannot re-initialize CUDA in forked subprocess. To use CUDA with multiprocessing
  3. 计算机组成原理——计算机的运算方法
  4. 陆探一号-中国-2022
  5. Anomaly Detection with partially Observed Anomalies论文笔记
  6. 维和医疗分队患者信息管理系统的开发与研究
  7. java ean13_【求大神指导】java实现EAN13条形码识别
  8. 如何在Ubuntu上安装Couch DB 1.5
  9. Echarts柱状图,实现不同系列,柱体之间的部分重叠效果
  10. 学计算机物理去戴维斯还是伦斯勒理工学院好,美国大学本科专业排名:应用物理...