设计并发队列

#include <pthread.h>
#include <list>
using namespace std;template <typename T>
class Queue
{
public: Queue( ) { pthread_mutex_init(&_lock, NULL); } ~Queue( ) { pthread_mutex_destroy(&_lock);} void push(const T& data);T pop( );
private: list<T> _list; pthread_mutex_t _lock;
};template <typename T>
void Queue<T>::push(const T& value )
{ pthread_mutex_lock(&_lock);_list.push_back(value);pthread_mutex_unlock(&_lock);
}template <typename T>
T Queue<T>::pop( )
{ if (_list.empty( )) { throw "element not found";}pthread_mutex_lock(&_lock); T _temp = _list.front( );_list.pop_front( );pthread_mutex_unlock(&_lock);return _temp;
}

上述代码是有效的。但是,请考虑这样的情况:您有一个很长的队列(可能包含超过 100,000 个元素),而且在代码执行期间的某个时候,从队列中读取数据的线程远远多于添加数据的线程。因为添加和取出数据操作使用相同的互斥锁,所以读取数据的速度会影响写数据的线程访问锁。那么,使用两个锁怎么样?一个锁用于读取操作,另一个用于写操作。给出修改后的 Queue 类。

template <typename T>
class Queue
{
public: Queue( ) { pthread_mutex_init(&_rlock, NULL); pthread_mutex_init(&_wlock, NULL);} ~Queue( ) { pthread_mutex_destroy(&_rlock);pthread_mutex_destroy(&_wlock);} void push(const T& data);T pop( );
private: list<T> _list; pthread_mutex_t _rlock, _wlock;
};template <typename T>
void Queue<T>::push(const T& value )
{ pthread_mutex_lock(&_wlock);_list.push_back(value);pthread_mutex_unlock(&_wlock);
}template <typename T>
T Queue<T>::pop( )
{ if (_list.empty( )) { throw "element not found";}pthread_mutex_lock(&_rlock);T _temp = _list.front( );_list.pop_front( );pthread_mutex_unlock(&_rlock);return _temp;
}

设计并发阻塞队列

目前,如果读线程试图从没有数据的队列读取数据,仅仅会抛出异常并继续执行。但是,这种做法不总是我们想要的,读线程很可能希望等待(即阻塞自身),直到有数据可用时为止。这种队列称为阻塞的队列。如何让读线程在发现队列是空的之后等待?一种做法是定期轮询队列。但是,因为这种做法不保证队列中有数据可用,它可能会导致浪费大量 CPU 周期。推荐的方法是使用条件变量,即 pthread_cond_t 类型的变量。

template <typename T>
class BlockingQueue
{
public: BlockingQueue ( ) { pthread_mutexattr_init(&_attr); // set lock recursivepthread_mutexattr_settype(&_attr,PTHREAD_MUTEX_RECURSIVE_NP); pthread_mutex_init(&_lock,&_attr);pthread_cond_init(&_cond, NULL);} ~BlockingQueue ( ) { pthread_mutex_destroy(&_lock);pthread_cond_destroy(&_cond);} void push(const T& data);bool push(const T& data, const int seconds); //time-out push
    T pop( );T pop(const int seconds); // time-out popprivate: list<T> _list; pthread_mutex_t _lock;pthread_mutexattr_t _attr;pthread_cond_t _cond;
};template <typename T>
T BlockingQueue<T>::pop( )
{ pthread_mutex_lock(&_lock);while (_list.empty( )) { pthread_cond_wait(&_cond, &_lock) ;}T _temp = _list.front( );_list.pop_front( );pthread_mutex_unlock(&_lock);return _temp;
}template <typename T>
void BlockingQueue <T>::push(const T& value )
{ pthread_mutex_lock(&_lock);const bool was_empty = _list.empty( );_list.push_back(value);pthread_mutex_unlock(&_lock);if (was_empty) pthread_cond_broadcast(&_cond);
}

并发阻塞队列设计有两个要注意的方面:

1.可以不使用 pthread_cond_broadcast,而是使用 pthread_cond_signal。但是,pthread_cond_signal 会释放至少一个等待条件变量的线程,这个线程不一定是等待时间最长的读线程。尽管使用 pthread_cond_signal 不会损害阻塞队列的功能,但是这可能会导致某些读线程的等待时间过长。

2.可能会出现虚假的线程唤醒。因此,在唤醒读线程之后,要确认列表非空,然后再继续处理。强烈建议使用基于 while 循环的 pop()。

设计有超时限制的并发阻塞队列

在许多系统中,如果无法在特定的时间段内处理新数据,就根本不处理数据了。例如,新闻频道的自动收报机显示来自金融交易所的实时股票行情,它每 n 秒收到一次新数据。如果在 n 秒内无法处理以前的一些数据,就应该丢弃这些数据并显示最新的信息。根据这个概念,我们来看看如何给并发队列的添加和取出操作增加超时限制。这意味着,如果系统无法在指定的时间限制内执行添加和取出操作,就应该根本不执行操作。

template <typename T>
bool BlockingQueue <T>::push(const T& data, const int seconds)
{struct timespec ts1, ts2;const bool was_empty = _list.empty( );clock_gettime(CLOCK_REALTIME, &ts1);pthread_mutex_lock(&_lock);clock_gettime(CLOCK_REALTIME, &ts2);if ((ts2.tv_sec – ts1.tv_sec) <seconds) {was_empty = _list.empty( );_list.push_back(value);}pthread_mutex_unlock(&_lock);if (was_empty) pthread_cond_broadcast(&_cond);
}template <typename T>
T BlockingQueue <T>::pop(const int seconds)
{ struct timespec ts1, ts2; clock_gettime(CLOCK_REALTIME, &ts1); pthread_mutex_lock(&_lock);clock_gettime(CLOCK_REALTIME, &ts2);// First Check: if time out when get the _lock if ((ts1.tv_sec – ts2.tv_sec) < seconds) { ts2.tv_sec += seconds; // specify wake up timewhile(_list.empty( ) && (result == 0)) { result = pthread_cond_timedwait(&_cond, &_lock, &ts2) ;}if (result == 0) // Second Check: if time out when timedwait
        {T _temp = _list.front( );_list.pop_front( );pthread_mutex_unlock(&_lock);return _temp;}}pthread_mutex_unlock(&lock);throw "timeout happened";
}

