摘要: Flink是jvm之上的大数据处理引擎。

Flink是jvm之上的大数据处理引擎,jvm存在java对象存储密度低、full gc时消耗性能,gc存在stw的问题,同时omm时会影响稳定性。同时针对频繁序列化和反序列化问题flink使用堆内堆外内存可以直接在一些场景下操作二进制数据,减少序列化反序列化的消耗。同时基于大数据流式处理的特点,flink定制了自己的一套序列化框架。flink也会基于cpu L1 L2 L3高速缓存的机制以及局部性原理,设计使用缓存友好的数据结构。flink内存管理和spark的tungsten的内存管理的出发点很相似。

内存模型

Flink可以使用堆内和堆外内存,内存模型如图所示:

flink使用内存划分为堆内内存和堆外内存。按照用途可以划分为task所用内存,network memory、managed memory、以及framework所用内存,其中task network managed所用内存计入slot内存。framework为taskmanager公用。

堆内内存包含用户代码所用内存、heapstatebackend、框架执行所用内存。

堆外内存是未经jvm虚拟化的内存,直接映射到操作系统的内存地址,堆外内存包含框架执行所用内存,jvm堆外内存、Direct、native等。

Direct memory内存可用于网络传输缓冲。network memory属于direct memory的范畴,flink可以借助于此进行zero copy,从而减少内核态到用户态copy次数,从而进行更高效的io操作。

jvm metaspace存放jvm加载的类的元数据,加载的类越多,需要的空间越大,overhead用于jvm的其他开销,如native memory、code cache、thread stack等。

Managed Memory主要用于RocksDBStateBackend和批处理算子,也属于native memory的范畴,其中rocksdbstatebackend对应rocksdb,rocksdb基于lsm数据结构实现,每个state对应一个列族,占有独立的writebuffer,rocksdb占用native内存大小为 blockCahe + writebufferNum * writeBuffer + index ,同时堆外内存是进程之间共享的,jvm虚拟化大量heap内存耗时较久,使用堆外内存的话可以有效的避免该环节。但堆外内存也有一定的弊端,即监控调试使用相对复杂,对于生命周期较短的segment使用堆内内存开销更低,flink在一些情况下,直接操作二进制数据,避免一些反序列化带来的开销。如果需要处理的数据超出了内存限制,则会将部分数据存储到硬盘上。

内存管理

类似于OS中的page机制,flink模拟了操作系统的机制,通过page来管理内存,flink对应page的数据结构为dataview和MemorySegment,memorysegment是flink内存分配的最小单位,默认32kb,其可以在堆上也可以在堆外,flink通过MemorySegment的数据结构来访问堆内堆外内存,借助于flink序列化机制(序列化机制会在下一小节讲解),memorysegment提供了对二进制数据的读取和写入的方法,flink使用datainputview和dataoutputview进行memorysegment的二进制的读取和写入,flink可以通过HeapMemorySegment 管理堆内内存,通过HybridMemorySegment来管理堆内和堆外内存,MemorySegment管理jvm堆内存时,其定义一个字节数组的引用指向内存端,基于该内部字节数组的引用进行操作的HeapMemorySegment。

public abstract class MemorySegment {/*** The heap byte array object relative to which we access the memory.*  如果为堆内存,则指向访问的内存的引用,否则若内存为非堆内存,则为null* <p>Is non-<tt>null</tt> if the memory is on the heap, and is <tt>null</tt>, if the memory is* off the heap. If we have this buffer, we must never void this reference, or the memory* segment will point to undefined addresses outside the heap and may in out-of-order execution* cases cause segmentation faults.*/protected final byte[] heapMemory;/*** The address to the data, relative to the heap memory byte array. If the heap memory byte* array is <tt>null</tt>, this becomes an absolute memory address outside the heap.* 字节数组对应的相对地址*/protected long address;
}

HeapMemorySegment用来分配堆上内存。

public final class HeapMemorySegment extends MemorySegment {/*** An extra reference to the heap memory, so we can let byte array checks fail by the built-in* checks automatically without extra checks.* 字节数组的引用指向该内存段*/private byte[] memory;public void free() {super.free();this.memory = null;}public final void get(DataOutput out, int offset, int length) throws IOException {out.write(this.memory, offset, length);}
}

