为什么80%的码农都做不了架构师?>>>   

摘要: 当资源成为瓶颈时,服务框架需要对消费者做限流,启动流控保护机制。流量控制有多种策略,比较常用的有:针对访问速率的静态流控、针对资源占用的动态流控、针对消费者并发连接数的连接控制和针对并行访问数的并发控制。在分布式架构中,应用和应用之间的调用类型分为以下两种,流控方式也略有不同。

点此查看原文:https://yq.aliyun.com/articles/380180?spm=a2c41.11181499.0.0

当资源成为瓶颈时,服务框架需要对消费者做限流,启动流控保护机制。流量控制有多种策略,比较常用的有:针对访问速率的静态流控、针对资源占用的动态流控、针对消费者并发连接数的连接控制和针对并行访问数的并发控制。在实践中,各种流量控制策略需要综合使用才能起到较好的效果。

在分布式架构中,应用和应用之间的调用类型分为以下两种,流控方式也略有不同。

同步RPC类调用,比如RESTful,Dubbo,HSF等都属于该类。对于该类同步调用,通常限流方式为两种:针对服务提供者的并发全局流控,或针对服务消费者的并发局部流控。两种的控制手段类似,都是通过限制服务端或客服端并发调用数来进行限制。

异步MQ类调用,典型如RocketMQ, Kafka,等。对于该类异步调用,通常限流方式是在订阅端限流。限流方式为两种:针对消息订阅者的并发流控,或针对消息订阅者的消费延时流控。

针对消息订阅者的消费延时流控基本原理是,在每次客户端消费时,可以增加一个延时来控制消费速度,这样理论消费并发最快速度为:

MaxRate = 1 / ConsumInterval * ConcurrentThreadNumber

比如如果消息并发消费线程为20,延时为100ms,则理论上可以将并发消费控制在200以下。具体公式如下:

200 = 1 / 0.1 * 20

相比并发线程数流控,消费延时流控优点在于实现相对简单,对MQ类客户端包依赖较少,不需要客户端提供控制并发线程数的动态调整接口。

以上各种流量控制方法,在分布式架构下,如果要做到全局动态控制,一个简单的技术方法是依赖配置中心,即通过配置中心来进行流控参数的下发。

下面章节详细介绍如何基于配置中心来实现异步消息消费的全局动态流控。使用的例子为阿里云上的 MQ (消息队列)和 ACM (应用配置管理)两款产品。

注:之所以用MQ为示例是因为在本文撰写之时,正好MQ Consumer Client SDK并不支持动态调整现成并发数,因此通过基于ACM来动态调整消费延迟的方法正好可以解决MQ消费流控动态的问题。

基于消费延时流控的基本原理

基本原理如下。其中,管理员或应用程序通过ACM控制台发布消费延时配置(RCV_INTERVAL_TIME),所有MQ消费程序订阅该配置。理论上,该配置从发布到下发所有客户端,可以在1秒内完成(取决于网络延时)。

代码示例

该章节基于配置中心来实现异步消息消费的全局动态流控的代码示例。使用的例子为阿里云上的MQ(消息队列)和ACM(应用配置管理)两款产品,基于Java语言。关于SDK的详细介绍,可参见两款产品的官方文档。

在ACM上创建消费延时的参数,截屏如下。

设置全局消费延时变量

首先,设置消费接收延时的全局变量, 如下。

     // 初始化消息接收延时参数,单位为millisecondstatic int RCV_INTERVAL_TIME = 10000;// 初始化配置服务,控制台通过示例代码自动获取下面参数ConfigService.init("acm.aliyun.com", /*租户ID*/"xxx", /*AK*/"xxx", /*SK*/"yyy");    // 主动获取配置String content = ConfigService.getConfig("app.mq.qos", "DEFAULT_GROUP", 6000);Properties p = new Properties();try {p.load(new StringReader(content));RCV_INTERVAL_TIME = Integer.valueOf(p.getProperty("RCV_INTERVAL_TIME"));} catch (IOException e) {e.printStackTrace();}

其次,设置ACM listener,确保当配置被修改时,即使更新 RCV_INTERVAL_TIME 参数, 如下。

     // 初始化的时候,给配置添加监听,配置变更会回调通知ConfigService.addListener("app.mq.qos", "DEFAULT_GROUP", new ConfigChangeListener() {public void receiveConfigInfo(String configInfo) {Properties p = new Properties();try {p.load(new StringReader(configInfo));RCV_INTERVAL_TIME = Integer.valueOf(p.getProperty("RCV_INTERVAL_TIME"));} catch (IOException e) {e.printStackTrace();}}});

