女主宣言

由于工作需求,笔者最近在阅读 Pulsar C++ 客户端的实现,发现该客户端虽然是基于 C++11 编写的,但却自己编写了 Future 和 Promise 类,随着阅读的深入,也体会到了在这里”重复造轮子“的原因。本文将根据lib/Future.h中的源码,讲述C++中Future和Promise的一种简单实现~

PS:丰富的一线技术、多元化的表现形式,尽在“360云计算”,点关注哦!

1

前言

众所周知,C++11 提供了对并发编程的支持。首先提供了对不同平台的线程设施的简单包装 thread,并提供了 native_handle() 方法可以得到平台相关的线程句柄,从而调用底层线程相关的函数。另外,C++11 还提供了 future 和 promise 来支持基于任务的程序设计。

1

C++11的并发API

首先回顾下 C++11 的并发设施。

std::thread

如果熟悉 pthread 库的话,那么 C++11 的 std::thread 会非常容易上手,它使用了可变模板参数这一技术,使得编写线程函数不必麻烦地进行 void* 和 T* 的相互转换。

举个例子,要在线程中将两个 int 求和然后转换成十六进制字符串,得到结果。

使用 pthread 的代码:

#include <pthread.h>#include <iostream>
#include <string>
using namespace std;struct Package {int x, y;            // inputstd::string result;  // outputvoid calculate() { result = std::to_string(x + y); }
};int main(int argc, char* argv[]) {pthread_t tid;Package package{1, 3, ""};pthread_create(&tid, nullptr,[](void* param) -> void* {  // 线程函数必须是 void* (void*)static_cast<Package*>(param)->calculate();return nullptr;},&package);  // 传入 void* 作为输入参数pthread_join(tid, nullptr);  // 第二个参数是 void**,如果非空的话可以得到线程函数返回的 void*cout << "result: " << package.result << endl;return 0;
}

为了简化代码,这里忽略 pthread API 的返回值检查,以及 param 是否为空的检查。可见,pthread 需要对参数打包成一个结构体。

再看看 std::thread 的等价实现:

#include <string>
#include <thread>
using namespace std;int main(int argc, char* argv[]) {string result;thread t([](int x, int y,string& s) { s = to_string(x + y); },  // 函数签名是 T (Args...)1, 3, ref(result));t.join();cout << "result: " << result << endl;return 0;
}

最大的改进是线程函数签名从底层的 void*(void*) 变成了返回任意类型、接收任意数量和类型的参数的 T(Args&&...)。注意传入引用时需要用 std::ref 将引用转换成可拷贝的某种结构。

如何得到返回值

问题来了,其实上面的实现并不直观,毕竟将计算结果作为返回值,比将计算结果的引用作为输入参数要更符合直观。

std::thread 直接就无法得到返回值。反而 pthread 线程函数可以返回的那个 void* 保存返回结果,比如:

#include <pthread.h>#include <iostream>
#include <string>
using namespace std;struct Input {int x, y;std::string sum() const { return to_string(x + y); }
};int main(int argc, char* argv[]) {Input input{1, 3};pthread_t tid;pthread_create(&tid, nullptr,[](void* param) -> void* {return new std::string(static_cast<Input*>(param)->sum());},&input);void* result;pthread_join(tid, &result);cout << "result: " << *static_cast<string*>(result) << endl;// 注意:在某些编译器上 delete result 虽然也行得通,但 delete void* 的行为是未定义的行为(UB)delete static_cast<string*>(result);return 0;
}

多了一次 new 以及一次拷贝构造的开销不谈,从代码上看,void* 和 T*的互相转换简直是噩梦一样,一不小心还容易内存泄漏。

std::future

std::future 很大程度上是用来解决 std::thread 无法得到返回值的问题的,比如基于 std::future 的等价代码为:

#include <future>
#include <iostream>
#include <string>
using namespace std;int main(int argc, char* argv[]) {auto future_ = async([](int x, int y) { return to_string(x + y); }, 1, 3);cout << "result: " << future_.get() << endl;return 0;
}

async 创建异步任务,get() 等待异步任务完成,并返回结果。

更多的细节就不讲了,比如其实的线程函数(姑且这么叫)是在主线程运行的(其实在主函数和线程函数中分别打印下 std::this_thread::get_id() 就能看出来),除非显式指定启动策略(std::launch)为 std::launch::async。

总之,std::future 是基于 std::thread 的进一步抽象,对于使用者而言,用起来更加方便。并且还提供了一些超时控制的方法,避免如果结果没计算出来的话 get() 会一直阻塞。

std::promise

std::future 的缺点就是和 std::thread 绑定了,无法使用线程池或者得到底层线程库的句柄设置线程属性。另一方面,有时候调用者得到某个结果后,线程函数还想继续执行一些事情。这时候就可以用到 std::promise 了。

#include <chrono>
#include <future>
#include <iostream>
#include <string>
#include <thread>
using namespace std;int main(int argc, char* argv[]) {string input;cout << "input: ";cin >> input;promise<int> p;std::thread t([&p](const string& s) {try {int number = stoi(s);p.set_value(number);  // 在此之后,p.get_future().get() 就能够返回了// 线程继续做其他事情this_thread::sleep_for(chrono::seconds(1));cout << "thread do something else..." << endl;} catch (...) {// 处理出错,此处的异常会由 p.get_future().get() 重新抛出p.set_exception(current_exception());},input);try {cout << "result: " << p.get_future().get() << endl;} catch (const std::exception& e) {cout << "Failed to parse: " << input << " to integer: " << e.what() << endl;}t.join();return 0;
}

运行示例:

$ ./a.out
input: 12
result: 12
thread do something else...
$ ./a.out
input: xx
Failed to parse: xx to integer: stoi

可见 std::promise 与一个 std::future 相关联,用户可以在任何地方去为 std::promise 设置返回结果或者异常。一旦设置完成,绑定的 std::future 处于完成状态,即之后若调用 get() 要么返回某个结果,要么抛出异常。

至于 std::promise 在何处设置返回结果或者异常,可以非常灵活,可以主线程直接设置,也可以在 std::thread 的线程函数中设置,也可以在线程池一类设施里设置。

2

Pulsar C++ 客户端的Future和Promise

Future

不同于 std::future<T> 仅提供单个模板参数 T 用来表示线程函数的返回值,Future 需要两个模板参数 Result 和 Type,内部唯一字段 state_ 也是通过这两个参数来实例化:

template <typename Result, typename Type>
class Future {public:// ...private:std::shared_ptr<InternalState<Result, Type> > state_;// ...
};
template <typename Result, typename Type>
struct InternalState {std::mutex mutex;std::condition_variable condition;Result result;Type value;bool complete;std::list<typename std::function<void(Result, const Type&)> > listeners;
};

重点是 listeners。它是若干函数组成的列表,函数接收 Result 和 const Type& 作为参数,这里大致猜测为回调?比如执行异步任务,返回结果 result 后,和 Type 类型的 value 传入回调进行处理?

注意到这里 Result 是传值,Type 是传常引用,大概 Result 是类似错误码一样的不超过 8 字节(指针大小)的整型,而 Type 则可能是一个比较大的结构体?带着猜测继续阅读 Future 暴露的方法:

首先是 addListener:

    Future& addListener(ListenerCallback callback) {// 获取 shared_ptr 内部指针,用锁来保护指针指向对象的访问InternalState<Result, Type>* state = state_.get();// Lock 即 std::unique_lock<std::mutex>,可以手动 unlock 解锁,也可以等析构时自动解锁Lock lock(state->mutex);// complete 置为 true 则代表任务完成,此时提前解锁并执行回调(因为不会再修改 state 指向对象的值)if (state->complete) {lock.unlock();callback(state->result, state->value);} else {// 而任务未完成时则将其加入 listeners 中,前面的锁就是为了保证 listeners 添加回调函数的线程安全state->listeners.push_back(callback);}return *this;}

其实只是添加一个回调。返回自身的引用是为了支持链式调用。

有一点值得斟酌,这里使用了 shared_ptr 内部的裸指针,而不是直接访问 shared_ptr 本身。这会出现一种可能:如果在其他线程中, state_ 被指向了其他对象,那么原来的对象引用计数就变成 0,从而析构,此时裸指针 state 指向的就是无效对象。

但是这里的 state_ 是私有字段,而且除了构造函数外也没有其它地方会修改它指向的对象,所以这个问题是不会发生的。

然后是 get() 方法,和 std::future 一样,也就是阻塞直到任务完成。

    Result get(Type& result) {InternalState<Result, Type>* state = state_.get();Lock lock(state->mutex);if (!state->complete) {// Wait for resultwhile (!state->complete) {state->condition.wait(lock);}}result = state->value;return state->result;}

整个 Pulsar C++ 客户端的代码风格是使用错误码而非异常来报告错误的,所以可以看到,这里返回值是 Result,而任务结果是 Type 类型,通过引用型入参来得到的。

用 condition 字段来等待结果被设置。这里用了条件变量的一个惯用法,就是 while 循环来等待唤醒。刚接触条件变量的人可能会疑惑为什么不直接一个 if 完事呢,像这样:

 if (!state->complete) {// 阻塞直到 condition 被 notify 唤醒,只要唤醒者在 notify 之前将 complete 置为 true 即可state->condition.wait(lock);}

在 The Linux Progamming Interface(《Linux/Unix 系统编程手册》)第 30.2.3 节有讲到,条件变量被 wait 返回时,不能确定判断条件的状态,原因是:

  • 其他线程可能率先醒来,比如多个线程在等待同一条件变量,然后其他线程先被唤醒,而它们可能会重新改变条件的状态。

  • 设置宽松的判断条件或许更为简单。

  • 可能发生虚假唤醒的情况,也就是即使没有被 notify,也可能会被唤醒,在一些多处理器系统上,为确保高效实现可能会采用这种(不常见的)虚假唤醒实现。

总之,用 while 循环并不比 if 直接判断坏多少,最坏也就多一次 if 判断(这开销基本可以忽略),但是能覆盖一些 corner case。

Promise

在 Future::get 方法中,state_.condition 会等待唤醒。而 state_ 是私有的并且没有暴露出来(当然,构造 Future 时可以暂存一份 shared_ptr 手动唤醒)。那么谁来唤醒呢?注意到 Future 的构造函数是私有的,并且声明了友元类:

    template <typename U, typename V>friend class Promise;
template <typename Result, typename Type>
class Promise {public:Promise() : state_(std::make_shared<InternalState<Result, Type> >()) {}// ...Future<Result, Type> getFuture() const { return Future<Result, Type>(state_); }private:typedef std::function<void(Result, const Type&)> ListenerCallback;std::shared_ptr<InternalState<Result, Type> > state_;
}

从构造函数可见,Promise 可以和 Future 共享 InternalState。并且 ListenerCallback 类型就是列表 InternalState.listeners 持有的函数类型。getFuture 方法会创建 Future 对象,由于使用 shared_ptr,这些 Future 和当前 Promise 共享一个 InternalState。

因为 Future 的构造函数是私有的,所以只能由 Promise::getFuture 来构造。

Promise 提供 3 个公有方法:

  • bool isComplete() const:利用 mutex 字段上锁,返回 complete 字段的值,检查任务是否完成。

  • setValue(const Type& value):设置返回结果

  • setFailed(Result result):设置错误码

    bool setValue(const Type& value) {InternalState<Result, Type>* state = state_.get();Lock lock(state->mutex);// 若任务已完成,则设置失败,因此多线程设置任务返回值时,只有第一个能成功if (state->complete) {return false;}state->value = value;  // 设置任务返回值state->result = Result();  // 设置错误码为默认值state->complete = true;  // 标志任务已完成// 遍历所有注册过的回调函数并依次调用typename std::list<ListenerCallback>::iterator it;for (it = state->listeners.begin(); it != state->listeners.end(); ++it) {ListenerCallback& callback = *it;callback(state->result, state->value);}// 清空回调函数列表,唤醒所有阻塞在 get() 中的 Futurestate->listeners.clear();state->condition.notify_all();return true;}

setFailed 方法和 setValue 类似,唯一的不同就是对 InternalState 的 value 和 result 字段的设置:

 state->result = result;  // 仅仅设置错误码,不设置任务返回值

代码的一点瑕疵是,这里居然不用 range for 语法:

for (const auto& callback : state) {callback(state->result, state->value);
}

而是直接迭代器遍历 list,还写出了迭代器的长长类型而不用 auto。

PS:其实发现有不少地方都是这种 C++98 风格的,显式写出迭代器类型,然后用迭代器遍历容器。

3

重复造轮子?

至此整个实现就看完了,可以发现它只是实现了 C++11 的 std::future / std::promise 的一部分功能,它的使用套路是类似的,只不过它是基于错误码而非异常的(整个 Pulsar 客户端的错误处理风格也是基于错误码的)。

相比标准库而言,最大的特点就是支持注册回调函数,Promise 一旦设置了返回值或错误码,其绑定的 Future 中所有回调函数都会依次调用。这样就很方便实现一个支持异步和同步调用的类,比如将前文中使用 std::promise 的例子包装成 IntegerParser 类,然后提供同步和异步的接口:

enum class ErrorCode { kSuccess = 0, kInvalidFormat, kUserCallbackException };inline std::string strErrorCode(ErrorCode error) {switch (error) {case ErrorCode::kSuccess:return "success";case ErrorCode::kInvalidFormat:return "invalid format";case ErrorCode::kUserCallbackException:return "exception from user's callback";}return "unknown error";  // 永远不会到达这里,仅仅是为了关闭编译警告
}class IntegerParser {public:// 异步 API,用户提供回调,注意,回调的参数2是 int 而非 const int&Future<ErrorCode, int> parseAsync(const std::string& input, std::function<void(ErrorCode, int)> callback) {std::thread t([&input, this] {try {int number = std::stoi(input);promise_.setValue(number);} catch (...) {promise_.setFailed(ErrorCode::kInvalidFormat);}});t.detach();// 间接调用回调,并处理用户提供的回调可能抛出的异常return promise_.getFuture().addListener([&callback](ErrorCode code, const int& result) {try {callback(code, result);} catch (const std::exception& e) {std::cerr << "[ERROR] User's callback throws: " << e.what()<< std::endl;}});}// 同步 API,返回错误码,传入引用保存处理结果ErrorCode parse(const std::string& input, int& result) {auto parse_error = ErrorCode::kSuccess;auto fut = parseAsync(input, [&parse_error](ErrorCode error, const int&) {parse_error = error;});fut.get(result);return parse_error;}private:Promise<ErrorCode, int> promise_;
};

360云计算

由360云平台团队打造的技术分享公众号,内容涉及数据库、大数据、微服务、容器、AIOps、IoT等众多技术领域,通过夯实的技术积累和丰富的一线实战经验,为你带来最有料的技术分享

C++中Future和Promise的一种简单实现相关推荐

  1. 中的listeners_C++中Future和Promise的一种简单实现

    女主宣言 由于工作需求,笔者最近在阅读 Pulsar C++ 客户端的实现,发现该客户端虽然是基于 C++11 编写的,但却自己编写了 Future 和 Promise 类,随着阅读的深入,也体会到了 ...

  2. 在 Linux 中查找 IP 地址的 3 种简单方法

    在 Linux 系统中,经常需要查找 IP 地址以进行网络配置.故障排除或安全管理.无论是查找本地主机的 IP 地址还是查找其他设备的 IP 地址,本文将介绍三种简单的方法,帮助你在 Linux 中轻 ...

  3. 如何从WordPress帖子中删除作者姓名(2种简单方法)

    Do you want to remove the author name from your WordPress blog posts? Normally, blog posts are suppo ...

  4. xsl调用java方法传参_Java中的XSL转换:一种简单的方法

    xsl调用java方法传参 XSL转换 (XSLT)是将一个XML文档转换为另一个XML文档的强大机制. 但是,在Java中,XML操作相当冗长和复杂. 即使是简单的XSL转换,也必须编写几十行代码- ...

  5. Java中的XSL转换:一种简单的方法

    XSL转换 (XSLT)是将一个XML文档转换为另一个XML文档的强大机制. 但是,在Java中,XML操作相当冗长和复杂. 即使是简单的XSL转换,也必须编写几十行代码-如果需要适当的异常处理和日志 ...

  6. 如何在WordPress中删除谷歌字体(2种简单方法)

    许多WordPress主题都会用Google字体,然而,Google字体并不存储在网站本地,它是一个第三方资源,这会影响网站的加载速度,让网站变慢,尤其当你的网站面向的是国内用户时,更需要禁用Goog ...

  7. Simulink中进行电容充放电的一种简单仿真

    https://jingyan.baidu.com/article/5d368d1ea1f2e03f60c057db.html

  8. netty中的future和promise源码分析(二)

    前面一篇netty中的future和promise源码分析(一)中对future进行了重点分析,接下来讲一讲promise. promise是可写的future,从future的分析中可以发现在其中没 ...

  9. C++并发编程之std::async(), std::future, std::promise, std::packaged_task

    c++11中增加了线程,使得我们可以非常方便的创建线程,它的基本用法是这样的: void f(int n); std::thread t(f, n + 1); t.join(); 但是线程毕竟是属于比 ...

最新文章

  1. Sentinel: 分布式系统的流量防卫兵 1
  2. 记录一些使用git过程中的bug
  3. Mybatis基于XML配置SQL映射器(一)
  4. java线程池 synchronized_java多线程学习(二) 之 synchronized
  5. itertools chain
  6. html怎么给表格加a链接地址,html基础02-图片标签、绝/相对地址、表格的属性、链接的属性及链接的分类、name定义锚点的名称、编码...
  7. python生活中的小问题_python日常注意小知识集锦
  8. 博途v14电脑要求_Win10运行不了博途V14怎么办?
  9. Daemontools和Supervisor管理linux常驻进程
  10. 计算机和小学科课题,《小学信息技术课堂有效教学的探索》课题研究方案
  11. 360安全助手 -- 强力卸载电脑上的软件 的问题
  12. 如何查看电脑上曾记录的账号密码
  13. 入门Web前端开发需要学习哪些技术?薪资高吗?
  14. JavaScript in_array 函数
  15. 关于通过Date.getTime()得到1970年01月1日0点零分问题验证
  16. 基于 Java8 的国产开源 IoT 企业级物联网平台
  17. [C/C++语言基础] —函数
  18. TR200 ASSSD跑分测试
  19. 平安普惠借款费用占本金一半 被诉未起告知义务、存代签情况
  20. 人工智能 漆桂林_东南大学计算机科学与工程学院硕导介绍:漆桂林

热门文章

  1. 诗与远方:无题(六十五)- 杂诗
  2. Spring Boot第一个简单返回html页面的程序
  3. MongoDB学习之在Windows下安装MongoDB
  4. BitMap-BitSet(JDK1.8)基本使用入门
  5. php5.3 gd库,php5.3动态编译gd库 zlib扩展 mcrypt扩展 mysqli扩展
  6. nginx 网络模型,cpu亲和等优点
  7. [Azure][PowerShell][ASM][12]ACL
  8. 用 Anaconda 完美解决 Python2 和 python3 共存问题
  9. phalcon: Profiling分析 profilter / Plugin结合,dispatcher调度控制器 监听sql执行日志
  10. STM32中的位带(bit-band)操作