女主宣言

由于工作需求,笔者最近在阅读 Pulsar C++ 客户端的实现,发现该客户端虽然是基于 C++11 编写的,但却自己编写了 Future 和 Promise 类,随着阅读的深入,也体会到了在这里”重复造轮子“的原因。本文将根据lib/Future.h中的源码,讲述C++中Future和Promise的一种简单实现~

PS:丰富的一线技术、多元化的表现形式,尽在“360云计算”,点关注哦!

1

前言

众所周知,C++11 提供了对并发编程的支持。首先提供了对不同平台的线程设施的简单包装 thread,并提供了 native_handle() 方法可以得到平台相关的线程句柄,从而调用底层线程相关的函数。另外,C++11 还提供了 future 和 promise 来支持基于任务的程序设计。

1

C++11的并发API

首先回顾下 C++11 的并发设施。

std::thread

如果熟悉 pthread 库的话,那么 C++11 的 std::thread 会非常容易上手,它使用了可变模板参数这一技术,使得编写线程函数不必麻烦地进行 void* 和 T* 的相互转换。

举个例子,要在线程中将两个 int 求和然后转换成十六进制字符串,得到结果。

使用 pthread 的代码:

#include #include #include using namespace std;struct Package {  int x, y;            // input  std::string result;  // output  void calculate() { result = std::to_string(x + y); }};int main(int argc, char* argv[]) {  pthread_t tid;  Package package{1, 3, ""};  pthread_create(      &tid, nullptr,      [](void* param) -> void* {  // 线程函数必须是 void* (void*)        static_cast(param)->calculate();        return nullptr;      },      &package);  // 传入 void* 作为输入参数  pthread_join(tid, nullptr);  // 第二个参数是 void**,如果非空的话可以得到线程函数返回的 void*  cout << "result: " << package.result << endl;  return 0;}

为了简化代码,这里忽略 pthread API 的返回值检查,以及 param 是否为空的检查。可见,pthread 需要对参数打包成一个结构体。

再看看 std::thread 的等价实现:

#include #include using namespace std;int main(int argc, char* argv[]) {  string result;  thread t([](int x, int y,              string& s) { s = to_string(x + y); },  // 函数签名是 T (Args...)           1, 3, ref(result));  t.join();  cout << "result: " << result << endl;  return 0;}

最大的改进是线程函数签名从底层的 void*(void*) 变成了返回任意类型、接收任意数量和类型的参数的 T(Args&&...)。注意传入引用时需要用 std::ref 将引用转换成可拷贝的某种结构。

如何得到返回值

问题来了,其实上面的实现并不直观,毕竟将计算结果作为返回值,比将计算结果的引用作为输入参数要更符合直观。

std::thread 直接就无法得到返回值。反而 pthread 线程函数可以返回的那个 void* 保存返回结果,比如:

#include #include #include using namespace std;struct Input {  int x, y;  std::string sum() const { return to_string(x + y); }};int main(int argc, char* argv[]) {  Input input{1, 3};  pthread_t tid;  pthread_create(      &tid, nullptr,      [](void* param) -> void* {        return new std::string(static_cast(param)->sum());      },      &input);  void* result;  pthread_join(tid, &result);  cout << "result: " << *static_cast<string*>(result) << endl;  // 注意:在某些编译器上 delete result 虽然也行得通,但 delete void* 的行为是未定义的行为(UB)  delete static_cast<string*>(result);  return 0;}

多了一次 new 以及一次拷贝构造的开销不谈,从代码上看,void* 和 T*的互相转换简直是噩梦一样,一不小心还容易内存泄漏。

std::future

std::future 很大程度上是用来解决 std::thread 无法得到返回值的问题的,比如基于 std::future 的等价代码为:

#include #include #include using namespace std;int main(int argc, char* argv[]) {  auto future_ = async([](int x, int y) { return to_string(x + y); }, 1, 3);  cout << "result: " << future_.get() << endl;  return 0;}