设计有大小限制的并发阻塞队列

最后,讨论有大小限制的并发阻塞队列。这种队列与并发阻塞队列相似,但是对队列的大小有限制。在许多内存有限的嵌入式系统中,确实需要有大小限制的队列。
对于阻塞队列,只有读线程需要在队列中没有数据时等待。对于有大小限制的阻塞队列,如果队列满了,写线程也需要等待。

template <typename T>
class BoundedBlockingQueue
{
public: BoundedBlockingQueue (int size) : maxSize(size) { pthread_mutex_init(&_lock, NULL); pthread_cond_init(&_rcond, NULL);pthread_cond_init(&_wcond, NULL);_array.reserve(maxSize);} ~BoundedBlockingQueue ( ) { pthread_mutex_destroy(&_lock);pthread_cond_destroy(&_rcond);pthread_cond_destroy(&_wcond);} void push(const T& data);T pop( );
private: vector<T> _array; // or T* _array if you so preferint maxSize;pthread_mutex_t _lock;pthread_cond_t _rcond, _wcond;
};template <typename T>
void BoundedBlockingQueue <T>::push(const T& value )
{ pthread_mutex_lock(&_lock);const bool was_empty = _array.empty( );while (_array.size( ) == maxSize) { pthread_cond_wait(&_wcond, &_lock);} _array.push_back(value);pthread_mutex_unlock(&_lock);if (was_empty) pthread_cond_broadcast(&_rcond);
}template <typename T>
T BoundedBlockingQueue<T>::pop( )
{ pthread_mutex_lock(&_lock);const bool was_full = (_array.size( ) == maxSize);while(_array.empty( )) { pthread_cond_wait(&_rcond, &_lock) ;}T _temp = _array.front( );_array.erase( _array.begin( ));pthread_mutex_unlock(&_lock);if (was_full)pthread_cond_broadcast(&_wcond);return _temp;
}

要注意的第一点是,这个阻塞队列有两个条件变量而不是一个。如果队列满了,写线程等待 _wcond 条件变量;读线程在从队列中取出数据之后需要通知所有线程。同样,如果队列是空的,读线程等待 _rcond 变量,写线程在把数据插入队列中之后向所有线程发送广播消息。如果在发送广播通知时没有线程在等待 _wcond 或 _rcond,会发生什么?什么也不会发生;系统会忽略这些消息。还要注意,两个条件变量使用相同的互斥锁。

来源《用于并行计算的多线程数据结构》
http://www.ibm.com/developerworks/cn/aix/library/au-multithreaded_structures1/index.html
http://www.ibm.com/developerworks/cn/aix/library/au-multithreaded_structures2/index.html

转载于:https://www.cnblogs.com/luxiaoxun/archive/2012/10/07/2713576.html

