经过长期探索,发现一个不需要手动设置线程休眠时间(e.g. std::this_thread::sleep_for(std::chrono::microseconds(1)))的代码:

Github: https://github.com/log4cplus/ThreadPool
#ifndef THREAD_POOL_H_7ea1ee6b_4f17_4c09_b76b_3d44e102400c
#define THREAD_POOL_H_7ea1ee6b_4f17_4c09_b76b_3d44e102400c#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <atomic>
#include <functional>
#include <stdexcept>
#include <algorithm>
#include <cassert>namespace progschj {class ThreadPool {public:explicit ThreadPool(std::size_t threads= (std::max)(2u, std::thread::hardware_concurrency()));template<class F, class... Args>auto enqueue(F&& f, Args&&... args)->std::future<typename std::result_of<F(Args...)>::type>;void wait_until_empty();void wait_until_nothing_in_flight();void set_queue_size_limit(std::size_t limit);void set_pool_size(std::size_t limit);~ThreadPool();private:void start_worker(std::size_t worker_number,std::unique_lock<std::mutex> const &lock);// need to keep track of threads so we can join themstd::vector< std::thread > workers;// target pool sizestd::size_t pool_size;// the task queuestd::queue< std::function<void()> > tasks;// queue length limitstd::size_t max_queue_size = 100000;// stop signalbool stop = false;// synchronizationstd::mutex queue_mutex;std::condition_variable condition_producers;std::condition_variable condition_consumers;std::mutex in_flight_mutex;std::condition_variable in_flight_condition;std::atomic<std::size_t> in_flight;struct handle_in_flight_decrement{ThreadPool & tp;handle_in_flight_decrement(ThreadPool & tp_): tp(tp_){ }~handle_in_flight_decrement(){std::size_t prev= std::atomic_fetch_sub_explicit(&tp.in_flight,std::size_t(1),std::memory_order_acq_rel);if (prev == 1){std::unique_lock<std::mutex> guard(tp.in_flight_mutex);tp.in_flight_condition.notify_all();}}};};// the constructor just launches some amount of workersinline ThreadPool::ThreadPool(std::size_t threads): pool_size(threads), in_flight(0){std::unique_lock<std::mutex> lock(this->queue_mutex);for (std::size_t i = 0; i != threads; ++i)start_worker(i, lock);}// add new work item to the pool// 有两种方法可以实现调用类成员,// 一种是使用   bind: .enqueue(std::bind(&Dog::sayHello, &dog));// 一种是用   mem_fn: .enqueue(std::mem_fn(&Dog::sayHello), this)template<class F, class... Args>auto ThreadPool::enqueue(F&& f, Args&&... args)-> std::future<typename std::result_of<F(Args...)>::type>{using return_type = typename std::result_of<F(Args...)>::type;auto task = std::make_shared< std::packaged_task<return_type()> >(std::bind(std::forward<F>(f), std::forward<Args>(args)...));std::future<return_type> res = task->get_future();std::unique_lock<std::mutex> lock(queue_mutex);if (tasks.size() >= max_queue_size)// wait for the queue to empty or be stoppedcondition_producers.wait(lock,[this]{return tasks.size() < max_queue_size|| stop;});// don't allow enqueueing after stopping the poolif (stop)//若线程池已经开始析构,这是不允许加入新事件throw std::runtime_error("enqueue on stopped ThreadPool");tasks.emplace([task]() { (*task)(); });std::atomic_fetch_add_explicit(&in_flight,std::size_t(1),std::memory_order_relaxed);condition_consumers.notify_one();return res;}// the destructor joins all threadsinline ThreadPool::~ThreadPool(){std::unique_lock<std::mutex> lock(queue_mutex);stop = true;pool_size = 0;condition_consumers.notify_all();condition_producers.notify_all();condition_consumers.wait(lock, [this] { return this->workers.empty(); });assert(in_flight == 0);}inline void ThreadPool::wait_until_empty(){std::unique_lock<std::mutex> lock(this->queue_mutex);this->condition_producers.wait(lock,[this] { return this->tasks.empty(); });}inline void ThreadPool::wait_until_nothing_in_flight(){std::unique_lock<std::mutex> lock(this->in_flight_mutex);this->in_flight_condition.wait(lock,[this] { return this->in_flight == 0; });}inline void ThreadPool::set_queue_size_limit(std::size_t limit){std::unique_lock<std::mutex> lock(this->queue_mutex);if (stop)return;std::size_t const old_limit = max_queue_size;max_queue_size = (std::max)(limit, std::size_t(1));if (old_limit < max_queue_size)condition_producers.notify_all();}inline void ThreadPool::set_pool_size(std::size_t limit){if (limit < 1)limit = 1;std::unique_lock<std::mutex> lock(this->queue_mutex);if (stop)return;std::size_t const old_size = pool_size;assert(this->workers.size() >= old_size);pool_size = limit;if (pool_size > old_size){// create new worker threads// it is possible that some of these are still running because// they have not stopped yet after a pool size reduction, such// workers will just keep runningfor (std::size_t i = old_size; i != pool_size; ++i)start_worker(i, lock);}else if (pool_size < old_size)// notify all worker threads to start downsizingthis->condition_consumers.notify_all();}inline void ThreadPool::start_worker(std::size_t worker_number, std::unique_lock<std::mutex> const &lock){assert(lock.owns_lock() && lock.mutex() == &this->queue_mutex);assert(worker_number <= this->workers.size());auto worker_func =[this, worker_number]{for (;;){std::function<void()> task;bool notify;{std::unique_lock<std::mutex> lock(this->queue_mutex);this->condition_consumers.wait(lock,[this, worker_number] {return this->stop || !this->tasks.empty()|| pool_size < worker_number + 1; });// deal with downsizing of thread pool or shutdownif ((this->stop && this->tasks.empty())|| (!this->stop && pool_size < worker_number + 1)){// detach this worker, effectively marking it stoppedthis->workers[worker_number].detach();// downsize the workers vector as much as possiblewhile (this->workers.size() > pool_size&& !this->workers.back().joinable())this->workers.pop_back();// if this is was last worker, notify the destructorif (this->workers.empty())this->condition_consumers.notify_all();return;}else if (!this->tasks.empty()){task = std::move(this->tasks.front());this->tasks.pop();notify = this->tasks.size() + 1 == max_queue_size|| this->tasks.empty();}elsecontinue;}handle_in_flight_decrement guard(*this);if (notify){std::unique_lock<std::mutex> lock(this->queue_mutex);condition_producers.notify_all();}task();}};if (worker_number < this->workers.size()) {std::thread & worker = this->workers[worker_number];// start only if not already runningif (!worker.joinable()) {worker = std::thread(worker_func);}}elsethis->workers.push_back(std::thread(worker_func));}} // namespace progschj#endif // THREAD_POOL_H_7ea1ee6b_4f17_4c09_b76b_3d44e102400c
Demo
#include <iostream>
#include <vector>
#include <chrono>#include "ThreadPool.h"using namespace progschj;int main()
{ThreadPool pool;std::vector< std::future<int> > results;for(int i = 0; i < 8; ++i) {results.emplace_back(pool.enqueue([i] {std::cout << "hello " << i << std::endl;std::this_thread::sleep_for(std::chrono::seconds(1));std::cout << "world " << i << std::endl;return i*i;}));}pool.wait_until_empty();pool.wait_until_nothing_in_flight ();for(auto && result: results)std::cout << result.get() << ' ';std::cout << std::endl;return 0;
}