async 创建异步任务,get() 等待异步任务完成,并返回结果。

更多的细节就不讲了,比如其实的线程函数(姑且这么叫)是在主线程运行的(其实在主函数和线程函数中分别打印下 std::this_thread::get_id() 就能看出来),除非显式指定启动策略(std::launch)为 std::launch::async。

总之,std::future 是基于 std::thread 的进一步抽象,对于使用者而言,用起来更加方便。并且还提供了一些超时控制的方法,避免如果结果没计算出来的话 get() 会一直阻塞。

std::promise

std::future 的缺点就是和 std::thread 绑定了,无法使用线程池或者得到底层线程库的句柄设置线程属性。另一方面,有时候调用者得到某个结果后,线程函数还想继续执行一些事情。这时候就可以用到 std::promise 了。

#include #include #include #include #include using namespace std;int main(int argc, char* argv[]) {  string input;  cout << "input: ";  cin >> input;  promise<int> p;  std::thread t(      [&p](const string& s) {        try {          int number = stoi(s);          p.set_value(number);  // 在此之后,p.get_future().get() 就能够返回了          // 线程继续做其他事情          this_thread::sleep_for(chrono::seconds(1));          cout << "thread do something else..." << endl;        } catch (...) {          // 处理出错,此处的异常会由 p.get_future().get() 重新抛出          p.set_exception(current_exception());      },      input);  try {    cout << "result: " << p.get_future().get() << endl;  } catch (const std::exception& e) {    cout << "Failed to parse: " << input << " to integer: " << e.what() << endl;  }  t.join();  return 0;}

运行示例:

$ ./a.out input: 12result: 12thread do something else...$ ./a.out input: xxFailed to parse: xx to integer: stoi

可见 std::promise 与一个 std::future 相关联,用户可以在任何地方去为 std::promise 设置返回结果或者异常。一旦设置完成,绑定的 std::future 处于完成状态,即之后若调用 get() 要么返回某个结果,要么抛出异常。

至于 std::promise 在何处设置返回结果或者异常,可以非常灵活,可以主线程直接设置,也可以在 std::thread 的线程函数中设置,也可以在线程池一类设施里设置。

2

Pulsar C++ 客户端的Future和Promise

Future

不同于 std::future 仅提供单个模板参数 T 用来表示线程函数的返回值,Future 需要两个模板参数 Result 和 Type,内部唯一字段 state_ 也是通过这两个参数来实例化:

template <typename Result, typename Type>class Future {   public:    // ...   private:    std::shared_ptr > state_;    // ...};
template <typename Result, typename Type>struct InternalState {    std::mutex mutex;    std::condition_variable condition;    Result result;    Type value;    bool complete;        std::list<typename std::function<void(Result, const Type&)> > listeners;};

重点是 listeners。它是若干函数组成的列表,函数接收 Result 和 const Type& 作为参数,这里大致猜测为回调?比如执行异步任务,返回结果 result 后,和 Type 类型的 value 传入回调进行处理?

注意到这里 Result 是传值,Type 是传常引用,大概 Result 是类似错误码一样的不超过 8 字节(指针大小)的整型,而 Type 则可能是一个比较大的结构体?带着猜测继续阅读 Future 暴露的方法:

首先是 addListener:

    Future& addListener(ListenerCallback callback) {        // 获取 shared_ptr 内部指针,用锁来保护指针指向对象的访问        InternalState* state = state_.get();        // Lock 即 std::unique_lock<:mutex>,可以手动 unlock 解锁,也可以等析构时自动解锁        Lock lock(state->mutex);        // complete 置为 true 则代表任务完成,此时提前解锁并执行回调(因为不会再修改 state 指向对象的值)        if (state->complete) {            lock.unlock();            callback(state->result, state->value);        } else {            // 而任务未完成时则将其加入 listeners 中,前面的锁就是为了保证 listeners 添加回调函数的线程安全            state->listeners.push_back(callback);        }        return *this;    }

其实只是添加一个回调。返回自身的引用是为了支持链式调用。

有一点值得斟酌,这里使用了 shared_ptr 内部的裸指针,而不是直接访问 shared_ptr 本身。这会出现一种可能:如果在其他线程中, state_ 被指向了其他对象,那么原来的对象引用计数就变成 0,从而析构,此时裸指针 state 指向的就是无效对象。

但是这里的 state_ 是私有字段,而且除了构造函数外也没有其它地方会修改它指向的对象,所以这个问题是不会发生的。

然后是 get() 方法,和 std::future 一样,也就是阻塞直到任务完成。

    Result get(Type& result) {        InternalState* state = state_.get();        Lock lock(state->mutex);        if (!state->complete) {            // Wait for result            while (!state->complete) {                state->condition.wait(lock);            }        }        result = state->value;        return state->result;    }

整个 Pulsar C++ 客户端的代码风格是使用错误码而非异常来报告错误的,所以可以看到,这里返回值是 Result,而任务结果是 Type 类型,通过引用型入参来得到的。

用 condition 字段来等待结果被设置。这里用了条件变量的一个惯用法,就是 while 循环来等待唤醒。刚接触条件变量的人可能会疑惑为什么不直接一个 if 完事呢,像这样:

 if (!state->complete) {      // 阻塞直到 condition 被 notify 唤醒,只要唤醒者在 notify 之前将 complete 置为 true 即可      state->condition.wait(lock);  }

在 The Linux Progamming Interface(《Linux/Unix 系统编程手册》)第 30.2.3 节有讲到,条件变量被 wait 返回时,不能确定判断条件的状态,原因是:

  • 其他线程可能率先醒来,比如多个线程在等待同一条件变量,然后其他线程先被唤醒,而它们可能会重新改变条件的状态。

  • 设置宽松的判断条件或许更为简单。

  • 可能发生虚假唤醒的情况,也就是即使没有被 notify,也可能会被唤醒,在一些多处理器系统上,为确保高效实现可能会采用这种(不常见的)虚假唤醒实现。

总之,用 while 循环并不比 if 直接判断坏多少,最坏也就多一次 if 判断(这开销基本可以忽略),但是能覆盖一些 corner case。

Promise

在 Future::get 方法中,state_.condition 会等待唤醒。而 state_ 是私有的并且没有暴露出来(当然,构造 Future 时可以暂存一份 shared_ptr 手动唤醒)。那么谁来唤醒呢?注意到 Future 的构造函数是私有的,并且声明了友元类:

    template <typename U, typename V>    friend class Promise;
template <typename Result, typename Type>class Promise {   public:    Promise() : state_(std::make_shared >()) {}    // ...    Future getFuture() const { return Future(state_); }   private:    typedef std::function<void(Result, const Type&)> ListenerCallback;    std::shared_ptr > state_;}

从构造函数可见,Promise 可以和 Future 共享 InternalState。并且 ListenerCallback 类型就是列表 InternalState.listeners 持有的函数类型。getFuture 方法会创建 Future 对象,由于使用 shared_ptr,这些 Future 和当前 Promise 共享一个 InternalState。

因为 Future 的构造函数是私有的,所以只能由 Promise::getFuture 来构造。

Promise 提供 3 个公有方法:

  • bool isComplete() const:利用 mutex 字段上锁,返回 complete 字段的值,检查任务是否完成。

  • setValue(const Type& value):设置返回结果

  • setFailed(Result result):设置错误码

    bool setValue(const Type& value) {        InternalState* state = state_.get();        Lock lock(state->mutex);        // 若任务已完成,则设置失败,因此多线程设置任务返回值时,只有第一个能成功        if (state->complete) {            return false;        }        state->value = value;  // 设置任务返回值        state->result = Result();  // 设置错误码为默认值        state->complete = true;  // 标志任务已完成        // 遍历所有注册过的回调函数并依次调用        typename std::list::iterator it;        for (it = state->listeners.begin(); it != state->listeners.end(); ++it) {            ListenerCallback& callback = *it;            callback(state->result, state->value);        }        // 清空回调函数列表,唤醒所有阻塞在 get() 中的 Future        state->listeners.clear();        state->condition.notify_all();        return true;    }

setFailed 方法和 setValue 类似,唯一的不同就是对 InternalState 的 value 和 result 字段的设置:

 state->result = result;  // 仅仅设置错误码,不设置任务返回值

代码的一点瑕疵是,这里居然不用 range for 语法:

for (const auto& callback : state) {    callback(state->result, state->value);}

而是直接迭代器遍历 list,还写出了迭代器的长长类型而不用 auto。

PS:其实发现有不少地方都是这种 C++98 风格的,显式写出迭代器类型,然后用迭代器遍历容器。

3

重复造轮子?

至此整个实现就看完了,可以发现它只是实现了 C++11 的 std::future / std::promise 的一部分功能,它的使用套路是类似的,只不过它是基于错误码而非异常的(整个 Pulsar 客户端的错误处理风格也是基于错误码的)。

相比标准库而言,最大的特点就是支持注册回调函数,Promise 一旦设置了返回值或错误码,其绑定的 Future 中所有回调函数都会依次调用。这样就很方便实现一个支持异步和同步调用的类,比如将前文中使用 std::promise 的例子包装成 IntegerParser 类,然后提供同步和异步的接口:

enum class ErrorCode { kSuccess = 0, kInvalidFormat, kUserCallbackException };inline std::string strErrorCode(ErrorCode error) {  switch (error) {    case ErrorCode::kSuccess:      return "success";    case ErrorCode::kInvalidFormat:      return "invalid format";    case ErrorCode::kUserCallbackException:      return "exception from user's callback";  }  return "unknown error";  // 永远不会到达这里,仅仅是为了关闭编译警告}class IntegerParser { public:  // 异步 API,用户提供回调,注意,回调的参数2是 int 而非 const int&  Futureint> parseAsync(      const std::string& input, std::function<void(ErrorCode, int)> callback) {    std::thread t([&input, this] {      try {        int number = std::stoi(input);        promise_.setValue(number);      } catch (...) {        promise_.setFailed(ErrorCode::kInvalidFormat);      }    });    t.detach();    // 间接调用回调,并处理用户提供的回调可能抛出的异常    return promise_.getFuture().addListener(        [&callback](ErrorCode code, const int& result) {          try {            callback(code, result);          } catch (const std::exception& e) {            std::cerr << "[ERROR] User's callback throws: " << e.what()                      << std::endl;          }        });  }  // 同步 API,返回错误码,传入引用保存处理结果  ErrorCode parse(const std::string& input, int& result) {    auto parse_error = ErrorCode::kSuccess;    auto fut = parseAsync(input, [&parse_error](ErrorCode error, const int&) {      parse_error = error;    });    fut.get(result);    return parse_error;  } private:  Promiseint> promise_;};

360云计算

由360云平台团队打造的技术分享公众号,内容涉及数据库、大数据、微服务、容器、AIOps、IoT等众多技术领域,通过夯实的技术积累和丰富的一线实战经验,为你带来最有料的技术分享

中的listeners_C++中Future和Promise的一种简单实现相关推荐

  1. C++中Future和Promise的一种简单实现

    女主宣言 由于工作需求,笔者最近在阅读 Pulsar C++ 客户端的实现,发现该客户端虽然是基于 C++11 编写的,但却自己编写了 Future 和 Promise 类,随着阅读的深入,也体会到了 ...

  2. 视频中的水印如何去除?教你几种简单去除视频水印方法

    视频中的水印如何去除掉呢?如果我们经常观看视频,可能会注意到一些视频上有水印.水印是在视频中嵌入的品牌标志或文字.这些水印可能会影响视频的观感,去除水印可以帮助我们在学习和研究方面更有效地使用视频资源 ...

  3. word中使正文中的上标数字链接到参考文献

    在word中使用尾注的方法添加参考文献会改变文章原有的一些格式,而且操作起来非常麻烦.但我们有时需要把参考文献和正文中的数字链接起来,下边介绍一种简单可行的方法. 如下图所示,文档中有多个参考文献,并 ...

  4. netty中的future和promise源码分析(二)

    前面一篇netty中的future和promise源码分析(一)中对future进行了重点分析,接下来讲一讲promise. promise是可写的future,从future的分析中可以发现在其中没 ...

  5. C++并发编程之std::async(), std::future, std::promise, std::packaged_task

    c++11中增加了线程,使得我们可以非常方便的创建线程,它的基本用法是这样的: void f(int n); std::thread t(f, n + 1); t.join(); 但是线程毕竟是属于比 ...

  6. 给Future一个Promise

    对java开发者来说,经常需要在一个线程中另起一个线程来异步干其他事,就涉及到熟悉的Thread和Runnable.使用方式如下: System.out.println("Do someth ...

  7. Scala教程之:Future和Promise

    文章目录 定义返回Future的方法 阻塞方式获取Future的值 非阻塞方式获取Future的值 Future链 flatmap VS map Future.sequence() VS Future ...

  8. C++11多线程之future和promise

    std::future和promise的作用是在不同线程之间传递数据.使用指针也可以完成数据的传递,但是指针非常危险,因为互斥量不能阻止指针的访问:而且指针的方式传递的数据是固定的,如果更改数据类型, ...

  9. future promise java_第四章 Future和Promise

    Netty是一个异步网络处理框架,在实现中大量使用了Future机制,并在Java自带Future的基础上,增加了Promise机制.这两者的目的都是使异步编程更加方便使用.在阅读源码之前,我们需要对 ...

最新文章

  1. R语言return返回值的形式实战
  2. Java多线程面试题
  3. Java Code Examples for java.net.Authenticator
  4. 捉虫记 NullPointerException
  5. 使用display inline-block 布局时,出现的间距问题的解决办法和相关说明
  6. webpack4搭建vue
  7. 项目后台运行关闭_iOS到底有没有必要上滑强制关闭APP?
  8. python的功能及特点_使用Python这么多年,才发现Python还有这些实用的功能和特点...
  9. Java后台开发知识一览
  10. c语言入门手机自学软件,C语言入门学习
  11. 《一小时高效会议》纪要摘录----梁聪
  12. C++ Builder开发AutoCAD应用程序的方法
  13. Spark中Map和Json字符串相互转换
  14. LiveGBS国标流媒体-摄像头网络直播方案部署问题
  15. 35个Photoshop最强己付费扩展面板+自动修图PS插件
  16. Maya2022和C4D哪个更好用?
  17. tcp实时传输kafka数据_将物联网数据和MQTT消息流式传输到Apache Kafka
  18. 再获殊荣 用友U9 cloud荣获“2022中国制造业云ERP状元奖”
  19. 201612-3-炉石传说
  20. 传统数据库逐渐“难适应”,云原生数据库脱颖而出

热门文章

  1. android方块模拟器,方块进化模拟器
  2. 300页!2020年全网最新Java面试题(附答案)开放下载!超全!!
  3. 你知道高并发的性能测试怎么做吗?
  4. JAVA继承类phone_【Java基础】类-----继承
  5. StorageEvent
  6. Mybatis SQL 语句中 IF函数不支持
  7. Codeforeces Round #226 (Div. 2) E---Bear in the Field(矩阵快速幂)
  8. SSH远程登录原理与运用
  9. [转载]值得推荐的C/C++框架和库
  10. 【数据结构】trie树