dubbo提供了自己的线程池以及派发策略,其实官方文档上讲的是比较清楚的,只需要结合着源码看下,就能明白具体的原理
dubbo官方文档地址

线程模型

用简单的总结来说,dubbo中的线程模型是有netty本身的io线程 + dubbo提供的线程池组成的,也就是说,不管是正常的请求、还是响应、还是连接请求、断开连接请求,要么是有netty的io线程来执行,要么是线程池中的线程来执行
不同的派发策略,会有不同的处理效果

派发策略以及源码

根据官网的解释,有以下五种派发策略,在自己去学习这五个策略的时候,我一直不明白是如何实现有些请求可以交给线程池去处理,有些请求会交给io线程去处理,看了源码之后,就理解了

问题:
1、首先有五个派发策略,其中有四个都继承了WrappedChannelHandler, 为什么单单direct没有继承?
2、是如何实现请求是要到线程池中?还是到io线程中执行呢?
先回答第二个问题,wrappedChannelHandler是一个handler,是会被netty执行的,所以,如果我们没有做其他处理,默认执行wrappedChannelHandler中的方法,那其实就是把请求交给了netty的io线程去执行
在dubbo中,五个dispatcher类,通过覆写wrappedChannelHandler的方法,来决定是要交给线程池执行?还是io线程执行
也就是说,如果连接事件,要交给线程池执行,那就覆写connect()方法,在子类的connect方法中,将请求交给线程池即可
如果断开连接事件,要交给io线程执行,就不需要在子类中覆写disConnect()方法

WrappedChannelHandler#getExecutorService()

这里先插一段代码,因为这段代码是前提,就是根据dubbo的自适应扩展机制以及我们的配置,来获取一个线程池,默认是fix

public WrappedChannelHandler(ChannelHandler handler, URL url) {this.handler = handler;this.url = url;//初始化线程池,根据adaptive机制,获取dubbo提供的线程池executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {componentKey = Constants.CONSUMER_SIDE;}DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}// 获取线程池
public ExecutorService getExecutorService() {ExecutorService cexecutor = executor;if (cexecutor == null || cexecutor.isShutdown()) {cexecutor = SHARED_EXECUTOR;}return cexecutor;
}

AllDispatcher

所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。
可以看到,这个dispatch的类,覆写了父类的所有方法,在子类的实现方法中,都是调用的cexecutor.execute()去提交的任务
getExecutorService()就是获取当前程序员配置的要使用的线程池,如果不配置,默认是fix

@Override
public void connected(Channel channel) throws RemotingException {ExecutorService cexecutor = getExecutorService();try {cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));} catch (Throwable t) {throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);}
}@Override
public void disconnected(Channel channel) throws RemotingException {ExecutorService cexecutor = getExecutorService();try {cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));} catch (Throwable t) {throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);}
}/*** 这里是关键,服务端在接收到请求之后,会把请求交给一个线程池来处理* @param channel* @param message* @throws RemotingException*/
@Override
public void received(Channel channel, Object message) throws RemotingException {ExecutorService cexecutor = getExecutorService();try {cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {//TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring//fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time outif(message instanceof Request && t instanceof RejectedExecutionException){Request request = (Request)message;if(request.isTwoWay()){String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();Response response = new Response(request.getId(), request.getVersion());response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);response.setErrorMessage(msg);channel.send(response);return;}}throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);}
}@Override
public void caught(Channel channel, Throwable exception) throws RemotingException {ExecutorService cexecutor = getExecutorService();try {cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));} catch (Throwable t) {throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);}
}

DirectDispatcher

所有消息都不派发到线程池,全部在 IO 线程上直接执行
direct更简单,干脆就没有实现directChannelHandler类,在其dispatch方法中,直接将入参的handler返回

public ChannelHandler dispatch(ChannelHandler handler, URL url) {return handler;
}

所以,我们可以理解为,direct策略,会将所有请求都交给io线程去处理

ConnectionOrderedChannelHandler

在 IO 线程上,将连接、断开事件放入队列,有序逐个执行,其它消息派发到线程池。
这个方法,只有一个比较特殊的点,其他都和上面allDispatcher差别不大;
我一直疑问这里为什么是有序逐个执行

