11.1 引言

​ 了解如何使用多个控制线程(简称线程) 在单进程环境中执行多个任务。一个进程中的所有线程都可以访问该进程的组成部件,如文件描述符fd和内存。

​ 不管在什么情况下,只要单个资源需要在多个用户间共享,就必须处理一致性问题。本章最后讨论目前可用的线程同步机制,防止多个线程在共享资源时出现不一致问题

11.2 线程概念

​ 典型的UNIX进程可以看成只有一个控制线程:一个进程在某一时刻只能做一件事情。有了多个控制线程以后,在程序设计时就可以把进程设计在某一个时刻下能够做不止一件事,每个线程处理各自独立的任务。这种方法有很多好处:

  1. 通过为每种事件类型分配单独的处理线程,可以简化处理异步事件的代码。每个线程在进行事件处理时可以采用同步编程模式,同步编程模式要比异步编程模式简单的多。

  2. 多个进程必须使用操作系统提供的复杂机制才能实现内存和文件描述符的共享,而多个线程可以自动地访问相同的存储地址空间和文件描述符。

  3. 有些问题可以分解从而提高整个程序的吞吐量。在只有一个控制线程的情况下,一个单线程进程要完成多个任务,只需要将这些任务串行化。但有多个控制线程时,相互独立的任务的处理就可以交叉进行,此时只需要为每个任务分配一个单独的线程。当然只有在两个任务的处理过程不相互依赖的情况下,两个任务才可以交叉运行。

  4. 交互的程序同样可以通过多线程来改善相应时间,多线程可以把程序中处理用户输入输出的部分与其他部分分开。

​ 有些人将多线程的程序设计与多处理器或者多核系统联系起来。但是即使程序运行在单处理器上,也能得到多线程编程模型的好处。处理器的数量并不影响程序结构,所以不管处理器的个数多少,程序都可以通过使用线程得以简化。而且,即使多线程程序在串行化任务时不得不阻塞,由于某些线程在阻塞时还有另外一些线程可以运行,所以多线程程序在单处理器上运行还是可以改善响应时间和吞吐量。

​ 每个线程都包含有表示执行环境所必须的信息,其中包括进程中标识线程的线程ID一组寄存器的值,栈,调度优先级和策略,信号屏蔽字,errno变量以及线程私有数据。一个进程的所有信息对于该进程的所有线程都是共享的,包括可执行程序的代码,程序的全局内存和堆内存,栈以及文件描述符

​ 主要讨论的线程结构来自POSIX.1-2001.线程接口也成为pthread 或者 POSIX线程.

11.3 线程标识

​ 实现的时候使用一个结构代表pthread_t数据类型,所以可移植的操作系统实现不能将其作为整数处理。因此必须使用一个函数来对两个线程进行比较。

#include <pthread.h>
int pthread_equal(pthread_t tid1, pthread_t tid2);

​ 使用结构体pthread_t 数据类型的后果是不能用一种可移植的方式来打印该数据类型的值。 在程序调试的时候打印线程id是非常有用的,而在其他情况下通常不需要打印线程ID。因此最坏的情况是,有可能出现不可移植的调试代码。

​ 线程可以通过pthread_self函数获得自身线程ID

#include <pthread.h>
pthread_t phtread_self(void);

​ 当线程需要识别线程ID作为标识的数据结构时,pthread_self 函数可以与pthread_equal 函数一起使用。例如,主线程可能把工作任务放在一个队列中,用线程ID控制每个工作线程处理哪些作业。主线程不允许每个线程任意处理从队列顶端取出的作业,而是由主线程控制作业的分配,主线程会在每个待处理的结构中放置处理该作业的线程ID,每个工作线程只能移出标有自己线程ID的作业。

11.4 线程创建

​ 在传统的UNIX进程模型中,每个进程只有一个控制线程。从概念上讲,这与基于线程的模型中每个进程中只包含一个线程是相同的。在POSIX线程(pthread)情况下,程序开始运行时,它也是以单进程中的单个控制线程启动的。在创建多个控制线程以前,程序的行为与传统的进程并没有什么区别。新增的线程可以通过pthread_create函数创建。

#include <phtread.h>
int pthread_create(pthread_t *restrict tidp,const pthread_attr_t* restrict attr,void*(*start_rtn)(void*),void* restrict arg);

​ 当pthread_create成功返回时,新创建的线程ID会被设置成tidp指向的内存单元attr参数用于定制各种不同的线程属性。指定为NULL为创建一个具有默认属性的线程。

新创建的线程从start_rtn 函数的地址开始运行,该函数只有一个无类型指针参数arg。如果需要向start_rtn函数传入的参数有一个以上,那么需要把这些参数放到一个结构中,然后把这个结构的地址作为arg参数传入。

​ 创建线程时并不能保证哪个线程先运行 :是新创建的线程,还是调用线程。新创建的线程可以访问进程的地址空间,并且继承调用线程的浮点环境和信号屏蔽字,但是该线程的挂起信号集会被清除。

​ 注意,pthread函数在调用失败时通常会返回错误码,它们并不想其他POSIX函数一样设置errno。每个线程都提供errno副本,这只是为了与使用errno的现有函数兼容。在线程中,从函数中返回错误码更为清晰整洁,不需要依赖哪些随着函数执行不断变化的全局状态,这样可以将错误的范围限制在引起出错的函数中。

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <pthread.h>pthread_t ntid;void printids(const char* s)
{pid_t pid;pthread_t tid;pid = getpid();tid = pthread_self();printf("%s pid %lu tid %lu (0x%lx)\n", s, (unsigned long)pid,(unsigned long)tid, (unsigned long)tid);
}void* thr_fn(void* arg)
{printids("new thread: ");return NULL;
}
int main()
{int err;err = pthread_create(&ntid, NULL, thr_fn, NULL);if(err!=0){perror("pthread_create");exit(0);}printids("main thread:");sleep(1);exit(0);
}
patrick@ubuntu:~/apue/chapter11$ ./11-1
main thread: pid 50677 tid 140273204557632 (0x7f93e6877740)
new thread:  pid 50677 tid 140273196046080 (0x7f93e6059700)

​ 这个实例有两个特别之处,需要处理主线程和新线程之间的竞争。第一个特别之处在于,主线程需要休眠,如果主线程不休眠,它就可能退出,这事线程还没有机会运行,整个进程可能就已经终止了。这种行为特征依赖于操作系统中的线程实现和调度算法

​ 第二个特别之处在于新线程是通过pthread_self函数获得自己的线程ID的,而不是从共享内存中读出的,或者从线程的启动例程中以参数的形式接收到的。回忆pthread_create函数,它会通过第一个参数(tidp)返回新建线程的线程ID。在这个例子中,主线程将新线程ID存放在ntid中,但是新建的线程并不能安全地使用它,如果新线程在主线程调用pthread_create返回之前就运行了,那么新线程看到的是未初始化的nitd内容,这个内容并不是正确的线程ID。

11.5 线程终止

​ 如果进程中的任意线程调用了exit,_Exit, 或者 _exit,那么整个进程就会终止。与此相类似,如果默认的动作是终止进程,那么,发送到线程的信号就会终止整个进程(12.8中讨论信号与线程间是如何交互的)。

​ 单个线程可以通过3种方式退出,因此可以在不终止整个进程的情况下,停止它的控制流。

  1. 线程可以简单地从启动例程(start_rtn)中返回,返回值是线程的退出码(return)。
  2. 线程可以被同一进程中的其他线程取消(pthread_cancel)。
  3. 线程调用pthread_exit
#include <pthread.h>
void phtread_exit(void* rval_ptr);

​ rval_ptr参数是一个无类型指针,与传给启动例程的单个参数类似。进程中其他线程也可以通过pthread_join函数访问到整个指针。

#include<pthread.h>
int pthread_join(pthread_t thread, void **rval_ptr);

