最近看了看我的计划,写道这里也算是到了一半,大部分都是讲的单一的C++11的用法,基本都是理论知识,就像我上大学的时候,老师一直讲理论知识,结局就是能去能不去的时候,我选择了后者。所以在这里穿插一下小的综合运用文章,让大家知道为什么要用C++11,C++11好在哪里,项目中如何运用C++11.

首先介绍一下背景。在我们的工作中,避免不了多线程之间的配合。在现在处理器动辄8核16核的背景下,如果我们的程序还停留在单线程的模型,那么我们就没法享受多处理器带来的性能提成。之前看过我司代码中的threadpool。写的那叫一个滴水不漏,每个小细节都有大量的代码去实现。不但非常冗长,而且以我的智商基本上读不懂。唯一的有点就是:真稳定。不过threadpool的模型已经很难融入现代C++了。

所以有必要通过C++11来重新实现一下threadpool,对比一下modern

C++和C98.

1. 为什么要有threadpool?

如果谈论threadpool,你会想到有什么功能呢?

传统的模型大概是这样的,把一个函数指针传给threadpool。然后thread们会在合适的时候调用这个函数。那么还有一个问题就是函数的返回值怎么传递回调用的线程。这个功能往往有很多种方法,我司的思路就是调用你的callback将函数返回值返回给你。当然不是返回给调用函数的线程。

以上的描述中反映的threadpool的两个最基本的需求:

  1. 可以把一个可执行的对象扔给threadpool去执行。

  2. 可以把执行的返回值带回。

其实这就是threadpool存在的合理性-- 把工作扔给它,我只要知道结果就行。当然任务扔给threadpool后,你就可以去干一些别的工作。

有人会说,扔给threadpool,无非是让别的线程去干活,干的总活并没有减少。相反,一些threadpool的开销反而让工作变的更慢。至于这个问题我想用redis来举例子。

众所周知,redis最新版本支持的多线程。redis的作者在解释为什么引入多线程的时候说过。在他们维护redis的时候,发现redis的瓶颈竟然出现在分配内存上(从socket上拷贝内存)。所以你会发现redis起了多线,只是为了加速内存拷贝,最终的逻辑还是在一个线程执行的。所以可以看出,可以把较慢的代码或者可以流水操作的代码让不同的线程执行。

2. 现代化threadpool提出了什么更高的要求?

之前我们分享过std::function。std::function 是C++11提供的可执行代码的包装器,它可以是一个普通函数,或者是函数指针,或者是lambda...,所以对于我们来说,threadpool也要支持std::function能支持的类型。

关于返回值,还有如何返回到calling thread,之前我们也分享过std::future.

还有就是线程间的同步,之前我们分享过 std::condition_variable。

还有就是thread的包装器std::packaged_task.

至此我们凑齐了实现threadpool的几大件,下面我们看看如何来实现它

3. 原理:

3.1 对象定义

要实现一个threadpool。我们要有以下的信息:

  1. 我们要有个结构体,记住我们控制的thread。

  2. 我们要有个结构体,记住我们要做的事情。

  3. 我们要有个condition_variable来做线程间同步。

  4. 为了优雅的推出,我们要有个标志位,标志着我现在想推出了,大家都退下吧。

功能上要有:

  1. 构造函数

  2. 析构函数

  3. 最重要的 -- 添加任务的函数

class ThreadPool
{public:ThreadPool(size_t);template <class F, class... Args>auto enqueue(F &&f, Args &&... args) -> std::future<typename std::result_of<F(Args...)>::type>;~ThreadPool();private:// need to keep track of threads so we can join themstd::vector<std::thread> workers;// the task queuestd::queue<std::function<void()>> tasks;// synchronizationstd::mutex queue_mutex;std::condition_variable condition;bool stop;
};

3.2 初始化

这里构建了我们需要的thread。并把它放在一个vector里。

