【Flink状态】FsStateBackend 下 ValueState > MapState

背景:

对程序进行状态后端替换(Rocks —> Fs)时,程序产生了背压。(状态开启了TTL)

分析办法:

利用Arthas生成CPU采样火焰图,分析是否存在性能瓶颈。

分析过程

发现问题

CPU火焰图

明显看出来,程序在处理MapState时,进行TTL处理时,花费了大量时间,成为了性能瓶颈。

程序主要处理逻辑(已模糊化):1、将用户按用户组KeyBy(通过用户ID取余1000);2、利用MapState存储状态,该状态存储了多个用户数据;(1万用户/Key)
源码分析
MapState的底层对象。


如图,无论是Fs还是RocksDB,都采用TtlMapState封装。
当对MapState进行读取, 通过getWrapped拿到封装后的TtlValue返回,里面包含userValue、lastAccessTimestamp,即用户存储的状态,以及最后一次访问时间(用于判断是否过期);

org.apache.flink.runtime.state.ttl.TtlMapState@Overridepublic UV get(UK key) throws Exception {TtlValue<UV> ttlValue = getWrapped(key);return ttlValue == null ? null : ttlValue.getUserValue();}private TtlValue<UV> getWrapped(UK key) throws Exception {accessCallback.run();return getWrappedWithTtlCheckAndUpdate(() -> original.get(key), v -> original.put(key, v), () -> original.remove(key));}

TTL处理的主要逻辑就在getWrapped。
其中getWrappedWithTtlCheckAndUpdate,逻辑:对指定Key数据进行过期删除及返回状态,同时对Key的TTL进行更新(读是否更新取决于是否配置StateTtlConfig.UpdateType.OnReadAndWrite)。

