本文给出了一个通用的线程池框架,该框架将与线程执行相关的任务进行了高层次的抽象,使之与具体的执行任务无关。另外该线程池具有动态伸缩性,它能根据执行任务的轻重自动调整线程池中线程的数量。文章的最后,我们给出一个简单示例程序,通过该示例程序,我们会发现,通过该线程池框架执行多线程任务是多么的简单。
为什么需要线程池
目前的大多数网络服务器,包括Web服务器、Email服务器以及数据库服务器等都具有一个共同点,就是单位时间内必须处理数目巨大的连接请求,但处理时间却相对较短。
传统多线程方案中我们采用的服务器模型则是一旦接受到请求之后,即创建一个新的线程,由该线程执行任务。任务执行完毕后,线程退出,这就是是“即时创建,即时销毁”的策略。尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于不停的创建线程,销毁线程的状态。
我们将传统方案中的线程执行过程分为三个过程:T1、T2、T3。
T1:线程创建时间
T2:线程执行时间,包括线程的同步等时间
T3:线程销毁时间
那么我们可以看出,线程本身的开销所占的比例为(T1+T3) / (T1+T2+T3)。如果线程执行的时间很短的话,这比开销可能占到20%-50%左右。如果任务执行时间很频繁的话,这笔开销将是不可忽略的。
除此之外,线程池能够减少创建的线程个数。通常线程池所允许的并发线程是有上界的,如果同时需要并发的线程数超过上界,那么一部分线程将会等待。而传统方案中,如果同时请求数目为2000,那么最坏情况下,系统可能需要产生2000个线程。尽管这不是一个很大的数目,但是也有部分机器可能达不到这种要求。
因此线程池的出现正是着眼于减少线程池本身带来的开销。线程池采用预创建的技术,在应用程序启动之后,将立即创建一定数量的线程(N1),放入空闲队列中。这些线程都是处于阻塞(Suspended)状态,不消耗CPU,但占用较小的内存空间。当任务到来后,缓冲池选择一个空闲线程,把任务传入此线程中运行。当N1个线程都在处理任务后,缓冲池自动创建一定数量的新线程,用于处理更多的任务。在任务执行完毕后线程也不退出,而是继续保持在池中等待下一次的任务。当系统比较空闲时,大部分线程都一直处于暂停状态,线程池自动销毁一部分线程,回收系统资源。
基于这种预创建技术,线程池将线程创建和销毁本身所带来的开销分摊到了各个具体的任务上,执行次数越多,每个任务所分担到的线程本身开销则越小,不过我们另外可能需要考虑进去线程之间同步所带来的开销。

构建线程池框架
一般线程池都必须具备下面几个组成部分:
线程池管理器:用于创建并管理线程池
工作线程: 线程池中实际执行的线程
任务接口: 尽管线程池大多数情况下是用来支持网络服务器,但是我们将线程执行的任务抽象出来,形成任务接口,从而是的线程池与具体的任务无关。
任务队列:线程池的概念具体到实现则可能是队列,链表之类的数据结构,其中保存执行线程。
我们实现的通用线程池框架由五个重要部分组成CThreadManage,CThreadPool,CThread,CJob,CWorkerThread,除此之外框架中还包括线程同步使用的类CThreadMutex和CCondition。
CJob是所有的任务的基类,其提供一个接口Run,所有的任务类都必须从该类继承,同时实现Run方法。该方法中实现具体的任务逻辑。
CThread是Linux中线程的包装,其封装了Linux线程最经常使用的属性和方法,它也是一个抽象类,是所有线程类的基类,具有一个接口Run。
CWorkerThread是实际被调度和执行的线程类,其从CThread继承而来,实现了CThread中的Run方法。
CThreadPool是线程池类,其负责保存线程,释放线程以及调度线程。
CThreadManage是线程池与用户的直接接口,其屏蔽了内部的具体实现。
CThreadMutex用于线程之间的互斥。
CCondition则是条件变量的封装,用于线程之间的同步。
它们的类的继承关系如下图所示:

线程池的时序很简单,如下图所示。CThreadManage直接跟客户端打交道,其接受需要创建的线程初始个数,并接受客户端提交的任务。这儿的任务是具体的非抽象的任务。CThreadManage的内部实际上调用的都是CThreadPool的相关操作。CThreadPool创建具体的线程,并把客户端提交的任务分发给CWorkerThread,CWorkerThread实际执行具体的任务。