程序11-3展示了如何获取已终止的线程退出码

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <pthread.h>void* thr_fn1(void* arg)
{printf("thread 1 returning\n");return ((void*)1);
}void* thr_fn2(void* arg)
{printf("thread 2 exiting\n");pthread_exit((void*)2);
}int main()
{int         err;pthread_t   tid1, tid2;void*       tret;err= pthread_create(&tid1, NULL, thr_fn1, NULL);if(err!=0){perror("pthread_create thread1");exit(0);}err= pthread_create(&tid2, NULL, thr_fn2, NULL);if(err!=0){perror("pthread_create  thread2");exit(0);}err = pthread_join(tid1, & tret);if(err!= 0){perror("pthread_join thread1");exit(0);}printf("thread1 exit with code %ld\n", (long)tret);err = pthread_join(tid2, & tret);if(err!= 0){perror("pthread_join thread2");exit(0);}printf("thread2 exit with code %ld\n", (long)tret);exit(0);
}
patrick@ubuntu:~/apue/chapter11$ ./11-3
thread 1 returning
thread 2 exiting
thread1 exit with code 1
thread2 exit with code 2

​ 可以看到,当一个线程通过调用pthread_exit退出或者简单地从启动例程中返回时,进程中的其他线程可以用过调用pthread_join函数获得该线程的退出状态。

​ pthread_create和pthread_exit函数的无类型指针参数可以传递的值不止一个,这个指针可以传递包含负责信息的结构和地址,但是注意,这个结构所使用的内存在调用者完成调用以后必须仍然是有效的。例如,在调用线程的栈上分配了该结构,然后把指向这个结构的指针传递给了pthread_exit,那么调用pthread_join的线程试图使用该结构时,这个栈有可能已经被撤销,这块内存也已另作他用。

程序11-4 给出了自动变量(分配在栈上)作为pthread_exit的参数时出现的问题

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <pthread.h>struct foo
{int a, b, c, d;
};void printfoo(const char*s, const struct foo *fp)
{printf("%s",s);printf(" structure at 0x%lx\n", (unsigned long) fp);printf(" foo.a = %d\n", fp->a);printf(" foo.b = %d\n", fp->b);printf(" foo.c = %d\n", fp->c);printf(" foo.d = %d\n", fp->d);
}void* thr_fn1(void* arg)
{struct foo foo = {1,2,3,4};  printfoo("thread 1:\n", &foo);pthread_exit((void*)&foo); // stack variable (auto variable)
}void* thr_fn2(void* arg)
{printf("thread 2: ID is %lu\n", (unsigned long)pthread_self());pthread_exit((void*)0);
}int main()
{int         err;pthread_t   tid1, tid2;struct foo  *fp;err = pthread_create(&tid1,NULL,thr_fn1, NULL);if(err!=0){perror("pthread_create");exit(0);}err= pthread_join(tid1, (void*)&fp);if(err!=0){perror("pthread_join thread1");exit(0);}sleep(1);printf("parent starting second thread\n");err= pthread_create(&tid2, NULL, thr_fn2, NULL);if(err!=0){perror("pthread_create thread2");exit(0);}sleep(1);printfoo("parent:\n", fp);exit(0);
}
patrick@ubuntu:~/apue/chapter11$ ./11-4
thread 1:structure at 0x7f9e465e3ed0foo.a = 1foo.b = 2foo.c = 3foo.d = 4
parent starting second thread
thread 2: ID is 140317762144000
parent:structure at 0x7f9e465e3ed0foo.a = 1184679776foo.b = 32670foo.c = 1182275154foo.d = 32670

​ 线程可以通过调用pthread_cancel函数来请求取消同喜进程中的其他线程。

#include <phtread.h>
int pthread_cancel(pthread_t tid);

​ 在默认情况下,pthread_cancel 函数会使得有tid标识的线程行为表现得如同调用了参数为PTHREAD_CANCELLED 的pthread_exit函数,但是,线程可以选择忽略取消或者控制如何被取消,将在12.7中详细讨论。注意pthread_cancel 并不等待线程终止(非阻塞函数),它仅仅提出请求。

​ 线程可以安排她退出时需要调用的函数,这与进程在退出时可以用atexit函数安排退出时类似的。这样的函数称为线程清理处理程序(thread cleanup headler)。一个线程可以建立多个清理处理程序。处理程序记录在栈中,也就是说,它们的执行顺序与注册时相反。

#include <pthread.h>
void pthread_cleanup_push(void (*rtn)(void*), void* arg);
void pthread_cleanup_pop(int execute);

​ 当线程执行以下动作时,清理函数rtn是由pthread_cleanup_push调度的,调用时只有一个参数arg

  1. 调用pthread_exit时;
  2. 响应取消请求时;
  3. 用非零参数execute 调用pthread_cleanup_pop时。

​ 如果execute参数设置为0, 清理函数将不被调用。不管发生上述哪种情况,pthread_cleanup_pop都将删除上次pthread_cleanup_push调用建立的清理处理程序。

​ 这些函数有一个限制,由于他们可以实现为宏,所以必须在与线程相同的作用域中以匹配对的形式使用。pthread_cleanup_push的宏定义可以包含字符{,这种情况下,pthread_cleanup_pop的定义中也要有对应的匹配字符}。

​ 程序11-5给出了一个如何使用线程清理处理程序的例子。这个例子虽然是人为编造的,但它描述了其中涉及的清理机制。注意,虽然我们从来没有想过要传一个参数0个线程启动例程,但还是需要把pthread_cleanup_pop调用和phtread_cleanup_push调用匹配起来,否则,程序编译就可能通不过。

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <pthread.h>void cleanup(void* arg)
{printf("cleanup: %s\n", (char*)arg);
}void* thr_fn1(void* arg)
{printf("thread 1 start\n");pthread_cleanup_push(cleanup, "thread 1 first handler");pthread_cleanup_push(cleanup, "thread 1 second handler");printf("thread 1 push complete\n");if(arg){return ((void*)1);}pthread_cleanup_pop(0);pthread_cleanup_pop(0);return ((void*)1);}void* thr_fn2(void* arg)
{printf("thread 2 start\n");pthread_cleanup_push(cleanup, "thread 2 first handler");pthread_cleanup_push(cleanup, "thread 2 second handler");printf("thread 2 push complete\n");if(arg){pthread_exit((void*)2); // 线程2使用pthread_exit 退出·}pthread_cleanup_pop(0);pthread_cleanup_pop(0);pthread_exit((void*)2);
}int main(void)
{int         err;pthread_t   tid1, tid2;void        *tret;err= pthread_create(&tid1, NULL, thr_fn1, (void*)1);if(err!=0){perror("pthread_create thread 1 ");exit(0);}err= pthread_create(&tid2, NULL, thr_fn2, (void*)1);if(err!=0){perror("pthread_create thread 2 ");}err = pthread_join(tid1, &tret);if(err!=0){perror("pthread_join thread 1 ");exit(0);}printf("thread 1 exit code %ld\n", (long)tret);err = pthread_join(tid2, &tret);if(err!=0){perror("pthread_join thread 2 ");exit(0);}printf("thread 2 exit code %ld\n", (long)tret);exit(0);
}
thread 1 start
thread 1 push complete
thread 2 start
thread 2 push complete
thread 1 exit code 1
cleanup: thread 2 second handler
cleanup: thread 2 first handler
thread 2 exit code 2

​ 从输出结果来看,两个线程都正确的启动和退出了,但是只有第二个线程的清理处理程序被调用了。因此,如果线程是通过从它的启动例程中返回(return)而终止的话,它的清理处理程序就不会被调用。还要注意,清理处理程序是按照他们注册时相反的顺序被调用的。

进程原语 线程原语 描述
fork pthread_create 创建新的控制流
exit pthread_exit 从现有的控制流中退出
waitpid pthread_join 从控制流中得到退出状态
atexit pthread_cancel_push 注册在退出控制流时调用的函数
getpid pthread_self 获取控制流的ID
abort pthread_cancel 请求控制流的非正常退出

​ 在默认情况下,线程的终止状态会保存直到对该线程调用pthread_join。如果线程已经被分离,线程底层存储资源可以在线程终止时立即被收回。在线程分离后,我们不能用pthread_join函数等待它的终止状态,因为对分离状态的线程调用pthread_join会产生未定义的行为。可以用pthread_detach分离线程。

#include <pthread.h>
int pthread_detach(pthread_t tid);返回值:若成功,返回0;否则,返回错误编号

​ 在下一章中,我们将学通过修改传给pthread_create 函数的线程属性 ,创建一个已处于分离状态的线程。

11.6 线程同步

当多个控制线程共享相同的内存时,需要确保每个线程看到一致的数据视图。如果每个线程使用的变量都是其他线程不会读取和修改的,那么就不存在一致性问题,同样,如果变量是只读的,多个线程同时读取该变量也不会有一致性问题。但是,当一个线程可以修改的变量,其他线程也可以读取或者修改的时候,我们就需要对这些线程进行同步,确保它们在访问变量的存储内容时不会访问到无效的值。

​ 为了解决这个问题,线程不得不使用锁,同一时间只允许一个线程访问该变量。图中描述了这种同步。如果线程B希望读取变量,它首先要获取锁。同样,当线程A更新变量时,也需要获取同样的这把锁。这样,线程B在线程A释放锁以前就不能读取变量。

​ [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7uO2cVuZ-1626884591956)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20200128171033201.png)]