这个thread只干一件事,那就是等condition_variable的通知,如果有通知,那么从task queue里边拿出一个task,并执行该task。

当然还有一些判断是否退出的逻辑。

inline ThreadPool::ThreadPool(size_t threads): stop(false)
{for (size_t i = 0; i < threads; ++i)workers.emplace_back([this] {for (;;){std::function<void()> task;{std::unique_lock<std::mutex> lock(this->queue_mutex);this->condition.wait(lock,[this] { return this->stop || !this->tasks.empty(); });if (this->stop && this->tasks.empty())return;task = std::move(this->tasks.front());this->tasks.pop();}task();}});
}

4. 添加任务API

说到函数,不可避免的就是函数的实现和函数的参数,为了实现支持不同的类型,我们选择了用模板来适配。

同时为了得到返回值,我们将返回值设置成了future。那么问题来了,这该如何实现?是不是想起了packaged_task? 如果忘了,回忆一下吧。

packaged_task可以将可执行的工作打包,然后获取它的future。

至此我们就可以实现我们的功能了。思路就是来了一个可执行的工作,首先封装成packaged_task。然后把这个task放到task queue中。并且通知一个线程说queue里边有东西了,赶紧去干活。

在返回之前,得到它的future并返回。

实现如下:

template <class F, class... Args>
auto ThreadPool::enqueue(F &&f, Args &&... args)-> std::future<typename std::result_of<F(Args...)>::type>
{using return_type = typename std::result_of<F(Args...)>::type;auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));std::future<return_type> res = task->get_future();{std::unique_lock<std::mutex> lock(queue_mutex);// don't allow enqueueing after stopping the poolif (stop)throw std::runtime_error("enqueue on stopped ThreadPool");tasks.emplace([task]() { (*task)(); });}condition.notify_one();return res;
}

至此,所有功能都实现了,有了C++11,是不是一切都变得美好了起来,用了60行就实现了以前无数行才能实现的功能,而且简单易懂,支持现代化的C++调用。

5. 完整代码