理解系统组件
下面我们分开来了解系统中的各个组件。
[cpp] view plain copy
  1. //CThreadManage
  2. //CThreadManage的功能非常简单,其提供最简单的方法,其类定义如下:
  3. class CThreadManage
  4. {
  5. private:
  6. CThreadPool*    m_Pool;
  7. int          m_NumOfThread;
  8. protected:
  9. public:
  10. void     SetParallelNum(int num);
  11. CThreadManage();
  12. CThreadManage(int num);
  13. virtual ~CThreadManage();
  14. void    Run(CJob* job,void* jobdata);
  15. void    TerminateAll(void);
  16. };
  17. //其中m_Pool指向实际的线程池;m_NumOfThread是初始创建时候允许创建的并发的线程个数。另外Run和TerminateAll方法也非常简单,只是简单的调用CThreadPool的一些相关方法而已。其具体的实现如下:
  18. CThreadManage::CThreadManage(){
  19. m_NumOfThread = 10;
  20. m_Pool = new CThreadPool(m_NumOfThread);
  21. }
  22. CThreadManage::CThreadManage(int num){
  23. m_NumOfThread = num;
  24. m_Pool = new CThreadPool(m_NumOfThread);
  25. }
  26. CThreadManage::~CThreadManage(){
  27. if(NULL != m_Pool)
  28. delete m_Pool;
  29. }
  30. void CThreadManage::SetParallelNum(int num){
  31. m_NumOfThread = num;
  32. }
  33. void CThreadManage::Run(CJob* job,void* jobdata){
  34. m_Pool->Run(job,jobdata);
  35. }
  36. void CThreadManage::TerminateAll(void){
  37. m_Pool->TerminateAll();
  38. }
  39. //CThread
  40. //CThread 类实现了对Linux中线程操作的封装,它是所有线程的基类,也是一个抽象类,提供了一个抽象接口Run,所有的CThread都必须实现该Run方法。CThread的定义如下所示:
  41. class CThread
  42. {
  43. private:
  44. int          m_ErrCode;
  45. Semaphore    m_ThreadSemaphore;  //the inner semaphore, which is used to realize
  46. unsigned     long m_ThreadID;
  47. bool         m_Detach;       //The thread is detached
  48. bool         m_CreateSuspended;  //if suspend after creating
  49. char*        m_ThreadName;
  50. ThreadState m_ThreadState;      //the state of the thread
  51. protected:
  52. void     SetErrcode(int errcode){m_ErrCode = errcode;}
  53. static void* ThreadFunction(void*);
  54. public:
  55. CThread();
  56. CThread(bool createsuspended,bool detach);
  57. virtual ~CThread();
  58. virtual void Run(void) = 0;
  59. void     SetThreadState(ThreadState state){m_ThreadState = state;}
  60. bool     Terminate(void);    //Terminate the threa
  61. bool     Start(void);        //Start to execute the thread
  62. void     Exit(void);
  63. bool     Wakeup(void);
  64. ThreadState  GetThreadState(void){return m_ThreadState;}
  65. int      GetLastError(void){return m_ErrCode;}
  66. void     SetThreadName(char* thrname){strcpy(m_ThreadName,thrname);}
  67. char*    GetThreadName(void){return m_ThreadName;}
  68. int      GetThreadID(void){return m_ThreadID;}
  69. bool     SetPriority(int priority);
  70. int      GetPriority(void);
  71. int      GetConcurrency(void);
  72. void     SetConcurrency(int num);
  73. bool     Detach(void);
  74. bool     Join(void);
  75. bool     Yield(void);
  76. int      Self(void);
  77. };
  78. //线程的状态可以分为四种,空闲、忙碌、挂起、终止(包括正常退出和非正常退出)。由于目前Linux线程库不支持挂起操作,因此,我们的此处的挂起操作类似于暂停。如果线程创建后不想立即执行任务,那么我们可以将其“暂停”,如果需要运行,则唤醒。有一点必须注意的是,一旦线程开始执行任务,将不能被挂起,其将一直执行任务至完毕。
  79. //线程类的相关操作均十分简单。线程的执行入口是从Start()函数开始,其将调用函数ThreadFunction,ThreadFunction再调用实际的Run函数,执行实际的任务。
  80. //CThreadPool
  81. //CThreadPool是线程的承载容器,一般可以将其实现为堆栈、单向队列或者双向队列。在我们的系统中我们使用STL Vector对线程进行保存。CThreadPool的实现代码如下:
  82. class CThreadPool
  83. {
  84. friend class CWorkerThread;
  85. private:
  86. unsigned int m_MaxNum;   //the max thread num that can create at the same time
  87. unsigned int m_AvailLow; //The min num of idle thread that shoule kept
  88. unsigned int m_AvailHigh;    //The max num of idle thread that kept at the same time
  89. unsigned int m_AvailNum; //the normal thread num of idle num;
  90. unsigned int m_InitNum;  //Normal thread num;
  91. protected:
  92. CWorkerThread* GetIdleThread(void);
  93. void    AppendToIdleList(CWorkerThread* jobthread);
  94. void    MoveToBusyList(CWorkerThread* idlethread);
  95. void    MoveToIdleList(CWorkerThread* busythread);
  96. void    DeleteIdleThread(int num);
  97. void    CreateIdleThread(int num);
  98. public:
  99. CThreadMutex m_BusyMutex;    //when visit busy list,use m_BusyMutex to lock and unlock
  100. CThreadMutex m_IdleMutex;    //when visit idle list,use m_IdleMutex to lock and unlock
  101. CThreadMutex m_JobMutex; //when visit job list,use m_JobMutex to lock and unlock
  102. CThreadMutex m_VarMutex;
  103. CCondition       m_BusyCond; //m_BusyCond is used to sync busy thread list
  104. CCondition       m_IdleCond; //m_IdleCond is used to sync idle thread list
  105. CCondition       m_IdleJobCond;  //m_JobCond is used to sync job list
  106. CCondition       m_MaxNumCond;
  107. vector<CWorkerThread*>   m_ThreadList;
  108. vector<CWorkerThread*>   m_BusyList;     //Thread List
  109. vector<CWorkerThread*>   m_IdleList; //Idle List
  110. CThreadPool();
  111. CThreadPool(int initnum);
  112. virtual ~CThreadPool();
  113. void    SetMaxNum(int maxnum){m_MaxNum = maxnum;}
  114. int     GetMaxNum(void){return m_MaxNum;}
  115. void    SetAvailLowNum(int minnum){m_AvailLow = minnum;}
  116. int     GetAvailLowNum(void){return m_AvailLow;}
  117. void    SetAvailHighNum(int highnum){m_AvailHigh = highnum;}
  118. int     GetAvailHighNum(void){return m_AvailHigh;}
  119. int     GetActualAvailNum(void){return m_AvailNum;}
  120. int     GetAllNum(void){return m_ThreadList.size();}
  121. int     GetBusyNum(void){return m_BusyList.size();}
  122. void    SetInitNum(int initnum){m_InitNum = initnum;}
  123. int     GetInitNum(void){return m_InitNum;}
  124. void    TerminateAll(void);
  125. void    Run(CJob* job,void* jobdata);
  126. };
  127. CThreadPool::CThreadPool()
  128. {
  129. m_MaxNum = 50;
  130. m_AvailLow = 5;
  131. m_InitNum=m_AvailNum = 10 ;
  132. m_AvailHigh = 20;
  133. m_BusyList.clear();
  134. m_IdleList.clear();
  135. for(int i=0;i<m_InitNum;i++){
  136. CWorkerThread* thr = new CWorkerThread();
  137. thr->SetThreadPool(this);
  138. AppendToIdleList(thr);
  139. thr->Start();
  140. }
  141. }
  142. CThreadPool::CThreadPool(int initnum)
  143. {
  144. assert(initnum>0 && initnum<=30);
  145. m_MaxNum   = 30;
  146. m_AvailLow = initnum-10>0?initnum-10:3;
  147. m_InitNum=m_AvailNum = initnum ;
  148. m_AvailHigh = initnum+10;
  149. m_BusyList.clear();
  150. m_IdleList.clear();
  151. for(int i=0;i<m_InitNum;i++){
  152. CWorkerThread* thr = new CWorkerThread();
  153. AppendToIdleList(thr);
  154. thr->SetThreadPool(this);
  155. thr->Start();       //begin the thread,the thread wait for job
  156. }
  157. }
  158. CThreadPool::~CThreadPool()
  159. {
  160. TerminateAll();
  161. }
  162. void CThreadPool::TerminateAll()
  163. {
  164. for(int i=0;i < m_ThreadList.size();i++) {
  165. CWorkerThread* thr = m_ThreadList[i];
  166. thr->Join();
  167. }
  168. return;
  169. }
  170. CWorkerThread* CThreadPool::GetIdleThread(void)
  171. {
  172. while(m_IdleList.size() ==0 )
  173. m_IdleCond.Wait();
  174. m_IdleMutex.Lock();
  175. if(m_IdleList.size() > 0 )
  176. {
  177. CWorkerThread* thr = (CWorkerThread*)m_IdleList.front();
  178. printf("Get Idle thread %dn",thr->GetThreadID());
  179. m_IdleMutex.Unlock();
  180. return thr;
  181. }
  182. m_IdleMutex.Unlock();
  183. return NULL;
  184. }
  185. //add an idle thread to idle list
  186. void CThreadPool::AppendToIdleList(CWorkerThread* jobthread)
  187. {
  188. m_IdleMutex.Lock();
  189. m_IdleList.push_back(jobthread);
  190. m_ThreadList.push_back(jobthread);
  191. m_IdleMutex.Unlock();
  192. }
  193. //move and idle thread to busy thread
  194. void CThreadPool::MoveToBusyList(CWorkerThread* idlethread)
  195. {
  196. m_BusyMutex.Lock();
  197. m_BusyList.push_back(idlethread);
  198. m_AvailNum--;
  199. m_BusyMutex.Unlock();
  200. m_IdleMutex.Lock();
  201. vector<CWorkerThread*>::iterator pos;
  202. pos = find(m_IdleList.begin(),m_IdleList.end(),idlethread);
  203. if(pos !=m_IdleList.end())
  204. m_IdleList.erase(pos);
  205. m_IdleMutex.Unlock();
  206. }
  207. void CThreadPool::MoveToIdleList(CWorkerThread* busythread)
  208. {
  209. m_IdleMutex.Lock();
  210. m_IdleList.push_back(busythread);
  211. m_AvailNum++;
  212. m_IdleMutex.Unlock();
  213. m_BusyMutex.Lock();
  214. vector<CWorkerThread*>::iterator pos;
  215. pos = find(m_BusyList.begin(),m_BusyList.end(),busythread);
  216. if(pos!=m_BusyList.end())
  217. m_BusyList.erase(pos);
  218. m_BusyMutex.Unlock();
  219. m_IdleCond.Signal();
  220. m_MaxNumCond.Signal();
  221. }
  222. //create num idle thread and put them to idlelist
  223. void CThreadPool::CreateIdleThread(int num)
  224. {
  225. for(int i=0;i<num;i++){
  226. CWorkerThread* thr = new CWorkerThread();
  227. thr->SetThreadPool(this);
  228. AppendToIdleList(thr);
  229. m_VarMutex.Lock();
  230. m_AvailNum++;
  231. m_VarMutex.Unlock();
  232. thr->Start();       //begin the thread,the thread wait for job
  233. }
  234. }
  235. void CThreadPool::DeleteIdleThread(int num)
  236. {
  237. printf("Enter into CThreadPool::DeleteIdleThreadn");
  238. m_IdleMutex.Lock();
  239. printf("Delete Num is %dn",num);
  240. for(int i=0;i<num;i++){
  241. CWorkerThread* thr;
  242. if(m_IdleList.size() > 0 ){
  243. thr = (CWorkerThread*)m_IdleList.front();
  244. printf("Get Idle thread %dn",thr->GetThreadID());
  245. }
  246. vector<CWorkerThread*>::iterator pos;
  247. pos = find(m_IdleList.begin(),m_IdleList.end(),thr);
  248. if(pos!=m_IdleList.end())
  249. m_IdleList.erase(pos);
  250. m_AvailNum--;
  251. printf("The idle thread available num:%d n",m_AvailNum);
  252. printf("The idlelist              num:%d n",m_IdleList.size());
  253. }
  254. m_IdleMutex.Unlock();
  255. }
  256. void CThreadPool::Run(CJob* job,void* jobdata)
  257. {
  258. assert(job!=NULL);
  259. //if the busy thread num adds to m_MaxNum,so we should wait
  260. if(GetBusyNum() == m_MaxNum)
  261. m_MaxNumCond.Wait();
  262. if(m_IdleList.size()<m_AvailLow)
  263. {
  264. if(GetAllNum()+m_InitNum-m_IdleList.size() < m_MaxNum )
  265. CreateIdleThread(m_InitNum-m_IdleList.size());
  266. else
  267. CreateIdleThread(m_MaxNum-GetAllNum());
  268. }
  269. CWorkerThread*  idlethr = GetIdleThread();
  270. if(idlethr !=NULL)
  271. {
  272. idlethr->m_WorkMutex.Lock();
  273. MoveToBusyList(idlethr);
  274. idlethr->SetThreadPool(this);
  275. job->SetWorkThread(idlethr);
  276. printf("Job is set to thread %d n",idlethr->GetThreadID());
  277. idlethr->SetJob(job,jobdata);
  278. }
  279. }
  280. //在CThreadPool中存在两个链表,一个是空闲链表,一个是忙碌链表。Idle链表中存放所有的空闲进程,当线程执行任务时候,其状态变为忙碌状态,同时从空闲链表中删除,并移至忙碌链表中。在CThreadPool的构造函数中,我们将执行下面的代码:
  281. for(int i=0;i<m_InitNum;i++)
  282. {
  283. CWorkerThread* thr = new CWorkerThread();
  284. AppendToIdleList(thr);
  285. thr->SetThreadPool(this);
  286. thr->Start();       //begin the thread,the thread wait for job
  287. }
