DataX 中流的速度限制
概述
这里的流的速度限制是指在单位时间窗口内,最多允许指定的单位数据通过。比如我们需要从源端 A 发送 1000 条数据到目的端 B,如果设置的速度限制为最多 100 条每秒,那么理论上需要 10 秒的时间才能将数据传输完成,即使当前的网络允许在极短的时间便完成这个任务。
但是我们没办法严格控制每秒时间内的数量一定是小于等于 100 的,因为我们不能每传输一条数据便进行速度与其控制的计算,这样会极大的带来性能的损耗。
所以这里引入时间窗口的概念,因为我们希望能够通过计算一个时间窗口内的速度,来判断是否需要进行速度控制。如果该时间窗口内的速度大于阈值,那么便通过睡眠一定的时间,来使更长时间上的平均速度是不高于阈值的。比如一个时间窗口的大小为 1s,阈值仍为 100 条每秒,那么如果在这 1s 以内传输了 300 条数据,则需要等待 2s 以后才能继续进行数据传输。这样,即使在前 1s 速度超过了阈值(300 > 100),但是因为后 2s 的速度为 0,所以从整体上来看,这 3s 的的平均速度仍然是不高于阈值的。最终它的速度走势也许会类似下图:
可以看见,虽然阈值为 100 条每秒,但是仍然会有部分时间窗口内的总量超过阈值,不过通过限制之后时间内的速度,从而使整体平均的速度是不高于阈值的。
公式
根据前面概述,可推理出公式。设置一下变量:
- 时间窗口的时间间隔为:flowControlInterval(单位:毫秒)
- 实际时间间隔为:interval(单位:毫秒)
- 实际时间间隔内传输的数据总量为:numResults(单位:条)
- 最大限制速度为:maxSpeed(单位:条/秒)
- 当前速度为 currentSpeed(单位:条/秒)
- 等待休眠的时间为:limitSleepTime(单位:毫秒)
那么已知 flowControlInterval、interval、numResults 的值,可得到:
当前速度:
currentSpeed = numResults * 1000 / interval;
睡眠时间:
limitSleepTime = currentSpeed * interval / maxSpeed - interval;
使用 Java 代码表示大致如下:
long interval = nowTimestamp - lastTimestamp;
if (interval >= flowControlInterval) {long numResults = totlaResults - lastResults;long currentSpeed = numResults * 1000 / interval;if (currentSpeed > maxSpeed) {// 计算休眠时间limitSleepTime = currentSpeed * interval / maxSpeed - interval;}if (limitSleepTime > 0) {Thread.sleep(limitSleepTime);}
}
这里的 nowTimestamp、totlaResults 为当前时间的时间戳与当前总共传输的数据总量,lastTimestamp、lastResults 为上一次计算后记录的时间的时间戳与数据传输总量,通过计算可得到实际的时间间隔与该时间段内的数据传输量。
注:以上算法与代码均参考于阿里的开源项目 DataX,源码路径:https://github.com/alibaba/DataX/blob/master/core/src/main/java/com/alibaba/datax/core/transport/channel/Channel.java#L192。
总结:
通过以上方式限制流的速度,可以对包括数量、字节大小等所有可以量化的指标进行限制,虽然不能保证每一个单位时间内的速度总是不高于阈值,但是却能使平均的速度是不高于阈值的。
参考:
- DataX: https://github.com/alibaba/DataX
DataX 中流的速度限制相关推荐
- 使用DataX同步MaxCompute数据到TableStore(原OTS)优化指南
概述 现在越来越多的技术架构下会组合使用MaxCompute和TableStore,用MaxCompute作大数据分析,计算的结果会导出到TableStore提供在线访问.MaxCompute提供海量 ...
- 尚硅谷大数据技术之 DataX—4)DataX使用优化
4.1 关键参数 ➢ job.setting.speed.channel : channel并发数 ➢ job.setting.speed.record : 全局配置channel的record限速 ...
- datax(10): 源码解读Communication(Datax通讯类)
前面看了datax的通讯机制,继续看源码-具体的通讯类 Communication.根据datax的运行模式的区别, 数据的收集会有些区别,这篇文章都是讲的在standalone模式下. 一.comm ...
- datax值转换使用以及源码分析
目录 一.DataX的数据传输基础 1. 通道Channel 2. TaskGroupContainer中的成员变量Channel分析 3. Channel配置 二.datax脏数据处理 1. 什么是 ...
- DataX 工具安装部署及使用
一. DataX的安装 1.环境准备 操作系统为CentOS 7.2及以上版本或RedHat 7.2及以上版本,暂不支持SUSE操作系统.python为2.x版本,java为1.8版本. 新建操作系统 ...
- 阿里开源数据同步神器DataX异构数据源间数据同步同步MySQL与HDFS相互实战
Datax 实战使用 继上一篇 阿里开源数据同步神器DataX异构数据源间数据同步基础介绍与快速入门之后的实战篇 1.MySQL-To-HDFS 环境 & 准备说明: 描述: 为了快速搭建测试 ...
- datax 持续数据同步_Datax 数据同步
官方Hello入门例子{ "job": { "content": [ { "reader": { "name": &qu ...
- DataX 安装和使用
阿里云介绍: 1. 下载安装包.作为阿里主要的数据传输工具Datax,阿里已经完全开源到github上面了.下载地址(https://github.com/alibaba/DataX). 2. 安装环 ...
- datax底层原理_手把手实现Datax3.0中的传输通道
Datax的整体框架我们已经大体了解.这次来分析一下reader到writer中间数据的传输层. 这次采取另外一种方式,我们把代码抽取,自己实现一个通道 1-首先是定义一个接口代表传输的每一条数据pu ...
最新文章
- android图片压缩总结
- 读完这些论文和代码,你就能在搜狐算法大赛中获得好成绩了
- r语言中1c0怎么表示什么,r语言表示或者用什么符号?
- 现代软件工程 作业 团队第一个作业
- Linux chmod
- 大学计算机专业的实验室图片,清华大学计算机系人工智能实验室.PDF
- Centos7安装32位库用来安装32位软件程序
- 抓linux肉鸡教程视频,抓肉鸡的教程和软件免费分享(2018一天抓1000只电脑肉鸡视频)...
- 遗传算法解决TSP问题
- 什么是测试开发工程师(SET)?
- 【180620】小人物走路、奔跑的VC++游戏特效
- MySQL(管理员常用命令)
- 【Unity UGUI】屏幕坐标转换
- MavLink 库 c++环境搭建及解ADS-B消息教程
- 多线程- 让程序更高效的运行
- W3Cschool从零开始学C语言笔记(1-2)位、字节及排列组合
- 微信小程序ssm电影院购票+后台管理系统|前后分离VUE
- Buck电路设计之芯片选择---TI电源器件在电赛中的应用
- LeetCode 题库 全	JAVA 解题---771.宝石与石头
- BASE64编码的图片在网页中直接用
热门文章
- Spring-Boot 使用JSR-107集成EHCache3.x (配置Clustered以及DISK)
- CCF 金融信息负面及主体判定
- 联想bios怎么开启TPM2.0?
- Cadence Allegro异型焊盘走线(出线)在焊盘外怎么解决?
- Apache Hadoop Pig 源代码分析(2)
- java覆盖率怎么包含多个工程,在多项目工程中统计子工程的覆盖率
- matlab 圆锥投影,MATLAB 表达式 2*2^(3^2) 的结果是 ( )
- JAVA 水果机游戏及编码
- 百度推出阿波罗计划,自动驾驶进入开放时代了吗?
- 【C++】继承和派生、虚继承和虚基类、虚基类表和虚基类指针