1. 状态函数的实现

  • 状态函数通过运行上下文存储和访问状态
  • 键状态类似于分布式Map 每个状态函数实例维护一段范围的键状态
  • 使用键状态的状态函数必须应用于KeyedStream(已按键分区后的流)
  • 键状态类型 包括单值 列表 Map和聚合类型
1.1 在RuntimeContext中定义键状态(keyed State)
static class StateMachineMapper extends RichFlatMapFunction<Event, Alert> {/** The state for the current key. */private ValueState<Integer> currentState;@Overridepublic void open(Configuration conf) {// get access to the state objectcurrentState = getRuntimeContext().getState(new ValueStateDescriptor<>("state", Integer.class));}@Overridepublic void flatMap(Event evt, Collector<Alert> out) throws Exception {// get the current state for the key (source address)// if no state exists, yet, the state must be the state machine's initial stateInteger state = currentState.value();if(state==null){currentState.update(1);}else {System.out.println("key: "+evt.sourceAddress()+" state:"+state);currentState.update(state + 1);}}
}
1.2 在用户函数中实现算子状态
  • 算子状态(operator state)维护在每个单独的算子实例中
  • 算子状态包括List State,List Union State和BroadCast State
  • 用户函数通过实现ListCheckpointed接口来操作List State算子状态
static class StateMachineMapper extends RichFlatMapFunction<Event, Alert> implements ListCheckpointed<Integer> {/** The state for the current key. */private Integer currentState=0;@Overridepublic void flatMap(Event evt, Collector<Alert> out) throws Exception {// get the current state for the key (source address)// if no state exists, yet, the state must be the state machine's initial stateSystem.out.println(currentState);currentState=currentState+1;}
//Flink运行检查点时会执行该方法 对状态进行存储@Overridepublic List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {return Lists.newArrayList(currentState);}
//当作业启动或失败时会执行该方法用于状态的初始化@Overridepublic void restoreState(List<Integer> state) throws Exception {currentState=state.get(0);}
}
  • 算子状态类型为List结构是用于应对状态算子并行度的改变 当增加或减少状态算子并行度时 那算子状态就需要在并行实例中进行重分配 这需要要求能够合并或分割算子状态
  • Broadcast State算子状态是能够在所有状态算子间共享的状态
  • 用户函数通过继承CheckpointedFunction接口可同时操作键状态和算子状态
  • 用户函数通过继承CheckpointListener接口获取所有状态算子完成将其状态回写远程存储的通知

2.状态应用的鲁棒和性能

  • 状态后端和检查点算法的选择影响状态应用的鲁棒和性能
2.1 状态后端(state backend)
  • 状态后端负责维护每个算子实例的状态 且当检查点运行时负责将状态发送给远程持久化存储设备
  • 状态后端是插件化实现的 Flink提供三种状态后端实现 包括基于内存 基于磁盘和基于RocksDB
  • StateBackend是用于实现用户自定义状态后端的接口
//配置RocksDBStateBackend为Flink应用的状态后端
final String checkpointDir = params.get("checkpoint-dir");
boolean incrementalCheckpoints = params.getBoolean("incremental-checkpoints", false);
env.setStateBackend(new RocksDBStateBackend(checkpointDir, incrementalCheckpoints));
2.2 检查点(Checkpointing)开启
  • 流应用失败不应该影响计算正确性
  • 流应用失败不应该丢失状态 因为其可能是不可恢复的
  • 检查点机制指的是在流应用运行的某个时间点 对应用中所有内置状态和状态函数进行快照
  • 检查点机制和状态恢复机制保证对流应用的状态的有且仅有一次的一致性保证
  • 检查点开启需要设置一个运行周期 决定正常流处理中检查点运行的开销和失败后恢复的时间
val env = StreamExecutionEnvironment.getExecutionEnvironment
// set checkpointing interval to 10 seconds (10000 milliseconds)
env.enableCheckpointing(10000L)
2.3 状态算子的更新
  • 保存点(savepoint)机制保证不会因更新状态算子而停止的应用在重启时丢失状态
2.4 调节状态应用性能
2.5 避免状态泄漏

3. 可查询状态(Queryable State)

  • 键状态可以以只读的键值形式暴露给外部系统
3.1 可查询状态服务构成
  • QueryableStateClient 供外部系统使用的访问键状态的客户端
  • QueryableStateClientProxy 接受和响应客户端请求 每个TM运行一个该实例 因为键状态分布于所有算子实例 代理需要实现键对应的状态状态维护于哪个TM中 该信息维护于JM中
  • QueryableStateServer 对ClientProxy请求发起响应 每个TM运行一个该实例用于访问本地状态后端的键状态

3.2 可查询状态的暴露
  • 在open方法中为键状态设置可查询状态
override def open(parameters: Configuration): Unit = {// create state descriptorval lastTempDescriptor = new ValueStateDescriptor[Double]("lastTemp", classOf[Double])// enable queryable state and set its external identifierlastTempDescriptor.setQueryable("lastTemperature")// obtain the state handlelastTempState = getRuntimeContext.getState[Double](lastTempDescriptor)
}
  • 将流写入一个可查询状态的sink
tenSecsMaxTemps.keyBy(_._1).asQueryableState("maxTemperature")
3.3 从外部系统访问可查询状态
  • 通过引入依赖来获取QueryableStateClient相关代码
<dependency><groupid>org.apache.flink</groupid><artifactid>flink-queryable-state-client-java_2.11</artifactid><version>1.5.0</version>
</dependency>
  • 创建访问可查询状态的客户端
//tmHostName是任意TM的IP地址
val client: QueryableStateClient = new QueryableStateClient(tmHostname, proxyPort)

Stream Processing With Flink (7) 状态算子和用户函数相关推荐