在该代码中,我们将创建m_InitNum个线程,创建之后即调用AppendToIdleList放入Idle链表中,由于目前没有任务分发给这些线程,因此线程执行Start后将自己挂起。
事实上,线程池中容纳的线程数目并不是一成不变的,其会根据执行负载进行自动伸缩。为此在CThreadPool中设定四个变量:
m_InitNum:初始创建时线程池中的线程的个数。
m_MaxNum:当前线程池中所允许并发存在的线程的最大数目。
m_AvailLow:当前线程池中所允许存在的空闲线程的最小数目,如果空闲数目低于该值,表明负载可能过重,此时有必要增加空闲线程池的数目。实现中我们总是将线程调整为m_InitNum个。
m_AvailHigh:当前线程池中所允许的空闲的线程的最大数目,如果空闲数目高于该值,表明当前负载可能较轻,此时将删除多余的空闲线程,删除后调整数也为m_InitNum个。
m_AvailNum:目前线程池中实际存在的线程的个数,其值介于m_AvailHigh和m_AvailLow之间。如果线程的个数始终维持在m_AvailLow和m_AvailHigh之间,则线程既不需要创建,也不需要删除,保持平衡状态。因此如何设定m_AvailLow和m_AvailHigh的值,使得线程池最大可能的保持平衡态,是线程池设计必须考虑的问题。
线程池在接受到新的任务之后,线程池首先要检查是否有足够的空闲池可用。检查分为三个步骤:
(1)检查当前处于忙碌状态的线程是否达到了设定的最大值m_MaxNum,如果达到了,表明目前没有空闲线程可用,而且也不能创建新的线程,因此必须等待直到有线程执行完毕返回到空闲队列中。
(2)如果当前的空闲线程数目小于我们设定的最小的空闲数目m_AvailLow,则我们必须创建新的线程,默认情况下,创建后的线程数目应该为m_InitNum,因此创建的线程数目应该为( 当前空闲线程数与m_InitNum);但是有一种特殊情况必须考虑,就是现有的线程总数加上创建后的线程数可能超过m_MaxNum,因此我们必须对线程的创建区别对待。
    if(GetAllNum()+m_InitNum-m_IdleList.size() < m_MaxNum )
        CreateIdleThread(m_InitNum-m_IdleList.size());
    else
        CreateIdleThread(m_MaxNum-GetAllNum());