设置 MQ 消费延时逻辑

完整实例如下。

注:这里 RCV_INTERVAL_TIME 参数的访问是故意没有加锁的,读者可以自行思考原因。Aliyun ONS Client不提供动态线程并发数,默认并发为20。因此这里正好使用消费延时参数来动态调节QoS。

     //以下代码可直接贴在Main()函数里Properties properties = new Properties();properties.put(PropertyKeyConst.ConsumerId, "CID_consumer_group");properties.put(PropertyKeyConst.AccessKey,"xxx");properties.put(PropertyKeyConst.SecretKey, "yyy");properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");// 设置 TCP 接入域名(此处以公共云生产环境为例)properties.put(PropertyKeyConst.ONSAddr,"http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");Consumer consumer = ONSFactory.createConsumer(properties);consumer.subscribe(/*Topic*/"topic-name", /*Tag*/null, new MessageListener() {public Action consume(Message message, ConsumeContext context) {// MQ Subscribe QoS logical start, // Each consuming process will sleep for RCV_INTERVAL_TIME seconds with 100 ms sleeping cycle.// Within each cycle, the thread will check RCV_INTERVAL_TIME in case it's set to a smaller value. // RCV_INTERVAL_TIME <= 0 means no sleeping.int rcvIntervalTimeLeft = RCV_INTERVAL_TIME;while (rcvIntervalTimeLeft > 0) {if (rcvIntervalTimeLeft > RCV_INTERVAL_TIME) {rcvIntervalTimeLeft = RCV_INTERVAL_TIME;}try {if (rcvIntervalTimeLeft >= 100) {rcvIntervalTimeLeft -= 100;Thread.sleep(100);} else {Thread.sleep(rcvIntervalTimeLeft);rcvIntervalTimeLeft = 0;}} catch (InterruptedException e) {e.printStackTrace();}}// MQ Subscribe interval logical endsSystem.out.println("Receive: " + message);/** Put your business logic here.*/doSomething();return Action.CommitMessage;}});consumer.start();

运行结果

单机运行consumer进行消费,假设queue内的消息无限多,不存在消费万的情况,分三段测试,分别运行约5分钟,通过ACM配置推送来达到以下效果。

RCV_INTERVAL_TIME = 100 ms

RCV_INTERVAL_TIME = 5000 ms

RCV_INTERVAL_TIME = 1000 ms

结果如下,在单MQ消费业务处理耗时约100ms情况下的,单机并发20线程的测试结果。

RCV_INTERVAL_TIME = 100 ms:平均消费性能约为 9000 tpm 左右

RCV_INTERVAL_TIME = 5000 ms:平均消费性能被限制到了 200 tpm 左右

RCV_INTERVAL_TIME = 1000 ms:平均消费性能回升到到了 1100 tpm 左右

以上结果基本达到消费和 tpm 成反比的预期,最关键的是整个过程中,应用不中断,流控推送结果秒级生效到分布式集群。单机性能结果如下所示。

转载于:https://my.oschina.net/yunqi/blog/1612514