​ 两个或多个线程试图在同一时间修改同一变量时,也需要进行同步。考虑变量增量操作的情况,增量操作通常分解为以下三步。

  1. 从内存单元读入寄存器。
  2. 从寄存器中对变量做增量操作。
  3. 把新的值写回内存单元。

​ 如果两个线程试图几乎在同一时间对同一个变量做增量操作而不进行同步的话,结果就可能出现不一致,变量可能比原来增加了1,也有可能比原来增加了2,具体增加了1还是2要取决于第二个线程开始操作时获取的数值。如果第二个线程执行第一步要比第一个线程执行第三部要早,第二个线程读到的值与第一个线程一样,为变量加1,然后写回去,事实上没有实际的效果,总的来说变量只增加了1。

​ 如果修改操作是原子操作,那么就不存在竞争。在前面的例子中,如果增加1只需要一个寄存器周期,那么就没有竞争存在。如果数据总是以顺序一致出现的,就不需要额外的同步。当多个线程观察不到数据的不一致时,那么操作就是顺序一致的。在现代计算机系统中,存储访问需要多个总线周期,多处理器的总线周期通常在多处理器上是交叉的,所以我们并不能保证数据是顺序一致的

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3kBSvhhN-1626884591958)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20200128183529693.png)]

​ 在顺序一致环境中,可以把数据修改操作解释为运行线程的顺序操作步骤。可以把这样的操作描述为"线程A对变量增加了1, 然后线程B对变量增加了1,所以变量的值就比原来大2",或者描述为"线程B对变量增加了1, 然后线程A对变量增加了1 ,所以变量的值就比原来的大2"。这两个线程的任何操作顺序都不可能让变量出现除上述值外的其他值。

​ 除了计算机体系结构以外,程序使用变量的方式也会引起竞争,也会导致不一致的情况发生。例如,我们可能对某个变量加1,然后基于这个值做出某种决定。因为这个增量操作步骤和这个决定步骤的组合并非原子操作,所以就给不一致情况的出现提供了可能。

11.6.1 互斥量 mutex

​ 可以使用pthread的互斥接口来保护数据,确保同一时间只有一个线程访问数据。互斥量(mutex)从本质上说是一把锁,在访问共享资源前对互斥量进行设置(加锁),在访问完成后释放(解锁)互斥量。对互斥量进行加锁后,任何其他试图再次对互斥量加锁的线程都会被阻塞直到当前线程释放该互斥锁。如果释放互斥量时有一个以上的线程阻塞,那么所有该锁上阻塞的线程都会变成可运行状态,第一个变为可运行的线程就可以对互斥量加锁,其他线程就会看到互斥量依然是锁着的,只能回去再次等待它重新变为可用。在这种方式下,每次只有一个线程可以向前执行。

​ 只有将所有线程都设计成遵守相同数据访问规则的,互斥机制才能正常工作。操作系统并不会为我们做数据访问的串行化。如果允许其中的某个线程在没有得到锁的情况下也可以访问共享资源,那么即使其他线程在使用共享资源前都申请锁,也还是会出现数据不一致的问题。

​ 互斥变量是用pthread_mutex_t数据类型表示的。在使用互斥变量以前,必须首先对它进行初始化,可以把它设置为常量PTHREAD_MUTEX_INITIALIZER(只适用于静态分配互斥量),也可以通过调用pthread_mutex_init进行初始化。如果动态分配互斥量(例如,调用malloc函数),在释放内存前需要调用pthread_mutex_destroy.

#include <pthread.h>
int phtread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t * restrict attr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);两个函数的返回值:成功,返回0;否则,返回错误编号

​ 要用默认的属性初始化互斥量,只需把**attr设为NULL。12.4节中讨论互斥量属性。

对互斥量进行加锁,需要调用pthread_mutex_lock。如果互斥量已经上锁,调用线程将阻塞直到互斥量被解锁。对互斥量解锁,需要调用pthread_mutex_unlock。

#include <pthread.h>
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);所有函数的返回值:成功,返回0;否则,返回错误编号

如果不希望线程被阻塞,它可以使用pthread_mutex_trylock尝试对互斥量进行加锁。如果调用pthread_mutex_trylock时互斥量处于未锁住状态,那么pthread_mutex_trylock将锁住互斥量,不会出现阻塞直接返回0,否则pthread_mutex_trylock就会失败,不能锁住互斥量,返回EBUSY

​ 例程11-10描述了用于保护某个数据结构的互斥量。当一个以上的线程需要访问动态分配的对象时,我们可以在对象中嵌入引用计数,确保在所有使用该对象的线程完成数据访问之前,该对象的内存空间不会被释放。

​ 在对引用计数加1,减1,检查引用计数是否到达0这些操作之前需要锁住互斥量。在foo_alloc 函数中将引用计数初始化为1时没必要加锁,因为在这个操作之前分配线程时唯一引用该对象的线程。但是在这之后如果要将该对象放到一个列表中,那么它就有可能被别的线程发现,这时候需要首先对它加锁。

​ 在使用该对象之前,线程需要调用foo_hold对这个对象的引用计数加1,当对象使用完毕时,必须调用foo_rele释放引用。最后一个引用被释放时,对象所占的内存空间就被释放。

​ 在这个例子中,我们忽略了线程在调用foo_hold之前是如何找到对象的。如果有另一个线程在调用foo_hold之前阻塞等待互斥锁,这是即使对象的引用计数为0, foo_rele释放该对象的内存仍然是不对的。可以通过确保对象在释放内存前不会被找到这种方式来避免上述问题。可以通过下面的例子来看看如何做到这一点。

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <pthread.h>struct foo
{int              f_count;pthread_mutex_t  f_lock;int              f_id;/*more stuff here*/
};struct foo* foo_alloc(int id) // alloc the object
{struct foo *fp;if((fp =(struct foo*) malloc(sizeof(struct foo)))!=NULL){fp->f_count =1;fp->f_id = id;if(pthread_mutex_init(&fp->f_lock, NULL)!= 0){free(fp);return (NULL);}/* ... continue initialization ... */}return fp;
}void foo_hold(struct foo* fp) /*add a reference to the object*/
{pthread_mutex_lock(&fp->f_lock);fp->f_count++;pthread_mutex_unlock(&fp->f_lock);}void foo_rele(struct foo* fp) /*release a reference to the object*/
{pthread_mutex_lock(&fp->f_lock);if(--fp->f_count == 0){pthread_mutex_unlock(&fp->f_lock);pthread_mutex_destroy(&fp->f_lock);free(fp);}else{   pthread_mutex_unlock(&fp->f_lock);}
}

11.6.2 避免死锁

​ 如果线程试图对同一个互斥量加锁两次,那么它自身就会陷入死锁状态,但是使用互斥量时,还有其他不太明显的方式也能产生死锁。例如,程序中使用一个以上的互斥量时,如果允许一个线程一直占有第一个互斥量,并且在试图锁住第二个互斥量时处于阻塞状态,但是拥有第二个互斥量的线程也在试图锁住第一个互斥量。因为两个线程都在互相请求另一个线程拥有的资源,所以这两个线程都无法向前进行,于是就产生死锁。

​ 可以通过仔细控制互斥量加锁顺序来避免死锁的发生。例如,假设需要对两个互斥量A和B同时加锁。如果所有线程总是在对互斥量B加锁之前锁住互斥量A,那么使用这两个互斥量就不会产生死锁(当然在其他的资源上仍可能出现死锁)。类似地,如果所有的线程总是在锁住互斥量A之前锁住互斥量B,那么也不会发生死锁。可能出现的死锁只会发生在一个线程试图锁住另一个线程以相反的顺序锁住的互斥量。

