本文使用 Zhihu On VSCode 创作并发布
本文使用 CC BY-NC-SA 4.0 许可协议,转载请注明来源

一、设计背景

众所周知,Qt 的信号槽系统提供了线程安全的跨线程异步执行代码的机制(Qt::QueuedConnection)。

使用该机制,可以让槽函数代码在另一个线程执行,并且可以携带参数,用户代码无需加锁,只要发射信号即可。

但很多时候,我们仅仅只想单次异步执行一段代码。若是通过信号槽机制执行,则就不得不声明一个信号函数,连接信号槽,再发射信号,这样显然很繁琐。

幸好,Qt 本身也知道这种需求的存在,提供了 QTimer::singleShot() 函数,可以跨线程异步执行槽函数,甚至还可以延迟执行——然而该函数只能执行无参数槽函数,不能执行其它类型的回调(如 lambda)。

所以,最好能够有一个类似 QTimer::singleShot(),但又可以接收任意参数个数的任意函数子的 API。

更新:5.3的老代码写太久,思维定势了,刚查了下5.4的 singleShot 是支持 Functor 的……那这篇文章留作该机制的技术探讨吧……
更新2:Qt 5.4之后的 QTimer::singleShot 实现有坑,有一个 Qt 事件循环机制理论上不应该出现的问题,详见文末更新。

考虑到异步执行时对执行结果的访问,可以参考 std::async(),返回一个 future 对象。但不能直接使用 std::future——因为它的 getwait 会阻塞住线程,对于 Qt 而言就会阻塞事件循环。

即,我们还需要一个不会阻塞事件循环的等待机制。

综上所述,需求总结如下:

  1. 提供跨线程异步执行代码的能力,让回调函数在目标线程执行;
  2. 提供对任意函数子的异步执行接口,可以接受具备任意参数个数的任意函数子;
  3. 提供延迟执行功能,以满足 QTimer::singleShot() 的所有功能,便于替代前者;
  4. 提供 future 返回对象,用于处理返回值和等待同步,接口与 std::future 类似;
  5. 提供不阻塞 Qt 事件循环的等待机制,用于供 future 使用。

二、异步回调实现

跨线程异步回调的实现,可以参考 Qt 的元对象机制。

Qt 通过元对象系统进行异步执行时(信号槽、QTimer::singleShot()、QMetaMethod::invoke 等),本质上是将回调函数封装为 QMetaCallEvent 对象,再通过 QCoreApplication::postEvent() 投送至目标对象。目标对象会在所属线程的事件循环中触发 QObject::event() 事件处理函数,解析事件并执行回调函数。

然而 QMetaCallEvent 是非公开接口,Qt 不保证其接口的可用和稳定性,因此我们需要仿照此流程自行封装。

2.1 异步回调事件类

新建一个事件类,继承自 QEvent,并注册获取事件类型编号:

class AsyncInvokeEvent : public QEvent {public:static const int kEventType;std::function<QVariant(void)> Function;std::promise<QVariant>;std::shared_future<QVariant>;
};
const int AsyncInvokeEvent::kEventType = QEvent::registerEventType();
AsyncInvokeEvent::AsyncInvokeEvent() : QEvent(QEvent::Type(kEventType)) {}

将用户通过 API 传入的回调函数封装为 std::function<QVariant(void)> 对象,以擦除类型信息,便于封入事件类中。

考虑到需要获取返回值,此处使用 Qt 的万能动态类型 QVariant 存储返回类型,但代价是返回值必须注册至 Qt 元对象系统——也可将 future 实现为模板类型,但这会导致代码复杂度大幅增加,并且不得不将 cpp 中的大部分流程暴露至头文件。

2.2 异步事件过滤器

将异步回调事件发送至目标线程时,需要有一个重写了 QObject::event() 函数的对象接受该事件。我们可以考虑为每个 Qt 线程建立一个事件过滤器,使用一个全局的字典保存,在使用时通过线程指针查询该字典,若未检索到则新建之,即惰性初始化:

AsyncInvokerEventFilter* filter;
{// Find event filter for given threadstatic std::atomic_flag flag = ATOMIC_FLAG_INIT;static QHash<QThread*, AsyncInvokerEventFilter*> filters;while (flag.test_and_set(std::memory_order_seq_cst)) { // Spin-lock}auto it = filters.find(thread);if (it == filters.end()) {it = filters.insert(thread, new AsyncInvokerEventFilter{thread});}filter = *it;flag.clear(std::memory_order_release);
}

