1.线程池简介

我们知道在线程池是一种多线程处理形式,处理过程中我们将相应的任务提交给线程池,线程池会分配对应的工作线程执行任务或存放在任务队列中,等待执行。

面向对象编程中,创建和销毁对象是需要消耗一定时间的,因为创建一个对象要获取内存资源或者其它更多资源。所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁。如何利用已有对象来服务就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因。当然线程池也同样适用这种思想。

因为线程的创建和销毁时需要消耗一定的时间的。假设,线程的创建消耗T1,线程执行任务的时间T2,线程的销毁销毁T3。当T1 + T3 > T2时候,使用线程池技术,通过线程的复用,就能提高程序的性能。

2.线程池的作用

  1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
  2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
  3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,并出现"OutOfMemory"的错误。
3.线程池的实现
我们这里实现的线程池是通过类CThreadPool来实现的。这个线程池类的构造函数如下所示:
[cpp] view plain copy
  1. CThreadPool(int corePoolSize, int maximumPoolSize, int keepAliveTime, CBlockingQueue<Task> *pQueue);

构造函数参数的含义:

  1. corePoolSize:核心池的大小,在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,默认情况下,在创建了线程池之后,线程池中的线程数为0,,当有任务到来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把达到的任务放到缓存对队列当中。
  2. maximumPoolSize:线程池最大线程数,表示在线程池中最多能创建多少个线程。
  3. keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。
  4. CBlockingQueue<Task>:任务队列,用来存放提交到线程池的任务。

我们实现的线程池的大概原理是:在创建线程池之后,线程池中并没有任何工作线程的,当使用线程池提供的execute方法,向线程池提交任务的时候,如果线程池中存在空闲的工作线程,那么就会复用该工作线程,并不会去创建新的工作线程;但是如果没有工作线程或空闲的工作线程,并且当然的工作线程数量小于核心池的大小时候,会创建一个工作线程去执行任务,若当前的工作线程数量达到了核心池的数量,那么就会将任务放入到队列中去;若队列满的情况下,如果没有达到线程池的最大线程数,那么就会将创建新的工作线程去执行任务;若线程数达到了最大的线程数,那么我们是抛出异常,这里没有提供一个拒绝的策略,后续有时间的会处理,目前就向采用抛出异常;并且在当前的工作线程数大于核心池数的时候,会有超时机制,关闭指定某时间的空闲工作线程,直到等于核心池的大小。当然,目前的实现还是有缺陷的,线程是结束了,当时并没释放到资源,目前没想到好的方法。

为了简化线程池的配置,我们提供了一个工厂类来进行线程池的创建。我们的工厂类支持创建三种线程池:

  1. newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
  2. newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
  3. newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

整个实现的原理就是这样,下面 直接上实现,我们的实现只适用Linux上,并且需要依赖boost库。一些实现是之前写的库。这里只贴出新实现的代码。

1.FactoryThreadPool.h

[cpp] view plain copy
  1. #ifndef __FACTORY_THREAD_POOL_H__
  2. #define __FACTORY_THREAD_POOL_H__
  3. #include "ThreadPool.h"
  4. #include "LinkedBlockingQueue.h"
  5. //创建线程池的工厂类,一般直接使用工厂类创建所需的线程池
  6. //共提供三种线程池:固定大小线程池、可缓存的线程池、单个后台线程
  7. class CFactoryThreadPool
  8. {
  9. public:
  10. CFactoryThreadPool() { }
  11. ~CFactoryThreadPool() { }
  12. //固定大小线程池。每次提交一个任务就创建一个线程池,直到线程池达到
  13. //线程池的最大大小。线程池的大小一旦达到最大值就会保持不变。
  14. CAbstractThreadPool *newFixedThreadPool(int iThreads)
  15. {
  16. CBlockingQueue<Task> *pBlockingQueue = new CLinkedBlockingQueue<Task>();
  17. return new CThreadPool(iThreads, iThreads, 0, pBlockingQueue);
  18. }
  19. //可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么
  20. //就会回收部分空闲(60s不执行任务)的线程,当任务数增加是,此线程池
  21. //又可以智能的添加新线程来处理任务,线程池的大小依赖于操作系统能创建
  22. //的最大线程的大小
  23. CAbstractThreadPool *newCachedThreadPool()
  24. {
  25. CBlockingQueue<Task> *pBlockingQueue = new CLinkedBlockingQueue<Task>(1);
  26. return new CThreadPool(0, INT_MAX, 60, pBlockingQueue);
  27. }
  28. //单个后台线程。这个线程池只有一个线程在工作,也就是相当于单线程串行
  29. //执行所有任务。保证所有任务的执行按照提交的顺序。
  30. CAbstractThreadPool * newSingleThreadExecutor()
  31. {
  32. CBlockingQueue<Task> *pBlockingQueue = new CLinkedBlockingQueue<Task>();
  33. return new CThreadPool(1, 1, 0, pBlockingQueue);
  34. }
  35. };
  36. #endif  //#ifndef __FACTORY_THREAD_POOL_H__

