Dubbo中线程池的应用还是比较广泛的,按照consumer端到provider的RPC的方向来看,consumer端的应用业务线程到netty线程、consuemr端dubbo业务线程池,到provider端的netty boss线程、worker线程和dubbo业务线程池等。这些线程各司其职相互配合,共同完成dubbo RPC服务调用,理解dubbo线程模型对于学习Dubbo原理很有帮助。

dubbo线程模型包括线程模型策略和dubbo线程池策略两个方面,下面就依次进行分析。

dubbo线程模型

Dubbo默认的底层网络通信使用的是Netty,服务提供方NettyServer使用两级线程池,其中EventLoopGroup(boss)主要用来接收客户端的链接请求,并把完成TCP三次握手的连接分发给EventLoopGroup(worker)来处理,注意把boss和worker线程组称为I/O线程,前者处理IO连接事件,后者处理IO读写事件。

设想下,dubbo provider端的netty IO线程是如何处理业务逻辑呢?如果处理逻辑较为简单,并且不会发起新的I/O请求,那么直接在I/O线程上处理会更快,因为这样减少了线程池调度与上下文切换的开销,毕竟线程切换还是有一定成本的。如果逻辑较为复杂,或者需要发起网络通信,比如查询数据库,则I/O线程必须派发请求到新的线程池进行处理,否则I/O线程会被阻塞,导致处理IO请求效率降低。

那Dubbo是如何做的呢?

Dubbo中根据请求的消息类是直接被I/O线程处理还是被业务线程池处理,Dubbo提供了下面几种线程模型:

  • all(AllDispatcher类):所有消息都派发到业务线程池,这些消息包括请求、响应、连接事件、断开事件等,响应消息会优先使用对于请求所使用的线程池。
  • direct(DirectDispatcher类):所有消息都不派发到业务线程池,全部在IO线程上直接执行。
  • message(MessageOnlyDispatcher类):只有请求响应消息派发到业务线程池,其他消息如连接事件、断开事件、心跳事件等,直接在I/O线程上执行。
  • execution(ExecutionDispatcher类):只把请求类消息派发到业务线程池处理,但是响应、连接事件、断开事件、心跳事件等消息直接在I/O线程上执行。
  • connection(ConnectionOrderedDispatcher类):在I/O线程上将连接事件、断开事件放入队列,有序地逐个执行,其他消息派发到业务线程池处理。

dubbo线程池可选模型较多,下面以DirectDispatcher类进行分析,其他流程类似就不在赘述。

public class DirectChannelHandler extends WrappedChannelHandler {@Overridepublic void received(Channel channel, Object message) throws RemotingException {ExecutorService executor = getPreferredExecutorService(message);if (executor instanceof ThreadlessExecutor) {try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);}} else {handler.received(channel, message);}}
}

DirectDispatcher类重写了received方法,注意 ThreadlessExecutor 被应用在调用 future.get() 之前,先调用 ThreadlessExecutor.wait(),wait 会使业务线程在一个阻塞队列上等待,直到队列中被加入元素。很明显,provider侧调用getPreferredExecutorService(message)返回的不是ThreadlessExecutor,所以会在当前IO线程执行执行。

其他事件,比如连接、异常、断开等,都是在WrappedChannelHandler中默认实现:执行在当前IO线程中执行的,代码如下:

@Override
public void connected(Channel channel) throws RemotingException {handler.connected(channel);
}
@Override
public void disconnected(Channel channel) throws RemotingException {handler.disconnected(channel);
}
@Override
public void sent(Channel channel, Object message) throws RemotingException {handler.sent(channel, message);
}
@Override
public void caught(Channel channel, Throwable exception) throws RemotingException {handler.caught(channel, exception);
}

dubbo线程模型策略

了解了dubbo线程模型之后,小伙伴是不是该问:

既然有那么多的线程模型策略,dubbo线程模型具体使用的是什么策略呢?

从netty启动流程来看,初始化NettyServer时会进行加载具体的线程模型,代码如下:

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));
}
public static ChannelHandler wrap(ChannelHandler handler, URL url) {return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)));
}