拿到事件过滤器后,即可向其投送事件:

auto event = new AsyncInvokeEvent;
event->Function = function;
event->future = event->promise.get_future();
QCoreApplication::postEvent(filter, event);
return event->future;

该事件会通过 Qt 的事件循环机制,在目标线程中被传递至接收者的 event() 函数:

bool AsyncInvokerEventFilter::event(QEvent* event) {bool ret = QObject::event(event);if (event->type() == AsyncInvokeEvent::kEventType) {AsyncInvokeEvent* e = static_cast<AsyncInvokeEvent*>(event);e->Invoke();}event->accept();return ret;
}

至此,跨线程异步执行代码的机制已经编写完毕,整体其实是非常简单的。而且也并非 Qt 专属,其实任意具备事件循环的框架,都可以使用相同逻辑实现。

2.3 生命周期控制

Qt 信号槽的接收者指针,除了指定槽函数执行的线程外,还负责了生命周期控制的作用——只要 sender 或者 receiver 对象被析构,则该信号槽便不会再执行。

由于上文的异步回调事件类是由事件过滤器执行,而非回调函数对应的逻辑意义上的接收者,因此存在回调函数与其依赖资源的生命周期不一致的风险——我们需要引入额外的信息来监测回调函数的生命周期。

虽然回调函数中,也可以通过各类智能指针来管理资源的生命周期,但这会强迫调用者编写更多的代码,而且无法让事件在执行回调前判断相关资源生命周期是否已结束。

因此,我们需要一个机制来判断依赖资源的生命周期。由于在接口层可以做各式封装,最终传递到执行点的判断方式,可通过 std::function<bool(void)> 来表达:

void AsyncInvokeEvent::Invoke() {QVariant ret;if (!IsAlive || IsAlive()) {ret = Function();}promise.set_value(ret);
}

对外接口中,可以考虑提供如下几种使用方式:

  • 最基础的方式,直接传递 std::function<bool(void)> 回调函数,可在其中封装各类自定义判断;
  • 仿信号槽方式,传递 QObject* 指针,接口层通过 QPointer 类监测其存活状态,并将其封装为回调函数;
  • 无生命周期约束,则接口层封装默认实现的回调函数,自动返回 true

三、异步回调接口封装

根据上文代码,此机制的接口需要提供 (执行线程, 回调函数) 二元组作为输入参数,以及一个可选参数 [生命周期判断回调]

为方便使用,参考 Qt 的信号槽、 QTimer::singleShot() 语法,也可直接提供一个 QObject* 对象指针作为逻辑意义上的接收者,则可通过 QObject::thread() 函数获取执行线程。

回调函数最终传递至内部实现的版本,便是上文所述的 std::function<QVariant(void)> 对象。但为方便使用,我们可以提供 Func function, Args&&... args 形式的模板接口,用于承接任意类型的函数子和函数参数:

template <typename Func, typename... Args>
AsyncInvoker::Future AsyncInvoker::Invoke(QThread* thread, const Func& func,Args&&... args) {if (!thread) {thread = qApp->thread();}auto f = std::bind(func, std::forward<Args>(args)...);std::function<QVariant(void)> function = [f]{ return QVariant{f()}; };return Invoke(function, thread);
}

此处的封装返回值一句存在隐患,因为传入函数有可能无返回值,此时这行代码会无法编译。

针对此情况,我们可以去 Qt 源码中看看官方是如何处理的。顺着接收函数子作为槽函数的 QObject::connect() 源代码,可在 qobjectdefs_impl.h 中找到如下黑魔法:

/*trick to set the return value of a slot that works even if the signal or the slot returns voidto be used like     function(), ApplyReturnValue<ReturnType>(&return_value)if function() returns a value, the operator,(T, ApplyReturnValue<ReturnType>) is called, but if itreturns void, the builtin one is used without an error.
*/
template <typename T>
struct ApplyReturnValue {void *data;explicit ApplyReturnValue(void *data_) : data(data_) {}
};
template<typename T, typename U>
void operator,(T &&value, const ApplyReturnValue<U> &container) {if (container.data)*reinterpret_cast<U *>(container.data) = std::forward<T>(value);}template<typename T>
void operator,(T, const ApplyReturnValue<void> &) {}