​ 有时候,应用程序的结构使得对互斥量进行排序是很困难的。如果涉及了太多的锁和数据结构,可用的函数并不能把它转换成简单的层次,那么就需要采用另外的办法。在这种情况下,可以先释放占有的锁,然后过一段时间后再试。这种情况也可以用pthread_mutex_trylock接口避免死锁。如果已经占有某些锁而且pthread_mutex_trylock接口返回成功,那么就可以前进,但是如果不能获取锁,可以先释放自己已经占有的锁,做好清理工作,然后过一段时间后再重新试。

​ 例程:在这个实例中,我们更新了上面的程序,展示了两个互斥量的使用方法。在同时需要两个互斥量时,总是让它们以相同的顺序加锁,这样可以避免死锁。第二个互斥量维护着一个用于跟踪foo数据结构的散列列表。这样hashlock互斥量既可以保护foo数据结构中的散列表fh,又可以保护散列链字段f_next。 foo结构中的f_lock互斥量保护对foo结构中其他字段的访问。

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <pthread.h>
#define     NHASH       29
#define     HASH(id)    (((unsigned long)id)%NHASH)struct foo *fh[NHASH];
pthread_mutex_t hashlock =  PTHREAD_MUTEX_INITIALIZER;struct foo
{int              f_count;pthread_mutex_t  f_lock;int              f_id;struct foo*      f_next;/*more stuff here*/
};struct foo* foo_alloc(int id) // alloc the object
{struct foo  *fp;int         idx;if((fp =(struct foo*) malloc(sizeof(struct foo)))!=NULL){fp->f_count =1;fp->f_id = id;if(pthread_mutex_init(&fp->f_lock, NULL)!= 0){free(fp);return (NULL);}idx = HASH(id);pthread_mutex_lock(&hashlock);fp->f_next = fh[idx];fh[idx] = fp;pthread_mutex_lock(&fp->f_lock);pthread_mutex_unlock(&hashlock);pthread_mutex_unlock(&fp->f_lock);/* ... continue initialization ... */}return fp;
}void foo_hold(struct foo* fp) /*add a reference to the object*/
{pthread_mutex_lock(&fp->f_lock);fp->f_count++;pthread_mutex_unlock(&fp->f_lock);}struct foo*  foo_find(int id) /*find the existing object*/
{struct foo* fp;pthread_mutex_lock(&hashlock);for(fp=fh[HASH(id)]; fp!=NULL; fp = fp->f_next){if(fp->f_id == id){   foo_hold(fp);break;}}pthread_mutex_unlock(&hashlock);return fp;
}void foo_rele(struct foo* fp) /*release a reference to the object*/
{struct foo  *tfp;int         idx;pthread_mutex_lock(&fp->f_lock);if(--fp->f_count == 0) /*last reference*/{pthread_mutex_unlock(&fp->f_lock);pthread_mutex_lock(&hashlock);pthread_mutex_lock(&fp->f_lock);pthread_mutex_destroy(&fp->f_lock);/*need to recheck the condition*/if(fp->f_count !=1){fp->f_count --;pthread_mutex_unlock(&fp->f_lock);pthread_mutex_unlock(&hashlock);return;}/*remove from list*/idx = HASH(fp->f_id);tfp = fh[idx];if(tfp== fp){fh[idx] = fp->f_next;}else{   while(tfp->f_next!=fp){tfp = tfp->f_next;}tfp->f_next = fp->f_next;}free(fp);}else{   fp->f_count--;pthread_mutex_unlock(&fp->f_lock);}
}

​ 比较两个例程,可以看出,分配函数现在锁住了散列列表锁,把新的结构添加到了散列桶中,而且在对散列列表锁解锁之前,先锁定了新结构中的互斥量。因为新的结构是放在全局列表中,其他线程可以找到它,所以在初始化完成之前,需要阻塞其他线程试图访问新结构。

​ foo_find函数锁住散列表锁,然后搜素被请求的结构,如果找到了,就增加其引用计数并返回指向该结构的指针。注意,加锁的顺序是,现在foo_find 函数中锁定散列列表锁,然后再在foo_hold函数中锁定foo结构中的f_lock互斥量。

​ 现在有了两个锁以后,foo_rele函数就变得更加复杂了。如果这是最后一个引用,就需要对这个结构互斥量进行解锁,因为我们需要从散列列表中删除这个结构,这样才可以获取散列列表锁,然后重新获取结构互斥量。从上一次获得结构互斥量以来我们可能被阻塞着,所以需要重新检查条件,判断是否还需要释放这个结构。如果另一个线程在我们为满足锁顺序而阻塞时发现这个结构并对其引用计数加1,那么只需要简单地对整个引用计数减1,对所有东西解锁,然后返回。

​ 这种锁方法很复杂,所以我们要重新审视原来的设计。我们也可以使用散列列表锁来保护结构引用计数,使事情大大简化。结构互斥量可以保护foo结构中的其他任何东西。下面的例程反应了这种变化。

#include <stdlib.h>
#include <pthread.h>#define NHASH   29
#define HASH(id)    (((unsigned long)id)%NHASH)struct foo* fh[NHASH];
pthread_mutex_t hashlock =  PTHREAD_MUTEX_INITIALIZER;struct foo
{int              f_count;int              f_id;struct foo*      f_next;pthread_mutex_t  f_lock;
};struct foo* foo_alloc(int id)
{struct foo  *fp;int         idx;if((fp =(struct foo*)malloc(sizeof(struct foo)))!=NULL){fp->f_count = 1;fp->f_id = id;if(pthread_mutex_init(&fp->f_lock, NULL)!=0){free(fp);return NULL;}idx = HASH(id);pthread_mutex_lock(&hashlock);fp->f_next = fh[idx];fh[idx] = fp;pthread_mutex_lock(&fp->f_lock);pthread_mutex_unlock(&hashlock);/* ...continue initialization... */pthread_mutex_unlock(&fp->f_lock);}return fp;
}void foo_hold(struct foo* fp)
{pthread_mutex_lock(&hashlock);fp->f_count++;pthread_mutex_unlock(&hashlock);
}struct foo* foo_find(int id)
{struct foo* fp;pthread_mutex_lock(&hashlock);for(fp = fh[HASH(id)]; fp!=NULL; fp =fp->f_next){if(fp->f_id == id){fp->f_count++;break;}}pthread_mutex_unlock(&hashlock);return fp;
}void foo_rele(struct foo* fp)
{struct foo  *tfp;int         idx;pthread_mutex_lock(&hashlock);if(--fp->f_count ==0) /* last reference , remove from list*/{idx = HASH(fp->f_id);tfp = fh[idx];if(tfp == fp){fh[idx] = fp->f_next;}else{while(tfp->f_next != fp){tfp = tfp->f_next;}tfp->f_next = fp->f_next;}pthread_mutex_unlock(&hashlock);pthread_mutex_destroy(&fp->f_lock);free(fp);}else{pthread_mutex_unlock(&hashlock);}
}

​ 注意,与上面的例程相比,11-12中的程序就简单多了。两种用途使用相同的锁时,围绕散列列表和引用计数的锁的排序问题就不存在了。多线程的软件设计涉及这两者之间的折中。如果锁的粒度太粗,就会出现很多线程阻塞等待相同的锁,这可能并不能改善并发性。如果锁的粒度太细,那么过多的锁开销会使系统性能受到影响,而且代码变得复杂。作为一个程序员,需要在满足锁需求的情况下,在代码的复杂性和性能之间找到正确的平衡。

11.6.3 函数pthread_mutex_timedlock

​ 当线程试图获取一个已经加锁的互斥量时,pthread_mutex_timedlock互斥量原语允许绑定线程阻塞时间。pthread_mutex_timedlock函数与pthread_mutex_lock是基本等价的,但是在达到超时等待时间值时,pthread_mutex_timedlock不会对互斥量进行加锁,而是返回错误码ETIMEDOUT。

#include <pthread.h>
#include <time.h>
int pthread_mutex_timedlock(pthread_mutex_t* restrict mutex,const struct timespec* restrict tsptr);返回值: 若成功,返回0;否则,返回错误编号.