如果创建后总数不超过m_MaxNum,则创建后的线程为m_InitNum;如果超过了,则只创建( m_MaxNum-当前线程总数)个。
(3)调用GetIdleThread方法查找空闲线程。如果当前没有空闲线程,则挂起;否则将任务指派给该线程,同时将其移入忙碌队列。
当线程执行完毕后,其会调用MoveToIdleList方法移入空闲链表中,其中还调用m_IdleCond.Signal()方法,唤醒GetIdleThread()中可能阻塞的线程。
 
[cpp] view plain copy
  1. //CWorkerThread
  2. //CWorkerThread是CThread的派生类,是事实上的工作线程。在CThreadPool的构造函数中,我们创建了一定数量的CWorkerThread。一旦这些线程创建完毕,我们将调用Start()启动该线程。Start方法最终会调用Run方法。Run方法是个无限循环的过程。在没有接受到实际的任务的时候,m_Job为NULL,此时线程将调用Wait方法进行等待,从而处于挂起状态。一旦线程池将具体的任务分发给该线程,其将被唤醒,从而通知线程从挂起的地方继续执行。CWorkerThread的完整定义如下:
  3. class CWorkerThread:public CThread
  4. {
  5. private:
  6. CThreadPool*  m_ThreadPool;
  7. CJob*    m_Job;
  8. void*    m_JobData;
  9. CThreadMutex m_VarMutex;
  10. bool      m_IsEnd;
  11. protected:
  12. public:
  13. CCondition   m_JobCond;
  14. CThreadMutex m_WorkMutex;
  15. CWorkerThread();
  16. virtual ~CWorkerThread();
  17. void Run();
  18. void    SetJob(CJob* job,void* jobdata);
  19. CJob*   GetJob(void){return m_Job;}
  20. void    SetThreadPool(CThreadPool* thrpool);
  21. CThreadPool* GetThreadPool(void){return m_ThreadPool;}
  22. };
  23. CWorkerThread::CWorkerThread()
  24. {
  25. m_Job = NULL;
  26. m_JobData = NULL;
  27. m_ThreadPool = NULL;
  28. m_IsEnd = false;
  29. }
  30. CWorkerThread::~CWorkerThread()
  31. {
  32. if(NULL != m_Job)
  33. delete m_Job;
  34. if(m_ThreadPool != NULL)
  35. delete m_ThreadPool;
  36. }
  37. void CWorkerThread::Run()
  38. {
  39. SetThreadState(THREAD_RUNNING);
  40. for(;;)
  41. {
  42. while(m_Job == NULL)
  43. m_JobCond.Wait();
  44. m_Job->Run(m_JobData);
  45. m_Job->SetWorkThread(NULL);
  46. m_Job = NULL;
  47. m_ThreadPool->MoveToIdleList(this);
  48. if(m_ThreadPool->m_IdleList.size() > m_ThreadPool->GetAvailHighNum())
  49. {
  50. m_ThreadPool->DeleteIdleThread(m_ThreadPool->m_IdleList.size()-m_ThreadPool->GetInitNum());
  51. }
  52. m_WorkMutex.Unlock();
  53. }
  54. }
  55. void CWorkerThread::SetJob(CJob* job,void* jobdata)
  56. {
  57. m_VarMutex.Lock();
  58. m_Job = job;
  59. m_JobData = jobdata;
  60. job->SetWorkThread(this);
  61. m_VarMutex.Unlock();
  62. m_JobCond.Signal();
  63. }
  64. void CWorkerThread::SetThreadPool(CThreadPool* thrpool)
  65. {
  66. m_VarMutex.Lock();
  67. m_ThreadPool = thrpool;
  68. m_VarMutex.Unlock();
  69. }
  70. //当线程执行任务之前首先必须判断空闲线程的数目是否低于m_AvailLow,如果低于,则必须创建足够的空闲线程,使其数目达到m_InitNum个,然后将调用MoveToBusyList()移出空闲队列,移入忙碌队列。当任务执行完毕后,其又调用MoveToIdleList()移出忙碌队列,移入空闲队列,等待新的任务。
  71. //除了Run方法之外,CWorkerThread中另外一个重要的方法就是SetJob,该方法将实际的任务赋值给线程。当没有任何执行任务即m_Job为NULL的时候,线程将调用m_JobCond.Wait进行等待。一旦Job被赋值给线程,其将调用m_JobCond.Signal方法唤醒该线程。由于m_JobCond属于线程内部的变量,每个线程都维持一个m_JobCond,只有得到任务的线程才被唤醒,没有得到任务的将继续等待。无论一个线程何时被唤醒,其都将从等待的地方继续执行m_Job->Run(m_JobData),这是线程执行实际任务的地方。
  72. //在线程执行给定Job期间,我们必须防止另外一个Job又赋给该线程,因此在赋值之前,通过m_VarMutex进行锁定, Job执行期间,其于的Job将不能关联到该线程;任务执行完毕,我们调用m_VarMutex.Unlock()进行解锁,此时,线程又可以接受新的执行任务。
  73. //在线程执行任务结束后返回空闲队列前,我们还需要判断当前空闲队列中的线程是否高于m_AvailHigh个。如果超过m_AvailHigh,则必须从其中删除(m_ThreadPool->m_IdleList.size()-m_ThreadPool->GetInitNum())个线程,使线程数目保持在m_InitNum个。
  74. //CJob
  75. //CJob类相对简单,其封装了任务的基本的属性和方法,其中最重要的是Run方法,代码如下:
  76. class CJob
  77. {
  78. private:
  79. int      m_JobNo;        //The num was assigned to the job
  80. char*    m_JobName;      //The job name
  81. CThread  *m_pWorkThread;     //The thread associated with the job
  82. public:
  83. CJob( void );
  84. virtual ~CJob();
  85. int      GetJobNo(void) const { return m_JobNo; }
  86. void     SetJobNo(int jobno){ m_JobNo = jobno;}
  87. char*    GetJobName(void) const { return m_JobName; }
  88. void     SetJobName(char* jobname);
  89. CThread *GetWorkThread(void){ return m_pWorkThread; }
  90. void     SetWorkThread ( CThread *pWorkThread ){
  91. m_pWorkThread = pWorkThread;
  92. }
  93. virtual void Run ( void *ptr ) = 0;
  94. };
