Flink的网络缓冲池NetworkBufferPool。

构造方法需要两个参数,申请的MemorySegment个数和单个MemorySegment的大小,两者乘积是整个内存缓冲池的总容量。

内部存储结构是一个队列,未分配的MemorySegment存储在队列当中,当被分配时会从内存中取出。

try {this.availableMemorySegments = new ArrayBlockingQueue<>(numberOfSegmentsToAllocate);
}
catch (OutOfMemoryError err) {throw new OutOfMemoryError("Could not allocate buffer queue of length "+ numberOfSegmentsToAllocate + " - " + err.getMessage());
}try {for (int i = 0; i < numberOfSegmentsToAllocate; i++) {availableMemorySegments.add(MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize, null));}
}

以上是其构造方法中的主要部分。

NetworkBufferPool提供了createBufferPool()方法可以申请本地缓冲池。

this.numTotalRequiredBuffers += numRequiredBuffers;// We are good to go, create a new buffer pool and redistribute
// non-fixed size buffers.
LocalBufferPool localBufferPool =new LocalBufferPool(this, numRequiredBuffers, maxUsedBuffers, owner);allBufferPools.add(localBufferPool);

需要提供申请的MemorySegment数量和最大使用MemorySegment数量,这些参数也将作为LocalBufferPool的构造方法参数,之后将其存放在去Set中便于管理。

在建立完毕LocalBufferPool并放入容器当中之后会在createBufferPool()方法中通过redistributeBuffers()方法重新分配网络缓冲池中的内存分配。

final int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;if (numAvailableMemorySegment == 0) {// in this case, we need to redistribute buffers so that every pool gets its minimumfor (LocalBufferPool bufferPool : allBufferPools) {bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments());}return;
}

如果当前,网络缓冲池中的内存正巧耗尽 ,那么将LocalBufferPool容器中的LocalBufferPool中的内存数量全部设为最小并返回。

long totalCapacity = 0; // long to avoid int overflowfor (LocalBufferPool bufferPool : allBufferPools) {int excessMax = bufferPool.getMaxNumberOfMemorySegments() -bufferPool.getNumberOfRequiredMemorySegments();totalCapacity += Math.min(numAvailableMemorySegment, excessMax);
}// no capacity to receive additional buffers?
if (totalCapacity == 0) {return; // necessary to avoid div by zero when nothing to re-distribute
}

之后遍历容器,依次计算剩余容量和 单个容器距离最大容量的差的 最小值之和为totalCapacity,为理想情况下每个LocalBufferPool可分配到的内存总量。

final int memorySegmentsToDistribute = MathUtils.checkedDownCast(Math.min(numAvailableMemorySegment, totalCapacity));long totalPartsUsed = 0; // of totalCapacity
int numDistributedMemorySegment = 0;
for (LocalBufferPool bufferPool : allBufferPools) {int excessMax = bufferPool.getMaxNumberOfMemorySegments() -bufferPool.getNumberOfRequiredMemorySegments();// shortcutif (excessMax == 0) {continue;}totalPartsUsed += Math.min(numAvailableMemorySegment, excessMax);// avoid remaining buffers by looking at the total capacity that should have been// re-distributed up until here// the downcast will always succeed, because both arguments of the subtraction are in the 'int' domainfinal int mySize = MathUtils.checkedDownCast(memorySegmentsToDistribute * totalPartsUsed / totalCapacity - numDistributedMemorySegment);numDistributedMemorySegment += mySize;bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments() + mySize);
}

之后如上,剩余的容量numAvailableMemorySegment和剩余达到最大容量的差totalCapacity两者中的小者也正是接下来需要分配的内存总量。

接下来的LocalBufferPool可多分配的内存量为单 个LocalBufferPool理想可分配内存 占 总理想分配 的比乘上 总可分配内存 最后减去已分配的内存的结果。

