flink网络缓冲池
Flink的网络缓冲池NetworkBufferPool。
构造方法需要两个参数,申请的MemorySegment个数和单个MemorySegment的大小,两者乘积是整个内存缓冲池的总容量。
内部存储结构是一个队列,未分配的MemorySegment存储在队列当中,当被分配时会从内存中取出。
try {this.availableMemorySegments = new ArrayBlockingQueue<>(numberOfSegmentsToAllocate);
}
catch (OutOfMemoryError err) {throw new OutOfMemoryError("Could not allocate buffer queue of length "+ numberOfSegmentsToAllocate + " - " + err.getMessage());
}try {for (int i = 0; i < numberOfSegmentsToAllocate; i++) {availableMemorySegments.add(MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize, null));}
}
以上是其构造方法中的主要部分。
NetworkBufferPool提供了createBufferPool()方法可以申请本地缓冲池。
this.numTotalRequiredBuffers += numRequiredBuffers;// We are good to go, create a new buffer pool and redistribute
// non-fixed size buffers.
LocalBufferPool localBufferPool =new LocalBufferPool(this, numRequiredBuffers, maxUsedBuffers, owner);allBufferPools.add(localBufferPool);
需要提供申请的MemorySegment数量和最大使用MemorySegment数量,这些参数也将作为LocalBufferPool的构造方法参数,之后将其存放在去Set中便于管理。
在建立完毕LocalBufferPool并放入容器当中之后会在createBufferPool()方法中通过redistributeBuffers()方法重新分配网络缓冲池中的内存分配。
final int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;if (numAvailableMemorySegment == 0) {// in this case, we need to redistribute buffers so that every pool gets its minimumfor (LocalBufferPool bufferPool : allBufferPools) {bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments());}return;
}
如果当前,网络缓冲池中的内存正巧耗尽 ,那么将LocalBufferPool容器中的LocalBufferPool中的内存数量全部设为最小并返回。
long totalCapacity = 0; // long to avoid int overflowfor (LocalBufferPool bufferPool : allBufferPools) {int excessMax = bufferPool.getMaxNumberOfMemorySegments() -bufferPool.getNumberOfRequiredMemorySegments();totalCapacity += Math.min(numAvailableMemorySegment, excessMax);
}// no capacity to receive additional buffers?
if (totalCapacity == 0) {return; // necessary to avoid div by zero when nothing to re-distribute
}
之后遍历容器,依次计算剩余容量和 单个容器距离最大容量的差的 最小值之和为totalCapacity,为理想情况下每个LocalBufferPool可分配到的内存总量。
final int memorySegmentsToDistribute = MathUtils.checkedDownCast(Math.min(numAvailableMemorySegment, totalCapacity));long totalPartsUsed = 0; // of totalCapacity
int numDistributedMemorySegment = 0;
for (LocalBufferPool bufferPool : allBufferPools) {int excessMax = bufferPool.getMaxNumberOfMemorySegments() -bufferPool.getNumberOfRequiredMemorySegments();// shortcutif (excessMax == 0) {continue;}totalPartsUsed += Math.min(numAvailableMemorySegment, excessMax);// avoid remaining buffers by looking at the total capacity that should have been// re-distributed up until here// the downcast will always succeed, because both arguments of the subtraction are in the 'int' domainfinal int mySize = MathUtils.checkedDownCast(memorySegmentsToDistribute * totalPartsUsed / totalCapacity - numDistributedMemorySegment);numDistributedMemorySegment += mySize;bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments() + mySize);
}
之后如上,剩余的容量numAvailableMemorySegment和剩余达到最大容量的差totalCapacity两者中的小者也正是接下来需要分配的内存总量。
接下来的LocalBufferPool可多分配的内存量为单 个LocalBufferPool理想可分配内存 占 总理想分配 的比乘上 总可分配内存 最后减去已分配的内存的结果。
flink网络缓冲池相关推荐
- 95-846-820-源码-网络-Flink 网络传输优化技术
文章目录 1.概述 2.Flink 计算模型 Credit-based 数据流控制 重构 Task Thread 和 IO Thread 的协作模型 避免不必要的序列化和反序列化 Object Reu ...
- 原理解析 | 深入了解 Apache Flink 的网络协议栈
Flink 的网络协议栈是组成 flink-runtime 模块的核心组件之一,是每个 Flink 作业的核心.它连接所有 TaskManager 的各个子任务(Subtask),因此,对于 Flin ...
- 使用Flink时遇到的问题(不断更新中)
1.启动不起来 查看JobManager日志: WARN org.apache.flink.runtime.webmonitor.JobManagerRetriever - Failed to ret ...
- Flink的处理背压原理及问题-面试必备
点击上方"朱小厮的博客",选择"设为星标" 后台回复"书",获取 来源:r6a.cn/gtsJ 反压机制(BackPressure)被广泛应 ...
- 独家 | 一文读懂Apache Flink技术
作者:云邪 整理:李泽聚(Flink China社区志愿者) 校对:云邪 / 韩非(Flink China社区志愿者) 本文约6000字,建议阅读10+分钟. 本文为你详细介绍新一代大数据处理引擎Ap ...
- flink 三种时间机制_360深度实践:Flink 与 Storm 协议级对比
本文从数据传输和数据可靠性的角度出发,对比测试了 Storm 与 Flink 在流处理上的性能,并对测试结果进行分析,给出在使用 Flink 时提高性能的建议. Apache Storm.Apache ...
- Flink实时计算性能分析
作者:张馨予 本文从数据传输和数据可靠性的角度出发,对比测试了Storm与Flink在流处理上的性能,并对测试结果进行分析,给出在使用Flink时提高性能的建议. Apache Storm.Apach ...
- Apache Flink,流计算?不仅仅是流计算!
阿里妹导读:2018年12月下旬,由阿里巴巴集团主办的Flink Forward China在北京国家会议中心举行.Flink Forward是由Apache软件基金会授权的全球范围内的Flink技术 ...
- 360深度实践:Flink与Storm协议级对比
戳蓝字"CSDN云计算"关注我们哦! 文 | 张馨予 来源 | 高可用架构 作者 张馨予,360 大数据计算平台负责人.北京邮电大学硕士,2015年加入360系统部,一直 ...
最新文章
- yolov5 mobile 剪枝
- python3练习题:11-20
- WindowsPE 第五章 导出表编程-1(枚举导出表)
- 通过IFeatureClass 接口查询 IWorkspace, 查询通配符
- 中小企业团队敏捷产品开发流程最佳实践
- 互联网日报 | 6月18日 星期五 | 百度与极狐发布量产共享无人车;奈雪的茶预计6月30日登陆港交所;阿里云盘上线PC版...
- Redis分布式锁及分区
- 洛谷 3951 小凯的疑惑
- JavaScript打开窗口
- 轻量级网络——EfficientNet
- 测牛学堂:2023软件测试入门学习指南之测试方法完结总结
- 排队论和对策论(博弈论)
- 从零开始之驱动发开、linux驱动(二十五、framebuffer 子系统框架)
- 可以借助Studio Display 实现旧款Macbook上的“Hey Siri”功能
- Maven 跳过单元测试
- Mask R-CNN学习笔记
- EXCEL2016下将身份证号这列重复项用背景颜色突显
- BL-HUF35A-AV-TRB 电子元器件 BRIGHT 封装SMD 批次2021
- 运维开发面试常见问题汇总(一直在更新)
- jn-社团申请必须要上传图片
热门文章
- Spring Boot 第一个小程序之又来Hello World了
- Python 错误:ValueError: unsupported format character ‘Y‘ (0x59) at index 146
- Python学习笔记之列表切片(六)
- Linux文件操作命令(二)
- vue中根据搜索内容跳转到页面指定位置
- Storm对DRPC权限控制Version1.0.1
- ping 命令_命令PING背后发生了什么,记得Windows XP拒绝利用PING命令通信吗
- 共享内存 传一个类指针_大神是如何学习 Go 语言之为什么使用通信来共享内存...
- 【Keras】减少过拟合的秘诀——Dropout正则化
- UVa - 1617 - Laptop