​ 超时制定愿意等待的绝对时间(与相对时间相比,制定在时间X之前可以阻塞等待,而不是说愿意等待阻塞Y秒)。这个超时时间是用timespec结构表示的,它用s和ns来描述时间。

​ 实例 11-13 给出了如何用pthread_mutex_timedlock 避免永久阻塞。

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <pthread.h>int main()
{int              err;struct timespec  tout;struct tm*       tmp;char buf[64];pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;pthread_mutex_lock(&lock);printf("mutex locked..\n");clock_gettime(CLOCK_REALTIME, &tout);tmp = localtime(&tout.tv_sec);strftime(buf, sizeof(buf), "%r", tmp);printf("current time is %s\n", buf);tout.tv_sec+= 10; /*10 sec from now*//*catuon: this could lead to dead lock */err = pthread_mutex_timedlock(&lock, &tout);clock_gettime(CLOCK_REALTIME, &tout);tmp = localtime(&tout.tv_sec);strftime(buf, sizeof(buf), "%r", tmp);printf("the time is now %s\n", buf);if(err == 0){printf("mutex locked again!\n");}else{printf("can't lock mutex again: %s\n", strerror(err));}exit(0);
}
patrick@ubuntu:~/apue/chapter11$ ./11-13
mutex locked..
current time is 09:00:28 PM
the time is now 09:00:38 PM
can't lock mutex again: Connection timed out

​ 这个程序故意对它已有的互斥量进行加锁,目的是演示pthread_mutex_timedlock是如何工作的。不推荐在实际中使用这种策略,因为它会导致死锁。

​ 注意,阻塞的时间可能会有所不同,造成不同的原因有很多种:开始时间可能在某秒的中间位置,系统时钟的精度可能不足以精确到支持我们指定的超时时间值,或者在程序继续运行前,调度延迟可能会增加时间值。

11.6.4 读写锁

​ 读写锁(reader-writer lock)与互斥量相似,不过读写锁允许更高的并行性。互斥量要么是锁住状态,要么是不加锁状态,而且一次只有一个线程可以对其加锁。读写锁有三种状态:读模式下加锁状态,写模式下加锁状态,不加锁状态。一次只有一个线程可以占有写模式的读写锁,但是多个线程可以同时占有读模式的读写锁。

​ 当读写锁时写加锁状态时,在这个锁被解锁前,所有试图对这个锁加锁的线程都会被阻塞。当读写锁在读加锁状态时,所有试图以读模式对它加锁的线程都可以得到访问权,但是任何希望以写模式对此锁进行加锁的线程都会被阻塞,直到所有线程释放它们的读锁为止。虽然各操作系统对读写锁的实现各不相同,但当读写锁处于读模式锁住的状态,而这时有一个线程试图以写模式获取锁时,读写锁通常会阻塞后续读模式的锁请求。这样可以避免读模式锁长期占用,而等待的写模式锁请求一直得不到满足。

​ 读写锁非常适合于对数据结构读的次数远大于写的情况。当读写锁在写模式下时,它所保护的数据结构就可以被安全地修改,因为一次只有一个线程可以在写模式下拥有这个锁。当读写锁在读模式下时,只要线程先获取了读模式下的读写锁,该锁所保护的数据结构就可以被多个获得读模式的线程读取。

​ 读写锁也叫做共享互斥锁(shared-exclusive lock)。 当读写锁时读模式锁住时,就可以说成时共享模式锁住的。当它是写模式锁住时,就可以说成是以互斥模式锁住的。

​ 与互斥量相比,读写锁在使用之前必须初始化,在释放它们底层内存之前必须销毁。

#include <pthread.h>
int pthread_rwlock_init(pthread_rwlock_t* restrict rwlock,const pthread_rwlockattr_t* restrict attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);两个函数的返回值: 成功,返回0;失败,返回错误编号。

​ 读写锁通过pthread_rwlock_init进行初始化。如果希望读写锁有默认的属性,可以传一个null指针给attr,将在12.4.2节中讨论读写锁的属性。

​ Single Unix Specification 在XSI扩展中定义了PTHREAD_RWLOCK_INITIALIZER常量。如果默认属性就足够的话,可以用它对静态分配的读写锁进行初始化。

​ 在释放读写锁占用的内存之前,需要调用pthread_rwlock_destroy做清理工作。如果pthread_rwlock_inti为读写锁分配了资源,pthread_rwlock_destroy将释放这些资源。如果在调用pthread_rwlock_destroy之前就释放了读写锁占用的内存空间,那么分配给这个锁的资源就会丢失。

​ 要在读模式下锁定读写锁,需要调用pthread_rwlock_rdlock。要在写模式下锁定读写锁,需要调用pthread_rwlock_wrlock。不管以何种方式锁住读写锁,都可以调用pthread_rwlock_unlock进行解锁。

#include <pthread.h>
int phtread_rwlock_rdlock(pthread_rwlock_t *rwlock);int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);

​ 各种实现可能会对共享模式下可获取读写锁的次数进行限制,所以需要检查pthread_rwlock_rdlock 的返回值。即使pthread_rwlock_wrlock和pthread_rwlock_unlock有错误返回,而且从技术上来讲,在调用函数时总是应该检查错误返回,但是如果锁设计合理的话,就不需要检查它们。错误返回值得定义只是针对不正确使用读写锁的情况(如未经初始化的锁),或者试图获取自己拥有的锁从而可能造成死锁的情况。但是需要注意,有些特定的实现可能会定义另外的错误返回。

​ Single UNIX Specification还定义了读写锁原语的条件版本。

#include <pthread.h>
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);

​ 可以获取锁时,这两个函数返回0。否则,它们会返回错误EBUSY。这两个函数可以用于我们前面讨论的遵守某种锁层次但还不能完全避免死锁的情况。

​ 11-14中的程序解释了读写锁的使用。作业请求队列由单个读写锁保护。这个例子给出了生产者-消费者模型一种可能的实现,多个工作线程获取单个主线程分配给它们的作业。

#include <stdlib.h>
#include <pthread.h>struct job
{struct job* j_next;struct job* j_prev;pthread_t   j_id;  // tells which thread handles this job /*more stuff here*/
};struct queue
{struct job* q_head;struct job* q_tail;pthread_rwlock_t    q_lock;
};/** Initialize a queue * */int queue_init(struct queue* qp)
{int err;qp->q_head = NULL;qp->q_tail = NULL;err= pthread_rwlock_init (&qp->q_lock, NULL);if(err != 0){return (err);}/* ...continue initialization... */return 0;
}/** Insert a job at the head of the queue .*/void job_insert(struct queue* qp, struct job* jp)
{pthread_rwlock_wrlock(&qp->q_lock);/* insert a node to the head of a queue */jp->j_next = qp->q_head;jp->j_prev = NULL;if(qp->q_head != NULL){qp->q_head->j_prev = jp;}else{qp->q_tail=jp; /* list was empty */}qp->q_head = jp;pthread_rwlock_unlock(&qp->q_lock);
}/** Append a job on the tail of the queue */ void job_append(struct queue* qp, struct job* jp)
{pthread_rwlock_wrlock(&qp->q_lock);jp->j_next = NULL;jp->j_prev = qp->q_tail;if(qp->q_tail != NULL){qp->q_tail->j_next = jp;}else{qp->q_head = jp;}qp->q_tail =jp;pthread_rwlock_unlock(&qp->q_lock);
}/** Remove the given job from a queue */ void job_remove(struct queue* qp, struct job* jp)
{pthread_rwlock_wrlock(&qp->q_lock);if(jp == qp->q_head){qp->q_head = jp->j_next;if(qp->q_tail == jp){qp->q_tail = NULL;}else{jp->j_next->j_prev = jp->j_prev;}}else if(jp == qp->q_tail){qp->q_tail = jp->j_prev;jp->j_prev->j_next = jp->j_next;}else{jp->j_prev->j_next= jp->j_next;jp->j_next->j_prev= jp->j_prev;}pthread_rwlock_unlock(&qp->q_lock);
}/** Find a job for the given thread id */ struct job* job_find(struct queue* qp, pthread_t id)
{struct job* jp;if(pthread_rwlock_rdlock(&qp->q_lock) !=0){   return NULL;}for(jp = qp->q_head; jp!=NULL; jp=jp->j_next){if(pthread_equal(jp->j_id, id)){break;}}pthread_rwlock_unlock(&qp->q_lock);return jp;
}

