一、前言

高并发环境下,服务端不能及时处理请求,造成大量请求线程挤压,最终会造成大面积的服务崩溃现象(服务雪崩),根据服务特点设定合理的请求拒绝策略。

下面是几种限流方式的源码阅读。

二、服务治理

2.1 connections——客户端

2.1.1 refer方法

找到DubboProtocol.class,他的层级关系如图:

1.refer方法:我的当前版本为dubbo2.7.3,为啥找不到refer方法?
——是改成了protocolBindingRefer方法。

DubboProtocol#protocolBindingRefer:

@Overridepublic <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {optimizeSerialization(url);// create rpc invoker.DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);invokers.add(invoker);return invoker;}

2.DubboProtocol#getClients方法:创建连接的代码在getClients方法中:

注意:这里出现了connections参数

private ExchangeClient[] getClients(URL url) {// whether to share connectionboolean useShareConnect = false;//1.获取CONNECTIONS_KEY的参数值,CONNECTIONS_KEY=“connections”,值就是我们配置里的connections值int connections = url.getParameter(CONNECTIONS_KEY, 0);List<ReferenceCountExchangeClient> shareClients = null;//2.如果没有配置,connections 是为0的,设置为共享连接,否则,一个连接一个服务if (connections == 0) {useShareConnect = true;/*** The xml configuration should have a higher priority than properties.*/String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);shareClients = getSharedClient(url, connections);}//3.创建connections个ExchangeClinet数组ExchangeClient[] clients = new ExchangeClient[connections];for (int i = 0; i < clients.length; i++) {if (useShareConnect) {//如果是共享连接,获取共享连接clients[i] = shareClients.get(i);} else {//4.如果不是共享连接,创建新的连接clients[i] = initClient(url);}}return clients;}

3.DubboProtocol#initClient方法:创建新的客户端

private ExchangeClient initClient(URL url) {// 1.设置客户端类型,以何种方式向服务端请求;mina,netty等,默认nettyString str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));//拿到编码方式,默认dubbourl = url.addParameter(CODEC_KEY, DubboCodec.NAME);// 拿到心跳,默认1分钟url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));// BIO is not allowed since it has severe performance issue.不允许使用BIO,因为它有严重的性能问题。//2.如果1中获取的客户端不存在,抛异常if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {throw new RpcException("Unsupported client type: " + str + "," +" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));}ExchangeClient client;try {// connection should be lazyif (url.getParameter(LAZY_CONNECT_KEY, false)) {client = new LazyConnectExchangeClient(url, requestHandler);} else {//3.创建客户端client = Exchangers.connect(url, requestHandler);}} catch (RemotingException e) {throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);}return client;}

4.Exchangers#coonect方法:

public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");} else if (handler == null) {throw new IllegalArgumentException("handler == null");} else {url = url.addParameterIfAbsent("codec", "exchange");return getExchanger(url).connect(url, handler);}}

5.Transporters#connect方法

public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");} else {Object handler;if (handlers != null && handlers.length != 0) {if (handlers.length == 1) {handler = handlers[0];} else {handler = new ChannelHandlerDispatcher(handlers);}} else {handler = new ChannelHandlerAdapter();}return getTransporter().connect(url, (ChannelHandler)handler);}}

其中最后一行getTransporter().connect(url, (ChannelHandler)handler)中,实现类有下图几种:

由上图可知,客户端传输数据方式一共由三种:

  1. Grizzly
  2. mina
  3. netty(兼容以前的,不推荐)
  4. netty4(推荐)

以上几种传输方式可以配置中配置,默认netty4。

2.2 accepts——服务端

2.2.1AbstactServer.connected

1.AbstactServer#connected方法

@Overridepublic void connected(Channel ch) throws RemotingException {// If the server has entered the shutdown process, reject any new connectionif (this.isClosing() || this.isClosed()) {logger.warn("Close new channel " + ch + ", cause: server is closing or has been closed. For example, receive a new connect request while in shutdown process.");ch.close();return;}Collection<Channel> channels = getChannels();if (accepts > 0 && channels.size() > accepts) {logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);ch.close();return;}super.connected(ch);}

关键代码:if (accepts > 0 && channels.size() > accepts)

如果超出accepts就打印异常,关闭channel并返回,accepts是从配置中获取的值。

2.3 tps

1.TpsLimitFilter#invoke方法

public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {throw new RpcException("Failed to invoke service " +invoker.getInterface().getName() +"." +invocation.getMethodName() +" because exceed max service tps.");}return invoker.invoke(invocation);}

关键代码:if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation))

2. DefaultTpsLimiter#isAllowable方法

public boolean isAllowable(URL url, Invocation invocation) {//TPS_LIMIT_RATE_KEY="tps",就是配置里的tps=""配置int rate = url.getParameter(TPS_LIMIT_RATE_KEY, -1);//TPS_LIMIT_INTERVAL_KEY = "tps.interval",DEFAULT_TPS_LIMIT_INTERVAL=60*1000long interval = url.getParameter(TPS_LIMIT_INTERVAL_KEY, DEFAULT_TPS_LIMIT_INTERVAL);String serviceKey = url.getServiceKey();if (rate > 0) {StatItem statItem = stats.get(serviceKey);if (statItem == null) {stats.putIfAbsent(serviceKey, new StatItem(serviceKey, rate, interval));statItem = stats.get(serviceKey);} else {//rate or interval has changed, rebuildif (statItem.getRate() != rate || statItem.getInterval() != interval) {stats.put(serviceKey, new StatItem(serviceKey, rate, interval));statItem = stats.get(serviceKey);}}return statItem.isAllowable();} else {StatItem statItem = stats.get(serviceKey);if (statItem != null) {stats.remove(serviceKey);}}return true;}

关键代码: statItem.isAllowable()

3.StatItem#isAllowable方法

public boolean isAllowable() {long now = System.currentTimeMillis();if (now > lastResetTime + interval) {token = buildLongAdder(rate);lastResetTime = now;}if (token.sum() < 0) {return false;}token.decrement();return true;}

因为dubbo2.7.3版本这段代码本人还没有好好理解,故用dubbo2.6以前的版本来分析:

public boolean isAllowable() {long now = System.currentTimeMillis();if (now > lastResetTime + interval) { // 重置窗口token.set(rate); lastResetTime = now;}int value = token.get();boolean flag = false;while (value > 0 && !flag) {// 每来一个请求减少一个许可;如果减少后值为当前value值-1,返回true;如果失败,再判断条件一直while循环flag = token.compareAndSet(value, value - 1); value = token.get();}return flag;}

在dubbo框架中, 类似的上面isAllowable()方法的原子性计算有很多,这也是我们多线程的重点实践,所以此处需要多看多学多感受。弄懂了以后并发编程就舒服多了。

附tps核心算法:

2.4 executes——服务端

这里executes设置指的是最大线程数。

1.ExecutesLimitFilter#invoke方法

public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {URL url = invoker.getUrl();String methodName = invocation.getMethodName();//EXECUTES_KEY,就是配置中的executes,默认0-表示不限制int max = url.getMethodParameter(methodName, EXECUTES_KEY, 0);//超过了直接失败if (!RpcStatus.beginCount(url, methodName, max)) {throw new RpcException(RpcException.LIMIT_EXCEEDED_EXCEPTION,"Failed to invoke method " + invocation.getMethodName() + " in provider " +url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max +"\" /> limited.");}invocation.setAttachment(EXECUTELIMIT_FILTER_START_TIME, String.valueOf(System.currentTimeMillis()));try {return invoker.invoke(invocation);} catch (Throwable t) {if (t instanceof RuntimeException) {throw (RuntimeException) t;} else {throw new RpcException("unexpected exception when ExecuteLimitFilter", t);}}}

2.5 actives——客户端

1.ActiveLimitFilter#invoke方法

public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {URL url = invoker.getUrl();String methodName = invocation.getMethodName();//ACTIVES_KEY,就是配置中的actives,默认0-表示不现在int max = invoker.getUrl().getMethodParameter(methodName, ACTIVES_KEY, 0);final RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());//与服务端不同,如果并发量超过了,进行死循环:线程等待超时时间,等另外的线程执行完成后会唤醒所有等待线程,接着计算并发量,如果这次线程没超过,计算过程消耗的时间是否大于等于超时时间,如果是——抛出异常,如果不是——就可以进行远程访问了。if (!RpcStatus.beginCount(url, methodName, max)) {long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), TIMEOUT_KEY, 0);long start = System.currentTimeMillis();long remain = timeout;synchronized (rpcStatus) {//死循环,直到调用成功或者超时抛出异常while (!RpcStatus.beginCount(url, methodName, max)) {try {//线程等待超时时间rpcStatus.wait(remain);} catch (InterruptedException e) {// ignore}//计算消耗时间long elapsed = System.currentTimeMillis() - start;remain = timeout - elapsed;//消耗时间超过超时时间,抛异常if (remain <= 0) {throw new RpcException(RpcException.LIMIT_EXCEEDED_EXCEPTION,"Waiting concurrent invoke timeout in client-side for service:  " +invoker.getInterface().getName() + ", method: " + invocation.getMethodName() +", elapsed: " + elapsed + ", timeout: " + timeout + ". concurrent invokes: " +rpcStatus.getActive() + ". max concurrent invoke limit: " + max);}}}}invocation.setAttachment(ACTIVELIMIT_FILTER_START_TIME, String.valueOf(System.currentTimeMillis()));return invoker.invoke(invocation);}

2.ActiveLimitFilter#onError方法

@Overridepublic void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {String methodName = invocation.getMethodName();URL url = invoker.getUrl();int max = invoker.getUrl().getMethodParameter(methodName, ACTIVES_KEY, 0);if (t instanceof RpcException) {RpcException rpcException = (RpcException)t;if (rpcException.isLimitExceed()) {return;}}RpcStatus.endCount(url, methodName, getElapsed(invocation), false);notifyFinish(RpcStatus.getStatus(url, methodName), max);}

最后两行代码就是active数量-1,并唤醒其他线程。

注:dubbo2.7.3源码"active数量-1,并唤醒其他线程"代码在onError方法中,而dubbo2.6以下是在invoke方法的最后,下面贴上了2.6代码

三.总结

线程方式限流包括:

  • TpsLimitFilter(tps:每秒事务请求数)
  • ExecuteLimitFilter(服务端)
  • ActiveLimitFilter(客户端)

需要注意的是其中actives(客户端)设置是有等待超时这个机制的,而execute(服务端)是没有的。

几种dubbo限流方式源码阅读就写到这里,这次只是粗略的阅读,了解其大致过程,以后有时间再阅读这些代码中涉及到的并发编程以及它所采取的方案。

Dubbo限流方式源码阅读相关推荐

  1. Soul网关源码阅读(七)限流插件初探

    Soul网关源码阅读(七)限流插件初探 简介     前面的文章中对处理流程探索的差不多了,今天来探索下限流插件:resilience4j 示例运行 环境配置     启动下MySQL和redis d ...

  2. Soul 网关源码阅读(四)Dubbo请求概览

    Soul 网关源码阅读(四)Dubbo请求概览 简介     本次启动一个dubbo服务示例,初步探索Soul网关源码的Dubbo请求处理流程 示例运行 环境配置     在Soul源码clone下来 ...

  3. 【Dubbo源码阅读系列】之远程服务调用(上)

    今天打算来讲一讲 Dubbo 服务远程调用.笔者在开始看 Dubbo 远程服务相关源码的时候,看的有点迷糊.后来慢慢明白 Dubbo 远程服务的调用的本质就是动态代理模式的一种实现.本地消费者无须知道 ...

  4. 【Dubbo源码阅读系列】服务暴露之本地暴露

    在上一篇文章中我们介绍 Dubbo 自定义标签解析相关内容,其中我们自定义的 XML 标签 <dubbo:service /> 会被解析为 ServiceBean 对象(传送门:Dubbo ...

  5. Dubbo注册协议原理以及源码阅读

    前言 继上次小编所讲RPC协议暴露服务并且远程调用之后,小编这次给大家带来注册中心协议整体流程原理以及源码精讲,Dubbo协议服务暴露与引用以及源码分析文章中,远程服务暴露可以只通过RPC协议即可,那 ...

  6. Soul网关源码阅读(十)自定义简单插件编写

    Soul网关源码阅读(十)自定义简单插件编写 简介     综合前面所分析的插件处理流程相关知识,此次我们来编写自定义的插件:统计请求在插件链中的经历时长 编写准备     首先我们先探究一下,一个P ...

  7. Soul网关源码阅读(九)插件配置加载初探

    Soul网关源码阅读(九)插件配置加载初探 简介     今日来探索一下插件的初始化,及相关的配置的加载 源码Debug 插件初始化     首先来到我们非常熟悉的插件链调用的类: SoulWebHa ...

  8. Soul网关源码阅读(八)路由匹配初探

    Soul网关源码阅读(八)路由匹配初探 简介      今日看看路由的匹配相关代码,查看HTTP的DividePlugin匹配 示例运行      使用HTTP的示例,运行Soul-Admin,Sou ...

  9. 大神手把手教源码阅读的方法、误区以及三种境界

    丁威 中间件兴趣圈 读完需要 1 分钟 速读仅需 1 分钟 在技术职场中普遍存在如下几种现象: 对待工作中所使用的技术不需要阅读源码,只需在开发过程中能够熟练运用就行 看源码太费时间,而且容易忘记,如 ...

  10. 阿里资深架构师推荐:技术人改如何提升源码阅读能力

    本文行文思路:先抛出源码阅读方法,然后结合Sentinel创作过程谈谈具体是如何运用这些技巧,最后解答几个源码阅读的误区. Sentinel 系列共包含15篇文章,主要以源码分析为手段,图文并茂的方式 ...

最新文章

  1. linux一键重装系统脚本,一键重装CentOS纯净版系统shell脚本
  2. JTable demo
  3. 《Pytorch - BP全连接神经网络模型》
  4. java 指令接口架构,JavaSE 基础大纲
  5. java 循环效率_Java For循环效率测试
  6. Codeforces 816C/815A - Karen and Game
  7. 无向图java_Java实现无向图的建立与遍历
  8. 正在打dota的过程中,接到淘宝网面试电话
  9. 如何保证FTP文件下载或上传数据完整
  10. fork和vfork,exec
  11. Linux - 搭建LDAP统一认证服务
  12. oracle补丁安装
  13. wpa_supplicant配置
  14. windows下安装sloth
  15. 混合基金量化投资策略应该怎么制定?
  16. 骑士CMSgetshell复现
  17. Urchin.exe使用说明
  18. 如何开展兼容性测试?兼容性测试有什么作用?
  19. mkv视频怎么转成mp4?
  20. 解决:The server time zone value ‘�й���׼ʱ��‘ is unrecognized or represents more than one time zone报错问题

热门文章

  1. 一文读懂特征值分解EVD与奇异值分解SVD
  2. 贝叶斯概率推断:概率分布
  3. 机器学习- 吴恩达Andrew Ng Week6 Regularized Linear Regression and Bias/Variance知识总结
  4. python网络编程基础--http
  5. cadence设计运算放大器_「好设计论文」一种用于高精度DAC的实用型CMOS带隙基准源...
  6. java.io.FileWriter class doesn’t use UTF-8 by default
  7. 【ZOJ 3715 —— 13年浙江省赛K】Kindergarten Election 【枚举答案进行判断】 【夺宝奇兵 —— CCPC-Wannafly Winter Camp Day1】
  8. 【POJ1804】Brainman 【求逆序数】
  9. java使用io上传文件_文件传输基础——Java IO流
  10. python樱花树画法图片_武大樱花又盛开,用Python画一棵樱花树