《C++ Concurrency in Action》笔记

  • 1 你好,C++的并发世界
    • 1.1 何谓并发
      • 1.1.1 计算机系统中的并发
      • 1.1.2 并发的途径
        • 多进程并发
        • 多线程并发
    • 1.2 为什么使用并发?
      • 1.2.1 为了分离关注点
      • 1.2.2 为了性能
      • 1.2.3 什么时候不使用并发
    • 1.3 C++中的并发和多线程
      • 1.3.1 C++多线程历史
      • 1.3.2 新标准支持并发
      • 1.3.3 C++线程库的效率
      • 1.3.4 平台相关的工具
    • 1.4 开始入门
  • 2 线程管理
    • 2.1 线程管理的基础
      • 2.1.1 启动线程
      • 2.1.2 等待线程完成
      • 2.1.3 特殊情况下的等待
      • 2.1.4 后台运行线程
    • 2.2 向线程函数传递参数
    • 2.3 转移线程所有权
    • 2.4 运行时决定线程数量
    • 2.5 标识线程
  • 3 线程间共享数据
    • 3.1 共享数据带来的问题
      • 3.1.1 条件竞争
      • 3.1.2 避免恶性条件竞争
    • 3.2 使用互斥量保护共享数据
      • 3.2.1 C++中使用互斥量
      • 3.2.2 精心组织代码来保护共享数据
      • 3.2.3 发现接口内在的条件竞争
      • 3.2.4 死锁:问题描述及解决方案
      • 3.2.5 避免死锁的进阶指导
      • 3.2.6 std::unique_lock——灵活的锁
      • 3.2.7 不同域中互斥量所有权的传递
      • 3.2.8 锁的粒度
    • 3.3 保护共享数据的替代设施
      • 3.3.1 保护共享数据的初始化过程
      • 3.3.2 保护很少更新的数据结构
      • 3.3.3 嵌套锁
  • 4 同步并发操作
    • 4.1 等待一个事件或其他条件
      • 4.1.1 等待条件达成
      • 4.1.2 使用条件变量构建线程安全队列
    • 4.2 使用期望等待一次性事件
      • 4.2.1 带返回值的后台任务
      • 4.2.2 任务与期望
      • 4.2.3 使用std::promises(单线程多连接)
      • 4.2.4 为“期望”存储“异常”
      • 4.2.5 多个线程的等待
    • 4.3 限定等待时间
      • 4.3.1 时钟
      • 4.3.2 时延
      • 4.3.3 时间点
      • 4.3.4 具有超时功能的函数
    • 4.4 使用同步操作简化代码
      • 4.4.1 使用“期望”的函数化编程
      • 4.4.2 使用消息传递的同步操作(状态机)
  • 5 C++内存模型和原子类型操作
    • 5.1 内存模型基础
      • 5.1.1 对象和内存位置
      • 5.1.2 对象、内存位置和并发
      • 5.1.3 修改顺序
    • 5.2 `C++`中的原子操作和原子类型
      • 5.2.1 标准原子类型
      • 5.2.3 std::atomic的相关操作
  • 6 基于锁的并发数据结构设计
  • 7 无锁并发数据结构设计
  • 8 并发代码设计
  • 9 高级线程管理
  • 10 多线程程序的测试和调试
  • 重点
    • 线程std::thread对象必须调用join或detach
    • shared_mutex也是基于操作系统底层的读写锁pthread_rwlock_t的封装
  • 参考

英文书籍《C++ Concurrency in Action》国内有不同人翻译,有一版是4个译者翻译的,豆瓣知乎风评一般。陈晓伟翻译的不错,下面是我对该译者书籍的读书笔记。

1 你好,C++的并发世界

1.1 何谓并发

最简单和最基本的并发,是指两个或更多独立的活动同时发生。
并发在生活中随处可见,我们可以一边走路一边说话,也可以两只手同时作不同的动作,还有我们每个人都过着相互独立的生活——当我在游泳的时候,你可以看球赛,等等。

1.1.1 计算机系统中的并发

计算机领域的并发指的是在单个系统里同时执行多个独立的任务,而非顺序的进行一些活动。
系统通过任务调度,在时间片微粒度下,做任务切换,让用户感觉不到间隙,这种也被称为并发。
多处理器计算机可实现硬件并发。

图1.2显示了四个任务在双核处理器上的任务切换,仍然是将任务整齐地划分为同等大小块的理想情况。实际上,许多因素会使得分割不均和调度不规则。

1.1.2 并发的途径

途径一:一个进程内含一个线程,交互是进程间通信;
途径二:一个进程内含多个线程,交互是线程间通信。

多进程并发

如图1.3所示,独立的进程可以通过进程间常规的通信渠道传递讯息(信号、套接字、文件、管道等等)。不过,这种进程之间的通信通常不是设置复杂,就是速度慢,这是因为操作系统会在进程间提供了一定的保护措施,以避免一个进程去修改另一个进程的数据。还有一个缺点是,运行多个进程所需的固定开销:需要时间启动进程,操作系统需要内部资源来管理进程,等等。
当然,以上的机制也不是一无是处:操作系统在进程间提供附加的保护操作和更高级别的通信机制,意味着可以更容易编写安全的并发代码。实际上,在类似于Erlang的编程环境中,将进程作为并发的基本构造块。
使用多进程实现并发还有一个额外的优势———可以使用远程连接(可能需要联网)的方式,在不同的机器上运行独立的进程。虽然,这增加了通信成本,但在设计精良的系统上,这可能是一个提高并行可用行和性能的低成本方式。

多线程并发

并发的另一个途径,在单个进程中运行多个线程。线程很像轻量级的进程:每个线程相互独立运行,且线程可以在不同的指令序列中运行。但是,进程中的所有线程都共享地址空间,并且所有线程访问到大部分数据———全局变量仍然是全局的,指针、对象的引用或数据可以在线程之间传递。虽然,进程之间通常共享内存,但是这种共享通常是难以建立和管理的。因为,同一数据的内存地址在不同的进程中是不相同。图1.4展示了一个进程中的两个线程通过共享内存进行通信。

地址空间共享,以及缺少线程间数据的保护,使得操作系统的记录工作量减小,所以使用多线程相关的开销远远小于使用多个进程。不过,共享内存的灵活性是有代价的:如果数据要被多个线程访问,那么程序员必须确保每个线程所访问到的数据是一致的(在本书第3、4、5和8章中会涉及,线程间数据共享可能会遇到的问题,以及如何使用工具来避免这些问题)。问题并非无解,只要在编写代码时适当地注意即可,这同样也意味着需要对线程通信做大量的工作。

1.2 为什么使用并发?

主要原因有两个:关注点分离(SOC)和性能

1.2.1 为了分离关注点

编写软件时,分离关注点是个好主意;通过将相关的代码与无关的代码分离,可以使程序更容易理解和测试,从而减少出错的可能性。即使一些功能区域中的操作需要在同一时刻发生的情况下,依旧可以使用并发分离不同的功能区域;若不显式地使用并发,就得编写一个任务切换框架,或者在操作中主动地调用一段不相关的代码。

1.2.2 为了性能

多处理器系统,现在越来越普遍。如果想要利用日益增长的计算能力,那就必须设计多任务并发式软件。
两种方式利用并发提高性能。一是业务逻辑并行,即任务并行(task parallelism),二是数据并行(data parallelism),如图像处理。

1.2.3 什么时候不使用并发

基本上,不使用并发的唯一原因就是,收益比不上成本。并发代码增加开发成本,更高复杂度增加出错风险。除非潜在的性能增益足够大或关注点分离地足够清晰,否则,别用并发。
此外,线程是有限的资源。如果让太多的线程同时运行,则会消耗很多操作系统资源,从而使得操作系统整体上运行得更加缓慢。不仅如此,因为每个线程都需要一个独立的堆栈空间,所以运行太多的线程也会耗尽进程的可用内存或地址空间。对于一个可用地址空间为4GB(32bit)的平坦架构的进程来说,这的确是个问题:如果每个线程都有一个1MB的堆栈(很多系统都会这样分配),那么4096个线程将会用尽所有地址空间,不会给代码、静态数据或者堆数据留有任何空间。即便64位(或者更大)的系统不存在这种直接的地址空间限制,但其他资源有限:如果你运行了太多的线程,最终也是出会问题的。尽管线程池(参见第9章)可以用来限制线程的数量,但这也并不是什么灵丹妙药,它也有自己的问题。
客户端/服务器(C/S)应用在服务器端为每一个链接启动一个独立的线程,对于少量的链接是可以正常工作的,但当同样的技术用于需要处理大量链接的高需求服务器时,也会因为线程太多而耗尽系统资源。在这种场景下,使用线程池可以对性能产生优化(参见第9章)。

1.3 C++中的并发和多线程

通过多线程为C++并发提供标准化支持是件新鲜事。只有在C++11标准下,才能编写不依赖平台扩展的多线程代码。了解C++线程库中的众多规则前,先来了解一下其发展的历史。

1.3.1 C++多线程历史

C++98(1998)标准不承认线程的存在,并且各种语言要素的操作效果都以顺序抽象机的形式编写。不仅如此,内存模型也没有正式定义,所以在C++98标准下,没办法在缺少编译器相关扩展的情况下编写多线程应用程序。
当然,编译器供应商可以自由地向语言添加扩展,添加C语言中流行的多线程API———POSIX标准中的C标准和Microsoft Windows API中的那些———这就使得很多C++编译器供应商通过各种平台相关扩展来支持多线程。所以C++本身不支持并行语法时,程序员们已经编写了大量的C++多线程程序了。

1.3.2 新标准支持并发

C++11新标准中不仅有了一个全新的线程感知内存模型,C++标准库也扩展了:包含了用于管理线程(参见第2章)、保护共享数据(参见第3章)、线程间同步操作(参见第4章),以及低级原子操作(参见第5章)的各种类。
新C++线程库很大程度上,是基于上文提到的C++类库的经验积累。特别是,Boost线程库作为新类库的主要模型,很多类与Boost库中的相关类有着相同名称和结构。随着C++标准的进步,Boost线程库也配合着C++标准在许多方面做出改变,因此之前使用Boost的用户将会发现自己非常熟悉C++11的线程库。
新的C++标准直接支持原子操作,允许程序员通过定义语义的方式编写高效的代码,从而无需了解与平台相关的汇编指令。

1.3.3 C++线程库的效率

为了效率,C++类整合了一些底层工具。这样就需要了解相关使用高级工具和使用低级工具的开销差,这个开销差就是抽象代价(abstraction penalty)。该类库在大部分主流平台上都能实现高效(带有非常低的抽象代价)。

1.3.4 平台相关的工具

在C++线程库中提供一个native_handle()成员函数,允许通过使用平台相关API直接操作底层实现。就其本质而言,任何使用native_handle()执行的操作都是完全依赖于平台的。

1.4 开始入门

#include <iostream>
#include <thread>  //①
void hello()  //②
{std::cout << "Hello Concurrent World\n";
}
int main()
{std::thread t(hello);  //③t.join();  //④
}

2 线程管理

2.1 线程管理的基础

2.1.1 启动线程

void do_some_work();
std::thread my_thread(do_some_work);
class background_task
{
public:void operator()() const{do_something();do_something_else();}
};background_task f;
std::thread my_thread(f);

有件事需要注意,当把函数对象传入到线程构造函数中时,需要避免“最令人头痛的语法解析”(C++’s most vexing parse, 中文简介)。如果你传递了一个临时变量,而不是一个命名的变量;C++编译器会将其解析为函数声明,而不是类型对象的定义。