​ 在这个例子中,凡是需要向队列中增加作业或者从队列中删除作业的时候,都采用了写模式来锁住队列的读写锁。不管何时搜索队列,都需要获取读模式下的锁,允许所有的工作线程并发地搜索队列。在这种情况下,只有在线程搜索作业的频率远远高于增加或删除作业时,使用读写锁才能改善性能。

​ 工作线程只能从队列中读取与他们线程ID匹配的作业。由于作业结构同一时间只能由一个线程使用,所以不需要额外的加锁。

11.6.5 带有超时的读写锁

​ 与互斥量一样,Single UNIX Specification 提供了带有超时的读写加锁函数,使应用程序在获取读写锁避免陷入永久阻塞状态。这两个函数是pthread_rwlock_timedrdlock 和 pthread_rwlock_timedwrlock。

#include <pthread.h>
#include <time.h>int pthread_rwlock_timedrdlock(pthread_rwlock* restrict rwlockconst struct timespec* restrict tsptr);
int pthread_rwlock_timedwrlock(pthread_rwlock* restrict rwlockconst struct timespec* restrict tsptr);两个函数的返回值:若成功,返回0;否则,返回错误编号

​ 这两个函数的行为与它们“不计时的” 版本类似。tsptr参数指向timespec结构,指定线程应该停止阻塞的时间。如果它们不能获取锁,那么超时到期时,这两个函数将返回ETIMEOUT错误。与pthread_mutex_timedlock函数类似,超时指定的是绝对时间,而不是相对时间。

11.6.6 条件变量

​ 条件变量时线程可用的另一种同步机制。条件变量给多个线程提供了一个回合的场所。条件变量与互斥量一起使用时,允许线程以无竞争的方式等待特定的条件发生。

​ 条件变量本身是由互斥量保护的。线程字改变条件状态之前必须首先锁住互斥量。其他线程在获得互斥量之前不会察觉到这种改变,因为互斥量必须在锁定以后才能计算条件。
​ 在使用条件变量之前,必须先对它进行初始化。有pthread_cond_t 数据类型表示的条件变量可以用两种方式进行初始化,可以把常量PTHREAD_COND_INITIALIZER赋给静态分配的条件变量,但是如果条件变量是动态分配的,则需要使用pthread_cond_init函数对它进行初始化。

​ 在释放条件变量底层的内存空间之前,可以使用pthread_cond_destroy函数对条件变量进行反初始化(deinitialize)。

#include <pthread.h>int pthread_cond_wait(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);
int pthread_cond_destroy(pthread_cond_t * cond);

​ 除非要创建一个具有非默认属性的条件变量,否则pthread_cond_init函数的attr参数可以设置为NULL。将在12.4.3节中讨论条件变量属性。

​ 我们使用pthread_cond_wait等待条件变量为真。如果在给定的时间内条件不能满足,那么会生成一个返回错误码的变量。

#include <pthread.h>
int phtread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrct mutex);
int pthread_cond_timedwait(phtread_cond_t *restrict condpthread_mutex_t *restrict mutexconst struct timespec* restrict tsptr);

​ 传递给phtread_cond_wait 的互斥量对条件进行保护。调用者把锁住的互斥量传给函数,函数然后自动把调用线程放到等待条件的线程列表上,对互斥量解锁。这就关闭了条件检查和线程进入休眠状态等待条件改变这两个操作之间的时间通道,这样线程就不会错过条件的任何变化,phtread_cond_wait返回时,互斥量再次被锁住。

​ pthread_cond_timedwait函数的功能与pthread_cond_wait函数相似,只是多了一个超时(tsptr)。超时值指定了我们愿意等待多长时间,它是通过timespec结构指定的。

​ 如图11-13所示,需要指定愿意等待多长时间,这个时间值是个绝对数而不是相对数。例如,假设愿意等待3分钟。那么,并不是把3分钟转换成timespec结构,而是需要把当前时间加上3分钟再转换成timespec结构。

​ 可以使用clock_gettime函数获取timespec结构表示的当前时间。但是目前并不是所有平台都支持这个函数,因此,也可以用另一个函数gettimeofday获取timeval结构表示当前时间,然后把这个时间转换成timespec结构。要得到超时值得绝对时间,可以使用下面的函数(假设阻塞的最大时间使用分来表示的)。

#include <sys/time.h>
#include <stdlib.h>void maketimeout(struct timespec *tsp, long minutes)
{struct timeval now;/*get current time */gettimeofday(&now, NULL);tsp->tv_sec = now.tv_sec;tsp->tv_nsec = now.tv_usec* 1000; /* usec to nsec  *//* add the offset to get timeout value */tsp->tv_sec += minutes*60;
}

​ 如果超时到期时条件还是没有出现,pthread_cond_timedwait将重新获取互斥量,然后返回错误ETIMEDOUT。从pthread_cond_wait 或者 pthread_cond_timedwait调用成功返回时,线程需要重新计算条件,因为另一个线程可能已经在运行并改变了条件。

​ 有两个函数可以用于通知线程条件满足。pthread_cond_signal函数至少能唤醒一个等待该条件的线程,而pthread_cond_broadcast函数则能唤醒等待该条件的所有线程。

#include <pthread.h>
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);两个函数的返回值:成功,返回0;否则,返回错误编号。

​ 在调用pthread_cond_signal或者pthread_cond_broadcast时,我们说这是在给线程或者条件发信号。必须注意,一定要在条件状态改变以后再给线程发信号。

​ 例程11-15给出了如何结合使用条件变量和互斥量对线程进行同步。

#include <pthread.h>struct msg
{struct msg* m_next;/* ...more stuff here ...*/
};struct msg* workq;pthread_cond_t      qready = PTHREAD_COND_INITIALIZER;pthread_mutex_t   qlock  = PTHREAD_MUTEX_INITIALIZER;void process_msg(void)
{struct msg *mp;for(;;){pthread_mutex_lock(&qlock);/* pthread_cond_wait must be used in a while loop  */while(workq==NULL){pthread_cond_wait(&qready, &qlock);}mp =workq;workq= mp->m_next;pthread_mutex_unlock(&qlock);/* now process the message mp */}
}void enqueue_msg(struct msg* mp)
{pthread_mutex_lock(&qlock);mp->m_next = workq;workq = mp;pthread_mutex_unlock(&qlock);pthread_cond_signal(&qready);
}

条件是工作队列的状态。我们用互斥量保护条件,在while循环中判断条件。把消息放到工作队列时,需要占有互斥量,但在给等待线程发信号时,不需要占有互斥量。需要线程在调用pthread_cond_signal之前把消息从队列中拖出了,就可以在释放互斥量以后完成这部分工作。因为我们是在while循环中检查条件,所以不存在这样的问题:线程醒来,发现队列仍为空,然后返回继续等待。如果代码不能容忍这种竞争,就需要在给线程发信号的时候占有互斥量。

11.6.7 自旋锁

​ 自旋锁与互斥量类似,但它不是通过休眠使进程阻塞,而是在获取锁之前一直处于忙等待(自旋)阻塞状态。自旋锁可以用于以下情况:锁被持有的时间短,而且线程额并不希望在重新调度上花费太多的成本。

​ 自旋锁通常作为底层原语用于实现其他类型的锁。根据它们所基于的系统体系结构,可以通过使用测试并设置指令有效地实现。当然这里说的有效也还是会导致CPU资源的浪费:当线程自旋等待锁变为可用时,CPU不能做其他事情。这也是自旋锁只能被持有一小段时间的原因。不鼓励使用busy waiting。

​ 当自旋锁用在非抢占式内核是非常有用的:除了提供互斥机制以外,它们会阻塞中断,这样中断处理程序就不会让系统陷入死锁状态,因为它需要获取已被加锁的自旋锁(把中断想成是另一种抢占)。在这种类型的内核中,中断处理程序不能休眠,因此它们能用的同步原语只能是自旋锁。