flink网络缓冲池相关推荐

  1. 95-846-820-源码-网络-Flink 网络传输优化技术

    文章目录 1.概述 2.Flink 计算模型 Credit-based 数据流控制 重构 Task Thread 和 IO Thread 的协作模型 避免不必要的序列化和反序列化 Object Reu ...

  2. 原理解析 | 深入了解 Apache Flink 的网络协议栈

    Flink 的网络协议栈是组成 flink-runtime 模块的核心组件之一,是每个 Flink 作业的核心.它连接所有 TaskManager 的各个子任务(Subtask),因此,对于 Flin ...

  3. 使用Flink时遇到的问题(不断更新中)

    1.启动不起来 查看JobManager日志: WARN org.apache.flink.runtime.webmonitor.JobManagerRetriever - Failed to ret ...

  4. Flink的处理背压​原理及问题-面试必备

    点击上方"朱小厮的博客",选择"设为星标" 后台回复"书",获取 来源:r6a.cn/gtsJ 反压机制(BackPressure)被广泛应 ...

  5. 独家 | 一文读懂Apache Flink技术

    作者:云邪 整理:李泽聚(Flink China社区志愿者) 校对:云邪 / 韩非(Flink China社区志愿者) 本文约6000字,建议阅读10+分钟. 本文为你详细介绍新一代大数据处理引擎Ap ...

  6. flink 三种时间机制_360深度实践:Flink 与 Storm 协议级对比

    本文从数据传输和数据可靠性的角度出发,对比测试了 Storm 与 Flink 在流处理上的性能,并对测试结果进行分析,给出在使用 Flink 时提高性能的建议. Apache Storm.Apache ...

  7. Flink实时计算性能分析

    作者:张馨予 本文从数据传输和数据可靠性的角度出发,对比测试了Storm与Flink在流处理上的性能,并对测试结果进行分析,给出在使用Flink时提高性能的建议. Apache Storm.Apach ...

  8. Apache Flink,流计算?不仅仅是流计算!

    阿里妹导读:2018年12月下旬,由阿里巴巴集团主办的Flink Forward China在北京国家会议中心举行.Flink Forward是由Apache软件基金会授权的全球范围内的Flink技术 ...

  9. 360深度实践:Flink与Storm协议级对比

    戳蓝字"CSDN云计算"关注我们哦! 文 |  张馨予     来源 | 高可用架构 作者 张馨予,360 大数据计算平台负责人.北京邮电大学硕士,2015年加入360系统部,一直 ...

最新文章

  1. yolov5 mobile 剪枝
  2. python3练习题:11-20
  3. WindowsPE 第五章 导出表编程-1(枚举导出表)
  4. 通过IFeatureClass 接口查询 IWorkspace, 查询通配符
  5. 中小企业团队敏捷产品开发流程最佳实践
  6. 互联网日报 | 6月18日 星期五 | 百度与极狐发布量产共享无人车;奈雪的茶预计6月30日登陆港交所;阿里云盘上线PC版...
  7. Redis分布式锁及分区
  8. 洛谷 3951 小凯的疑惑
  9. JavaScript打开窗口
  10. 轻量级网络——EfficientNet
  11. 测牛学堂:2023软件测试入门学习指南之测试方法完结总结
  12. 排队论和对策论(博弈论)
  13. 从零开始之驱动发开、linux驱动(二十五、framebuffer 子系统框架)
  14. 可以借助Studio Display 实现旧款Macbook上的“Hey Siri”功能
  15. Maven 跳过单元测试
  16. Mask R-CNN学习笔记
  17. EXCEL2016下将身份证号这列重复项用背景颜色突显
  18. BL-HUF35A-AV-TRB 电子元器件 BRIGHT 封装SMD 批次2021
  19. 运维开发面试常见问题汇总(一直在更新)
  20. jn-社团申请必须要上传图片

热门文章

  1. Spring Boot 第一个小程序之又来Hello World了
  2. Python 错误:ValueError: unsupported format character ‘Y‘ (0x59) at index 146
  3. Python学习笔记之列表切片(六)
  4. Linux文件操作命令(二)
  5. vue中根据搜索内容跳转到页面指定位置
  6. Storm对DRPC权限控制Version1.0.1
  7. ping 命令_命令PING背后发生了什么,记得Windows XP拒绝利用PING命令通信吗
  8. 共享内存 传一个类指针_大神是如何学习 Go 语言之为什么使用通信来共享内存...
  9. 【Keras】减少过拟合的秘诀——Dropout正则化
  10. UVa - 1617 - Laptop