一、服务调用

首先服务消费者通过代理对象 Proxy 发起远程调用,接着通过网络客户端 Client 将编码后的请求发送给服务提供方的网络层上,也就是 Server。Server 在收到请求后,首先要做的事情是对数据包进行解码。然后将解码后的请求发送至分发器 Dispatcher,再由分发器将请求派发到指定的线程池上,最后由线程池调用具体的服务。这就是一个远程调用请求的发送与接收过程。

那么在dubbo中请求是如何派发的?以及线程模型是什么样的那?

二、I/O线程和业务线程分离

  • 如果事件处理的逻辑能迅速完成,并且不会发起新的 IO请求,比如只是在内存中记个标识,则直接在 IO线程上处理更快,因为减少了线程池调度。

  • 但如果事件处理逻辑较慢,或者需要发起新的 IO 请求,比如需要查询数据库,则必须派发到线程池,否则 IO 线程阻塞,将导致不能接收其它请求。

  • 如果用 IO 线程处理事件,又在事件处理过程中发起新的 IO 请求,比如在连接事件中发起登录请求,会报“可能引发死锁”异常,但不会真死锁。

所以在真实的业务场景中是需要将业务线程和I/O线程进行分离处理的。dubbo作为一个服务治理框架,底层的采用Netty作为网络通信的组件,在请求派发的时候支持不同的派发策略。

参考文章:www.cnblogs.com/my_life/art…

三、请求派发策略

连接建立

