[source_address] https://blog.dujiong.net/2016/07/17/muduo-6/

muduo源码阅读之Thread和ThreadPool

在muduo的one loop per thread + thread pool模型中,线程和线程池应该是其中最基础也是最重要的两个组件了。所以,本文深入代码,学习Thread和ThreadPool两个类的结构和实现。

Thread类

__thread关键字

学习Thread class之前,先了解一个关键字的用法:__thread。
__thread是GCC内置的线程局部存储设施。它的实现非常高效,比Pthread库中的pthread_key_t(muduo中ThreadLocal)快很多。__thread变量是表示每个线程有一份独立实体,各个线程的变量值互不干扰。__thread只能修饰POD类型,不能修饰class类型,因为无法自动调用构造函数和析构函数。
Thread类的封装用到了命名空间CurrentThread,这个空间中定义了和线程相关的一些独立属性。

namespace CurrentThread
{__thread int t_cachedTid = 0;__thread char t_tidString[32];__thread int t_tidStringLength = 6;__thread const char* t_threadName = "unknown";
}

其中,t_cachedTid表示线程的真实id,Pthread库中提供了pthread_self()获取当前线程的标识,类型为pthread_t。但是,pthread_t不一定是数值类型,也可能是一个结构体,这带来了一些问题。
(1)无法打印输出pthread_t,因为不知道其确切类型。
(2)无法比较pthread_t大小或计算hash值。
(3)pthread_t值只在进程内有意义,与操作系统的任务调度之间无法建立有效关系,Pthread库只能保证在同一进程之内,同一时刻的各个线程的id不同。
所以,muduo采用gettid()系统调用的返回值作为线程id,muduo中将操作封装为gettid()函数。但是,我们知道,调用系统调用开销比较大,所以,muduo中采用__thread变量t_cachedTid来存储,在线程第一次使用tid时通过系统调用获得,存储在t_cachedTid中,以后使用时不再需要系统调用了。
t_tidString[32]:用string类型表示tid,以便输出日志。
t_tidStringLength:string类型tid的长度。
t_threadName:线程的名字。

相关的数据结构

在Thread的实现中,还用到了两个数据结构,一个位ThreadData,用来辅助调用线程执行的函数。另一个为ThreadNameInitializer,为线程的创建做环境准备。其中用到了pthread_atfork(NULL,NULL,&afterfork)。该函数的原型为
int pthread_atfork(void (*prepare)(void), void (*parent)(void), void (*child)(void));
这是一个跟进程创建有关的函数,为fork的调用做准备和调用后子进程父进程的初始化。prepare函数在调用fork前执行,parent在调用fork后的父进程中执行,child在调用fork后的子进程中执行。
当然,在实际应用中,多线程不要调用fork(),否则会出现一些问题。因为fork智能克隆当前线程的thread of control,却不克隆其他线程。fork()之后,除了当前线程之外,其他线程都消失了。这样,会出现很多问题。比如,如果复制了一个lock的mutex,却没有复制unlock的线程,那么在给mutex加锁时就会出现死锁。

Thread类分析

首先来看Thread类的数据成员和构造函数。

class Thread : boost::noncopyable
{typedef boost::function<void ()> ThreadFunc;...private:bool started_;bool joined_;pthread_t pthreadId_;boost::shared_ptr<pid_t> tid_;ThreadFunc func_;string name_;static AtomicInt32 numCreated_;
};
Thread::Thread(ThreadFunc&& func, const string& n): started_(false), joined_(false),pthreadId_(0),tid_(new pid_t(0)),func_(func),name_(n)
{setDefaultName();
}

其中有两处需要说明一下,首先是shared_ptr<pid> tid_,可能有人会有疑问:为什么这里要用shared_ptr包装pid?
原因是tid_所属的对象Thread在主线程(A)中创建,而tid_需要在新创建的线程B中进行赋值操作,如果tid使用裸指针的方式传递给线程(B),那么线程A中Thread对象析构(下文)销毁后,线程B持有的就是一个野指针,所以,在Thread对象中将以shared_ptr包装。
然后是numCreated_,是一个静态变量,类型为AtomicInt32,原子类型,用来表示第几次创建线程实例,在记录日志时可用记录为:“线程名+numCreated_”。

接下来看Thread的一些接口函数。

void Thread::start()
{started = true;detail::ThreadData* data = new detail::ThreadData(func_, name_, tid_);if(pthread_create(&pthreadId_, NULL, &detail::startThread, data));{started_ = false;delete data;LOG_SYSFATAL << "Failed in pthread_create";}
}