解锁新姿势 | 如何用配置中心实现全局动态流控?相关推荐

  1. 解锁新姿势 |如何利用配置中心规范构建PaaS服务配置

    为什么80%的码农都做不了架构师?>>>    摘要: 在上一篇文章中,我们以MQ和ACM为例,讨论了如何借助配置中心对消息进行限流管理的场景.在本文中,我们继续以该场景为例,讲述如 ...

  2. [解锁新姿势] 兄dei 我感觉你在写bug

    前言: 继上篇 [解锁新姿势] 兄dei,你代码需要优化了 介绍一些代码的优化的小技巧. 但是我们除了在代码编写上需要优雅, 还需要编写对应的测试用例, 以此来保证代码的质量. 在这篇我们继续在学习如 ...

  3. android解锁win,Win10电脑解锁新姿势:WP/安卓手机、微软手环当钥匙

    IT之家讯 微软在官方网站公布了Win10的开发路线图,其中描述了目前已经实现的功能.正在预览测试以及正在开发中的功能.根据描述,微软正在开发一种全新的Win10电脑解锁方式. 首先,你可以使用自己的 ...

  4. [解锁新姿势] 回想起被 `if-else` 支配的恐惧,我们要打倒 if - else

    前言 [解锁新姿势] 兄dei,你代码需要优化了 在之前文章说到,简单 if-else,可以使用 卫语句 进行优化.但是在实际开发中,往往不是简单 if-else 结构,我们通常会不经意间写下如下代码 ...

  5. Spring Cloud Alibaba配置实例nacos+sentinel+dubbo实行服务注册、配置中心、熔断限流

    通过Spring Cloud Alibaba相关组件nacos+sentinel+dubbo实行服务注册.配置中心.熔断限流等功能 1.本机安装nacos和sentinel-dashboard服务端 ...

  6. 又拍云张聪:OpenResty 动态流控的几种姿势

    2019 年 1 月 12 日,由又拍云.OpenResty 中国社区主办的 OpenResty × Open Talk 全国巡回沙龙·深圳站圆满结束,又拍云首席架构师张聪在活动上做了< Ope ...

  7. python获取最新学术文献_快解锁新姿势,教你如何用Python搞定文献搜索和科研图片!...

    相比实验论文,发表SCI应该更让科研狗们重视和焦虑. 起初看到读博的同学发表SCI论文,心里面就已经酸了,后来「本科生发数篇 SCI」的新闻屡见不鲜,现在甚至连小学生都跑出来分一杯羹-- 前段时间,B ...

  8. python批量检索文献_快解锁新姿势,教你如何用Python搞定文献搜索和科研图片!...

    相比实验论文,发表SCI应该更让科研狗们重视和焦虑. 起初看到读博的同学发表SCI论文,心里面就已经酸了,后来「本科生发数篇 SCI」的新闻屡见不鲜,现在甚至连小学生都跑出来分一杯羹-- 前段时间,B ...

  9. windows编程 识别拖动_Quicker 解锁新姿势!Windows 还能这么用?

    不用记住软件复杂的快捷方式,轻轻按下鼠标滚轮,便可唤出当前软件的专属工具箱,一键启动原本要多次点击鼠标的操作,这样的时刻是否很美妙? Excel 选中表格数据加黑框并横向打印,一键搞定 Quicker ...

最新文章

  1. 何恺明大神新作:一种用于目标检测的主流ViT架构,效果SOTA
  2. 刚搭建的linux环境的基本优化以及优化脚本---菜鸟初写
  3. 什么是xmlschema
  4. Spark技术内幕: Task向Executor提交的源代码解析
  5. RabbitMQ知多少
  6. pae扩展内存 linux,浅析linux内核内存管理之PAE
  7. [数据结构]用插入排序和选择排序的思想实现优先级队列
  8. 前端学习(2038)vue之电商管理系统电商系统之优化nprogress加载进度条
  9. linux 文件系统路径,Linux编程 1 (文件系统路径说明, 目录结构说明)
  10. 多线程条件通行工具——Semaphore
  11. curl txt批量_curl与wget高级用法
  12. python重新运行安装_无法重新安装Python?
  13. SQLSERVER聚集索引的整理(重建)的必要性测试
  14. 判断数组、集合list、string、int、double等是否为空,判断是否为值类型
  15. LNMP架构数据迁移到NFS存储
  16. 在小程序中如何使用svg图标
  17. 硅谷真假u盘测试软件,Silicom硅谷真假U盘测试1.0.4正式版
  18. 驾考计算机播报原理,科目三电子路考流程详解 考驾照的都看看!
  19. 域名重定向工具 —— SwitchHosts 实用教程
  20. 深度理解面向对象的基础-抽象(一)

热门文章

  1. VS中的C#项目怎样引入另一个项目
  2. MyBatisPlus条件构造器带条件删除delete使用
  3. JS获取当前时间的前n天/后n天
  4. java readtoend_java项目和C#项目实现通信
  5. php上传手机文件到服务器,安卓上传文件至PHP服务器(示例代码)
  6. 我在神策做研发 | 与客户难题“对抗”的百余天
  7. 重磅 | 品牌零售行业数据驱动业务指南,全新上线!
  8. 神策数据曹犟将出任导师:宝洁黑客马拉松聚焦工业大数据
  9. Ajax(form表单文件上传、请求头之contentType、Ajax传递json数据、Ajax文件上传)
  10. Java练习 SDUT-2401最大矩形面积