线程

创建线程

线程std::thread对象表示一个可执行单元。当工作包是可调用单元时,工作包可以立即启动。线程对象是不可复制构造或复制赋值的,但可移动构造或移动赋值。

可调用单元是行为类似于函数。当然,它可以是一个函数,也可以是一个函数对象,或者一个Lambda表达式。通常忽略可调用单元的返回值。

介绍完理论知识之后,我们来动手写个小例子。

// createThread.cpp
#include <thread>
#include <iostream>void helloFunction()
{std::cout<<"Hello from a function."<<std::endl;
}class HelloFunctionObject
{public:void operator()()const{std::cout<<"Hello from a function object."<<std::endl;}
};int main()
{std::cout<<std::endl;std::thread t1(HelloFunction);HelloFunctionObject helloFunctionObject;std::thread t2(helloFunctionObject);std::thread t3([]{std::cout<<"Hello from a lambda."<<std::endl;});t1.join();t2.join();t3.join();std::cout<<std::endl;
}

三个线程(t1t2t3)都会将信息写入控制台。线程t2的工作包是一个函数对象(HelloFunctionObject),线程t3的工作包是一个Lambda函数。 t1.join(); t2.join(); t3.join();主线程在等待子线程完成工作。

线程的创建者(例子中是主线程)负责管理线程的生命周期,所以让我们来了解一下线程的生命周期。

线程的生命周期

父母需要照顾自己的孩子,这个简单的原则对线程的生命周期非常重要。下面的程序(子线程最后没有汇入),用来显示线程ID。

#include <iostream>
#include <thread>
int main()
{std::thread t([] {std::cout << std::this_thread::get_id() << std::endl; });
}

那是什么原因引起的异常呢?

join和detach

线程t的生命周期终止于可调用单元执行结束,而创建者有两个选择:

  1. 等待线程完成: t.join()
  2. 与创建线程解除关系:t.detach()

当后续代码依赖于线程中调用单元的计算结果时,需要使用t.join()t.detach()允许线程与创建线程分离执行,所以分离线程的生命周期与可执行文件的运行周期相关。通常,服务器上长时间运行的后台服务,会使用分离线程。

如果t.join()t.detach()都没有执行,可汇入线程的析构函数会抛出std::terminate异常,这也就是threadWithoutJoin.cpp程序产生异常的原因。如果在线程上多次调用t.join()t.detach(),则会产生std::system_error异常。

解决问题的方法很简单:使用t.join()

#include <iostream>
#include <thread>
int main()
{std::thread t([] {std::cout << std::this_thread::get_id() << std::endl; });t.join();
}

线程ID是std::thread唯一的标识符。

分离线程的挑战

当然,可以在最后一个程序中使用t.detach()代替t.join()。这样,线程t不能汇入了;这个类应该在其析构函数中自动调用t.join(),也可以反过来调用t.detach(),但分离处理也有问题。Anthony Williams提出了这样一个类,并在他的优秀著作《C++ Concurrency in Action》中介绍了它。他将包装器称为scoped_threadscoped_thread在构造函数中获取了线程对象,并检查线程对象是否可汇入。如果传递给构造函数的线程对象不可汇入,则不需要scoped_thread。如果线程对象可汇入,则析构函数调用t.join()。因为,复制构造函数和复制赋值操作符被声明为delete,所以scoped_thread的实例不能复制或赋值。

线程参数

和函数一样,线程可以通过复制、移动或引用来获取参数。std::thread是一个可变参数模板,可以传入任意数量的参数。

线程通过引用的方式获取数据的情况,必须非常小心参数的生命周期和数据的共享方式。

复制或引用

我们来看一个代码段。

#include <iostream>
#include <thread>
int main()
{std::string s{"C++11"};std::thread t1([=]{ std::cout << s << std::endl; });t1.join();std::thread t2([&]{ std::cout << s << std::endl; });t2.detach();
}

线程t2不是通过引用获取其参数,而是Lambda表达式通过引用捕获的参数。如果需要引用将参数传递给线程,则必须将其包装在引用包装器中,使用std::ref就能完成这项任务。std::ref<functional>头文件中定义。

<functional>
...
void transferMoney(int amount, Account& from, Account& to){...
}
...
std::thread thr1(transferMoney, 50, std::ref(account1), std::ref(account2));

线程thr1执行transferMoney函数。transferMoney的参数是使用引用的方式传递,所以线程thr1通过引用获取account1account2

这几行代码中隐藏着什么问题呢?线程t2通过引用获取其字符串s,然后从其创建者的生命周期中分离。字符串s与创建者的生存期周期绑定,全局对象std::cout与主线程的生存周期绑定。因此,std::cout的生存周期可能比线程t2的生存周期短。现在,我们已经置身于未定义行为中了。

不相信?来看看未定义行为是什么样的。

// threadArguments.cpp#include <chrono>
#include <iostream>
#include <thread>
class Sleeper
{public:Sleeper(int &i_) : i{ i_ } {};void operator()(int k){for (unsigned int j = 0; j <= 5; ++j){std::this_thread::sleep_for(std::chrono::microseconds(100));i += k;}std::cout << std::this_thread::get_id() << std::endl;}
private:int &i;
};
int main()
{std::cout << std::endl;int valSleepr = 1000;std::thread t(Sleeper(valSleepr), 5);t.detach();std::cout << "valSleeper = " << valSleepr << std::endl;std::cout << std::endl;
}

问题在于:valSleeperstd::cout << "valSleeper = " << valSleeper << std::endl;时值是多少?valSleeper是一个全局变量。线程t获得一个函数对象,该函数对象的实参为变量valSleeper和数字5(std::thread t(Sleeper(valSleeper), 5);),而线程通过引用获得valSleeper(Sleeper(int& i_): i{i_}{};),并与主线程(t.detach();)分离。接下来,执行函数对象的调用操作符,它从0计数到5,在每100毫秒的中休眠,将k加到i上。最后,屏幕上显示它的id。Nach Adam Riese (德国成语:真是精准的计算呀!),期望的结果应该是1000 + 6 * 5 = 1030。

然而,发生了什么?结果为什么完全不对?

这个输出有两个奇怪的地方:首先,valSleeper是1000;其次,ID没有显示。

这段程序至少有两个错误:

  1. valSleeper是线程共享的。这会导致数据竞争,因为线程可能同时读写valSleeper
  2. 主线程的生命周期很可能在子线程执行计算,或将其ID写入std::cout之前结束。

这两个问题都是构成竞态条件,因为程序的结果取决于操作的交错。构成竞态的条件也是导致数据竞争的原因。

解决数据竞争也非常容易:使用锁或原子保护valSleeper。为了解决valSleeperstd::cout的生命周期问题,必须汇入线程而不是分离它。

修改后的主函数体。

int main()
{  std::cout << std::endl; int valSleeper= 1000;std::thread t(Sleeper(valSleeper),5);t.join();std::cout << "valSleeper = " << valSleeper << std::endl;std::cout << std::endl;
}

成员函数

下面是std::thread的接口,在一个简洁的表中

函数名称 描述
t.join() 等待,直到线程t完成
t.detach() 独立于创建者执行创建的线程t
t.joinable() 如果线程t可以汇入,则返回true
t.get_id()std::this_thread::get_id() 返回线程的ID
std::thread::hardware_concurrency() 返回可以并发运行的线程数
std::this_thread::sleep_until(absTime) 将线程t置为睡眠状态,直到absTime时间点为止
std::this_thread::sleep_for(relTime) 将线程t置为睡眠状态,直到休眠了relTime为止
std::this_thread::yield() 允许系统运行另一个线程
t.swap(t2)std::swap(t1, t2) 交换线程对象

静态函数std::thread::hardware_concurrency返回实现支持的并发线程数量,如果运行时无法确定数量,则返回0(这是根据C++标准编写的)。sleep_untilsleep_for操作需要一个时间点或持续时间作为参数。

共享数据

为了更清楚地说明这一点,就需要考虑共享数据的同步问题,因为数据竞争很容易在共享数据上发生。如果并发地对数据进行非同步读写访问,则会产生未定义行为。

验证并发、未同步的读写操作的最简单方法,就是向std::cout写入一些内容。

让我们来看一下,使用不同步的方式进行std::cout打印输出。

