1. FQueuedThreadPool & IQueuedWork

FQueuedThreadPool是UE4中抽象出的线程池。线程池由若干个Worker线程,和一个同步队列构成。UE4把同步队列执行的任务抽象为IQueuedWork. 线程池的同步队列,就是一个IQueuedWork的队列了。借用wiki上线程池的图, UE4的FQueuedThreadPool也是如图中所示的结构:

Thread pool

生产者生产IQueuedWork的实例对象。线程池会向生产者提供入队的接口。线程池中的Worker线程都是消费者,会不停地从队列中取出IQueuedWork,并执行work.

下面的代码就是FQueuedThreadPool给用户使用的接口:

class CORE_API FQueuedThreadPool
{
public:// 创建线程池,指定线程数,还有每个worker栈大小及优先级virtual bool Create( uint32 InNumQueuedThreads, uint32 StackSize, EThreadPriority ThreadPriority) = 0;// 销毁线程池,对Task Queue和每个worker线程执行清理操作virtual void Destroy() = 0;// 生产者使用这个接口,向同步队列添加IQueuedWorkvirtual void AddQueuedWork(IQueuedWork* InQueuedWork) = 0;// 生产者使用这个接口,尝试删除一个IQueuedWorkvirtual bool RetractQueuedWork(IQueuedWork* InQueuedWork) = 0;// 获取线程池中worker线程的数目virtual int32 GetNumThreads() const = 0;
};

需要提及的是,RetractQueuedWork接口只能尝试去删除或取消一个work对象。如果work不在队列当中,或者请求删除时已经在执行和执行完成,都无法取消。

IQueuedWork是同步队列中,任务对象的抽象。代码如下:

class IQueuedWork
{
public:virtual void DoThreadedWork() = 0;virtual void Abandon() = 0;
};

IQueuedWork的接口很简单,我们只需要实现代码中的两个接口,分别是任务的执行流程和废弃当前任务的接口。

2. FQueuedThread

FQueuedThread就是线程池worker线程的实现了。它是一个FRunnable的实现类,并内聚了一个FRunnableThread的实例对象。

class FQueuedThread : public FRunnable
{
protected:FRunnableThread* Thread;virtual uint32 Run() override;
};

FQueuedThread实现的Run函数,就是类似上一篇我们实现的MyRunnable的空闲等待的流程。我们回顾一下,实现所需的部件:

  1. 一个原子布尔变量作为循环的标识位
  2. 一个FEvent用来让线程在无任务可做时挂起,而不占用系统资源;

按照上面的思路,我们继续补完代码:

class FQueuedThread : public FRunnable
{
protected:FEvent*             DoWorkEvent;TAtomic<bool>       TimeToDie;FRunnableThread*    Thread;virtual uint32 Run() override{while(TimeToDie.Load(EMemoryOrder::Relaxed)){DoWorkEvent->Wait();// TODO ... do work}}
};

这样的实现有很严重的缺陷。无穷时间的等待,线程被挂起后,UE4无法获取这些线程的状态了。因此,UE4采用的是等待10ms,再check是否继续等待。

while(TimeToDie.Load(EMemoryOrder::Relaxed))
{bool bContinueWaiting = true; while(bContinueWaiting){DECLARE_SCOPE_CYCLE_COUNTER(...);       // record statusbContinueWaiting = !DoWorkEvent->Wait( 10 );}// TODO ... do work
}

被唤醒后意味着两种情况:

  1. 新的任务分配下来,有活干了;
  2. 线程池发出清理指令,线程即将退出;

把执行Work的代码加入,如下所示:

class FQueuedThread : public FRunnable
{
protected:FEvent*                 DoWorkEvent;TAtomic<bool>           TimeToDie;FRunnableThread*        Thread;IQueuedWork* volatile   QueuedWork;virtual uint32 Run() override{while(TimeToDie.Load(EMemoryOrder::Relaxed)){bool bContinueWaiting = true; while(bContinueWaiting){DECLARE_SCOPE_CYCLE_COUNTER(...);       // record statusbContinueWaiting = !DoWorkEvent->Wait( 10 );}IQueuedWork* LocalQueuedWork = QueuedWork;QueuedWork = nullptr;FPlatformMisc::MemoryBarrier();check(LocalQueuedWork || TimeToDie.Load(EMemoryOrder::Relaxed));while (LocalQueuedWork){LocalQueuedWork->DoThreadedWork();LocalQueuedWork = OwningThreadPool->ReturnToPoolOrGetNextJob(this);} }return 0;}
};

QueuedWork就是需要执行的work对象的指针,它被volatile修饰,说明还有其他的线程会修改这个指针,防止编译器生成直接从缓存中读取代码的优化。check方法,明显地指明了被唤醒时只有前面提及的两种情况。如果Work不为空,则调用IQueuedWork的DoThreadedWork接口。任务完成后的下一行代码,就是向所属线程池的同步队列再申请一个任务。如果队列中有任务,则继续执行新的任务。若队列已经为空,则将线程归还到线程池。线程池有一个QueuedThreads成员,记录线程池中的空闲的线程。

个人觉得UE4在check之后的实现略有不妥。在同时有Work要执行和TimeToDie为true时,UE4选择了继续执行完Work再退出。笔者认为TimeToDie为true时,应该放弃执行当前的work,直接退出。当然,这里不同的策略差别也不大,也不重要。

还有一个重要的函数,就是FQueuedThread::DoWork. 它是由生产者调用线程池的AddQueuedWork,线程池对象在进行调度的时候调用的。DoWork函数代码如下:

void FQueuedThread::DoWork(IQueuedWork* InQueuedWork)
{// ...QueuedWork = InQueuedWork;FPlatformMisc::MemoryBarrier();// Tell the thread to wake up and do its jobDoWorkEvent->Trigger();
}

值得提及的是两个函数中的内存屏障代码,FPlatformMisc::MemoryBarrier(). DoWork中会对QueuedWork进行写操作,而在Run函数中会对QueuedWork进行读操作,而且DoWork与Run发生在不同的线程,这样就产生了竞争条件(race condition). 一般的情况是上一个mutex lock,而UE4却没有,只使用了内存屏障。原因是这个竞争条件发生的时候,有且仅有一个线程写,有且仅有一个线程读;并且DoWork中的DoWorkEvent->Trigger(),发出一个事件告知已经准备好一个IQueuedWork,一定发生在Run函数中读取IQueuedWork之前。所以UE4使用内存屏障来保证顺序一致性,让Run函数从另外一个线程读取IQueuedWork时,能够读取到已经同步过后的值。关于无锁编程,大家感兴趣可以上purecpp的相关专题一起讨论。

3. FQueuedThreadPoolBase

再来看看线程池的实现类。FQueuedThreadPool的实现类只有一个,就是FQueuedThreadPoolBase类。我们从它的数据成员,可以很清晰地可以看出,该线程池的结构与第一节的所示的线程池的结构图是基本吻合的:

class FQueuedThreadPoolBase : public FQueuedThreadPool
{
protected:/** The work queue to pull from. */TArray<IQueuedWork*> QueuedWork;/** The thread pool to dole work out to. */TArray<FQueuedThread*> QueuedThreads;/** All threads in the pool. */TArray<FQueuedThread*> AllThreads;/** The synchronization object used to protect access to the queued work. */FCriticalSection* SynchQueue;/** If true, indicates the destruction process has taken place. */bool TimeToDie;// ....
}

数组QueuedWork和互斥锁SynchQueue,组成了一个线程安全的同步队列。AllThreads管理着全部的worker线程。TimeToDie是标识线程池生命状态,如果置为true,线程池的清理工作正在进行,或者已经进行完毕了。还有一个QueuedThreads成员,它管理着空闲的线程,也就是上一节FQueuedThread归还自己到线程池的空闲队列。

线程池的创建,会依次创建每个worker线程。线程池销毁的时候,会依次向每个worker线程发出销毁的命令,并等待线程退出。线程池的销毁会放弃还未执行的work. 创建和销毁的流程较为简单,就不详细展开了。后文着重讨论生产者向线程池添加work的流程。

生产者创建了一个IQueuedWork实现对象后,会调用第一节提及的AddQueuedWork接口,向线程池添加要执行的work. UE4控制线程池添加work的流程,实现的较为精细。它将线程池的状态分成了两类,来分别处理。这两种状态分别为: 1. 线程池中还有空闲线程,即QueuedThreads不为空,并且QueuedWork一定为空; 2. 线程池中已经没有空闲的线程,即QueuedThreads为空;

第一个情景的处理策略是从空闲线程数组中,取一个线程,并直接唤醒该线程执行由生产者当前传递进来的work. 第二个情景,较为简单,由于没有空闲线程可用,就直接将work入队即可。

void FQueuedThreadPoolBase::AddQueuedWork(IQueuedWork* InQueuedWork) /*override*/
{// ....FQueuedThread* Thread = nullptr;{FScopeLock sl(SynchQueue);const int32 AvailableThreadCount = QueuedThreads.Num();if (AvailableThreadCount == 0){// situation 2:QueuedWork.Add(InQueuedWork);return;}// situation 1:const int32 ThreadIndex = AvailableThreadCount - 1;Thread = QueuedThreads[ThreadIndex];QueuedThreads.RemoveAt(ThreadIndex, 1, false);}Thread->DoWork(InQueuedWork);
}

UE4处理情景一的实现,有两个优点。

第一,UE4并不是简单地让每个线程抢占任务队列中work. 而是在当有空闲线程的时候,小心地获取一个空闲线程,指定work并唤醒这一个线程。这样做的好处,是不会出现惊群效应,而让CPU浪费时间做无用的线程调度。

第二,从代码中可以看出,UE4每次获取空闲线程都是取数组的最末尾的空闲线程,也就是最近归还的work线程。这样做的好处是,最近归还的线程意味着它相比其他空闲线程是更近期使用过的。它有更大的概率,操作系统还未对它进行context切换,或者它的context数据还留存在缓存当中。优先使用该线程,就有更大的概率获取较为低廉的线程切换开销。

最后,线程池为worker线程提供的,从线程池获取下一个可用的work和归还空闲线程的接口,ReturnToPoolOrGetNextJob函数:

IQueuedWork* FQueuedThreadPoolBase::ReturnToPoolOrGetNextJob(FQueuedThread* InQueuedThread) /*override*/
{// ... omitted codesIQueuedWork* Work = nullptr;FScopeLock sl(SynchQueue);// ... omitted codesif (QueuedWork.Num() > 0){Work = QueuedWork[0];QueuedWork.RemoveAt(0, 1, false);}if (!Work)QueuedThreads.Add(InQueuedThread);return Work;
}

当任务队列中还有work时,就从队列头部取出一个,是一个FIFO的同步队列。当任务队列为空,无法取出新的任务时,线程就将自己归还给到线程池中,标记为空闲队列。UE4这里实现的不太妥当的就是QueuedWork是一个TArray<IQueuedWork*>数组。数组对非尾部元素的Remove操作,是会对数组元素进行移动的。虽然移动指针并不是很昂贵,而且UE4也禁止了Remove导致的shrink操作,但开销依然是存在的。这里最好的方案是使用一个可以扩容的环状队列。

4. 小结

本文讨论了UE4中线程池的实现细节。线程池FQueuedThreadPool的实现是由一个元素为IQueuedWork*的同步队列,及若干个worker线程所组成。UE4中的线程池,将IQueuedWork队列化,并用FIFO的调度策略。线程池为IQueuedWork的生产者提供了入队接口,并为worker线程(消费者)提供了获取出队接口。UE4对线程池的性能优化也做了不少的工作。例如避免线程池抢占IQueuedWork时可能会发生的惊群现象,以及取最近使用的线程,还有无锁编程等。

专题的下一篇,我们将讨论UE4中的AsyncTask. 这也是UE4迈向现代C++设计的有力步伐。

UE4异步编程专题 - 线程池FQueuedThreadPool相关推荐

  1. UE4异步编程专题 - TFunction

    0. 关于这个专题 游戏要给用户良好的体验,都会尽可能的保证60帧或者更高的fps.一帧留给引擎的时间也不过16ms的时长,再除去渲染时间,留给引擎时间连10ms都不到,能做的事情是极其有限的.同步模 ...

  2. UE4异步编程专题 - 多线程

    专题的第二篇,我们聊聊UE4中的多线程的基础设施.UE4中最基础的模型就是FRunnable和FRunnableThread,FRunnable抽象出一个可以执行在线程上的对象,而FRunnableT ...

  3. C#:异步编程和线程的使用(.NET 4.5 ),异步方法改为同步执行

    摘自:http://www.codeproject.com/Articles/996857/Asynchronous-programming-and-Threading-in-Csharp-N(葡萄城 ...

  4. 《转载》Python并发编程之线程池/进程池--concurrent.futures模块

    本文转载自 Python并发编程之线程池/进程池--concurrent.futures模块 一.关于concurrent.futures模块 Python标准库为我们提供了threading和mul ...

  5. (转)Java并发编程:线程池的使用

    背景:线程池在面试时候经常遇到,反复出现的问题就是理解不深入,不能做到游刃有余.所以这篇博客是要深入总结线程池的使用. ThreadPoolExecutor的继承关系 线程池的原理 1.线程池状态(4 ...

  6. 【转】1.3异步编程:线程同步基元对象

    开始<异步编程:同步基元对象(上)> 示例:异步编程:线程同步基元对象.rar 如今的应用程序越来越复杂,我们常常需要多线程技术来提高我们应用程序的响应速度.每个线程都由自己的线程ID,当 ...

  7. 【转】1.1异步编程:线程概述及使用

    从此图中我们会发现 .NET 与C# 的每个版本发布都是有一个"主题".即:C#1.0托管代码→C#2.0泛型→C#3.0LINQ→C#4.0动态语言→C#5.0异步编程.现在我为 ...

  8. 【Java 并发编程】线程池机制 ( ThreadPoolExecutor 线程池构造参数分析 | 核心线程数 | 最大线程数 | 非核心线程存活时间 | 任务阻塞队列 )

    文章目录 前言 一.ThreadPoolExecutor 构造参数 二.newCachedThreadPool 参数分析 三.newFixedThreadPool 参数分析 四.newSingleTh ...

  9. 【Java 并发编程】线程池机制 ( 线程池示例 | newCachedThreadPool | newFixedThreadPool | newSingleThreadExecutor )

    文章目录 前言 一.线程池示例 二.newCachedThreadPool 线程池示例 三.newFixedThreadPool 线程池示例 三.newSingleThreadExecutor 线程池 ...

最新文章

  1. Extjs 集合了1713个icon图标的CSS文件
  2. 面试官:序列化和反序列化为什么要实现Serializable接口?
  3. boost::gil::view_is_mutable用法的测试程序
  4. 1893. 检查是否区域内所有整数都被覆盖
  5. linux模块加载和模块卸载时出现的问题
  6. java 检测表情符号_一个能在字符串中识别出 Emoji 的简单工具 (支持JavaScript和Java)...
  7. 计算几何02_三次样条曲线
  8. 如何编写系统设计说明书
  9. CRM系统更换服务器,CRM系统三种常见安装实施解决方式
  10. 类的继承——cancas绘制五彩小球
  11. python小写变为大写_在python中改为大写和小写
  12. 2023-2028年中国压铸机行业发展前景与投资趋势分析报告
  13. 反激式开关电源双环PID(电压环+电流环)控制之MATLAB仿真
  14. VB、C#等高级语言与三菱PLC(Q系列、L系列、FX系列)串口、以太网通讯的DLL及源代码
  15. NodeJS与模块系统
  16. 用图灵机器人2.0实现聊天机器人
  17. 微信支付找不到sdk
  18. 计算机中班音乐,【精品】中班音乐教案6篇
  19. ERD Online 4.1.0对接ChatGPT,实现AI建模、SQL自由
  20. Pascal之父——Nicklaus Wirth——算法+数据结构=程序

热门文章

  1. 配置linux系统自带apache+php+mysql
  2. String类与其他数据类型得转换
  3. php 文件类型 html,HTML的文档类型怎么选择
  4. att汇编教程 linux,ATT 汇编语法
  5. java ee实验新闻_JAVAEE第四次作业-JSP显示新闻
  6. SGU495 Kids and Prizes 概率DP,期望公式
  7. 1.4Activity保存现场状态
  8. [2-sat]HDOJ3062 Party
  9. BZOJ2286 : [Sdoi2011]消耗战
  10. james-2.3.2中的配置