[zz]muduo源码阅读之Thread和ThreadPool
[source_address] https://blog.dujiong.net/2016/07/17/muduo-6/
muduo源码阅读之Thread和ThreadPool
在muduo的one loop per thread + thread pool模型中,线程和线程池应该是其中最基础也是最重要的两个组件了。所以,本文深入代码,学习Thread和ThreadPool两个类的结构和实现。
Thread类
__thread关键字
学习Thread class之前,先了解一个关键字的用法:__thread。
__thread是GCC内置的线程局部存储设施。它的实现非常高效,比Pthread库中的pthread_key_t(muduo中ThreadLocal)快很多。__thread变量是表示每个线程有一份独立实体,各个线程的变量值互不干扰。__thread只能修饰POD类型,不能修饰class类型,因为无法自动调用构造函数和析构函数。
Thread类的封装用到了命名空间CurrentThread,这个空间中定义了和线程相关的一些独立属性。
namespace CurrentThread
{__thread int t_cachedTid = 0;__thread char t_tidString[32];__thread int t_tidStringLength = 6;__thread const char* t_threadName = "unknown";
}
其中,t_cachedTid表示线程的真实id,Pthread库中提供了pthread_self()获取当前线程的标识,类型为pthread_t。但是,pthread_t不一定是数值类型,也可能是一个结构体,这带来了一些问题。
(1)无法打印输出pthread_t,因为不知道其确切类型。
(2)无法比较pthread_t大小或计算hash值。
(3)pthread_t值只在进程内有意义,与操作系统的任务调度之间无法建立有效关系,Pthread库只能保证在同一进程之内,同一时刻的各个线程的id不同。
所以,muduo采用gettid()系统调用的返回值作为线程id,muduo中将操作封装为gettid()函数。但是,我们知道,调用系统调用开销比较大,所以,muduo中采用__thread变量t_cachedTid来存储,在线程第一次使用tid时通过系统调用获得,存储在t_cachedTid中,以后使用时不再需要系统调用了。
t_tidString[32]:用string类型表示tid,以便输出日志。
t_tidStringLength:string类型tid的长度。
t_threadName:线程的名字。
相关的数据结构
在Thread的实现中,还用到了两个数据结构,一个位ThreadData,用来辅助调用线程执行的函数。另一个为ThreadNameInitializer,为线程的创建做环境准备。其中用到了pthread_atfork(NULL,NULL,&afterfork)
。该函数的原型为
int pthread_atfork(void (*prepare)(void), void (*parent)(void), void (*child)(void));
这是一个跟进程创建有关的函数,为fork的调用做准备和调用后子进程父进程的初始化。prepare函数在调用fork前执行,parent在调用fork后的父进程中执行,child在调用fork后的子进程中执行。
当然,在实际应用中,多线程不要调用fork(),否则会出现一些问题。因为fork智能克隆当前线程的thread of control,却不克隆其他线程。fork()之后,除了当前线程之外,其他线程都消失了。这样,会出现很多问题。比如,如果复制了一个lock的mutex,却没有复制unlock的线程,那么在给mutex加锁时就会出现死锁。
Thread类分析
首先来看Thread类的数据成员和构造函数。
class Thread : boost::noncopyable
{typedef boost::function<void ()> ThreadFunc;...private:bool started_;bool joined_;pthread_t pthreadId_;boost::shared_ptr<pid_t> tid_;ThreadFunc func_;string name_;static AtomicInt32 numCreated_;
};
Thread::Thread(ThreadFunc&& func, const string& n): started_(false), joined_(false),pthreadId_(0),tid_(new pid_t(0)),func_(func),name_(n)
{setDefaultName();
}
其中有两处需要说明一下,首先是shared_ptr<pid> tid_
,可能有人会有疑问:为什么这里要用shared_ptr包装pid?
原因是tid_所属的对象Thread在主线程(A)中创建,而tid_需要在新创建的线程B中进行赋值操作,如果tid使用裸指针的方式传递给线程(B),那么线程A中Thread对象析构(下文)销毁后,线程B持有的就是一个野指针,所以,在Thread对象中将以shared_ptr包装。
然后是numCreated_,是一个静态变量,类型为AtomicInt32,原子类型,用来表示第几次创建线程实例,在记录日志时可用记录为:“线程名+numCreated_”。
接下来看Thread的一些接口函数。
void Thread::start()
{started = true;detail::ThreadData* data = new detail::ThreadData(func_, name_, tid_);if(pthread_create(&pthreadId_, NULL, &detail::startThread, data));{started_ = false;delete data;LOG_SYSFATAL << "Failed in pthread_create";}
}
Thread::start()将调用pthread_create()创建新线程,detail::startThread()是新线程的入口函数,data是新线程执行的辅助结构体。detail::startThread()调用data->runInThread()执行线程逻辑(func_)。
int Thread::join()
{assert(started_);assert(!joined_);joined_ = true;return pthread_join(pthreadId_, NULL);
}Thread::~Thread()
{if(started_ && !joined_){pthread_detach(pthreadId_);}
}
Thread析构的时候没有销毁持有的Pthreads句柄(pthread_t),也就是说Thread的析构不会等待线程结束。如果Thread对象的生命期长于线程,然后通过Thread::join()来等待线程结束并释放线程资源。如果Thread对象的生命期短于线程,那么析构时会自动detach线程,避免了资源泄露。
ThreadPool类
ThreadPool(线程池)本质上是一个生产者-消费者的模型,在实际中主要完成计算任务。在muduo线程池中有一个存放工作线程的容器ptr_vector,相当于消费者;有一个存放任务的队列deque。
任务队列是有界的,类似于BoundedBlockingQueue,实现时需要两个条件变量。
以下是ThreadPool的数据成员:
class ThreadPool : boost::noncopyable
{typedef boost::function<void ()> Task;private:MutexLock mutex_;Condition notEmpty_;Condition notFull_;string name_;Task threadInitCallback_;boost::ptr_vector<muduo::Thread> threads_;std::deque<Task> queue_;size_t maxQueueSize_;bool running_;
}
其中threadInitCallback_可由setThreadInitCallback(const Task& cb)设置,设置回调函数,每次在执行任务前先调用。在线程池开始运行之前,需要先设置任务队列的大小(调用setMaxQueueSize()),因为运行线程池时,线程会从任务队列取任务。
接下来是ThreadPool的一些接口函数。
void ThreadPool::start(int numThreads)
{assert(threads_.empty());running_ = true;threads_.reserve(numThreads);for (int i = 0; i < numThreads; ++i){char id[32];snprintf(id, sizeof id, "%d", i+1);threads_.push_back(new muduo::Thread(boost::bind(&ThreadPool::runInThread, this), name_+id));threads_[i].start();}if(numThreads == 0 && threadInitCallback_){threadInitCallback_();}
}
void ThreadPool::start(int numThreads)
开启线程池,按照线程数量numThreads_创建工作线程,线程函数为ThreadPool::runInThread()。
void ThreadPool::runInThread()
{try{if(threadInitCallback_){threadInitCallback_();}while(running_){Task task(take());if(task){task();}}}catch(const Exception& ex){...}}
如果设置了threadInitCallback_,则进行执行任务前的一些初始化操作。然后从任务队列中取任务执行,有可能阻塞,当任务队列为空时。
ThreadPool::Task ThreadPool::take()
{MutexLockGuard lock(mutex_);while(queue_.empty() && running_){notEmpty_.wait();}Task task;if(!queue_.empty()){task = queue_.front();queue_.pop_front();if(maxQueueSize_ > 0){notFull_.notify();}}return task;
}
多线程从消息队列中取任务的时候,需要加锁保护。等到队列非空信号,就取任务。取出之后,便告知任务队列已经非满,可以继续添加任务。
void ThreadPool::run(const Task& task)
{if(threads_.empty()){task();}else{MutexLockGuard lock(mutex_);while(isFull()){notFull_.wait();}assert(!isFull());queue_.push_back(task);notEmpty_.notfy();}
}
如果ThreadPool没有子线程(set和start操作在run之前),就在主线程中执行该task,否则,将任务加入到队列,并通知线程从中取task,如果队列已满,便等待。
ThreadPool::~ThreadPool()
{if(running_){stop();}
}
void ThreadPool::stop()
{{MutexLockGuard lock(mutex_);running_ = false;notEmpty_.notifyAll();}for_each(threads_.begin(),threads_.end(),boost::bind(&muduo::Thread::join, _1));
}
最后是ThreadPool的析构函数,在其中调用stop(),唤醒所有等待的线程,然后对线程池中的每一个线程执行join()。
总结
以上就是muduo中Thread和ThreadPool类的学习,有很多源码,有点啰嗦。但是,在muduo的one loop per thread + thread pool模型中,Thread和ThreadPool是很重要的组件,所以需要深入地掌握。
转载于:https://www.cnblogs.com/ym65536/p/8284205.html
[zz]muduo源码阅读之Thread和ThreadPool相关推荐
- mysql thread conn_MySQL源码阅读2-连接与线程管理
本篇是第二篇,MySQL初始化完成之后,便进入一个死循环中,接受客户端请求,并完成客户端的命令(如果在window下启动多个listener,则分别启动线程监听).该篇介绍MySQL服务中的连接与线程 ...
- 应用监控CAT之cat-client源码阅读(一)
CAT 由大众点评开发的,基于 Java 的实时应用监控平台,包括实时应用监控,业务监控.对于及时发现线上问题非常有用.(不知道大家有没有在用) 应用自然是最初级的,用完之后,还想了解下其背后的原理, ...
- muduo源码client/server通信流程
今天来学习一下muduo源码中client和server间的大致通信流程,以echo服务为例,先看一下echo对面的main函数代码. #include "examples/simple/e ...
- 12.源码阅读(app启动流程-android api 26)
activity的启动流程之前已经通过源码了解了,那么app的启动流程是怎样的,从我们按下app的图标,到应用启动起来显示出画面,中间都经历了什么? 安卓是基于java的,所以和java有一定的相似性 ...
- Java8 ArrayBlockingQueue 源码阅读
一.什么是 ArrayBlockingQueue ArrayBlockingQueue 是 GUC(java.util.concurrent) 包下的一个线程安全的阻塞队列,底层使用数组实现. 除了线 ...
- 多线程与高并发(四):LockSupport,高频面试题,AQS源码,以及源码阅读方法论
补充几道面试题 锁升级过程:无锁.偏向锁.轻量级锁.重量级锁 StampedLock 自己看一下 面试题:syn和Reentrantlock的区别? LockSupport LockSupport.p ...
- zookeeper 源码阅读(2)
接着zookeeper 源码阅读(1) Zookeeper服务器的启动,大致可以分为以下五个步骤 1. 配置文件解析. 2. 初始化数据管理器. 3. 初始化网络I/O管理器. 4. 数据恢复. 5. ...
- surefire 拉起 junit 单元测试类 源码阅读(一)
根据surefire 拉起Junit单元测试类 输出的报错日志 跟踪执行过程: 日志1: java.lang.reflect.InvocationTargetExceptionat sun.refle ...
- DotText源码阅读(7) --Pingback/TrackBack
DotText源码阅读(7) --Pingback/TrackBack 博客这种服务的区别于论坛和所谓文集网站,很大程度上我认为是由于pingback/trackback的存在,使得博客这种自媒体有可 ...
- AQS源码阅读笔记(一)
AQS源码阅读笔记 先看下这个类张非常重要的一个静态内部类Node.如下: static final class Node {//表示当前节点以共享模式等待锁static final Node SHA ...
最新文章
- 独家 | 一文读懂随机森林的解释和实现(附python代码)
- poj 2947 Widget Factory
- FreeSWITCH中文语音包
- vim树形目录NERDTree
- 160804、oracle查询:取出每组中的第一条记录
- SpringBoot怎么直接访问templates下的html页面
- 1074. 元素和为目标值的子矩阵数量
- 在多线程数据平面开发套件(DPDK)应用程序中优化内存使用
- 人为什么必须积极有为?
- 内涵! 程序员才懂的动图(太 TM 形象了)
- 单片机课程设计—简易频率计—课程设计任务书
- html浮动提示框,JavaScript浮动提示框Tooltip效果
- IB心理学生物分析模块
- oracle18c创建数据库,Oracle 18C 手工创建CDB
- Android jetpack DataBinding 与RecyclerView
- 随机森林 Iris 特征重要性
- 2022第十二届中国电子文件管理论坛嘉宾揭晓
- Mac tips - 打开【键盘重复按键】功能
- 超全面的前端切图技巧,读这篇就够了
- 微信支付可以设置0元么?
热门文章
- STM32的备份寄存器和控制状态寄存器
- 转载~final, static和 nested class 总结 原文~http://yulin10.bokee.com/2544792.html
- display:none与visibility:hidden的区别 ,还有html5的新属性hidden
- liunx 下mysql 的安装
- Thinking in Java 16.3返回一个数组
- Python 文件(文件夹)匹配(glob模块)(转载)
- 无锁同步-C++11之Atomic和CAS
- MySQL error(2006) server has gone away
- 什么软件可以让头发变黑_头发特别干枯毛躁,请问什么方法可以让头发恢复到顺滑状态?...
- oracle增加表字段_史上最详细的oracle 中的CR块介绍--一致性读