文章目录

  • 1 什么是线程池
  • 2 C++实现一个线程池
    • 2.1 线程安全的单例类
    • 2.2 线程安全的队列类
    • 2.3 线程池
    • 2.4 工作线程
      • 2.4.1 ThreadWorker.h
      • 2.4.2 ThreadWorker.cpp
    • 2.5 线程池测试代码

1 什么是线程池

线程池从本质上可以看做是一个多生产者多消费者的多线程应用。

一个线程池包括以下四个基本组成部分:

  • 线程池管理器:用于创建并管理线程池,包括创建线程池,销毁线程池,添加新的工作线程,添加工作任务;
  • 工作线程:属于线程池中的线程,用于处理实际任务,在没有工作任务时等待,在任务队列不为空时主动获取任务并处理任务;
  • 任务接口:每个任务必须实现的接口,以供工作线程调度任务的执行;
  • 工作任务队列:用于存放需要处理的工作任务,采用先进先出机制;

线程池根据机器性能预先创建多个工作线程,位于主线程的线程池接收到工作任务并存入到工作任务队列中,工作线程从工作队列中取出工作任务进行处理,如果工作队列为空,则工作线程进入挂起状态。

2 C++实现一个线程池

在C++中实现一个线程池,通过对线程池的特性分析,线程池主要有以下功能:

  • 线程池可以创建给定数量的工作线程,工作线程执行在子线程中开启任务函数,并等待工作队列的新任务
  • 线程池可主动关闭线程池并结束所创建的工作线程;
  • 线程池对象可被多个工作线程互斥访问,我们可以将线程池看作为一个单例,而这个单例可满足被多个线程互斥访问,这需要实现一个线程安全的单例模式;
  • 需要实现一个线程安全的队列,在队列中有新任务压入时通知工作线程领取工作任务,当队列为空时阻塞线程,避免资源开销;并支持主动解锁。

2.1 线程安全的单例类

Singleton.h

#ifndef SINGLETON_H
#define SINGLETON_H#include <memory>
#include <mutex>template<typename T>
class Singleton
{public:// 获取全局单例对象template<typename ...Args>static std::shared_ptr<T> GetInstance(Args&&... args){if (!m_pSingleton){std::lock_guard<std::mutex> gLock(m_Mutex);if (nullptr == m_pSingleton){m_pSingleton = std::make_shared<T>(std::forward<Args>(args)...);}}return m_pSingleton;}// 主动析构单例对象(提供接口,但是不建议主动调用)static void DeleteInstance(){if (m_pSingleton != nullptr){m_pSingleton.reset();m_pSingleton = nullptr;}}private:explicit Singleton();Singleton(const Singleton&) = delete;Singleton& operator=(const Singleton&) = delete;~Singleton() = default;private:static std::shared_ptr<T> m_pSingleton;static std::mutex m_Mutex;
};template<typename T>
std::shared_ptr<T> Singleton<T>::m_pSingleton = nullptr;template<typename T>
std::mutex Singleton<T>::m_Mutex;#endif

2.2 线程安全的队列类

参考项目链接:https://github.com/alfredopons/queue-thread-safe

并对该线程安全类进行了魔改,主要增加了不管当前队列是否有工作任务,都可强制退出的新功能,方便线程池退出。

SafeQueue.hpp