@Override
public void connected(Channel channel) throws RemotingException {try {checkQueueLength();connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));} catch (Throwable t) {throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);}
}

原因就在这里的connectionExecutor;

public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {super(handler, url);String threadName = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);connectionExecutor = new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),new NamedThreadFactory(threadName, true),new AbortPolicyWithReport(threadName, url));  // FIXME There's no place to release connectionExecutor!queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
}

也就是说,这里又new了一个新的线程池,线程池的核心线程数、最大线程数只有一个,其他的,就需要进入到阻塞队列中进行等待

ExecutionChannelHandler

只有请求消息派发到线程池,不含响应,响应和其它连接断开事件,心跳等消息,直接在 IO 线程上执行。

@Override
public void received(Channel channel, Object message) throws RemotingException {ExecutorService cexecutor = getExecutorService();if (message instanceof Request) {try {cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {// FIXME: when the thread pool is full, SERVER_THREADPOOL_EXHAUSTED_ERROR cannot return properly,// therefore the consumer side has to wait until gets timeout. This is a temporary solution to prevent// this scenario from happening, but a better solution should be considered later.if (t instanceof RejectedExecutionException) {Request request = (Request) message;if (request.isTwoWay()) {String msg = "Server side(" + url.getIp() + "," + url.getPort()+ ") thread pool is exhausted, detail msg:" + t.getMessage();Response response = new Response(request.getId(), request.getVersion());response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);response.setErrorMessage(msg);channel.send(response);return;}}throw new ExecutionException(message, channel, getClass() + " error when process received event.", t);}} else {handler.received(channel, message);}
}

这里可以看到,覆写了received方法之后,还在方法中,判断是否是request事件,如果是请求事件,就交给dubbo的线程池去处理,反之,交给netty的handler处理

MessageOnlyChannelHandler

只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在 IO 线程上执行。

@Override
public void received(Channel channel, Object message) throws RemotingException {ExecutorService cexecutor = getExecutorService();try {cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);}
}

这里就比较简单了,直接覆写received方法,无论是request请求,还是response请求,都交给dubbo的线程池去处理

线程池

dubbo中所提供的线程池,其实就是基于jdk的线程池,自己定义了四个线程池

cache

缓存线程池,空闲等待时间是1分钟,超过1分钟,没有任务提交,就销毁线程,设置的核心线程数是0

return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,queues == 0 ? new SynchronousQueue<Runnable>() :(queues < 0 ? new LinkedBlockingQueue<Runnable>(): new LinkedBlockingQueue<Runnable>(queues)),new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));

limit

可伸缩线程池,但池中的线程数只会增长不会收缩。只增长不收缩的目的是为了避免收缩时突然来了大流量引起的性能问题。
设置的cores默认是0,threads默认是200,空闲等待时间是无限制,可以理解为没有拉取到排队的任务,就一直等待,也不会销毁线程
所以这个线程池是只会增长,不会收缩

return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,queues == 0 ? new SynchronousQueue<Runnable>() :(queues < 0 ? new LinkedBlockingQueue<Runnable>(): new LinkedBlockingQueue<Runnable>(queues)),new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));

fix

核心线程数、最大线程数都是200,空闲等待时间是0

// 这里的threads默认是200
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,queues == 0 ? new SynchronousQueue<Runnable>() :(queues < 0 ? new LinkedBlockingQueue<Runnable>(): new LinkedBlockingQueue<Runnable>(queues)),new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));

eager

TODO