2.AbstractThreadPool.h

[cpp] view plain copy
  1. #ifndef __ABSTRACT_THREAD_POOL_H__
  2. #define __ABSTRACT_THREAD_POOL_H__
  3. #include "TaskFuncExecute.h"
  4. //抽象基类
  5. class CAbstractThreadPool
  6. {
  7. public:
  8. CAbstractThreadPool() { }
  9. virtual ~CAbstractThreadPool() { }
  10. virtual void execute(const Task &task) = 0;
  11. virtual void shutdown() = 0;
  12. };
  13. #endif  //#ifndef __ABSTRACT_THREAD_POOL_H__

3.TaskFuncExecute.h

[cpp] view plain copy
  1. #ifndef __TASK_FUNC_EXECUTE_H__
  2. #define __TASK_FUNC_EXECUTE_H__
  3. #include <boost/function.hpp>
  4. typedef boost::function<void(void)>   Task;
  5. //类CTaskFuncExecute,用来执行boost::function函数对象,里面实现了execute
  6. class CTaskFuncExecute
  7. {
  8. public:
  9. CTaskFuncExecute(const Task &task = nullTask)
  10. :m_TaskFunction(task)
  11. {
  12. }
  13. ~CTaskFuncExecute()
  14. {
  15. }
  16. CTaskFuncExecute(const CTaskFuncExecute &Excute)
  17. {
  18. m_TaskFunction = Excute.m_TaskFunction;
  19. }
  20. CTaskFuncExecute &operator=(const CTaskFuncExecute &Excute)
  21. {
  22. m_TaskFunction = Excute.m_TaskFunction;
  23. return *this;
  24. }
  25. void execute()
  26. {
  27. m_TaskFunction();
  28. }
  29. //定义一个空任务,实际上不做什么
  30. static void nullTask(void)
  31. {
  32. //nothing
  33. }
  34. private:
  35. Task m_TaskFunction;
  36. };
  37. #endif  //#ifndef __TASK_FUNC_EXECUTE_H__

4.Thread.h

[cpp] view plain copy
  1. #ifndef __THREAD_H__
  2. #define __THREAD_H__
  3. #include <pthread.h>
  4. #include <string>
  5. #include <boost/function.hpp>
  6. #include "TaskFuncExecute.h"
  7. #include "Condition.h"
  8. //CThread实现了对Linux中的一些线程操作的封装
  9. class CThread
  10. {
  11. friend void *startThread(void *pObj);
  12. public:
  13. typedef boost::function<void(void)> ThreadFunc;
  14. enum ThreadStatus {RUNNING, EXIT, JOIN};
  15. CThread(const ThreadFunc &Func, const std::string &strName);
  16. virtual ~CThread();
  17. void start(const Task &InitTask);
  18. int join();
  19. void exit(void *rval_ptr);
  20. void setname(std::string &strName);
  21. const std::string &name(void);
  22. pthread_t getSelfId();
  23. ThreadStatus getThreadStatus();
  24. private:
  25. ThreadStatus m_Status;          //线程的状态
  26. pthread_t m_tId;                //线程标识
  27. ThreadFunc m_Func;
  28. CMutexLock m_Mutex;
  29. std::string m_strThreadName;    //线程名
  30. };
  31. #endif  //#ifndef __THREAD_H__