/** SafeQueue.hpp* Copyright (C) 2019 Alfredo Pons Menargues <apons@linucleus.com>** This program is free software: you can redistribute it and/or modify it* under the terms of the GNU General Public License as published by the* Free Software Foundation, either version 3 of the License, or* (at your option) any later version.** This program is distributed in the hope that it will be useful, but* WITHOUT ANY WARRANTY; without even the implied warranty of* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.* See the GNU General Public License for more details.** You should have received a copy of the GNU General Public License along* with this program.  If not, see <http://www.gnu.org/licenses/>.*/
#ifndef SAFEQUEUE_HPP_
#define SAFEQUEUE_HPP_#include <queue>
#include <list>
#include <mutex>
#include <thread>
#include <cstdint>
#include <condition_variable>/** A thread-safe asynchronous queue */
template <class T, class Container = std::list<T>>
class SafeQueue
{typedef typename Container::value_type value_type;typedef typename Container::size_type size_type;typedef Container container_type;public:/*! Create safe queue. */SafeQueue() = default;SafeQueue (SafeQueue&& sq){m_queue = std::move (sq.m_queue);}SafeQueue (const SafeQueue& sq){std::lock_guard<std::mutex> lock (sq.m_mutex);m_queue = sq.m_queue;}/*! Destroy safe queue. */~SafeQueue(){std::lock_guard<std::mutex> lock (m_mutex);}/*** Sets the maximum number of items in the queue. Defaults is 0: No limit* \param[in] item An item.*/void set_max_num_items (unsigned int max_num_items){m_max_num_items = max_num_items;}/***  Pushes the item into the queue.* \param[in] item An item.* \return true if an item was pushed into the queue*/bool push (const value_type& item){std::lock_guard<std::mutex> lock (m_mutex);if (m_max_num_items > 0 && m_queue.size() > m_max_num_items)return false;m_queue.push (item);m_condition.notify_one();return true;}/***  Pushes the item into the queue.* \param[in] item An item.* \return true if an item was pushed into the queue*/bool push (const value_type&& item){std::lock_guard<std::mutex> lock (m_mutex);if (m_max_num_items > 0 && m_queue.size() > m_max_num_items)return false;m_queue.push (item);m_condition.notify_one();return true;}/***  Pops item from the queue. If queue is empty, this function blocks until item becomes available.* \param[out] item The item.*/void pop (value_type& item){std::unique_lock<std::mutex> lock (m_mutex);m_condition.wait (lock, [this]() // Lambda funct{return !m_queue.empty() || m_Unlock;});if (!m_queue.empty()){item = m_queue.front();m_queue.pop();}}/***  Pops item from the queue using the contained type's move assignment operator, if it has one..*  This method is identical to the pop() method if that type has no move assignment operator.*  If queue is empty, this function blocks until item becomes available.* \param[out] item The item.*/void move_pop (value_type& item){std::unique_lock<std::mutex> lock (m_mutex);m_condition.wait (lock, [this]() // Lambda funct{return !m_queue.empty() || m_Unlock;});if (!m_queue.empty()){item = std::move(m_queue.front());m_queue.pop();}}/***  Tries to pop item from the queue.* \param[out] item The item.* \return False is returned if no item is available.*/bool try_pop (value_type& item){std::unique_lock<std::mutex> lock (m_mutex);if (m_queue.empty())return false;item = m_queue.front();m_queue.pop();return true;}/***  Tries to pop item from the queue using the contained type's move assignment operator, if it has one..*  This method is identical to the try_pop() method if that type has no move assignment operator.* \param[out] item The item.* \return False is returned if no item is available.*/bool try_move_pop (value_type& item){std::unique_lock<std::mutex> lock (m_mutex);if (m_queue.empty())return false;item = std::move (m_queue.front());m_queue.pop();return true;}/***  Pops item from the queue. If the queue is empty, blocks for timeout microseconds, or until item becomes available.* \param[out] t An item.* \param[in] timeout The number of microseconds to wait.* \return true if get an item from the queue, false if no item is received before the timeout.*/bool timeout_pop (value_type& item, std::uint64_t timeout){std::unique_lock<std::mutex> lock (m_mutex);if (m_queue.empty()){if (timeout == 0)return false;if (m_condition.wait_for (lock, std::chrono::microseconds (timeout)) == std::cv_status::timeout)return false;}item = m_queue.front();m_queue.pop();return true;}/***  Pops item from the queue using the contained type's move assignment operator, if it has one..*  If the queue is empty, blocks for timeout microseconds, or until item becomes available.*  This method is identical to the try_pop() method if that type has no move assignment operator.* \param[out] t An item.* \param[in] timeout The number of microseconds to wait.* \return true if get an item from the queue, false if no item is received before the timeout.*/bool timeout_move_pop (value_type& item, std::uint64_t timeout){std::unique_lock<std::mutex> lock (m_mutex);if (m_queue.empty()){if (timeout == 0)return false;if (m_condition.wait_for (lock, std::chrono::microseconds (timeout)) == std::cv_status::timeout)return false;}item = std::move (m_queue.front());m_queue.pop();return true;}/***  Gets the number of items in the queue.* \return Number of items in the queue.*/size_type size() const{std::lock_guard<std::mutex> lock (m_mutex);return m_queue.size();}/***  Check if the queue is empty.* \return true if queue is empty.*/bool empty() const{std::lock_guard<std::mutex> lock (m_mutex);return m_queue.empty();}/***  Swaps the contents.* \param[out] sq The SafeQueue to swap with 'this'.*/void swap (SafeQueue& sq){if (this != &sq){std::lock_guard<std::mutex> lock1 (m_mutex);std::lock_guard<std::mutex> lock2 (sq.m_mutex);m_queue.swap (sq.m_queue);if (!m_queue.empty())m_condition.notify_all();if (!sq.m_queue.empty())sq.m_condition.notify_all();}}/*! The copy assignment operator */SafeQueue& operator= (const SafeQueue& sq){if (this != &sq){std::lock_guard<std::mutex> lock1 (m_mutex);std::lock_guard<std::mutex> lock2 (sq.m_mutex);std::queue<T, Container> temp {sq.m_queue};m_queue.swap (temp);if (!m_queue.empty())m_condition.notify_all();}return *this;}/*! The move assignment operator */SafeQueue& operator= (SafeQueue && sq){std::lock_guard<std::mutex> lock (m_mutex);m_queue = std::move (sq.m_queue);if (!m_queue.empty())  m_condition.notify_all();return *this;}void lock(){m_Unlock = false;}void unlock(){m_Unlock = true;m_condition.notify_all();}void clear(){std::lock_guard<std::mutex> lock(m_mutex);while (!m_queue.empty()){m_queue.pop();}}private:std::queue<T, Container> m_queue;mutable std::mutex m_mutex;std::condition_variable m_condition;unsigned int m_max_num_items = 0;std::atomic<bool> m_Unlock = false;
};/*! Swaps the contents of two SafeQueue objects. */
template <class T, class Container>
void swap (SafeQueue<T, Container>& q1, SafeQueue<T, Container>& q2)
{q1.swap (q2);
}
#endif /* SAFEQUEUE_HPP_ */

2.3 线程池

ThreadPool.h

#ifndef THREAD_POOL_H
#define THREAD_POOL_H#include <vector>
#include <thread>
#include <atomic>#include "SafeQueue.hpp"
#include "ThreadWorker.h"
#include "Singleton.h"class ThreadPool
{public:ThreadPool():m_bThreadPoolStop(false){}static std::shared_ptr<ThreadPool> GetSingleton(){return Singleton<ThreadPool>::GetInstance();}void CreateThreads(unsigned int worker_num){for (int i = 0; i < worker_num; i++){std::shared_ptr<ThreadWorker> pThreadWorker = std::make_shared<ThreadWorker>(i);std::shared_ptr<std::thread> pThread = std::make_shared<std::thread>(&ThreadWorker::Run, pThreadWorker);m_ThreadWorkers.push_back(pThreadWorker);m_Threads.push_back(pThread);}}virtual~ThreadPool(){m_TaskQueue.clear();}void AddTask(const std::string& task_str){m_TaskQueue.push(task_str);}bool IsStop(){return m_bThreadPoolStop;}void Stop(){m_bThreadPoolStop = true;m_TaskQueue.unlock();for (int i = 0; i < m_Threads.size(); ++i){if (m_Threads[i]->joinable()){m_Threads[i]->join();}}}SafeQueue<std::string>& GetTaskQueue(){return m_TaskQueue;}private:std::atomic<bool> m_bThreadPoolStop;SafeQueue<std::string> m_TaskQueue;std::vector<std::shared_ptr<ThreadWorker>> m_ThreadWorkers;std::vector<std::shared_ptr<std::thread>> m_Threads;};#endif // !THREAD_POOL_H

2.4 工作线程

2.4.1 ThreadWorker.h

#ifndef THREAD_WORKER_H
#define THREAD_WORKER_Hclass ThreadPool;
class ThreadWorker
{public:ThreadWorker();ThreadWorker(int thread_index);virtual~ThreadWorker();void Run();private:int m_ThreadIndex;
};#endif // !THREAD_WORKER_H

2.4.2 ThreadWorker.cpp

#include <iostream>
#include "ThreadWorker.h"
#include "ThreadPool.h"ThreadWorker::ThreadWorker()
{}ThreadWorker::ThreadWorker(int thread_index):m_ThreadIndex(-1)
{m_ThreadIndex = thread_index;std::cout << "线程" << m_ThreadIndex << "被创建" << std::endl;
}ThreadWorker::~ThreadWorker()
{}void ThreadWorker::Run()
{while (!ThreadPool::GetSingleton()->IsStop()){std::string task_str = "";ThreadPool::GetSingleton()->GetTaskQueue().pop(task_str);if (!task_str.empty()){std::cout << "线程" << m_ThreadIndex << "处理了" << task_str << std::endl;}      }std::cout << "子线程"<<m_ThreadIndex<< "退出" << std::endl;
}

2.5 线程池测试代码

main.cpp

#include <vld.h>
#include <iostream>
#include <string>#include "ThreadPool.h"int main()
{ThreadPool::GetSingleton()->CreateThreads(10);std::thread prod_thread1([&]() {for(int i=0;i<3000;i++){ThreadPool::GetSingleton()->AddTask(std::to_string(i));std::this_thread::sleep_for(std::chrono::milliseconds(1));}std::cout << "生产线程1退出" << std::endl;});std::thread prod_thread2([&]() {for (int i = 0; i < 3000; i++){ThreadPool::GetSingleton()->AddTask(std::to_string(i));std::this_thread::sleep_for(std::chrono::milliseconds(1));}std::cout << "生产线程2退出" << std::endl;});if(prod_thread1.joinable())prod_thread1.join();if (prod_thread2.joinable())prod_thread2.join();ThreadPool::GetSingleton()->Stop();std::cout << "执行完成" << std::endl;return 0;
}

执行结果:

如果有兴趣可以访问我的个站:https://www.stubbornhuang.com/,更多干货!

C++11 - 构建一个符合实际应用要求的线程池相关推荐

  1. java如何创建一个两个数的队列_java线程池 如何构建一个线程立即到拉到MAX数量跑业务,线程到MAX了,额外的队列可以存储任务的线程池...

    背景:JDK的线程池的运作原理 : JDK的线程池的构造函数有7个参数,分别是corePoolSize.maximumPoolSize.keepAliveTime.unit.workQueue.thr ...

  2. 一个DEMO让你彻底理解线程池

    目录 一.简介 二.线程池任务场景 场景一:提交5个任务,执行总耗时500ms 场景二:提交10个任务,执行总耗时500ms 场景三:提交11个任务,执行总耗时1000ms 场景四:提交20个任务,执 ...

  3. 一个demo让你彻底理解线程池工作流程

    网上关于线程池的八股文太多了我不多说,说了你也记不住,记住了也理解不了,理解了也不会用- 想了很久,终于想出一个demo,加上十个场景,让你能逐步理解线程池真正的工作流程 相信我,认真看完这篇文章,你 ...

  4. 构建一个给爬虫使用的代理IP池

    做网络爬虫时,一般对代理IP的需求量比较大.因为在爬取网站信息的过程中,很多网站做了反爬虫策略,可能会对每个IP做频次控制.这样我们在爬取网站时就需要很多代理IP. 代理IP的获取,可以从以下几个途径 ...

  5. 【重难点】【JUC 05】线程池核心设计与实现、线程池使用了什么设计模式、要你设计的话,如何实现一个线程池

    [重难点][JUC 05]线程池核心设计与实现.线程池使用了什么设计模式.要你设计的话,如何实现一个线程池 文章目录 [重难点][JUC 05]线程池核心设计与实现.线程池使用了什么设计模式.要你设计 ...

  6. 程序随笔——C++实现的一个线程池

    1.线程池简介 我们知道在线程池是一种多线程处理形式,处理过程中我们将相应的任务提交给线程池,线程池会分配对应的工作线程执行任务或存放在任务队列中,等待执行. 面向对象编程中,创建和销毁对象是需要消耗 ...

  7. C++11线程池探索

    最近在学习线程池(基于C++11),了解了线程池的大概框架后便开始动手写,胡思乱想了很多种写法,也走了很多弯路,以下是学习过程: zii ThreadPool1.0 #pragma once #inc ...

  8. C++实现一个简易的线程池

    文章目录 线程池的概念 什么是线程池 线程池的优点 线程池的应用场景 线程池的实现 实现思路 代码实现 线程池的概念 什么是线程池 顾名思义,线程池就是一个有很多空闲线程的池子(线程的数量受到限制), ...

  9. Linux下设计一个简单的线程池

    定义 什么是线程池?简单点说,线程池就是有一堆已经创建好了的线程,初始它们都处于空闲等待状态,当有新的任务需要处理的时候,就从这个池子里面取一个空闲等待的线程来处理该任务,当处理完成了就再次把该线程放 ...

最新文章

  1. Blippar放大招,要开源其AR和计算机视觉技术
  2. 解决Raspberry Pi不识别RTL8188eu无线网卡芯片的问题
  3. python27安装-linux下安装python27 nginx 和uwsgi
  4. windows下GIT使用记录--00准备阶段
  5. SimpleAdapter真不简单!
  6. cms java垃圾回收_java cms垃圾回收器总结
  7. 网络与IO知识扫盲(四):C10K问题、BIO的弊端与NIO的引入
  8. ArcGIS生成根据点图层生成等值面并减小栅格锯齿的操作步骤
  9. 【Java入门】泛型的学习与应用
  10. 如何找到需要的rpm包
  11. html5--6-50 动画效果-变形
  12. SQL Sever2008 无法启动
  13. mysql 主从复制 gtid_Mysql-GTID主从复制
  14. 卡方检验用于特征选择
  15. 30个非常有趣的404错误页面设计欣赏
  16. 整人游戏-色盲在线测试
  17. pta 7-5 输出2到n之间的全部素数 (15 分)
  18. web开发字体图标制作
  19. css盒模型(标准模式和怪异模式)
  20. python 汉字 简繁体 转换方法

热门文章

  1. ubuntu安装输入法----小企鹅输入法
  2. 含泪整理最优质农用机器3dm Rhino犀牛资源素材,你想要的这里都有
  3. c语言反应能力的手机游戏,锻炼反应能力的游戏合集
  4. CVPR 2021 Authors Guidelines 投稿须知 中英文对照翻译
  5. 怎么判断两个工程师谁的代码能力强?【改编】
  6. 大红喜庆版UI猜灯谜又叫猜字谜微信小程序源码下载
  7. vue 微信登录(前后台详细教程)
  8. Python 之父:这才是Python受欢迎的本质真相!文末有福利!
  9. 行业应用 | 大数据对新闻传播的影响 大数据
  10. 二刷剑指Offer:剑指Offer+LeetCode(全53题)