  1. Flink 状态管理:算子状态、键值分区状态、状态后端、有状态算子的扩缩容

    文章目录 状态管理 算子状态 键值分区状态 状态后端(State Backends) 有状态算子的扩缩容 状态管理 通常意义上,函数里所有需要任务去维护并用来计算结果的数据都属于任务的状态,可以把状态 ...

  2. 一周一论文(翻译)——[SIGMOD 19] Elasticutor:Rapid Elasticity for Realtime Stateful Stream Processing

    Abstract 弹性非常适用于流系统,以保证针对工作负载动态的低延迟,例如到达率的激增和数据分布的波动.现有系统使用以resource-centric的方法实现弹性,该方法在并行实例(即执行程序)之 ...

  3. Concepts:Stateful Stream Processing

    Stateful Stream Processing 有状态流处理 What is State? 状态是什么? While many operations in a dataflow simply l ...

  4. Stream Processing:滑动窗口的聚集(aggregation)操作的优化算法讲解

    本文将要讲解流处理中滑动窗口聚集操作的相关优化算法.将分别从下面几个方面讲解: 什么是滑动窗口? 什么是滑动窗口的聚集操作? 聚集操作的优化的必要性在哪里? 有哪些优化算法,它们的原理分别是什么? 4 ...

  5. 1.15.Flink state(状态)管理与恢复、什么是state、Keyed State、Operator State、状态容错(生成快照,恢复快照),checkPoint简介,重启策略等

    1.15.Flink state(状态)管理与恢复 1.15.1.什么是state 1.15.2.状态(State) 1.15.3.Keyed State 1.15.4.Operator State ...

  6. Netflix: 从 Batch ETL 到 Stream Processing 的转型之路

    大胆预测:重量级的数据应用,包括但不仅限于数据分析,数据挖掘,计算广告等,将全部会转换成实时数据处理架构.在电子化市场营销,尤其当今信息技术快速发展的前提下,数据处理的快慢直接影响变现的质量. 爱好收 ...

  7. 【Flink】介绍Flink中状态一致性的保证

    1.概述 转载:介绍Flink中状态一致性的保证 再次温习了这篇文章有了不一样的收货.侵权可删,这里是方便自己找到. 1. 一致性 1.1 介绍状态一致性 有状态的流处理,内部每个算子任务都可以有自己 ...

  8. 14.State-理解原理即可、Flink中状态的自动管理、无状态计算和有状态计算、状态分类、Managed State Raw State\Keyed StateOperator State

    14.State-理解原理即可 14.1.Flink中状态的自动管理 14.2.无状态计算和有状态计算 14.2.2.有状态计算,需要考虑历史值,如:sum 14.2.3.状态分类 14.2.4.Ma ...

  9. Flink大数据实时计算系列-Flink Exactly Once及Flink的状态存储State Backend

    Flink大数据实时计算系列-Flink Exactly Once及Flink的状态存储State Backend 目录 Flink的状态存储State Backend Flink参考链接 Flink ...

  10. Flink DataStream iterate算子的简单使用

    Flink DataStream iterate算子的简单使用 由于DataStream程序可能永远不会完成,因此没有最大迭代次数.相反你需要指定流的哪个部分反馈到迭代,哪个部分使用split转换或转 ...

最新文章

  1. 存储器里面的一个采用直接映射方式的32KB缓存-一个四路组相连的缓存,容量为16KB
  2. 使用UDP的简单C/S程序
  3. linux 串口特别是接收
  4. 【设计模式】C++单例模式
  5. 图像锐化处理算法matlab,图像锐化matlab算法
  6. python使用xlrd读取xlsx文件_$ 用python处理Excel文档(1)——用xlrd模块读取xls/xlsx文档...
  7. “全球+”浪潮下,企业出海选择合适的“技术船舶”成关键
  8. vue.js bootstrap 下拉列表_陕西省百度下拉总代理
  9. linux重定向到某目录文件,linux shell中12 21 1file_path重定向语法详解
  10. [转]用C#编写ActiveX控件
  11. 基于java 网页的宠物店管理系统
  12. 传奇私服服务器怎么增加npc,传奇新建NPC/npc修改功能/NPC修改模版
  13. 电机驱动详解--从原理到智能车驱动(DRV8701)
  14. 循环经济升级推动产业升级发展建议
  15. 怎么注册免费苹果开发者账号?
  16. 江苏省政府参事徐惠民莅临聚合数据走访指导
  17. linux重新mac,用 Linux 让旧 MacBook 重获新生
  18. 数据湖 数据孤岛 数据沼泽
  19. jmeter--参数化--详解
  20. Centos7防火墙与IPTABLES详解

热门文章

  1. 【Sarah】第一周
  2. Elasticsearch 7.9.3 发布
  3. Github README.md中添加图片
  4. 16kb等于多少b_一篇文章讲透MySQL为什么要用B+树实现索引
  5. dx绘制2d图像_【3D建模】聊聊2D动画软件
  6. PHP 订单拆单后明细总金额与订单金额存在差异
  7. 《Yii2 By Example》第2章:创建一个简单的新闻阅读器
  8. 代码管理学:通过文档记录,实现工作传承
  9. 终于有一次,排名没有进步
  10. systemback慎用:安装后无法启动