5.thread.cpp

[cpp] view plain copy
  1. #include "Thread.h"
  2. #include "Logger.h"
  3. #include "Exception.h"
  4. using namespace Log;
  5. using namespace Exception;
  6. //一个辅助类,实现运行boost::function函数对象
  7. struct threadData
  8. {
  9. typedef CThread::ThreadFunc ThreadFunc;
  10. ThreadFunc m_Func;
  11. CThread *m_pThis;
  12. Task m_Task;
  13. threadData(const ThreadFunc &Func, CThread *pThis, const Task &InitTask)
  14. :m_Func(Func)
  15. ,m_pThis(pThis)
  16. ,m_Task(InitTask)
  17. {
  18. }
  19. void runThreadFunc()
  20. {
  21. try
  22. {
  23. m_Func();
  24. }
  25. catch (std::exception &ex)
  26. {
  27. LOG_FATAL << "runThreadFunc exception : " << ex.what();
  28. }
  29. catch (...)
  30. {
  31. LOG_FATAL << "runThreadFunc unknow exception";
  32. }
  33. }
  34. };
  35. void *startThread(void *pObj)
  36. {
  37. try
  38. {
  39. if (pObj == NULL)
  40. throw CException("startThread parament obj is null");
  41. threadData *pData = static_cast<threadData *>(pObj);
  42. pData->m_Task();
  43. pData->runThreadFunc();
  44. pData->m_pThis->m_Mutex.lock();
  45. pData->m_pThis->m_Status = CThread::EXIT;
  46. delete pData;
  47. pData->m_pThis->m_Mutex.unlock();
  48. }
  49. catch (const CException &ex)
  50. {
  51. LOG_FATAL << "throw exception : " << ex.what();
  52. }
  53. return NULL;
  54. }
  55. CThread::CThread(const ThreadFunc &Func, const std::string &strName)
  56. :m_Status(RUNNING)
  57. ,m_tId(0)
  58. ,m_Func(Func)
  59. ,m_Mutex()
  60. ,m_strThreadName(strName)
  61. {
  62. }
  63. CThread::~CThread()
  64. {
  65. if (m_Status != JOIN)
  66. {
  67. ::pthread_detach(m_tId);
  68. }
  69. }
  70. void CThread::start(const Task &InitTask)
  71. {
  72. threadData *pData = new threadData(m_Func, this, InitTask);
  73. int iRet = ::pthread_create(&m_tId, NULL, startThread, pData);
  74. if (iRet != 0)
  75. {
  76. //创建线程失败?认为这是个致命错误,会终止程序的运行
  77. LOG_FATAL << "pthread_create false return err " << iRet;
  78. }
  79. LOG_INFO << "create thread : " << m_strThreadName << ",tid = " << m_tId;
  80. }
  81. int CThread::join()
  82. {
  83. m_Mutex.lock();
  84. if (m_Status == JOIN)
  85. {
  86. m_Mutex.unlock();
  87. //重复的调用join,这里需要向日志系统输出错误信息。
  88. LOG_ERROR << "repeat call pthread_join";
  89. return -1;
  90. }
  91. m_Mutex.unlock();
  92. LOG_INFO << "join thread, tid = " << m_tId;
  93. int iRet = ::pthread_join(m_tId, NULL);
  94. m_Mutex.lock();
  95. m_Status = JOIN;
  96. m_Mutex.unlock();
  97. return iRet;
  98. }
  99. void CThread::exit(void *rval_ptr)
  100. {
  101. ::pthread_exit(rval_ptr);
  102. }
  103. const std::string &CThread::name(void)
  104. {
  105. CMutexLockPart lock(m_Mutex);
  106. return m_strThreadName;
  107. }
  108. void CThread::setname(std::string &strName)
  109. {
  110. CMutexLockPart lock(m_Mutex);
  111. m_strThreadName = strName;
  112. }
  113. pthread_t CThread::getSelfId()
  114. {
  115. return ::pthread_self();
  116. }
  117. CThread::ThreadStatus CThread::getThreadStatus()
  118. {
  119. CMutexLockPart lock(m_Mutex);
  120. return m_Status;
  121. }