// coutUnsynchronised.cpp#include <chrono>
#include <iostream>
#include <thread>class Worker
{public:Worker(std::string n) : name(n) {}void operator()(){for (int i = 1; i <= 3; ++i){// begin workstd::this_thread::sleep_for(std::chrono::microseconds(200));// end workstd::cout << name << ": " << "Work " << i << " done !!!" << std::endl;}}
private:std::string name;
};int main()
{std::cout << std::endl;std::cout << "Boss: Let's start working.\n\n";std::thread herb = std::thread(Worker("Herb"));std::thread andrei = std::thread(Worker(" Andrei"));std::thread scott = std::thread(Worker("  Scott"));std::thread bjarne = std::thread(Worker("   Bjarne"));std::thread bart = std::thread(Worker("    Bart"));std::thread jenne = std::thread(Worker("     Jenne"));herb.join();andrei.join();scott.join();bjarne.join();bart.join();jenne.join();std::cout << "\n" << "Boss: Let's go home." << std::endl;std::cout << std::endl;
}

该程序描述了一个工作流程:老板有六个员工(第29 - 34行),每个员工必须处理3个工作包,处理每个工作包需要200毫秒(第13行)。当员工完成了他的所有工作包时,他向老板报告(第15行)。当老板收到所有员工的报告,老板就会把员工们送回家(第43行)。

这么简单的工作流程,输出却如此混乱。

互斥量

Mutex是互斥(mutual exclusion)的意思,它确保在任何时候只有一个线程可以访问临界区。

通过使用互斥量,工作流程的混乱变的和谐许多。

// coutSynchronised.cpp#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>std::mutex coutMutex;class Worker
{public:Worker(std::string n) : name(n) {}void operator()(){for (int i = 1; i <= 3; ++i){// begin workstd::this_thread::sleep_for(std::chrono::microseconds(200));// end workcoutMutex.lock();std::cout << name << ": " << "Work " << i << " done !!!" << std::endl;coutMutex.unlock();}}
private:std::string name;
};int main()
{std::cout << std::endl;std::cout << "Boss: Let's start working.\n\n";std::thread herb = std::thread(Worker("Herb"));std::thread andrei = std::thread(Worker(" Andrei"));std::thread scott = std::thread(Worker("  Scott"));std::thread bjarne = std::thread(Worker("   Bjarne"));std::thread bart = std::thread(Worker("    Bart"));std::thread jenne = std::thread(Worker("     Jenne"));herb.join();andrei.join();scott.join();bjarne.join();bart.join();jenne.join();std::cout << "\n" << "Boss: Let's go home." << std::endl;std::cout << std::endl;
}

C++11有4个不同的互斥量,可以递归地、暂时地锁定,并且不受时间限制。

成员函数 mutex recursive_mutex timed_mutex recursive_timed_mutex
m.lock yes yes yes yes
m.try_lock yes yes yes yes
m.try_lock_for yes yes
m.try_lock_until yes yes
m.unlock yes yes yes yes

递归互斥量允许同一个线程多次锁定互斥锁。互斥量保持锁定状态,直到解锁次数与锁定次数相等。可以锁定互斥量的最大次数默认并未指定,当达到最大值时,会抛出std::system_error异常。

C++14中有std::shared_timed_mutex,C++17中有std::shared_mutexstd::shared_mutexstd::shared_timed_mutex非常相似,使用的锁可以是互斥或共享的。另外,使用std::shared_timed_mutex可以指定时间点或时间段进行锁定。

成员函数 shared_timed_mutex shared_mutex
m.lock yes yes
m.try_lock yes yes
m.try_lock_for yes
m.try_lock_until yes
m.unlock yes yes
m.lock_shared yes yes
m.try_lock_shared yes yes
m.try_lock_shared_for yes
m.try_lock_shared_until yes
m.unlock_shared yes yes

std::shared_timed_mutex(std::shared_mutex)可以用来实现读写锁,也就可以使用std::shared_timed_mutex(std::shared_mutex)进行独占或共享锁定。如果将std::shared_timed_mutex(std::shared_mutex)放入std::lock_guardstd::unique_lock中,就可实现独占锁;如果将std::shared_timed_mutex(std::shared_lock)放入std::shared_lock中,就可实现共享锁。m.try_lock_for(relTime)m.try_lock_shared_for(relTime)需要一个时间段;m.try_lock_until(absTime)m.try_lock_shared_until(absTime)需要一个绝对的时间点。

m.try_lock(m.try_lock_shared)尝试锁定互斥量并立即返回。成功时,它返回true,否则返回false。相比之下,m.try_lock_for(m.try_lock_shared_for)m.try_lock_until(m.try_lock_shared_until)也会尝试上锁,直到超时或完成锁定,这里应该使用稳定时钟来限制时间(稳定的时钟是不能调整的)。

不应该直接使用互斥量,应该将互斥量放入锁中,下面解释下原因。

互斥量的问题

互斥量的问题可以归结为一个:死锁。

死锁

两个或两个以上的个线程处于阻塞状态,并且每个线程在释放之前都要等待其他线程的释放。

结果就是程序完全静止。试图获取资源的线程,通常会永久的阻塞程序。形成这种困局很简单,有兴趣了解一下吗?

下面的代码段有很多问题。

std::mutex m;
m.lock();
sharedVariable = getVar();
m.unlock();

问题如下:

  1. 如果函数getVar()抛出异常,则互斥量m不会被释放。
  2. 永远不要在持有锁的时候调用函数。因为m不是递归互斥量,如果函数getVar试图锁定互斥量m,则程序具有未定义的行为。大多数情况下,未定义行为会导致死锁。
  3. 避免在持有锁时调用函数。可能这个函数来自一个库,但当这个函数发生改变,就有陷入僵局的可能。

程序需要的锁越多,程序的风险就越高(非线性)。

不同顺序锁定的互斥锁

​ 线程1和线程2需要访问两个资源来完成它们的工作。当资源被两个单独的互斥体保护,并且以不同的顺序被请求(线程1:锁1,锁2;线程2:锁2,锁1)时,线程交错执行,线程1得到互斥锁1,然后线程2得到互斥锁2,从而程序进入停滞状态。每个线程都想获得另一个互斥锁,但需要另一个线程释放其需要的互斥锁。“死亡拥抱”这个形容,很好地描述了这种状态。

// deadlock.cpp#include <iostream>
#include <chrono>
#include <mutex>
#include <thread>struct CriticalData
{std::mutex mut;
};void deadLock(CriticalData& a, CriticalData& b)
{a.mut.lock();std::cout << "get the first mutex" << std::endl;std::this_thread::sleep_for(std::chrono::microseconds(1));b.mut.lock();std::cout << "get the second mutext" << std::endl;// do something with a and ba.mut.unlock();b.mut.unlock();
}int main()
{CriticalData c1;CriticalData c2;std::thread t1([&] {deadLock(c1, c2); });std::thread t2([&] {deadLock(c2, c1); });t1.join();t2.join();
}

线程t1t2调用死锁函数(第12 - 23行),向函数传入了c1c2(第27行和第28行)。由于需要保护c1c2不受共享访问的影响,它们在内部各持有一个互斥量(为了保持本例简短,关键数据除了互斥量外没有其他函数或成员)。

第16行中,约1毫秒的短睡眠就足以产生死锁。

这时,只能按CTRL+C终止进程。

互斥量不能解决所有问题,但在很多情况下,锁可以帮助我们解决这些问题。

锁使用RAII方式处理它们的资源。锁在构造函数中自动绑定互斥量,并在析构函数中释放互斥量,这大大降低了死锁的风险。

锁有四种不同的形式:std::lock_guard用于简单程序,std::unique_lock用于高级程序。从C++14开始就可以用std::shared_lock来实现读写锁了。C++17中,添加了std::scoped_lock,它可以在原子操作中锁定更多的互斥对象。

首先,来看简单程序。

std::lock_guard

std::mutex m;
m.lock();
sharedVariable = getVar();
m.unlock();

互斥量m可以确保对sharedVariable = getVar()的访问是有序的。有序指的是,每个线程按照某种顺序,依次访问临界区。代码很简单,但是容易出现死锁。如果临界区抛出异常或者忘记解锁互斥量,就会出现死锁。使用std::lock_guard,可以很优雅的解决问题:

{std::mutex m,std::lock_guard<std::mutex> lockGuard(m);sharedVariable = getVar();
}

代码很简单,但是前后的花括号是什么呢?std::lock_guard的生存周期受其作用域的限制,作用域由花括号构成。生命周期在达到右花括号时结束,std::lock_guard析构函数被调用,并且互斥量被释放。这都是自动发生的,如果sharedVariable = getVar()中的getVar()抛出异常,释放过程也会自动发生。函数作用域和循环作用域,也会限制实例对象的生命周期。

std::scoped_lock

C++17中,添加了std::scoped_lock。与std::lock_guard非常相似,但可以原子地锁定任意数量的互斥对象。

  1. 如果std::scoped_lock调用一个互斥量,它的行为就类似于std::lock_guard,并锁定互斥量m: m.lock。如果std::scoped_lock被多个互斥对象调用std::scoped_lock(mutextypes&…),则使用std::lock(m…)函数进行锁定操作。
  2. 如果当前线程已经拥有了互斥量,但这个互斥量不可递归,那么这个行为就是未定义的,很有可能出现死锁。
  3. 只需要获得互斥量的所有权,而不需要锁定它们。这种情况下,必须将标志std::adopt_lock_t提供给构造函数:std::scoped_lock(std::adopt_lock_t, mutextypes&…m)

使用std::scoped_lock,可以优雅地解决之前的死锁问题。下一节中,将讨论如何杜绝死锁。

std::unique_lock

std::unique_lockstd::lock_guard更强大,也更重量级。

除了包含std::lock_guard提供的功能之外,std::unique_lock还允许:

  • 创建无需互斥量的锁
  • 不锁定互斥量的情况下创建锁
  • 显式地/重复地设置或释放关联的互斥锁量
  • 递归锁定互斥量
  • 移动互斥量
  • 尝试锁定互斥量
  • 延迟锁定关联的互斥量

下表展示了std::unique_lock lk的成员函数:

成员函数 功能描述
lk.lock() 锁定相关互斥量
lk.try_lock() 尝试锁定相关互斥量
lk.try_lock_for(relTime) 尝试锁定相关互斥量
lk.try_lock_until(absTime) 尝试锁定相关互斥量
lk.unlock() 解锁相关互斥量
lk.release() 释放互斥量,互斥量保持锁定状态
lk.swap(lk2)std::swap(lk, lk2) 交换锁
lk.mutex() 返回指向相关互斥量的指针
lk.owns_lock()和bool操作符 检查锁lk是否有锁住的互斥量

try_lock_for(relTime)需要传入一个时间段,try_lock_until(absTime)需要传入一个绝对的时间点。lk.try_lock_for(lk.try_lock_until)会调用关联的互斥量mut的成员函数mut.try_lock_for(mut.try_lock_until)。相关的互斥量需要支持定时阻塞,这就需要使用稳定的时钟来限制时间。

lk.try_lock尝试锁定互斥锁并立即返回。成功时返回true,否则返回false。相反,lk.try_lock_forlk.try_lock_until则会让锁lk阻塞,直到超时或获得锁为止。如果没有关联的互斥锁,或者这个互斥锁已经被std::unique_lock锁定,那么lk.try_locklk.try_lock_forlk.try_lock_for则抛出std::system_error异常。

lk.release()返回互斥量,必须手动对其进行解锁。

std::unique_lock在原子步骤中可以锁定多个互斥对象。因此,可以通过以不同的顺序锁定互斥量来避免死锁。死锁必须原子地锁定互斥对象,也正是下面的程序中所展示的。

// deadlockResolved.cpp#include <iostream>
#include <chrono>
#include <mutex>
#include <thread>using namespace std;struct CriticalData
{mutex mut;
};void deadLock(CriticalData &a, CriticalData &b)
{unique_lock<mutex> guard1(a.mut, defer_lock);cout << "Thread: " << this_thread::get_id() << " first mutex" << endl;this_thread::sleep_for(chrono::milliseconds(1));unique_lock<mutex> guard2(b.mut, defer_lock);cout << " Thread: " << this_thread::get_id() << " second mutex" << endl;cout << "  Thread: " << this_thread::get_id() << " get both mutex" << endl;lock(guard1, guard2);// do something with a and b
}int main()
{cout << endl;CriticalData c1;CriticalData c2;thread t1([&] {deadLock(c1, c2); });thread t2([&] {deadLock(c2, c1); });t1.join();t2.join();cout << endl;
}

如果使用std::defer_lockstd::unique_lock进行构造,则底层的互斥量不会自动锁定。此时(unique_lock<mutex> guard1(a.mut,defer_lock);unique_lock<mutex> guard2(b.mut,defer_lock);),std::unique_lock就是互斥量的所有者。由于std::lock是可变参数模板,锁操作可以原子的执行(lock(guard1,guard2);)。

使用std::lock进行原子锁定

std::lock可以在原子的锁定互斥对象。std::lock是一个可变参数模板,因此可以接受任意数量的参数。std::lock尝试使用避免死锁的算法,在一个原子步骤获得所有锁。互斥量会锁定一系列操作,比如:locktry_lockunlock。如果对锁或解锁的调用异常,则解锁操作会在异常重新抛出之前执行。

本例中,std::unique_lock管理资源的生存期,std::lock锁定关联的互斥量,也可以反过来。第一步中锁住互斥量,第二步中std::unique_lock管理资源的生命周期。下面是第二种方法的例子:

std::lock(a.mut, b.mut);
std::lock_guard<std::mutex> guard1(a.mut, std::adopt_lock);
std::lock_guard<std::mutex> guard2(b.mut, std::adopt_lock);

这两个方式都能解决死锁。

使用std::scoped_lock解决死锁

C++17中解决死锁非常容易。有了std::scoped_lock帮助,可以原子地锁定任意数量的互斥。只需使用std::scoped_lock,就能解决所有问题。下面是修改后的死锁函数:

// deadlock.cpp#include <iostream>#include <chrono>#include <mutex>#include <thread>struct CriticalData {  std::mutex mut;};void deadLock(CriticalData& a, CriticalData& b) {  std::scoped_lock(a.mut, b.mut);  a.mut.lock();  std::cout << "get the first mutex" << std::endl;  std::this_thread::sleep_for(std::chrono::microseconds(1));  b.mut.lock();  std::cout << "get the second mutext" << std::endl;  // do something with a and b  a.mut.unlock();  b.mut.unlock();}int main() {  CriticalData c1;  CriticalData c2;  std::thread t1([&] {deadLock(c1, c2); });  std::thread t2([&] {deadLock(c2, c1); });  t1.join();  t2.join();}

std::shared_lock

C++14中添加了std::shared_lock

std::shared_lockstd::unique_lock的接口相同,但与std::shared_timed_mutexstd::shared_mutex一起使用时,行为会有所不同。许多线程可以共享一个std::shared_timed_mutex (std::shared_mutex),从而实现读写锁。读写器锁的思想非常简单,而且非常有用。执行读操作的线程可以同时访问临界区,但是只允许一个线程写。

读写锁并不能解决最根本的问题——线程争着访问同一个关键区域。

电话本就是使用读写锁的典型例子。通常,许多人想要查询电话号码,但只有少数人想要更改。让我们看一个例子:

// readerWriterLock.cpp#include <iostream>
#include <map>
#include <shared_mutex>
#include <string>
#include <thread>std::map<std::string, int> teleBook{ {"Dijkstra", 1972}, {"Scott", 1976},{"Ritchie", 1983} }; //9std::shared_timed_mutex teleBookMutex;void addToTeleBook(const std::string &na, int tele)
{std::lock_guard<std::shared_timed_mutex> writerLock(teleBookMutex); //15std::cout << "\nSTARTING UPDATE " << na;std::this_thread::sleep_for(std::chrono::milliseconds(500));teleBook[na] = tele;std::cout << " ... ENDING UPDATE " << na << std::endl;
}void printNumber(const std::string &na)
{std::shared_lock<std::shared_timed_mutex> readerLock(teleBookMutex); //23std::cout << na << ": " << teleBook[na];
}int main()
{std::cout << std::endl;std::thread reader1([] {printNumber("Scott"); }); //31std::thread reader2([] {printNumber("Ritchie"); });std::thread w1([] {addToTeleBook("Scott", 1968); });std::thread reader3([] {printNumber("Dijkstra"); });std::thread reader4([] {printNumber("Scott"); });std::thread w2([] {addToTeleBook("Bjarne", 1965); });std::thread reader5([] {printNumber("Scott"); });std::thread reader6([] {printNumber("Ritchie"); });std::thread reader7([] {printNumber("Scott"); });std::thread reader8([] {printNumber("Bjarne"); }); //40reader1.join();reader2.join();reader3.join();reader4.join();reader5.join();reader6.join();reader7.join();reader8.join();w1.join();w2.join();std::cout << std::endl;std::cout << "\nThe new telephone book" << std::endl;for (auto teleIt : teleBook){std::cout << teleIt.first << ": " << teleIt.second << std::endl;}std::cout << std::endl;
}

第9行中的电话簿是共享变量,必须对其进行保护。八个线程要查询电话簿,两个线程想要修改它(第31 - 40行)。为了同时访问电话簿,读取线程使用std::shared_lock<std::shared_timed_mutex>(第23行)。写线程需要以独占的方式访问临界区,第15行中的std::lock_guard<std::shared_timed_mutex>具有独占性。最后,程序显示了更新后的电话簿(第55 - 58行)。

屏幕截图显示,读线程的输出是重叠的,而写线程是一个接一个地执行。这就意味着,读取操作应该是同时执行的。

这很容易让“电话簿”有未定义行为。

未定义行为

程序有未定义行为。更准确地说,它有一个数据竞争。啊哈!?在继续之前,停下来想几秒钟。

数据竞争的特征是,至少有两个线程同时访问共享变量,并且其中至少有一个线程是写线程,这种情况很可能在程序执行时发生。使用索引操作符读取容器中的值,并可以修改它。如果元素在容器中不存在,就会发生这种情况。如果在电话簿中没有找到“Bjarne”,则从读访问中创建一对(“Bjarne”,0)。可以通过在第40行前面打印Bjarne的数据,强制数据竞争。

可以看到的是,Bjarne的值是0。

修复这个问题的最直接的方法是使用printNumber函数中的读取操作:

// readerWriterLocksResolved.cpp...void printNumber(const std::string &na){    std::shared_lock<std::shared_timed_mutex> readerLock(teleBookMutex);    auto searchEntry = teleBook.find(na);    if (searchEntry != teleBook.end())    {        std::cout << searchEntry->first << ": " << searchEntry->second << std::endl;    }    else    {        std::cout << na << " not found!" << std::endl;    }}...

如果电话簿里没有相应键值,就把键值写下来,并且向控制台输出“找不到!”。

第二个程序执行的输出中,可以看到Bjarne的信息没有找到。第一个程序执行中,首先执行了addToTeleBook,所以Bjarne被找到了。

线程安全的初始化

如果变量从未修改过,那么就不需要锁或原子变量来进行同步,只需确保以线程安全的方式初始化就可以了。

C++中有三种以线程安全初始化变量的方法:

  • 常量表达式
  • std::call_oncestd::once_flag结合的方式
  • 作用域的静态变量

主线程中的安全初始化

以线程安全的方式初始化变量的最简单方法,是在创建任何子线程之前在主线程中初始化变量。

常数表达式

常量表达式,是编译器可以在编译时计算的表达式,隐式线程安全的。将关键字constexpr放在变量前面,会使该变量成为常量表达式。常量表达式必须初始化。

constexpr double pi = 3.14;

此外,用户定义的类型也可以是常量表达式。不过,必须满足一些条件才能在编译时初始化:

  • 不能有虚方法或虚基类
  • 构造函数必须为空,且本身为常量表达式
  • 必须初始化每个基类和每个非静态成员
  • 成员函数在编译时应该是可调用的,必须是常量表达式

MyDouble的实例满足所有这些需求,因此可以在编译时实例化。所以,这个实例化是线程安全的。

// constexpr.cpp
#include <iostream>
class MyDouble
{private:double myVal1;double myVal2;public:constexpr MyDouble(double v1,double v2):myVal1(v1), myVal2(v2){}constexpr double getSum() const {return myVal1 + myVal2;}
};int main()
{constexpr double myStatVal = 2.0;constexpr MyDouble myStatic(10.5, myStatVal);constexpr double sumStat = myStatic.getSum();
}

std::call_once和std::once_flag

​ 通过使用std::call_once函数,可以注册一个可调用单元。std::once_flag确保已注册的函数只调用一次。可以通过相同的std::once_flag注册其他函数,只能调用注册函数组中的一个函数。

std::call_once遵循以下规则:

  • 只执行其中一个函数的一次,未定义选择哪个函数执行。所选函数与std::call_once在同一个线程中执行。
  • 上述所选函数的执行成功完成之前,不返回任何调用。
  • 如果函数异常退出,则将其传播到调用处。然后,执行另一个函数。

这个短例演示了std::call_oncestd::once_flag的应用(都在头文件<mutex>中声明)。

// callOnce.cpp
#include <iostream>
#include <thread>
#include <mutex>
std::once_flag onceFlag;
void do_once()
{std::call_once(onceFlag, [] {std::cout << "Only once." << std::endl; });
}void do_once2()
{std::call_once(onceFlag, [] {std::cout << "Only once2." << std::endl; });
}int main()
{std::cout << std::endl; std::thread t1(do_once); //21std::thread t2(do_once);std::thread t3(do_once2);std::thread t4(do_once2); //24t1.join();t2.join();t3.join();t4.join();std::cout << std::endl;
}

程序从四个线程开始(第21 - 24行)。其中两个调用do_once,另两个调用do_once2。预期的结果是“Only once”或“Only once2”只显示一次。

单例模式保证只创建类的一个实例,这在多线程环境中是一个具有挑战性的任务。由于std::call_oncestd::once_flag的存在,实现这样的功能就非常容易了。

现在,单例以线程安全的方式初始化。

// singletonCallOnce.cpp#include <iostream>
#include <mutex>
using namespace std;
class MySingleton
{private:static once_flag initInstanceFlag;static MySingleton *instance;MySingleton() = default;~MySingleton() = default;
public:MySingleton(const MySingleton &) = delete;MySingleton &operator=(const MySingleton &) = delete;static MySingleton *getInstance(){call_once(initInstanceFlag, MySingleton::initSingleton);return instance;}static void initSingleton(){instance = new MySingleton();}
};
MySingleton *MySingleton::instance = nullptr;
once_flag MySingleton::initInstanceFlag;
int main()
{cout << endl;cout << "MySingleton::getInstance(): " << MySingleton::getInstance() << endl;cout << "MySingleton::getInstance(): " << MySingleton::getInstance() << endl;cout << endl;
}

静态变量initInstanceFlag在第11行声明,在第31行初始化。静态方法getInstance(第20 - 23行)使用initInstanceFlag标志,来确保静态方法initSingleton(第25 - 27行)只执行一次。

default和delete修饰符

可以使用关键字default向编译器申请函数实现,编译器可以创建并实现它们。

delete修饰一个成员函数的话,则该函数不可用,因此不能被调用。如果尝试使用它们,将得到一个编译时错误。这里有default和delete的详细信息。

MySingleton::getIstance()函数显示了单例的地址。

有作用域的静态变量

具有作用域的静态变量只创建一次,并且是惰性的,惰性意味着它们只在使用时创建。这一特点是基于Meyers单例的基础,以Scott Meyers命名,这是迄今为止C++中单例模式最优雅的实现。C++11中,带有作用域的静态变量有一个额外的特点,可以以线程安全的方式初始化。

下面是线程安全的Meyers单例模式。

// meyersSingleton.cpp
class MySingleton
{public:static MySingleton &getInstance(){static MySingleton instance;return instance;}
private:MySingleton();~MySingleton();MySingleton(const MySingleton &) = delete;MySingleton &operator=(const MySingleton &) = delete;
};
MySingleton::MySingleton() = default;
MySingleton::~MySingleton() = default;
int main()
{MySingleton::getInstance();
}

编译器对静态变量的支持

线程-本地数据

线程-本地数据(也称为线程-本地存储)是为每个线程单独创建的,其行为类似于静态数据。在命名空间范围内,或作为静态类成员的线程局部变量,是在第一次使用之前创建,而在函数中声明的线程局部变量是在第一次使用时创建,并且线程-本地数据只属于线程。

// threadLocal.cpp
#include <iostream>
#include <string>
#include <mutex>
#include <thread>
std::mutex coutMutex;
thread_local std::string s("hello from "); //10
void addThreadLocal(std::string const& s2)
{  s += s2;// protect std::coutstd::lock_guard<std::mutex> guard(coutMutex);std::cout << s << std::endl;std::cout << "&s: " << &s << std::endl;std::cout << std::endl;
}
int main()
{std::cout << std::endl;std::thread t1(addThreadLocal, "t1");std::thread t2(addThreadLocal, "t2");std::thread t3(addThreadLocal, "t3");std::thread t4(addThreadLocal, "t4");t1.join();t2.join();t3.join();t4.join();
}

通过在第10行中使用关键字thread_local,可以创建线程本地字符串s。线程t1 - t4(第27 - 30行)使用addThreadLocal函数(第12 - 21行)作为工作包。线程分别获取字符串t1t4作为参数,并添加到线程本地字符串s中。另外,addThreadLocal在第18行会打印s的地址。

// threadLocalState.cpp#include <iostream>
#include <string>
#include <mutex>
#include <thread>std::mutex coutMutex;thread_local std::string s("hello from ");void first()
{s += "first ";
}void second()
{s += "second ";
}void third()
{s += "third";
}void addThreadLocal(std::string const &s2)
{s += s2;first();second();third();// protect std::coutstd::lock_guard<std::mutex> guard(coutMutex);std::cout << s << std::endl;std::cout << "&s: " << &s << std::endl;std::cout << std::endl;}int main()
{std::cout << std::endl;std::thread t1(addThreadLocal, "t1: ");std::thread t2(addThreadLocal, "t2: ");std::thread t3(addThreadLocal, "t3: ");std::thread t4(addThreadLocal, "t4: ");t1.join();t2.join();t3.join();t4.join();
}

代码中,函数addThreadLocal(第24行)先调用函数first ,然后调用second,再调用third 。。每个函数都使用thread_local字符串s来添加它的函数名。这种变化的关键之处在于,字符串s在函数firstsecondthird中操作时,处于一种本地数据的状态(第28 - 30行),并且从输出表明字符串是独立存在的。

单线程到多线程

线程本地数据有助于将单线程程序移植成多线程程序。如果全局变量是线程局部的,则可以保证每个线程都得到其数据的副本,从而避免数据竞争。

与线程-本地数据相比,条件变量的使用门槛更高。

条件变量

条件变量通过消息对线程进行同步(需要包含<condition_variable>头文件),一个线程作为发送方,另一个线程作为接收方,接收方等待来自发送方的通知。条件变量的典型用例:发送方-接收方或生产者-消费者模式。

条件变量cv的成员函数

成员函数 函数描述
cv.notify_one() 通知一个等待中的线程
cv.notify_all() 通知所有等待中的线程
cv.wait(lock, ...) 持有std::unique_lock,并等待通知
cv.wait_for(lock, relTime, ...) 持有std::unique_lock,并在给定的时间段内等待通知
cv.wait_until(lock, absTime, ...) 持有std::unique_lock的同时,并在给定的时间点前等待通知
cv.native_handle() 返回条件变量的底层句柄

cv.notify_onecv.notify_all相比较,cv.notify_all会通知所有正在等待的线程,cv.notify_one只通知一个正在等待的线程,其他条件变量依旧保持在等待状态。介绍条件变量的详细信息之前,来看个示例。

// conditionVariable.cpp#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>std::mutex mutex_;
std::condition_variable condVar;
bool dataReady{ false };
void doTheWork()
{std::cout << "Processing shared data." << std::endl;
}void waitingForWork()
{std::cout << "Worker: Waiting for work." << std::endl;std::unique_lock<std::mutex> lck(mutex_);condVar.wait(lck, [] {return dataReady; });doTheWork();std::cout << "Work done." << std::endl;
}void setDataReady()
{{std::lock_guard<std::mutex> lck(mutex_);dataReady = true;}std::cout << "Sender: Data is ready." << std::endl;condVar.notify_one();
}int main()
{std::cout << std::endl;std::thread t1(waitingForWork);std::thread t2(setDataReady);t1.join();t2.join();std::cout << std::endl;}

该程序有两个子线程:t1t2。第38行和第39行中,线程得到工作包waitingForWorksetDataRead

现在就很清楚了,谓词是无状态条件变量,所以等待过程中总是检查谓词。条件变量有两个已知有害现象:未唤醒和伪唤醒。

未唤醒和伪唤醒

未唤醒

该现象是发送方在接收方到达其等待状态之前发送通知,结果是通知丢失了。C++标准将条件变量描述为同步机制:“条件变量类是同步原语,可用于阻塞一个线程,或同时阻塞多个线程……”所以通知丢失了,接收者就会持续等待……

伪唤醒

还有一种情况,就会没有发通知,但接收方会被唤醒。使用POSIX Threads和 Windows API时,都会出现这样的现象。伪唤醒的真相,很可能是本来就没有处于休眠状态。这意味着,在被唤醒的线程有机会运行之前,另一个线程早就等候多时了。

等待线程的工作流程

等待线程的工作流程相当复杂。

下面是来自前面示例conditionVariable.cpp的19和20行。

std::unique_lock<std::mutex> lck(mutex_);
condVar.wait(lck, []{ return dataReady; });

上面两行与下面四行等价:

std::unique_lock<std::mutex> lck(mutex_);
while ( ![] { return dataReady; } )
{condVar.wait(lck);
}

首先,必须区分std::unique_lock<std::mutex> lck(mutex_)的第一次调用与条件变量的通知:condVar.wait(lck)

std::unique_lock<std::mutex> lck(mutex_) : 初始化阶段,线程就将互斥量锁定,并对谓词函数[]{ return dataReady;}进行检查。

  • 谓词返回值:
  • true : 线程继续工作。
  • false : condVar.wait()解锁互斥量,并将线程置为等待(阻塞)状态。

condVar.wait(lck) : 如果condition_variable condVar处于等待状态,并获得通知或伪唤醒处于运行状态,则执行以下步骤:

  • 线程解除阻塞,重新获得互斥锁。

  • 检查谓词函数。

  • 当谓词函数返回值为:

    • true : 线程继续工作。
    • false : condVar.wait()解锁互斥量,并将线程置为等待(阻塞)状态。

    即使共享变量是原子的,也必须在互斥锁保护下进行修改,以便将正确地内容告知等待的线程。

    使用互斥锁来保护共享变量

    即使将dataReady设置为原子变量,也必须在互斥锁的保护下进行修改;如果没有,对于等待线程来说dataReady的内容就可能是错的,此竞争条件可能导致死锁。让我们再次查看下等待的工作流,并假设deadReady是一个原子变量,在不受互斥量mutex_保护时进行修改的情况。

std::unique_lock<std::mutex> lck(mutex_);
while ( ![] { return dataReady.load(); })
{// time windowcondVar.wait(lck);
}

假设在条件变量condVar,在不处于等待状态时发送通知。这样,线程执行到第2行和第4行之间时(参见注释时间窗口)会丢失通知。之后,线程返回到等待状态,可能会永远休眠。

任务

除了线程之外,C++还有可以异步处理任务,这种方式处理任务需要包含<future>头文件。任务由一个参数化工作包和两个组件组成:promise和future,两者构建一条数据通道。promise执行工作包并将结果放入数据通道,对应的future可以获取结果,两个通信端可以在不同的线程中运行。特别的是future可以在之后的某个时间点获取结果,所以通过promise计算结果与通过future查询结果的步骤是分开的。

将任务视为通信端间的数据通道

任务的行为类似于通信点之间的数据通道。数据通道的一端称为promise,另一端称为future。这些端点可以存在于相同的线程中,也可以存在于不同的线程中。promise将其结果放入数据通道,future会在晚些时候把结果取走。

任务 vs. 线程

任务与线程有很大的不同。

// asyncVersusThread.cpp
#include <future>
#include <thread>
#include <iostream>
int main()
{std::cout << std::endl;int res;std::thread t([&] {res = 2000 + 11; });t.join();std::cout << "res: " << res << std::endl;auto fut = std::async([] {return 2000 + 11; });std::cout << "fut.get(): " << fut.get() << std::endl;std::cout << std::endl;
}

线程tstd::async异步调用函数同时计算2000和11的和。主线程通过共享变量res获取其线程t的计算结果,并在第14行中显示它。第16行中,使用std::async在发送方(promise)和接收方(future)之间创建数据通道。future 变量使用fut.get()(第17行),通过数据通道获得计算结果。fut.get为阻塞调用。

基于这个程序,我想强调线程和任务之间的区别。

任务 vs. 线程

标准 线程 任务
构成元素 创建线程和子线程 promise和future
通讯方式 共享变量 通信通道
创建线程 必定创建 可选
同步方式 通过join()(等待) 使用get阻塞式调用
线程中的异常 子线程和创建线程终止 返回promise的值
通信类型 变量值 变量值、通知和异常

线程需要包含<thread>头文件,任务需要包含<future>头文件。创建线程和子线程之间的通信需要使用共享变量,任务通过其隐式的数据通道保护数据通信。因此,任务不需要互斥锁之类的保护机制。 虽然,可以使用共享变量(的可变)来在子线程及其创建线程之间进行通信,但任务的通信方式更为明确。future只能获取一次任务的结果(通过调用fut.get()),多次调用它会导致未定义的行为(而std::shared_future可以查询多次)。

创建线程需要等待子线程汇入。而使用fut.get()时,该调用将一直阻塞,直到获取结果为止。

如果子线程中抛出异常,创建的线程将终止,创建者和整个进程也将终止。相反,promise可以将异常发送给future,而future必须对异常进行处理。一个promise可以对应于一个或多个future。它可以发送值、异常,或者只是通知,可以使用它们替换条件变量。std::async是创建future最简单的方法。

std::async

std::async的行为类似于异步函数调用,可调用带有参数的函数。std::async是一个可变参数模板,因此可以接受任意数量的参数。对std::async的调用会返回一个future 的对象fut。可以通过fut.get()获得结果。

std::async应该首选

C++运行时决定std::async是否在独立的线程中执行,决策可能取决于可用的CPU内核的数量、系统的利用率或工作包的大小。通过使用std::async,只需要指定运行的任务,C++运行时会自动管理线程。

可以指定std::async的启动策略。

启动策略

使用启动策略,可以显式地指定异步调用应该在同一线程(std::launch::deferred)中执行,还是在不同线程(std::launch::async)中执行。

及早求值与惰性求值

及早求值与惰性求值是计算结果表达式的两种策略。在及早求值的情况下,立即计算表达式,而在惰性求值 的情况下,仅在需要时才计算表达式。及早求值通常称为贪婪求值,而惰性求值通常称为按需调用。使用惰性求值,可以节省时间和计算资源。

调用auto fut = std::async(std::launch::deferred,…)的特殊之处在于,promise可能不会立即执行,调用fut.get()时才执行对应的promise 。这意味着,promise只在future调用fut.get()时计算得到结果。

// asyncLazy.cpp#include <chrono>
#include <future>
#include <iostream>int main() {std::cout << std::endl;auto begin = std::chrono::system_clock::now();auto asyncLazy = std::async(std::launch::deferred,[] {return std::chrono::system_clock::now(); });auto asyncEager = std::async(std::launch::async,[] {return std::chrono::system_clock::now(); });std::this_thread::sleep_for(std::chrono::seconds(1));auto lazyStart = asyncLazy.get() - begin;auto eagerStart = asyncEager.get() - begin;auto lazyDuration = std::chrono::duration<double>(lazyStart).count();auto eagerDuration = std::chrono::duration<double>(eagerStart).count();std::cout << "asyncLazy evaluated after : " << lazyDuration<< " seconds." << std::endl;std::cout << "asyncEager  evaluated after : " << eagerDuration<< " seconds." << std::endl;std::cout << std::endl;
}

两个std::async调用(第13行和第16行)都返回当前时间点。但是,第一个调用是lazy,第二个调用是eager。第21行中的asyncLazy.get()调用触发了第13行promise的执行——短睡一秒(第19行)。这对于asyncEager来说是不存在的,asyncEager.get()会立即获取执行结果。

下面就是该程序输出的结果:

发后即忘(Fire and Forget)

发后即忘是比较特殊的future。因为其future不受某个变量的约束,所以只是在原地执行。对于一个发后即忘的future,相应的promise运行在一个不同的线程中,所以可以立即开始(这是通过std::launch::async策略完成的)。

我们对普通的future和发后即忘的future进行比较。

auto fut= std::async([]{ return 2011; });
std::cout << fut.get() << std::endl;
std::async(std::launch::async,[]{ std::cout << "fire and forget" << std::endl; });

发后即忘的future看起来很有美好,但有一个很大的缺点。std::async创建的future会等待promise完成,才会进行析构。这种情况下,等待和阻塞就没有太大的区别了。future的析构函数会中阻塞程序的进程,当使用发后即忘的future时,这一点变得更加明显,看起来程序上是并发的,但实际上是串行运行的。

// fireAndForgetFutures.cpp
#include <chrono>
#include <future>
#include <iostream>
#include <thread>
int main()
{std::cout << std::endl;std::async(std::launch::async, []{std::this_thread::sleep_for(std::chrono::seconds(2));std::cout << "first thread" << std::endl;});std::async(std::launch::async, []{std::this_thread::sleep_for(std::chrono::seconds(2));std::cout << "second thread" << std::endl;});std::cout << "main thread" << std::endl;std::cout << std::endl;
}

程序在线程中执行两个promise,这样就会产生发后即忘的future。future在析构函数中阻塞线程,直到相关的promise完成。promise是按照源代码顺序执行的,执行顺序与执行时间无关。

std::async是一种方便的机制,可用于在分解较大的计算任务。

并行计算

标量乘积的计算可分布在四个异步调用中。

// dotProductAsync.cpp#include <iostream>
#include <future>
#include <random>
#include <vector>
#include <numeric>
using namespace std;
static const int NUM = 100000000;
long long getDotProduct(vector<int> &v, vector<int> &w)
{auto vSize = v.size();auto future1 = async([&]{return inner_product(&v[0], &v[vSize / 4], &w[0], 0LL);});auto future2 = async([&]{return inner_product(&v[vSize / 4], &v[vSize / 2], &w[vSize / 4], 0LL);});auto future3 = async([&]{return inner_product(&v[vSize / 2], &v[vSize * 3 / 4], &w[vSize / 2], 0LL);});auto future4 = async([&]{return inner_product(&v[vSize * 3 / 4], &v[vSize], &w[vSize * 3 / 4], 0LL);});return future1.get() + future2.get() + future3.get() + future4.get();
}int main()
{cout << endl;random_device seed;// generatormt19937 engine(seed());// distributionuniform_int_distribution<int> dist(0, 100);// fill the vectorvector<int> v, w;v.reserve(NUM);w.reserve(NUM);for (int i = 0; i < NUM; ++i){v.push_back(dist(engine));w.push_back(dist(engine));}cout << "getDotProduct(v, w): " << getDotProduct(v, w) << endl;cout << endl;
}

std::packaged_task通常也用于并发。

std::packaged_task

std::packaged_task是用于异步调用的包装器。通过pack.get_future()可以获得相关的future。可以使用可调用操作符pack(pack())执行std::packaged_task

处理std::packaged_task通常包括四个步骤:

I. 打包:

std::packaged_task<int(int, int)> sumTask([](int a, int b){ return a + b; });

II. 创建future:

std::future<int> sumResult= sumTask.get_future();

III. 执行计算:

sumTask(2000, 11);

IV. 查询结果:

sumResult.get();

下面的示例,展示了这四个步骤。

// packagedTask.cpp#include <utility>
#include <future>
#include <iostream>
#include <thread>
#include <deque>class SumUp
{public:int operator()(int beg, int end){long long int sum{ 0 };for (int i = beg; i < end; ++i) sum += i;return static_cast<int>(sum);}
};int main()
{std::cout << std::endl;SumUp sumUp1;SumUp sumUp2;SumUp sumUp3;SumUp sumUp4;// wrap the taskstd::packaged_task<int(int, int)> sumTask1(sumUp1);std::packaged_task<int(int, int)> sumTask2(sumUp2);std::packaged_task<int(int, int)> sumTask3(sumUp3);std::packaged_task<int(int, int)> sumTask4(sumUp4);// create the futuresstd::future<int> sumResult1 = sumTask1.get_future();std::future<int> sumResult2 = sumTask2.get_future();std::future<int> sumResult3 = sumTask3.get_future();std::future<int> sumResult4 = sumTask4.get_future();// push the task on the containerstd::deque<std::packaged_task<int(int, int)>> allTasks;allTasks.push_back(std::move(sumTask1));allTasks.push_back(std::move(sumTask2));allTasks.push_back(std::move(sumTask3));allTasks.push_back(std::move(sumTask4));int begin{ 1 };int increment{ 2500 };int end = begin + increment;// preform each calculation in a separate threadwhile (!allTasks.empty()){std::packaged_task<int(int, int)> myTask = std::move(allTasks.front());allTasks.pop_front();std::thread sumThread(std::move(myTask), begin, end);begin = end;end += increment;sumThread.detach();}// pick up the resultsauto sum = sumResult1.get() + sumResult2.get() +sumResult3.get() + sumResult4.get();std::cout << "sum of 0 .. 10000 = " << sum << std::endl;std::cout << std::endl;}

这段程序的是计算从0到10000的整数和。创建四个std::packaged_task的对象,并且每个std::packaged_task有自己的线程,并使用future来汇总结果。当然,也可以直接使用Gaußschen Summenformel(高斯求和公式)。真奇怪,我没有找到英文网页。(译者注:打开网页就是最熟悉的高斯求和公式,也就是等差数列求和公式。翻了下维基百科,确实没有相关的英文页面。)

I. 打包任务:程序将工作包打包进std::packaged_task(第28 - 31行)的实例中,工作包就是SumUp的实例(第9 - 16行),使用函数操作符完成任务(第11 - 15行)。函数操作符将begend - 1的所有整数相加并返回结果。第28 - 31行中的std::packaged_task实例可以处理需要两个int参数的函数调用,并返回一个int: int(int, int)类型的任务包。

II.创建future:第34到37行中,使用std::packaged_task创建future对象,这时std::packaged_task对象属于通信通道中的promise。future的类型有明确定义:std::future<int> sumResult1 = sumTask1.get_future(),也可以让编译器来确认future的具体类型:auto sumResult1 sumTask1.get_future()

III. 进行计算:开始计算。将任务包移动到std::deque(第40 - 44行)中,while循环(第51 - 58行)会执行每个任务包。为此,将std::deque的队头任务包移动到一个std::packaged_task实例中(第52行),并将这个实例移动到一个新线程中(第54行),并让这个线程在后台运行(第57行)。因为packaged_task对象不可复制的,所以会在52和54行中使用move语义。这个限制不仅适用于所有的promise实例,但也适用于future和线程实例。但有一个例外:std::shared_future

IV. 查询结果:最后一步中,从每个future获取计算的结果,并把它们加起来(第61行)。

下表展示std::packaged_task pack的接口

成员函数 函数描述
pack.swap(pack2)std::swap(pack, pack2) 交换对象
pack.valid() 检查对象中的函数是否合法
pack.get_future() 返回future
pack.make_ready_at_thread_exit(ex) 执行的函数,如果线程还存在,那么结果还是可用的
pack.reset() 重置任务的状态,擦除之前执行的结果

std::asyncstd::promise相比,std::packaged_task可以复位并重复使用。下面的程序展示了std::packaged_task的“特殊”使用方式。

// packagedTaskReuse.cpp#include <functional>
#include <future>
#include <iostream>
#include <utility>
#include <vector>void calcProducts(std::packaged_task<int(int, int)> &task,const std::vector<std::pair<int, int>> &pairs)
{for (auto &pair : pairs){auto fut = task.get_future();task(pair.first, pair.second);std::cout << pair.first << " * " << pair.second << " = " << fut.get() <<std::endl;task.reset();}
}int main()
{std::cout << std::endl;std::vector<std::pair<int, int>> allPairs;allPairs.push_back(std::make_pair(1, 2));allPairs.push_back(std::make_pair(2, 3));allPairs.push_back(std::make_pair(3, 4));allPairs.push_back(std::make_pair(4, 5));std::packaged_task<int(int, int)> task{ [](int fir, int sec){return fir * sec;}};calcProducts(task, allPairs);std::cout << std::endl;std::thread t(calcProducts, std::ref(task), allPairs);t.join();std::cout << std::endl;
}

函数calcProduct(第9行)有两个参数:taskpairs。使用任务包task来计算pairs中的每个整数对的乘积(第13行),并在第16行重置任务task。这样,calcProduct就能在主线程(第34行)和另外开启的线程(第38行)中运行。下面是程序的输出。

std::promise和std::future

std::promisestd::future可以完全控制任务。

promise和future是一对强有力的组合。promise可以将值、异常或通知放入数据通道。一个promise可以对应多个std::shared_future对象。

下面是std::promisestd::future用法的示例。两个通信端点都可以在不同的的线程中,因此通信可以在线程间发生。

// promiseFuture.cpp#include <future>
#include <iostream>
#include <thread>
#include <utility>void product(std::promise<int> &&intPromise, int a, int b)
{intPromise.set_value(a * b);
}struct Div
{void operator()(std::promise<int> &&intPromise, int a, int b) const{intPromise.set_value(a / b);}
};int main()
{int a = 20;int b = 10;std::cout << std::endl;// define the promisesstd::promise<int> prodPromise;std::promise<int> divPromise;// get the futuresstd::future<int> prodResult = prodPromise.get_future();std::future<int> divResult = divPromise.get_future();// calculate the result in a separate threadstd::thread prodThread(product, std::move(prodPromise), a, b);Div div;std::thread divThread(div, std::move(divPromise), a, b);// get the resultstd::cout << "20*10 = " << prodResult.get() << std::endl;std::cout << "20/10 = " << divResult.get() << std::endl;prodThread.join();divThread.join();std::cout << std::endl;
}

将函数product(第8 -10行)、prodPromise(第32行)以及数字ab放入线程Thread prodThread(第36行)中。prodThread的第一个参数需要一个可调用的参数,上面程序中就是函数乘积函数。函数需要一个类型右值引用的promise(std::promise<int>&& intPromise)和两个数字。std::move(第36行)创建一个右值引用。剩下的就简单了,divThread(第38行)将ab分开传入。

future通过prodResult.get()divResult.get()获取结果

std::promise

std::promise允许设置一个值、一个通知或一个异常。此外,promise可以以延迟的方式提供结果。

std::promise prom的成员函数

成员函数 函数描述
prom.swap(prom2)std::swap(prom, prom2) 交换对象
prom.get_future() 返回future
prom.set_value(val) 设置值
prom.set_exception(ex) 设置异常
prom.set_value_at_thread_exit(val) promise退出前存储该值
prom.set_exception_at_thread_exit(ex) promise退出前存储该异常

如果多次对promise设置值或异常,则会抛出std::future_error

std::future

std::future可以完成的事情有:

  • 从promise中获取值。
  • 查询promise值是否可获取。
  • 等待promise通知,这种等待可以用一个时间段或一个绝对的时间点来完成。
  • 创建共享future(std::shared_future)。

future实例fut的成员函数

成员函数 函数描述
fut.share() 返回std::shared_future
fut.get() 返回可以是值或异常
fut.valid() 检查当前实例是否可用调用fut.get()。使用get()之后,返回false
fut.wait() 等待结果
fut.wait_for(relTime) relTime时间段内等待获取结果,并返回std:: future_status实例
fut.wait_until(absTime) absTime时间点前等待获取结果,并返回std:: future_status实例

wait不同,wait_forwait_until会返回future的状态。

std::future_status

future和共享future的wait_forwait_until成员函数将返回其状态。有三种可能:

enum class future_status {ready,timeout,deferred
};

下表描述了每种状态:

状态 描述
deferred 函数还未运行
ready 结果已经准备就绪
timeout 结果超时得到,视为过期

使用wait_forwait_until可以一直等到相关的promise完成。

// waitFor.cpp
#include <iostream>
#include <future>
#include <thread>
#include <chrono>
using namespace std::literals::chrono_literals;
void getAnswer(std::promise<int> intPromise)
{std::this_thread::sleep_for(3000ms);intPromise.set_value(42);
}int main()
{std::cout << std::endl;std::promise<int> answerPromise;auto fut = answerPromise.get_future();std::thread prodThread(getAnswer, std::move(answerPromise));std::future_status status{};do{status = fut.wait_for(0.2s);std::cout << "... doing something else" << std::endl;}while (status != std::future_status::ready);std::cout << std::endl;std::cout << "The Answer: " << fut.get() << '\n';prodThread.join();std::cout << std::endl;
}

在futurefut在等待promise时,可以执行其他操作。

如果多次获取futurefut的结果,会抛出std::future_error异常。promise和future是一对一的关系,而std::shared_future支持一个promise 对应多个future。

std::shared_future

创建std::shared_future的两种方式:

  1. 通过promise实例prom创建std::shared_future:std::shared_future<int> fut = prom.get_future()
  2. 使用futfut.share()进行创建。执行了fut.share()后,fut.valid()会返回false。

共享future是与相应的promise相关联的,可以获取promise的结果。共享future与std::future有相同的接口。

除了有std::future的功能外,std::shared_future还允许和其他future查询关联promise的值。

std::shared_future的操作很特殊,下面的代码中就直接创建了一个std::shared_future

{intPromise.set_value(a / b);
}

};

struct Requestor
{
void operator()(std::shared_future shaFut)
{
// lock std::cout
std::lock_guardstd::mutex coutGuard(coutMutex);
// get the thread id
std::cout << “threadId(” << std::this_thread::get_id() << "): ";
std::cout << "20/10= " << shaFut.get() << std::endl;
}
};

int main()
{
std::cout << std::endl;
// define the promises
std::promise divPromise;
// get the futures
std::shared_future divResult = divPromise.get_future();
// calculate the result in a separate thread
Div div;
std::thread divThread(div, std::move(divPromise), 20, 10);
Requestor req;
std::thread sharedThread1(req, divResult);
std::thread sharedThread2(req, divResult);
std::thread sharedThread3(req, divResult);
std::thread sharedThread4(req, divResult);
std::thread sharedThread5(req, divResult);
divThread.join();
sharedThread1.join();
sharedThread2.join();
sharedThread3.join();
sharedThread4.join();
sharedThread5.join();
std::cout << std::endl;
}


promise和future的工作包都是函数对象。第46行中将`divPromise`移动到线程`divThread`中执行,因此会将`std::shared_future`复制到5个线程中(第49 - 53行)。与只能移动的`std::future`对象不同,可以`std::shared_future`对象可以进行复制。主线程在第57到61行等待子线程完成它们的任务。![](/home/xz/图片/选区_217.png)前面提到过,可以通过使用`std::future`的成员函数创建`std::shared_future`。我们把上面的代码改一下。```cpp
// sharedFutureFromFuture.cpp#include <future>
#include <iostream>
#include <thread>
#include <utility>
std::mutex coutMutex;
struct Div
{void operator()(std::promise<int> &&intPromise, int a, int b){intPromise.set_value(a / b);}
};struct Requestor
{void operator()(std::shared_future<int> shaFut){// lock std::coutstd::lock_guard<std::mutex> coutGuard(coutMutex);// get the thread idstd::cout << "threadId(" << std::this_thread::get_id() << "): ";std::cout << "20/10= " << shaFut.get() << std::endl;}
};int main()
{std::cout << std::boolalpha << std::endl;// define the promisesstd::promise<int> divPromise;// get the futuresstd::future<int> divResult = divPromise.get_future();std::cout << "divResult.valid(): " << divResult.valid() << std::endl; //44// calculate the result in a separate threadDiv div;std::thread divThread(div, std::move(divPromise), 20, 10);std::cout << "divResult.valid(): " << divResult.valid() << std::endl; //45std::shared_future<int> sharedResult = divResult.share();//52std::cout << "divResult.valid(): " << divResult.valid() << "\n\n";//falseRequestor req;std::thread sharedThread1(req, sharedResult);std::thread sharedThread2(req, sharedResult);std::thread sharedThread3(req, sharedResult);std::thread sharedThread4(req, sharedResult);std::thread sharedThread5(req, sharedResult);divThread.join();sharedThread1.join();sharedThread2.join();sharedThread3.join();sharedThread4.join();sharedThread5.join();std::cout << std::endl;
}

std::future(第44行和第50行)前两次调用divResult.valid()都返回true。第52行执行divResult.share()之后,因为该操作使得状态转换为共享,所以在执行到第54行时,程序会返回false。

异常

如果std::asyncstd::packaged_task的工作包抛出错误,则异常会存储在共享状态中。当futurefut调用fut.get()时,异常将重新抛出。

std::promise prom提供了相同的功能,但是它有一个成员函数prom.set_value(std::current_exception())可以将异常设置为共享状态。

数字除以0是未定义的行为,函数executeDivision显示计算结果或异常。

// promiseFutureException.cpp
#include <exception>
#include <future>
#include <iostream>
#include <thread>
#include <utility>#ifdef WIN32
#include <string>
#endif
struct Div
{void operator()(std::promise<int> &&intPromise, int a, int b){try{if (b == 0){std::string errMess = std::string("Illegal division by zero: ") +std::to_string(a) + "/" + std::to_string(b);throw std::runtime_error(errMess);}intPromise.set_value(a / b);}catch (...){intPromise.set_exception(std::current_exception());}}
};void executeDivision(int nom, int denom)
{std::promise<int> divPromise;std::future<int> divResult = divPromise.get_future();Div div;std::thread divThread(div, std::move(divPromise), nom, denom);// get the result or the exceptiontry{std::cout << nom << "/" << denom << " = " << divResult.get() << std::endl;}catch (std::runtime_error &e){std::cout << e.what() << std::endl;}divThread.join();
}int main()
{std::cout << std::endl;executeDivision(20, 0);executeDivision(20, 10);std::cout << std::endl;
}

这个程序中,promise会处理分母为0的情况。如果分母为0,则在第24行中将异常设置为返回值:intPromise.set_exception(std::current_exception())。future需要在try-catch中处理异常(第37 - 42行)。

下面是程序的输出。

std::current_exception和std::make_exception_ptr

std::current_exception()捕获当前异常对象,并创建一个 std:: exception_ptrstd::exception_ptr保存异常对象的副本或引用。如果在没有异常处理时调用该函数,则返回一个空的std::exception_ptr

为了不在try/catch中使用intPromise.set_exception(std::current_exception())检索抛出的异常,可以直接调用intPromise.set_exception(std::make_exception_ptr(std::runtime_error(errMess)))

如果在std::promise销毁之前没有调用设置类的成员函数,或是在std::packaged_task调用它,那么std::future_error异常和错误代码std::future_errc::broken_promise将存储在共享future中。

通知

任务是条件变量的一种替代方式。如果使用promise和future来同步线程,它们与条件变量有很多相同之处。大多数时候,promise和future是更好的选择。

在看例子之前,先了解下任务和条件变量的差异。

对比标准 条件变量 任务
多重同步 Yes No
临界区保护 Yes No
接收错误处理机制 No Yes
伪唤醒 Yes No
未唤醒 Yes No

与promise和future相比,条件变量的优点是可以多次同步线程,而promise只能发送一次通知,因此必须使用更多promise和future对,才能模拟出条件变量的功能。如果只同步一次,那条件变量正确的使用方式或许将更具大的挑战。promise和future对不需要共享变量,所以不需要锁,并且不大可能出现伪唤醒或未唤醒的情况。除了这些,任务还可以处理异常。所以,在同步线程上我会更偏重于选择任务,而不是条件变量。

还记得使用条件变量有多难吗?如果忘记了,这里展示了两个线程同步所需的关键部分。

void waitingForWork()
{std::cout << "Worker: Waiting for work." << std::endl;  std::unique_lock<std::mutex> lck(mutex_);condVar.wait(lck, []{ return dataReady; });doTheWork();std::cout << "Work done." << std::endl;
}void setDataReady()
{std::lock_guard<std::mutex> lck(mutex_);dataReady=true;std::cout << "Sender: Data is ready." << std::endl;condVar.notify_one();
}

函数setDataReady为同步通知,函数waitingForWork为同步等待。

使用任务完成相同的工作流程。

// promiseFutureSynchronise.cpp#include <future>
#include <iostream>
#include <utility>void doTheWork()
{std::cout << "Processing shared data." << std::endl;
}void waitingForWork(std::future<void> &&fut)
{std::cout << "Worker: Waiting for work." << std::endl;fut.wait();doTheWork();std::cout << "Work done." << std::endl;}void setDataReady(std::promise<void> &&prom)
{std::cout << "Sender: Data is ready." << std::endl;prom.set_value();}int main()
{std::cout << std::endl;std::promise<void> sendReady;auto fut = sendReady.get_future();std::thread t1(waitingForWork, std::move(fut));std::thread t2(setDataReady, std::move(sendReady));t1.join();t2.join();std::cout << std::endl;
}

是不是非常简单?

通过sendReady(第32行)获得了一个futurefut(第33行),promise使用其返回值void (std::promise<void> sendReady)进行通信,并且只能够发送通知。两个通信端点分别移动到线程t1t2中(第35行和第36行),调用fut.wait()(第15行)等待promise的通知(prom.set_value()(第24行))。

Concurrency-with-Modern-Cpp学习笔记 - 线程相关推荐

  1. Modern C++ 学习笔记——C++函数式编程

    往期精彩: Modern C++ 学习笔记--易用性改进篇 Modern C++ 学习笔记 -- 右值.移动篇 Modern C++ 学习笔记 -- 智能指针篇 Modern C++ 学习笔记 -- ...

  2. 多线程编程学习笔记——线程池(二)

    接上文 多线程编程学习笔记--线程池(一) 三.线程池与并行度 此示例是学习如何应用线程池实现大量的操作,及与创建大量线程进行工作的区别. 1. 代码如下 using System; using Sy ...

  3. C++11学习笔记-----线程库std::thread

    在以前,要想在C++程序中使用线程,需要调用操作系统提供的线程库,比如linux下的<pthread.h>.但毕竟是底层的C函数库,没有什么抽象封装可言,仅仅透露着一种简单,暴力美 C++ ...

  4. 学习笔记 线程异步请求过程

    记录学习线程异步请求的过程 版本1 不使用线程时,正常情况下执行过程. # -*- coding:utf-8 -*- # 日期:2018/5/27 14:50 # Author:小鼠标 import ...

  5. Linux学习笔记-线程的自然终止

    线程的自然终止 线程主函数退出时,该线程自然终止.例如,下面的线程运行10秒后终止 ... void* Thread_Main(void* context) {for(int i=0; i<10 ...

  6. Java学习笔记----线程

    继续学习----------------------------------------------------------------------- 在Java中线程指的是两件事 1)java.la ...

  7. 菜鸟教程Cpp学习笔记

    一.面向对象开发的特征 面向对象开发的四大特性: 封装 抽象 继承 多态 二.C++ 中的类型限定符 限定符 含义 const const 类型的对象在程序执行期间不能被修改改变. volatile ...

  8. 【cpp学习笔记】多线程编程

    1.前言 在学习利用多线程来实现多模型的目标检测时,接触到了 condition_variable.future等多线程编程知识,感觉自己对这方面的知识还不熟悉,于是便找了一些学习资料进行学习. 2. ...

  9. Java学习笔记——线程

    线程 – 线程的基本概念 程序 :是为完成特定任务.用某种语言编写的一组指令的集合.即指一段静态的代码,静态对象. 进程 :是程序的一次执行过程,或是正在运行的一个程序.是一个动态的过程:有它自身的产 ...

最新文章

  1. 10 分钟从零搭建个人博客
  2. jsonp跨域读取cookie
  3. SQL Server安全(11/11):审核(Auditing)
  4. Single forest vs. multi-forest Active Directory design
  5. css font简写
  6. 九度OJ 题目1011:最大连续子序列
  7. Cmder 设置默认打开目录、解决中文乱码
  8. 博主在哈佛评论网上的博客
  9. 【排版】LaTeX公式编辑器-Texlive入门
  10. C语言,进制转换之十六进制转二进制,完整代码
  11. Linux相对路径和绝对路径
  12. 做IT民工还是IT精英?
  13. vue中使用file-saver导出文件
  14. IDEA提交git代码,配置文件乱码
  15. 户口本识别/户口本OCR识别
  16. 【认知实习】虚拟现实体验
  17. 谈谈QUIC协议原理
  18. LCR测试仪与阻抗分析仪有何不同?测试软件怎么选?
  19. css 文本溢出隐藏,省略号代替
  20. 闲的折腾——自己动手更换油雾分离阀/废气阀

热门文章

  1. linux将txt文件转化为raw,如何利用qemu-img工具将其它格式的镜像文件转换成VHD或RAW格式...
  2. SpringBoot JPA多对一 持久化是报错object references an unsaved transient instance - save the transient instanc
  3. Oracle数据库违反唯一约束条件
  4. 一文搞懂Oracle字符集
  5. 【面经】2022年软件测试面试题大全(持续更新)附答案
  6. [python爬虫]喜马拉雅音乐
  7. 【CSharp】延迟初始化(Lazy)
  8. 哪些场景N1 mode是disable状态
  9. 阿玛机器人_豪华日本声优阵容,《战斗天赋解析系统》让你耳朵怀孕!
  10. 75佳精美的 CSS 网页设计作品欣赏(系列一)