Thread::start()将调用pthread_create()创建新线程,detail::startThread()是新线程的入口函数,data是新线程执行的辅助结构体。detail::startThread()调用data->runInThread()执行线程逻辑(func_)。

int Thread::join()
{assert(started_);assert(!joined_);joined_ = true;return pthread_join(pthreadId_, NULL);
}Thread::~Thread()
{if(started_ && !joined_){pthread_detach(pthreadId_);}
}

Thread析构的时候没有销毁持有的Pthreads句柄(pthread_t),也就是说Thread的析构不会等待线程结束。如果Thread对象的生命期长于线程,然后通过Thread::join()来等待线程结束并释放线程资源。如果Thread对象的生命期短于线程,那么析构时会自动detach线程,避免了资源泄露。

ThreadPool类

ThreadPool(线程池)本质上是一个生产者-消费者的模型,在实际中主要完成计算任务。在muduo线程池中有一个存放工作线程的容器ptr_vector,相当于消费者;有一个存放任务的队列deque。
任务队列是有界的,类似于BoundedBlockingQueue,实现时需要两个条件变量。
以下是ThreadPool的数据成员:

class ThreadPool : boost::noncopyable
{typedef boost::function<void ()> Task;private:MutexLock mutex_;Condition notEmpty_;Condition notFull_;string name_;Task threadInitCallback_;boost::ptr_vector<muduo::Thread> threads_;std::deque<Task> queue_;size_t maxQueueSize_;bool running_;
}

其中threadInitCallback_可由setThreadInitCallback(const Task& cb)设置,设置回调函数,每次在执行任务前先调用。在线程池开始运行之前,需要先设置任务队列的大小(调用setMaxQueueSize()),因为运行线程池时,线程会从任务队列取任务。
接下来是ThreadPool的一些接口函数。

void ThreadPool::start(int numThreads)
{assert(threads_.empty());running_ = true;threads_.reserve(numThreads);for (int i = 0; i < numThreads; ++i){char id[32];snprintf(id, sizeof id, "%d", i+1);threads_.push_back(new muduo::Thread(boost::bind(&ThreadPool::runInThread, this), name_+id));threads_[i].start();}if(numThreads == 0 && threadInitCallback_){threadInitCallback_();}
}

void ThreadPool::start(int numThreads)开启线程池,按照线程数量numThreads_创建工作线程,线程函数为ThreadPool::runInThread()。

void ThreadPool::runInThread()
{try{if(threadInitCallback_){threadInitCallback_();}while(running_){Task task(take());if(task){task();}}}catch(const Exception& ex){...}}

如果设置了threadInitCallback_,则进行执行任务前的一些初始化操作。然后从任务队列中取任务执行,有可能阻塞,当任务队列为空时。

ThreadPool::Task ThreadPool::take()
{MutexLockGuard lock(mutex_);while(queue_.empty() && running_){notEmpty_.wait();}Task task;if(!queue_.empty()){task = queue_.front();queue_.pop_front();if(maxQueueSize_ > 0){notFull_.notify();}}return task;
}

多线程从消息队列中取任务的时候,需要加锁保护。等到队列非空信号,就取任务。取出之后,便告知任务队列已经非满,可以继续添加任务。

void ThreadPool::run(const Task& task)
{if(threads_.empty()){task();}else{MutexLockGuard lock(mutex_);while(isFull()){notFull_.wait();}assert(!isFull());queue_.push_back(task);notEmpty_.notfy();}
}

如果ThreadPool没有子线程(set和start操作在run之前),就在主线程中执行该task,否则,将任务加入到队列,并通知线程从中取task,如果队列已满,便等待。

ThreadPool::~ThreadPool()
{if(running_){stop();}
}
void ThreadPool::stop()
{{MutexLockGuard lock(mutex_);running_ = false;notEmpty_.notifyAll();}for_each(threads_.begin(),threads_.end(),boost::bind(&muduo::Thread::join, _1));
}

最后是ThreadPool的析构函数,在其中调用stop(),唤醒所有等待的线程,然后对线程池中的每一个线程执行join()。

总结

以上就是muduo中Thread和ThreadPool类的学习,有很多源码,有点啰嗦。但是,在muduo的one loop per thread + thread pool模型中,Thread和ThreadPool是很重要的组件,所以需要深入地掌握。

转载于:https://www.cnblogs.com/ym65536/p/8284205.html