6.ThreadPool.h

[cpp] view plain copy
  1. #ifndef __THREAD_POOL_H__
  2. #define __THREAD_POOL_H__
  3. #include <boost/ptr_container/ptr_vector.hpp>
  4. #include "AbstractThreadPool.h"
  5. #include "BlockingQueue.h"
  6. #include "Thread.h"
  7. #include "TaskFuncExecute.h"
  8. //BlockingQueue由使用者申请,在析构程池的时候释放
  9. class CThreadPool   : public CAbstractThreadPool
  10. {
  11. public:
  12. CThreadPool(int corePoolSize, int maximumPoolSize, int keepAliveTime, CBlockingQueue<Task> *pQueue);
  13. ~CThreadPool();
  14. void execute(const Task &task);
  15. void shutdown();
  16. private:
  17. void allWorker(const Task &task);
  18. bool addBlockingQueue(const Task &task);
  19. void runInWorkerThread();
  20. Task getTask();
  21. enum ThreadPoolStatus {RUNNING, SHUTDOWN};
  22. ThreadPoolStatus m_eStatue;
  23. int m_iCorePoolSize;        //核心池的大小
  24. int m_iMaximumPoolSize;     //最大的大小
  25. int m_iKeepAliveTime;       //空闲线程等待新任务的最长时间.0表示不使用
  26. int m_iCurrentThreadSum;    //当前的工作线程
  27. int m_iIdleThreadSum;       //空闲的工作线程
  28. CMutexLock m_MutexLock;
  29. CCondition m_Condition;
  30. CBlockingQueue<Task> *m_pQueue;
  31. boost::ptr_vector<CThread> m_vecWorker;   //所有的工作线程的集合
  32. };
  33. #endif  //#ifndef __THREAD_POOL_H__

7.ThreadPool.cpp

