Dubbo thread pool is exhausted

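This was a real production incident. During peak business hours, one business interface kept reporting that the Dubbo thread pool was exhausted. After repeated investigation, the root cause turned out to be a database table that had no index. The resulting slow SQL dragged down query speed, and with the high concurrency at peak time the thread pool was exhausted outright. The system crashed and could no longer serve requests.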


Dubbo thread pool strategy (official docs: https://dubbo.apache.org/zh/docsv2.7/dev/impls/threadpool/ )

Dubbo message dispatch mechanism (official docs: https://dubbo.apache.org/zh/docsv2.7/dev/impls/dispatcher/ )



package org.apache.dubbo.common.threadpool;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.Adaptive;
import org.apache.dubbo.common.extension.SPI;
import org.apache.dubbo.common.threadpool.support.fixed.FixedThreadPool;

import java.util.concurrent.Executor;

import static org.apache.dubbo.common.constants.CommonConstants.THREADPOOL_KEY;

/**
 * ThreadPool
 * FixedThreadPool.NAME = "fixed" (so the SPI mechanism uses the fixed thread pool by default)
 */
@SPI(FixedThreadPool.NAME)
public interface ThreadPool {

    /**
     * Thread pool
     *
     * @param url URL contains thread parameter
     * @return thread pool
     */
    @Adaptive({THREADPOOL_KEY})
    Executor getExecutor(URL url);

}


package org.apache.dubbo.common.threadpool.support.fixed;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory;
import org.apache.dubbo.common.threadpool.ThreadPool;
import org.apache.dubbo.common.threadpool.support.AbortPolicyWithReport;

import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_QUEUES;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_THREADS;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_THREAD_NAME;
import static org.apache.dubbo.common.constants.CommonConstants.QUEUES_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.THREADS_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.THREAD_NAME_KEY;

/**
 * Creates a thread pool that reuses a fixed number of threads
 *
 * @see java.util.concurrent.Executors#newFixedThreadPool(int)
 */
public class FixedThreadPool implements ThreadPool {

    public static final String NAME = "fixed";

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        // The pool size defaults to 200 (DEFAULT_THREADS = 200)
        int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}


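The Dubbo framework ships with 4 built-in thread pool implementations:

  1. fixed: a fixed-size thread pool, 200 threads by default. This is the SPI default.
  2. cached: a cached thread pool with 0 core threads, a maximum of Integer.MAX_VALUE threads, and a default queue size of 0. Idle threads are reclaimed. (This one is interesting and worth a closer look later.)
  3. limited: a thread pool with 0 core threads, a maximum of 200 threads by default, and a default queue size of 0. The pool only grows and never shrinks.
  4. eager: once all core threads are busy, a new task is not queued. A new thread is started to handle it instead, and tasks are only queued after the maximum thread count is reached.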
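The effect of the queue choice in FixedThreadPool can be tried out with nothing but the JDK. The following is a minimal sketch, not Dubbo code: the class and method names are made up for illustration, the JDK's plain AbortPolicy stands in for Dubbo's AbortPolicyWithReport, and the default thread factory replaces NamedInternalThreadFactory. It submits tasks that block until released and counts how many the pool rejects.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical class: mirrors the queue selection in FixedThreadPool using only JDK types.
public class FixedPoolSketch {

    // Same three-way choice as FixedThreadPool.getExecutor:
    // 0 -> no queue at all, negative -> unbounded queue, positive -> bounded queue.
    static BlockingQueue<Runnable> queueFor(int queues) {
        if (queues == 0) {
            return new SynchronousQueue<>();
        }
        return queues < 0 ? new LinkedBlockingQueue<>() : new LinkedBlockingQueue<>(queues);
    }

    // AbortPolicy is an assumption standing in for Dubbo's AbortPolicyWithReport.
    static ThreadPoolExecutor fixedPool(int threads, int queues) {
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queueFor(queues), new ThreadPoolExecutor.AbortPolicy());
    }

    // Submits `tasks` tasks that all block until released, and returns how many were rejected.
    static int countRejected(int threads, int queues, int tasks) {
        ThreadPoolExecutor pool = fixedPool(threads, queues);
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await(); // simulates a slow call holding the thread
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        release.countDown(); // let the accepted tasks finish
        pool.shutdown();
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("threads=5, queues=0:  rejected " + countRejected(5, 0, 100));
        System.out.println("threads=5, queues=10: rejected " + countRejected(5, 10, 100));
        System.out.println("threads=5, queues=-1: rejected " + countRejected(5, -1, 100));
    }
}
```

With the default queue size of 0, every task beyond the thread count is rejected immediately while the threads are busy. A bounded queue only delays the rejection, and an unbounded queue trades rejections for unbounded waiting.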


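The message dispatch mechanism lives on the Dubbo server side. After Netty receives a request from a client, the request message is handed to a custom thread pool, and the rule that governs this hand-off is the dispatch mechanism. The Dubbo server runs on top of Netty's own Reactor model (the master-slave Reactor server model), in which accepting requests and processing them are handled by boss threads and worker threads (also called IO threads) respectively. Dubbo's message dispatch sits downstream of the worker threads. I call it the business-thread handling mechanism.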
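The hand-off from IO thread to business thread can be pictured with a small JDK-only sketch. Everything in it is hypothetical (the enums, the class, and the method names are not Dubbo APIs). The routing rules follow the dispatcher documentation linked above: "all" sends every event to the business pool, "message" sends only received request/response messages, and "direct" keeps everything on the IO thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical model of dispatch routing; not Dubbo's actual classes.
public class DispatchSketch {

    enum Event { CONNECTED, DISCONNECTED, RECEIVED }

    enum Mode { ALL, MESSAGE, DIRECT }

    // True if the event is handed to the business pool, false if it stays on the IO thread.
    static boolean goesToBusinessPool(Mode mode, Event event) {
        switch (mode) {
            case ALL:
                return true;
            case MESSAGE:
                return event == Event.RECEIVED;
            default: // DIRECT
                return false;
        }
    }

    // Runs a trivial handler where the mode says, and returns the name of the thread that ran it.
    // The calling thread plays the role of the Netty worker (IO) thread.
    static String handle(Mode mode, Event event, ExecutorService businessPool) {
        Callable<String> handler = () -> Thread.currentThread().getName();
        try {
            if (goesToBusinessPool(mode, event)) {
                return businessPool.submit(handler).get();
            }
            return handler.call();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ExecutorService businessPool = Executors.newFixedThreadPool(1, r -> new Thread(r, "business"));
        for (Mode mode : Mode.values()) {
            for (Event event : Event.values()) {
                System.out.println(mode + " / " + event + " -> " + handle(mode, event, businessPool));
            }
        }
        businessPool.shutdown();
    }
}
```

The fewer event types a mode sends to the business pool, the fewer pool threads are spent on bookkeeping events, but whatever stays on the IO thread must never block.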



package org.apache.dubbo.remoting;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.Adaptive;
import org.apache.dubbo.common.extension.SPI;
import org.apache.dubbo.remoting.transport.dispatcher.all.AllDispatcher;

/**
 * ChannelHandlerWrapper (SPI, Singleton, ThreadSafe): a thread-safe singleton.
 * The default dispatcher is ALL, meaning every request is handed to the business thread pool.
 */
@SPI(AllDispatcher.NAME)
public interface Dispatcher {

    /**
     * dispatch the message to threadpool.
     *
     * @param handler
     * @param url
     * @return channel handler
     */
    @Adaptive({Constants.DISPATCHER_KEY, "dispather", "channel.handler"})
    // The last two parameters are reserved for compatibility with the old configuration
    ChannelHandler dispatch(ChannelHandler handler, URL url);

}


package org.apache.dubbo.remoting.transport.dispatcher.all;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.Dispatcher;

/**
 * default thread pool configure
 */
public class AllDispatcher implements Dispatcher {

    public static final String NAME = "all";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        // Messages are handled by AllChannelHandler
        return new AllChannelHandler(handler, url);
    }

}






package org.example.api.day02;

import org.example.api.RpcResult;

import javax.validation.Valid;
import javax.validation.constraints.NotNull;

public interface IQuery {

    RpcResult<String> query(@Valid @NotNull(message = "args must not be empty") String args);

}


package org.example.provider.day02;

import org.apache.dubbo.config.annotation.DubboService;
import org.example.api.RpcResult;
import org.example.api.day02.IQuery;

import java.util.concurrent.TimeUnit;

@DubboService(version = "1.0.0", validation = "true", timeout = 2000)
public class QueryProvider implements IQuery {

    @Override
    public RpcResult<String> query(String args) {
        // Simulate the DB connection pool being exhausted, which then blocks the request.
        // The blocking logic below is not rigorous; it only illustrates the original problem.
        // Under heavy load the "thread pool is exhausted" error also appears without this sleep
        // (the problem still reproduces with it commented out).
        try {
            TimeUnit.MILLISECONDS.sleep(1500);
        } catch (Exception e) {
            return RpcResult.error(e.getMessage());
        }
        System.out.println("query arguments : " + args);
        return RpcResult.success("return DB data " + args);
    }

}



server:
  port: 8085
dubbo:
  application:
    name: dubbo-provider
    qos-enable: false
  registry:
    address: zookeeper://
  protocol:
    name: dubbo
    port: 20889
    # Thread pool size is 5
    threads: 5
    # Override the default message dispatcher (all)
    dispatcher: message
  consumer:
    check: false
  provider:
    filter: -validation



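After starting the provider, load-test the Dubbo interface with JMeter. For installing the tool, see my other blog post, "Dubbo Testing".

The test uses 100 threads, each looping twice, to simulate 100 concurrent callers. The provider is configured with a pool of only 5 threads, so errors are guaranteed.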



 "code": 1,"detailMessage": "Failfast invoke providers dubbo:// RandomLoadBalance select from all providers [org.apache.dubbo.registry.integration.RegistryDirectory$InvokerDelegate@48dfd63f] for service org.apache.dubbo.rpc.service.GenericService method $invoke on consumer use dubbo version 2.7.8-jar-with-dependencies, but no luck to perform the invocation. Last error is: Failed to invoke remote method: $invoke, provider: dubbo://, cause: org.apache.dubbo.remoting.RemotingException: Server side(,20889) thread pool is exhausted, detail msg:Thread pool is EXHAUSTED! Thread Name: DubboServerHandler-, Pool Size: 5 (active: 5, core: 5, max: 5, largest: 5), Task: 5 (completed: 0), Executor status:(isShutdown:false, isTerminated:false, isTerminating:false), in dubbo://!","cause": {"detailMessage": "org.apache.dubbo.remoting.RemotingException: Server side(,20889) thread pool is exhausted, detail msg:Thread pool is EXHAUSTED! Thread Name: DubboServerHandler-, Pool Size: 5 (active: 5, core: 5, max: 5, largest: 5), Task: 5 (completed: 0), Executor status:(isShutdown:false, isTerminated:false, isTerminating:false),



@Override
public void received(Channel channel, Object message) throws RemotingException {
    ExecutorService executor = getPreferredExecutorService(message);
    try {
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
        // If the pool is exhausted when the task is submitted to the business pool,
        // the rejection policy fires and the error is reported back directly
        if (message instanceof Request && t instanceof RejectedExecutionException) {
            // This is the code that actually produces the error
            sendFeedback(channel, (Request) message, t);
            return;
        }
        throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
    }
}


protected void sendFeedback(Channel channel, Request request, Throwable t) throws RemotingException {
    if (request.isTwoWay()) {
        String msg = "Server side(" + url.getIp() + "," + url.getPort()
                + ") thread pool is exhausted, detail msg:" + t.getMessage();
        Response response = new Response(request.getId(), request.getVersion());
        response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
        response.setErrorMessage(msg);
        channel.send(response);
        return;
    }
}

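At this point the problem has been reproduced by working backwards from the symptom, and the relevant source has been traced. The fixes come down to the following:

  1. Increase the thread pool size
  2. Change the message dispatcher type
  3. Fix whatever in the business code hurts throughput (slow SQL, time-consuming operations)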
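A back-of-the-envelope calculation shows why fixes 1 and 3 both work and why fix 3 is usually the stronger one. With no queue, a pool of N threads whose calls each take L milliseconds can sustain roughly N * 1000 / L requests per second. The sketch below only does that arithmetic. The class name is hypothetical, and the 15 ms latency for an indexed query is an invented illustration, not a measurement from the incident.

```java
// Hypothetical helper: rough pool sizing arithmetic, assuming every call holds a thread
// for the full latency and the pool has no queue.
public class PoolSizing {

    // Maximum sustainable requests per second for a given pool size and per-call latency.
    static double maxThroughputPerSecond(int threads, long latencyMillis) {
        return threads * 1000.0 / latencyMillis;
    }

    // Threads needed (rounded up) to sustain a target request rate at a given latency.
    static int threadsNeeded(double requestsPerSecond, long latencyMillis) {
        return (int) Math.ceil(requestsPerSecond * latencyMillis / 1000.0);
    }

    public static void main(String[] args) {
        // The demo provider: 5 threads, 1500 ms per call.
        System.out.printf("5 threads @ 1500 ms: %.2f req/s%n", maxThroughputPerSecond(5, 1500));
        // Fix 1: the default pool size of 200 with the same slow call.
        System.out.printf("200 threads @ 1500 ms: %.2f req/s%n", maxThroughputPerSecond(200, 1500));
        // Fix 3: same 5 threads, assuming the call drops to 15 ms (illustrative number only).
        System.out.printf("5 threads @ 15 ms: %.2f req/s%n", maxThroughputPerSecond(5, 15));
        // Threads required for 100 req/s before and after the assumed speed-up.
        System.out.println("threads for 100 req/s @ 1500 ms: " + threadsNeeded(100, 1500));
        System.out.println("threads for 100 req/s @ 15 ms: " + threadsNeeded(100, 15));
    }
}
```

Enlarging the pool raises capacity linearly, while removing the slow operation raises it by the same factor as the latency drops, without adding threads.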