dubbo的线程模型、派发策略、线程池策略相关推荐

  1. Reactor三种线程模型与Netty线程模型

    一.Reactor三种线程模型 1.1.单线程模型 单个线程以非阻塞IO或事件IO处理所有IO事件,包括连接.读.写.异常.关闭等等.单线程Reactor模型基于同步事件分离器来分发事件,这个同步事件 ...

  2. [并发并行]_[线程模型]_[Pthread线程使用模型之一管道Pipeline]

    场景 1.经常在Windows, MacOSX 开发C多线程程序的时候, 经常需要和线程打交道, 如果开发人员的数量不多时, 同时掌握Win32和pthread线程 并不是容易的事情, 而且使用Win ...

  3. netty worker线程数量_Dubbo线程模型

    Dubbo中线程池的应用还是比较广泛的,按照consumer端到provider的RPC的方向来看,consumer端的应用业务线程到netty线程.consuemr端dubbo业务线程池,到prov ...

  4. netty应用场景_彻底搞懂 netty 线程模型

    编者注:Netty是Java领域有名的开源网络库,特点是高性能和高扩展性,因此很多流行的框架都是基于它来构建的,比如我们熟知的Dubbo.Rocketmq.Hadoop等.本文就netty线程模型展开 ...

  5. reactor线程模型_面试一文搞定JAVA的网络IO模型

    1,最原始的BIO模型 该模型的整体思路是有一个独立的Acceptor线程负责监听客户端的链接,它接收到客户端链接请求之后为每个客户端创建一个新的线程进行链路处理,处理完成之后,通过输出流返回应答给客 ...

  6. netty socket超时设置_彻底搞懂 netty 线程模型

    编者注:Netty是Java领域有名的开源网络库,特点是高性能和高扩展性,因此很多流行的框架都是基于它来构建的,比如我们熟知的Dubbo.Rocketmq.Hadoop等.本文就netty线程模型展开 ...

  7. Netty和RPC框架线程模型分析

    <Netty 进阶之路>.<分布式服务框架原理与实践>作者李林锋深入剖析Netty和RPC框架线程模型.李林锋已在 InfoQ 上开设 Netty 专题持续出稿,感兴趣的同学可 ...

  8. Netty 和 RPC 框架线程模型分析

    https://www.infoq.cn/article/9Ib3hbKSgQaALj02-90y 1. 背景 1.1 线程模型的重要性 对于 RPC 框架而言,影响其性能指标的主要有三个要素: I/ ...

  9. 面试官:Netty的线程模型可不是Reactor这么简单

    笔者看来Netty的内核主要包括如下图三个部分: 其各个核心模块主要的职责如下: 内存管理 主要提高高效的内存管理,包含内存分配,内存回收. 网通通道 复制网络通信,例如实现对NIO.OIO等底层JA ...

  10. 【Netty】Netty 简介 ( 原生 NIO 弊端 | Netty 框架 | Netty 版本 | 线程模型 | 线程 阻塞 IO 模型 | Reactor 模式引入 )

    文章目录 一. NIO 原生 API 弊端 二. Netty 简介 三. Netty 架构 四. Netty 版本 五. Netty 线程模型 六. 阻塞 IO 线程模型 七. 反应器 ( React ...

最新文章

  1. JavaScript函数式编程学习
  2. 关于MYSQL日期 字符串 时间戳互转
  3. Unity 4.3 2D 教程:新手上路
  4. 浅谈接口对前后端测试的意义
  5. 计算机社团技术部部长述职报告,社团部长个人工作总结(精选6篇)
  6. Linux FrameBuffer操作(二十七)
  7. h5小游戏构建架设h5棋牌平台开发制作
  8. ecshop修改模板可输出php代码,修改ecshop模板体会
  9. iBeacon技术解析
  10. html画布里面画圆,html5 canvas 画布画圆
  11. 各版本sqlserver下载地址
  12. Scancode到Keycode的映射
  13. 苹果cms影视系统成品站打包+电影先生6.1.1模板优化版+15W+数据
  14. C++学习15:C++模板的参数
  15. 计算机社团展示ppt,学生社团管理系统.ppt
  16. 卸载重装 Windows 10 内置应用的最全方法,还你一个干净清爽的系统
  17. 怎么看待互联网正在回暖这一说法?
  18. PHP高级工程面试题汇总(2018.05)
  19. ASP.NET Web Form学习
  20. Signal Tap II使用

热门文章

  1. arcgis 合并名字相同的要素_【转】ArcGIS中各种合并要素(Union、Merge、Append、Dissolve)的异同点分析...
  2. java为什么要连接Mysql_为什么要启动mysql workbech,java才能连接mysql数据库呢?
  3. 485.最大连续1的个数
  4. 193.有效电话号码
  5. java 内省 反射_java 反射与内省
  6. 并发编程----AQS架构
  7. 深度学习笔记(四):循环神经网络的概念,结构和代码注释
  8. 深度学习中的激活函数导引
  9. MATLAB中保存eps文件的正确做法 | 保留颜色
  10. uni的numberbox怎么用_jQuery EasyUI表单插件Numberbox数字框