[cpp] view plain copy
  1. #include <boost/bind.hpp>
  2. #include "ThreadPool.h"
  3. #include "Exception.h"
  4. #include "Logger.h"
  5. using namespace Exception;
  6. using namespace Log;
  7. CThreadPool::CThreadPool(int corePoolSize, int maximumPoolSize, int keepAliveTime, CBlockingQueue<Task> *pQueue)
  8. :m_eStatue(RUNNING)
  9. ,m_iCorePoolSize(corePoolSize)
  10. ,m_iMaximumPoolSize(maximumPoolSize)
  11. ,m_iKeepAliveTime(keepAliveTime)
  12. ,m_iCurrentThreadSum(0)
  13. ,m_iIdleThreadSum(0)
  14. ,m_MutexLock()
  15. ,m_Condition(m_MutexLock)
  16. ,m_pQueue(pQueue)
  17. ,m_vecWorker()
  18. {
  19. m_vecWorker.clear();
  20. }
  21. CThreadPool::~CThreadPool()
  22. {
  23. m_vecWorker.erase(m_vecWorker.begin(), m_vecWorker.end());
  24. //队列由工厂类创建,这里释放掉
  25. if (m_pQueue != NULL)
  26. delete m_pQueue;    //这里需要释放队列
  27. }
  28. void CThreadPool::execute(const Task &task)
  29. {
  30. //为了简化实现,这里没有提供可选的策略。后续有时间可完善
  31. try
  32. {
  33. if (m_eStatue == RUNNING)
  34. {
  35. m_MutexLock.lock();
  36. if (m_iCorePoolSize == 0 || m_iCurrentThreadSum < m_iCorePoolSize)
  37. {
  38. if (m_iIdleThreadSum != 0)
  39. {
  40. m_MutexLock.unlock();
  41. //有空闲的工作线程,直接放到队列去
  42. (void)addBlockingQueue(task);
  43. }
  44. else
  45. {
  46. m_MutexLock.unlock();
  47. //继续分配工作线程
  48. allWorker(task);
  49. }
  50. }
  51. else
  52. {
  53. if (m_pQueue && m_pQueue->full())
  54. {
  55. m_MutexLock.unlock();
  56. allWorker(task);
  57. }
  58. else
  59. {
  60. m_MutexLock.unlock();
  61. (void)addBlockingQueue(task);
  62. }
  63. }
  64. }
  65. else
  66. {
  67. //线程池处于SHUTDOWN状态,此时如果在提交任务,
  68. //我们会抛出异常,这就是我们使用的默认策略
  69. m_MutexLock.unlock();
  70. throw CException("ThreadPool status SHUTDOWN!");
  71. }
  72. }
  73. catch (const CException &ex)
  74. {
  75. LOG_ERROR << "Throw exception : " << ex.what();
  76. }
  77. }
  78. void CThreadPool::shutdown()
  79. {
  80. m_MutexLock.lock();
  81. m_eStatue = SHUTDOWN;
  82. m_Condition.broadcast();
  83. m_MutexLock.unlock();
  84. for (boost::ptr_vector<CThread>::iterator iter = m_vecWorker.begin();
  85. iter != m_vecWorker.end(); ++iter)
  86. {
  87. iter->join();
  88. }
  89. }
  90. void CThreadPool::runInWorkerThread()
  91. {
  92. try
  93. {
  94. for(;;)
  95. {
  96. m_MutexLock .lock();
  97. if (m_eStatue == SHUTDOWN && m_pQueue && m_pQueue->empty())
  98. {
  99. m_MutexLock.unlock();
  100. return;
  101. }
  102. m_MutexLock.unlock();
  103. Task task = getTask();
  104. if (task == NULL)
  105. return;
  106. task();             //执行任务
  107. }
  108. }
  109. catch (const CException &ex)
  110. {
  111. LOG_ERROR << "runInWorkerThread throw exeception " << ex.what();
  112. }
  113. catch (...)
  114. {
  115. LOG_ERROR << "runInWorkerThread unknow error";
  116. }
  117. }
  118. Task CThreadPool::getTask()
  119. {
  120. CMutexLockPart lock(m_MutexLock);
  121. while (m_pQueue && m_pQueue->empty() && m_eStatue == RUNNING)
  122. {
  123. ++m_iIdleThreadSum;
  124. if (m_iKeepAliveTime == 0 || (m_iCorePoolSize && m_iCurrentThreadSum < m_iCorePoolSize))
  125. {
  126. m_Condition.wait();
  127. }
  128. else
  129. {
  130. bool bRet = m_Condition.waitForSeconds(m_iKeepAliveTime);
  131. if (bRet == true)
  132. {
  133. --m_iIdleThreadSum;
  134. return NULL;
  135. }
  136. }
  137. --m_iIdleThreadSum;
  138. }
  139. if (m_pQueue->empty() && m_eStatue == SHUTDOWN)
  140. return NULL;
  141. Task task = m_pQueue->poll();
  142. if (m_pQueue && (m_pQueue->size() > 0))
  143. m_Condition.signal();
  144. return task;
  145. }
  146. void CThreadPool::allWorker(const Task &task)
  147. {
  148. try
  149. {
  150. CMutexLockPart lock(m_MutexLock);
  151. if (m_iCurrentThreadSum >= m_iMaximumPoolSize)
  152. throw CException("Current Threads out of Max Threads");
  153. CThread *pWorkerThread = new CThread(boost::bind(&CThreadPool::runInWorkerThread, this), "Worker Thread");
  154. pWorkerThread->start(task);
  155. m_vecWorker.push_back(pWorkerThread);
  156. ++m_iCurrentThreadSum;
  157. }
  158. catch (const CException &ex)
  159. {
  160. LOG_ERROR << "ThreadPool allWorker throw exception " << ex.what();
  161. }
  162. }
  163. bool CThreadPool::addBlockingQueue(const Task &task)
  164. {
  165. try
  166. {
  167. LOG_INFO << "addBlockingQueue";
  168. CMutexLockPart lock(m_MutexLock);
  169. if (m_pQueue && m_pQueue->full())
  170. return false;
  171. if (m_pQueue && !m_pQueue->offer(task))
  172. throw CException("ThreadPool add BlockingQueue false");
  173. m_Condition.signal();
  174. return true;
  175. }
  176. catch (const CException &ex)
  177. {
  178. //这种情况应该是不会出现的,如果出现了,进行日志记录。继续运行
  179. LOG_ERROR << "ThreadPool addBlockingQueue throw exception " << ex.what();
  180. return true;
  181. }
  182. }