该模板类重载了逗号运算符,然后再通过模板特化匹配到不同版本的实现,对于有返回值的版本,将返回值储存至构造时输入的对象指针中。

仿写一下,就能得到我们想要的了:

namespace impl {template <typename T>
struct ApplyReturnValue {mutable QVariant* data_;explicit ApplyReturnValue(QVariant* data) : data_(data) {}
};
template <typename T, typename U>
inline void operator,(T&& value, const ApplyReturnValue<U>& container) {container.data_->setValue(std::forward<T>(value));
}
template <typename T>
inline void operator,(T, const ApplyReturnValue<void>&) {}
}  // namespace impltemplate <typename Func, typename... Args>
AsyncInvoker::Future AsyncInvoker::Invoke(QThread* thread, const Func& func,Args&&... args) {if (!thread) {thread = qApp->thread();}auto f = std::bind(func, std::forward<Args>(args)...);std::function<QVariant(void)> function = [f] {using return_t = decltype(func(std::forward<Args>(args)...));QVariant ret;f(), impl::ApplyReturnValue<return_t>(&ret);return ret;};return Invoke(function, thread);
}

注意lambda 的返回类型无法通过 std::result_of 获取,只能通过 decltype 获取。

四、延迟执行

延迟执行原理上也很简单,将延迟事件一并封装入异步回调事件类中,投送至事件过滤器后,事件过滤器再启动一个定时器事件,在定时器事件中才实际执行回调。

考虑到性能问题,此处不应为了执行一个回调函数就创建一个 QTimer 定时器对象,并绑定信号槽。

好消息是,Qt 已经考虑到此类需求,提供了一个轻量级的定时器接口 QObject::startTimer(),无需额外新建任何对象以及信号槽。该接口会定时发起定时器事件,通过 QObject::timerEvent 接收处理。

因此,将前文的 AsyncInvokerEventFilter::event() 代码进行改造如下:

// AsyncInvokeEvent 成员变量:
// QSharedPointer<AsyncInvokeData> d;// AsyncInvokerEventFilter 成员变量:
// QHash<int, QSharedPointer<AsyncInvokeData>> events_;bool AsyncInvokerEventFilter::event(QEvent* event) {bool ret = QObject::event(event);if (event->type() == AsyncInvokeEvent::kEventType) {AsyncInvokeEvent* e = static_cast<AsyncInvokeEvent*>(event);if (e->d->delay_ms > 0) {// Deferred event, invoke in timerEventint id = startTimer(e->d->delay_ms);events_[id] = e->d;} else {e->d->Invoke();}}event->accept();return ret;
}void AsyncInvokerEventFilter::timerEvent(QTimerEvent* event) {int id = event->timerId();killTimer(id);auto it = events_.find(id);if (it == events_.end()) {return;}it.value()->Invoke();events_.erase(it);
}

注意

对于自定义事件,无论 QObject::event() 返回是 true 还是 false,或者通过 QEvent::accept() / QEvent::ignore() 接受或者忽略事件,Qt 都会无视上述操作,在执行完 QObject::event() 后,直接删除由 QCoreApplication::postEvent() 投送的异步事件对象。

因此,对于需要延迟执行的事件,直接将事件指针保存下来是无效的,该指针会成为悬空指针。

此处使用共享指针保存事件数据,而非直接与容器内的值进行 std::swap()——因为这些数据在 Future 中也会被引用,需要进行共享。

此处不可使用 QCoreApplication::processEvents() 方式进行延时——因为若在延时过程中又接收到异步回调事件,则会递归进入此函数,以此类推,存在多次递归导致爆栈的风险。

五、Future 对象

其实,简单一点的话,在异步回调事件类中存储一个 std::promise 对象,然后返回它的 get_future() 即可。

但前文也提到了,std::future 等待操作会阻塞线程,导致 Qt 事件循环失去响应,因此我们需要编写一个不阻塞 Qt 事件循环的等待机制,并且基于它来封装我们的 Future 类。

5.1 不阻塞 Qt 事件循环的等待

这个等待机制,想必很多人都已经在自己的项目中广泛应用,即使用计时器配合 QCoreApplication::processEvents() 实现不阻塞事件循环的延时:

QElapsedTimer timer;
timer.start()
while (timer.elapsed() < timeout) {QCoreApplication::processEvents();
}

为方便定制化的使用,我们可以参考 std::future 的 wait() / wait_for() / wait_until() 函数,做多个额外的封装,并提供 QDateTime 和 std::chrono 两套接口:

void Wait(const std::function<bool(void)>& isValid,QEventLoop::ProcessEventsFlags flags = QEventLoop::AllEvents);bool WaitFor(int timeout_milliseconds, QEventLoop::ProcessEventsFlags flags = QEventLoop::AllEvents,const std::function<bool(void)>& isValid = {});bool WaitUntil(const QDateTime& timeout_time,QEventLoop::ProcessEventsFlags flags = QEventLoop::AllEvents,const std::function<bool(void)>& isValid = {});template <class Rep, class Period>
bool WaitFor(const std::chrono::duration<Rep, Period>& timeout_duration,QEventLoop::ProcessEventsFlags flags = QEventLoop::AllEvents,const std::function<bool(void)>& isValid = {});template <class Clock, class Duration>
bool WaitUntil(const std::chrono::time_point<Clock, Duration>& timeout_time,QEventLoop::ProcessEventsFlags flags = QEventLoop::AllEvents,const std::function<bool(void)>& isValid = {});

具体实现不再赘述,本例思路如下:

  • WaitFor 中,使用 当前时间 + 延时 方式转换为 WaitUntil 的调用。
  • WaitUntil 中,将超时判断封装为回调函数,以转换为 Wait 的调用。

5.2 Future 对象的 wait 与 get

Future 对象的 wait()/wait_for()/wait_until() 可直接调用上述实现。

但 wait_for() / wait_until() 函数需要返回 std::future_status 状态值,因此我们还需要判断该异步事件当前的执行状态。

想必由于要避免阻塞事件循环,我们不能直接调用 std::future 的对应函数,因此需要自行封装执行状态。

可考虑在异步回调事件类对象中存储一个 std::atomic_bool 标志位,用于标识异步执行状态,在回调执行后将其之为 true

// Future 成员变量:
// QSharedPointer<AsyncInvokeData> d_;std::future_status AsyncInvoker::Future::status() const {if (!d_->future.valid()) {return std::future_status::deferred;} else if (!d_->executed.load()) {return std::future_status::timeout;} else {return std::future_status::ready;}
}

wait_for()wait_until() 函数在完成等待后,返回 status() 即可;wait() 则是将 status() 作为判断条件传给上一节的 Wait() 函数。

get() 函数同理, `status()` 可以直接使用 wait() 完成等待,然后返回 std::future::get() 即可。

valid() 函数则是同时判断 std::future::valid() 和 executed 状态,即 status() == std::future_status::ready

六、范例代码

上文中的代码,已提交至 GitHub: ZgblKylin/KtUtils 仓库的 AsyncInvoker 分支

该仓库提供 CMake 和 QMake 两种使用方式,支持静态链接和动态链接(QMake 还提供源码包含)。

库文件会生成至 ${CMAKE_SOURCE_DIR}/lib 目录,dll文件(特例)和单元测试的exe文件会生成至 ${CMAKE_SOURCE_DIR}/bin 目录,库文件名称为 KtUtils/KtUtilsd(Debug 后缀)。

CMake 使用方式

# 启用动态链接。默认使用静态链接。
set(KT_UTILS_SHARED_LIBRARY ON)# 编译单元测试
set(BUILD_TESTING ON)# 链接目标
add_subdirectory(KtUtils)
target_link_libraries(TargetName KtUtils)

单元测试使用 Qt Test 编写,可使用 CMake 的 CTest 机制直接执行(如 make test),但该执行方式下无法看到 Qt Test 输出。

QMake 使用方式

# 源码包含
include(KtUtils/KtUtils.pri)# 链接库
# 修改 KtUtilsconf.pri 以启用动态链接、启用单元测试
SUBDIRS += KtUtils
win32: {contains(KtUtils_CONFIG, KtUtils_Shared_Library) {LIBS += -LKtUtils/bin/} else {LIBS += -LKtUtils/lib/}
} else:unix: {LIBS += -LKtUtils/lib/
}
CONFIG(release, debug|release): LIBS += -lKtUtils
else:CONFIG(debug, debug|release): LIBS += -lKtUtilsd
DESTDIR = KtUtils/bin
INCLUDEPATH += KtUtils/include