​ 但是,在用户层,自旋锁并不是非常有用,除非运行在不允许抢占的实时调度类中。运行在分时调度类中的用户层线程字两种情况下可以被取消调度:当它们的时间片到期时,或者具有更高调度优先级的线程就绪变成可运行时。在这些情况下,如果线程拥有自旋锁,它就会进入休眠状态,阻塞在锁上的其他线程自旋的时间可能会比预期的时间更长。

很多互斥量的实现非常高效,以至于应用程序采用互斥锁的性能与曾经采用过自旋锁的性能基本是相同的。事实上,有些互斥量的实现在试图获取互斥量的时候会自旋一小段时间,只有在自旋计数到达某一阈值的时候才会休眠。这些因素,加上现代处理器的进步,使得上下文切换越来越快,也使得自旋锁只在某些特定的情况下有用

​ 自旋锁的接口与互斥量的接口类似,这使得它可以比较容易的从一个替换为另一个。可以用pthread_spin_init 函数对自旋锁进行初始化。用pthread_spin_destroy函数进行自旋锁的反初始化。

#include <pthread.h>
int pthread_spin_init(pthread_spinlock_t *lock, int pshared);
int pthread_spin_destroy(pthread_spinlock_t *lock);两个函数的返回值:成功,返回0;否则,返回错误编号

​ 只有一个属性是自旋锁特有的,这个属性只支持线程进程共享同步(Thread Process-Shared Synchronization) 选项(这个选项目前在Single UNIX Specification 中是强制的, 见图2-5)的平台上才用的到。pshared参数表示进程共享属性,表明自旋锁是如何获取的。如果它设为PTHREAD_PROCESS_SHARD,则自旋锁能被可以访问锁底层内存的线程所获取,即便是那些线程属于不同的进程,情况也是如此。否则pshared参数设为PTHREAD_PROCESS_PRIVATE,自旋锁就只能被初始化该锁的进程内部的线程所访问。

​ 可以用pthread_spin_lock或pthread_spin_trylock对自旋锁进行加锁,前者在获取锁之前一直自旋,后者如果不能获取锁,就立即返回EBUSY错误。注意,pthread_spin_trylock不能自旋。不管以何种方式加锁,自旋锁都可以通过pthread_spin_unlock函数解锁。

#include <pthread.h>int pthread_spin_lock(pthread_spinlock_t* lock);
int pthread_spin_trylock(pthread_spinlock_t* lock);
int pthread_spin_unlock(pthread_spinlock_t* lock);所有函数的返回值:成功,返回0;否则,返回错误编号

​ 注意,如果自旋锁当前在解锁状态的话,pthread_spin_lock函数不要自旋就可以对它加锁。如果线程已经对它加锁了,结果就是未定义的。调用pthread_spin_lock会返回EDEADLK错误(或其他错误),或者调用可能会永久自旋。具体行为依赖于世界的实现。试图对没有加锁的自旋锁进行解锁,结果也是未定义的。

​ 不管是pthread_spin_lock 还是pthread_spin_trylock,返回值为0的话就表示自旋锁被加锁。需要注意,不要调用在持有自旋锁情况下可能会进入休眠的函数。如果调用了这些函数,会浪费cpu资源,因为其他线程需要获取自旋锁需要等待的时间就延长了。

11.6.8 屏障

​ 屏障(barrier)是用户协调多个线程并行工作的同步机制。屏障允许每个线程等待,直到所有的合作线程都到达某一点,然后从该点继续执行。我们已经看到一种屏障,pthread_join函数就是一种屏障,允许一个线程等待,直到另一个线程退出。

​ 但是屏障对象的概念更广,它们允许任意数量的线程等待,直到所有的线程完成处理工作,而线程不需要退出。所有线程达到屏障后可以接着工作。

​ 可以使用pthread_barrier_init 对屏障进行初始化,用pthread_barrier_destroy函数进行反初始化。

#include <pthread.h>
int pthread_barrier_init(pthread_barrier_t *restrict barrier,const pthread_barrierattr_t *restrict attr,unsigned int count);
int pthread_barrier_destroy(pthread_barrier *barrier);两个函数的返回值: 成功,返回0; 否则,返回错误编号

​ 初始化屏障时,可以使用count参数指定,在允许所有线程继续运行之前,必须到达屏障的线程数目。使用attr参数指定屏障对象的属性,我们会在下一章详细讨论。现在设置attr为NULL,用默认属性初始化屏障。如果使用pthread_barrier_init函数为屏障分配资源,那么在反初始化屏障可以调用pthread_barrier_destroy函数释放相应的资源。

​ 可以使用pthread_barrier_wait函数来表明,线程已完成工作,准备等所有其他线程赶上来。

#include <pthread.h>
int pthread_barrier_wait(pthread_barrier_t *barrier);返回值:若成功,返回0或者PTHREAD_BARRIER_SERIAL_THREAD;否则,返回错误编号

​ 调用pthread_barrier_wait 的线程在屏障计数(调用pthread_barrier_init时设定)未满足条件时,就会进入休眠状态。如果该线程是最后一个调用pthread_barrier_wait的线程,就满足了屏障计数,所有线程都被唤醒。

​ 对于一个任意线程,pthread_barrier_wait函数返回了PTHREAD_BARRIER_SERIAL_THREAD。剩下的线程看到的返回值是0。这使得一个线程可以作为主线程,它可以工作在其他所有线程已完成的工作结果上

​ 一旦到达屏障计数值,而且线程处于非阻塞状态,屏障就可以被重用。但是除非在调用了pthread_barrier_destroy函数之后,又调用了pthread_barrier_init函数对计数用另外的数进行初始化,否则屏障计数不会改变。

​ 例程11-16给出了在一个任务上合作的多线程之间如何使用屏障进行同步。

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <pthread.h>
#include <sys/time.h>
#include <limits.h>
#define NTHR    8                /* number of threads */
#define NUMNUM  80000000L        /* number of numbers to sort  */
#define TNUM    (NUMNUM/NTHR)    /* number to sort per thread  */long nums[NUMNUM];
long snums[NUMNUM];pthread_barrier_t b;#ifdef SOLARIS
#define heapsort qsort
#else
extern int heapsort(void*, size_t, size_t, int(*)(const void*, const void*));
#endif/** Compare two long integers (helper function for heapsort )*/ int complong(const void* arg1, const void* arg2)
{long l1 = *(long*)arg1;long l2 = *(long*)arg2;if(l1 ==l2){return 0;}else if(l1<l2){return -1;}else{return 1;}
}/** Worker thread to sort a portion of the set of numbers.*/ void* thr_fn(void* arg)
{   long idx = (long)arg;heapsort(&nums[idx], TNUM, sizeof(long), complong);pthread_barrier_wait(&b);/** Go off and perform more work..*/ return NULL;
}/** merge the results of the individual sorted ranges.*/ void merge()
{long idx[NTHR];long i, minidx, sidx, num;for(i =0; i<NTHR; ++i){idx[i] = i*TNUM;}for(sidx = 0; sidx<NUMNUM; sidx++){num = LONG_MAX;for(i =0; i<NTHR; ++i){if((idx[i]<(i+1)*TNUM) && nums[idx[i]]<num){num = nums[idx[i]];minidx = i;}}snums[sidx] = nums[idx[minidx]];idx[minidx]++;}
}int main()
{FILE*           fp;unsigned long   i;struct timeval  start, end;long long       startusec, endusec; double          elapsed;int             err;pthread_t       tid;/** Create the initial set of numbers to sort. */ srandom(1);for(int i=0; i<NUMNUM; ++i){nums[i] =random();}/** Create 8 threads to sort the numbers. */ gettimeofday(&start, NULL);pthread_barrier_init(&b, NULL, NTHR+1);for(i =0; i<NTHR; ++i){err = pthread_create(&tid, NULL, thr_fn, (void*)(i*TNUM));if(err != 0){perror("pthread_create");exit(0);}}pthread_barrier_wait(&b);merge();gettimeofday(&end, NULL);/** Print the sorted list. */ startusec = start.tv_sec * 1000000+start.tv_usec;endusec = end.tv_sec*1000000 +end.tv_usec;elapsed = (double)(endusec - startusec) /1000000.0;printf("sort took %.4f seconds\n", elapsed);fp =fopen("./data", "w+");for(int i=0; i<NUMNUM; ++i){fprintf(fp,"%ld\n", snums[i]); }exit(0);
}
# 多线程排序
patrick@ubuntu:~/apue/chapter11$ ./11-16
sort took 24.4866 seconds#单线程排序
patrick@ubuntu:~/apue/chapter11$ ./onesort
sort took 124.6038 seconds