这里根据URL里的线程模型来选择具体的Dispatcher实现类。在此,我们再提一下Dubbo提供的Dispatcher实现类,其默认的实现类是all,也就是AllDispatcher类。既然Dispatcher是通过SPI方式加载的,也就是用户可以自定义自己的线程模型,只需实现Dispatcher类然后配置选择使用自定义的Dispatcher类即可。

dubbo线程池策略

dubbo处理流程,为了尽量早地释放Netty的I/O线程,某些线程模型会把请求投递到线程池进行异步处理,那么这里所谓的线程池是什么样的线程池呢?

其实这里的线程池ThreadPool也是一个扩展接口SPI,Dubbo提供了该扩展接口的一些实现,具体如下:

  • FixedThreadPool:创建一个具有固定个数线程的线程池。
  • LimitedThreadPool:创建一个线程池,这个线程池中的线程个数随着需要量动态增加,但是数量不超过配置的阈值。另外,空闲线程不会被回收,会一直存在。
  • EagerThreadPool:创建一个线程池,在这个线程池中,当所有核心线程都处于忙碌状态时,将创建新的线程来执行新任务,而不是把任务放入线程池阻塞队列。
  • CachedThreadPool:创建一个自适应线程池,当线程空闲1分钟时,线程会被回收;当有新请求到来时,会创建新线程。

知道了这些线程池之后,那么是什么时候进行SPI加载对应的线程池实现呢?具体是在dubbo 线程模型获取对应线程池时进行SPI加载的,具体逻辑在方法 org.apache.dubbo.common.threadpool.manager.DefaultExecutorRepository#createExecutor中:

private ExecutorService createExecutor(URL url) {return (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
}
@SPI("fixed")
public interface ThreadPool {@Adaptive({THREADPOOL_KEY})Executor getExecutor(URL url);
}

从代码来看,默认的线程池策略是fixed模式的线程池,其coreSize默认为200,队列大小为0,其代码如下:

public class FixedThreadPool implements ThreadPool {@Overridepublic Executor getExecutor(URL url) {String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,queues == 0 ? new SynchronousQueue<Runnable>() :(queues < 0 ? new LinkedBlockingQueue<Runnable>(): new LinkedBlockingQueue<Runnable>(queues)),new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));}
}

注:其他线程池策略和FixedThreadPool类似,只不过线程池参数不同而已,这里不再赘述。

小结

从dubbo提供的几种线程模型和线程池策略来看,基本上能满足绝大多数场景的需求了,由于dubbo线程模型和线程池策略都是通过SPI的方式进行加载的,因此如果业务上需要,我们完全可以自定义对应的线程模型和线程池策略,只需要配置下即可。

推荐阅读

Dubbo RPC在consumer端是如何跑起来的​mp.weixin.qq.com

dubbo版的"明朝那些事儿"​mp.weixin.qq.com

责任链的2种实现方式,你更pick哪一种​mp.weixin.qq.com

网络数据是如何传递给进程的​mp.weixin.qq.com

Linux管道那些事儿​mp.weixin.qq.com

从socket api看网络通信流程​mp.weixin.qq.com