8.mainFixedThreadPool.cpp

[cpp] view plain copy
  1. #include "FactoryThreadPool.h"
  2. #include "Logger.h"
  3. #include <stdio.h>
  4. #include <unistd.h>
  5. #include <boost/bind.hpp>
  6. using namespace Log;
  7. //一个计算两个数之和的任务
  8. void Add(int iValue1, int iValue2)
  9. {
  10. LOG_INFO << iValue1 << " + " << iValue2 << " = " << iValue1 + iValue2;
  11. //sleep(1);
  12. }
  13. //这是固定大小线程池的测试代码,这里我们创建了一个固定大小为10的
  14. //线程池,该线程池用来执行计算两数之和。
  15. int main(void)
  16. {
  17. //日志的报警等级为INFO
  18. CLogger::setLogLevel(CLogger::DEBUG);
  19. //CLogger::setLogLevel(CLogger::INFO);
  20. //我们设置日志输出到文件
  21. CLogger::setOutputMode(LOGGER_MODE_LOGFILE);
  22. LOG_DEBUG << "---------------------start-------------------";
  23. LOG_DEBUG << "FixedThreadPool test start!";
  24. CFactoryThreadPool *pFactory = new CFactoryThreadPool();
  25. CAbstractThreadPool *pThreadPool = pFactory->newFixedThreadPool(10);
  26. //向线程池提交200个加法运算的任务
  27. for (int i = 0; i < 200; i++)
  28. {
  29. pThreadPool->execute(boost::bind(Add, i, i));
  30. }
  31. pThreadPool->shutdown();
  32. delete pThreadPool;
  33. delete pFactory;
  34. LOG_DEBUG << "----------------------end--------------------";
  35. return 0;
  36. }

9.mainCachedThreadPool.cpp

[cpp] view plain copy
  1. #include "FactoryThreadPool.h"
  2. #include "Logger.h"
  3. #include <stdio.h>
  4. #include <unistd.h>
  5. #include <boost/bind.hpp>
  6. using namespace Log;
  7. //一个计算两个数之和的任务
  8. void Add(int iValue1, int iValue2)
  9. {
  10. LOG_INFO << iValue1 << " + " << iValue2 << " = " << iValue1 + iValue2;
  11. sleep(1);
  12. }
  13. //这是可缓存线程池的测试代码,这里我们创建了一个固定大小为10的
  14. //线程池,该线程池用来执行计算两数之和。
  15. int main(void)
  16. {
  17. //日志的报警等级为INFO
  18. CLogger::setLogLevel(CLogger::DEBUG);
  19. //CLogger::setLogLevel(CLogger::INFO);
  20. //我们设置日志输出到文件
  21. CLogger::setOutputMode(LOGGER_MODE_LOGFILE);
  22. LOG_DEBUG << "---------------------start-------------------";
  23. LOG_DEBUG << "newCachedThreadPool test start!";
  24. CFactoryThreadPool *pFactory = new CFactoryThreadPool();
  25. CAbstractThreadPool *pThreadPool = pFactory->newCachedThreadPool();
  26. //向线程池提交200个加法运算的任务
  27. for (int i = 0; i < 200; i++)
  28. {
  29. pThreadPool->execute(boost::bind(Add, i, i));
  30. }
  31. pThreadPool->shutdown();
  32. delete pThreadPool;
  33. delete pFactory;
  34. LOG_DEBUG << "----------------------end--------------------";
  35. return 0;
  36. }

10.mainSingleThreadPool.cpp