HybridMemorySegment即支持onheap和offheap内存,flink通过jvm的unsafe操作,如果对象o不为null,为onheap的场景,并且后面的地址或者位置是相对位置,那么会直接对当前对象(比如数组)的相对位置进行操作。如果对象o为null,操作的内存块不是JVM堆内存,为off-heap的场景,并且后面的地址是某个内存块的绝对地址,那么这些方法的调用也相当于对该内存块进行操作。

public final class HybridMemorySegment extends MemorySegment {@Overridepublic ByteBuffer wrap(int offset, int length) {if (address <= addressLimit) {if (heapMemory != null) {return ByteBuffer.wrap(heapMemory, offset, length);}else {try {ByteBuffer wrapper = offHeapBuffer.duplicate();wrapper.limit(offset + length);wrapper.position(offset);return wrapper;}catch (IllegalArgumentException e) {throw new IndexOutOfBoundsException();}}}else {throw new IllegalStateException("segment has been freed");}}
}

flink通过MemorySegmentFactory来创建memorySegment,memorySegment是flink内存分配的最小单位。对于跨memorysegment的数据方位,flink抽象出一个访问视图,数据读取datainputView,数据写入dataoutputview。

/*** This interface defines a view over some memory that can be used to sequentially read the contents of the memory.* The view is typically backed by one or more {@link org.apache.flink.core.memory.MemorySegment}.*/
@Public
public interface DataInputView extends DataInput {
private MemorySegment[] memorySegments; // view持有的MemorySegment的引用, 该组memorysegment可以视为一个内存页,
flink可以顺序读取memorysegmet中的数据
/*** Reads up to {@code len} bytes of memory and stores it into {@code b} starting at offset {@code off}.* It returns the number of read bytes or -1 if there is no more data left.* @param b byte array to store the data to* @param off offset into byte array* @param len byte length to read* @return the number of actually read bytes of -1 if there is no more data left*/int read(byte[] b, int off, int len) throws IOException;
}

dataoutputview是数据写入的视图,outputview持有多个memorysegment的引用,flink可以顺序的写入segment。

