理解 Linux 条件变量
理解 Linux 条件变量
1 简单介绍
当多个线程之间由于存在某种依赖关系。导致仅仅有当某个条件存在时,才干够运行某个线程。此时条件变量(pthread_cond_t)能够派上用场。比方:
例1: 当系统不忙(这是一个条件)时,运行扫描文件状态的线程。
例2: 多个线程组成线程池,仅仅有当任务队列中存在任务时。才用当中一个线程去运行这个任务。为避免惊群(thrundering herd),能够採用条件变量同步线程池中的线程。
2 使用方法
条件变量(pthread_cond_t)必须与锁(pthread_mutex_t)一起使用。
条件变量的API:
1) pthread_cond_init
2) pthread_cond_signal / pthread_cond_broadcast
3) pthread_cond_wait / pthread_cond_timedwait
4) pthread_cond_destroy
线程A:
include <stdio.h>
include <sys/time.h>
include <unistd.h>
include <pthread.h>
include <errno.h>
...void A_thread_run(void *arg)
{...pthread_mutex_lock (& lock);// 条件满足, 发出通知pthread_cond_signal (& cond);pthread_mutex_unlock (& lock);...
}
线程B:
void B_thread_run(void *arg)
{for ( ; ; ) {pthread_mutex_lock (&lock);/* pthread_cond_wait 原子调用: 等待条件变量。 解除锁。 然后堵塞* 当 pthread_cond_wait 返回,则条件变量有信号,同一时候上锁** 等待条件有两种方式:条件等待pthread_cond_wait()和计时等待pthread_cond_timedwait(),* 当中计时等待方式假设在给定时刻前条件没有满足,则返回ETIMEOUT* 不管哪种等待方式,都必须和一个相互排斥锁配合。以防止多个线程同一时候请求pthread_cond_wait()* (或pthread_cond_timedwait(),下同)的竞争条件(Race Condition)。
* mutex相互排斥锁必须是普通锁(PTHREAD_MUTEX_TIMED_NP)或者适应锁(PTHREAD_MUTEX_ADAPTIVE_NP), * 且在调用pthread_cond_wait()前必须由本线程加锁(pthread_mutex_lock()),而在更新条件等待队列曾经, * mutex保持锁定状态,并在线程挂起进入等待前解锁。
* 在条件满足从而离开pthread_cond_wait()之前,mutex将被又一次加锁,以与进入pthread_cond_wait()前的加锁动作相应。 * 激发条件有两种形式。pthread_cond_signal()激活一个等待该条件的线程,存在多个等待线程时按入队顺序激活当中一个; * 而pthread_cond_broadcast()则激活全部等待线程(惊群)。 */ pthread_cond_wait (&cond, &lock); if (shutdown) { break; } /* Unlock */ pthread_mutex_unlock (&lock); /* do your task here */ } pthread_mutex_unlock (&lock); pthread_exit (0); }
线程B调用pthread_cond_wait,从而堵塞在此句。等待有信号通知。pthread_cond_wait内部存在原子调用:解除锁和等待条件变量有信号。
当pthread_cond_wait函数返回。表明得到了信号通知,同一时候上锁。
线程A用pthread_cond_signal通知调用了pthread_cond_wait的线程B。
3 避免惊群
这是个狼多肉少。僧多粥少,色鬼多美女少的时代。每当一块肉丢到狼群,就引发一群狼去争抢,但最后仅仅有一仅仅狼得到了肉。这就是惊群(thrundering herd)。
现实世界的惊群,比方老师在课堂上每次提出一个问题,最后仅仅找一个学生回答,时间久了,学生对这个老师的问题就倦怠了。
计算机的惊群会造成server资源空耗。
pthread_cond_signal函数的作用是发送一个信号给另外一个正在处于堵塞等待状态的线程,使其脱离堵塞状态,继续运行.假设没有线程处在堵塞等待状态,pthread_cond_signal也会成功返回。
但使用pthread_cond_signal不会有“惊群现象”产生。它最多仅仅给一个线程发信号。假如有多个线程正在堵塞等待着这个条件变量的话,那么是依据各等待线程优先级的高低确定哪个线程接收到信号開始继续运行。
假设各线程优先级同样,则依据等待时间的长短来确定哪个线程获得信号。但不管怎样一个pthread_cond_signal调用最多发信一次。
4 线程池threadpool
经典的样例就是一个线程池是一个固定数目的线程的组合,当中每一个线程(worker)全然能够做同样的工作。线程池包括这样一个任务(task)队列。用户向任务队列中加入任务,线程池自己主动派发线程去运行任务。
每一个线程有特定于线程的參数(thread argument)。每一个任务也有特定于任务的数据(task argument)。线程函数运行任务函数,同一时候传递给任务函数线程參数和任务參数。
典型的样例就是每一个线程包括了到数据库或其它资源的连接,任务函数能够安全地使用这些连接,由于任务函数是在线程函数中同步运行的。
以下是完整的线程池代码。原来的代码中没有特定于线程的參数,我加入了这部分代码。
threadpool.h
/** 2014-06-18: last modified by cheungmine** Copyright (c) 2011, Mathias Brossard <mathias@brossard.org>.* All rights reserved.* * Redistribution and use in source and binary forms, with or without* modification, are permitted provided that the following conditions are* met:* * 1. Redistributions of source code must retain the above copyright* notice, this list of conditions and the following disclaimer.* * 2. Redistributions in binary form must reproduce the above copyright* notice, this list of conditions and the following disclaimer in the* documentation and/or other materials provided with the distribution.* * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT* HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.*/#ifndef _THREADPOOL_H_
#define _THREADPOOL_H_#ifndef POOL_MAX_THREADS
# define POOL_MAX_THREADS 256
#endif#ifndef POOL_MAX_QUEUES
# define POOL_MAX_QUEUES 1024
#endif#ifndef POOL_DEFAULT_THREADS
# define POOL_DEFAULT_THREADS 32
#endif#ifndef POOL_DEFAULT_QUEUES
# define POOL_DEFAULT_QUEUES 256
#endif/*** @file threadpool.h* @brief Threadpool Header file*/typedef struct threadpool_t threadpool_t;/*** @file threadpool.h* @brief thread_context_t* thread can take itself argument* added by cheungmine.* 2014-06-17*/
typedef struct thread_context_t
{void *pool;pthread_t thread;void *thread_arg;struct threadpool_task_t *task;
} thread_context_t;/*** @struct threadpool_task* @brief the work struct** @var function Pointer to the function that will perform the task.* @var argument Argument to be passed to the function.*/
typedef struct threadpool_task_t
{void (*function)(thread_context_t *);int flags; /* user defined */void * argument;
} threadpool_task_t;typedef enum
{threadpool_invalid = -1,threadpool_lock_failure = -2,threadpool_queue_full = -3,threadpool_shutdown = -4,threadpool_run_failure = -5,threadpool_out_memory = -6
} threadpool_error_t;static const char* threadpool_error_messages[] = {"threadpool_success","threadpool_invalid","threadpool_lock_failure","threadpool_queue_full","threadpool_shutdown","threadpool_run_failure","threadpool_out_memory"
};/*** @function threadpool_create* @brief Creates a threadpool_t object.* @param thread_count Number of worker threads.* @param queue_size Size of the queue.* @param thread_args array of arguments with count of thread_count, NULL if ignored.* @param flags Unused parameter.* @return a newly created thread pool or NULL*/
threadpool_t *threadpool_create (int thread_count, int queue_size, void **thread_args, int flags);/*** @function threadpool_add* @brief add a new task in the queue of a thread pool* @param pool Thread pool to which add the task.* @param function Pointer to the function that will perform the task.* @param argument Argument to be passed to the function.* @param flags Unused parameter.* @return 0 if all goes well, negative values in case of error (@see* threadpool_error_t for codes).*/
int threadpool_add (threadpool_t *pool, void (*routine)(thread_context_t *), void *task_arg, int flags);/*** @function threadpool_destroy* @brief Stops and destroys a thread pool.* @param pool Thread pool to destroy.* @param flags Unused parameter.*/
int threadpool_destroy (threadpool_t *pool, int flags);#endif /* _THREADPOOL_H_ */
threadpool.c
/** 2014-06-18: last modified by cheungmine** Copyright (c) 2011, Mathias Brossard <mathias@brossard.org>.* All rights reserved.** Redistribution and use in source and binary forms, with or without* modification, are permitted provided that the following conditions are* met:** 1. Redistributions of source code must retain the above copyright* notice, this list of conditions and the following disclaimer.** 2. Redistributions in binary form must reproduce the above copyright* notice, this list of conditions and the following disclaimer in the* documentation and/or other materials provided with the distribution.** THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT* HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.*//*** @file threadpool.c* @brief Threadpool implementation file*/#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>#include "threadpool.h"/*** @struct threadpool* @brief The threadpool struct** @var notify Condition variable to notify worker threads.* @var threads Array containing worker threads ID.* @var thread_count Number of threads* @var queue Array containing the task queue.* @var queue_size Size of the task queue.* @var head Index of the first element.* @var tail Index of the next element.* @var shutdown Flag indicating if the pool is shutting down*/
struct threadpool_t {pthread_mutex_t lock;pthread_cond_t notify;int head;int tail;int count;int shutdown;int started;int thread_count;int queue_size;threadpool_task_t *queues;thread_context_t thread_ctxs[0];
};/*** @function void *threadpool_run(void *threadpool)* @brief the worker thread* @param threadpool the pool which own the thread*/
static void *threadpool_run (void *threadpool);int threadpool_free(threadpool_t *pool);threadpool_t *threadpool_create(int thread_count, int queue_size, void **thread_args, int flags)
{int i;threadpool_t *pool = NULL;/* Check thread_count for negative or otherwise very big input parameters */if (thread_count < 0 || thread_count > POOL_MAX_THREADS) {goto err;}if (thread_count == 0) {thread_count = POOL_DEFAULT_THREADS;}/* Check queue_size for negative or otherwise very big input parameters */if (queue_size < 0 || queue_size > POOL_MAX_QUEUES) {goto err;}if (queue_size == 0) {queue_size = POOL_DEFAULT_QUEUES;}/* create threadpool */if ( (pool = (threadpool_t *) malloc (sizeof(threadpool_t) +sizeof(thread_context_t) * thread_count +sizeof(threadpool_task_t) * queue_size)) == NULL ) {goto err;}/* Initialize */pool->thread_count = thread_count;pool->queue_size = queue_size;pool->head = pool->tail = pool->count = 0;pool->shutdown = pool->started = 0;pool->queues = (threadpool_task_t *) (& pool->thread_ctxs[thread_count]);/* Initialize mutex and conditional variable first */if ((pthread_mutex_init (&(pool->lock), NULL) != 0) ||(pthread_cond_init (&(pool->notify), NULL) != 0)) {goto err;}/* Start worker threads */for (i = 0; i < thread_count; i++) {thread_context_t * pctx = & pool->thread_ctxs[i];/* set pool to each thread context */pctx->pool = (void*) pool;/* assign thread argument if valid */if (thread_args) {pctx->thread_arg = thread_args[i];} else {pctx->thread_arg = 0;}if ( pthread_create (& pctx->thread, NULL, threadpool_run, (void*) pctx) != 0) {threadpool_destroy (pool, 0);return NULL;} else {pool->started++;}}return pool;err:if(pool) {threadpool_free(pool);}return NULL;
}int threadpool_add (threadpool_t *pool, void (*function)(thread_context_t *), void *task_arg, int flags)
{int err = 0;int next;if ( pool == NULL || function == NULL ) {return threadpool_invalid;}if (pthread_mutex_lock (&(pool->lock)) != 0) {return threadpool_lock_failure;}next = pool->tail + 1;next = (next == pool->queue_size) ? 0 : next;do {/* Are we full ? */if (pool->count == pool->queue_size) {err = threadpool_queue_full;break;}/* Are we shutting down ?
*/ if (pool->shutdown) { err = threadpool_shutdown; break; } /* Add task to queue */ pool->queues[pool->tail].function = function; pool->queues[pool->tail].argument = task_arg; pool->queues[pool->tail].flags = flags; pool->tail = next; pool->count += 1; /* pthread_cond_broadcast */ if (pthread_cond_signal (&(pool->notify)) != 0) { err = threadpool_lock_failure; break; } } while(0); if (pthread_mutex_unlock (&pool->lock) != 0) { err = threadpool_lock_failure; } return err; } int threadpool_destroy (threadpool_t *pool, int flags) { int i, err = 0; if (pool == NULL) { return threadpool_invalid; } if (pthread_mutex_lock (&(pool->lock)) != 0) { return threadpool_lock_failure; } do { /* Already shutting down */ if (pool->shutdown) { err = threadpool_shutdown; break; } pool->shutdown = 1; /* Wake up all worker threads */ if ((pthread_cond_broadcast(&(pool->notify)) != 0) || (pthread_mutex_unlock(&(pool->lock)) != 0)) { err = threadpool_lock_failure; break; } /* Join all worker thread */ for (i = 0; i < pool->thread_count; i++) { if (pthread_join (pool->thread_ctxs[i].thread, NULL) != 0) { err = threadpool_run_failure; } } } while(0); if (pthread_mutex_unlock (&pool->lock) != 0) { err = threadpool_lock_failure; } /* Only if everything went well do we deallocate the pool */ if (!err) { threadpool_free (pool); } return err; } int threadpool_free (threadpool_t *pool) { if (pool == NULL || pool->started > 0) { return -1; } pthread_mutex_lock (&(pool->lock)); pthread_mutex_destroy (&(pool->lock)); pthread_cond_destroy (&(pool->notify)); free(pool); return 0; } /** * each thread run function */ static void *threadpool_run (void * param) { threadpool_task_t task; thread_context_t * thread_ctx = (thread_context_t *) param; threadpool_t * pool = thread_ctx->pool; for ( ; ; ) { /* Lock must be taken to wait on conditional variable */ pthread_mutex_lock (&(pool->lock)); /* Wait on condition variable, check for spurious wakeups. When returning from pthread_cond_wait(), we own the lock. */ while ((pool->count == 0) && (!pool->shutdown)) { pthread_cond_wait (&(pool->notify), &(pool->lock)); } if (pool->shutdown) { break; } /* Grab our task */ task.function = pool->queues[pool->head].function; task.argument = pool->queues[pool->head].argument; task.flags = pool->queues[pool->head].flags; thread_ctx->task = &task; pool->head += 1; pool->head = (pool->head == pool->queue_size) ? 0 : pool->head; pool->count -= 1; /* Unlock */ pthread_mutex_unlock (&(pool->lock)); /* Get to work */ (*(task.function)) (thread_ctx); } pool->started--; pthread_mutex_unlock (&(pool->lock)); pthread_exit (NULL); return (NULL); }
转载于:https://www.cnblogs.com/xfgnongmin/p/10615095.html
理解 Linux 条件变量相关推荐
- 深入理解Linux 条件变量2:使用条件变量实现[生产-消费]框架
前言 在上一篇文章<深入理解Linux 条件变量1:使用场景.接口说明>我们简单介绍了条件变量的使用场景以及相关接口,正如大神linus所说:talk is cheap,show me t ...
- 深入理解Linux 条件变量3:条件变量为什么要配合着锁使用?
在上一篇文章<深入理解Linux 条件变量2:使用条件变量实现[生产-消费]框架>中,我们通过示例代码演示了条件变量的使用.从条件变量的API接口中我们很容易发现,条件变量必须配合着互斥锁 ...
- Linux先发送条件变量,浅谈Linux条件变量的使用
Linux线程同步之间存在多种机制,条件变量是一种类似操作系统里提到的生产者-消费者算法的同步机制,允许线程以无竞争的方式等待特定条件的发生. 示例伪代码: void* Thread1(void){ ...
- Linux先发送条件变量,linux 条件变量 浅谈Linux条件变量的使用
想了解浅谈Linux条件变量的使用的相关内容吗,在本文为您仔细讲解linux 条件变量的相关知识和一些Code实例,欢迎阅读和指正,我们先划重点:linux,条件变量,下面大家一起来学习吧. Linu ...
- Linux 条件变量详解
LINUX条件变量详解 一.条件变量概述 1.1 函数API讲解 二.函数使用 三.结果展示与分析 一.条件变量概述 条件变量不是一个把锁,它实质上一个类似信号的东西,与锁相互配合使用,因为锁所能 ...
- linux条件变量使用和与信号量的区别
linux条件变量使用和与信号量的区别 今天在学习进程同步机制的时候看见一句话: 条件变量只能在管程中通过两个原语操作--wait原语和signal原语 于是发出了一个疑问:信号量机制和条件变量同步机 ...
- Linux 条件变量使用细节(为何调用 pthread_cond_wait 前加锁,函数内部解锁,返回时又加锁)
一.本文目的 首先说明,本文重点不在怎么用条件变量.这里我先列出 apue 中对于pthread_cond_wait函数的这么一段话: 调用者把锁住的互斥量传给函数,函数然后自动把调用线程放到等待条件 ...
- Linux 条件变量 pthread_cond_wait
条件变量是利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:一个线程等待"条件变量的条件成立"而挂起:另一个线程使"条件成立"(给出条件成立信号). ...
- linux条件变量唤醒丢失,多线程编程精髓(三)
本篇主要讲Linux环境下的多线程同步内核对象. (1)linux线程同步之互斥体:linux互斥体的用法与windows的临界区对象类似,使用数据结构 pthread_mutex_t表示互斥体对象( ...
最新文章
- 数据结构--栈(附上STL栈)
- Centos8.4 配置本地镜像yum源
- 2021年API攻击数量激增600%+
- Python中,os.listdir遍历纯数字文件乱序如何解决
- 数字证书国产化的趋势
- java学习-狼人杀
- 混沌理论作业简析——两人一组_图像加密解密小游戏
- 百度分享代码--一键分享Baidu Share BEGIN
- 智能卡java_Java智能卡发送命令
- Miniconda3环境搭建详细流程
- 英魂之刃服务器维护进不去,英魂之刃口袋版为什么进不去 进不去解决方法
- 哪里有云南ip服务器,云南那些服务商可以提供云南本地ip服务器
- 华为打造的智慧办公“新物种”,为何在央视节目露脸?
- 披荆斩棘Linux之清理空间
- 常用的maven命令
- 面试宝典之高分回答面试题(三)
- 浅谈快消品行业的数字化转型
- RSA PKCS1(google play receipt 验证)
- Oracle培训感想
- 《流浪地球2》票房突破18亿
热门文章
- AcWing 885. 求组合数 I(递推式预处理)
- AcWing 240. 食物链
- Cortex-M开发板密码登陆界面
- android 如何快速检测到画面变化_电瓶修复—如何快速检测电池的好坏2
- 基于腾讯AI Lab词向量进行未知词、短语向量补齐与域内相似词搜索
- mybatis中的#{}与${}在原理上的区别
- OpenCV基本图形绘制之绘制直线
- Vue : Expected the Promise rejection reason to be an Error
- 从DCF到DCX:构想照进现实
- 在mac os中设置环境变量