Linux下设计并发队列相关推荐

  1. Linux下高并发socket最大连接数所受的各种限制

    修改最大打开文件数 # ulimit -n 修改最大进程数 # ulimit -u ------------------------------------------------------ Lin ...

  2. linux socket文件数限制,Linux下高并发socket最大连接数所受的限制问题

    Linux下高并发socket最大连接数所受的限制问题1.修改用户进程可打开文件数限制在Linux平台上,无论编写客户端程序还是服务端程序,在进行高并发TCP连接处理时, 最高的并发数量都要受到系统对 ...

  3. Linux进程最大socket数,Linux下高并发socket最大连接数所受的各种限制(详解)

    1.修改用户进程可打开文件数限制 在Linux平台上,无论编写客户端程序还是服务端程序,在进行高并发TCP连接处理时,最高的并发数量都要受到系统对用户单一进程同时可打开文件数量的限制(这是因为系统为每 ...

  4. 1 linux下tcp并发服务器的几种设计的模式套路,Linux下几种并发服务器的实现模式(详解)...

    1>单线程或者单进程 相当于短链接,当accept之后,就开始数据的接收和数据的发送,不接受新的连接,即一个server,一个client 不存在并发. 2>循环服务器和并发服务器 1.循 ...

  5. linux下添加mq队列管理配置,linux下 MQ第二弹:队列管理器的配置,实现双机MQI通道异步双向通信,亲测!!...

    本人前面文章一提到MQ 在linux下的安装,在此只描述队列管理器的配置,实现MQI通道的双向通讯. 一下方法我已亲测!! 贴出A,B两机的MQ配置记录: A机: #**--------------- ...

  6. 小作品--linux下设计MATLAB GUI 实现美图秀秀的功能

    一.作品介绍 首先展示一下效果: 点击左下方的三个图片选项,可以自动给画面中的人脸添加卡通耳朵,卡通胡子等特效, 智能美肤还不知道怎么实现,只留了个控件,没有具体功能. 接下来介绍下作品的制作思路: ...

  7. Linux下设计一个简单的线程池

    定义 什么是线程池?简单点说,线程池就是有一堆已经创建好了的线程,初始它们都处于空闲等待状态,当有新的任务需要处理的时候,就从这个池子里面取一个空闲等待的线程来处理该任务,当处理完成了就再次把该线程放 ...

  8. 教你修改Linux下高并发socket最大连接数所受的各种限制

    1.修改用户进程可打开文件数限制 在Linux平台上,无论编写客户端程序还是服务端程序,在进行高并发TCP连接处理时,最高的并发数量都要受到系统对用户单一进程同时可打开 文件数量的限制(这是因为系统为 ...

  9. 转:Linux下高并发socket最大连接数所受的各种限制

    1.修改用户进程可打开文件数限制 在Linux平台上,无论编写客户端程序还是服务端程序,在进行高并发TCP连接处理时,最高的并发数量都要受到系统对用户单一进程同时可打开文件数量的限制(这是因为系统为每 ...

最新文章

  1. android trace文件分析ANR
  2. [转]让你赚大钱成富翁的4个投资习惯
  3. FineReport:任意时刻只允许在一个客户端登陆账号的插件
  4. dropbox pac规则_来自Dropbox的Zulip聊天,Linux Foundation报告,FCC规则以及更多新闻
  5. eclipse 创建java窗体_eclipse创建Dialog窗口的操作流程
  6. java更改管理员的密码_java-第十章-类和对象-更改管理员密码
  7. windows技巧——notepad2 取代自带 notepad ,功能强大!
  8. 按住 ctrl 并滚动鼠标滚轮才可缩放地图_鼠标永远快不过键盘!32组PS常用快捷键你值得拥有!...
  9. T-MGCN时间多图卷积网络用于交通流预测
  10. excel 科学计数法导入数据库
  11. 博士申请 | 阿尔伯塔大学招收人工智能方向全奖博士生、硕士生
  12. linux下c使用lzma_lzma文件及命令简介
  13. 当我说转行大数据工程师时,众人笑我太疯癫,直到四个月后......
  14. 关于QQBot机器人掉线问题修复
  15. 将汉字数字转换成数字
  16. 微信小程序存在的风险_微信小程序开发技术风险存在,如何规避是重点
  17. SegY地震体数据可视化分析工具
  18. 温故而知新 知识点整理
  19. bootstrap表格 行编辑状态_JS表格组件BootstrapTable行内编辑解决方案x-editable
  20. SCO UNIX下磁带机的安装与备份

热门文章

  1. php函数知识点,php入门学习知识点七 PHP函数的基本应用_php基础
  2. java 获取mp4 缩略图_java获取视频缩略图
  3. VS2019项目打包生成.exe文件与Setup的步骤实现
  4. 清除ubuntu下缓存、软件安装包和多余内核
  5. [攻防世界 pwn]——CGfsb
  6. [BUUCTF-pwn]——ez_pz_hackover_2016
  7. 苏州中学2021届高考成绩查询,苏州中学排名前十名,2021年苏州中学排名一览表
  8. mysql中case when then 的使用
  9. Bugzilla/使用
  10. 观看TED演讲(计算机的发明和发展)感受