[cpp] view plain copy
  1. #include "FactoryThreadPool.h"
  2. #include "Logger.h"
  3. #include <stdio.h>
  4. #include <unistd.h>
  5. #include <boost/bind.hpp>
  6. using namespace Log;
  7. //一个计算两个数之和的任务
  8. void Add(int iValue1, int iValue2)
  9. {
  10. LOG_INFO << iValue1 << " + " << iValue2 << " = " << iValue1 + iValue2;
  11. sleep(1);
  12. }
  13. //这是固定大小线程池的测试代码,这里我们创建了一个固定大小为10的
  14. //线程池,该线程池用来执行计算两数之和。
  15. int main(void)
  16. {
  17. //日志的报警等级为INFO
  18. CLogger::setLogLevel(CLogger::DEBUG);
  19. //CLogger::setLogLevel(CLogger::INFO);
  20. //我们设置日志输出到文件
  21. CLogger::setOutputMode(LOGGER_MODE_LOGFILE);
  22. LOG_DEBUG << "---------------------start-------------------";
  23. LOG_DEBUG << "SingleThreadPool test start!";
  24. CFactoryThreadPool *pFactory = new CFactoryThreadPool();
  25. CAbstractThreadPool *pThreadPool = pFactory->newSingleThreadExecutor();
  26. //向线程池提交200个加法运算的任务
  27. for (int i = 0; i < 200; i++)
  28. {
  29. pThreadPool->execute(boost::bind(Add, i, i));
  30. }
  31. pThreadPool->shutdown();
  32. delete pThreadPool;
  33. delete pFactory;
  34. LOG_DEBUG << "----------------------end--------------------";
  35. return 0;
  36. }

11.mainThreadPool.cpp

[cpp] view plain copy
  1. #include "ThreadPool.h"
  2. #include "Logger.h"
  3. #include "LinkedBlockingQueue.h"
  4. #include <stdio.h>
  5. #include <unistd.h>
  6. #include <boost/bind.hpp>
  7. using namespace Log;
  8. //一个计算两个数之和的任务
  9. void Add(int iValue1, int iValue2)
  10. {
  11. LOG_INFO << iValue1 << " + " << iValue2 << " = " << iValue1 + iValue2;
  12. sleep(2);
  13. }
  14. //这是线程池的测试代码,这里我们创建了一个大小为2的
  15. //线程池,该线程池最大允许的线程数为4,
  16. //该线程池用来执行计算两数之和。
  17. int main(void)
  18. {
  19. //日志的报警等级为INFO
  20. CLogger::setLogLevel(CLogger::DEBUG);
  21. //CLogger::setLogLevel(CLogger::INFO);
  22. //我们设置日志输出到文件
  23. CLogger::setOutputMode(LOGGER_MODE_LOGFILE);
  24. LOG_DEBUG << "---------------------start-------------------";
  25. LOG_DEBUG << "CommonThreadPool test start!";
  26. CThreadPool *pThreadPool = new CThreadPool(2, 4, 30, new CLinkedBlockingQueue<Task>(10));
  27. //向线程池提交200个加法运算的任务
  28. for (int i = 0; i < 15; i++)
  29. {
  30. pThreadPool->execute(boost::bind(Add, i, i));
  31. }
  32. pThreadPool->shutdown();
  33. delete pThreadPool;
  34. LOG_DEBUG << "----------------------end--------------------";
  35. return 0;
  36. }

我们共给出了4个测试代码,包括三个工厂类创建 的线程池和直接只有CThreadPool创建的线程池的测试。这里没有贴出具体的测试结果。因为测试结果输出是比较多的。

