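14.State - understanding the principles is enough
14.1.Automatic state management in Flink
14.2.Stateless vs. stateful computation
14.2.1.Stateless computation
14.2.2.Stateful computation: historical values must be taken into account, e.g. sum
14.2.3.State classification
14.2.4.Managed State & Raw State
14.2.5.Keyed State & Operator State
14.2.6.Code demo - ManagedState - KeyedState
14.2.7.Code demo - ManagedState - OperatorState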

14.State - understanding the principles is enough

14.1.Automatic state management in Flink

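The Flink code we wrote earlier already manages state automatically. For example:
send hello, get (hello,1)
send hello again, get (hello,2)

This shows that Flink automatically combined the current record with the historical state (the previous result), i.e. it managed the state for us.
In real-world development, automatic state management is all you need in the vast majority of cases.
Manual state management is only needed in some special cases (we will use it later in the project!).
So first we need to learn how to manage state manually.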

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author tuzuoquan
 * @date 2022/5/9 9:37
 */
public class SourceDemo03_Socket {
    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //TODO 1.source
        DataStream<String> lines = env.socketTextStream("localhost", 9999);

        //TODO 2.transformation
        // Alternative in two steps (flatMap, then map), kept commented out:
        /*
        SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] arr = value.split(" ");
                for (String word : arr) {
                    out.collect(word);
                }
            }
        });
        words.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });
        */
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne =
                lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] arr = value.split(" ");
                        for (String word : arr) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                });
        SingleOutputStreamOperator<Tuple2<String, Integer>> result =
                wordAndOne.keyBy(t -> t.f0).sum(1);

        //TODO 3.sink
        result.print();

        //TODO 4.execute
        env.execute();
    }
}

Output:

5> (world,1)
3> (hello,1)
5> (aaa,1)
4> (bbb,1)
7> (ccc,1)
5> (aaa,2)
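In the output above, (aaa,1) and (aaa,2) carry the same prefix 5> (the parallel subtask that printed them): keyBy sends every record of a given word to the same subtask, and that subtask holds the word's running sum. The plain-Java sketch below (no Flink; the class and method names are hypothetical) mimics flatMap, keyBy and sum(1) across several subtasks. The routing by `hashCode()` is an assumption for illustration only; Flink actually assigns keys through key groups, but the property that matters, same key to same subtask, is the same.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Flink) of flatMap -> keyBy(word) -> sum(1) with several parallel subtasks.
class KeyedWordCountSketch {

    // One map per parallel subtask: the running sums of the keys routed to that subtask.
    private final List<Map<String, Integer>> stateOfSubtask = new ArrayList<>();

    KeyedWordCountSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            stateOfSubtask.add(new HashMap<>());
        }
    }

    // Hypothetical routing: Flink really hashes keys into key groups; what matters here
    // is that the same key always lands on the same subtask.
    int subtaskFor(String word) {
        return Math.floorMod(word.hashCode(), stateOfSubtask.size());
    }

    // Processes one input line; returns records prefixed like print(): "<subtask index + 1>> (word,count)".
    List<String> processLine(String line) {
        List<String> emitted = new ArrayList<>();
        for (String word : line.split(" ")) {           // flatMap: line -> words
            int subtask = subtaskFor(word);              // keyBy: word -> owning subtask
            int count = stateOfSubtask.get(subtask).merge(word, 1, Integer::sum); // sum(1)
            emitted.add((subtask + 1) + "> (" + word + "," + count + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        KeyedWordCountSketch job = new KeyedWordCountSketch(8);
        for (String line : new String[] {"hello", "hello", "aaa bbb", "aaa"}) {
            job.processLine(line).forEach(System.out::println);
        }
    }
}
```

Whatever prefix "aaa" gets, both of its records share it, because the count for "aaa" only exists in one subtask's map.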

14.2.Stateless vs. stateful computation

14.2.1.Stateless computation

Stateless computation does not need to take historical values into account, e.g. map:

hello --> (hello,1)
hello --> (hello,1)

14.2.2.Stateful computation: historical values must be taken into account, e.g. sum

hello --> (hello,1)
hello --> (hello,2)
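To make the difference concrete, here is a plain-Java sketch (no Flink; the class and method names are made up for illustration). The stateless function's result depends only on its input, while the stateful one keeps a per-word history between calls, which is exactly the data Flink has to store as state.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch (no Flink) contrasting a stateless map with a stateful sum.
class StatelessVsStateful {

    // Stateless: the result depends only on the current input, like map(word -> (word,1)).
    static String statelessMap(String word) {
        return "(" + word + ",1)";
    }

    // Stateful: the result also depends on history kept between calls, like keyBy(...).sum(1).
    static class StatefulSum {
        private final Map<String, Integer> history = new HashMap<>();

        String apply(String word) {
            int total = history.merge(word, 1, Integer::sum); // read old value, add 1, store it
            return "(" + word + "," + total + ")";
        }
    }

    public static void main(String[] args) {
        StatefulSum sum = new StatefulSum();
        for (String word : Arrays.asList("hello", "hello")) {
            System.out.println(statelessMap(word) + "   vs   " + sum.apply(word));
        }
    }
}
```

For two "hello" inputs the stateless column stays (hello,1), while the stateful column goes from (hello,1) to (hello,2).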


14.2.3.State classification

[Figure: detailed diagram of the state classification]

14.2.4.Managed State & Raw State
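Managed State lives in data structures that Flink controls (ValueState, ListState and so on), so Flink serializes and checkpoints it itself. Raw State is kept by the operator in its own data structures, and on a checkpoint Flink only receives a sequence of bytes it knows nothing about. The plain-Java sketch below (not Flink's API; `RawStateSketch`, `State`, `snapshot` and `restore` are hypothetical) shows the encode/decode work an operator takes on when it uses Raw State.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Plain-Java sketch (not Flink's API): with Raw State the operator itself must turn its data
// into bytes and back; with Managed State Flink's serializers do this work.
class RawStateSketch {

    // Hypothetical operator state: an offset plus the last key seen.
    static final class State {
        final long offset;
        final String lastKey;

        State(long offset, String lastKey) {
            this.offset = offset;
            this.lastKey = lastKey;
        }
    }

    // On snapshot: the operator encodes its own data; Flink would only see these bytes.
    static byte[] snapshot(State state) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(state.offset);
            out.writeUTF(state.lastKey);
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for in-memory streams
        }
        return bytes.toByteArray();
    }

    // On restore: decode the bytes in exactly the order they were written.
    static State restore(byte[] raw) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(raw))) {
            return new State(in.readLong(), in.readUTF());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] raw = snapshot(new State(42L, "hello"));
        State back = restore(raw);
        System.out.println(raw.length + " bytes -> offset=" + back.offset + ", lastKey=" + back.lastKey);
    }
}
```

Running it prints `15 bytes -> offset=42, lastKey=hello` (8 bytes for the long, 2 for the string length, 5 for "hello"). With Managed State none of this code is needed.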

14.2.5.Keyed State & Operator State

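Managed State comes in two kinds: Keyed State and Operator State. Both can be used in any DataStream function, whereas Raw State can only be used when implementing custom operators.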
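The practical difference is scope. Keyed State (available after keyBy) holds one value per key, and a record only sees the value of its own key. Operator State belongs to a parallel operator instance (subtask) and is shared by all records that instance processes, as with the offset list in the source in 14.2.7. A minimal plain-Java sketch of that scoping inside one subtask follows; it is not Flink's API, and `StateScopeSketch` and `Subtask` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (not Flink's API) of how the two kinds of managed state are scoped
// inside ONE parallel subtask. All names here are hypothetical.
class StateScopeSketch {

    static class Subtask {
        // Keyed State: one value per key; processing a record only touches its own key's value.
        final Map<String, Long> keyedState = new HashMap<>();
        // Operator State: one list for the whole subtask, shared by every record it handles.
        final List<Long> operatorState = new ArrayList<>();

        void process(String key, long value) {
            keyedState.merge(key, value, Long::max); // e.g. per-key maximum (like StateDemo01)
            operatorState.add(value);                // e.g. everything this subtask has seen
        }
    }

    public static void main(String[] args) {
        Subtask subtask = new Subtask();
        subtask.process("a", 1);
        subtask.process("b", 2);
        subtask.process("a", 6);
        System.out.println("keyed state:    " + subtask.keyedState);
        System.out.println("operator state: " + subtask.operatorState);
    }
}
```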


14.2.6.Code demo - ManagedState - KeyedState

Official Flink 1.12 documentation on state: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Desc: use ValueState (a kind of Keyed State) to find the maximum value in a stream.
 * In practice, maxBy is enough.
 *
 * @author tuzuoquan
 * @date 2022/5/10 0:37
 */
public class StateDemo01_KeyState {
    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //TODO 1.source
        DataStream<Tuple2<String, Long>> tupleDS = env.fromElements(
                Tuple2.of("北京", 1L),
                Tuple2.of("上海", 2L),
                Tuple2.of("北京", 6L),
                Tuple2.of("上海", 8L),
                Tuple2.of("北京", 3L),
                Tuple2.of("上海", 4L)
        );

        //TODO 2.transformation
        // Requirement: find the maximum value for each city
        // In practice, just use maxBy
        DataStream<Tuple2<String, Long>> result1 = tupleDS.keyBy(t -> t.f0).maxBy(1);

        // For learning purposes, use ValueState (Keyed State) to implement what maxBy does underneath
        DataStream<Tuple3<String, Long, Long>> result2 = tupleDS.keyBy(t -> t.f0)
                .map(new RichMapFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>>() {
                    // -1. Declare a state that holds the maximum (Flink keeps one value per key)
                    private ValueState<Long> maxValueState;

                    // -2. Initialize the state
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // Create the state descriptor (state name + value type)
                        ValueStateDescriptor<Long> stateDescriptor =
                                new ValueStateDescriptor<>("maxValueState", Long.class);
                        // Get/initialize the state from the descriptor
                        maxValueState = getRuntimeContext().getState(stateDescriptor);
                    }

                    // -3. Use the state
                    @Override
                    public Tuple3<String, Long, Long> map(Tuple2<String, Long> value) throws Exception {
                        Long currentValue = value.f1;
                        // Read the state (null before the first record of this key)
                        Long historyValue = maxValueState.value();
                        // Compare the current value with the state
                        if (historyValue == null || currentValue > historyValue) {
                            historyValue = currentValue;
                            // Update the state
                            maxValueState.update(historyValue);
                        }
                        // Emit (city, current value, maximum so far)
                        return Tuple3.of(value.f0, currentValue, historyValue);
                    }
                });

        //TODO 3.sink
        //result1.print();
        //4> (北京,6)
        //1> (上海,8)
        result2.print();
        //1> (上海,xxx,8)
        //4> (北京,xxx,6)

        //TODO 4.execute
        env.execute();
    }
}
Output:
4> (北京,1,1)
4> (北京,6,6)
4> (北京,3,6)
1> (上海,2,2)
1> (上海,8,8)
1> (上海,4,8)
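The output can be checked by hand with the plain-Java replay below, which applies the same comparison as `map()` to the six input records in order. A HashMap stands in for ValueState (Flink keeps one value per key for you); the class name is hypothetical, and the cities are written as Beijing/Shanghai for 北京/上海 to keep the source ASCII.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java replay (no Flink) of the RichMapFunction in StateDemo01_KeyState.
class MaxValueStateReplay {

    // Stands in for ValueState<Long>: Flink keeps one such value per key; here a map does it.
    private final Map<String, Long> maxValueState = new HashMap<>();

    // Same logic as map(): compare with the stored max, update the state, emit (key, current, max).
    String map(String city, long currentValue) {
        Long historyValue = maxValueState.get(city);    // null before the first record of this key
        if (historyValue == null || currentValue > historyValue) {
            historyValue = currentValue;
            maxValueState.put(city, historyValue);       // like maxValueState.update(...)
        }
        return "(" + city + "," + currentValue + "," + historyValue + ")";
    }

    public static void main(String[] args) {
        MaxValueStateReplay fn = new MaxValueStateReplay();
        String[] cities = {"Beijing", "Shanghai", "Beijing", "Shanghai", "Beijing", "Shanghai"};
        long[] values = {1L, 2L, 6L, 8L, 3L, 4L};
        for (int i = 0; i < cities.length; i++) {
            System.out.println(fn.map(cities[i], values[i]));
        }
    }
}
```

Filtering its output by city gives (Beijing,1,1), (Beijing,6,6), (Beijing,3,6) and (Shanghai,2,2), (Shanghai,8,8), (Shanghai,4,8), matching the per-key sequences printed by Flink.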

14.2.7.Code demo - ManagedState - OperatorState

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Iterator;

/**
 * Desc: use ListState (a kind of Operator State) to simulate how a KafkaSource maintains its offset
 *
 * @author tuzuoquan
 * @date 2022/5/16 12:12
 */
public class StateDemo02_OperatorState {
    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // Parallelism 1 makes the output easier to follow
        env.setParallelism(1);
        // Run a checkpoint every 1 s
        env.enableCheckpointing(1000);
        env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Fixed-delay restart strategy: on failure, restart up to 2 times with a 3 s delay each;
        // after more than 2 failures the job exits
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 3000));

        //TODO 1.source
        DataStreamSource<String> ds = env.addSource(new MyKafkaSource()).setParallelism(1);

        //TODO 2.transformation

        //TODO 3.sink
        ds.print();

        //TODO 4.execute
        env.execute();
    }

    public static class MyKafkaSource extends RichParallelSourceFunction<String> implements CheckpointedFunction {
        // volatile: cancel() is called from a different thread than run()
        private volatile boolean flag = true;
        // -1. Declare a ListState that holds the offset
        private ListState<Long> offsetState = null;
        // Current offset value
        private Long offset = 0L;

        // -2. Initialize/create the ListState and restore the offset
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Long> stateDescriptor = new ListStateDescriptor<>("offsetState", Long.class);
            offsetState = context.getOperatorStateStore().getListState(stateDescriptor);
            // After a restart the list holds the offset of the last checkpoint. Restore it once here:
            // between checkpoints the list still holds the last snapshotted value, so re-reading it
            // on every loop iteration in run() could roll the offset back.
            Iterator<Long> iterator = offsetState.get().iterator();
            if (iterator.hasNext()) {
                offset = iterator.next();
            }
        }

        // -3. Use the state
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (flag) {
                // Advance the offset and emit under the checkpoint lock, so a snapshot
                // never sees an offset that does not match the emitted records
                synchronized (ctx.getCheckpointLock()) {
                    offset += 1;
                    int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
                    ctx.collect("subTaskId:" + subTaskId + ",当前的offset值为:" + offset);
                }
                Thread.sleep(1000);
                // Simulate a failure
                if (offset % 5 == 0) {
                    throw new Exception("bug出现了.....");
                }
            }
        }

        // -4. Persist the state
        // Called on every checkpoint; the state is then written from memory to the checkpoint directory
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // Replace the old content with the current offset
            offsetState.clear();
            offsetState.add(offset);
        }

        @Override
        public void cancel() {
            flag = false;
        }
    }
}

Output:

subTaskId:0,当前的offset值为:1
subTaskId:0,当前的offset值为:2
subTaskId:0,当前的offset值为:3
subTaskId:0,当前的offset值为:4
subTaskId:0,当前的offset值为:5
subTaskId:0,当前的offset值为:6
subTaskId:0,当前的offset值为:7
subTaskId:0,当前的offset值为:8
subTaskId:0,当前的offset值为:9
subTaskId:0,当前的offset值为:10
subTaskId:0,当前的offset值为:11
subTaskId:0,当前的offset值为:12
subTaskId:0,当前的offset值为:13
subTaskId:0,当前的offset值为:14
subTaskId:0,当前的offset值为:15
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
    (remaining frames omitted)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2, backoffTimeMS=3000)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    (remaining frames omitted)

The source threw its simulated exception at offsets 5, 10 and 15. After each of the first two failures the job restarted, restored the offset from the last checkpoint and continued counting from 6 and from 11. The third failure exceeded the limit of 2 restart attempts, so recovery was suppressed and the job failed.
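The recovery behaviour visible in the output (offsets 1 to 15, failures at multiples of 5, at most two restarts) can be replayed without Flink. The plain-Java sketch below is a simulation under stated assumptions: a checkpoint completes after every emitted record (roughly what the 1 s checkpoint interval plus the 1 s sleep gives), the 3 s restart delay is skipped, and `OffsetRecoverySimulation`, `Outcome` and `run` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation (no Flink) of StateDemo02: the offset lives in "operator state",
// is snapshotted after every record, restored on restart, with a fixed restart budget.
class OffsetRecoverySimulation {

    // Result of a simulated job run: what was emitted and whether the job finally failed.
    static final class Outcome {
        final List<Long> emitted = new ArrayList<>();
        boolean failed;
    }

    static Outcome run(int maxRestarts, long failEvery, long stopAfter) {
        Outcome outcome = new Outcome();
        Long checkpointedOffset = null;   // stands in for the ListState content in the checkpoint
        int restarts = 0;
        while (true) {
            // initializeState(): restore from the last checkpoint, or start from 0
            long offset = checkpointedOffset == null ? 0L : checkpointedOffset;
            try {
                while (offset < stopAfter) {
                    offset += 1;
                    outcome.emitted.add(offset);
                    // Assumption: a checkpoint completes after each record (snapshotState()).
                    checkpointedOffset = offset;
                    if (offset % failEvery == 0) {
                        throw new IllegalStateException("bug at offset " + offset);
                    }
                }
                return outcome;                // finished without exhausting the restart budget
            } catch (IllegalStateException e) {
                if (restarts == maxRestarts) { // like fixedDelayRestart(2, ...) giving up
                    outcome.failed = true;
                    return outcome;
                }
                restarts++;                    // the real strategy would also wait 3 s here
            }
        }
    }

    public static void main(String[] args) {
        Outcome o = run(2, 5, Long.MAX_VALUE);
        System.out.println("emitted=" + o.emitted + " failed=" + o.failed);
    }
}
```

With `run(2, 5, Long.MAX_VALUE)` the simulation emits 1 to 15 and then reports `failed=true`, mirroring the job above. If the checkpoint assumption is dropped (for example, snapshotting only every other record), restored offsets fall behind and some values are emitted twice, which is the at-least-once replay a real source would then show.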