[zz]muduo源码阅读之Thread和ThreadPool相关推荐

  1. mysql thread conn_MySQL源码阅读2-连接与线程管理

    本篇是第二篇,MySQL初始化完成之后,便进入一个死循环中,接受客户端请求,并完成客户端的命令(如果在window下启动多个listener,则分别启动线程监听).该篇介绍MySQL服务中的连接与线程 ...

  2. 应用监控CAT之cat-client源码阅读(一)

    CAT 由大众点评开发的,基于 Java 的实时应用监控平台,包括实时应用监控,业务监控.对于及时发现线上问题非常有用.(不知道大家有没有在用) 应用自然是最初级的,用完之后,还想了解下其背后的原理, ...

  3. muduo源码client/server通信流程

    今天来学习一下muduo源码中client和server间的大致通信流程,以echo服务为例,先看一下echo对面的main函数代码. #include "examples/simple/e ...

  4. 12.源码阅读(app启动流程-android api 26)

    activity的启动流程之前已经通过源码了解了,那么app的启动流程是怎样的,从我们按下app的图标,到应用启动起来显示出画面,中间都经历了什么? 安卓是基于java的,所以和java有一定的相似性 ...

  5. Java8 ArrayBlockingQueue 源码阅读

    一.什么是 ArrayBlockingQueue ArrayBlockingQueue 是 GUC(java.util.concurrent) 包下的一个线程安全的阻塞队列,底层使用数组实现. 除了线 ...

  6. 多线程与高并发(四):LockSupport,高频面试题,AQS源码,以及源码阅读方法论

    补充几道面试题 锁升级过程:无锁.偏向锁.轻量级锁.重量级锁 StampedLock 自己看一下 面试题:syn和Reentrantlock的区别? LockSupport LockSupport.p ...

  7. zookeeper 源码阅读(2)

    接着zookeeper 源码阅读(1) Zookeeper服务器的启动,大致可以分为以下五个步骤 1. 配置文件解析. 2. 初始化数据管理器. 3. 初始化网络I/O管理器. 4. 数据恢复. 5. ...

  8. surefire 拉起 junit 单元测试类 源码阅读(一)

    根据surefire 拉起Junit单元测试类 输出的报错日志 跟踪执行过程: 日志1: java.lang.reflect.InvocationTargetExceptionat sun.refle ...

  9. DotText源码阅读(7) --Pingback/TrackBack

    DotText源码阅读(7) --Pingback/TrackBack 博客这种服务的区别于论坛和所谓文集网站,很大程度上我认为是由于pingback/trackback的存在,使得博客这种自媒体有可 ...

  10. AQS源码阅读笔记(一)

    AQS源码阅读笔记 先看下这个类张非常重要的一个静态内部类Node.如下: static final class Node {//表示当前节点以共享模式等待锁static final Node SHA ...

最新文章

  1. 独家 | 一文读懂随机森林的解释和实现(附python代码)
  2. poj 2947 Widget Factory
  3. FreeSWITCH中文语音包
  4. vim树形目录NERDTree
  5. 160804、oracle查询:取出每组中的第一条记录
  6. SpringBoot怎么直接访问templates下的html页面
  7. 1074. 元素和为目标值的子矩阵数量
  8. 在多线程数据平面开发套件(DPDK)应用程序中优化内存使用
  9. 人为什么必须积极有为?
  10. 内涵! 程序员才懂的动图(太 TM 形象了)
  11. 单片机课程设计—简易频率计—课程设计任务书
  12. html浮动提示框,JavaScript浮动提示框Tooltip效果
  13. IB心理学生物分析模块
  14. oracle18c创建数据库,Oracle 18C 手工创建CDB
  15. Android jetpack DataBinding 与RecyclerView
  16. 随机森林 Iris 特征重要性
  17. 2022第十二届中国电子文件管理论坛嘉宾揭晓
  18. Mac tips - 打开【键盘重复按键】功能
  19. 超全面的前端切图技巧,读这篇就够了
  20. 微信支付可以设置0元么?

热门文章

  1. STM32的备份寄存器和控制状态寄存器
  2. 转载~final, static和 nested class 总结 原文~http://yulin10.bokee.com/2544792.html
  3. display:none与visibility:hidden的区别 ,还有html5的新属性hidden
  4. liunx 下mysql 的安装
  5. Thinking in Java 16.3返回一个数组
  6. Python 文件(文件夹)匹配(glob模块)(转载)
  7. 无锁同步-C++11之Atomic和CAS
  8. MySQL error(2006) server has gone away
  9. 什么软件可以让头发变黑_头发特别干枯毛躁,请问什么方法可以让头发恢复到顺滑状态?...
  10. oracle增加表字段_史上最详细的oracle 中的CR块介绍--一致性读