​ 这个例子给出了多个线程只执行一个任务时,使用屏障的简单情况。在更加实际的情况下,工作线程在调用pthread_barrier_wait函数返回后会接着执行其他的活动。

​ 在这个实例中,使用8个线程分解了800万个数的排序工作。每个线程用堆排序算法对100万个数进行排序(详细的算法请参阅Knuth[1998])。然后用主线程调用一个函数对这些结果进行合并。

​ 并不需要使用pthread_barrier_wait 函数中的返回值 PTHREAD_BARRIER_SERIAL_THREAD 来决定哪个线程执行结果的合并操作,因为我们使用主线程来完成这个任务。这也是把屏障计数值设为工作线程数加1的原因,主线程也作为其中的一个候选线程。

​ 如果只用一个线程去完成800万个数的堆排序,那么与例程11-16相比,我们将能看到例程11-16的程序在性能上有显著提升。在8核处理器系统上,单线程程序对800万个数进行排序需要12.14秒。同样的系统,使用8个并行线程和一个合并结果的线程,相同800万个数的排序仅需要1.91秒,速度提升了6倍。

11.7 小结

e numbers.
*/

gettimeofday(&start, NULL);
pthread_barrier_init(&b, NULL, NTHR+1);
for(i =0; i<NTHR; ++i)
{err = pthread_create(&tid, NULL, thr_fn, (void*)(i*TNUM));if(err != 0){perror("pthread_create");exit(0);}
}
pthread_barrier_wait(&b);
merge();
gettimeofday(&end, NULL);/** Print the sorted list. */ startusec = start.tv_sec * 1000000+start.tv_usec;
endusec = end.tv_sec*1000000 +end.tv_usec;
elapsed = (double)(endusec - startusec) /1000000.0;
printf("sort took %.4f seconds\n", elapsed);
fp =fopen("./data", "w+");for(int i=0; i<NUMNUM; ++i)
{fprintf(fp,"%ld\n", snums[i]);
}exit(0);

}


```shell
# 多线程排序
patrick@ubuntu:~/apue/chapter11$ ./11-16
sort took 24.4866 seconds#单线程排序
patrick@ubuntu:~/apue/chapter11$ ./onesort
sort took 124.6038 seconds

​ 这个例子给出了多个线程只执行一个任务时,使用屏障的简单情况。在更加实际的情况下,工作线程在调用pthread_barrier_wait函数返回后会接着执行其他的活动。

​ 在这个实例中,使用8个线程分解了800万个数的排序工作。每个线程用堆排序算法对100万个数进行排序(详细的算法请参阅Knuth[1998])。然后用主线程调用一个函数对这些结果进行合并。

​ 并不需要使用pthread_barrier_wait 函数中的返回值 PTHREAD_BARRIER_SERIAL_THREAD 来决定哪个线程执行结果的合并操作,因为我们使用主线程来完成这个任务。这也是把屏障计数值设为工作线程数加1的原因,主线程也作为其中的一个候选线程。

​ 如果只用一个线程去完成800万个数的堆排序,那么与例程11-16相比,我们将能看到例程11-16的程序在性能上有显著提升。在8核处理器系统上,单线程程序对800万个数进行排序需要12.14秒。同样的系统,使用8个并行线程和一个合并结果的线程,相同800万个数的排序仅需要1.91秒,速度提升了6倍。

11.7 小结

​ 本章介绍了线程的概念,讨论了现有的创建和销毁线程的POSIX.1原语;此外,还介绍了线程同步的问题,讨论了5个基本的同步机制(互斥量,读写锁,条件变量,自旋锁以及屏障),了解了如何使用它们来保护共享资源。

Ch11. Threads 线程相关推荐

  1. 操作系统——Threads 线程

    目录​​​​​​​ 1. Overview 2. Multicore Programming 多核编程 2.1 Concurrency vs. Parallelism 并发vs.并行 2.2 Prog ...

  2. 如图两道面试题,顺便深入线程池,并连环17问

    这两面试题是基友朋友最近去面滴滴遇到的,今天就借着这两面试真题来深入一波线程池吧,这篇文章力求把线程池核心点和常问的面试点一网打尽,当然个人能力有限,可能会有遗漏,欢迎留言补充! 先把问题列出来,如果 ...

  3. [转] Node.js的线程和进程

    [From] http://www.admin10000.com/document/4196.html 前言 很多Node.js初学者都会有这样的疑惑,Node.js到底是单线程的还是多线程的?通过本 ...

  4. java 线程转储_获取Java线程转储的常用方法(推荐)

    1. 线程转储简介 线程转储(Thread Dump)就是JVM中所有线程状态信息的一次快照. 线程转储一般使用文本格式, 可以将其保存到文本文件中, 然后人工查看和分析, 或者使用工具/API自动分 ...

  5. python threading-单线程 多线程 主线程 子线程 setDeamon join

    python threading-单线程 多线程 主线程 子线程 setDeamon join 单线程 多线程 主线程和子线程 setDaemon() join() 测试多线程下程序运行的时间 创建多 ...

  6. java线程池案例_使用Executors 和 ThreadPoolExecutor实现Java线程池案例

    并发主题 使用Executors 和 ThreadPoolExecutor实现Java线程池案例 首先需要一个工作线程: package com.journaldev.threadpool; publ ...

  7. 深入线程池的问题连环炮

    这一篇是看了这一篇文章之后用于个人的学习记录,加入了一些个人的理解,其中一些图片也是来源于这篇文章https://mp.weixin.q‍q.com/s/NDOx94yY06OnHjrYq2lVYw ...

  8. 【Linux】生产者消费者编程实现-线程池+信号量

    生产者消费者编程实现,采用了线程池以及信号量技术. 线程的概念就不多说,首先说一下多线程的好处:多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞 ...

  9. linux下C语言简单实现线程池

    0 前言 网上关于线程池的例子还是不少,简单明了的倒是比较少,看了网上的资料,打算借鉴网上的一些例子,自己实现以下. 线程的概念就不多说,首先说一下多线程的好处:多线程技术主要解决处理器单元内多个线程 ...

最新文章

  1. C语言循环选择还有,C语言第五讲,语句 顺序循环选择.(示例代码)
  2. 【HDOJ】4333 Revolving Digits
  3. mac下homebrew一些总结
  4. linux冒泡算法程序,用蛮力法解决冒泡排序 - linux-tao的个人空间 - OSCHINA - 中文开源技术交流社区...
  5. 【云栖大会】基因计算:解读生命的力量
  6. 开课啦! dubbo-go 微服务升级实战
  7. linux虚拟机时间同步
  8. Non-standard serial port baud rate setting
  9. THINK PHP 学习笔记20171115
  10. Kafka和Unix管道的示例
  11. GStreamer 简化 Linux 多媒体开发
  12. 三级等级保护之安全运维管理
  13. python图片批量转换成灰度图像
  14. 总体均值的区间估计和习题
  15. 20.Consent Controller Get请求逻辑实现
  16. ttl传输种过期_来自 202.112.36.253 的回复: TTL 传输中过期。解决思路
  17. 把数组改为用逗号隔开的形式
  18. 租房是每个奋斗者的必修课
  19. Oracle EBS 12.2.7系统克隆教程
  20. Datename() 函数与DatePart()函数

热门文章

  1. 那些会阻碍程序员成长的细节
  2. 【优秀课设】基于Python的百度API的OCR名片识别【含完整API账户】
  3. 禁用uwebiview 的反弹功能 bounces
  4. MySQL 规范数据库设计
  5. 六所大学要增加计算机类硕士专业,三所大学将被撤销!2021年学位授权点审核名单公布...
  6. 项目实战-图像识别项目-通过QT制作图形界面并调用百度AI进行图像识别(一)
  7. Python3网络爬虫:今日头条新闻App的广告数据抓取
  8. 2018年买华硕笔记本 安装 Fedora 28,pcieport errors flood the journal
  9. bestCoder 2015 百度之星程序设计大赛 资格赛-1003-IP聚合
  10. 生物信息学仿真软件SInC的初步使用教程