七、QTimer::singleShot

7.1 功能对比

笔者之前写了5年的 Qt 5.3,所以形成了一定的思维定势,加上 Qt 极端注重兼容性,基本不在大版本内做大更新,所以忽略了某些问题……

就是 Qt 5.4 其实算 breaking change,只是不破坏老代码兼容性。5.4 开始,API 设计全面提升到 C++11了,于是很多 API 都引入了 Functor 版本。

5.4 的 QTimer::singleShot 加入了 Functor+Args 的接口,接口设计和功能与我文中的几乎一致。

但我试用了,发现有一个坑——无法在非 Qt 线程中调用 QTimer::singleShot,此场景下该函数不会被执行。

但 Qt 的事件循环机制是不应该有这问题的,因为 Qt 的异步事件的处理(底层为QCoreApplication::postEvent)只取决于接收者的事件循环,对发送者无任何要求。典型例子就是信号槽,你可以在任何位置发信号,甚至在类似中断的 catch 块、signal 函数回调等这些特殊位置发信号。

那么 QTimer::singleShot 的这个问题是怎么出现的呢?这需要我们对比下两个方案的实现方式。

7.2 问题分析

我的方案:

  • 人工仿造 QMetaCallEvent
  • 通过 QCoreApplication::postEvent 投递事件;
  • receiver 接收事件后,再根据 timeout 参数来决定是否需要延时,若需要,则再通过 startTimer 转发至 timerEvent 事件。

QTimer::singleShot 的方案:

该方案比较取巧,把 invoke 和 timeout 两个动作合并到一起了,然后比起我的方案还不需要给接收线程外挂一个 filter 处理器,整体实现上的确更加优雅,但也导致了此处的问题。

  • 建立一个 `QSingleShotTimer` 对象,该对象本身承担了 invoke 功能,同时继承自 QObject,来一并处理延时功能;
  • 直接在调用线程对该对象执行 startTimer 操作——因为此操作不能跨线程调用;
  • 通过 moveToThread 将其移入接收者线程,则已经启动的定时器会在该线程自动重新开启;
  • 不用管了,也不需要做啥 post,把调用请求投送到另一个线程,以及延迟执行,都通过 moveToThread 这步一石二鸟了;
  • 在 timerEvent 中直接 invoke 函数即可,多么优雅。

唯一纰漏在于,非 Qt 线程(无 Qt 事件循环的线程)中无法启动定时器!

此时, moveToThread 做的“停止原线程中的定时器,移动对象所有权到新线程后,在新线程中自动注册定时器”的自动操作,一开始就被堵死了。

于是这个定时器永远跑不起来,这个函数永远不会被执行。

对了,顺带还引发一个额外的副作用——如果你这个 functor 是捕获了变量的 lambda,那么捕获的变量也就释放不掉了——也不是严格意义上的野指针化了,因为在进程退出前,还是会析构掉这个 QSingleShotTimer 对象的。

7.3 替代方案

那么,为了避开这个坑,难道我们就一定要重复造轮子了吗?

也不是,Qt 还是有一个老老实实走 QCoreApplication::postEvent 投递 QMetaCallEvent 的实现的。

那就是 QMetaObject::invokeMethod。

只是延迟执行功能就得自己造轮子了:

QMetaObject::invokeMethod(receiver, [timeout]{// 以下延时也可通过我前文封装的 WaitFor 函数实现auto start = std::chrono::steady_clock::now();std::chrono::milliseconds duration{timeout};while (std::chrono::steady_clock::now() < (start + duration)) {QCoreApplication::processEvents();}...}, Qt::QueuedConnection);

怎么说呢?放着 QMetaCallEvent 的正道不走,非要为了优雅玩花活,结果玩出了一个本不应该有的坑……

建议有异步延迟执行的需求时,老老实实走最正统的 QMetaObject::invokeMethod 吧,无非是封装个 WaitFor 方法,多写一行代码来延时罢了。

connect跨进程 qt_编写 Qt 跨线程异步调用器相关推荐

  1. android 跨进程 android:process,Android跨进程通信技术-多进程模式的运行机制

    本文为个人学习笔记分享,没有任何商业化行为,对其他文章的引用都会标记.如有侵权行为,请及时提醒更正!如需转载请表明出处 本文主要来源是 任玉刚大神的<Android开发艺术探索> 如果说用 ...

  2. android跨进程读写内存,Android 跨进程内存泄露

    内存泄露的检测和修复一直是每个APP的重点和难点,也有很多文章讲述了如何检测和修复.本篇文章 结合最近开发的项目遇到的实例,讲述下Android Binder导致的内存泄露的一个案例. 发现问题 参与 ...

  3. Java线程异步调用使用的最好的方式

    一.异步调用方式分析 今天在写代码的时候,想要调用异步的操作,这里我是用的java8的流式异步调用,但是使用过程中呢,发现这个异步方式有两个方法,如下所示: 区别是一个 需要指定线程池,一个不需要. ...

  4. Windows进程与线程学习笔记(九)—— 线程优先级/进程挂靠/跨进程读写

    Windows进程与线程学习笔记(九)-- 线程优先级/进程挂靠/跨进程读写 要点回顾 线程优先级 调度链表 分析 KiFindReadyThread 分析 KiSwapThread 总结 进程挂靠 ...

  5. Android跨进程通信Binder机制与AIDL实例

    文章目录 进程通信 1.1 进程空间划分 1.2 跨进程通信IPC 1.3 Linux跨进程通信 1.4 Android进程通信 Binder跨进程通信 2.1 Binder简介 2.2 Binder ...

  6. Android 跨进程通信大总结

    转载请标明出处:http://blog.csdn.net/zhaoyanjun6/article/details/111553746 本文出自[赵彦军的博客] 文章目录 1.Android进程 2.修 ...

  7. 再谈Android Binder跨进程通信原理

    在谈Android的跨进程通信问题上时,总会问到Android的IPC机制,是指两个进程之间进行数据交换的过程.按操作系统的中的描述,线程是CPU调度最小的单元,同时线程是一种有限的系统资源,而进程是 ...

  8. Android 跨进程通信基础

    2019独角兽企业重金招聘Python工程师标准>>> Android跨进程通信基础--Binder, BinderProxy, parcel, parcelable, Stub, ...

  9. Android开发之跨进程通信-广播跨进程实现方法(附源码)

    真的特别简单,简单概述下android的四大组件都可以跨进程. Activity,广播,服务,内容提供者都可以 先看下跨进程传递数据的效果图 下面是两个APP用于模拟跨进程 再看下跨进程效果,AIDL ...

最新文章

  1. This和Super关键字的对比
  2. Oracle编程入门经典 第3章 建立以及管理用户和表
  3. vbs获取cpu使用率
  4. html安装网卡驱动,如何手动安装无线网卡驱动,网卡驱动安装教程
  5. minSdkVersion = targetSdkVersion = compileSdkVersion
  6. 编程到底难在哪里? 从一个美国实习生的故事说起
  7. autowired_@Autowired所有的东西!
  8. 网络编程之 keepalive(zz)
  9. opencv画框显示python_Python OpenCV实现鼠标画框效果
  10. PostgreSQL 删除重复数据
  11. 电路分析基础知识点总结
  12. 内存超频时序怎么调_超频讲解:内存时序设置说明二
  13. 新浪微博开放平台提交审核时Android签名生成
  14. vue知识(四)生命周期、钩子函数、路由
  15. MAT400安全帽标签
  16. 【妄言之言】我的2016--困境与选择
  17. 2021/7/8——集训Day.3
  18. Vcastr 3.0 开源的在线FLV播放器
  19. Windows 7之BitLock To Go
  20. 穿过任意防火墙NAT的远程控制软件TeamViewer

热门文章

  1. 工作57:element格式化内容
  2. 前端学习(1684):前端系列实战课程之判断游戏结束
  3. 前端学习(574):margin无效情形之绝对定位下的非定义的方向“无效”
  4. spring mvc学习(42):restful的编辑功能实现
  5. CSS之创建等高列布局之二
  6. RS(2)--从文本数据到用户画像
  7. python转c报错no module named_python异常No module named 'win32com'
  8. 猜数字游戏python程序_python经典小程序:猜数字游戏
  9. Fibonacci数列(数列 取模)
  10. C++ 宏、范型和RTTI 浅析