这两句代码是可以保证所有线程都收回时再进行下一步。

pool.wait_until_empty();
pool.wait_until_nothing_in_flight ();

上述代码是引用于下面这个代码:

Github: https://github.com/progschj/ThreadPool

有人对这个代码做简单的解释,可以参考:

http://www.nodekey.com/threadpool-e6-b3-a8-e8-a7-a3/
使用线程池加速的时候可以用Lambda表达,也可以这么写:
mpool.enqueue(Function_name, 函数参数1, 参数2, 参数3);

但是如果Function是成员函数,那么就会出现下面这个恶心的问题:

non-standard syntax; use '&' to create a pointer to member

具体解释可以参考:

https://www.cnblogs.com/blog-vincent-0x1F7/p/9668533.html
https://linustechtips.com/topic/772287-unable-to-use-stdthread-part-2/

解决办法是:

 有两种方法可以实现调用类成员,一种是使用   bind: .enqueue(std::bind(&Dog::sayHello, &dog));一种是用   mem_fn: .enqueue(std::mem_fn(&Dog::sayHello), this)

基于C++实现线程池加速相关推荐

  1. 基于EPOLL+多进程+线程池的server框架设想_程序世界_百度空间

    基于EPOLL+多进程+线程池的server框架设想_程序世界_百度空间 基于EPOLL+多进程+线程池的server框架设想 最近,看了几个开源代码的server框架,有了一些自己的想法,把它记下来 ...

  2. hystrix 源码 线程池隔离_基于hystrix的线程池隔离

    hystrix进行资源隔离,其实是提供了一个抽象,叫做command,就是说,你如果要把对某一个依赖服务的所有调用请求,全部隔离在同一份资源池内 对这个依赖服务的所有调用请求,全部走这个资源池内的资源 ...

  3. 学习笔记(35续):Python网络编程并发编程-基于gevent及线程池实现的并发套接字通讯

    1.基于线程池实现并发套接字通讯:因为套接字涉及地是I/O密集模型,因此使用多线程会有高效率 ''' 服务器 '''#基于线程池完成并发的套接字通讯 from socket import * from ...

  4. 开源项目SMSS开发指南(二)——基于libevent的线程池

    libevent是一套轻量级的网络库,基于事件驱动开发.能够实现多线程的多路复用和注册事件响应.本文将介绍libevent的基本功能以及如何利用libevent开发一个线程池. 一. 使用指南 监听服 ...

  5. C++ 并发编程(四):基于 Asio 的线程池

    目前项目中使用的线程池(详见:http://threadpool.sourceforge...),虽然能用,但是代码复杂且很久没有人维护了. 本文结合 Thread 和 Asio,实现了一个线程池.一 ...

  6. 基于requests模块的cookie,session和线程池爬取

    基于requests模块的cookie,session和线程池爬取 有些时候,我们在使用爬虫程序去爬取一些用户相关信息的数据(爬取张三"人人网"个人主页数据)时,如果使用之前req ...

  7. Java线程池实现原理及其在美团业务中的实践

    来自:美团技术团队 随着计算机行业的飞速发展,摩尔定律逐渐失效,多核CPU成为主流.使用多线程并行计算逐渐成为开发人员提升服务器性能的基本武器.J.U.C提供的线程池ThreadPoolExecuto ...

  8. 线程池在美团的最佳实践

    随着计算机行业的飞速发展,摩尔定律逐渐失效,多核CPU成为主流.使用多线程并行计算逐渐成为开发人员提升服务器性能的基本武器.J.U.C提供的线程池:ThreadPoolExecutor类,帮助开发人员 ...

  9. 【有料】Java线程池实现原理及其在美团业务中的实践

    随着计算机行业的飞速发展,摩尔定律逐渐失效,多核CPU成为主流.使用多线程并行计算逐渐成为开发人员提升服务器性能的基本武器.J.U.C提供的线程池:ThreadPoolExecutor类,帮助开发人员 ...

最新文章

  1. 从源码分析DEARGUI之add_text_point
  2. element ui 中el-input搜索输入框或者普通输入框无法输入的问题讨论
  3. 拦截器HandlerInterceptor、ResponseBodyAdvice和@ExceptionHandler执行顺序
  4. C#-利用Marshal类实现序列化
  5. MapReduce入门2-流量监控
  6. 工作338:pc重置筛选条件
  7. QT关于Excel的操作
  8. 3D建模软件:犀牛Rhino 7.16.22067.13002
  9. 【control】模型预测控制(MPC)
  10. 【流程发现算法概述】
  11. 负反馈放大电路的四种组态
  12. 基于STM32的简易数码相册
  13. JVM3-类文件结构
  14. Win7系统还原,创建还原点,永久保存自定义还原点,不被删除
  15. 《现代密码学》学习笔记——第三章 分组密码 [三]分组密码的运行模式
  16. HDU-6441-Find Integer-费马大定理+奇偶数列法则
  17. 笔记本电脑睡眠或休眠无法唤醒怎么办?
  18. 监控prometheus-2
  19. mysql 1264_关于MySQL的1264错误处理及sql_mode设置
  20. 长短期记忆网络LSTM

热门文章

  1. pfSense 2.4.3 发布,包含重要的安全修复补丁
  2. Noip2017 跳房子——普及组
  3. 计算机房一般在办公楼建设吗,写字楼大厦机房建设技术方案.doc
  4. python科学计算整理
  5. SQL Server 远程无法连接
  6. 《帝企鹅日记》观后感
  7. .NET 4.0 Interop新特性ICustomQueryInterface (转载)
  8. php中操作mysql的函数库
  9. 标准c语言怎么绘图,C语言绘图问题
  10. pat 食物链(状态压缩求哈密顿回路)