[cpp] view plain copy
  1. CJob::CJob(void)
  2. :m_pWorkThread(NULL)
  3. ,m_JobNo(0)
  4. ,m_JobName(NULL)
  5. {
  6. }
  7. CJob::~CJob(){
  8. if(NULL != m_JobName)
  9. free(m_JobName);
  10. }
  11. void CJob::SetJobName(char* jobname)
  12. {
  13. if(NULL !=m_JobName)    {
  14. free(m_JobName);
  15. m_JobName = NULL;
  16. }
  17. if(NULL ==jobname)    {
  18. m_JobName = (char*)malloc(strlen(jobname)+1);
  19. strcpy(m_JobName,jobname);
  20. }
  21. }
  22. //线程池使用示例
  23. //至此我们给出了一个简单的与具体任务无关的线程池框架。使用该框架非常的简单,我们所需要的做的就是派生CJob类,将需要完成的任务实现在Run方法中。然后将该Job交由CThreadManage去执行。下面我们给出一个简单的示例程序
  24. class CXJob:public CJob
  25. {
  26. public:
  27. CXJob(){i=0;}
  28. ~CXJob(){}
  29. void Run(void* jobdata)    {
  30. printf("The Job comes from CXJOB/n");
  31. sleep(2);
  32. }
  33. };
  34. class CYJob:public CJob
  35. {
  36. public:
  37. CYJob(){i=0;}
  38. ~CYJob(){}
  39. void Run(void* jobdata)    {
  40. printf("The Job comes from CYJob/n");
  41. }
  42. };
  43. main()
  44. {
  45. CThreadManage* manage = new CThreadManage(10);
  46. for(int i=0;i<40;i++)
  47. {
  48. CXJob*   job = new CXJob();
  49. manage->Run(job,NULL);
  50. }
  51. sleep(2);
  52. CYJob* job = new CYJob();
  53. manage->Run(job,NULL);
  54. manage->TerminateAll();
  55. }
  56. //CXJob和CYJob都是从Job类继承而来,其都实现了Run接口。CXJob只是简单的打印一句”The Job comes from CXJob”,CYJob也只打印”The Job comes from CYJob”,然后均休眠2秒钟。在主程序中我们初始创建10个工作线程。然后分别执行40次CXJob和一次CYJob。