netty worker线程数量_Dubbo线程模型相关推荐

  1. 《线程管理:传递参数、确定线程数量、线程标识》

    参考<c++ Concurrency In Action >第二章做的笔记 目录 传递参数 量产线程 线程标识 传递参数 thread构造函数的附加参数会拷贝至新线程的内存空间中,即使函数 ...

  2. ios查看线程数量_iOS线程数量监控工具

    简单却强大的线程监控工具 KKThreadMonitor :当线程过多或瞬间创建大量子线程(线程爆炸),控制台就打印出所有的线程堆栈.便于分析造成子线程过多或线程爆炸的原因. /******* 线程爆 ...

  3. python获取当前线程数量_python 线程数

    python 多线程 真正的多线程吗? 对于多核处理器,在同一时间确实可以多个线程独立运行,但在Python中确不是这样的了.原因在于,python虚拟机中引入了GIL这一概念.GIL(Global ...

  4. Python 多线程总结(2)— 线程锁、线程池、线程数量、互斥锁、死锁、线程同步

    主要介绍使用 threading 模块创建线程的 3 种方式,分别为: 创建 Thread 实例函数 创建 Thread 实例可调用的类对象 使用 Thread 派生子类的方式 多线程是提高效率的一种 ...

  5. C# 开启HTTP监听服务与线程数量控制

    如何开IP和端口的HTTP监听这里就是按照实现这种方式来加以说明,另外线程不要再循环里面不停的new Thread()这样很耗性能. 开启HTTP服务监听 /// <summary>/// ...

  6. eclipse mysql 线程池_JAVA5线程池使用

    线程池是Java5提供的一个新技术,方便我们快速简洁的定义线程池.包括如下: 诸如 Web 服务器.数据库服务器.文件服务器或邮件服务器之类的许多服务器应用程序都面向处理来自某些远程来源的大量短小的任 ...

  7. java线程池1001java线程池_深入浅出Java(Android )线程池ThreadPoolExecutor

    前言 关于线程池 在Java/Android开发中,设计到并发的请求,那基本上是离不开线程池了.用线程池的好处: 1.减少线程频繁创建.销毁的开销: 2.好控制并发量,降低OOM的可能,至于原因文中会 ...

  8. threadpoolexecutor创建线程池_线程池ThreadPoolExecutor源码分析

    什么是线程池 创建线程要花费昂贵的资源和时间,如果任务来了才创建那么响应时间会变长,而且一个进程能创建的线程数量有限.为了避免这些问题,在程序启动的时候就创建若干线程来响应出来,它们被称为线程池,里面 ...

  9. 多线程、并发/并行、自定义线程类、线程安全、守护线程、定时器、线程状态、线程池

    目录 进程和线程: 进程: 线程: 多线程的好处: 线程调度: 分时调度: 抢占式调度: 并发与并行: 线程的生命周期: 实现线程的两种基本方式(还有第三种): 创建Thread线程类: 创建Runn ...

最新文章

  1. 思科交换机vlan配置
  2. 人工智能火了 高端人才成了香饽饽
  3. vs2003打开项目错误
  4. 数据库-排序-升降序-多列
  5. NYOJ-491 幸运三角形
  6. Debian,从安装到喜欢
  7. vue项目使用大华摄像头怎样初始化_海康、大华摄像头chrome高版本实时播放(java集成)...
  8. python中jieba库使用教程
  9. 机器人学导论—机器人相关术语
  10. 开放世界游戏中的大地图背后有哪些实现技术
  11. excel学习-自定义图表颜色(QQ截图+colorpix取色器)
  12. 解决ASUS P5GC-MX/1333声卡驱动不能正常安装的问题
  13. Cesium 加载地形数据
  14. 【PART 1】OAK-D+TurtleBot3机器人项目全解析:SLAM、ROS、深度图、点云。
  15. ARCore从零到一 (1) 搭建开发环境
  16. centos中startup.sh启动服务脚本
  17. VUE(11) : 图片点击全屏展示
  18. Mac系统如何取消自动播放视频和实况照片?
  19. 由 hacked by 1byte 想到的,再说两句
  20. 【计算机组成原理】门阵列译码器

热门文章

  1. frameset 后台管理_易达CMS下载-易达CMS(免费开源网站管理系统)v3.0.0.1103免费版
  2. 公共课计算机基础怎么样,公共课第一学期《计算机基础》
  3. 15年3月c语言试卷,2015年3月二级C语言新增无纸化真题试卷(三)
  4. 深度学习之基于Tensorflow2.0实现Xception网络
  5. python类中方法相互调用_python 类中方法之间的调用
  6. php odbc 分页,用php实现odbc数据分页显示一例_php技巧
  7. mysql 锁怎么使用_Mysql锁一般使用
  8. sql增删改查_Sirvia 一套web端增删改查系统
  9. 《计算机组成原理》_学习笔记(二)
  10. php引用代码_PHP引用是什么?php中引用的介绍(代码实例)