概述

这里的流的速度限制是指在单位时间窗口内,最多允许指定的单位数据通过。比如我们需要从源端 A 发送 1000 条数据到目的端 B,如果设置的速度限制为最多 100 条每秒,那么理论上需要 10 秒的时间才能将数据传输完成,即使当前的网络允许在极短的时间便完成这个任务。

但是我们没办法严格控制每秒时间内的数量一定是小于等于 100 的,因为我们不能每传输一条数据便进行速度与其控制的计算,这样会极大的带来性能的损耗。

所以这里引入时间窗口的概念,因为我们希望能够通过计算一个时间窗口内的速度,来判断是否需要进行速度控制。如果该时间窗口内的速度大于阈值,那么便通过睡眠一定的时间,来使更长时间上的平均速度是不高于阈值的。比如一个时间窗口的大小为 1s,阈值仍为 100 条每秒,那么如果在这 1s 以内传输了 300 条数据,则需要等待 2s 以后才能继续进行数据传输。这样,即使在前 1s 速度超过了阈值(300 > 100),但是因为后 2s 的速度为 0,所以从整体上来看,这 3s 的的平均速度仍然是不高于阈值的。最终它的速度走势也许会类似下图:


可以看见,虽然阈值为 100 条每秒,但是仍然会有部分时间窗口内的总量超过阈值,不过通过限制之后时间内的速度,从而使整体平均的速度是不高于阈值的。

公式

根据前面概述,可推理出公式。设置一下变量:

  • 时间窗口的时间间隔为:flowControlInterval(单位:毫秒)
  • 实际时间间隔为:interval(单位:毫秒)
  • 实际时间间隔内传输的数据总量为:numResults(单位:条)
  • 最大限制速度为:maxSpeed(单位:条/秒)
  • 当前速度为 currentSpeed(单位:条/秒)
  • 等待休眠的时间为:limitSleepTime(单位:毫秒)

那么已知 flowControlInterval、interval、numResults 的值,可得到:

当前速度:

currentSpeed = numResults * 1000 / interval;

睡眠时间:

limitSleepTime = currentSpeed * interval / maxSpeed - interval;

使用 Java 代码表示大致如下:

long interval = nowTimestamp - lastTimestamp;
if (interval >= flowControlInterval) {long numResults = totlaResults - lastResults;long currentSpeed = numResults * 1000 / interval;if (currentSpeed > maxSpeed) {// 计算休眠时间limitSleepTime = currentSpeed * interval / maxSpeed - interval;}if (limitSleepTime > 0) {Thread.sleep(limitSleepTime);}
}

这里的 nowTimestamp、totlaResults 为当前时间的时间戳与当前总共传输的数据总量,lastTimestamp、lastResults 为上一次计算后记录的时间的时间戳与数据传输总量,通过计算可得到实际的时间间隔与该时间段内的数据传输量。

注:以上算法与代码均参考于阿里的开源项目 DataX,源码路径:https://github.com/alibaba/DataX/blob/master/core/src/main/java/com/alibaba/datax/core/transport/channel/Channel.java#L192。

总结:

通过以上方式限制流的速度,可以对包括数量、字节大小等所有可以量化的指标进行限制,虽然不能保证每一个单位时间内的速度总是不高于阈值,但是却能使平均的速度是不高于阈值的。

参考:

  • DataX: https://github.com/alibaba/DataX

DataX 中流的速度限制相关推荐

  1. 使用DataX同步MaxCompute数据到TableStore(原OTS)优化指南

    概述 现在越来越多的技术架构下会组合使用MaxCompute和TableStore,用MaxCompute作大数据分析,计算的结果会导出到TableStore提供在线访问.MaxCompute提供海量 ...

  2. 尚硅谷大数据技术之 DataX—4)DataX使用优化

    4.1 关键参数 ➢ job.setting.speed.channel : channel并发数 ➢ job.setting.speed.record : 全局配置channel的record限速 ...

  3. datax(10): 源码解读Communication(Datax通讯类)

    前面看了datax的通讯机制,继续看源码-具体的通讯类 Communication.根据datax的运行模式的区别, 数据的收集会有些区别,这篇文章都是讲的在standalone模式下. 一.comm ...

  4. datax值转换使用以及源码分析

    目录 一.DataX的数据传输基础 1. 通道Channel 2. TaskGroupContainer中的成员变量Channel分析 3. Channel配置 二.datax脏数据处理 1. 什么是 ...

  5. DataX 工具安装部署及使用

    一. DataX的安装 1.环境准备 操作系统为CentOS 7.2及以上版本或RedHat 7.2及以上版本,暂不支持SUSE操作系统.python为2.x版本,java为1.8版本. 新建操作系统 ...

  6. 阿里开源数据同步神器DataX异构数据源间数据同步同步MySQL与HDFS相互实战

    Datax 实战使用 继上一篇 阿里开源数据同步神器DataX异构数据源间数据同步基础介绍与快速入门之后的实战篇 1.MySQL-To-HDFS 环境 & 准备说明: 描述: 为了快速搭建测试 ...

  7. datax 持续数据同步_Datax 数据同步

    官方Hello入门例子{ "job": { "content": [ { "reader": { "name": &qu ...

  8. DataX 安装和使用

    阿里云介绍: 1. 下载安装包.作为阿里主要的数据传输工具Datax,阿里已经完全开源到github上面了.下载地址(https://github.com/alibaba/DataX). 2. 安装环 ...

  9. datax底层原理_手把手实现Datax3.0中的传输通道

    Datax的整体框架我们已经大体了解.这次来分析一下reader到writer中间数据的传输层. 这次采取另外一种方式,我们把代码抽取,自己实现一个通道 1-首先是定义一个接口代表传输的每一条数据pu ...

最新文章

  1. android图片压缩总结
  2. 读完这些论文和代码,你就能在搜狐算法大赛中获得好成绩了
  3. r语言中1c0怎么表示什么,r语言表示或者用什么符号?
  4. 现代软件工程 作业 团队第一个作业
  5. Linux chmod
  6. 大学计算机专业的实验室图片,清华大学计算机系人工智能实验室.PDF
  7. Centos7安装32位库用来安装32位软件程序
  8. 抓linux肉鸡教程视频,抓肉鸡的教程和软件免费分享(2018一天抓1000只电脑肉鸡视频)...
  9. 遗传算法解决TSP问题
  10. 什么是测试开发工程师(SET)?
  11. 【180620】小人物走路、奔跑的VC++游戏特效
  12. MySQL(管理员常用命令)
  13. 【Unity UGUI】屏幕坐标转换
  14. MavLink 库 c++环境搭建及解ADS-B消息教程
  15. 多线程- 让程序更高效的运行
  16. W3Cschool从零开始学C语言笔记(1-2)位、字节及排列组合
  17. 微信小程序ssm电影院购票+后台管理系统|前后分离VUE
  18. Buck电路设计之芯片选择---TI电源器件在电赛中的应用
  19. LeetCode 题库 全 JAVA 解题---771.宝石与石头
  20. BASE64编码的图片在网页中直接用

热门文章

  1. Spring-Boot 使用JSR-107集成EHCache3.x (配置Clustered以及DISK)
  2. CCF 金融信息负面及主体判定
  3. 联想bios怎么开启TPM2.0?
  4. Cadence Allegro异型焊盘走线(出线)在焊盘外怎么解决?
  5. Apache Hadoop Pig 源代码分析(2)
  6. java覆盖率怎么包含多个工程,在多项目工程中统计子工程的覆盖率
  7. matlab 圆锥投影,MATLAB 表达式 2*2^(3^2) 的结果是 (    )
  8. JAVA 水果机游戏及编码
  9. 百度推出阿波罗计划,自动驾驶进入开放时代了吗?
  10. 【C++】继承和派生、虚继承和虚基类、虚基类表和虚基类指针