org.apache.flink.runtime.state.ttl.AbstractTtlDecorator<SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> TtlValue<V> getWrappedWithTtlCheckAndUpdate(SupplierWithException<TtlValue<V>, SE> getter,ThrowingConsumer<TtlValue<V>, CE> updater,ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {TtlValue<V> ttlValue = getter.get();if (ttlValue == null) {return null;} else if (expired(ttlValue)) {stateClear.run(); // 执行删除 () -> original.remove(key)if (!returnExpired) { // 若配置了不返回过期状态,则会直接返回nullreturn null;}} else if (updateTsOnRead) {updater.accept(rewrapWithNewTs(ttlValue));}return ttlValue;}

每次获取数据时必定会调用put方法,将状态放入(无论读写)。

org.apache.flink.runtime.state.ttl.TtlMapState@Overridepublic void put(UK key, UV value) throws Exception {accessCallback.run();original.put(key, wrapWithTs(value));}

即无论调用get还是put,都会调用accessCallback.run(),其中get会调用2次。
accessCallback是算子在状态初始化时进行赋值的。

org.apache.flink.streaming.api.operators.StreamingRuntimeContext@Overridepublic <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);stateProperties.initializeSerializerUnlessSet(getExecutionConfig());return keyedStateStore.getMapState(stateProperties);}

Fs/RocksDB都是通过AbstractKeyedStateBackend#getPartitionedState获取State, 其再调用TtlStateFactory.createStateAndWrapWithTtlIfEnabled获取

org.apache.flink.runtime.state.ttl.TtlStateFactory
// 初始化各类状态的创建工厂类
private Map<Class<? extends StateDescriptor>, SupplierWithException<IS, Exception>> createStateFactories() {return Stream.of(Tuple2.of(ValueStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createValueState),Tuple2.of(ListStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createListState),Tuple2.of(MapStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createMapState),Tuple2.of(ReducingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createReducingState),Tuple2.of(AggregatingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createAggregatingState)).collect(Collectors.toMap(t -> t.f0, t -> t.f1));}private <UK, UV> IS createMapState() throws Exception {MapStateDescriptor<UK, UV> mapStateDesc = (MapStateDescriptor<UK, UV>) stateDesc;MapStateDescriptor<UK, TtlValue<UV>> ttlDescriptor = new MapStateDescriptor<>(stateDesc.getName(),mapStateDesc.getKeySerializer(),new TtlSerializer<>(LongSerializer.INSTANCE, mapStateDesc.getValueSerializer()));return (IS) new TtlMapState<>(createTtlStateContext(ttlDescriptor));}private <OIS extends State, TTLS extends State, V, TTLV> TtlStateContext<OIS, V>createTtlStateContext(StateDescriptor<TTLS, TTLV> ttlDescriptor) throws Exception {ttlDescriptor.enableTimeToLive(stateDesc.getTtlConfig()); // also used by RocksDB backend for TTL compaction filter configOIS originalState = (OIS) stateBackend.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory());return new TtlStateContext<>(originalState, ttlConfig, timeProvider, (TypeSerializer<V>) stateDesc.getSerializer(),registerTtlIncrementalCleanupCallback((InternalKvState<?, ?, ?>) originalState));}private Runnable registerTtlIncrementalCleanupCallback(InternalKvState<?, ?, ?> originalState) {StateTtlConfig.IncrementalCleanupStrategy config =ttlConfig.getCleanupStrategies().getIncrementalCleanupStrategy();boolean cleanupConfigured = config != null && incrementalCleanup != null;boolean isCleanupActive = cleanupConfigured &&isStateIteratorSupported(originalState, incrementalCleanup.getCleanupSize());Runnable callback = isCleanupActive ? incrementalCleanup::stateAccessed : () -> { }; // 注意,就是这儿了if (isCleanupActive && config.runCleanupForEveryRecord()) {stateBackend.registerKeySelectionListener(stub -> callback.run());}return callback;}

至此,追溯到了callback的赋值流程,isCleanupActive 决定了callback的真正实现是incrementalCleanup::stateAccessed还是{}。

org.apache.flink.runtime.state.ttl.TtlStateFactoryprivate boolean isStateIteratorSupported(InternalKvState<?, ?, ?> originalState, int size) {boolean stateIteratorSupported = false;try {// 不同的状态实现有差异stateIteratorSupported = originalState.getStateIncrementalVisitor(size) != null;} catch (Throwable t) {// ignore}return stateIteratorSupported;}

FsStateBackend 的 originalState 是 HeapMapState,其具体实现返回StateEntryIterator对象,即callback为incrementalCleanup::stateAccessed

public StateIncrementalVisitor<K, N, S> getStateIncrementalVisitor(int recommendedMaxNumberOfReturnedRecords) {return new StateEntryIterator(recommendedMaxNumberOfReturnedRecords);}

stateAccessed的具体实现:

org.apache.flink.runtime.state.ttl.TtlIncrementalCleanupvoid stateAccessed() {initIteratorIfNot();try {runCleanup();} catch (Throwable t) {throw new FlinkRuntimeException("Failed to incrementally clean up state with TTL", t);}}private void runCleanup() {int entryNum = 0;Collection<StateEntry<K, N, S>> nextEntries;while (entryNum < cleanupSize &&stateIterator.hasNext() &&!(nextEntries = stateIterator.nextEntries()).isEmpty()) {for (StateEntry<K, N, S> state : nextEntries) {// 获取所有非null和未过期的状态S cleanState = ttlState.getUnexpiredOrNull(state.getState());if (cleanState == null) {stateIterator.remove(state);} else if (cleanState != state.getState()) {stateIterator.update(state, cleanState);}}entryNum += nextEntries.size();}}org.apache.flink.runtime.state.ttl.TtlMapState@Nullable@Overridepublic Map<UK, TtlValue<UV>> getUnexpiredOrNull(@Nonnull Map<UK, TtlValue<UV>> ttlValue) {Map<UK, TtlValue<UV>> unexpired = new HashMap<>();TypeSerializer<TtlValue<UV>> valueSerializer = ((MapSerializer<UK, TtlValue<UV>>) original.getValueSerializer()).getValueSerializer();for (Map.Entry<UK, TtlValue<UV>> e : ttlValue.entrySet()) {if (!expired(e.getValue())) {// we have to do the defensive copy to update the valueunexpired.put(e.getKey(), valueSerializer.copy(e.getValue()));}}return ttlValue.size() == unexpired.size() ? ttlValue : unexpired;}
问题结论

从源码能看出,每次读写都会执行一次状态清理。也就是针对FsStateBackend当前key只要进行MapState的访问, 就会对所有的value进行遍历。

举例:若Key: userGroup1下有1万个用户,MapState有1万个entries, 只要用户组来一条数据,产生一次访问,就要对整个MapState进行一次遍历清理,这个操作在状态较大的情况下是相当繁重的。

至此,结论与火焰图相吻合。

解决办法

既然是因为单个key存储的MapState元素过多,那解决办法理所应当想到让单个key存储的State变小。
两种方案:
1、(不推荐)减小用户组,让每个用户组用户数量控制较少,该方法治标不治本;
2、(推荐)改用ValueState;

ValueState 分析

ValueState和MapState的流程较为类似,其读写同样会调用accessCallback.run(), 已经进行TTL更新和状态删除。

org.apache.flink.runtime.state.ttl.TtlValueState@Overridepublic T value() throws IOException {accessCallback.run();return getWithTtlCheckAndUpdate(original::value, original::update);}@Overridepublic void update(T value) throws IOException {accessCallback.run();original.update(wrapWithTs(value));}

ValueState较为不同的地方,在于getUnexpiredOrNull直接对value做TTL处理即可。

@Nullable@Overridepublic TtlValue<T> getUnexpiredOrNull(@Nonnull TtlValue<T> ttlValue) {return expired(ttlValue) ? null : ttlValue;}

拓展

RocksDBStateBackend为何没有这个问题?

回到初始化状态,生成 callback那一天。

boolean isCleanupActive = cleanupConfigured &&isStateIteratorSupported(originalState, incrementalCleanup.getCleanupSize());
Runnable callback = isCleanupActive ? incrementalCleanup::stateAccessed : () -> { };

isStateIteratorSupported这个方法依赖于MapState的具体实现。

private boolean isStateIteratorSupported(InternalKvState<?, ?, ?> originalState, int size) {boolean stateIteratorSupported = false;try {stateIteratorSupported = originalState.getStateIncrementalVisitor(size) != null;} catch (Throwable t) {// ignore}return stateIteratorSupported;
}

前文提到,RockDB的TtlMapState封装了RocksDBMapState,其的具体实现如下:

 @Overridepublic StateIncrementalVisitor<K, N, V> getStateIncrementalVisitor(int recommendedMaxNumberOfReturnedRecords) {throw new UnsupportedOperationException("Global state entry iterator is unsupported for RocksDb backend");}

也就是isCleanupActive为false,callback为() -> { },不会在读写前对整个MapState进行遍历,故而未发生这个问题。

总结

FsStateBackend 使用状态时,若非业务需要,尽量使用ValueState。必须要使用MapState的场景,注意控制每个key的MapState的entries数目。

【Flink状态】FsStateBackend 下 ValueState > MapState相关推荐

  1. Flink教程(13) Keyed State状态管理之ValueState的使用 温差报警

    Keyed State状态管理之ValueState的使用 温差报警 系列文章 一.ValueState的方法 二.实验案例 1. 温度Bean 2. 将字符串映射成SensorRecord对象 3. ...

  2. Flink状态管理与CheckPoint、Savepoint

    转载自:https://blog.csdn.net/hxcaifly/article/details/84673292     https://blog.csdn.net/rlnLo2pNEfx9c/ ...

  3. Flink 状态管理与 Checkpoint 机制

    点击上方"zhisheng",选择"设为星标" 一.状态分类 相对于其他流计算框架,Flink 一个比较重要的特性就是其支持有状态计算.即你可以将中间的计算结果 ...

  4. 【推荐实践】Flink 状态(State)管理在推荐场景中的应用

    导语 Flink 提供了灵活丰富的状态管理,可轻松解决数据之间的关联性.本文介绍了Flink 状态(State)管理在推荐场景中的应用,大家结合自己的应用场景与业务逻辑,选择合适的状态管理. 背景 F ...

  5. Flink状态后端配置(设置State Backend)

    Flink提供不同的状态后端(state backends)来区分状态的存储方式和存储位置.flink状态可以存储在java堆内存内或者内存之外.通过状态后端的设置,flink允许应用保持大容量的状态 ...

  6. Flink状态管理和容错机制介绍

    作者: 施晓罡 本文来自2018年8月11日在北京举行的 Flink Meetup会议,分享来自于施晓罡,目前在阿里大数据团队部从事Blink方面的研发,现在主要负责Blink状态管理和容错相关技术的 ...

  7. 【Flink】Flink状态的缩放(rescale)与键组(Key Group)设计

    1.概述 转载:Flink状态的缩放(rescale)与键组(Key Group)设计 这个东东还没了解过.来看一下. 在之前那篇讲解Flink Timer的文章里,我曾经用三言两语简单解释了Key ...

  8. 聊聊flink的FsStateBackend

    序 本文主要研究一下flink的FsStateBackend StateBackend flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/r ...

  9. Flink状态一致性检查点

    Flink状态一致性检查点 一致性检查点:是指在某一个时刻所有算子将同一个任务都完成的情况下进行的一个快照(方便后续计算出错时,提供一个数据恢复的快照).Flink有状态的流处理,内部每个算子任务都可 ...

最新文章

  1. php require_once 不起作用,关于php:require_once()或die()无法正常工作
  2. ASP3.0给我们带来的新技术之一---DataShaping技术
  3. 三维空间长度温度数量_罗斯蒙特644温度变送器怎样接线?
  4. WinCE内核裁减(中文字体)及字库和内核的分离(转)
  5. 计算机联锁仿真软件设计,一种基于LabVIEW的计算机联锁仿真系统的制作方法
  6. 关于JTAG——韦东山嵌入式Linux视频学习笔记02
  7. opencv学习笔记11:图像滤波(均值,方框,高斯,中值)
  8. zookeeper windows 下安装
  9. 1)C++对象大小计算
  10. python数据挖掘学习笔记】十三.WordCloud词云配置过程及词频分析
  11. 使用实体框架返回数据表
  12. php简介的编辑器,推荐几款功能强大的PHP编辑器
  13. (博主可帮找错)Servlet.service() for servlet [dispatcherServlet] path [] threw exception feign.Feig,可截图私聊博主
  14. js 前端导出报错 格式不正确_js-xlsx 实现前端 Excel 导出(支持多 sheet)
  15. robotium3.6与4.0以后的区别
  16. 计算机休眠状态和关,win7系统关于睡眠和休眠这两种状态的区别
  17. idea 报错Process finished with exit code 1
  18. 实现树莓派控制电机的运转
  19. GAP:Learning Contextual Representations for Semantic Parsing with Generation-Augmented Pre-Training
  20. SLI、SLO和SLA

热门文章

  1. Ableton Live 11 Suite for mac,音乐制作工具
  2. acer计算机配置,acer电脑设置u盘启动方法
  3. java计算机毕业设计的问卷调查系统设计与实现源程序+mysql+系统+lw文档+远程调试
  4. 企业计算机审计实例分析,计算机审计工效挂钩企业实例.pdf
  5. 一刷77-回溯-78子集(m)(剑指 Offer II 079. 所有子集)
  6. 高速电路设计基本概念之——stitching via/aggressor via
  7. Python读取xlsx表格并转换成Python列表
  8. Python读取xlsx表格并转换成Python列表,简单可行
  9. mfc webbrowser判断网页加载完成
  10. nuc-oj-最终圣战