class ThreadPool
{public:ThreadPool(size_t);template  <class F, class... Args>auto enqueue(F &&f, Args &&... args) -> std::future<typename std::result_of<F(Args...)>::type>;~ThreadPool();private:std::vector<std::thread> workers;std::queue<std::function<void()>> tasks;std::mutex queue_mutex;std::condition_variable condition;bool stop;
};
template <class F, class... Args>
auto ThreadPool::enqueue(F &&f, Args &&... args)-> std::future<typename std::result_of<F(Args...)>::type>
{using return_type = typename std::result_of<F(Args...)>::type;auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));std::future<return_type> res = task->get_future();{std::unique_lock<std::mutex> lock(queue_mutex);// don't allow enqueueing after stopping the poolif (stop)throw std::runtime_error("enqueue on stopped ThreadPool");tasks.emplace([task]() { (*task)(); });}condition.notify_one();return res;
}
inline ThreadPool::ThreadPool(size_t threads): stop(false)
{for (size_t i = 0; i < threads; ++i)workers.emplace_back([this] {for (;;){std::function<void()> task;{std::unique_lock<std::mutex> lock(this->queue_mutex);this->condition.wait(lock,[this] { return this->stop || !this->tasks.empty(); });if (this->stop && this->tasks.empty())return;task = std::move(this->tasks.front());this->tasks.pop();}task();}});
}
inline ThreadPool::~ThreadPool()
{{std::unique_lock<std::mutex> lock(queue_mutex);stop = true;}condition.notify_all();for (std::thread &worker : workers)worker.join();
}
#include <iostream>
#include <vector>
#include <chrono>
#include <map>
#include <thread>
#include <string>
#include <tuple>
#include "MThreadPool.hpp"
#include <log4cplus/logger.h>
//#include <log4cplus/consoleappender.h>
#include <log4cplus/fileappender.h>
#include <log4cplus/layout.h>
//#include <log4cplus/ndc.h>
//#include <log4cplus/mdc.h>
#include <log4cplus/helpers/loglog.h>
#include <log4cplus/thread/threads.h>
//#include <log4cplus/helpers/sleep.h>
#include <log4cplus/loggingmacros.h>using namespace std;
using namespace log4cplus;
using namespace log4cplus::helpers;
Logger logger = Logger::getInstance(LOG4CPLUS_TEXT("Max:"));using namespace std;
typedef tuple<string, int> M_TUPLE;
typedef std::function<M_TUPLE()> M_FUNCTION;
vector<std::function<tuple<string, int>()>> M_VECTOR;
typedef std::lock_guard<std::recursive_mutex> M_GUARD;
typedef std::unique_lock<std::recursive_mutex> M_UNIQUE;
std::recursive_mutex func_mutex;enum RET_CODE
{M_SUCCESS = 0,M_FAIL,M_MAX
};M_TUPLE M_put()
{std::string M_func(__func__);// fucntion bodyLOG4CPLUS_DEBUG(logger, "hello!");std::this_thread::sleep_for(std::chrono::microseconds(100)); //seconds(1));LOG4CPLUS_DEBUG(logger, "world!");return std::make_tuple(M_func, M_SUCCESS);
}
void M_LOG()
{
}int main()
{log4cplus::initialize();try{SharedObjectPtr<Appender> append_1(new FileAppender("Test.log"));append_1->setName(LOG4CPLUS_TEXT("First"));log4cplus::tstring pattern = LOG4CPLUS_TEXT("[%d{%m/%d/%y %H:%M:%S,%Q}] %c %-5p - %m [%l]%n");//  std::tstring pattern = LOG4CPLUS_TEXT("%d{%c} [%t] %-5p [%.15c{3}] %%%x%% - %m [%l]%n");append_1->setLayout(std::auto_ptr<Layout>(new PatternLayout(pattern)));Logger::getRoot().addAppender(append_1);logger.setLogLevel(DEBUG_LOG_LEVEL);}catch (...){Logger::getRoot().log(FATAL_LOG_LEVEL, LOG4CPLUS_TEXT("Exception occured..."));}LOG4CPLUS_DEBUG(logger, "set logger done!"<< "\nhello log4cplus\n");int thread_num = std::thread::hardware_concurrency();if (!thread_num){thread_num = 1;}M_VECTOR.push_back(M_put);M_VECTOR.push_back(M_put);M_VECTOR.push_back(M_put);M_VECTOR.push_back(M_put);std::cout << " start " << thread_num << "threads" << std::endl;ThreadPool pool(thread_num);std::vector<std::future<M_TUPLE>> results;M_FUNCTION tmp;while (!M_VECTOR.empty()){{M_GUARD lock1(func_mutex);tmp = M_VECTOR.back();}results.emplace_back(pool.enqueue([=] {return tmp();}));{M_GUARD lock1(func_mutex);M_VECTOR.pop_back();}}for (auto &&result : results){std::string tmp_str;int tmp_bool;tie(tmp_str, tmp_bool) = result.get();cout << "string is " << tmp_str << "bool is " << tmp_bool << endl;}std::cout << std::flush;return 0;
}

用C++11 实现 thread pool相关推荐

  1. mysql5.6 thread pool_mysql5.6 thread pool

    从percona 的压测来看,确实很牛笔啊.提升很大. http://www.mysqlperformanceblog.com/2014/01/29/percona-server-thread-poo ...

  2. Reporting Service 告警w WARN: Thread pool pressure. Using current thread for a work item

    如果Reporting Service偶尔出现不可访问或访问出错情况,这种情况一般没有做监控的话,很难捕捉到.出现这种问题,最好检查Reporting Service的日志文件. 今天早上就遇到这样一 ...

  3. 【案例】常驻查询引发的thread pool 性能问题之二

    一 现象     某业务单机4个实例中的一个实例出现连接数远高于其他三个实例(正常是4K,问题实例是8K+),但是这4个实例的配置完全相同.业务开发反馈为部分连接失败.     执行show proc ...

  4. 白话Elasticsearch67-不随意调节jvm和thread pool的原因jvm和服务器内存分配的最佳实践

    文章目录 概述 不随意调节jvm和thread pool的原因 jvm gc threadpool jvm和服务器内存分配的最佳实践 jvm heap分配 将机器上少于一半的内存分配给es 为什么不要 ...

  5. Thread pool引起的程序连接数据库响应慢

    数据库版本:percona-mysql 5.6.16 ​在很长一段时间,都会出现程序连接数据库,出现响应慢的情况,正常在几到几十毫秒之间,但是偶尔会出现上百毫秒的情况: 开始由于开发重新设置并调整过程 ...

  6. 自定义parallelStream的thread pool

    文章目录 简介 通常操作 使用自定义ForkJoinPool 总结 自定义parallelStream的thread pool 简介 之前我们讲到parallelStream的底层使用到了ForkJo ...

  7. worksteal thread pool

    worksteal的场景 对于一个线程池,每个线程有一个队列,想象这种场景,有的线程队列中有大量的比较耗时的任务堆积,而有的线程队列却是空的,现象就是有的线程处于饥饿状态,而有的线程处于消化不良的状态 ...

  8. c++11中thread join和detach的区别

    线程状态: 在一个线程的生存期内,可以在多种状态之间转换,不同的操作系统可以实现不同的线程模型,定义许多不同的线程状态,每个状态还可以包含多个子状态,但大体来说,如下几种状态是通用的: 1)就绪:参与 ...

  9. MySQL Thread pool 操作过程

    Thread pool 操作过程:      thread pool 包含一定数量的 thread groups,每个groups 管理一定量的client connections,当mysql建立 ...