线程池使用后记
线程池适合场合
事实上,线程池并不是万能的。它有其特定的使用场合。线程池致力于减少线程本身的开销对应用所产生的影响,这是有前提的,前提就是线程本身开销与线程执行任务相比不可忽略。如果线程本身的开销相对于线程任务执行开销而言是可以忽略不计的,那么此时线程池所带来的好处是不明显的,比如对于FTP服务器以及Telnet服务器,通常传送文件的时间较长,开销较大,那么此时,我们采用线程池未必是理想的方法,我们可以选择“即时创建,即时销毁”的策略。
总之线程池通常适合下面的几个场合:
(1)  单位时间内处理任务频繁而且任务处理时间短
(2)  对实时性要求较高。如果接受到任务后在创建线程,可能满足不了实时要求,因此必须采用线程池进行预创建。
(3)  必须经常面对高突发性事件,比如Web服务器,如果有足球转播,则服务器将产生巨大的冲击。此时如果采取传统方法,则必须不停的大量产生线程,销毁线程。此时采用动态线程池可以避免这种情况的发生。

结束语
本文给出了一个简单的通用的与任务无关的线程池的实现,通过该线程池能够极大的简化Linux下多线程的开发工作。该线程池的进一步完善开发工作还在进行中,希望能够得到你的建议和支持。
参考资料
http://www-900.ibm.com/developerWorks/cn/java/j-jtp0730/index.shtmlPOSIX多线程程序设计,David R.Butenhof    译者:于磊 曾刚,中国电力出版社
C++面向对象多线程编程,CAMERON HUGHES等著 周良忠译,人民邮电出版社
Java Pro,结合线程和分析器池,Edy Yu
关于作者
张中庆,西安交通大学软件所,在读硕士,目前研究方向为分布式网络与移动中间件.

