vs2019 C++20 latch

  • 01 latch
  • 02 latch的一个实现

01 latch

<latch> 闩:单次使用的线程屏障。
latch 是 ptrdiff_t 类型的向下计数器,它能用于同步线程。在创建时初始化计数器的值。

线程可以在 latch 上阻塞,直到计数器减少到零。计数器无法增加或重置,因此 latch 是只能使用一次的屏障。

并发调用 latch 的成员函数(析构函数除外)不会引入数据竞争。

与 std::barrier 不同,参与线程可以多次减少 std::latch 的计数器。[1]

| 方法 | 作用 |
| --- | --- |
| operator= | [已删除] latch 不可赋值 |
| count_down | 以不阻塞的方式减少计数器 |
| try_wait | 测试内部计数器是否等于零 |
| wait | 阻塞直至计数器抵达零 |
| arrive_and_wait | 减少计数器并阻塞直至它抵达零 |
| max | [静态] 实现所支持的计数器最大值 |

latch 的参考头文件概要(std::latch)[2]:

// Synopsis of std::latch as specified for the standard header <latch>.
// Declarations only; no member is defined here.
namespace std {
  class latch {
  public:
    // Largest counter value the implementation supports.
    static constexpr ptrdiff_t max() noexcept;

    // Initializes the counter with `expected`.
    constexpr explicit latch(ptrdiff_t expected);
    ~latch();

    // A latch can be neither copied nor assigned.
    latch(const latch&) = delete;
    latch& operator=(const latch&) = delete;

    // Decrements the counter by `update` without blocking.
    void count_down(ptrdiff_t update = 1);
    // Tests whether the counter has reached zero.
    bool try_wait() const noexcept;
    // Blocks until the counter reaches zero.
    void wait() const;
    // Decrements the counter by `update`, then blocks until it reaches zero.
    void arrive_and_wait(ptrdiff_t update = 1);

  private:
    ptrdiff_t counter;  // exposition only
  };
}

02 latch的一个实现


这个latch在windows上面的实现重点使用了 WakeByAddressAll,WaitOnAddress,InterlockedAdd,InterlockedAdd64四个api。

#pragma once
#include <cstddef>

namespace std {

/**
 * @defgroup thread.coord
 * Concepts related to thread coordination, and defines the coordination types `latch` and `barrier`.
 * These types facilitate concurrent computation performed by a number of threads.
 */

/**
 * @brief Allows any number of threads to block until an expected number of threads arrive at the latch
 * @ingroup thread.coord
 *
 * A `latch` is a thread coordination mechanism that allows any number of threads
 * to block until an expected number of threads arrive at the `latch`
 * (via the `count_down` function).
 *
 * The expected count is set when the `latch` is created.
 * An individual `latch` is a single-use object;
 * once the expected count has been reached, the `latch` cannot be reused.
 *
 * @see N4835, 1571~1572p
 */
class latch {
public:
    /**
     * @brief upper limit on the value of `expected` for constructor of `latch`
     * @return ptrdiff_t    The maximum value of counter that the implementation supports
     * @see http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2019/p1865r1.html
     * @see /proc/sys/kernel/threads-max
     *
     * NOTE(review): the value 32 is hard-coded and the constructor does not check
     * `expected` against it.
     */
    static constexpr ptrdiff_t max() noexcept {
        return 32;
    }

    /**
     * @brief   Initialize `counter` with `expected`
     * @param   expected    initial value of the counter
     * @pre     `expected >= 0` is true
     */
    constexpr explicit latch(ptrdiff_t expected) noexcept : counter{expected} {}

    /**
     * Concurrent invocations of the member functions of `latch` other than its destructor,
     * do not introduce data races
     */
    ~latch() = default;

    // A latch is neither copyable nor assignable.
    latch(const latch&) = delete;
    latch& operator=(const latch&) = delete;

    /**
     * **Synchronization**:
     * Strongly happens before the returns from all calls that are unblocked.
     *
     * **Error Conditions**:
     * Any of the error conditions allowed for mutex types
     * (see [thread.mutex.requirements.mutex] in the C++ working draft).
     *
     * @param   update  amount to subtract from `counter`
     * @pre     `update >= 0` is true, and `update <= counter` is true
     * @post    Atomically decreases `counter` by `update`.
     *          If `counter` is equal to zero, unblocks all threads blocked on `*this`
     * @throw   system_error
     */
    void count_down(ptrdiff_t update = 1) noexcept(false);

    /**
     * @return true     `counter` equals zero
     * @return false    Very low probability of failure from system call
     */
    bool try_wait() const noexcept;

    /**
     * If `counter` equals zero, returns immediately.
     * Otherwise, blocks on `*this` until a call to `count_down` that decrements `counter` to zero
     *
     * @throw   system_error
     */
    void wait() const noexcept(false);

    /**
     * @param   update  input for `count_down`
     * @see count_down
     * @see wait
     */
    void arrive_and_wait(ptrdiff_t update = 1) noexcept(false);

private:
    /**
     * @brief A latch maintains an internal counter
     *
     * A latch maintains an internal counter that is initialized when the latch is created
     * Threads can block on the latch object, waiting for counter to be decremented to zero.
     */
    ptrdiff_t counter;
};

} // namespace std


#include "latch.h"

#include <atomic>
#include <system_error>
#include <type_traits>
// clang-format off
#include <Windows.h>
#include <synchapi.h>
// clang-format onnamespace std {static_assert(is_copy_assignable_v<latch> == false);
static_assert(is_copy_constructible_v<latch> == false);void latch::arrive_and_wait(ptrdiff_t update) noexcept(false) {this->count_down(update);this->wait();
}void latch::count_down(ptrdiff_t update) noexcept(false) {static_assert(is_same_v<ptrdiff_t, LONG64> || is_same_v<ptrdiff_t, LONG>);if (counter < update)throw system_error{EINVAL, system_category(),"update is greater than counter"};// if not lock-free, rely on InterLocked operationif constexpr (atomic<ptrdiff_t>::is_always_lock_free) {counter -= update;} else if constexpr (is_same_v<ptrdiff_t, LONG>) {InterlockedAdd(reinterpret_cast<LONG*>(&counter),static_cast<LONG>(-update));} else if constexpr (is_same_v<ptrdiff_t, LONG64>) {InterlockedAdd64(reinterpret_cast<LONG64*>(&counter),static_cast<LONG64>(-update));}// counter reached zeroif (counter == 0)WakeByAddressAll(&counter);
}bool latch::try_wait() const noexcept {// if counter equals zero, returns immediatelyif (counter == 0)return true;// blocks on `*this` until a call to count_down that decrements counter to zeroptrdiff_t captured = counter;if (WaitOnAddress(const_cast<ptrdiff_t*>(&counter), &captured,sizeof(ptrdiff_t), INFINITE))return counter == 0;// caller can check `GetLastError` for this casereturn false;
}void latch::wait() const noexcept(false) {while (try_wait() == false) {// case: error from WaitOnAddressif (const auto ec = GetLastError())throw system_error{static_cast<int>(ec), system_category(),"WaitOnAddress"};// case: counter != 0. retry// ...}
}} // namespace std

  1. std::latch ↩︎

  2. 标准库头文件<latch> ↩︎