最新文章

  1. SharePoint 2010 新体验5 - Office Web Applications
  2. 混沌图像---三翅鹰
  3. 免费参会!百度智能云:从编码到网络传输,揭秘低延音视频背后的技术架构...
  4. ADSL使用注意事项
  5. C++学习——static
  6. 影响索引的mysql函数_mysql索引对排序的影响实例分析
  7. 第2章[2.5] Ext JS组件、容器与布局
  8. Node单线程高并发原理
  9. Linux下压缩、解压缩、效率,linux tar bz、bz2、gz、zip
  10. foxmail邮件加载失败重试_Foxmail提示错误的解决方案
  11. 使用Emit反射建立运行时模型
  12. 获取桌面DC: GetDC(GetDesktopWindow())与GetDC(NULL)
  13. EVEREST工具---检测硬件
  14. VOIP Codec 三剑客之 ISAC/ILBC -- ISAC (6) Spectrum Encode 模块
  15. clip_gradient_norms()
  16. 4.2.5 Kafka集群与运维(集群的搭建、监控工具 Kafka Eagle)
  17. R语言 配对t检验,对子变量到底填在哪?t.test paired=TRUE???
  18. Mac上如何用自带软件剪切音频(去除多余杂音)?
  19. Python:实现测试信用卡号码有效性credit card validator的算法(附完整源码)
  20. 12月编程语言排行榜公布啦~

热门文章

  1. BZOJ 2742: [HEOI2012]Akai的数学作业
  2. C#调用WSC(Windows Script Component)
  3. 【转】C#委托事件浅析
  4. 去除U盘插入后自动弹框的问题
  5. [LeetCode] Add Digits - 数字各个位数求和
  6. iOS之深入解析AppDelegate重构
  7. LeetCode Algorithm 面试题 10.05. 稀疏数组搜索
  8. Python设计模式之外观模式实例讲解
  9. Netty实战 IM即时通讯系统(三)Netty环境配置
  10. 【Qt】QModbusServer类