线程池原理及创建并C++实现相关推荐

  1. C++线程池原理及创建(转)

    C++线程池原理及创建(转) 来自http://www.cnblogs.com/cpper-kaixuan/p/3640485.html 本文给出了一个通用的线程池框架,该框架将与线程执行相关的任务进 ...

  2. 线程池原理及创建(C++实现)

    本文给出了一个通用的线程池框架,该框架将与线程执行相关的任务进行了高层次的抽象,使之与具体的执行任务无关.另外该线程池具有动态伸缩性,它能根据执行任务的轻重自动调整线程池中线程的数量.文章的最后,我们 ...

  3. C++线程池原理及创建

    本文给出了一个通用的线程池框架,该框架将与线程执行相关的任务进行了高层次的抽象,使之与具体的执行任务无关.另外该线程池具有动态伸缩性,它能根据执行任务的轻重自动调整线程池中线程的数量.文章的最后,我们 ...

  4. 线程池原理及创建(转)

    本文给出了一个通用的线程池框架,该框架将与线程执行相关的任务进行了高层次的抽象,使之与具体的执行任务无关.另外该线程池具有动态伸缩性,它能根据执行任务的轻重自动调整线程池中线程的数量.文章的最后,我们 ...

  5. [线程池][完整实现] 转:线程池原理及创建(C++实现)

    在实际任务中经常会用到多线程,但是没用过线程池,我以前的方法比较暴力:1)创建线程,不断切换任务,保持线程一直运行,直到所有任务结束:2)不断的创建和销毁线程:一般都用1方法.最近和别人聊天,谈到线程 ...

  6. [C++][线程池][完整实现] 转:线程池原理及创建(C++实现)

    文章的主要框架是参考这篇文档的,http://jacky-dai.iteye.com/blog/1090285, 关于作者 张中庆,西安交通大学软件所,在读硕士,目前研究方向为分布式网络与移动中间件, ...

  7. python3 线程池源码解析_5分钟看懂系列:Python 线程池原理及实现

    概述 传统多线程方案会使用"即时创建, 即时销毁"的策略.尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器 ...

  8. Java多线程闲聊(四):阻塞队列与线程池原理

    Java多线程闲聊(四)-阻塞队列与线程池原理 前言 复用永远是人们永恒的主题,这能让我们更好地避免重复制造轮子. 说到多线程,果然还是绕不开线程池,那就来聊聊吧. 人们往往相信,世界是存在一些规律的 ...

  9. java 线程池原理分析

    一.为什么使用线程池 1.降低资源消耗,减少线程创建和销毁次数,每个工作线程可以重复利用,执行多个任务 2.可根据系统承受能力,调整工作线程的数目,防止消耗过多的内存 二.java 线程池使用 Exe ...

