flink中的HybirdmemorySegment
Flink使用MemorySegment来管理内存,同时也是flink中内存的抽象。MemorySegment的实现也分为HeapMemorySegment和HybirdMemorySegment。
其中,HeapMemorySegment实现很简单,数据通过其内部的byte[]数组来实现。
HybirdmemorySegment既可以使用堆内内存,也可以使用堆外内存。
如何来确认使用的是堆内内存还是堆外内存?使用堆内内存的存储在构造方法中直接传入一个byte数组,而使用堆外内存的情况下则是在其构造方法中传入一个java.nio的ByteBuffer来作为堆外内存的存储。
HybridMemorySegment(byte[] buffer, Object owner) {super(buffer, owner);this.offHeapBuffer = null;
}
HybridMemorySegment(ByteBuffer buffer, Object owner) {super(checkBufferAndGetAddress(buffer), buffer.capacity(), owner);this.offHeapBuffer = buffer;
}
来看其中的HybirdmemorySegment的put()方法。
@Override
public final void put(int offset, ByteBuffer source, int numBytes) {// check the byte array offset and lengthif ((offset | numBytes | (offset + numBytes)) < 0) {throw new IndexOutOfBoundsException();}final int sourceOffset = source.position();final int remaining = source.remaining();if (remaining < numBytes) {throw new BufferUnderflowException();}if (source.isDirect()) {// copy to the target memory directlyfinal long sourcePointer = getAddress(source) + sourceOffset;final long targetPointer = address + offset;if (targetPointer <= addressLimit - numBytes) {UNSAFE.copyMemory(null, sourcePointer, heapMemory, targetPointer, numBytes);source.position(sourceOffset + numBytes);}else if (address > addressLimit) {throw new IllegalStateException("segment has been freed");}else {throw new IndexOutOfBoundsException();}}else if (source.hasArray()) {// move directly into the byte arrayput(offset, source.array(), sourceOffset + source.arrayOffset(), numBytes);// this must be after the get() call to ensue that the byte buffer is not// modified in case the call failssource.position(sourceOffset + numBytes);}else {// neither heap buffer nor direct bufferwhile (source.hasRemaining()) {put(offset++, source.get());}}
}
这里这个三个参数的put()方法第代表将第二个参数的ByteBuffer作为数据源,将接下来第三个参数个数的byte数据放到以HybirdmemorySegment中第一个参数偏移量为起点的数据当中。
如果数据源的ByteBuffer占据的是堆外空间,那么将会直接通过getAddress()方法计算得到其指针,再根据其数据源buffer有效数据开始位置position得到数据源的源数据起始位置的指针,在根据HybirdmemorySegment中数据的第一个参数偏移量得到本身起始位置的指针,根据这些直接通过unsafe将堆外内存的数据拷贝到HybirdmemorySegment中的目标位置,然后相应的数据源buffer的position曾加相应的大小。
如果数据源是堆内内存,那么则是取出数据源的buffer的array数组,在以array数组为参数的put()方法中继续操作。
@Override
public final void put(int index, byte[] src, int offset, int length) {// check the byte array offset and lengthif ((offset | length | (offset + length) | (src.length - (offset + length))) < 0) {throw new IndexOutOfBoundsException();}final long pos = address + index;if (index >= 0 && pos <= addressLimit - length) {final long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset;UNSAFE.copyMemory(src, arrayAddress, heapMemory, pos, length);}else if (address > addressLimit) {throw new IllegalStateException("segment has been freed");}else {// index is in fact invalidthrow new IndexOutOfBoundsException();}
}
这里底层还是通过unsafe,但由于是堆内内存,其偏移量是一个相对偏移 ,以数据源在堆内内存为基准,不需要再同之前的一样得到一个指针。
那么,HybirdmemorySegment中得到bytebuffer指针的getAddress()方法是怎么实现的。
private static final Field ADDRESS_FIELD;static {try {ADDRESS_FIELD = java.nio.Buffer.class.getDeclaredField("address");ADDRESS_FIELD.setAccessible(true);}catch (Throwable t) {throw new RuntimeException("Cannot initialize HybridMemorySegment: off-heap memory is incompatible with this JVM.", t);}
}private static long getAddress(ByteBuffer buffer) {if (buffer == null) {throw new NullPointerException("buffer is null");}try {return (Long) ADDRESS_FIELD.get(buffer);}catch (Throwable t) {throw new RuntimeException("Could not access direct byte buffer address.", t);}
}
在HybirdmemorySegment中,有一个私有的静态成员,可以看到是通过反射机制得到的java.nio的buffer中的address字段,也就是buffer的指针字段,在getAddress()方法中,也就是根据目标buffer中的此字段得到目标的指针。
get()方法与put()方法类似。
flink中的HybirdmemorySegment相关推荐
- Flink从入门到精通100篇(二十一)-万字长文详解 Flink 中的 CopyOnWriteStateTable
前言 现如今想阅读 HashMap 源码实际上比较简单,因为网上一大堆博客去分析 HashMap 和 ConcurrentHashMap.本文详细分析 CopyOnWriteStateTable 源码 ...
- as点击发送广播_Apache Flink 中广播状态的实用指南
翻译 | 王柯凝 校对 | 邱从贤(山智) 自版本 Flink 1.5.0 以来,Apache Flink 提供了一种新的状态类型,称为广播状态(Broadcast State).在本文中,将解释什么 ...
- 《从0到1学习Flink》—— 介绍Flink中的Stream Windows
前言 目前有许多数据分析的场景从批处理到流处理的演变, 虽然可以将批处理作为流处理的特殊情况来处理,但是分析无穷集的流数据通常需要思维方式的转变并且具有其自己的术语(例如,"windowin ...
- flink中的WaterMark调研和具体实例
一些基本概念介绍: Event Time 事件时间是每个事件在其生产设备上发生的时间 Ingestion Time 摄取时间是数据进入Flink的时间 Processing Time 处理时间是是指正 ...
- Flink中的容错机制
1 checkpoint Flink 故障恢复机制的核心,就是应用状态的一致性检查点checkpoint. 在Spark Streaming中仅仅是针对driver的故障恢复做了数据和元数据 ...
- Flink中的状态管理
1 Flink中的状态 当数据流中的许多操作只查看一个每次事件(如事件解析器),一些操作会跨多个事件的信息(如窗口操作).这些操作称为有状态.状态由一个任务维护,并且用来计算某个结果的所有数据,都 ...
- 如何在 Flink 中规划 RocksDB 内存容量?
本文描述了一些配置选项,这些选项将帮助您有效地管理规划 Apache Flink 中 RocksDB state backend 的内存大小.在前面的文章[1]中,我们描述了 Flink 中支持的可选 ...
- 如何在 Apache Flink 中使用 Python API?
本文根据 Apache Flink 系列直播课程整理而成,由 Apache Flink PMC,阿里巴巴高级技术专家 孙金城 分享.重点为大家介绍 Flink Python API 的现状及未来规划, ...
- flink中akka的使用 以jobClient提交任务为例子
在flink中,集群内部的组件之间通过akka来互相通信,其中采用了akka中的actor模型. 当需要提交一个可用的任务交由jobManager来处理并分配资源时,将会在ClusterClinet中 ...
最新文章
- code blocks 快捷键
- 亲近自然的加州玻璃豪宅
- 利用flask将opencv实时视频流输出到浏览器
- java在记事本找不到_好烦,用记事本练习JDBC总是找不到类
- 根据工作时间计算小组成员得分
- CNN结构:用于检测的CNN结构进化-一站式方法
- jQuery 实现Ajax
- 二叉树的字符图形显示程序_每个程序员都必须知道的8种通用数据结构
- 《Linux设备驱动开发详解》学习笔记一
- pandas 选取第一行_用pandas中的DataFrame时选取行或列的方法
- wenbao 与将linux系统(kali)装入U盘
- MAC编译OpenJDK8:ld: library not found for -lstdc++(独家解决办法)
- Java IO流笔记4 --- File类
- 转载:《算法刷题LeetCode(中文版)》LeetCode题解,151道题完整版
- 无线网服务器拒绝连接,网络拒绝连接什么原因
- 2021-09-26 WPF上位机 45-关键帧动画
- Django之DRF自定义action
- ppt文件怎么压缩,ppt压缩的办法步骤
- 程序员生涯困惑时的自我解脱
- 小程序开发工具绑定服务器,微信小程序绑定到第三方平台流程
热门文章
- Struts2的标签概述
- BigInt:JavaScript 中的任意精度整数
- Java8新特性总结 - 4.方法引用
- 文件分片_怎样屏蔽QQ和微信外发文件,同时允许发送截图
- 初识贪心——调度问题
- 使用了未经检查或不安全的操作_违规操作就是对家庭的不负责!电气安全员提醒你的安全常识...
- 阿里大牛精心整理了46张PPT,教你弄懂JVM、GC算法和性能调优!
- 手机必备OCR文字识别软件:福昕扫描王使用攻略
- 计算机网络资料篇(一)——HTTP
- 利用react-router实现按需加载、登录验证、刷新组件。。。