程序随笔——C++实现的一个线程池相关推荐

  1. 随笔之如何实现一个线程池

    为什么80%的码农都做不了架构师?>>>    一 缘由:     最近因工作问题,需要实现一个简单的线程池,满足一下要求, 可伸缩,即一旦发现线程不够用,则可以动态增加线程.(至于 ...

  2. java 手编线程池_死磕 java线程系列之自己动手写一个线程池

    欢迎关注我的公众号"彤哥读源码",查看更多源码系列文章, 与彤哥一起畅游源码的海洋. (手机横屏看源码更方便) 问题 (1)自己动手写一个线程池需要考虑哪些因素? (2)自己动手写 ...

  3. 一个有趣的问题 : 如何设计一个线程池

    理解Java并发工具包线程池的设计 深度解读 java 线程池设计思想及源码实现 分布式锁unlock 问题产生原因分析: Step 1 :线程A先上同一个锁(Key)(20秒), 然后执行耗时业务, ...

  4. 深读源码-java线程系列之自己手写一个线程池

    问题 (1)自己动手写一个线程池需要考虑哪些因素? (2)自己动手写的线程池如何测试? 简介 线程池是Java并发编程中经常使用到的技术,那么自己如何动手写一个线程池呢?本文将手把手带你写一个可用的线 ...

  5. Rust:如何实现一个线程池?

    本文转自CSDN博文<rust 实战 - 实现一个线程工作池 ThreadPool>,作者:firefantasy. 这是我读过的最简单.描述最清晰的 rust 线程池设计原理和示范代码, ...

  6. 一个线程池中的线程异常了,那么线程池会怎么处理这个线程?

    一个线程池中的线程异常了,那么线程池会怎么处理这个线程? 参考文章: (1)一个线程池中的线程异常了,那么线程池会怎么处理这个线程? (2)https://www.cnblogs.com/fangua ...

  7. 工作中如何使用线程池的?自己如何定义一个线程池?

    工作中如何使用线程池的?自己如何定义一个线程池? import java.util.concurrent.*;public class MyThreadPoolDemo {public static ...

  8. JAVA 编写程序实现如下功能:一个线程进行如下运算1*2+2*3+3*4+……+19*20,而另一个线程则每隔一段时间读取前一个线程的运算结果。

    编写程序实现如下功能:一个线程进行如下运算12+23+34+--+1920,而另一个线程则每隔一段时间读取前一个线程的运算结果. class Count{private int sum;Count(i ...

  9. 设置iis网页服务器cpu占比,为什么iis的一个线程池占了100%cpu

    为什么iis的一个线程池占了快100%cpu, 这个站点是跑asp.net web api的,大多是数据库的操作. 当回收这个线程池后几分钟,cpu使用率就降下来了. 可是隔一天半天的再去服务器看,c ...

最新文章

  1. docker 安装部署 activemq ActiveMQ
  2. ACM PKU 1192 最优连通子集
  3. tf.where 用法
  4. 头条二面:你们公司怎么处理 MySQL 的 Binlog 日志?
  5. ZYNQ UARTLite接收不定长数据
  6. cnblogs修改网站图标icon
  7. Linux-kernel网桥代码分析(二)
  8. mysql ----DML(掌握)
  9. git 查看修改明细_Git(查看修改记录)
  10. 算法:回溯解决电话拨号中的字母组合Letter Combinations of a Phone Number
  11. 企业微信oauth认证_企业微信登陆
  12. win7设置html,Windows 7自动备份设置图解 设置Win7系统备份方式
  13. 33暴力破解(MD5撞击)
  14. coco人体姿态估计标注软件
  15. Java中有几种方法可以实现一个线程??用什么关键字修饰同步方法??stop()和suspend()方法为什么不推荐使用??
  16. 力扣—— 19. 删除链表的倒数第 N 个结点(java)、剑指 Offer 22. 链表中倒数第k个节点(java)
  17. 基于 AHB 总线的 SRAM 控制器设计
  18. 常用字符串API实现(笔试会考)
  19. 51nod lyk与gcd
  20. 成功登陆港交所,顺丰同城的“跑腿”生意好做吗?

热门文章

  1. 总线接口与计算机通信
  2. java中的codereview
  3. android_Text
  4. SQL Server 2005 DTS导入平面数据
  5. confluence 编辑器加载_Onlyoffice集成Confluence的工作原理
  6. matlab main函数_Python 和MATLAB 制作Gif 图像
  7. es springboot 不设置id_springboot整合ES_文档ID删除
  8. mysql断开同步并记录位置_数据库同步自动断开问题的处理
  9. python中print语句
  10. android 心跳效果动画,Android实现心跳的效果