最新文章

  1. SpringMVC 处理multipart形式数据:java方式配置文件上传
  2. Android之如何获取网络类型并判断是否可用
  3. HDU 4321 Contest 3
  4. 厉害了,Servlet3的异步处理机制
  5. python mockito arg_that_wqingxiao
  6. SpringSecurity Filter顺序
  7. HDU2161 Primes
  8. PHP多次调用Mysql存储过程报错解决办法
  9. 使用FastReport 3.0及以上版本创建动态报表的几个技巧(转)
  10. SQL将本地图片文件插入到数据库
  11. 2020 年最牛逼的 10 门编程语言
  12. pytorch-sequencelabeling是一个支持softmax、crf、span等模型,注于序列标注(命名实体识别、词性标注、中文分词)的轻量级自然语言处理工具包,包含数据与实验
  13. VM虚拟机Ubuntu16 运行facenet人脸识别源码
  14. 第一次创建百度脑图介绍自己,把创建过程分享一下吧,嘿嘿。
  15. 手游脚本_开发与迭代
  16. 基于FPGA和ABZ增量式编码器的转子位置检测
  17. linux startx 后返回命令行,输入命令: startx 反过来
  18. linux find工作原理,Linux基础教程:find 与 xargs
  19. android+音乐节拍检测,科学网—音乐节拍跟踪或音乐节拍检测软件,LilyBeats alpha - 石自强的博文...
  20. Dubbo-Zookeeper注册中心;监控中心

热门文章

  1. js中的this指针(二)
  2. javaweb 学习资源
  3. POJ 1745 Divisibility【DP】
  4. 面试英语自我介绍的常用词汇
  5. yii::$app-mongodb 查询纪录数_老詹总决赛有多强?12项数据领先乔丹科比,已握10项数据纪录...
  6. python填表_小Python填表得到d
  7. php试卷A高质量含答案,php试卷A高质量含答案
  8. c语言如何在文件中间插入数据,急求如何将下列C语言程序数据存储到文件中?...
  9. 开课吧java_开课吧javaee企业级开发工程师 十期
  10. C语言switch中break的作用,C语言中switch...case语句中break的重要性