/*** This interface defines a view over some memory that can be used to sequentially write contents to the memory.* The view is typically backed by one or more {@link org.apache.flink.core.memory.MemorySegment}.*/
@Public
public interface DataOutputView extends DataOutput {
private final List<MemorySegment> memory; // memorysegment的引用
/*** Copies {@code numBytes} bytes from the source to this view.* @param source The source to copy the bytes from.* @param numBytes The number of bytes to copy.void write(DataInputView source, int numBytes) throws IOException;
}

上一小节中讲到的managedmemory内存部分,flink使用memorymanager来管理该内存,managedmemory只使用堆外内存,主要用于批处理中的sorting、hashing、以及caching(社区消息,未来流处理也会使用到该部分),在流计算中作为rocksdbstatebackend的部分内存。memeorymanager通过memorypool来管理memorysegment。

/*** The memory manager governs the memory that Flink uses for sorting, hashing, caching or off-heap state backends* (e.g. RocksDB). Memory is represented either in {@link MemorySegment}s of equal size or in reserved chunks of certain* size. Operators allocate the memory either by requesting a number of memory segments or by reserving chunks.* Any allocated memory has to be released to be reused later.* <p>The memory segments are represented as off-heap unsafe memory regions (both via {@link HybridMemorySegment}).* Releasing a memory segment will make it re-claimable by the garbage collector, but does not necessarily immediately* releases the underlying memory.*/
public class MemoryManager {/*** Allocates a set of memory segments from this memory manager.* <p>The total allocated memory will not exceed its size limit, announced in the constructor.* @param owner The owner to associate with the memory segment, for the fallback release.* @param target The list into which to put the allocated memory pages.* @param numberOfPages The number of pages to allocate.* @throws MemoryAllocationException Thrown, if this memory manager does not have the requested amount*                                   of memory pages any more.*/public void allocatePages(Object owner,Collection<MemorySegment> target,int numberOfPages) throws MemoryAllocationException {
}private static void freeSegment(MemorySegment segment, @Nullable Collection<MemorySegment> segments) {segment.free();if (segments != null) {segments.remove(segment);}}
/*** Frees this memory segment.* <p>After this operation has been called, no further operations are possible on the memory* segment and will fail. The actual memory (heap or off-heap) will only be released after this* memory segment object has become garbage collected.*/public void free() {// this ensures we can place no more data and trigger// the checks for the freed segmentaddress = addressLimit + 1;}
}

对于上一小节中提到的NetWorkMemory的内存,flink使用networkbuffer做了一层buffer封装。buffer的底层也是memorysegment,flink通过bufferpool来管理buffer,每个taskmanager都有一个netwokbufferpool,该tm上的各个task共享该networkbufferpool,同时task对应的localbufferpool所需的内存需要从networkbufferpool申请而来,它们都是flink申请的堆外内存。

上游算子向resultpartition写入数据时,申请buffer资源,使用bufferbuilder将数据写入memorysegment,下游算子从resultsubpartition消费数据时,利用bufferconsumer从memorysegment中读取数据,bufferbuilder与bufferconsumer一一对应。同时这一流程也和flink的反压机制相关。如图

/*** A buffer pool used to manage a number of {@link Buffer} instances from the* {@link NetworkBufferPool}.* <p>Buffer requests are mediated to the network buffer pool to ensure dead-lock* free operation of the network stack by limiting the number of buffers per* local buffer pool. It also implements the default mechanism for buffer* recycling, which ensures that every buffer is ultimately returned to the* network buffer pool.* <p>The size of this pool can be dynamically changed at runtime ({@link #setNumBuffers(int)}. It* will then lazily return the required number of buffers to the {@link NetworkBufferPool} to* match its new size.*/
class LocalBufferPool implements BufferPool {
@Nullableprivate MemorySegment requestMemorySegment(int targetChannel) throws IOException {MemorySegment segment = null;synchronized (availableMemorySegments) {returnExcessMemorySegments();if (availableMemorySegments.isEmpty()) {segment = requestMemorySegmentFromGlobal();}// segment may have been released by buffer pool ownerif (segment == null) {segment = availableMemorySegments.poll();}if (segment == null) {availabilityHelper.resetUnavailable();}if (segment != null && targetChannel != UNKNOWN_CHANNEL) {if (subpartitionBuffersCount[targetChannel]++ == maxBuffersPerChannel) {unavailableSubpartitionsCount++;availabilityHelper.resetUnavailable();}}}return segment;}}/*** A result partition for data produced by a single task.** <p>This class is the runtime part of a logical {@link IntermediateResultPartition}. Essentially,* a result partition is a collection of {@link Buffer} instances. The buffers are organized in one* or more {@link ResultSubpartition} instances, which further partition the data depending on the* number of consuming tasks and the data {@link DistributionPattern}.* <p>Tasks, which consume a result partition have to request one of its subpartitions. The request* happens either remotely (see {@link RemoteInputChannel}) or locally (see {@link LocalInputChannel})The life-cycle of each result partition has three (possibly overlapping) phases:Produce  Consume  Release  Buffer management State management*/
public abstract class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {@Overridepublic BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {checkInProduceState();return bufferPool.requestBufferBuilderBlocking(targetChannel);}}
}

自定义序列化框架

flink对自身支持的基本数据类型,实现了定制的序列化机制,flink数据集对象相对固定,可以只保存一份schema信息,从而节省存储空间,数据序列化就是java对象和二进制数据之间的数据转换,flink使用TypeInformation的createSerializer接口负责创建每种类型的序列化器,进行数据的序列化反序列化,类型信息在构建streamtransformation时通过typeextractor根据方法签名类信息等提取类型信息并存储在streamconfig中。

/*** Creates a serializer for the type. The serializer may use the ExecutionConfig* for parameterization.* 创建出对应类型的序列化器* @param config The config used to parameterize the serializer.* @return A serializer for this type.*/@PublicEvolvingpublic abstract TypeSerializer<T> createSerializer(ExecutionConfig config);
/*** A utility for reflection analysis on classes, to determine the return type of implementations of transformation* functions.*/
@Public
public class TypeExtractor {
/*** Creates a {@link TypeInformation} from the given parameters.* If the given {@code instance} implements {@link ResultTypeQueryable}, its information* is used to determine the type information. Otherwise, the type information is derived* based on the given class information.* @param instance         instance to determine type information for* @param baseClass           base class of {@code instance}* @param clazz              class of {@code instance}* @param returnParamPos  index of the return type in the type arguments of {@code clazz}* @param <OUT>               output type* @return type information*/@SuppressWarnings("unchecked")@PublicEvolvingpublic static <OUT> TypeInformation<OUT> createTypeInfo(Object instance, Class<?> baseClass, Class<?> clazz,int returnParamPos) {if (instance instanceof ResultTypeQueryable) {return ((ResultTypeQueryable<OUT>) instance).getProducedType();} else {return createTypeInfo(baseClass, clazz, returnParamPos, null, null);}}
}

对于嵌套的数据类型,flink从最内层的字段开始序列化,内层序列化的结果将组成外层序列化结果,反序列时,从内存中顺序读取二进制数据,根据偏移量反序列化为java对象。flink自带序列化机制存储密度很高,序列化对应的类型值即可。

flink中的table模块在memorysegment的基础上使用了BinaryRow的数据结构,可以更好地减少反序列化开销,需要反序列化是可以只序列化相应的字段,而无需序列化整个对象。

同时你也可以注册子类型和自定义序列化器,对于flink无法序列化的类型,会交给kryo进行处理,如果kryo也无法处理,将强制使用avro来序列化,kryo序列化性能相对flink自带序列化机制较低,开发时可以使用env.getConfig().disableGenericTypes()来禁用kryo,尽量使用flink框架自带的序列化器对应的数据类型。

缓存友好的数据结构

cpu中L1、L2、L3的缓存读取速度比从内存中读取数据快很多,高速缓存的访问速度是主存的访问速度的很多倍。另外一个重要的程序特性是局部性原理,程序常常使用它们最近使用的数据和指令,其中两种局部性类型,时间局部性指最近访问的内容很可能短期内被再次访问,空间局部性是指地址相互临近的项目很可能短时间内被再次访问。

结合这两个特性设计缓存友好的数据结构可以有效的提升缓存命中率和本地化特性,该特性主要用于排序操作中,常规情况下一个指针指向一个<key,v>对象,排序时需要根据指针pointer获取到实际数据,然后再进行比较,这个环节涉及到内存的随机访问,缓存本地化会很低,使用序列化的定长key + pointer,这样key就会连续存储到内存中,避免的内存的随机访问,还可以提升cpu缓存命中率。对两条记录进行排序时首先比较key,如果大小不同直接返回结果,只需交换指针即可,不用交换实际数据,如果相同,则比较指针实际指向的数据。

后记

flink社区已走向流批一体的发展,后继将更多的关注与流批一体的引擎实现及结合存储层面的实现。flink服务请使用华为云 EI DLI-FLINK serverless服务。

参考

[1]: https://ci.apache.org/projects/flink/flink-docs-stable/

[2]: https://github.com/apache/flink

[3]: https://ververica.cn/

点击关注,第一时间了解华为云新鲜技术~

一文带你彻底了解大数据处理引擎Flink内存管理相关推荐

  1. 一文带你看懂 MySQL 存储引擎

    本文目录: 1.MySQL体系结构 2.存储引擎介绍 3.MySQL 存储引擎特性 4.MySQL 有哪些存储引擎 5.了解 MySQL 数据存储方式 6.MySQL存储引擎介绍 6.1 CSV存储引 ...

  2. 手把手教你搭建实时大数据引擎FLINK

    手把手教你搭建实时大数据引擎FLINK 服务器规划 Standalone高可用HA模式 架构图 下载并上传tar包 具体安装步骤 yarm 集群环境搭建 服务器规划 服务器规划 服务名称 职能 zhe ...

  3. 第三代大数据处理方案Flink

    Apache Flink Flink作为第三代流计算引擎,同采取了DAG Stage拆分的思想构建了存粹的流计算框架.被人们称为第三代大数据处理方案.该计算框架和Spark设计理念出发点恰好相反. S ...

  4. 下一代大数据处理引擎,阿里云实时计算独享模式重磅发布

    11月14日,阿里云重磅发布了实时计算独享模式,即用户独享一部分物理资源,这部分资源在网络/磁盘/CPU/内存等资源上跟其他用户完全独立,是实时计算在原有共享模式基础上的重大升级. 独享模式优点更加突 ...

  5. 下一代大数据处理引擎,阿里云实时计算独享模式重磅发布 1

    摘要: 11月14日,阿里云重磅发布了实时计算独享模式,即用户独享一部分物理资源,这部分资源在网络/磁盘/CPU/内存等资源上跟其他用户完全独立,是实时计算在原有共享模式基础上的重大升级.(观看实时计 ...

  6. 新一代大数据处理引擎 Apache Flink

    这几年大数据的飞速发展,出现了很多热门的开源社区,其中著名的有 Hadoop.Storm,以及后来的 Spark,他们都有着各自专注的应用场景.Spark 掀开了内存计算的先河,也以内存为赌注,赢得了 ...

  7. 深度解读!新一代大数据引擎Flink厉害在哪?(附实现原理细节)

    导语 | 大数据计算分为离线计算和实时计算,其中离线计算就是我们通常说的批计算,代表技术是Hadoop MapReduce.Hive等:实时计算也被称作流计算,代表技术是Storm.Spark Str ...

  8. 一文读懂JVM虚拟机:JVM虚拟机的内存管理(万字详解)

    JVM虚拟机的内存管理 文章目录 JVM虚拟机的内存管理 JVM与操作系统 Java虚拟机规范和 Java 语言规范的关系 java虚拟机的内存管理 JVM整体架构 一.PC 程序计数器 二.虚拟机栈 ...

  9. 医疗保健、零售、金融、制造业……一文带你看懂大数据对工业领域的影响!...

    作者 | Zubair Hassan 译者 | 风车云马 责编 | 徐威龙 封图| CSDN 下载于视觉中国 随着大数据技术的兴起,工业领域在很大程度上发生了变化.智能手机和其他通讯方式的使用迅速增加 ...

最新文章

  1. matlab模拟三体运动_从灯泡到超级计算机,如何模拟浩瀚星空?| 赛先生
  2. mysql匹配数据结构_MySQL索引背后的数据结构及原理
  3. Enterprise search debugging via test report in AG3
  4. api报错 javaee maven_JavaEE关于Maven的配置与学习
  5. 数学界的花木兰——苏菲﹒热尔曼
  6. 关于使用 Python 析构函数的正确姿势
  7. 仅用38天 2021年快递业务量已超100亿件
  8. 解决Win10中WerFault错误报告问题
  9. java线程--object.waitobject.notify
  10. java敏感词汇过滤工具类
  11. 企业数据防泄漏解决方案的介绍!
  12. 葫芦兄弟java7723_颠峰对决之喜羊羊大战葫芦娃
  13. 戴尔灵越系列服务器是什么,戴尔灵越系列哪个好-2021戴尔灵越系列型号选购推荐...
  14. C语言经典问题——兑换硬币
  15. 投递问题--图论--ACM算法
  16. WPF之布局属性HorizontalAlignment、HorizontalContentAlignment、VertialAlignment、VerticalContentAlignment
  17. NNI 2 用于实例
  18. Android 注入 看雪
  19. php 正则车架号,iOS 车架号、船舶号等正则【原创】
  20. NFC的读写卡模式——前台调度系统

热门文章

  1. ES6规格之数组的空位
  2. oracle数据库中的系统自带表情_Oracle数据库自带表空间的详细说明
  3. matlab圆形器件,计算围绕点+ Matlab的圆形箱
  4. dbcc dbreindex server sql_SQL Server性能的提高,可通过DBCC DBREINDEX重建索引
  5. trunk口_什么是Trunk?Trunk详解
  6. 33、JSONP跨域
  7. 开发MIS系统需要的技术及其含义、作用
  8. Python中的字符串与字符编码
  9. JFreeChart的简单应用及乱码解决
  10. Uni2D Unity4.3 2D Skeletal Animation