例如:

std::thread my_thread(background_task());

这里相当与声明了一个名为my_thread的函数,这个函数带有一个参数(函数指针指向没有参数并返回background_task对象的函数),返回一个std::thread对象的函数,而非启动了一个线程。
使用在前面命名函数对象的方式,或使用多组括号①,或使用新统一的初始化语法②,可以避免这个问题。

如下所示:

std::thread my_thread((background_task()));  // 1
std::thread my_thread{background_task()};    // 2

使用lambda表达式也能避免这个问题。lambda表达式是C++11的一个新特性,它允许使用一个可以捕获局部变量的局部函数(可以避免传递参数,参见2.2节)。想要具体的了解lambda表达式,可以阅读附录A的A.5节。之前的例子可以改写为lambda表达式的类型:

std::thread my_thread([]{do_something();do_something_else();
});

启动了线程,你需要明确是要等待线程结束(加入式——参见2.1.2节),还是让其自主运行(分离式——参见2.1.3节)。如果std::thread对象销毁之前还没有做出决定,程序就会终止(std::thread的析构函数会调用std::terminate())。因此,即便是有异常存在,也需要确保线程能够正确的_加入_(joined)或_分离_(detached)。

清单2.1 函数已经结束,线程依旧访问局部变量

struct func
{int& i;func(int& i_) : i(i_) {}void operator() (){for (unsigned j=0 ; j<1000000 ; ++j){do_something(i);           // 1. 潜在访问隐患:悬空引用}}
};void oops()
{int some_local_state=0;func my_func(some_local_state);std::thread my_thread(my_func);my_thread.detach();          // 2. 不等待线程结束
}                              // 3. 新线程可能还在运行

这个例子中,已经决定不等待线程结束(使用了detach()②),所以当oops()函数执行完成时③,新线程中的函数可能还在运行。如果线程还在运行,它就会去调用do_something(i)函数①,这时就会访问已经销毁的变量。如同一个单线程程序——允许在函数完成后继续持有局部变量的指针或引用;当然,这从来就不是一个好主意——这种情况发生时,错误并不明显,会使多线程更容易出错。

处理这种情况的常规方法:使线程函数的功能齐全,将数据复制到线程中,而非复制到共享数据中。如果使用一个可调用的对象作为线程函数,这个对象就会复制到线程中,而后原始对象就会立即销毁。但对于对象中包含的指针和引用还需谨慎,例如清单2.1所示。使用一个能访问局部变量的函数去创建线程是一个糟糕的主意(除非十分确定线程会在函数完成前结束)。此外,可以通过join()函数来确保线程在函数完成前结束。

2.1.2 等待线程完成

如果需要等待线程,相关的std::thread实例需要使用join()。在这种情况下,因为原始线程在其生命周期中并没有做什么事,使得用一个独立的线程去执行函数变得收益甚微,但在实际编程中,原始线程要么有自己的工作要做;要么会启动多个子线程来做一些有用的工作,并等待这些线程结束。
这意味着,只能对一个线程使用一次join();一旦已经使用过join(),std::thread对象就不能再次加入了,当对其使用joinable()时,将返回false。

2.1.3 特殊情况下的等待

如果想要分离一个线程,可以在线程启动后,直接使用detach()进行分离。如果打算等待对应线程,通常,当倾向于在无异常的情况下使用join()时,需要在异常处理过程中调用join(),从而避免应用被抛出的异常所终止。
清单 2.2 等待线程完成

struct func; // 定义在清单2.1中
void f()
{int some_local_state=0;func my_func(some_local_state);std::thread t(my_func);try{do_something_in_current_thread();}catch(...){t.join();  // 1throw;}t.join();  // 2
}

代码使用了try/catch块确保访问本地状态的线程退出后,函数才结束。

一种方式是使用“资源获取即初始化方式”(RAII,Resource Acquisition Is Initialization),并且提供一个类,在析构函数中使用join(),如同下面清单中的代码。看它如何简化f()函数。

清单 2.3 使用RAII等待线程完成

class thread_guard
{std::thread& t;
public:explicit thread_guard(std::thread& t_):t(t_){}~thread_guard(){if(t.joinable()) // 1{t.join();      // 2}}thread_guard(thread_guard const&)=delete;   // 3thread_guard& operator=(thread_guard const&)=delete;
};struct func; // 定义在清单2.1中void f()
{int some_local_state=0;func my_func(some_local_state);std::thread t(my_func);thread_guard g(t);do_something_in_current_thread();
}    // 4

2.1.4 后台运行线程

使用detach()会让线程在后台运行,这就意味着主线程不能与之产生直接交互。也就是说,不会等待这个线程结束;如果线程分离,那么就不可能有std::thread对象能引用它,分离线程的确在后台运行,所以分离线程不能被加入。不过C++运行库保证,当线程退出时,相关资源的能够正确回收,后台线程的归属和控制C++运行库都会处理。
通常称分离线程为_守护线程_(daemon threads),UNIX中守护线程是指,没有任何显式的用户接口,并在后台运行的线程。这种线程的特点就是长时间运行;线程的生命周期可能会从某一个应用起始到结束,可能会在后台监视文件系统,还有可能对缓存进行清理,亦或对数据结构进行优化。

std::thread t(do_background_work);
t.detach();
assert(!t.joinable());

清单2.4 使用分离线程去处理其他文档

void edit_document(std::string const& filename)
{open_document_and_display_gui(filename);while(!done_editing()){user_command cmd=get_user_input();if(cmd.type==open_new_document){std::string const new_name=get_filename_from_user();std::thread t(edit_document,new_name);  // 1t.detach();  // 2}else{process_user_input(cmd);}}
}

2.2 向线程函数传递参数

std::thread构造函数中的可调用对象,或函数传递一个参数很简单。需要注意的是,默认参数要拷贝到线程独立内存中,即使参数是引用的形式,也可以在新线程中进行访问。

void f(int i, std::string const& s);
std::thread t(f, 3, "hello");

代码创建了一个调用f(3, “hello”)的线程。注意,函数f需要一个std::string对象作为第二个参数,但这里使用的是字符串的字面值,也就是char const *类型。之后,在线程的上下文中完成字面值向std::string对象的转化。需要特别要注意,当指向动态变量的指针作为参数传递给线程的情况,代码如下:

void f(int i,std::string const& s);
void oops(int some_param)
{char buffer[1024]; // 1sprintf(buffer, "%i",some_param);std::thread t(f,3,buffer); // 2t.detach();
}

这种情况下,buffer②是一个指针变量,指向本地变量,然后本地变量通过buffer传递到新线程中②。并且,函数有很有可能会在字面值转化成std::string对象之前崩溃(oops),从而导致一些未定义的行为。并且想要依赖隐式转换将字面值转换为函数期待的std::string对象,但因std::thread的构造函数会复制提供的变量,就只复制了没有转换成期望类型的字符串字面值。

解决方案就是在传递到std::thread构造函数之前就将字面值转化为std::string对象:

void f(int i,std::string const& s);
void not_oops(int some_param)
{char buffer[1024];sprintf(buffer,"%i",some_param);std::thread t(f,3,std::string(buffer));  // 使用std::string,避免悬垂指针t.detach();
}

还可能遇到相反的情况:期望传递一个引用,但整个对象被复制了。当线程更新一个引用传递的数据结构时,这种情况就可能发生,比如:

void update_data_for_widget(widget_id w,widget_data& data); // 1
void oops_again(widget_id w)
{widget_data data;std::thread t(update_data_for_widget,w,data); // 2display_status();t.join();process_widget_data(data); // 3
}

虽然update_data_for_widget①的第二个参数期待传入一个引用,但是std::thread的构造函数②并不知晓;构造函数无视函数期待的参数类型,并盲目的拷贝已提供的变量。当线程调用update_data_for_widget函数时,传递给函数的参数是data变量内部拷贝的引用,而非数据本身的引用。因此,当线程结束时,内部拷贝数据将会在数据更新阶段被销毁,且process_widget_data将会接收到没有修改的data变量③。对于熟悉std::bind的开发者来说,问题的解决办法是显而易见的:可以使用std::ref将参数转换成引用的形式,从而可将线程的调用改为以下形式:

std::thread t(update_data_for_widget,w,std::ref(data));

在这之后,update_data_for_widget就会接收到一个data变量的引用,而非一个data变量拷贝的引用。

如果你熟悉std::bind,就应该不会对以上述传参的形式感到奇怪,因为std::thread构造函数和std::bind的操作都在标准库中定义好了,可以传递一个成员函数指针作为线程函数,并提供一个合适的对象指针作为第一个参数:

class X
{
public:void do_lengthy_work();
};
X my_x;
std::thread t(&X::do_lengthy_work,&my_x); // 1

这段代码中,新线程将my_x.do_lengthy_work()作为线程函数;my_x的地址①作为指针对象提供给函数。也可以为成员函数提供参数:std::thread构造函数的第三个参数就是成员函数的第一个参数,以此类推(代码如下,译者自加)。

class X
{
public:void do_lengthy_work(int);
};
X my_x;
int num(0);
std::thread t(&X::do_lengthy_work, &my_x, num);

有趣的是,提供的参数可以移动,但不能拷贝。"移动"是指:原始对象中的数据转移给另一对象,而转移的这些数据就不再在原始对象中保存了(译者:比较像在文本编辑的"剪切"操作)。std::unique_ptr就是这样一种类型(译者:C++11中的智能指针),这种类型为动态分配的对象提供内存自动管理机制(译者:类似垃圾回收)。同一时间内,只允许一个std::unique_ptr实现指向一个给定对象,并且当这个实现销毁时,指向的对象也将被删除。移动构造函数(move constructor)和移动赋值操作符(move assignment operator)允许一个对象在多个std::unique_ptr实现中传递(有关"移动"的更多内容,请参考附录A的A.1.1节)。使用"移动"转移原对象后,就会留下一个空指针(NULL)。移动操作可以将对象转换成可接受的类型,例如:函数参数或函数返回的类型。当原对象是一个临时变量时,自动进行移动操作,但当原对象是一个命名变量,那么转移的时候就需要使用std::move()进行显示移动。下面的代码展示了std::move的用法,展示了std::move是如何转移一个动态对象到一个线程中去的:

void process_big_object(std::unique_ptr<big_object>);std::unique_ptr<big_object> p(new big_object);
p->prepare_data(42);
std::thread t(process_big_object,std::move(p));

std::thread的构造函数中指定std::move(p),big_object对象的所有权就被首先转移到新创建线程的的内部存储中,之后传递给process_big_object函数。

标准线程库中和std::unique_ptr在所属权上有相似语义类型的类有好几种,std::thread为其中之一。虽然,std::thread实例不像std::unique_ptr那样能占有一个动态对象的所有权,但是它能占有其他资源:每个实例都负责管理一个执行线程。执行线程的所有权可以在多个std::thread实例中互相转移,这是依赖于std::thread实例的可移动不可复制性。不可复制保性证了在同一时间点,一个std::thread实例只能关联一个执行线程;可移动性使得程序员可以自己决定,哪个实例拥有实际执行线程的所有权。

2.3 转移线程所有权

假设要写一个在后台启动线程的函数,想通过新线程返回的所有权去调用这个函数,而不是等待线程结束再去调用;或完全与之相反的想法:创建一个线程,并在函数中转移所有权,都必须要等待线程结束。总之,新线程的所有权都需要转移。
这就是移动引入std::thread的原因,C++标准库中有很多_资源占有_(resource-owning)类型,比如std::ifstream,std::unique_ptr还有std::thread都是可移动,但不可拷贝。这就说明执行线程的所有权可以在std::thread实例中移动,下面将展示一个例子。例子中,创建了两个执行线程,并且在std::thread实例之间(t1,t2和t3)转移所有权:

void some_function();
void some_other_function();
std::thread t1(some_function);            // 1
std::thread t2=std::move(t1);            // 2
t1=std::thread(some_other_function);    // 3
std::thread t3;                            // 4
t3=std::move(t2);                        // 5
t1=std::move(t3);                        // 6 赋值操作将使程序崩溃

首先,新线程开始与t1相关联。当显式使用std::move()创建t2后②,t1的所有权就转移给了t2。之后,t1和执行线程已经没有关联了;执行some_function的函数现在与t2关联。

然后,与一个临时std::thread对象相关的线程启动了③。为什么不显式调用std::move()转移所有权呢?因为,所有者是一个临时对象——移动操作将会隐式的调用。

t3使用默认构造方式创建④,与任何执行线程都没有关联。调用std::move()将与t2关联线程的所有权转移到t3中⑤。因为t2是一个命名对象,需要显式的调用std::move()。移动操作⑤完成后,t1与执行some_other_function的线程相关联,t2与任何线程都无关联,t3与执行some_function的线程相关联。

最后一个移动操作,将some_function线程的所有权转移⑥给t1。不过,t1已经有了一个关联的线程(执行some_other_function的线程),所以这里系统直接调用std::terminate()终止程序继续运行。

std::thread支持移动,就意味着线程的所有权可以在函数外进行转移,就如下面程序一样。

清单2.5 函数返回std::thread对象

std::thread f()
{void some_function();return std::thread(some_function);
}std::thread g()
{void some_other_function(int);std::thread t(some_other_function,42);return t;
}

当所有权可以在函数内部传递,就允许std::thread实例可作为参数进行传递,代码如下:

void f(std::thread t);
void g()
{void some_function();f(std::thread(some_function));std::thread t(some_function);f(std::move(t));
}

std::thread对象的容器,如果这个容器是移动敏感的(比如,标准中的std::vector<>),那么移动操作同样适用于这些容器。了解这些后,就可以写出类似清单2.7中的代码,代码量产了一些线程,并且等待它们结束。

清单2.7 量产线程,等待它们结束

void do_work(unsigned id);void f()
{std::vector<std::thread> threads;for(unsigned i=0; i < 20; ++i){threads.push_back(std::thread(do_work,i)); // 产生线程} std::for_each(threads.begin(),threads.end(),std::mem_fn(&std::thread::join)); // 对每个线程调用join()
}

2.4 运行时决定线程数量

std::thread::hardware_concurrency()在新版C++标准库中是一个很有用的函数。这个函数将返回能同时并发在一个程序中的线程数量。例如,多核系统中,返回值可以是CPU核芯的数量。返回值也仅仅是一个提示,当系统信息无法获取时,函数也会返回0。

2.5 标识线程

线程标识类型是std::thread::id,可以通过两种方式进行检索。第一种,可以通过调用std::thread对象的成员函数get_id()来直接获取。如果std::thread对象没有与任何执行线程相关联,get_id()将返回std::thread::type默认构造值,这个值表示“无线程”。第二种,当前线程中调用std::this_thread::get_id()(这个函数定义在<thread>头文件中)也可以获得线程标识。

std::thread::id对象可以自由的拷贝和对比,因为标识符就可以复用。如果两个对象的std::thread::id相等,那它们就是同一个线程,或者都“无线程”。如果不等,那么就代表了两个不同线程,或者一个有线程,另一没有线程。

线程库不会限制你去检查线程标识是否一样,std::thread::id类型对象提供相当丰富的对比操作;比如,提供为不同的值进行排序。这意味着允许程序员将其当做为容器的键值,做排序,或做其他方式的比较。按默认顺序比较不同值的std::thread::id,所以这个行为可预见的:当a<bb<c时,得a<c,等等。标准库也提供std::hash<std::thread::id>容器,所以std::thread::id也可以作为无序容器的键值。

std::thread::id master_thread;
void some_core_part_of_algorithm()
{if(std::this_thread::get_id()==master_thread){do_master_thread_work();}do_common_work();
}

3 线程间共享数据

3.1 共享数据带来的问题

当涉及到共享数据时,问题很可能是因为共享数据修改所导致。如果共享数据是只读的,那么只读操作不会影响到数据,更不会涉及对数据的修改,所以所有线程都会获得同样的数据。但是,当一个或多个线程要修改共享数据时,就会产生很多麻烦。这种情况下,就必须小心谨慎,才能确保一切所有线程都工作正常。
不变量(invariants)的概念对程序员们编写的程序会有一定的帮助——对于特殊结构体的描述;比如,“变量包含列表中的项数”。不变量通常会在一次更新中被破坏,特别是比较复杂的数据结构,或者一次更新就要改动很大的数据结构。
双链表中每个节点都有一个指针指向列表中下一个节点,还有一个指针指向前一个节点。其中不变量就是节点A中指向“下一个”节点B的指针,还有前向指针。为了从列表中删除一个节点,其两边节点的指针都需要更新。当其中一边更新完成时,不变量就被破坏了,直到另一边也完成更新;在两边都完成更新后,不变量就又稳定了。

从一个列表中删除一个节点的步骤如下图:

1. 找到要删除的节点N
2. 更新前一个节点指向N的指针,让这个指针指向N的下一个节点
3. 更新后一个节点指向N的指针,让这个指正指向N的前一个节点
4. 删除节点N


图中b和c在相同的方向上指向和原来已经不一致了,这就破坏了不变量。线程间潜在问题就是修改共享数据,致使不变量遭到破坏。这就是并行代码常见错误:条件竞争。

3.1.1 条件竞争

C++标准中也定义了数据竞争这个术语,一种特殊的条件竞争:并发的去修改一个独立对象,数据竞争是(可怕的)未定义行为的起因。
恶性条件竞争通常发生于完成对多于一个的数据块的修改时,例如,对两个连接指针的修改。因为操作要访问两个独立的数据块,独立的指令将会对数据块将进行修改,并且其中一个线程可能正在进行时,另一个线程就对数据块进行了访问。因为出现的概率太低,条件竞争很难查找,也很难复现。

3.1.2 避免恶性条件竞争

这里提供一些方法来解决恶性条件竞争,最简单的办法就是对数据结构采用某种保护机制,确保只有进行修改的线程才能看到不变量被破坏时的中间状态。从其他访问线程的角度来看,修改不是已经完成了,就是还没开始。C++标准库提供很多类似的机制。
另一个选择是对数据结构和不变量的设计进行修改,修改完的结构必须能完成一系列不可分割的变化,也就是保证每个不变量保持稳定的状态,这就是所谓的无锁编程。不过,这种方式很难得到正确的结果。如果到这个级别,无论是内存模型上的细微差异,还是线程访问数据的能力,都会让工作变的复杂。
另一种处理条件竞争的方式是,使用事务的方式去处理数据结构的更新(这里的"处理"就如同对数据库进行更新一样)。所需的一些数据和读取都存储在事务日志中,然后将之前的操作合为一步,再进行提交。当数据结构被另一个线程修改后,或处理已经重启的情况下,提交就会无法进行,这称作为“软件事务内存”。理论研究中,这是一个很热门的研究领域。这个概念将不会在本书中再进行介绍,因为在C++中没有对STM进行直接支持。但是,基本思想会在后面提及。

保护共享数据结构的最基本的方式,是使用C++标准库提供的互斥量。

3.2 使用互斥量保护共享数据

互斥量是C++中一种最通用的数据保护机制,但它不是“银弹”;精心组织代码来保护正确的数据,并在接口内部避免竞争条件是非常重要的。但互斥量自身也有问题,也会造成死锁,或是对数据保护的太多(或太少)。

3.2.1 C++中使用互斥量

C++中通过实例化std::mutex创建互斥量,通过调用成员函数lock()进行上锁,unlock()进行解锁。不过,不推荐实践中直接去调用成员函数,因为调用成员函数就意味着,必须记住在每个函数出口都要去调用unlock(),也包括异常的情况。C++标准库为互斥量提供了一个RAII语法的模板类std::lock_guard,其会在构造的时候提供已锁的互斥量,并在析构的时候进行解锁,从而保证了一个已锁的互斥量总是会被正确的解锁。下面的程序清单中,展示了如何在多线程程序中,使用std::mutex构造的std::lock_guard实例,对一个列表进行访问保护。std::mutexstd::lock_guard都在<mutex>头文件中声明。
清单3.1 使用互斥量保护列表

#include <list>
#include <mutex>
#include <algorithm>std::list<int> some_list;    // 1
std::mutex some_mutex;    // 2void add_to_list(int new_value)
{std::lock_guard<std::mutex> guard(some_mutex);    // 3some_list.push_back(new_value);
}bool list_contains(int value_to_find)
{std::lock_guard<std::mutex> guard(some_mutex);    // 4return std::find(some_list.begin(),some_list.end(),value_to_find) != some_list.end();
}

当所有成员函数都会在调用时对数据上锁,结束时对数据解锁,那么就保证了数据访问时不变量不被破坏。
当然,也不是总是那么理想,聪明的你一定注意到了:当其中一个成员函数返回的是保护数据的指针或引用时,会破坏对数据的保护。具有访问能力的指针或引用可以访问(并可能修改)被保护的数据,而不会被互斥锁限制。互斥量保护的数据需要对接口的设计相当谨慎,要确保互斥量能锁住任何对保护数据的访问,并且不留后门。

3.2.2 精心组织代码来保护共享数据

使用互斥量来保护数据,并不是仅仅在每一个成员函数中都加入一个std::lock_guard对象那么简单;一个迷失的指针或引用,将会让这种保护形同虚设。
清单3.2 无意中传递了保护数据的引用

class some_data
{int a;std::string b;
public:void do_something();
};class data_wrapper
{
private:some_data data;std::mutex m;
public:template<typename Function>void process_data(Function func){std::lock_guard<std::mutex> l(m);func(data);    // 1 传递“保护”数据给用户函数}
};some_data* unprotected;void malicious_function(some_data& protected_data)
{unprotected=&protected_data;
}data_wrapper x;
void foo()
{x.process_data(malicious_function);    // 2 传递一个恶意函数unprotected->do_something();    // 3 在无保护的情况下访问保护数据
}

例子中process_data看起来没有任何问题,std::lock_guard对数据做了很好的保护,但调用用户提供的函数func①,就意味着foo能够绕过保护机制将函数malicious_function传递进去②,在没有锁定互斥量的情况下调用do_something()

3.2.3 发现接口内在的条件竞争

清单3.3 std::stack容器的实现

template<typename T,typename Container=std::deque<T> >
class stack
{
public:explicit stack(const Container&);explicit stack(Container&& = Container());template <class Alloc> explicit stack(const Alloc&);template <class Alloc> stack(const Container&, const Alloc&);template <class Alloc> stack(Container&&, const Alloc&);template <class Alloc> stack(stack&&, const Alloc&);bool empty() const;size_t size() const;T& top();T const& top() const;void push(T const&);void push(T&&);void pop();void swap(stack&&);
};

虽然empty()和size()可能在被调用并返回时是正确的,但其的结果是不可靠的;当它们返回后,其他线程就可以自由地访问栈,并且可能push()多个新元素到栈中,也可能pop()一些已在栈中的元素。这样的话,之前从empty()和size()得到的结果就有问题了。

特别地,当栈实例是非共享的,如果栈非空,使用empty()检查再调用top()访问栈顶部的元素是安全的。如下代码所示:

stack<int> s;
if (! s.empty()){    // 1int const value = s.top();    // 2s.pop();    // 3do_something(value);
}

以上是单线程安全代码:对一个空栈使用top()是未定义行为。对于共享的栈对象,这样的调用顺序就不再安全了,因为在调用empty()①和调用top()②之间,可能有来自另一个线程的pop()调用并删除了最后一个元素。这是一个经典的条件竞争,使用互斥量对栈内部数据进行保护,但依旧不能阻止条件竞争的发生,这就是接口固有的问题。
怎么解决呢?问题发生在接口设计上,所以解决的方法也就是改变接口设计。
表3.1 一种可能执行顺序

Thread A Thread B
if (!s.empty);
if(!s.empty);
int const value = s.top();
int const value = s.top();
s.pop();
do_something(value); s.pop();
do_something(value);

当线程运行时,调用两次top(),栈没被修改,所以每个线程能得到同样的值。值已被处理了两次。这种条件竞争,因为看起来没有任何错误,就会让这个Bug很难定位。
这就需要接口设计上有较大的改动,提议之一就是使用同一互斥量来保护top()和pop()。
选项1: 传入一个引用
第一个选项是将变量的引用作为参数,传入pop()函数中获取想要的“弹出值”:

std::vector<int> result;
some_stack.pop(result);

大多数情况下,这种方式还不错,但有明显的缺点:需要构造出一个栈中类型的实例,用于接收目标值。对于一些类型,这样做是不现实的,因为临时构造一个实例,从时间和资源的角度上来看,都是不划算。对于其他的类型,这样也不总能行得通,因为构造函数需要的一些参数,在代码的这个阶段不一定可用。最后,需要可赋值的存储类型,这是一个重大限制:即使支持移动构造,甚至是拷贝构造(从而允许返回一个值),很多用户自定义类型可能都不支持赋值操作。
选项2:无异常抛出的拷贝构造函数或移动构造函数
对于有返回值的pop()函数来说,只有“异常安全”方面的担忧(当返回值时可以抛出一个异常)。很多类型都有拷贝构造函数,它们不会抛出异常,并且随着新标准中对“右值引用”的支持(详见附录A,A.1节),很多类型都将会有一个移动构造函数,即使他们和拷贝构造函数做着相同的事情,它也不会抛出异常。一个有用的选项可以限制对线程安全的栈的使用,并且能让栈安全的返回所需的值,而不会抛出异常。
选项3:返回指向弹出值的指针
第三个选择是返回一个指向弹出元素的指针,而不是直接返回值。指针的优势是自由拷贝,并且不会产生异常。缺点就是返回一个指针需要对对象的内存分配进行管理,对于简单数据类型(比如:int),内存管理的开销要远大于直接返回值。对于选择这个方案的接口,使用std::shared_ptr是个不错的选择;不仅能避免内存泄露(因为当对象中指针销毁时,对象也会被销毁),而且标准库能够完全控制内存分配方案,也就不需要new和delete操作。这种优化是很重要的:因为堆栈中的每个对象,都需要用new进行独立的内存分配,相较于非线程安全版本,这个方案的开销相当大。
选项4:“选项1 + 选项2”或 “选项1 + 选项3”
对于通用的代码来说,灵活性不应忽视。当你已经选择了选项2或3时,再去选择1也是很容易的。这些选项提供给用户,让用户自己选择对于他们自己来说最合适,最经济的方案。
例:定义线程安全的堆栈
清单3.4 线程安全的堆栈类定义(概述)

#include <exception>
#include <memory>  // For std::shared_ptr<>struct empty_stack: std::exception
{const char* what() const throw();
};template<typename T>
class threadsafe_stack
{
public:threadsafe_stack();threadsafe_stack(const threadsafe_stack&);threadsafe_stack& operator=(const threadsafe_stack&) = delete; // 1 赋值操作被删除void push(T new_value);std::shared_ptr<T> pop();void pop(T& value);bool empty() const;
};

削减接口可以获得最大程度的安全,甚至限制对栈的一些操作。使用std::shared_ptr可以避免内存分配管理的问题,并避免多次使用new和delete操作。
清单3.5 扩充(线程安全)堆栈

#include <exception>
#include <memory>
#include <mutex>
#include <stack>struct empty_stack: std::exception
{const char* what() const throw() {return "empty stack!";};
};template<typename T>
class threadsafe_stack
{
private:std::stack<T> data;mutable std::mutex m;public:threadsafe_stack(): data(std::stack<T>()){}threadsafe_stack(const threadsafe_stack& other){std::lock_guard<std::mutex> lock(other.m);data = other.data; // 1 在构造函数体中的执行拷贝}threadsafe_stack& operator=(const threadsafe_stack&) = delete;void push(T new_value){std::lock_guard<std::mutex> lock(m);data.push(new_value);}std::shared_ptr<T> pop(){std::lock_guard<std::mutex> lock(m);if(data.empty()) throw empty_stack(); // 在调用pop前,检查栈是否为空std::shared_ptr<T> const res(std::make_shared<T>(data.top())); // 在修改堆栈前,分配出返回值data.pop();return res;}void pop(T& value){std::lock_guard<std::mutex> lock(m);if(data.empty()) throw empty_stack();value=data.top();data.pop();}bool empty() const{std::lock_guard<std::mutex> lock(m);return data.empty();}
};

之前对top()和pop()函数的讨论中,恶性条件竞争已经出现,因为锁的粒度太小,需要保护的操作并未全覆盖到。不过,锁住的颗粒过大同样会有问题。还有一个问题,一个全局互斥量要去保护全部共享数据,在一个系统中存在有大量的共享数据时,因为线程可以强制运行,甚至可以访问不同位置的数据,抵消了并发带来的性能提升。在第一版为多处理器系统设计Linux内核中,就使用了一个全局内核锁。虽然这个锁能正常工作,但在双核处理系统的上的性能要比两个单核系统的性能差很多,四核系统就更不能提了。太多请求去竞争占用内核,使得依赖于处理器运行的线程没有办法很好的工作。随后修正的Linux内核加入了一个细粒度锁方案,因为少了很多内核竞争,这时四核处理系统的性能就和单核处理的四倍差不多了。
一个给定操作需要两个或两个以上的互斥量时,另一个潜在的问题将出现:死锁。与条件竞争完全相反——不同的两个线程会互相等待,从而什么都没做。

3.2.4 死锁:问题描述及解决方案

试想有一个玩具,这个玩具由两部分组成,必须拿到这两个部分,才能够玩。例如,一个玩具鼓,需要一个鼓锤和一个鼓才能玩。现在有两个小孩,他们都很喜欢玩这个玩具。当其中一个孩子拿到了鼓和鼓锤时,那就可以尽情的玩耍了。当另一孩子想要玩,他就得等待另一孩子玩完才行。再试想,鼓和鼓锤被放在不同的玩具箱里,并且两个孩子在同一时间里都想要去敲鼓。之后,他们就去玩具箱里面找这个鼓。其中一个找到了鼓,并且另外一个找到了鼓锤。现在问题就来了,除非其中一个孩子决定让另一个先玩,他可以把自己的那部分给另外一个孩子;但当他们都紧握着自己所有的部分而不给予,那么这个鼓谁都没法玩。
现在没有孩子去争抢玩具,但线程有对锁的竞争:一对线程需要对他们所有的互斥量做一些操作,其中每个线程都有一个互斥量,且等待另一个解锁。这样没有线程能工作,因为他们都在等待对方释放互斥量。这种情况就是死锁,它的最大问题就是由两个或两个以上的互斥量来锁定一个操作。
避免死锁的一般建议,就是让两个互斥量总以相同的顺序上锁:总在互斥量B之前锁住互斥量A,就永远不会死锁。
std::lock——可以一次性锁住多个(两个以上)的互斥量,并且没有副作用(死锁风险)。下面的程序清单中,就来看一下怎么在一个简单的交换操作中使用std::lock
清单3.6 交换操作中使用std::lock()std::lock_guard

// 这里的std::lock()需要包含<mutex>头文件
class some_big_object;
void swap(some_big_object& lhs,some_big_object& rhs);
class X
{
private:some_big_object some_detail;std::mutex m;
public:X(some_big_object const& sd):some_detail(sd){}friend void swap(X& lhs, X& rhs){if(&lhs==&rhs)return;std::lock(lhs.m,rhs.m); // 1std::lock_guard<std::mutex> lock_a(lhs.m,std::adopt_lock); // 2std::lock_guard<std::mutex> lock_b(rhs.m,std::adopt_lock); // 3swap(lhs.some_detail,rhs.some_detail);}
};

使用std::lock去锁lhs.m或rhs.m时,可能会抛出异常;这种情况下,异常会传播到std::lock之外。当std::lock成功的获取一个互斥量上的锁,并且当其尝试从另一个互斥量上再获取锁时,就会有异常抛出,第一个锁也会随着异常的产生而自动释放,所以std::lock要么将两个锁都锁住,要不一个都不锁。
虽然std::lock可以在这情况下(获取两个以上的锁)避免死锁,但它没办法帮助你获取其中一个锁。

3.2.5 避免死锁的进阶指导

虽然锁是产生死锁的一般原因,但也不排除死锁出现在其他地方。无锁的情况下,仅需要每个std::thread对象调用join(),两个线程就能产生死锁。这种情况很常见,一个线程会等待另一个线程,其他线程同时也会等待第一个线程结束,所以三个或更多线程的互相等待也会发生死锁。为了避免死锁,这里的指导意见为:当机会来临时,不要拱手让人。以下提供一些个人的指导建议,如何识别死锁,并消除其他线程的等待。
避免嵌套锁
第一个建议往往是最简单的:一个线程已获得一个锁时,再别去获取第二个。如果能坚持这个建议,因为每个线程只持有一个锁,锁上就不会产生死锁。即使互斥锁造成死锁的最常见原因,也可能会在其他方面受到死锁的困扰(比如:线程间的互相等待)。当你需要获取多个锁,使用一个std::lock来做这件事(对获取锁的操作上锁),避免产生死锁。
避免在持有锁时调用用户提供的代码
:因为代码是用户提供的,你没有办法确定用户要做什么;用户程序可能做任何事情,包括获取锁。你在持有锁的情况下,调用用户提供的代码;如果用户代码要获取一个锁,就会违反第一个指导意见,并造成死锁(有时,这是无法避免的)。
使用固定顺序获取锁
当硬性条件要求你获取两个以上(包括两个)的锁,并且不能使用std::lock单独操作来获取它们;那么最好在每个线程上,用固定的顺序获取它们获取它们(锁)。
为了遍历链表,线程必须保证在获取当前节点的互斥锁前提下,获得下一个节点的锁,要保证指向下一个节点的指针不会同时被修改。一旦下一个节点上的锁被获取,那么第一个节点的锁就可以释放了,因为没有持有它的必要性了。
这种“手递手”锁的模式允许多个线程访问列表,为每一个访问的线程提供不同的节点。但是,为了避免死锁,节点必须以同样的顺序上锁:如果两个线程试图用互为反向的顺序,使用“手递手”锁遍历列表,他们将执行到列表中间部分时,发生死锁。当节点A和B在列表中相邻,当前线程可能会同时尝试获取A和B上的锁。另一个线程可能已经获取了节点B上的锁,并且试图获取节点A上的锁——经典的死锁场景。
当A、C节点中的B节点正在被删除时,如果有线程在已获取A和C上的锁后,还要获取B节点上的锁时,当一个线程遍历列表的时候,这样的情况就可能发生死锁。这样的线程可能会试图首先锁住A节点或C节点(根据遍历的方向),但是后面就会发现,它无法获得B上的锁,因为线程在执行删除任务的时候,已经获取了B上的锁,并且同时也获取了A和C上的锁。

这里提供一种避免死锁的方式,定义遍历的顺序,所以一个线程必须先锁住A才能获取B的锁,在锁住B之后才能获取C的锁。这将消除死锁发生的可能性,在不允许反向遍历的列表上。类似的约定常被用来建立其他的数据结构。
使用锁的层次结构
清单3.7 使用层次锁来避免死锁

hierarchical_mutex high_level_mutex(10000); // 1
hierarchical_mutex low_level_mutex(5000);  // 2int do_low_level_stuff();int low_level_func()
{std::lock_guard<hierarchical_mutex> lk(low_level_mutex); // 3return do_low_level_stuff();
}void high_level_stuff(int some_param);void high_level_func()
{std::lock_guard<hierarchical_mutex> lk(high_level_mutex); // 4high_level_stuff(low_level_func()); // 5
}void thread_a()  // 6
{high_level_func();
}hierarchical_mutex other_mutex(100); // 7
void do_other_stuff();void other_stuff()
{high_level_func();  // 8do_other_stuff();
}void thread_b() // 9
{std::lock_guard<hierarchical_mutex> lk(other_mutex); // 10other_stuff();
}

例子也展示了另一点,std::lock_guard<>模板与用户定义的互斥量类型一起使用。虽然hierarchical_mutex不是C++标准的一部分,但是它写起来很容易;一个简单的实现在列表3.8中展示出来。尽管它是一个用户定义类型,它可以用于std::lock_guard<>模板中,因为它的实现有三个成员函数为了满足互斥量操作:lock(), unlock() 和 try_lock()。虽然你还没见过try_lock()怎么使用,但是其使用起来很简单:当互斥量上的锁被一个线程持有,它将返回false,而不是等待调用的线程,直到能够获取互斥量上的锁为止。在std::lock()的内部实现中,try_lock()会作为避免死锁算法的一部分。
列表3.8 简单的层级互斥量实现

class hierarchical_mutex
{std::mutex internal_mutex;unsigned long const hierarchy_value;unsigned long previous_hierarchy_value;static thread_local unsigned long this_thread_hierarchy_value;  // 1void check_for_hierarchy_violation(){if(this_thread_hierarchy_value <= hierarchy_value)  // 2{throw std::logic_error(“mutex hierarchy violated”);}}void update_hierarchy_value(){previous_hierarchy_value=this_thread_hierarchy_value;  // 3this_thread_hierarchy_value=hierarchy_value;}public:explicit hierarchical_mutex(unsigned long value):hierarchy_value(value),previous_hierarchy_value(0){}void lock(){check_for_hierarchy_violation();internal_mutex.lock();  // 4update_hierarchy_value();  // 5}void unlock(){this_thread_hierarchy_value=previous_hierarchy_value;  // 6internal_mutex.unlock();}bool try_lock(){check_for_hierarchy_violation();if(!internal_mutex.try_lock())  // 7return false;update_hierarchy_value();return true;}
};
thread_local unsigned longhierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);  // 8

超越锁的延伸扩展
当代码已经能规避死锁,std::lock()std::lock_guard能组成简单的锁覆盖大多数情况,但是有时需要更多的灵活性。在这些情况,可以使用标准库提供的std::unique_lock模板。如std::lock_guard,这是一个参数化的互斥量模板类,并且它提供很多RAII类型锁用来管理std::lock_guard类型,可以让代码更加灵活。

3.2.6 std::unique_lock——灵活的锁

std::unqiue_lock使用更为自由的不变量,这样std::unique_lock实例不会总与互斥量的数据类型相关,使用起来要比std:lock_guard更加灵活。首先,可将std::adopt_lock作为第二个参数传入构造函数,对互斥量进行管理;也可以将std::defer_lock作为第二个参数传递进去,表明互斥量应保持解锁状态。这样,就可以被std::unique_lock对象(不是互斥量)的lock()函数的所获取,或传递std::unique_lock对象到std::lock()中。清单3.6可以轻易的转换为清单3.9,使用std::unique_lockstd::defer_lock①,而非std::lock_guardstd::adopt_lock。代码长度相同,几乎等价,唯一不同的就是:std::unique_lock会占用比较多的空间,并且比std::lock_guard稍慢一些。保证灵活性要付出代价,这个代价就是允许std::unique_lock实例不带互斥量:信息已被存储,且已被更新。
清单3.9 交换操作中std::lock()std::unique_lock的使用

class some_big_object;
void swap(some_big_object& lhs,some_big_object& rhs);
class X
{
private:some_big_object some_detail;std::mutex m;
public:X(some_big_object const& sd):some_detail(sd){}friend void swap(X& lhs, X& rhs){if(&lhs==&rhs)return;std::unique_lock<std::mutex> lock_a(lhs.m,std::defer_lock); // 1 std::unique_lock<std::mutex> lock_b(rhs.m,std::defer_lock); // 1 std::def_lock 留下未上锁的互斥量std::lock(lock_a,lock_b); // 2 互斥量在这里上锁swap(lhs.some_detail,rhs.some_detail);}
};

列表3.9中,因为std::unique_lock支持lock(), try_lock()和unlock()成员函数,所以能将std::unique_lock对象传递到std::lock()②。这些同名的成员函数在低层做着实际的工作,并且仅更新std::unique_lock实例中的标志,来确定该实例是否拥有特定的互斥量,这个标志是为了确保unlock()在析构函数中被正确调用。如果实例拥有互斥量,那么析构函数必须调用unlock();但当实例中没有互斥量时,析构函数就不能去调用unlock()。这个标志可以通过owns_lock()成员变量进行查询。

可能如你期望的那样,这个标志被存储在某个地方。因此,std::unique_lock对象的体积通常要比std::lock_guard对象大,当使用std::unique_lock替代std::lock_guard,因为会对标志进行适当的更新或检查,就会做些轻微的性能惩罚。当std::lock_guard已经能够满足你的需求,那么还是建议你继续使用它。当需要更加灵活的锁时,最好选择std::unique_lock,因为它更适合于你的任务。你已经看到一个递延锁的例子,另外一种情况是锁的所有权需要从一个域转到另一个域。

3.2.7 不同域中互斥量所有权的传递

std::unique_lock是可移动,但不可赋值的类型。

std::unique_lock<std::mutex> get_lock()
{extern std::mutex some_mutex;std::unique_lock<std::mutex> lk(some_mutex);prepare_data();return lk;  // 1
}
void process_data()
{std::unique_lock<std::mutex> lk(get_lock());  // 2do_something();
}

std::unique_lock的灵活性同样也允许实例在销毁之前放弃其拥有的锁。可以使用unlock()来做这件事,如同一个互斥量:std::unique_lock的成员函数提供类似于锁定和解锁互斥量的功能。std::unique_lock实例在销毁前释放锁的能力,当锁没有必要在持有的时候,可以在特定的代码分支对其进行选择性的释放。这对于应用性能来说很重要,因为持有锁的时间增加会导致性能下降,其他线程会等待这个锁的释放,避免超越操作。

3.2.8 锁的粒度

选择粒度对于锁来说很重要,为了保护对应的数据,保证锁有能力保护这些数据也很重要。
如果很多线程正在等待同一个资源,当有线程持有锁的时间过长,这就会增加等待的时间。在可能的情况下,锁住互斥量的同时只能对共享数据进行访问;试图对锁外数据进行处理。特别是做一些费时的动作,比如:对文件的输入/输出操作进行上锁。文件输入/输出通常要比从内存中读或写同样长度的数据慢成百上千倍,所以除非锁已经打算去保护对文件的访问,要么执行输入/输出操作将会将延迟其他线程执行的时间,这很没有必要(因为文件锁阻塞住了很多操作),这样多线程带来的性能效益会被抵消。

这能表示只有一个互斥量保护整个数据结构时的情况,不仅可能会有更多对锁的竞争,也会增加锁持锁的时间。较多的操作步骤需要获取同一个互斥量上的锁,所以持有锁的时间会更长。成本上的双重打击也算是为向细粒度锁转移提供了双重激励和可能。
列表3.10 比较操作符中一次锁住一个互斥量

class Y
{
private:int some_detail;mutable std::mutex m;int get_detail() const{std::lock_guard<std::mutex> lock_a(m);  // 1return some_detail;}
public:Y(int sd):some_detail(sd){}friend bool operator==(Y const& lhs, Y const& rhs){if(&lhs==&rhs)return true;int const lhs_value=lhs.get_detail();  // 2int const rhs_value=rhs.get_detail();  // 3return lhs_value==rhs_value;  // 4}
};

3.3 保护共享数据的替代设施

互斥量是最通用的机制,但其并非保护共享数据的唯一方式。这里有很多替代方式可以在特定情况下,提供更加合适的保护。
一个特别极端(但十分常见)的情况就是,共享数据在并发访问和初始化时(都需要保护),但是之后需要进行隐式同步。这可能是因为数据作为只读方式创建,所以没有同步问题;或者因为必要的保护作为对数据操作的一部分,所以隐式的执行。任何情况下,数据初始化后锁住一个互斥量,纯粹是为了保护其初始化过程(这是没有必要的),并且这会给性能带来不必要的冲击。出于以上的原因,C++标准提供了一种纯粹保护共享数据初始化过程的机制。

3.3.1 保护共享数据的初始化过程

假设你与一个共享源,构建代价很昂贵,可能它会打开一个数据库连接或分配出很多的内存。
延迟初始化(Lazy initialization)在单线程代码很常见——每一个操作都需要先对源进行检查,为了了解数据是否被初始化,然后在其使用前决定,数据是否需要初始化:

std::shared_ptr<some_resource> resource_ptr;
void foo()
{if(!resource_ptr){resource_ptr.reset(new some_resource);  // 1}resource_ptr->do_something();
}

清单 3.11 使用一个互斥量的延迟初始化(线程安全)过程

std::shared_ptr<some_resource> resource_ptr;
std::mutex resource_mutex;void foo()
{std::unique_lock<std::mutex> lk(resource_mutex);  // 所有线程在此序列化 if(!resource_ptr){resource_ptr.reset(new some_resource);  // 只有初始化过程需要保护 }lk.unlock();resource_ptr->do_something();
}

这段代码相当常见了,也足够表现出没必要的线程化问题,很多人能想出更好的一些的办法来做这件事,包括声名狼藉的双重检查锁模式:

void undefined_behaviour_with_double_checked_locking()
{if(!resource_ptr)  // 1{std::lock_guard<std::mutex> lk(resource_mutex);if(!resource_ptr)  // 2{resource_ptr.reset(new some_resource);  // 3}}resource_ptr->do_something();  // 4
}

指针第一次读取数据不需要获取锁①,并且只有在指针为NULL时才需要获取锁。然后,当获取锁之后,指针会被再次检查一遍② (这就是双重检查的部分),避免另一的线程在第一次检查后再做初始化,并且让当前线程获取锁。
这个模式为什么声名狼藉呢?因为这里有潜在的条件竞争,未被锁保护的读取操作①没有与其他线程里被锁保护的写入操作③进行同步。因此就会产生条件竞争,这个条件竞争不仅覆盖指针本身,还会影响到其指向的对象;即使一个线程知道另一个线程完成对指针进行写入,它可能没有看到新创建的some_resource实例,然后调用do_something()④后,得到不正确的结果。这个例子是在一种典型的条件竞争——数据竞争,C++标准中这就会被指定为“未定义行为”。这种竞争肯定是可以避免的。可以阅读第5章,那里有更多对内存模型的讨论,包括数据竞争的构成。
C++标准委员会也认为条件竞争的处理很重要,所以C++标准库提供了std::once_flagstd::call_once来处理这种情况。比起锁住互斥量,并显式的检查指针,每个线程只需要使用std::call_once,在std::call_once的结束时,就能安全的知道指针已经被其他的线程初始化了。使用std::call_once比显式使用互斥量消耗的资源更少,特别是当初始化完成后。

std::shared_ptr<some_resource> resource_ptr;
std::once_flag resource_flag;  // 1void init_resource()
{resource_ptr.reset(new some_resource);
}void foo()
{std::call_once(resource_flag,init_resource);  // 可以完整的进行一次初始化resource_ptr->do_something();
}

清单3.12 使用std::call_once作为类成员的延迟初始化(线程安全)

class X
{
private:connection_info connection_details;connection_handle connection;std::once_flag connection_init_flag;void open_connection(){connection=connection_manager.open(connection_details);}
public:X(connection_info const& connection_details_):connection_details(connection_details_){}void send_data(data_packet const& data)  // 1{std::call_once(connection_init_flag,&X::open_connection,this);  // 2connection.send_data(data);}data_packet receive_data()  // 3{std::call_once(connection_init_flag,&X::open_connection,this);  // 2return connection.receive_data();}
};

值得注意的是,std::mutexstd::one_flag的实例就不能拷贝和移动,所以当你使用它们作为类成员函数,如果你需要用到他们,你就得显示定义这些特殊的成员函数。

3.3.2 保护很少更新的数据结构

试想,为了将域名解析为其相关IP地址,我们在缓存中的存放了一张DNS入口表。通常,给定DNS数目在很长的一段时间内保持不变。虽然,在用户访问不同网站时,新的入口可能会被添加到表中,但是这些数据可能在其生命周期内保持不变。所以定期检查缓存中入口的有效性,就变的十分重要了;但是,这也需要一次更新,也许这次更新只是对一些细节做了改动。
使用std::mutex来保护数据结构,显的有些反应过度,比起使用std::mutex实例进行同步,不如使用boost::shared_mutex来做同步。
清单3.13 使用boost::shared_mutex对数据结构进行保护

#include <map>
#include <string>
#include <mutex>
#include <boost/thread/shared_mutex.hpp>class dns_entry;class dns_cache
{std::map<std::string,dns_entry> entries;mutable boost::shared_mutex entry_mutex;
public:dns_entry find_entry(std::string const& domain) const{boost::shared_lock<boost::shared_mutex> lk(entry_mutex);  // 1std::map<std::string,dns_entry>::const_iterator const it=entries.find(domain);return (it==entries.end())?dns_entry():it->second;}void update_or_add_entry(std::string const& domain,dns_entry const& dns_details){std::lock_guard<boost::shared_mutex> lk(entry_mutex);  // 2entries[domain]=dns_details;}
};

3.3.3 嵌套锁

当一个线程已经获取一个std::mutex时(已经上锁),并对其再次上锁,这个操作就是错误的,并且继续尝试这样做的话,就会产生未定义行为。然而,在某些情况下,一个线程尝试获取同一个互斥量多次,而没有对其进行一次释放是可以的。之所以可以,是因为C++标准库提供了std::recursive_mutex类。互斥量锁住其他线程前,你必须释放你拥有的所有锁,所以当你调用lock()三次时,你也必须调用unlock()三次。正确使用std::lock_guard<std::recursive_mutex>std::unique_lock<std::recursive_mutex>可以帮你处理这些问题。

4 同步并发操作

4.1 等待一个事件或其他条件

晚间在列车上等待凌晨到站出站,要么一直不睡等着,要么等待被列车员唤醒。
第二个选择是在等待线程在检查间隙,使用std::this_thread::sleep_for()进行周期性的间歇(详见4.3节):

bool flag;
std::mutex m;void wait_for_flag()
{std::unique_lock<std::mutex> lk(m);while(!flag){lk.unlock();  // 1 解锁互斥量std::this_thread::sleep_for(std::chrono::milliseconds(100));  // 2 休眠100mslk.lock();   // 3 再锁互斥量}
}

这里博主理解为是一个轮询。

4.1.1 等待条件达成

清单4.1 使用std::condition_variable处理数据等待

std::mutex mut;
std::queue<data_chunk> data_queue;  // 1
std::condition_variable data_cond;void data_preparation_thread()
{while(more_data_to_prepare()){data_chunk const data=prepare_data();std::lock_guard<std::mutex> lk(mut);data_queue.push(data);  // 2data_cond.notify_one();  // 3}
}void data_processing_thread()
{while(true){std::unique_lock<std::mutex> lk(mut);  // 4data_cond.wait(lk,[]{return !data_queue.empty();});  // 5data_chunk data=data_queue.front();data_queue.pop();lk.unlock();  // 6process(data);if(is_last_chunk(data))break;}
}

4.1.2 使用条件变量构建线程安全队列

清单4.2 std::queue接口

template <class T, class Container = std::deque<T> >
class queue {
public:explicit queue(const Container&);explicit queue(Container&& = Container());template <class Alloc> explicit queue(const Alloc&);template <class Alloc> queue(const Container&, const Alloc&);template <class Alloc> queue(Container&&, const Alloc&);template <class Alloc> queue(queue&&, const Alloc&);void swap(queue& q);bool empty() const;size_type size() const;T& front();const T& front() const;T& back();const T& back() const;void push(const T& x);void push(T&& x);void pop();template <class... Args> void emplace(Args&&... args);
};

当你忽略构造、赋值以及交换操作时,你就剩下了三组操作:1. 对整个队列的状态进行查询(empty()和size());2.查询在队列中的各个元素(front()和back());3.修改队列的操作(push(), pop()和emplace())。这就和3.2.3中的栈一样了,因此你也会遇到在固有接口上的条件竞争。因此,你需要将front()和pop()合并成一个函数调用,就像之前在栈实现时合并top()和pop()一样。与清单4.1中的代码不同的是:当使用队列在多个线程中传递数据时,接收线程通常需要等待数据的压入。这里我们提供pop()函数的两个变种:try_pop()和wait_and_pop()。try_pop() ,尝试从队列中弹出数据,总会直接返回(当有失败时),即使没有值可检索;wait_and_pop(),将会等待有值可检索的时候才返回。当你使用之前栈的方式来实现你的队列,你实现的队列接口就可能会是下面这样:
清单4.3 线程安全队列的接口

#include <memory> // 为了使用std::shared_ptrtemplate<typename T>
class threadsafe_queue
{
public:threadsafe_queue();threadsafe_queue(const threadsafe_queue&);threadsafe_queue& operator=(const threadsafe_queue&) = delete;  // 不允许简单的赋值void push(T new_value);bool try_pop(T& value);  // 1std::shared_ptr<T> try_pop();  // 2void wait_and_pop(T& value);std::shared_ptr<T> wait_and_pop();bool empty() const;
};

清单4.4 从清单4.1中提取push()和wait_and_pop()

#include <queue>
#include <mutex>
#include <condition_variable>template<typename T>
class threadsafe_queue
{
private:std::mutex mut;std::queue<T> data_queue;std::condition_variable data_cond;
public:void push(T new_value){std::lock_guard<std::mutex> lk(mut);data_queue.push(new_value);data_cond.notify_one();}void wait_and_pop(T& value){std::unique_lock<std::mutex> lk(mut);data_cond.wait(lk,[this]{return !data_queue.empty();});value=data_queue.front();data_queue.pop();}
};
threadsafe_queue<data_chunk> data_queue;  // 1void data_preparation_thread()
{while(more_data_to_prepare()){data_chunk const data=prepare_data();data_queue.push(data);  // 2}
}void data_processing_thread()
{while(true){data_chunk data;data_queue.wait_and_pop(data);  // 3process(data);if(is_last_chunk(data))break;}
}

清单4.5 使用条件变量的线程安全队列(完整版)

#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>template<typename T>
class threadsafe_queue
{
private:mutable std::mutex mut;  // 1 互斥量必须是可变的 std::queue<T> data_queue;std::condition_variable data_cond;
public:threadsafe_queue(){}threadsafe_queue(threadsafe_queue const& other){std::lock_guard<std::mutex> lk(other.mut);data_queue=other.data_queue;}void push(T new_value){std::lock_guard<std::mutex> lk(mut);data_queue.push(new_value);data_cond.notify_one();}void wait_and_pop(T& value){std::unique_lock<std::mutex> lk(mut);data_cond.wait(lk,[this]{return !data_queue.empty();});value=data_queue.front();data_queue.pop();}std::shared_ptr<T> wait_and_pop(){std::unique_lock<std::mutex> lk(mut);data_cond.wait(lk,[this]{return !data_queue.empty();});std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));data_queue.pop();return res;}bool try_pop(T& value){std::lock_guard<std::mutex> lk(mut);if(data_queue.empty())return false;value=data_queue.front();data_queue.pop();return true;}std::shared_ptr<T> try_pop(){std::lock_guard<std::mutex> lk(mut);if(data_queue.empty())return std::shared_ptr<T>();std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));data_queue.pop();return res;}bool empty() const{std::lock_guard<std::mutex> lk(mut);return data_queue.empty();}
};

4.2 使用期望等待一次性事件

C++标准库模型将这种一次性事件称为期望(future)。当一个线程需要等待一个特定的一次性事件时,在某种程度上来说它就需要知道这个事件在未来的表现形式。之后,这个线程会周期性(较短的周期)的等待或检查,事件是否触发(检查信息板);在检查期间也会执行其他任务。另外,在等待任务期间它可以先执行另外一些任务,直到对应的任务触发,而后等待期望的状态会变为就绪(ready)。一个“期望”可能是数据相关的,也可能不是。当事件发生时(并且期望状态为就绪),这个“期望”就不能被重置。

在C++标准库中,有两种“期望”,使用两种类型模板实现,声明在头文件中: 唯一期望(unique futures)(std::future<>)和共享期望(shared futures)(std::shared_future<>)。这是仿照std::unique_ptrstd::shared_ptrstd::future的实例只能与一个指定事件相关联,而std::shared_future的实例就能关联多个事件。后者的实现中,所有实例会在同时变为就绪状态,并且他们可以访问与事件相关的任何数据。这种数据关联与模板有关,比如std::unique_ptrstd::shared_ptr的模板参数就是相关联的数据类型。在与数据无关的地方,可以使用std::future<void>std::shared_future<void>的特化模板。虽然,我希望用于线程间的通讯,但是“期望”对象本身并不提供同步访问。当多个线程需要访问一个独立“期望”对象时,他们必须使用互斥量或类似同步机制对访问进行保护,如在第3章提到的那样。不过,在你将要阅读到的4.2.5节中,多个线程会对一个std::shared_future<>实例的副本进行访问,而不需要期望同步,即使他们是同一个异步结果。

最基本的一次性事件,就是一个后台运行出的计算结果。在第2章中,你已经了解了std::thread 执行的任务不能有返回值,并且我能保证,这个问题将在使用“期望”后解决——现在就来看看是怎么解决的。

4.2.1 带返回值的后台任务

当任务的结果你不着急要时,你可以使用std::async启动一个异步任务。与std::thread对象等待的方式不同,std::async会返回一个std::future对象,这个对象持有最终计算出来的结果。当你需要这个值时,你只需要调用这个对象的get()成员函数;并且会阻塞线程直到“期望”状态为就绪为止;之后,返回计算结果。
清单4.6 使用std::future从异步任务中获取返回值

#include <future>
#include <iostream>int find_the_answer_to_ltuae();
void do_other_stuff();
int main()
{std::future<int> the_answer=std::async(find_the_answer_to_ltuae);do_other_stuff();std::cout<<"The answer is "<<the_answer.get()<<std::endl;
}

4.2.2 任务与期望

std::packaged_task<>对一个函数或可调用对象,绑定一个期望。当std::packaged_task<> 对象被调用,它就会调用相关函数或可调用对象,将期望状态置为就绪,返回值也会被存储为相关数据。这可以用在构建线程池的结构单元(可见第9章),或用于其他任务的管理,比如在任务所在线程上运行任务,或将它们顺序的运行在一个特殊的后台线程上。当一个粒度较大的操作可以被分解为独立的子任务时,其中每个子任务就可以包含在一个std::packaged_task<>实例中,之后这个实例将传递到任务调度器或线程池中。对任务的细节进行抽象,调度器仅处理std::packaged_task<>实例,而非处理单独的函数。
清单4.8 std::packaged_task<>的偏特化

template<>
class packaged_task<std::string(std::vector<char>*,int)>
{
public:template<typename Callable>explicit packaged_task(Callable&& f);std::future<std::string> get_future();void operator()(std::vector<char>*,int);
};

线程间传递任务
很多图形架构需要特定的线程去更新界面,所以当一个线程需要界面的更新时,它需要发出一条信息给正确的线程,让特定的线程来做界面更新。std::packaged_task提供了完成这种功能的一种方法,且不需要发送一条自定义信息给图形界面相关线程。下面来看看代码。

清单4.9 使用std::packaged_task执行一个图形界面线程

#include <deque>
#include <mutex>
#include <future>
#include <thread>
#include <utility>std::mutex m;
std::deque<std::packaged_task<void()> > tasks;bool gui_shutdown_message_received();
void get_and_process_gui_message();void gui_thread()  // 1
{while(!gui_shutdown_message_received())  // 2{get_and_process_gui_message();  // 3std::packaged_task<void()> task;{std::lock_guard<std::mutex> lk(m);if(tasks.empty())  // 4continue;task=std::move(tasks.front());  // 5tasks.pop_front();}task();  // 6}
}std::thread gui_bg_thread(gui_thread);template<typename Func>
std::future<void> post_task_for_gui_thread(Func f)
{std::packaged_task<void()> task(f);  // 7std::future<void> res=task.get_future();  // 8std::lock_guard<std::mutex> lk(m);  // 9tasks.push_back(std::move(task));  // 10return res;
}

这段代码十分简单:图形界面线程①循环直到收到一条关闭图形界面的信息后关闭②,进行轮询界面消息处理③,例如用户点击,和执行在队列中的任务。当队列中没有任务④,它将再次循环;除非,他能在队列中提取出一个任务⑤,然后释放队列上的锁,并且执行任务⑥。这里,“期望”与任务相关,当任务执行完成时,其状态会被置为“就绪”状态。

将一个任务传入队列,也很简单:提供的函数⑦可以提供一个打包好的任务,可以通过这个任务⑧调用get_future()成员函数获取“期望”对象,并且在任务被推入列表⑨之前,“期望”将返回调用函数⑩。当需要知道线程执行完任务时,向图形界面线程发布消息的代码,会等待“期望”改变状态;否则,则会丢弃这个“期望”。

4.2.3 使用std::promises(单线程多连接)

当你有一个应用,需要处理很多网络连接,它会使用不同线程尝试连接每个接口,因为这能使网络尽早联通,尽早执行程序。当连接较少的时候,这样的工作没有问题(也就是线程数量比较少)。不幸的是,随着连接数量的增长,这种方式变的越来越不合适;因为大量的线程会消耗大量的系统资源,还有可能造成上下文频繁切换(当线程数量超出硬件可接受的并发数时),这都会对性能有影响。最极端的例子就是,因为系统资源被创建的线程消耗殆尽,系统连接网络的能力会变的极差。在不同的应用程序中,存在着大量的网络连接,因此不同应用都会拥有一定数量的线程(可能只有一个)来处理网络连接,每个线程处理可同时处理多个连接事件。

考虑一个线程处理多个连接事件,来自不同的端口连接的数据包基本上是以乱序方式进行处理的;同样的,数据包也将以乱序的方式进入队列。在很多情况下,另一些应用不是等待数据成功的发送,就是等待一批(新的)来自指定网络接口的数据接收成功。
std::promise<T>提供设定值的方式(类型为T),这个类型会和后面看到的std::future<T> 对象相关联。一对std::promise/std::future会为这种方式提供一个可行的机制;在期望上可以阻塞等待线程,同时,提供数据的线程可以使用组合中的“承诺”来对相关值进行设置,以及将“期望”的状态置为“就绪”。

可以通过get_future()成员函数来获取与一个给定的std::promise相关的std::future对象,就像是与std::packaged_task相关。当“承诺”的值已经设置完毕(使用set_value()成员函数),对应“期望”的状态变为“就绪”,并且可用于检索已存储的值。当你在设置值之前销毁std::promise,将会存储一个异常。在4.2.4节中,会详细描述异常是如何传送到线程的。
清单4.10 使用“承诺”解决单线程多连接问题

#include <future>void process_connections(connection_set& connections)
{while(!done(connections))  // 1{for(connection_iterator  // 2connection=connections.begin(),end=connections.end();connection!=end;++connection){if(connection->has_incoming_data())  // 3{data_packet data=connection->incoming();std::promise<payload_type>& p=connection->get_promise(data.id);  // 4p.set_value(data.payload);}if(connection->has_outgoing_data())  // 5{outgoing_packet data=connection->top_of_outgoing_queue();connection->send(data.payload);data.promise.set_value(true);  // 6}}}
}

4.2.4 为“期望”存储“异常”

4.2.5 多个线程的等待

因为std::future是只移动的,所以其所有权可以在不同的实例中互相传递,但是只有一个实例可以获得特定的同步结果;而std::shared_future实例是可拷贝的,所以多个对象可以引用同一关联“期望”的结果。
在每一个std::shared_future的独立对象上成员函数调用返回的结果还是不同步的,所以为了在多个线程访问一个独立对象时,避免数据竞争,必须使用锁来对访问进行保护。优先使用的办法:为了替代只有一个拷贝对象的情况,可以让每个线程都拥有自己对应的拷贝对象。这样,当每个线程都通过自己拥有的std::shared_future对象获取结果,那么多个线程访问共享同步结果就是安全的。

4.3 限定等待时间

介绍两种可能是你希望指定的超时方式:一种是“时延”的超时方式,另一种是“绝对”超时方式。第一种方式,需要指定一段时间(例如,30毫秒);第二种方式,就是指定一个时间点。多数等待函数提供变量,对两种超时方式进行处理。处理持续时间的变量以“_for”作为后缀,处理绝对时间的变量以"_until"作为后缀。
所以,当std::condition_variable的两个成员函数wait_for()和wait_until()成员函数分别有两个负载,这两个负载都与wait()成员函数的负载相关——其中一个负载只是等待信号触发,或时间超期,亦或是一个虚假的唤醒,并且醒来时,会检查锁提供的谓词,并且只有在检查为true时才会返回(这时条件变量的条件达成),或直接而超时。

4.3.1 时钟

std::chrono::system_clock::now()是将返回系统时钟的当前时间。

4.3.2 时延

std::chrono::duration<>函数模板能够对时延进行处理。
标准库在std::chrono命名空间内,为延时变量提供一系列预定义类型:nanoseconds[纳秒] , microseconds[微秒] , milliseconds[毫秒] , seconds[秒] , minutes[分]和hours[时]。

4.3.3 时间点

时钟的时间点可以用std::chrono::time_point<>的类型模板实例来表示。

4.3.4 具有超时功能的函数

使用超时的最简单方式就是,对一个特定线程添加一个延迟处理;当这个线程无所事事时,就不会占用可供其他线程处理的时间。两个处理函数分别是std::this_thread::sleep_for()std::this_thread::sleep_until()。他们的工作就像一个简单的闹钟:当线程因为指定时延而进入睡眠时,可使用sleep_for()唤醒;或因指定时间点睡眠的,可使用sleep_until唤醒。

4.4 使用同步操作简化代码

4.4.1 使用“期望”的函数化编程

术语函数化编程(functional programming)引用于一种编程方式,这种方式中的函数结果只依赖于传入函数的参数,并不依赖外部状态。
快速排序 FP模式版

4.4.2 使用消息传递的同步操作(状态机)

CSP的概念十分简单:当没有共享数据,每个线程就可以进行独立思考,其行为纯粹基于其所接收到的信息。每个线程就都有一个状态机:当线程收到一条信息,它将会以某种方式更新其状态,并且可能向其他线程发出一条或多条信息,对于消息的处理依赖于线程的初始化状态。这是一种正式写入这些线程的方式,并且以有限状态机的模式实现,但是这不是唯一的方案;状态机可以在应用程序中隐式实现。这种方法咋任何给定的情况下,都更加依赖于特定情形下明确的行为要求和编程团队的专业知识。无论你选择用什么方式去实现每个线程,任务都会分成独立的处理部分,这样会消除潜在的混乱(数据共享并发),这样就让编程变的更加简单,且拥有低错误率。

5 C++内存模型和原子类型操作

5.1 内存模型基础

5.1.1 对象和内存位置

C++标准定义类对象为“存储区域”,但对象还是可以将自己的特性赋予其他对象,比如,其类型和生命周期。
这里有四个需要牢记的原则:

  1. 每一个变量都是一个对象,包括作为其成员变量的对象。
  2. 每个对象至少占有一个内存位置。
  3. 基本类型都有确定的内存位置(无论类型大小如何,即使他们是相邻的,或是数组的一部分)。
  4. 相邻位域是相同内存中的一部分。

5.1.2 对象、内存位置和并发

所有东西都在内存中。当两个线程访问不同的内存位置时,不会存在任何问题,一切都工作顺利。而另一种情况下,当两个线程访问同一个内存位置,如果没有线程更新内存位置上的数据,那还好;只读数据不需要保护或同步。当有线程对内存位置上的数据进行修改,那就有可能会产生条件竞争。
为了避免条件竞争,两个线程就需要一定的执行顺序。第一种方式,使用互斥量来确定访问的顺序;另一种方式是使用原子操作同步机制,决定两个线程的访问顺序。当多于两个线程访问同一个内存地址时,对每个访问这都需要定义一个顺序。

5.1.3 修改顺序

每一个在C++程序中的对象,都有(由程序中的所有线程对象)确定好的修改顺序,在的初始化开始阶段确定。在大多数情况下,这个顺序不同于执行中的顺序,但是在给定的执行程序中,所有线程都需要遵守这顺序。如果对象不是一个原子类型,你必要确保有足够的同步操作,来确定每个线程都遵守了变量的修改顺序。当不同线程在不同序列中访问同一个值时,你可能就会遇到数据竞争或未定义行为。如果你使用原子操作,编译器就有责任去替你做必要的同步。

5.2 C++中的原子操作和原子类型

原子操作 是个不可分割的操作。

5.2.1 标准原子类型

标准 原子类型 定义在头文件<atomic>中。
这些类型上的所有操作都是原子的,在语言定义中只有这些类型的操作是原子的,不过你可以用互斥锁来 模拟 原子操作。
实际上,标准原子类型自己的实现就可能是这样模拟出来的:
它们(几乎)都有一个is_lock_free()成员函数。
表5.1 标准原子类型的备选名和与其相关的std::atomic<>特化类

原子类型 相关特化类
atomic_bool std::atomic<bool>
atomic_char std::atomic<char>
atomic_schar std::atomic<signed char>
atomic_uchar std::atomic<unsigned char>
atomic_int std::atomic<int>
atomic_uint std::atomic<unsigned>
atomic_short std::atomic<short>
atomic_ushort std::atomic<unsigned short>
atomic_long std::atomic<long>
atomic_ulong std::atomic<unsigned long>
atomic_llong std::atomic<long long>
atomic_ullong std::atomic<unsigned long long>
atomic_char16_t std::atomic<char16_t>
atomic_char32_t std::atomic<char32_t>
atomic_wchar_t std::atomic<wchar_t>

表5.2 标准原子类型定义(typedefs)和对应的内置类型定义(typedefs)

原子类型定义 标准库中相关类型定义
atomic_int_least8_t int_least8_t
atomic_uint_least8_t uint_least8_t
atomic_int_least16_t int_least16_t
atomic_uint_least16_t uint_least16_t
atomic_int_least32_t int_least32_t
atomic_uint_least32_t uint_least32_t
atomic_int_least64_t int_least64_t
atomic_uint_least64_t uint_least64_t
atomic_int_fast8_t int_fast8_t
atomic_uint_fast8_t uint_fast8_t
atomic_int_fast16_t int_fast16_t
atomic_uint_fast16_t uint_fast16_t
atomic_int_fast32_t int_fast32_t
atomic_uint_fast32_t uint_fast32_t
atomic_int_fast64_t int_fast64_t
atomic_uint_fast64_t uint_fast64_t
atomic_intptr_t intptr_t
atomic_uintptr_t uintptr_t
atomic_size_t size_t
atomic_ptrdiff_t ptrdiff_t
atomic_intmax_t intmax_t
atomic_uintmax_t uintmax_t
通常,标准原子类型是不能拷贝和赋值,他们没有拷贝构造函数和拷贝赋值操作。但是,因为可以隐式转化成对应的内置类型,所以这些类型依旧支持赋值,可以使用load()和store()成员函数,exchange()、compare_exchange_weak()和compare_exchange_strong()。它们都支持复合赋值符:+=, -=, *=, = 等等。并且使用整型和指针的特化类型还支持 ++ 和 --。当然,这些操作也有功能相同的成员函数所对应:fetch_add(), fetch_or() 等等。赋值操作和成员函数的返回值要么是被存储的值(赋值操作),要么是操作前的值(命名函数)。这就能避免赋值操作符返回引用。为了获取存储在引用的的值,代码需要执行单独的读操作,从而允许另一个线程在赋值和读取进行的同时修改这个值,这也就为条件竞争打开了大门。
  1. Store操作,可选如下顺序:memory_order_relaxed, memory_order_release, memory_order_seq_cst
  2. Load操作,可选如下顺序:memory_order_relaxed, memory_order_consume, memory_order_acquire, memory_order_seq_cst。
  3. Read-modify-write(读-改-写)操作,可选如下顺序:memory_order_relaxed, memory_order_consume, memory_order_acquire, memory_order_release, memory_order_acq_rel, memory_order_seq_cst。
    所有操作的默认顺序都是memory_order_seq_cst。
    清单5.1 使用std::atomic_flag实现自旋互斥锁
class spinlock_mutex
{std::atomic_flag flag;
public:spinlock_mutex():flag(ATOMIC_FLAG_INIT){}void lock(){while(flag.test_and_set(std::memory_order_acquire));}void unlock(){flag.clear(std::memory_order_release);}
};

5.2.3 std::atomic的相关操作

最基本的原子整型类型就是std::atomic<bool>。如你所料,它有着比std::atomic_flag更加齐全的布尔标志特性。虽然它依旧不能拷贝构造和拷贝赋值,但是你可以使用一个非原子的bool类型构造它,所以它可以被初始化为true或false,并且你也可以从一个非原子bool变量赋值给std::atomic<bool>的实例:

std::atomic<bool> b(true);
b=false;

std::atomic<bool>也支持对值的普通(不可修改)查找,其会将对象隐式的转换为一个普通的bool值,或显示的调用load()来完成。store()是一个存储操作,而load()是一个加载操作。exchange()是一个“读-改-写”操作:

std::atomic<bool> b;
bool x=b.load(std::memory_order_acquire);
b.store(true);
x=b.exchange(false, std::memory_order_acq_rel);

存储一个新值(或旧值)取决于当前值

6 基于锁的并发数据结构设计

7 无锁并发数据结构设计

8 并发代码设计

9 高级线程管理

10 多线程程序的测试和调试

重点

线程std::thread对象必须调用join或detach

  • thread析构
  • C++11多线程 - Part 2: 连接和分离线程
  • Starting thread causing abort()

Destroys the thread object. If *this has an associated thread (joinable() == true), std::terminate() is called.
创建的std::thread对象会在构造函数结束时销毁,因为它是一个局部变量。如果调用了std::thread的析构函数,而该线程是joinable,则调用std::terminate。即程序会调用Abort(),不进行任何清理工作,然后abnormal program termination(程序异常终止)。

shared_mutex也是基于操作系统底层的读写锁pthread_rwlock_t的封装

参考

1、《C++ Concurrency in Action》[陈晓伟 译]
2、github 翻译地址
3、gitbook 在线阅读
4、书中源码
5、学习C++11/14
6、CPP-Concurrency-In-Action-2ed
7、thread析构
8、C++11多线程 - Part 2: 连接和分离线程
9、Starting thread causing abort()
10、Boost Mutex详细解说

《C++ Concurrency in Action》笔记相关推荐

  1. 《信贷的逻辑与常识》笔记

    序 银行信贷风险管理的反思 现状与趋势 银行贷款的质量变化与经济周期.宏观调控政策等存在很高的相关性 现在银行不良贷款的增加主要是前几年经济快速增长时企业过度投资.银行过度放贷所带来的结果. 从历史情 ...

  2. AI公开课:19.02.27周逵(投资人)《AI时代的投资逻辑》课堂笔记以及个人感悟

    AI公开课:19.02.27周逵(投资人)<AI时代的投资逻辑>课堂笔记以及个人感悟 目录 课堂PPT图片 精彩语录 个人感悟 课堂PPT图片 精彩语录 更新中-- 文件图片已经丢失-- ...

  3. 人工智能入门算法逻辑回归学习笔记

    逻辑回归是一个非常经典的算法,其中也包含了非常多的细节,曾看到一句话:如果面试官问你熟悉哪个机器学习模型,可以说 SVM,但千万别说 LR,因为细节真的太多了. 秉持着精益求精的工匠精神不断对笔记进行 ...

  4. 【逻辑回归学习笔记】

    算法描述 1.逻辑回归要做的事就是寻找分界面实现二分类. 2.问题假设:对一堆三角形和正方形分类. 3.数据输入:已知正方形和三角形的坐标和标签. 4.算法过程: 知识储备 1.分类和回归 ①分类的目 ...

  5. 逻辑回归函数学习笔记

    继续逻辑回归学习,今日笔记记录. 1.逻辑回归和线性回归的关系:对逻辑回归的概率比取自然对数,则得到的是一个线性函数,推导过程如下. 首先,看逻辑回归的定义 其次,计算两个极端y/(1-y),其值为( ...

  6. 2.2 逻辑回归-机器学习笔记-斯坦福吴恩达教授

    逻辑回归 上一节我们知道,使用线性回归来处理 0/1 分类问题总是困难重重的,因此,人们定义了逻辑回归来完成 0/1 分类问题,逻辑一词也代表了是(1) 和 非(0). Sigmoid预测函数 在逻辑 ...

  7. LVM逻辑卷分区笔记

    磁盘的静态分区有其缺点:分区大小难评估,估计不准确,当分区空间不够用的时候,系统管理员可能需要先备份整个系统,清除磁盘空间,然后重新对磁盘进行分区,然后恢复磁盘数据到新分区,且需要停机一段时间进行恢复 ...

  8. 适合理工直男的钟平老师逻辑英语学习笔记

    一切的一切都只是套路!             --鲁迅 核心公式: En: (状语1) 主(定语1) 谓(状语2) (宾)(定语2) (状语1) Ch: (状语1) (定语1)主 (状语2)谓 (定 ...

  9. 【数字逻辑】学习笔记 第四章 Part2 常用组合逻辑电路与竞争、险象

    文章目录 一.常用组合逻辑电路 1. 译码器 (1) 二进制译码器 74LS138(3/8译码器) a. 一般符号和图形符号 b. 74LS138功能表 c. 两片 `74LS138` 构成 `4-1 ...

  10. 线性回归、逻辑回归学习笔记

    学习源代码 import numpy as np import matplotlib.pyplot as plt def true_fun(X): # 这是我们设定的真实函数,即ground trut ...

最新文章

  1. python将空格变成换行_python之路(2)
  2. sql2000安装时报错的问题--实例挂起和267目录名无效
  3. matlab中 intval函数,经常用到取整的函数,今天小小的总结一下!其实很简单,就是几个函数而已~~主要是:ceil,floor,round,intval...
  4. PM2管理工具的使用
  5. 2019江苏高考作文_2019年关于现代组织的5大文章
  6. python的自省基础
  7. yolov3模型识别不出训练图片_技术实践丨基于MindSpore框架Yolov3-darknet模型的篮球动作检测体验...
  8. LTP 语义依存分析
  9. Java学生成绩管理系统主界面和登录界面参考
  10. Keil C51大工程建立,模块化编程
  11. thinkphp5 关联预载入怎么用
  12. java与c#哪个用得多_Java和c#哪个更值得学习?
  13. animation动画案例
  14. git版本回退后,导致已经add的代码丢失
  15. 简单Burp爆破使用方法
  16. 工业视觉 一 工业视觉初识
  17. matlab在重积分计算中的应用,MATLAB在重积分计算中的应用
  18. python大作业爬虫_Python爬虫大作业
  19. html中ing标签怎么写,HTML标签
  20. 指定牛导|肿瘤专业医生芝加哥大学博士后实现夙愿

热门文章

  1. Win10运行程序提示不受信任的解决方法
  2. 惊!成年蚂蚁竟然返老还童!原因居然是。。。。
  3. 用html在黑色背景中写蓝色的字,一般设计中常见黑色背景可搭配字体颜色有()...
  4. 大连英语培训商务英语百家外语当今社会学习商务英语的重要性
  5. Xposed模块初体验——第一篇
  6. IT男的神级吐槽 || 我们IT人的心声(_)
  7. 【TI-AM5728】(1)开发环境搭建
  8. 电压(电流)运算放大器为什么要增大(减小)输入阻抗?
  9. 国产芯片---超高速、高输出电流,电压反馈放大器MS8241兼容替代LM7171
  10. RT-Thread系统 STM32 DAC设备改进,直接调用系统DAC驱动函数设置输出电压