从官方描述来看,duboo支持五种派发策略,下面看下是如何实现的。以Ntty4.x为例:

  1. NettyServer

    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));}
    复制代码
  2. ChannelHandlers#wrapInternal
    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {// 选择调度策略 默认是allreturn new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url))); }
    复制代码

    在NettyServer的构造方法中通过ChannelHandlers#wrap方法设置MultiMessageHandlerHeartbeatHandler并通过SPI扩展选择调度策略。

  3. NettyServer#doOpen
protected void doOpen() throws Throwable {bootstrap = new ServerBootstrap();// 多线程模型// boss线程池,负责和消费者建立新的连接bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));// worker线程池,负责连接的数据交换workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),new DefaultThreadFactory("NettyServerWorker", true));final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);channels = nettyServerHandler.getChannels();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) // nagele 算法.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)// TIME_WAIT.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) //内存池.childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug.addLast("decoder", adapter.getDecoder()) //设置编解码器.addLast("encoder", adapter.getEncoder()).addLast("handler", nettyServerHandler);}});// bind 端口ChannelFuture channelFuture = bootstrap.bind(getBindAddress());channelFuture.syncUninterruptibly();channel = channelFuture.channel();}
复制代码

设置Netty的boss线程池数量为1,worker线程池(也就是I/O线程)为cpu核心数+1和向Netty中注测Handler用于消息的编解码和处理。

如果我们在一个JVM进程只暴露一个Dubbo服务端口,那么一个JVM进程只会有一个NettyServer实例,也会只有一个NettyHandler实例。并且设置了三个handler,用来处理编解码、连接的创建、消息读写等。在dubbo内部定义了一个ChannelHandler用来和Netty的Channel关联起来,通过上述的代码会发现NettyServer本身也是一个ChannelHandler。通过NettyServer#doOpen暴露服务端口后,客户端就能和服务端建立连接了,而提供者在初始化连接后会调用NettyHandler#channelActive方法来创建一个NettyChannel

  1. NettyChannel
public void channelActive(ChannelHandlerContext ctx) throws Exception {logger.debug("channelActive <" + NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()) + ">" + " channle <" + ctx.channel());//获取或者创建一个NettyChannelNettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);try {if (channel != null) {// <ip:port, channel>channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()), channel);}// 这里的 handler就是NettyServerhandler.connected(channel);} finally {NettyChannel.removeChannelIfDisconnected(ctx.channel());}}
复制代码

与Netty和Dubbo都有自己的ChannelHandler一样,Netty和Dubbo也有着自己的Channel。该方法最后会调用NettyServer#connected方法来检查新添加channel后是否会超出提供者配置的accepts配置,如果超出,则直接打印错误日志并关闭该Channel,这样的话消费者端自然会收到连接中断的异常信息,详细可以见AbstractServer#connected方法。

  1. AbstractServer#connected
public void connected(Channel ch) throws RemotingException {// If the server has entered the shutdown process, reject any new connectionif (this.isClosing() || this.isClosed()) {logger.warn("Close new channel " + ch + ", cause: server is closing or has been closed. For example, receive a new connect request while in shutdown process.");ch.close();return;}Collection<Channel> channels = getChannels();//大于accepts的tcp连接直接关闭if (accepts > 0 && channels.size() > accepts) { logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);ch.close();return;}super.connected(ch);}
复制代码
  • 在dubbo中消费者和提供者默认只建立一个TCP长连接(详细代码请参考官网源码导读,服务引用一节),为了增加消费者调用服务提供者的吞吐量,可以在消费方增加如下配置:
<dubbo:reference id="demoService" check="false" interface="org.apache.dubbo.demo.DemoService" connections="20"/>
复制代码
  • 提供者可以使用accepts控制长连接的数量防止连接数量过多,配置如下:
<dubbo:protocol name="dubbo" port="20880" accepts="10"/>
复制代码

请求接收

当连接建立完成后,消费者就可以请求提供者的服务了,当请求到来,提供者这边会依次经过如下Handler的处理:

--->NettyServerHandler#channelRead:接收请求消息。

--->AbstractPeer#received:如果服务已经关闭,则返回,否则调用下一个Handler来处理。

--->MultiMessageHandler#received:如果是批量请求,则依次对请求调用下一个Handler来处理。

--->HeartbeatHandler#received: 处理心跳消息。

--->AllChannelHandler#received:该Dubbo的Handler非常重要,因为从这里是IO线程池和业务线程池的隔离。

--->DecodeHandler#received: 消息解码。

--->HeaderExchangeHandler#received:消息处理。

--->DubboProtocol : 调用服务。

  1. AllChannelHandler#received
public void received(Channel channel, Object message) throws RemotingException {// 获取业务线程池ExecutorService cexecutor = getExecutorService();try {// 使用线程池处理消息cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);}}
复制代码

这里对execute进行了异常捕获,这是因为I/O线程池是无界的,但业务线程池可能是有界的,所以进行execute提交可能会遇到RejectedExecutionException异常 。

那么这里是如何获取到业务线程池的那?实际上WrappedChannelHandlerxxxChannelHandlerd的装饰类,根据dubbo spi可以知道,获取AllChannelHandler会首先实例化WrappedChannelHandler

  1. WrappedChannelHandler
public WrappedChannelHandler(ChannelHandler handler, URL url) {this.handler = handler;this.url = url;// 获取业务线程池executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {componentKey = Constants.CONSUMER_SIDE;}DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();dataStore.put(componentKey, Integer.toString(url.getPort()), executor);}复制代码

线程模型

  1. FixedThreadPool
public class FixedThreadPool implements ThreadPool {@Overridepublic Executor getExecutor(URL url) {// 线程池名称DubboServerHanler-server:portString name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);// 缺省线程数量200int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);// 任务队列类型int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,queues == 0 ? new SynchronousQueue<Runnable>() :(queues < 0 ? new LinkedBlockingQueue<Runnable>(): new LinkedBlockingQueue<Runnable>(queues)),new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));}}
复制代码

缺省情况下使用200个线程和SynchronousQueue这意味着如果如果线程池所有线程都在工作再有新任务会直接拒绝。

  1. CachedThreadPool
public class CachedThreadPool implements ThreadPool {@Overridepublic Executor getExecutor(URL url) {String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);// 核心线程数量 缺省为0int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);// 最大线程数量 缺省为Integer.MAX_VALUEint threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);// queue 缺省为0int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);// 空闲线程存活时间int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,queues == 0 ? new SynchronousQueue<Runnable>() :(queues < 0 ? new LinkedBlockingQueue<Runnable>(): new LinkedBlockingQueue<Runnable>(queues)),new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));}
}
复制代码

缓存线程池,可以看出如果提交任务的速度大于maxThreads将会不断创建线程,极端条件下将会耗尽CPU和内存资源。在突发大流量进入时不适合使用。

  1. LimitedThreadPool
public class  LimitedThreadPool implements ThreadPool {@Overridepublic Executor getExecutor(URL url) {String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);// 缺省核心线程数量为0int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);// 缺省最大线程数量200int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);// 任务队列缺省0int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,queues == 0 ? new SynchronousQueue<Runnable>() :(queues < 0 ? new LinkedBlockingQueue<Runnable>(): new LinkedBlockingQueue<Runnable>(queues)),new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));}}
复制代码

不配置的话和FixedThreadPool没有区别。

  1. EagerThreadPool
public class EagerThreadPool implements ThreadPool {@Overridepublic Executor getExecutor(URL url) {String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);// 0int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);// Integer.MAX_VALUEint threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);// 0int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);// 60sint alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);// init queue and executor// 初始任务队列为1TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,threads,alive,TimeUnit.MILLISECONDS,taskQueue,new NamedInternalThreadFactory(name, true),new AbortPolicyWithReport(name, url));taskQueue.setExecutor(executor);return executor;}
}
复制代码

EagerThreadPoolExecutor

public void execute(Runnable command) {if (command == null) {throw new NullPointerException();}// do not increment in method beforeExecute!//已提交任务数量submittedTaskCount.incrementAndGet();try {super.execute(command);} catch (RejectedExecutionException rx) { //大于最大线程数被拒绝任务 重新添加到任务队列// retry to offer the task into queue.final TaskQueue queue = (TaskQueue) super.getQueue();try {if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {submittedTaskCount.decrementAndGet();throw new RejectedExecutionException("Queue capacity is full.", rx);}} catch (InterruptedException x) {submittedTaskCount.decrementAndGet();throw new RejectedExecutionException(x);}} catch (Throwable t) {// decrease any waysubmittedTaskCount.decrementAndGet();throw t;}}
复制代码

TaskQueue

public boolean offer(Runnable runnable) {if (executor == null) {throw new RejectedExecutionException("The task queue does not have executor!");}// 获取当前线程池中的线程数量int currentPoolThreadSize = executor.getPoolSize();// have free worker. put task into queue to let the worker deal with task.// 如果已经提交的任务数量小于当前线程池中线程数量(不是很理解这里的操作)if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {return super.offer(runnable);}// return false to let executor create new worker.//当前线程数小于最大线程程数直接创建新workerif (currentPoolThreadSize < executor.getMaximumPoolSize()) {return false;}// currentPoolThreadSize >= maxreturn super.offer(runnable);}
复制代码

优先创建Worker线程池。在任务数量大于corePoolSize但是小于maximumPoolSize时,优先创建Worker来处理任务。当任务数量大于maximumPoolSize时,将任务放入阻塞队列中。阻塞队列充满时抛出RejectedExecutionException。(相比于cached:cached在任务数量超过maximumPoolSize时直接抛出异常而不是将任务放入阻塞队列)。

根据以上的代码分析,如果消费者的请求过快很有可能导致服务提供者业务线程池抛出RejectedExecutionException异常。这个异常是duboo的采用的线程拒绝策略AbortPolicyWithReport#rejectedExecution抛出的,并且会被反馈到消费端,此时简单的解决办法就是将提供者的服务调用线程池数目调大点,例如如下配置:

<dubbo:provider threads="500"/>
或
<dubbo:protocol name="dubbo" port="20882" accepts="10" threads="500"/>
复制代码

为了保证模块内的主要服务有线程可用(防止次要服务抢占过多服务调用线程),可以对次要服务进行并发限制,例如:

<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" executes="100"/>
复制代码

dubbo的dispatcher 策略默认是all,实际上比较好的处理方式是I/O线程和业务线程分离,所以采取message是比较好得配置。并且如果采用all如果使用的dubo版本比较低很有可能会触发dubbo的bug。一旦业务线程池满了,将抛出执行拒绝异常,将进入caught方法来处理,而该方法使用的仍然是业务线程池,所以很有可能这时业务线程池还是满的,导致下游的一个HeaderExchangeHandler没机会调用,而异常处理后的应答消息正是HeaderExchangeHandler#caught来完成的,所以最后NettyHandler#writeRequested没有被调用,Consumer只能死等到超时,无法收到Provider的线程池打满异常(2.6.x已经修复该问题)。

  • 推荐配置
<dubbo:protocol name="dubbo" port="8888" threads="500" dispatcher="message" />
复制代码

参考文章:manzhizhen.iteye.com/blog/239117…

转载于:https://juejin.im/post/5ce26547e51d4510be453efa

Dubbo线程模型和调度策略相关推荐

  1. 【Java从0到架构师】Dubbo 基础 - 设置启动时检查、直接提供者、线程模型、负载均衡、集群容错、服务降级

    Dubbo 分布式 RPC 分布式核心基础 分布式概述 RPC Dubbo Dubbo 入门程序 - XML.注解 部署管理控制台 Dubbo Admin 修改绑定的注册 IP 地址 设置启动时检查 ...

  2. 如何使用Arthas定位线上 Dubbo 线程池满异常

    点击上方蓝色"程序猿DD",选择"设为星标" 回复"资源"获取独家整理的学习资料! 来源 | 公众号「Kirito的技术分享」 前言 本文是 ...

  3. Arthas | 定位线上 Dubbo 线程池满异常

    作者 | 徐靖峰  阿里云高级开发工程师 前言 Dubbo 线程池满异常应该是大多数 Dubbo 用户都遇到过的一个问题,本文以 Arthas 3.1.7 版本为例,介绍如何针对该异常进行诊断,主要使 ...

  4. netty worker线程数量_Dubbo线程模型

    Dubbo中线程池的应用还是比较广泛的,按照consumer端到provider的RPC的方向来看,consumer端的应用业务线程到netty线程.consuemr端dubbo业务线程池,到prov ...

  5. 记一次线上压测Dubbo线程池队列满的问题

    本文记录一次线上全链路压测出现的Dubbo线程池队列满的问题. 1 问题描述 线上做全链路压测,其中涉及三个系统,调用关系A->B->C,均是dubbo调用.压测的时候C出现CPU满导致服 ...

  6. 面试官:Netty的线程模型可不是Reactor这么简单

    笔者看来Netty的内核主要包括如下图三个部分: 其各个核心模块主要的职责如下: 内存管理 主要提高高效的内存管理,包含内存分配,内存回收. 网通通道 复制网络通信,例如实现对NIO.OIO等底层JA ...

  7. netty的使用场景,线程模型以及如何在springboot中使用netty?

    文章目录 1. 为什么使用netty? 2. netty的线程模型 3. 在springboot中使用netty 4. netty的核心API解释 5. netty中的ByteBuf 1. 为什么使用 ...

  8. netty应用场景_彻底搞懂 netty 线程模型

    编者注:Netty是Java领域有名的开源网络库,特点是高性能和高扩展性,因此很多流行的框架都是基于它来构建的,比如我们熟知的Dubbo.Rocketmq.Hadoop等.本文就netty线程模型展开 ...

  9. Dubbo线程池耗尽问题

    场景:dubbo 线程池耗尽,报错. Caused by: java.util.concurrent.RejectedExecutionException: Thread pool is EXHAUS ...

最新文章

  1. linux 网络管理器未运行怎么解决,Ubuntu下提示网络管理器未运行解决方法
  2. 优化内核报错及解决方法
  3. 42. Vue、React 等前端项目部署,刷新 404 问题解决方案
  4. cfb为什么不需要填充_为什么很多高中生数学成绩不理想,需要补课?因为不熟练啊!...
  5. 发挥游戏人工智能的最大价值:线程化
  6. 华硕vm510l拆电池图解_图解说设备:凯斯CX80C你会买吗?
  7. 专题导读:教育大数据
  8. 中兴ZTE ZXR10系列交换机2818S固件以及更新方法
  9. Jquery checkbox选中问题
  10. 【TWVRP】基于matlab粒子群算法求解带时间窗的车辆路径规划问题【含Matlab源码 334期】
  11. MYSQL时间函数之NOW()
  12. js Date 获取 年 月 日
  13. 计算机编程教育资源,风变编程以科技实现教育普惠,俱进教育公平
  14. GEA 1.7 工具及资产管道
  15. CAD高版本转低版本的方法有哪些?你一定用的到哦
  16. 小游戏的processing实现
  17. JMeter逻辑控制器 详解
  18. 七夕专属程序员的浪漫
  19. 【分享】QY-IMX8M主板简介
  20. CH340有线USB转串与CH9140无线蓝牙转串

热门文章

  1. 可以从max中导出静态模型并渲染了。
  2. 严正声明:微信上假冒“科研星球”公众号,请勿关注
  3. 代谢组学的相关分析数据库,MetaboAnalyst 5.0 使用指南
  4. Matlab学习一本通,matlab基础教程
  5. linux解压后缀为.xz,xz后缀名文件解压方法
  6. python3 scrapy 教程_Scrapy 教程
  7. Matlab | matlab中@的用法总结(附matlab测试代码):What does “@“ do ?
  8. 简述hdfs工作原理_hdfs工作机制和原理 简述hdfs的原理
  9. 乒乓球比赛赛程_国乒今年最后一站比赛延期!赛程缩短比赛地温暖,教练组考察队员...
  10. 文章页点赞php代码,wordpress文章页面添加点赞功能