第五课 大数据技术之Fink1.13的实战学习-状态编程和容错机制

文章目录

  • 第五课 大数据技术之Fink1.13的实战学习-状态编程和容错机制
    • 第一节 状态介绍
      • 1.1 Flink中的状态
      • 1.2 状态的管理
      • 1.3 状态的分类
    • 第二节 按键分区状态
      • 2.1 按键分区状态概念和特点
      • 2.2 支持的结构类型
      • 2.3 代码实现
      • 2.4 状态生存时间(TTL)
    • 第三节 算子状态
      • 3.1 算子状态的概念和特点
      • 3.2 状态类型
      • 3.3 代码实现
    • 第四节 广播状态和状态持久化
      • 4.1 广播状态基本用法
      • 4.2 广播状态代码实例
      • 4.3 状态持久化和状态后端
    • 第五节 容错机制
      • 5.1 检查点介绍
      • 5.2 检查点算法
      • 5.3 检查点配置
      • 5.4 保存点(Savepoint)
    • 第六节 状态一致性
      • 6.1 一致性的概念和级别
      • 6.2 端到端的状态一致性
      • 6.3 幂等写入和事务写入
      • 6.4 Flink和Kafka连接时的精确一次保证

第一节 状态介绍

1.1 Flink中的状态

  1. Flink 处理机制的核心,就是“有状态的流式计算”。我们已经多次提到了“状态”(state),不论是简单聚合、窗口聚合,还是处理函数的应用,都会有状态的身影出现。我们已经简单介绍过有状态流处理,状态就如同事务处理时数据库中保存的信息一样,是用来辅助进行任务计算的数据。而在 Flink 这样的分布式系统中,我们不仅需要定义出状态在任务并行时的处理方式,还需要考虑如何持久化保存、以便发生故障时正确地恢复。这就需要一套完整的管理机制来处理所有的状态。
  2. 在流处理中,数据是连续不断到来和处理的。每个任务进行计算处理时,可以基于当前数据直接转换得到输出结果;也可以依赖一些其他数据。这些由一个任务维护,并且用来计算输出结果的所有数据,就叫作这个任务的状态
  3. 在 Flink 中,算子任务可以分为无状态和有状态两种情况。
  4. 无状态的算子任务只需要观察每个独立事件,根据当前输入的数据直接转换输出结果。例如,可以将一个字符串类型的数据拆分开作为元组输出;也可以对数据做一些计算,比如每个代表数量的字段加 1。我们之前讲到的基本转换算子,如 map、filter、flatMap,计算时不依赖其他数据,就都属于无状态的算子。
  5. 有状态的算子任务,则除当前数据之外,还需要一些其他数据来得到计算结果。这里的“其他数据”,就是所谓的状态(state),最常见的就是之前到达的数据,或者由之前数据计算出的某个结果。比如,做求和(sum)计算时,需要保存之前所有数据的和,这就是状态;窗口算子中会保存已经到达的所有数据,这些也都是它的状态。另外,如果我们希望检索到某种事件模式,比如“先有下单行为,后有支付行为”,那么也应该把之前的行为保存下来,这同样属于状态。容易发现,之前讲过的聚合算子、窗口算子都属于有状态的算子。
  6. 为有状态算子的一般处理流程,具体步骤如下。
    • 算子任务接收到上游发来的数据;
    • 获取当前状态;
    • 根据业务逻辑进行计算,更新状态;
    • 得到计算结果,输出发送到下游任务。

1.2 状态的管理

  1. 在传统的事务型处理架构中,这种额外的状态数据是保存在数据库中的。而对于实时流处理来说,这样做需要频繁读写外部数据库,如果数据规模非常大肯定就达不到性能要求了。所以 Flink 的解决方案是,将状态直接保存在内存中来保证性能,并通过分布式扩展来提高吞吐量。
  2. 在 Flink 中,每一个算子任务都可以设置并行度,从而可以在不同的 slot 上并行运行多个实例,我们把它们叫作“并行子任务”。而状态既然在内存中,那么就可以认为是子任务实例上的一个本地变量,能够被任务的业务逻辑访问和修改。
  3. 这样看来状态的管理似乎非常简单,我们直接把它作为一个对象交给 JVM 就可以了。然而大数据的场景下,我们必须使用分布式架构来做扩展,在低延迟、高吞吐的基础上还要保证容错性,一系列复杂的问题就会随之而来了。
    • 状态的访问权限。我们知道 Flink 上的聚合和窗口操作,一般都是基于 KeyedStream的,数据会按照 key 的哈希值进行分区,聚合处理的结果也应该是只对当前 key 有效。然而同一个分区(也就是 slot)上执行的任务实例,可能会包含多个 key 的数据,它们同时访问和更改本地变量,就会导致计算结果错误。所以这时状态并不是单纯的本地变量。
    • 容错性,也就是故障后的恢复。状态只保存在内存中显然是不够稳定的,我们需要将它持久化保存,做一个备份;在发生故障后可以从这个备份中恢复状态。
    • 我们还应该考虑到分布式应用的横向扩展性。比如处理的数据量增大时,我们应该相应地对计算资源扩容,调大并行度。这时就涉及到了状态的重组调整。
  4. 可见状态的管理并不是一件轻松的事。好在 Flink 作为有状态的大数据流式处理框架,已经帮我们搞定了这一切。Flink 有一套完整的状态管理机制,将底层一些核心功能全部封装起来,包括状态的高效存储和访问、持久化保存和故障恢复,以及资源扩展时的调整。这样,我们只需要调用相应的 API 就可以很方便地使用状态,或对应用的容错机制进行配置,从而将更多的精力放在业务逻辑的开发上。

1.3 状态的分类

  1. Flink 的状态有两种:托管状态(Managed State)和原始状态(Raw State)。

  2. 托管状态就是由 Flink 统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由 Flink 实现,我们只要调接口就可以;而原始状态则是自定义的,相当于就是开辟了一块内存,需要我们自己管理,实现状态的序列化和故障恢复。

  3. 托管状态是由 Flink 的运行时(Runtime)来托管的;在配置容错机制后,状态会自动持久化保存,并在发生故障时自动恢复。当应用发生横向扩展时,状态也会自动地重组分配到所有的子任务实例上。对于具体的状态内容,Flink 也提供了值状态(ValueState)、列表状态(ListState)、映射状态(MapState)、聚合状态(AggregateState)等多种结构,内部支持各种数据类型。聚合、窗口等算子中内置的状态,就都是托管状态;我们也可以在富函数类(RichFunction)中通过上下文来自定义状态,这些也都是托管状态。

  4. 对比之下,原始状态就全部需要自定义了。Flink 不会对状态进行任何自动操作,也不知道状态的具体数据类型,只会把它当作最原始的字节(Byte)数组来存储。我们需要花费大量的精力来处理状态的管理和维护。

  5. 所以只有在遇到托管状态无法实现的特殊需求时,我们才会考虑使用原始状态;一般情况下不推荐使用。绝大多数应用场景,我们都可以用 Flink 提供的算子或者自定义托管状态来实现需求。

  6. 接下来我们的重点就是托管状态(Managed State)我们知道在 Flink 中,一个算子任务会按照并行度分为多个并行子任务执行,而不同的子任务会占据不同的任务槽(task slot)。由于不同的 slot 在计算资源上是物理隔离的,所以 Flink能管理的状态在并行任务间是无法共享的,每个状态只能针对当前子任务的实例有效。

  7. 很多有状态的操作(比如聚合、窗口)都是要先做 keyBy 进行按键分区的。按键分区之后,任务所进行的所有计算都应该只针对当前 key 有效,所以状态也应该按照 key 彼此隔离。在这种情况下,状态的访问方式又会有所不同。基于这样的想法,我们又可以将托管状态分为两类:算子状态和按键分区状态

  8. 算子状态。状态作用范围限定为当前的算子任务实例,也就是只对当前并行子任务实例有效。这就意味着对于一个并行子任务,占据了一个“分区”,它所处理的所有数据都会访问到相同的状态,状态对于同一任务而言是共享的。

  9. 算子状态可以用在所有算子上,使用的时候其实就跟一个本地变量没什么区别。因为本地变量的作用域也是当前任务实例。在使用时,我们还需进一步实现 CheckpointedFunction 接口。

  10. 按键分区状态(Keyed State)状态是根据输入流中定义的键(key)来维护和访问的,所以只能定义在按键分区流(KeyedStream)中,也就 keyBy 之后才可以使用。

  11. 按键分区状态应用非常广泛。之前讲到的聚合算子必须在 keyBy 之后才能使用,就是因为聚合的结果是以 Keyed State 的形式保存的。另外,也可以通过富函数类(Rich Function)来自定义 Keyed State,所以只要提供了富函数类接口的算子,也都可以使用 Keyed State。

  12. 所以即使是 map、filter 这样无状态的基本转换算子,我们也可以通过富函数类给它们“追加”Keyed State,或者实现 CheckpointedFunction 接口来定义 Operator State;从这个角度讲,Flink 中所有的算子都可以是有状态的,不愧是“有状态的流处理”。

  13. 无论是 Keyed State 还是 Operator State,它们都是在本地实例上维护的,也就是说每个并行子任务维护着对应的状态,算子的子任务之间状态不共享。

第二节 按键分区状态

2.1 按键分区状态概念和特点

  1. 在实际应用中,我们一般都需要将数据按照某个 key 进行分区,然后再进行计算处理;所以最为常见的状态类型就是 Keyed State。之前介绍到 keyBy 之后的聚合、窗口计算,算子所持有的状态,都是 Keyed State。
  2. 另外,我们还可以通过富函数类(Rich Function)对转换算子进行扩展、实现自定义功能,比如 RichMapFunction、RichFilterFunction。在富函数中,我们可以调用.getRuntimeContext()获取当前的运行时上下文(RuntimeContext),进而获取到访问状态的句柄;这种富函数中自定义的状态也是 Keyed State。
  3. 按键分区状态(Keyed State)顾名思义,是任务按照键(key)来访问和维护的状态。它的特点非常鲜明,就是以 key 为作用范围进行隔离。
  4. 我们知道,在进行按键分区(keyBy)之后,具有相同键的所有数据,都会分配到同一个并行子任务中;所以如果当前任务定义了状态,Flink 就会在当前并行子任务实例中,为每个键值维护一个状态的实例。于是当前任务就会为分配来的所有数据,按照 key 维护和处理对应的状态。
  5. 因为一个并行子任务可能会处理多个 key 的数据,所以 Flink 需要对 Keyed State 进行一些特殊优化。在底层,Keyed State 类似于一个分布式的映射(map)数据结构,所有的状态会根据 key 保存成键值对(key-value)的形式。这样当一条数据到来时,任务就会自动将状态的访问范围限定为当前数据的 key,从 map 存储中读取出对应的状态值。所以具有相同 key 的所有数据都会到访问相同的状态,而不同 key 的状态之间是彼此隔离的。
  6. 这种将状态绑定到 key 上的方式,相当于使得状态和流的逻辑分区一一对应了:不会有别的 key 的数据来访问当前状态;而当前状态对应 key 的数据也只会访问这一个状态,不会分发到其他分区去。这就保证了对状态的操作都是本地进行的,对数据流和状态的处理做到了分区一致性。
  7. 另外,在应用的并行度改变时,状态也需要随之进行重组。不同 key 对应的 Keyed State可以进一步组成所谓的键组(key groups),每一组都对应着一个并行子任务。键组是 Flink 重新分配 Keyed State 的单元,键组的数量就等于定义的最大并行度。当算子并行度发生改变时,Keyed State 就会按照当前的并行度重新平均分配,保证运行时各个子任务的负载相同。
  8. 需要注意,使用 Keyed State 必须基于 KeyedStream。没有进行 keyBy 分区的 DataStream,即使转换算子实现了对应的富函数类,也不能通过运行时上下文访问 Keyed State。

2.2 支持的结构类型

  1. 实际应用中,需要保存为状态的数据会有各种各样的类型,有时还需要复杂的集合类型,比如列表(List)和映射(Map)。对于这些常见的用法,Flink 的按键分区状态(Keyed State)提供了足够的支持。接下来我们就来了解一下 Keyed State 所支持的结构类型.
  2. 值状态(ValueState)顾名思义,状态中只保存一个“值”(value)。ValueState<T>本身是一个接口,源码中定义如下:
public interface ValueState<T> extends State { T value() throws IOException;void update(T value) throws IOException;
}
  1. 这里的 T 是泛型,表示状态的数据内容可以是任何具体的数据类型。如果想要保存一个长整型值作为状态,那么类型就是ValueState<Long>。我们可以在代码中读写值状态,实现对于状态的访问和更新。

    • T value():获取当前状态的值;
    • update(T value):对状态进行更新,传入的参数 value 就是要覆写的状态值。
  2. 在具体使用时,为了让运行时上下文清楚到底是哪个状态,我们还需要创建一个“状态描述器”(StateDescriptor)来提供状态的基本信息。例如源码中,ValueState 的状态描述器构造方法如下:
public ValueStateDescriptor(String name, Class<T> typeClass) {super(name, typeClass, null);
}
  1. 这里需要传入状态的名称和类型——这跟我们声明一个变量时做的事情完全一样。有了这个描述器,运行时环境就可以获取到状态的控制句柄(handler)了。
  2. **列表状态(ListState)**将需要保存的数据,以列表(List)的形式组织起来。在 ListState<T>接口中同样有一个类型参数 T,表示列表中数据的类型。ListState 也提供了一系列的方法来操作状态,使用方式与一般的 List 非常相似。
    • Iterable<T> get():获取当前的列表状态,返回的是一个可迭代类型 Iterable<T>
    • update(List<T> values):传入一个列表 values,直接对状态进行覆盖;
    • add(T value):在状态列表中添加一个元素 value;
    • addAll(List<T> values):向列表中添加多个元素,以列表 values 形式传入。
      类似地,ListState 的状态描述器就叫作 ListStateDescriptor,用法跟 ValueStateDescriptor完全一致。
  3. **映射状态(MapState)**把一些键值对(key-value)作为状态整体保存起来,可以认为就是一组 key-value 映射的列表。对应的 MapState<UK, UV>接口中,就会有 UK、UV 两个泛型,分别表示保存的 key和 value 的类型。同样,MapState 提供了操作映射状态的方法,与 Map 的使用非常类似。
    • UV get(UK key):传入一个 key 作为参数,查询对应的 value 值;
    • put(UK key, UV value):传入一个键值对,更新 key 对应的 value 值;
    • putAll(Map<UK, UV> map):将传入的映射 map 中所有的键值对,全部添加到映射状态中;
    • remove(UK key):将指定 key 对应的键值对删除;
    • boolean contains(UK key):判断是否存在指定的 key,返回一个 boolean 值。另外,MapState 也提供了获取整个映射相关信息的方法:
    • Iterable<Map.Entry<UK, UV>> entries():获取映射状态中所有的键值对;
    • Iterable<UK> keys():获取映射状态中所有的键(key),返回一个可迭代 Iterable 类型;
    • Iterable<UV> values():获取映射状态中所有的值(value),返回一个可迭代 Iterable类型;
    • boolean isEmpty():判断映射是否为空,返回一个 boolean 值。
  4. **归约状态(ReducingState)**类似于值状态(Value),不过需要对添加进来的所有数据进行归约,将归约聚合之后的值作为状态保存下来。ReducintState<T>这个接口调用的方法类似于 ListState,只不过它保存的只是一个聚合值,所以调用.add()方法时,不是在状态列表里添加元素,而是直接把新数据和之前的状态进行归约,并用得到的结果更新状态。
  5. 归约逻辑的定义,是在归约状态描述器(ReducingStateDescriptor)中,通过传入一个归约函数(ReduceFunction)来实现的。这里的归约函数,就是我们之前介绍 reduce 聚合算子时讲到的 ReduceFunction,所以状态类型跟输入的数据类型是一样的。
public ReducingStateDescriptor(
String name, ReduceFunction<T> reduceFunction, Class<T> typeClass) {...}
  1. 这里的描述器有三个参数,其中第二个参数就是定义了归约聚合逻辑的 ReduceFunction,另外两个参数则是状态的名称和类型。
  2. 聚合状态(AggregatingState)与归约状态非常类似,聚合状态也是一个值,用来保存添加进来的所有数据的聚合结果。与 ReducingState 不同的是,它的聚合逻辑是由在描述器中传入一个更加一般化的聚合函数来定义的;这也就是之前我们讲过的 AggregateFunction,里面通过一个累加器(Accumulator)来表示状态,所以聚合的状态类型可以跟添加进来的数据类型完全不同,使用更加灵活。
  3. 同样地,AggregatingState 接口调用方法也与 ReducingState 相同,调用.add()方法添加元素时,会直接使用指定的 AggregateFunction 进行聚合并更新状态。

2.3 代码实现

  1. 了解了按键分区状态(Keyed State)的基本概念和类型,接下来我们就可以尝试在代码中使用状态了。
  2. 整体介绍, 在 Flink 中,状态始终是与特定算子相关联的;算子在使用状态前首先需要“注册”,其实就是告诉 Flink 当前上下文中定义状态的信息,这样运行时的 Flink 才能知道算子有哪些状态。
    状态的注册,主要是通过“状态描述器”(StateDescriptor)来实现的。状态描述器中最重要的内容,就是状态的名称(name)和类型(type)。我们知道 Flink 中的状态,可以认为是加了一些复杂操作的内存中的变量;而当我们在代码中声明一个局部变量时,都需要指定变量类型和名称,名称就代表了变量在内存中的地址,类型则指定了占据内存空间的大小。同样地,我们一旦指定了名称和类型,Flink 就可以在运行时准确地在内存中找到对应的状态,进而返回状态对象供我们使用了。所以在一个算子中,我们也可以定义多个状态,只要它们的名称不同就可以了。
  3. 另外,状态描述器中还可能需要传入一个用户自定义函数(user-defined-function,UDF),
    用来说明处理逻辑,比如前面提到的 ReduceFunction 和 AggregateFunction。以 ValueState 为例,我们可以定义值状态描述器如下:
ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("my state", // 状态名称Types.LONG // 状态类型
);
  1. 这里我们定义了一个叫作“my state”的长整型 ValueState 的描述器。代码中完整的操作是,首先定义出状态描述器;然后调用.getRuntimeContext()方法获取运行时上下文;继而调用 RuntimeContext 的获取状态的方法,将状态描述器传入,就可以得到对应的状态了。
  2. 因为状态的访问需要获取运行时上下文,这只能在富函数类(Rich Function)中获取到,所以自定义的 Keyed State 只能在富函数中使用。当然,底层的处理函数(Process Function)本身继承了 AbstractRichFunction 抽象类,所以也可以使用。
  3. 在富函数中,调用.getRuntimeContext()方法获取到运行时上下文之后,RuntimeContext 有以下几个获取状态的方法:
ValueState<T> getState(ValueStateDescriptor<T>) MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>) ListState<T> getListState(ListStateDescriptor<T>)
ReducingState<T> getReducingState(ReducingStateDescriptor<T>)AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
  1. 对于不同结构类型的状态,只要传入对应的描述器、调用对应的方法就可以了。获取到状态对象之后,就可以调用它们各自的方法进行读写操作了。另外,所有类型的状态都有一个方法.clear(),用于清除当前状态。代码中使用状态的整体结构如下:
public static class MyFlatMapFunction extends RichFlatMapFunction<Long, String> {// 声明状态private transient ValueState<Long> state;@Overridepublic void open(Configuration config) {// 在 open 生命周期方法中获取状态ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("my state", // 状态名称Types.LONG // 状态类型);state = getRuntimeContext().getState(descriptor);}@Overridepublic void flatMap(Long input, Collector<String> out) throws Exception {// 访问状态Long currentState = state.value();currentState += 1;   // 状态数值加 1// 更新状态state.update(currentState);if (currentState >= 100) {out.collect(“state:+currentState);state.clear();   // 清空状态}}
}
  1. 因为 RichFlatmapFunction 中的.flatmap()是每来一条数据都会调用一次的,所以我们不应该在这里调用运行时上下文的.getState()方法,而是在生命周期方法.open()中获取状态对象。另外还有一个问题,我们获取到的状态对象也需要有一个变量名称 state(注意这里跟状态的名称 my state 不同),但这个变量不应该在 open 中声明——否则在.flatmap()里就访问不到了。所以我们还需要在外面直接把它定义为类的属性,这样就可以在不同的方法中通用了。而在外部又不能直接获取状态,因为编译时是无法拿到运行时上下文的。所以最终的解决方案就变成了:在外部声明状态对象,在 open 生命周期方法中通过运行时上下文获取状态。
    9/ 这里需要注意,这种方式定义的都是 Keyed State,它对于每个 key 都会保存一份状态实例。所以对状态进行读写操作时,获取到的状态跟当前输入数据的 key 有关;只有相同 key的数据,才会操作同一个状态,不同 key 的数据访问到的状态值是不同的。而且上面提到的.clear()方法,也只会清除当前 key 对应的状态。
  2. 另外,状态不一定都存储在内存中,也可以放在磁盘或其他地方,具体的位置是由一个可
    配置的组件来管理的,这个组件叫作“状态后端”(State Backend)。下面我们给出不同类型的状态的应用实例。
  3. **值状态(ValueState)**我们这里会使用用户 id 来进行分流,然后分别统计每个用户的 pv 数据,由于我们并不想每次 pv 加一,就将统计结果发送到下游去,所以这里我们注册了一个定时器,用来隔一段时间发送 pv 的统计结果,这样对下游算子的压力不至于太大。具体实现方式是定义一个用来保存定时器时间戳的值状态变量。当定时器触发并向下游发送数据以后,便清空储存定时器时间戳的状态变量,这样当新的数据到来时,发现并没有定时器存在,就可以注册新的定时器了,注册完定时器之后将定时器的时间戳继续保存在状态变量中。
import com.atguigu.chapter05.ClickSource;
import com.atguigu.chapter05.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;import org.apache.flink.util.Collector;public class PeriodicPvExample{public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));stream.print("input");// 统计每个用户的pv,隔一段时间(10s)输出一次结果stream.keyBy(data -> data.user).process(new PeriodicPvResult()).print();env.execute();}// 注册定时器,周期性输出pvpublic static class PeriodicPvResult extends KeyedProcessFunction<String ,Event, String>{// 定义两个状态,保存当前pv值,以及定时器时间戳ValueState<Long> countState;ValueState<Long> timerTsState;@Overridepublic void open(Configuration parameters) throws Exception {countState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("count", Long.class));timerTsState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timerTs", Long.class));}@Overridepublic void processElement(Event value, Context ctx, Collector<String> out) throws Exception {// 更新count值Long count = countState.value();if (count == null){countState.update(1L);} else {countState.update(count + 1);}// 注册定时器if (timerTsState.value() == null){ctx.timerService().registerEventTimeTimer(value.timestamp + 10 * 1000L);timerTsState.update(value.timestamp + 10 * 1000L);}}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {out.collect(ctx.getCurrentKey() + " pv: " + countState.value());// 清空状态timerTsState.clear();}}}
  1. **列表状态(ListState)**在 Flink SQL 中,支持两条流的全量 Join,语法如下:
SELECT * FROM A INNER JOIN B WHERE A.id = B.id;
  1. 这样一条 SQL 语句要慎用,因为 Flink 会将 A 流和 B 流的所有数据都保存下来,然后进行 Join。不过在这里我们可以用列表状态变量来实现一下这个 SQL 语句的功能。代码如下:

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;public class TwoStreamFullJoinExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Tuple3<String, String, Long>> stream1 = env.fromElements(Tuple3.of("a", "stream-1", 1000L),Tuple3.of("b", "stream-1", 2000L)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {@Overridepublic long extractTimestamp(Tuple3<String, String, Long> t, long l) {return t.f2;}}));SingleOutputStreamOperator<Tuple3<String, String, Long>> stream2 = env.fromElements(Tuple3.of("a", "stream-2", 3000L),Tuple3.of("b", "stream-2", 4000L)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {@Overridepublic long extractTimestamp(Tuple3<String, String, Long> t, long l) {return t.f2;}}));stream1.keyBy(r -> r.f0).connect(stream2.keyBy(r -> r.f0)).process(new CoProcessFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, String>() {private ListState<Tuple3<String, String, Long>> stream1ListState;private ListState<Tuple3<String, String, Long>> stream2ListState;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);stream1ListState = getRuntimeContext().getListState(new ListStateDescriptor<Tuple3<String, String, Long>>("stream1-list", Types.TUPLE(Types.STRING, Types.STRING)));stream2ListState = getRuntimeContext().getListState(new ListStateDescriptor<Tuple3<String, String, Long>>("stream2-list", Types.TUPLE(Types.STRING, Types.STRING)));}@Overridepublic void processElement1(Tuple3<String, String, Long> left, Context context, Collector<String> collector) throws Exception {stream1ListState.add(left);for (Tuple3<String, String, Long> right : stream2ListState.get()) {collector.collect(left + " => " + right);}}@Overridepublic void processElement2(Tuple3<String, String, Long> right, Context context, Collector<String> collector) throws Exception {stream2ListState.add(right);for (Tuple3<String, String, Long> left : stream1ListState.get()) {collector.collect(left + " => " + right);}}}).print();env.execute();}
}
  1. 输出结果是:
(a,stream-1,1000) => (a,stream-2,3000)(b,stream-1,2000) => (b,stream-2,4000)
  1. **映射状态(MapState)**映射状态的用法和 Java 中的 HashMap 很相似。在这里我们可以通过 MapState 的使用来探索一下窗口的底层实现,也就是我们要用映射状态来完整模拟窗口的功能。这里我们模拟一个滚动窗口。我们要计算的是每一个 url 在每一个窗口中的 pv 数据。我们之前使用增量聚合和全窗口聚合结合的方式实现过这个需求。这里我们用 MapState 再来实现一下。

import com.atguigu.chapter05.ClickSource;
import com.atguigu.chapter05.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;import org.apache.flink.util.Collector;import java.sql.Timestamp;// 使用KeyedProcessFunction模拟滚动窗口
public class FakeWindowExample {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));// 统计每10s窗口内,每个url的pvstream.keyBy(data -> data.url).process(new FakeWindowResult(10000L)).print();env.execute();}public static class FakeWindowResult extends KeyedProcessFunction<String, Event, String>{// 定义属性,窗口长度private Long windowSize;public FakeWindowResult(Long windowSize) {this.windowSize = windowSize;}// 声明状态,用map保存pv值(窗口start,count)MapState<Long, Long> windowPvMapState;@Overridepublic void open(Configuration parameters) throws Exception {windowPvMapState = getRuntimeContext().getMapState(new MapStateDescriptor<Long, Long>("window-pv", Long.class, Long.class));}@Overridepublic void processElement(Event value, Context ctx, Collector<String> out) throws Exception {// 每来一条数据,就根据时间戳判断属于哪个窗口Long windowStart = value.timestamp / windowSize * windowSize;Long windowEnd = windowStart + windowSize;// 注册 end -1 的定时器,窗口触发计算ctx.timerService().registerEventTimeTimer(windowEnd - 1);// 更新状态中的pv值if (windowPvMapState.contains(windowStart)){Long pv = windowPvMapState.get(windowStart);windowPvMapState.put(windowStart, pv + 1);} else {windowPvMapState.put(windowStart, 1L);}}// 定时器触发,直接输出统计的pv结果@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {Long windowEnd = timestamp + 1;Long windowStart = windowEnd - windowSize;Long pv = windowPvMapState.get(windowStart);out.collect( "url: " + ctx.getCurrentKey()+ " 访问量: " + pv+ " 窗口:" + new Timestamp(windowStart) + " ~ " + new Timestamp(windowEnd));// 模拟窗口的销毁,清除map中的keywindowPvMapState.remove(windowStart);}}    }
  1. **聚合状态(AggregatingState)**我们举一个简单的例子,对用户点击事件流每 5 个数据统计一次平均时间戳。这是一个类似计数窗口(CountWindow )求平均值的计算,这里我们可以使用一个有聚合状态的RichFlatMapFunction 来实现。
import com.atguigu.chapter05.ClickSource;
import com.atguigu.chapter05.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;import java.sql.Timestamp;public class AverageTimestampExample {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));// 统计每个用户的点击频次,到达5次就输出统计结果stream.keyBy(data -> data.user).flatMap(new AvgTsResult()).print();env.execute();}public static class AvgTsResult extends RichFlatMapFunction<Event, String>{// 定义聚合状态,用来计算平均时间戳AggregatingState<Event, Long> avgTsAggState;// 定义一个值状态,用来保存当前用户访问频次ValueState<Long> countState;@Overridepublic void open(Configuration parameters) throws Exception {avgTsAggState = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<Event, Tuple2<Long, Long>, Long>("avg-ts",new AggregateFunction<Event, Tuple2<Long, Long>, Long>() {@Overridepublic Tuple2<Long, Long> createAccumulator() {return Tuple2.of(0L, 0L);}@Overridepublic Tuple2<Long, Long> add(Event value, Tuple2<Long, Long> accumulator) {return Tuple2.of(accumulator.f0 + value.timestamp, accumulator.f1 + 1);}@Overridepublic Long getResult(Tuple2<Long, Long> accumulator) {return accumulator.f0 / accumulator.f1;}@Overridepublic Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {return null;}},Types.TUPLE(Types.LONG, Types.LONG)));countState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("count", Long.class));}@Overridepublic void flatMap(Event value, Collector<String> out) throws Exception {Long count = countState.value();if (count == null){count = 1L;} else {count ++;}countState.update(count);avgTsAggState.add(value);// 达到5次就输出结果,并清空状态if (count == 5){out.collect(value.user + " 平均时间戳:" + new Timestamp(avgTsAggState.get()));countState.clear();}}}
}

2.4 状态生存时间(TTL)

  1. 在实际应用中,很多状态会随着时间的推移逐渐增长,如果不加以限制,最终就会导致存储空间的耗尽。一个优化的思路是直接在代码中调用.clear()方法去清除状态,但是有时候我们的逻辑要求不能直接清除。这时就需要配置一个状态的“生存时间”(time-to-live,TTL),当状态在内存中存在的时间超出这个值时,就将它清除。
  2. 具体实现上,如果用一个进程不停地扫描所有状态看是否过期,显然会占用大量资源做无用功。状态的失效其实不需要立即删除,所以我们可以给状态附加一个属性,也就是状态的“失效时间”。状态创建的时候,设置 失效时间 = 当前时间 + TTL;之后如果有对状态的访问和修改,我们可以再对失效时间进行更新;当设置的清除条件被触发时(比如,状态被访问的时候,或者每隔一段时间扫描一次失效状态),就可以判断状态是否失效、从而进行清除了。
  3. 配置状态的 TTL 时,需要创建一个 StateTtlConfig 配置对象,然后调用状态描述器的.enableTimeToLive()方法启动 TTL 功能。
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(10)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("my state", String.class);stateDescriptor.enableTimeToLive(ttlConfig);
  1. 这里用到了几个配置项:

    • .newBuilder()状态 TTL 配置的构造器方法,必须调用,返回一个 Builder 之后再调用.build()方法就可以得到 StateTtlConfig 了。方法需要传入一个 Time 作为参数,这就是设定的状态生存时间。
    • .setUpdateType()设置更新类型。更新类型指定了什么时候更新状态失效时间,这里的 OnCreateAndWrite表示只有创建状态和更改状态(写操作)时更新失效时间。另一种类型 OnReadAndWrite 则表示无论读写操作都会更新失效时间,也就是只要对状态进行了访问,就表明它是活跃的,从而延长生存时间。这个配置默认为 OnCreateAndWrite。
    • .setStateVisibility()设置状态的可见性。所谓的“状态可见性”,是指因为清除操作并不是实时的,所以当状态过期之后还有可能基于存在,这时如果对它进行访问,能否正常读取到就是一个问题了。这里设置的 NeverReturnExpired 是默认行为,表示从不返回过期值,也就是只要过期就认为它已经被清除了,应用不能继续读取;这在处理会话或者隐私数据时比较重要。对应的另一种配置是 ReturnExpireDefNotCleanedUp,就是如果过期状态还存在,就返回它的值。
  2. 除此之外,TTL 配置还可以设置在保存检查点(checkpoint)时触发清除操作,或者配置增量的清理(incremental cleanup),还可以针对 RocksDB 状态后端使用压缩过滤器(compaction filter)进行后台清理。
  3. 这里需要注意,目前的 TTL 设置只支持处理时间。另外,所有集合类型的状态(例如ListState、MapState)在设置 TTL 时,都是针对每一项(per-entry)元素的。也就是说,一个列表状态中的每一个元素,都会以自己的失效时间来进行清理,而不是整个列表一起清理。

第三节 算子状态

3.1 算子状态的概念和特点

  1. 除按键分区状态(Keyed State)之外,另一大类受控状态就是算子状态(Operator State)。从某种意义上说,算子状态是更底层的状态类型,因为它只针对当前算子并行任务有效,不需要考虑不同 key 的隔离。算子状态功能不如按键分区状态丰富,应用场景较少,它的调用方法也会有一些区别。
  2. 算子状态(Operator State)就是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。算子状态跟数据的 key 无关,所以不同 key 的数据只要被分发到同一个并行子任务,就会访问到同一个 Operator State。
  3. 算子状态的实际应用场景不如 Keyed State 多,一般用在 Source 或 Sink 等与外部系统连接的算子上,或者完全没有 key 定义的场景。比如 Flink 的 Kafka 连接器中,就用到了算子状态。在我们给 Source 算子设置并行度后,Kafka 消费者的每一个并行实例,都会为对应的主题(topic)分区维护一个偏移量, 作为算子状态保存起来。这在保证 Flink 应用“精确一次”(exactly-once)状态一致性时非常有用。
  4. 当算子的并行度发生变化时,算子状态也支持在并行的算子任务实例之间做重组分配。根据状态的类型不同,重组分配的方案也会不同。

3.2 状态类型

  1. 算子状态也支持不同的结构类型,主要有三种:ListState、UnionListState 和 BroadcastState。
  2. **列表状态(ListState)**与 Keyed State 中的 ListState 一样,将状态表示为一组数据的列表。与 Keyed State 中的列表状态的区别是:在算子状态的上下文中,不会按键(key)分别处理状态,所以每一个并行子任务上只会保留一个“列表”(list),也就是当前并行子任务上所有状态项的集合。列表中的状态项就是可以重新分配的最细粒度,彼此之间完全独立。
  3. 当算子并行度进行缩放调整时,算子的列表状态中的所有元素项会被统一收集起来,相当于把多个分区的列表合并成了一个“大列表”,然后再均匀地分配给所有并行任务。这种“均匀分配”的具体方法就是“轮询”(round-robin),与之前介绍的 rebanlance 数据传输方式类似,是通过逐一“发牌”的方式将状态项平均分配的。这种方式也叫作“平均分割重组”(even-split redistribution)。
  4. 算子状态中不会存在“键组”(key group)这样的结构,所以为了方便重组分配,就把它直接定义成了“列表”(list)。这也就解释了,为什么算子状态中没有最简单的值状态(ValueState)。
  5. **联合列表状态(UnionListState)**与 ListState 类似,联合列表状态也会将状态表示为一个列表。它与常规列表状态的区别在于,算子并行度进行缩放调整时对于状态的分配方式不同。UnionListState 的重点就在于“联合”(union)。在并行度调整时,常规列表状态是轮询分配状态项,而联合列表状态的算子则会直接广播状态的完整列表。这样,并行度缩放之后的并行子任务就获取到了联合后完整的“大列表”,可以自行选择要使用的状态项和要丢弃的状态项。这种分配也叫作“联合重组”(union redistribution)。如果列表中状态项数量太多,为资源和效率考虑一般不建议使用联合重组的方式。
  6. **广播状态(BroadcastState)**有时我们希望算子并行子任务都保持同一份“全局”状态,用来做统一的配置和规则设定。这时所有分区的所有数据都会访问到同一个状态,状态就像被“广播”到所有分区一样,这种特殊的算子状态,就叫作广播状态(BroadcastState)。因为广播状态在每个并行子任务上的实例都一样,所以在并行度调整的时候就比较简单,只要复制一份到新的并行任务就可以实现扩展;而对于并行度缩小的情况,可以将多余的并行子任务连同状态直接砍掉——因为状态都是复制出来的,并不会丢失。
  7. 在底层,广播状态是以类似映射结构(map)的键值对(key-value)来保存的,必须基于一个“广播流”(BroadcastStream)来创建。

3.3 代码实现

  1. 我们已经知道,状态从本质上来说就是算子并行子任务实例上的一个特殊本地变量。它的特殊之处就在于 Flink 会提供完整的管理机制,来保证它的持久化保存,以便发生故障时进行状态恢复;另外还可以针对不同的 key 保存独立的状态实例。按键分区状态(Keyed State)对这两个功能都要考虑;而算子状态(Operator State)并不考虑 key 的影响,所以主要任务就是要让 Flink 了解状态的信息、将状态数据持久化后保存到外部存储空间。
  2. 看起来算子状态的使用应该更加简单才对。不过仔细思考又会发现一个问题:我们对状态进行持久化保存的目的是为了故障恢复;在发生故障、重启应用后,数据还会被发往之前分配的分区吗?显然不是,因为并行度可能发生了调整,不论是按键(key)的哈希值分区,还是直接轮询(round-robin)分区,数据分配到的分区都会发生变化。这很好理解,当打牌的人数从 3 个增加到 4 个时,即使牌的次序不变,轮流发到每个人手里的牌也会不同。数据分区发生变化,带来的问题就是,怎么保证原先的状态跟故障恢复后数据的对应关系呢?
  3. 对于 Keyed State 这个问题很好解决:状态都是跟 key 相关的,而相同 key 的数据不管发往哪个分区,总是会全部进入一个分区的;于是只要将状态也按照 key 的哈希值计算出对应的分区,进行重组分配就可以了。恢复状态后继续处理数据,就总能按照 key 找到对应之前的状态,就保证了结果的一致性。所以 Flink 对 Keyed State 进行了非常完善的包装,我们不需实现任何接口就可以直接使用。
  4. 而对于 Operator State 来说就会有所不同。因为不存在 key,所有数据发往哪个分区是不可预测的;也就是说,当发生故障重启之后,我们不能保证某个数据跟之前一样,进入到同一个并行子任务、访问同一个状态。所以 Flink 无法直接判断该怎样保存和恢复状态,而是提供了接口,让我们根据业务需求自行设计状态的快照保存(snapshot)和恢复(restore)逻辑。
  5. CheckpointedFunction 接口。在 Flink 中,对状态进行持久化保存的快照机制叫作“检查点”(Checkpoint)。于是使用算子状态时,就需要对检查点的相关操作进行定义,实现一个 CheckpointedFunction 接口。CheckpointedFunction 接口在源码中定义如下:
public interface CheckpointedFunction {// 保存状态快照到检查点时,调用这个方法void snapshotState(FunctionSnapshotContext context) throws Exception// 初始化状态时调用这个方法,也会在恢复状态时调用void   initializeState(FunctionInitializationContext   context)   throws Exception;
}
  1. 每次应用保存检查点做快照时,都会调用.snapshotState()方法,将状态进行外部持久化。而在算子任务进行初始化时,会调用. initializeState()方法。这又有两种情况:一种是整个应用第一次运行,这时状态会被初始化为一个默认值(default value);另一种是应用重启时,从检查点(checkpoint)或者保存点(savepoint)中读取之前状态的快照,并赋给本地状态。所以,接口中的.snapshotState()方法定义了检查点的快照保存逻辑,而. initializeState()方法不仅定义了初始化逻辑,也定义了恢复逻辑。
  2. 这里需要注意,CheckpointedFunction 接口中的两个方法,分别传入了一个上下文(context)作为参数。不同的是,.snapshotState()方法拿到的是快照的上下文 FunctionSnapshotContext,它可以提供检查点的相关信息,不过无法获取状态句柄;而. initializeState()方法拿到的是FunctionInitializationContext,这是函数类进行初始化时的上下文,是真正的“运行时上下文”。FunctionInitializationContext 中提供了“算子状态存储”(OperatorStateStore)和“按键分区状态存储(” KeyedStateStore),在这两个存储对象中可以非常方便地获取当前任务实例中的 Operator State 和 Keyed State。例如:
ListStateDescriptor<String> descriptor = new ListStateDescriptor<>( "buffered-elements", Types.of(String));ListState<String> checkpointedState   = context.getOperatorStateStore().getListState(descriptor);
  1. 我们看到,算子状态的注册和使用跟 Keyed State 非常类似,也是需要先定义一个状态描述器(StateDescriptor),告诉 Flink 当前状态的名称和类型,然后从上下文提供的算子状态存储(OperatorStateStore)中获取对应的状态对象。如果想要从 KeyedStateStore 中获取 Keyed State也是一样的,前提是必须基于定义了 key 的 KeyedStream,这和富函数类中的方式并不矛盾。通过这里的描述可以发现,CheckpointedFunction 是 Flink 中非常底层的接口,它为有状态的流处理提供了灵活且丰富的应用。
  2. 示例代码接下来我们举一个算子状态的应用案例。在下面的例子中,自定义的 SinkFunction 会在CheckpointedFunction 中进行数据缓存,然后统一发送到下游。这个例子演示了列表状态的平均分割重组(event-split redistribution)。
import com.atguigu.chapter05.ClickSource;
import com.atguigu.chapter05.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.util.ArrayList;
import java.util.List;public class BufferingSinkExample {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.enableCheckpointing(10000L);
//        env.setStateBackend(new EmbeddedRocksDBStateBackend());//        env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(""));CheckpointConfig checkpointConfig = env.getCheckpointConfig();checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);checkpointConfig.setMinPauseBetweenCheckpoints(500);checkpointConfig.setCheckpointTimeout(60000);checkpointConfig.setMaxConcurrentCheckpoints(1);checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);checkpointConfig.enableUnalignedCheckpoints();SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));stream.print("input");// 批量缓存输出stream.addSink(new BufferingSink(10));env.execute();}public static class BufferingSink implements SinkFunction<Event>, CheckpointedFunction {private final int threshold;private transient ListState<Event> checkpointedState;private List<Event> bufferedElements;public BufferingSink(int threshold) {this.threshold = threshold;this.bufferedElements = new ArrayList<>();}@Overridepublic void invoke(Event value, Context context) throws Exception {bufferedElements.add(value);if (bufferedElements.size() == threshold) {for (Event element: bufferedElements) {// 输出到外部系统,这里用控制台打印模拟System.out.println(element);}System.out.println("==========输出完毕=========");bufferedElements.clear();}}@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {checkpointedState.clear();// 把当前局部变量中的所有元素写入到检查点中for (Event element : bufferedElements) {checkpointedState.add(element);}}@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {ListStateDescriptor<Event> descriptor = new ListStateDescriptor<>("buffered-elements",Types.POJO(Event.class));checkpointedState = context.getOperatorStateStore().getListState(descriptor);// 如果是从故障中恢复,就将ListState中的所有元素添加到局部变量中if (context.isRestored()) {for (Event element : checkpointedState.get()) {bufferedElements.add(element);}}}}
}
  1. 当初始化好状态对象后,我们可以通过调用. isRestored()方法判断是否是从故障中恢复。在代码中 BufferingSink 初始化时,恢复出的 ListState 的所有元素会添加到一个局部变量bufferedElements 中,以后进行检查点快照时就可以直接使用了。在调用.snapshotState()时,直接清空 ListState,然后把当前局部变量中的所有元素写入到检查点中。
  2. 对于不同类型的算子状态,需要调用不同的获取状态对象的接口,对应地也就会使用不同的状态分配重组算法。比如获取列表状态时,调用.getListState() 会使用最简单的 平均分割重组(even-split redistribution)算法;而获取联合列表状态时,调用的是.getUnionListState() ,对应就会使用联合重组(union redistribution) 算法。

第四节 广播状态和状态持久化

4.1 广播状态基本用法

  1. 算子状态中有一类很特殊,就是广播状态(Broadcast State)。从概念和原理上讲,广播状态非常容易理解:状态广播出去,所有并行子任务的状态都是相同的;并行度调整时只要直接复制就可以了。然而在应用上,广播状态却与其他算子状态大不相同。
  2. 让所有并行子任务都持有同一份状态,也就意味着一旦状态有变化,所以子任务上的实例
    都要更新。什么时候会用到这样的广播状态呢?
  3. 一个最为普遍的应用,就是“动态配置”或者“动态规则”。我们在处理流数据时,有时会基于一些配置(configuration)或者规则(rule)。简单的配置当然可以直接读取配置文件,一次加载,永久有效;但数据流是连续不断的,如果这配置随着时间推移还会动态变化,那又该怎么办呢?
  4. 一个简单的想法是,定期扫描配置文件,发现改变就立即更新。但这样就需要另外启动一个扫描进程,如果扫描周期太长,配置更新不及时就会导致结果错误;如果扫描周期太短,又会耗费大量资源做无用功。解决的办法,还是流处理的“事件驱动”思路——我们可以将这动态的配置数据看作一条流,将这条流和本身要处理的数据流进行连接(connect),就可以实时地更新配置进行计算了。
    由于配置或者规则数据是全局有效的,我们需要把它广播给所有的并行子任务。而子任务需要把它5. 5. 作为一个算子状态保存起来,以保证故障恢复后处理结果是一致的。这时的状态,就是一个典型的广播状态。我们知道,广播状态与其他算子状态的列表(list)结构不同,底层是以键值对(key-value)形式描述的,所以其实就是一个映射状态(MapState)。
  5. 代码可以直接调用 DataStream 的.broadcast()方法,传入一个映射状态描述器MapStateDescriptor说明状态的名称和类型,就可以得到一个广播流BroadcastStream进而将要处理的数据流与这条广播流进行连接就会得到广播连接流BroadcastConnectedStream。注意广播状态只能用在广播连接流中。
MapStateDescriptor<String, Rule>    ruleStateDescriptor =   newMapStateDescriptor<>(...);BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);DataStream<String> output = stream.connect(ruleBroadcastStream).process( new BroadcastProcessFunction<>() {...} );
  1. 这里我们定义了一个“规则流”ruleStream,里面的数据表示了数据流 stream 处理的规则,规则的数据类型定义为 Rule。于是需要先定义一个 MapStateDescriptor 来描述广播状态,然后传入 ruleStream.broadcast()得到广播流,接着用 stream 和广播流进行连接。这里状态描述器中的 key 类型为 String,就是为了区分不同的状态值而给定的 key 的名称。
  2. 对 于 广 播 连 接 流 调 用 .process() 方 法 , 可 以 传 入 “ 广 播 处 理 函 数 ” KeyedBroadcastProcessFunction 或者 BroadcastProcessFunction 来进行处理计算。广播处理函数里面有两个方法.processElement()和.processBroadcastElement(),源码中定义如下:
public  abstract  class  BroadcastProcessFunction<IN1,  IN2,  OUT>  extends BaseBroadcastProcessFunction {...public  abstract  void  processElement(IN1  value,  ReadOnlyContext  ctx,
Collector<OUT> out) throws Exception;public  abstract  void  processBroadcastElement(IN2  value,  Context  ctx,
Collector<OUT> out) throws Exception;
...
}
  1. 这里的.processElement()方法,处理的是正常数据流,第一个参数 value 就是当前到来的流数据;而.processBroadcastElement()方法就相当于是用来处理广播流的,它的第一个参数 value就是广播流中的规则或者配置数据。两个方法第二个参数都是一个上下文 ctx,都可以通过调用.getBroadcastState()方法获取到当前的广播状态;区别在于,.processElement()方法里的上下文 是 “ 只 读 ” 的 ( ReadOnly ), 因 此 获 取 到 的 广 播 状 态 也 只 能 读 取 不 能 更 改 ;而.processBroadcastElement()方法里的 Context 则没有限制,可以根据当前广播流中的数据更新状态。
Rule rule = ctx.getBroadcastState( new MapStateDescriptor<>("rules", Types.String, Types.POJO(Rule.class))).get("my rule");
  1. 通过调用 ctx.getBroadcastState()方法,传入一个 MapStateDescriptor,就可以得到当前的叫作“rules”的广播状态;调用它的.get()方法,就可以取出其中“my rule”对应的值进行计算处理。

4.2 广播状态代码实例

  1. 接下来我们举一个广播状态的应用案例。考虑在电商应用中,往往需要判断用户先后发生的行为的“组合模式”,比如“登录-下单”或者“登录-支付”,检测出这些连续的行为进行统计,就可以了解平台的运用状况以及用户的行为习惯。具体代码如下:
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;public class BroadcastStateExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 读取用户行为事件流DataStreamSource<Action> actionStream = env.fromElements(new Action("Alice", "login"),new Action("Alice", "pay"),new Action("Bob", "login"),new Action("Bob", "buy"));// 定义行为模式流,代表了要检测的标准DataStreamSource<Pattern> patternStream = env.fromElements(new Pattern("login", "pay"),new Pattern("login", "buy"));// 定义广播状态的描述器,创建广播流MapStateDescriptor<Void, Pattern> bcStateDescriptor = new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));BroadcastStream<Pattern> bcPatterns = patternStream.broadcast(bcStateDescriptor);// 将事件流和广播流连接起来,进行处理DataStream<Tuple2<String, Pattern>> matches = actionStream.keyBy(data -> data.userId).connect(bcPatterns).process(new PatternEvaluator());matches.print();env.execute();}public static class PatternEvaluatorextends KeyedBroadcastProcessFunction<String, Action, Pattern, Tuple2<String, Pattern>> {// 定义一个值状态,保存上一次用户行为ValueState<String> prevActionState;@Overridepublic void open(Configuration conf) {prevActionState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastAction", Types.STRING));}@Overridepublic void processBroadcastElement(Pattern pattern,Context ctx,Collector<Tuple2<String, Pattern>> out) throws Exception {BroadcastState<Void, Pattern> bcState = ctx.getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class)));// 将广播状态更新为当前的patternbcState.put(null, pattern);}@Overridepublic void processElement(Action action, ReadOnlyContext ctx,Collector<Tuple2<String, Pattern>> out) throws Exception {Pattern pattern = ctx.getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class))).get(null);String prevAction = prevActionState.value();if (pattern != null && prevAction != null) {// 如果前后两次行为都符合模式定义,输出一组匹配if (pattern.action1.equals(prevAction) && pattern.action2.equals(action.action)) {out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern));}}// 更新状态prevActionState.update(action.action);}}// 定义用户行为事件POJO类public static class Action {public String userId;public String action;public Action() {}public Action(String userId, String action) {this.userId = userId;this.action = action;}@Overridepublic String toString() {return "Action{" +"userId=" + userId +", action='" + action + '\'' +'}';}}// 定义行为模式POJO类,包含先后发生的两个行为public static class Pattern {public String action1;public String action2;public Pattern() {}public Pattern(String action1, String action2) {this.action1 = action1;this.action2 = action2;}@Overridepublic String toString() {return "Pattern{" +"action1='" + action1 + '\'' +", action2='" + action2 + '\'' +'}';}}
}
  1. 这里我们将检测的行为模式定义为 POJO 类 Pattern,里面包含了连续的两个行为。由于广播状态中只保存了一个 Pattern,并不关心 MapState 中的 key,所以也可以直接将 key 的类型指定为 Void,具体值就是 null。在具体的操作过程中,我们将广播流中的 Pattern 数据保存为广播变量;在行为数据 Action 到来之后读取当前广播变量,确定行为模式,并将之前的一次行为保存为一个 ValueState——这是针对当前用户的状态保存,所以用到了 Keyed State。检测到如果前一次行为与 Pattern 中的 action1 相同,而当前行为与 action2 相同,则发现了匹配模式的一组行为,输出检测结果。

4.3 状态持久化和状态后端

  1. 在 Flink 的状态管理机制中,很重要的一个功能就是对状态进行持久化(persistence)保存,这样就可以在发生故障后进行重启恢复。Flink 对状态进行持久化的方式,就是将当前所有分布式状态进行“快照”保存,写入一个“检查点”(checkpoint)或者保存点(savepoint)保存到外部存储系统中。具体的存储介质,一般是分布式文件系统(distributed file system)。
  2. 检查点(Checkpoint)有状态流应用中的检查点(checkpoint),其实就是所有任务的状态在某个时间点的一个快照(一份拷贝)。简单来讲,就是一次“存盘”,让我们之前处理数据的进度不要丢掉。在一个流应用程序运行时,Flink 会定期保存检查点,在检查点中会记录每个算子的 id 和状态;如果发生故障,Flink 就会用最近一次成功保存的检查点来恢复应用的状态,重新启动处理流程,就如同“读档”一样。
  3. 如果保存检查点之后又处理了一些数据,然后发生了故障,那么重启恢复状态之后这些数据带来的状态改变会丢失。为了让最终处理结果正确,我们还需要让源(Source)算子重新读取这些数据,再次处理一遍。这就需要流的数据源具有“数据重放”的能力,一个典型的例子就是 Kafka,我们可以通过保存消费数据的偏移量、故障重启后重新提交来实现数据的重放。这是对“至少一次”(at least once)状态一致性的保证,如果希望实现“精确一次”(exactly once)的一致性,还需要数据写入外部系统时的相关保证。
  4. 默认情况下,检查点是被禁用的,需要在代码中手动开启。直接调用执行环境的.enableCheckpointing()方法就可以开启检查点。这里传入的参数是检查点的间隔时间,单位为毫秒。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getEnvironment();env.enableCheckpointing(1000);
  1. 除了检查点之外,Flink 还提供了“保存点”(savepoint)的功能。保存点在原理和形式上跟检查点完全一样,也是状态持久化保存的一个快照;区别在于,保存点是自定义的镜像保存,所以不会由 Flink 自动创建,而需要用户手动触发。这在有计划地停止、重启应用时非常有用。

  2. 状态后端(State Backends)检查点的保存离不开 JobManager 和 TaskManager,以及外部存储系统的协调。在应用进行检查点保存时,首先会由 JobManager 向所有 TaskManager 发出触发检查点的命令;TaskManger 收到之后,将当前任务的所有状态进行快照保存,持久化到远程的存储介质中;完成之后向 JobManager 返回确认信息。这个过程是分布式的,当 JobManger 收到所有TaskManager 的返回信息后,就会确认当前检查点成功保存,如图所示。而这一切工作的协调,就需要一个“专职人员”来完成。

  3. 在 Flink 中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫作状态后端(state backend)。状态后端主要负责两件事:一是本地的状态管理,二是将检查点(checkpoint)写入远程的持久化存储。

  4. 状态后端的分类状态后端是一个“开箱即用”的组件,可以在不改变应用程序逻辑的情况下独立配置。Flink 中提供了两类不同的状态后端,一种是“哈希表状态后端”(HashMapStateBackend),另一种是“内嵌 RocksDB 状态后端”(EmbeddedRocksDBStateBackend)。如果没有特别配置,系统默认的状态后端是 HashMapStateBackend。

    • 哈希表状态后端(HashMapStateBackend)这种方式就是我们之前所说的,把状态存放在内存里。具体实现上,哈希表状态后端在内部会直接把状态当作对象(objects),保存在 Taskmanager 的 JVM 堆(heap)上。普通的状态,以及窗口中收集的数据和触发器(triggers),都会以键值对(key-value)的形式存储起来,所以底层是一个哈希表(HashMap),这种状态后端也因此得名。
  5. 对于检查点的保存,一般是放在持久化的分布式文件系统(file system)中,也可以通过配置“检查点存储”(CheckpointStorage)来另外指定。HashMapStateBackend 是将本地状态全部放入内存的,这样可以获得最快的读写速度,使计算性能达到最佳;代价则是内存的占用。它适用于具有大状态、长窗口、大键值状态的作业,对所有高可用性设置也是有效的。

  6. 内嵌 RocksDB 状态后端(EmbeddedRocksDBStateBackend)RocksDB 是一种内嵌的 key-value 存储介质,可以把数据持久化到本地硬盘。配置EmbeddedRocksDBStateBackend 后,会将处理中的数据全部放入 RocksDB 数据库中,RocksDB默认存储在 TaskManager 的本地数据目录里。与 HashMapStateBackend 直接在堆内存中存储对象不同,这种方式下状态主要是放在RocksDB 中的。数据被存储为序列化的字节数组(Byte Arrays),读写操作需要序列化/反序列化,因此状态的访问性能要差一些。另外,因为做了序列化,key 的比较也会按照字节进行,而不是直接调用.hashCode()和.equals()方法。

  7. 对于检查点,同样会写入到远程的持久化文件系统中。EmbeddedRocksDBStateBackend 始终执行的是异步快照,也就是不会因为保存检查点而阻塞数据的处理;而且它还提供了增量式保存检查点的机制,这在很多情况下可以大大提升保存效率。由于它会把状态数据落盘,而且支持增量化的检查点,所以在状态非常大、窗口非常长、键/值状态很大的应用场景中是一个好选择,同样对所有高可用性设置有效。

  8. 如何选择正确的状态后端HashMap 和 RocksDB 两种状态后端最大的区别,就在于本地状态存放在哪里:前者是内存,后者是 RocksDB。在实际应用中,选择那种状态后端,主要是需要根据业务需求在处理性能和应用的扩展性上做一个选择。

  9. HashMapStateBackend 是内存计算,读写速度非常快;但是,状态的大小会受到集群可用内存的限制,如果应用的状态随着时间不停地增长,就会耗尽内存资源。

  10. 而 RocksDB 是硬盘存储,所以可以根据可用的磁盘空间进行扩展,而且是唯一支持增量检查点的状态后端,所以它非常适合于超级海量状态的存储。不过由于每个状态的读写都需要做序列化/反序列化,而且可能需要直接从磁盘读取数据,这就会导致性能的降低,平均读写性能要比 HashMapStateBackend 慢一个数量级。

  11. 我们可以发现,实际应用就是权衡利弊后的取舍。最理想的当然是处理速度快且内存不受限制可以处理海量状态,那就需要非常大的内存资源了,这会导致成本超出项目预算。比起花更多的钱,稍慢的处理速度或者稍小的处理规模,老板可能更容易接受一点。

  12. 状态后端的配置在不做配置的时候,应用程序使用的默认状态后端是由集群配置文件 flink-conf.yaml 中指定的,配置的键名称为 state.backend。这个默认配置对集群上运行的所有作业都有效,我们可以通过更改配置值来改变默认的状态后端。另外,我们还可以在代码中为当前作业单独配置状态后端,这个配置会覆盖掉集群配置文件的默认值。

    • 配置默认的状态后端在 flink-conf.yaml 中,可以使用 state.backend 来配置默认状态后端。配置项的可能值为 hashmap,这样配置的就是 HashMapStateBackend;也可以是 rocksdb,这样配置的就是 EmbeddedRocksDBStateBackend。另外,也可以是一个实现了状态后端工厂StateBackendFactory 的类的完全限定类名。
  13. 下面是一个配置 HashMapStateBackend 的例子:

# 默认状态后端state.backend: hashmap # 存放检查点的文件路径
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
  1. 这里的 state.checkpoints.dir 配置项,定义了状态后端将检查点和元数据写入的目录。为每个作业(Per-job)单独配置状态后端每个作业独立的状态后端,可以在代码中,基于作业的执行环境直接设置。代码如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStateBackend(new HashMapStateBackend());
  1. 上面代码设置的是 HashMapStateBackend,如果想要设置 EmbeddedRocksDBStateBackend,可以用下面的配置方式:
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setStateBackend(new EmbeddedRocksDBStateBackend());
  1. 需要注意,如果想在 IDE 中使用 EmbeddedRocksDBStateBackend,需要为 Flink 项目添加依赖
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_${scala.binary.version</artifactId><version>1.13.0</version></dependency>
  1. 而由于 Flink 发行版中默认就包含了 RocksDB,所以只要我们的代码中没有使用 RocksDB的相关内容,就不需要引入这个依赖。即使我们在 flink-conf.yaml 配置文件中设定了state.backend 为 rocksdb,也可以直接正常运行,并且使用 RocksDB 作为状态后端。

第五节 容错机制

5.1 检查点介绍

  1. 流式数据连续不断地到来,无休无止;所以流处理程序也是持续运行的,并没有一个明确的结束退出时间。机器运行程序,996 起来当然比人要容易得多,不过希望“永远运行”也是不切实际的。因为各种硬件软件的原因,运行一段时间后程序可能异常退出、机器可能宕机,如果我们只依赖一台机器来运行,就会使得任务的处理被迫中断。
  2. 一个解决方案就是多台机器组成集群,以“分布式架构”来运行程序。这样不仅扩展了系统的并行处理能力,而且可以解决单点故障的问题,从而大大提高系统的稳定性和可用性。
  3. 在分布式架构中,当某个节点出现故障,其他节点基本不受影响。这时只需要重启应用,恢复之前某个时间点的状态继续处理就可以了。这一切看似简单,可是在实时流处理中,我们不仅需要保证故障后能够重启继续运行,还要保证结果的正确性、故障恢复的速度、对处理性能的影响,这就需要在架构上做出更加精巧的设计。
  4. 在 Flink 中,有一套完整的容错机制(fault tolerance)来保证故障后的恢复,其中最重要的就是检查点(checkpoint)。
  5. 发生故障之后怎么办?最简单的想法当然是重启机器、重启应用。由于是分布式的集群,即使一个节点无法恢复,也不会影响应用的重启执行。这里的问题在于,流处理应用中的任务都是有状态的,而为了快速访问这些状态一般会直接放在堆内存里;现在重启应用,内存中的状态已经丢失,就意味着之前的计算全部白费了,需要从头来过。就像编写文档或是玩 RPG游戏,因为宕机没保存而要重来一遍是一件令人崩溃的事情;这种惨痛的经历让我们养成了一个好习惯——随时存档,这样即使遇到宕机也可以读档继续了。
  6. 在流处理中,我们同样可以用存档读档的思路,把之前的计算结果做个保存,这样重启之后就可以继续处理新数据、而不需要重新计算了。进一步地,我们知道在有状态的流处理中,任务继续处理新数据,并不需要“之前的计算结果”,而是需要任务“之前的状态”。所以我们最终的选择,就是将之前某个时间点所有的状态保存下来,这份“存档”就是所谓的“检查点”(checkpoint)。
  7. 遇到故障重启的时候,我们可以从检查点中“读档”,恢复出之前的状态,这样就可以回到当时保存的一刻接着处理数据了。
  8. 检查点是 Flink 容错机制的核心。这里所谓的“检查”,其实是针对故障恢复的结果而言的:故障恢复之后继续处理的结果,应该与发生故障前完全一致,我们需要“检查”结果的正确性。所以,有时又会把 checkpoint 叫作“一致性检查点”。
  9. 检查点的保存什么时候进行检查点的保存呢?最理想的情况下,我们应该“随时”保存,也就是每处理完一个数据就保存一下当前的状态;这样如果在处理某条数据时出现故障,我们只要回到上一个数据处理完之后的状态,然后重新处理一遍这条数据就可以。这样重复处理的数据最少,完全没有多余操作,可以做到最低的延迟。然而实际情况不会这么完美。
  10. 周期性的触发保存。“随时存档”确实恢复起来方便,可是需要我们不停地做存档操作。如果每处理一条数据就进行检查点的保存,当大量数据同时到来时,就会耗费很多资源来频繁做检查点,数据处理的速度就会受到影响。所以更好的方式是,每隔一段时间去做一次存档,这样既不会影响数据的正常处理,也不会有太大的延迟——毕竟故障恢复的情况不是随时发生的。在 Flink 中,检查点的保存是周期性触发的,间隔时间可以进行设置。
  11. 所以检查点作为应用状态的一份“存档”,其实就是所有任务状态在同一时间点的一个“快照”(snapshot),它的触发是周期性的。具体来说,当每隔一段时间检查点保存操作被触发时,就把每个任务当前的状态复制一份,按照一定的逻辑结构放在一起持久化保存起来,就构成了检查点。
  12. 保存的时间点这里有一个关键问题:当检查点的保存被触发时,任务有可能正在处理某个数据,这时该怎么办呢?最简单的想法是,可以在某个时刻“按下暂停键”,让所有任务停止处理数据。这样状态就不再更改,大家可以一起复制保存;保存完毕之后,再同时恢复数据处理就可以了。
  13. 然而仔细思考就会发现这有很多问题。这种想法其实是粗暴地“停止一切来拍照”,在保存检查点的过程中,任务完全中断了,这会造成很大的延迟;我们之前为了实时性做出的所有设计就毁在了做快照上。另一方面,我们做快照的目的是为了故障恢复;现在的快照中,有些任务正在处理数据,那它保存的到底是处理到什么程度的状态呢?举个例子,我们在程序中某一步操作中自定义了一个 ValueState,处理的逻辑是:当遇到一个数据时,状态先加 1;而后经过一些其他步骤后再加 1。现在停止处理数据,状态到底是被加了 1 还是加了 2 呢?这很重要,因为状态恢复之后,我们需要知道当前数据从哪里开始继续处理。要满足这个要求,就必须将暂停时的所有环境信息都保存下来——而这显然是很麻烦的。
  14. 为了解决这个问题,我们不应该“一刀切”把所有任务同时停掉,而是至少得先把手头正在处理的数据弄完。这样的话,我们在检查点中就不需要保存所有上下文信息,只要知道当前处理到哪个数据就可以了。
  15. 但这样依然会有问题:分布式系统的节点之间需要通过网络通信来传递数据,如果我们保存检查点的时候刚好有数据在网络传输的路上,那么下游任务是没法将数据保存起来的;故障重启之后,我们只能期待上游任务重新发送这个数据。然而上游任务是无法知道下游任务是否收到数据的,只能盲目地重发,这可能导致下游将数据处理两次,结果就会出现错误。
  16. 所以我们最终的选择是:当所有任务都恰好处理完一个相同的输入数据的时候,将它们的状态保存下来。首先,这样避免了除状态之外其他额外信息的存储,提高了检查点保存的效率。其次,一个数据要么就是被所有任务完整地处理完,状态得到了保存;要么就是没处理完,状态全部没保存:这就相当于构建了一个“事务”(transaction)。如果出现故障,我们恢复到之前保存的状态,故障时正在处理的所有数据都需要重新处理;所以我们只需要让源(source)任务向数据源重新提交偏移量、请求重放数据就可以了。这需要源任务可以把偏移量作为算子状态保存下来,而且外部数据源能够重置偏移量;Kafka 就是满足这些要求的一个最好的例子,我们会在后面详细讨论。
  17. 保存的具体流程检查点的保存,最关键的就是要等所有任务将“同一个数据”处理完毕。下面我们通过一个具体的例子,来详细描述一下检查点具体的保存过程。回忆一下我们最初实现的统计词频的程序——WordCount。这里为了方便,我们直接从数据源读入已经分开的一个个单词,例如这里输入的就是:“hello”“world”“hello”“flink”“hello”“world”“hello”“flink”……对应的代码就可以简化为:
SingleOutputStreamOperator<Tuple2<String, Long>> wordCountStream = env.addSource(...)
.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG)); .keyBy(t -> t.f0);
.sum(1);
  1. 源(Source)任务从外部数据源读取数据,并记录当前的偏移量,作为算子状态(Operator State)保存下来。然后将数据发给下游的 Map 任务,它会将一个单词转换成(word, count)二元组,初始 count 都是 1,也就是(“hello”, 1)、(“world”, 1)这样的形式;这是一个无状态的算子任务。进而以 word 作为键(key)进行分区,调用.sum()方法就可以对 count 值进行求和统计了;Sum 算子会把当前求和的结果作为按键分区状态(Keyed State)保存下来。最后得到的就是当前单词的频次统计(word, count)。

  2. 当我们需要保存检查点(checkpoint)时,就是在所有任务处理完同一条数据后,对状态做个快照保存下来。例如上图中,已经处理了 3 条数据:“hello”“world”“hello”,所以我们会看到 Source 算子的偏移量为 3;后面的 Sum 算子处理完第三条数据“hello”之后,此时已经有 2 个“hello”和 1 个“world”,所以对应的状态为“hello”-> 2,“world”-> 1(这里 KeyedState底层会以 key-value 形式存储)。此时所有任务都已经处理完了前三个数据,所以我们可以把当前的状态保存成一个检查点,写入外部存储中。至于具体保存到哪里,这是由状态后端的配置项 检查点存储来决定的。 可以有作业管理器的堆内存JobManagerCheckpointStorage和文件系统FileSystemCheckpointStorage两种选择。一般情况下,我们会将检查点写入持久化的分布式文件系统。

  3. 从检查点恢复状态。在运行流处理程序时,Flink 会周期性地保存检查点。当发生故障时,就需要找到最近一次成功保存的检查点来恢复状态。例如在上节的 word count 示例中,我们处理完三个数据后保存了一个检查点。之后继续运行,又正常处理了一个数据“flink”,在处理第五个数据“hello”时发生了故障。

  4. 这里 Source 任务已经处理完毕,所以偏移量为 5;Map 任务也处理完成了。而 Sum 任务在处理中发生了故障,此时状态并未保存。接下来就需要从检查点来恢复状态了。具体的步骤为:

    • 重启应用遇到故障之后,第一步当然就是重启。我们将应用重新启动后,所有任务的状态会清空,
    • 读取检查点,重置状态找到最近一次保存的检查点,从中读出每个算子任务状态的快照,分别填充到对应的状态中。这样,Flink 内部所有任务的状态,就恢复到了保存检查点的那一时刻,也就是刚好处理完第三个数据的时候。这里 key 为“flink”并没有数据到来,所以初始为 0。
    • 重放数据从检查点恢复状态后还有一个问题:如果直接继续处理数据,那么保存检查点之后、到发生故障这段时间内的数据,也就是第 4、5 个数据(“flink”“hello”)就相当于丢掉了;这会造成计算结果的错误。为了不丢数据,我们应该从保存检查点后开始重新读取数据,这可以通过 Source 任务向外部数据源重新提交偏移量(offset)来实现这样,整个系统的状态已经完全回退到了检查点保存完成的那一时刻。
    • 继续处理数据接下来,我们就可以正常处理数据了。首先是重放第 4、5 个数据,然后继续读取后面的数据
  5. 当处理到第 5 个数据时,就已经追上了发生故障时的系统状态。之后继续处理,就好像没有发生过故障一样;我们既没有丢掉数据也没有重复计算数据,这就保证了计算结果的正确性。在分布式系统中,这叫作实现了“精确一次”(exactly-once)的状态一致性保证。

  6. 这里我们也可以发现,想要正确地从检查点中读取并恢复状态,必须知道每个算子任务状态的类型和它们的先后顺序(拓扑结构);因此为了可以从之前的检查点中恢复状态,我们在改动程序、修复 bug 时要保证状态的拓扑顺序和类型不变。状态的拓扑结构在 JobManager 上可以由 JobGraph 分析得到,而检查点保存的定期触发也是由 JobManager 控制的;所以故障恢复的过程需要 JobManager 的参与。

5.2 检查点算法

  1. 我们已经知道,Flink 保存检查点的时间点,是所有任务都处理完同一个输入数据的时候。但是不同的任务处理数据的速度不同,当第一个 Source 任务处理到某个数据时,后面的 Sum任务可能还在处理之前的数据;而且数据经过任务处理之后类型和值都会发生变化,面对着“面目全非”的数据,不同的任务怎么知道处理的是“同一个”呢?

  2. 一个简单的想法是,当接到 JobManager 发出的保存检查点的指令后,Source 算子任务处理完当前数据就暂停等待,不再读取新的数据了。这样我们就可以保证在流中只有需要保存到检查点的数据,只要把它们全部处理完,就可以保证所有任务刚好处理完最后一个数据;这时把所有状态保存起来,合并之后就是一个检查点了。这就好比我们想要保存所有同学刚好毕业时的状态,那就在所有人答辩完成之后,集合起来拍一张毕业合照。这样做最大的问题,就是每个人的进度可能不同;先答辩完的人为了保证状态一致不能进行其他工作,只能等待。当先保存完状态的任务需要等待其他任务时,就导致了资源的闲置和性能的降低。

  3. 所以更好的做法是,在不暂停整体流处理的前提下,将状态备份保存到检查点。在 Flink中,采用了基于 Chandy-Lamport 算法的分布式快照,下面我们就来详细了解一下。

  4. **检查点分界线(Barrier)**我们现在的目标是,在不暂停流处理的前提下,让每个任务“认出”触发检查点保存的那个数据。

  5. 自然想到,如果给数据添加一个特殊标识,任务就可以准确识别并开始保存状态了。这需要在 Source 任务收到触发检查点保存的指令后,立即在当前处理的数据中插入一个标识字段,然后再向下游任务发出。但是假如 Source 任务此时并没有正在处理的数据,这个操作就无法实现了。

  6. 所以我们可以借鉴水位线(watermark)的设计,在数据流中插入一个特殊的数据结构,专门用来表示触发检查点保存的时间点。收到保存检查点的指令后,Source 任务可以在当前数据流中插入这个结构;之后的所有任务只要遇到它就开始对状态做持久化快照保存。由于数据流是保持顺序依次处理的,因此遇到这个标识就代表之前的数据都处理完了,可以保存一个检查点;而在它之后的数据,引起的状态改变就不会体现在这个检查点中,而需要保存到下一个检查点。

  7. 这种特殊的数据形式,把一条流上的数据按照不同的检查点分隔开,所以就叫作检查点的
    “分界线”(Checkpoint Barrier)。与水位线很类似,检查点分界线也是一条特殊的数据,由 Source 算子注入到常规的数据流中,它的位置是限定好的,不能超过其他数据,也不能被后面的数据超过。检查点分界线中带有一个检查点 ID,这是当前要保存的检查点的唯一标识。

  8. 这样,分界线就将一条流逻辑上分成了两部分:分界线之前到来的数据导致的状态更改,都会被包含在当前分界线所表示的检查点中;而基于分界线之后的数据导致的状态更改,则会被包含在之后的检查点中。

  9. 在 JobManager 中有一个“检查点协调器”(checkpoint coordinator),专门用来协调处理检查点的相关工作。检查点协调器会定期向 TaskManager 发出指令,要求保存检查点(带着检查点 ID);TaskManager 会让所有的 Source 任务把自己的偏移量(算子状态)保存起来,并将带有检查点 ID 的分界线(barrier)插入到当前的数据流中,然后像正常的数据一样像下游传递;之后 Source 任务就可以继续读入新的数据了。

  10. 每个算子任务只要处理到这个 barrier,就把当前的状态进行快照;在收到 barrier 之前,还是正常地处理之前的数据,完全不受影响。比如上图中,Source 任务收到 1 号检查点保存指令时,读取完了三个数据,所以将偏移量 3 保存到外部存储中;而后将 ID 为 1 的 barrier 注入数据流;与此同时,Map 任务刚刚收到上一条数据“hello”,而 Sum 任务则还在处理之前的第二条数据(world, 1)。下游任务不会在这时就立刻保存状态,而是等收到 barrier 时才去做快照,这时可以保证前三个数据都已经处理完了。同样地,下游任务做状态快照时,也不会影响上游任务的处理,每个任务的快照保存并行不悖,不会有暂停等待的时间。

  11. 如果还是拿拍毕业照来类比的话,现在就不需要大家答辩完之后聚在一起排队摆 pose 了——每个人完成答辩之后只要单独照张相,就可以继续做自己的事情去了;最后由班主任老师发挥 P 图技能合成合照,这样无疑就省去了大家集合等待的时间。

  12. 分布式快照算法。通过在流中插入分界线(barrier),我们可以明确地指示触发检查点保存的时间。在一条单一的流上,数据依次进行处理,顺序保持不变;不过对于分布式流处理来说,想要一直保持数据的顺序就不是那么容易了。

  13. 我们先回忆一下水位线(watermark)的处理:上游任务向多个并行下游任务传递时,需要广播出去;而多个上游任务向同一个下游任务传递时,则需要下游任务为每个上游并行任务维护一个“分区水位线”,取其中最小的那个作为当前任务的事件时钟。那 barier 在并行数据流中的传递,是不是也有类似的规则呢?

  14. watermark 指示的是“之前的数据全部到齐了”,而 barrier 指示的是“之前所有数据的状态更改保存入当前检查点”:它们都是一个“截止时间”的标志。所以在处理多个分区的传递时,也要以是否还会有数据到来作为一个判断标准。

  15. 具体实现上,Flink 使用了 Chandy-Lamport 算法的一种变体,被称为“异步分界线快照”
    (asynchronous barrier snapshotting)算法。算法的核心就是两个原则:当上游任务向多个并行下游任务发送 barrier 时,需要广播出去;而当多个上游任务向同一个下游任务传递 barrier 时,需要在下游任务执行“分界线对齐”(barrier alignment)操作,也就是需要等到所有并行分区的 barrier 都到齐,才可以开始状态的保存。

  16. 为了详细解释检查点算法的原理,我们对之前的 word count 程序进行扩展,考虑所有算子并行度为 2 的场景。

  17. 我们有两个并行的 Source 任务,会分别读取两个数据流(或者是一个源的不同分区)。这里每条流中的数据都是一个个的单词:“hello”“world”“hello”“flink”交替出现。此时第一条流的 Source 任务(为了方便,下文中我们直接叫它“Source 1”,其他任务类似)读取了 3个数据,偏移量为 3;而第二条流的 Source 任务(Source 2)只读取了一个“hello”数据,偏移量为 1。第一条流中的第一个数据“hello”已经完全处理完毕,所以 Sum 任务的状态中 key为 hello 对应着值 1,而且已经发出了结果(hello, 1);第二个数据“world”经过了 Map 任务的转换,还在被 Sum 任务处理;第三个数据“hello”还在被 Map 任务处理。而第二条流的第一个数据“hello”同样已经经过了 Map 转换,正在被 Sum 任务处理。

  18. 接下来就是检查点保存的算法。具体过程如下:

  19. 第一:JobManager 发送指令,触发检查点的保存;Source 任务保存状态,插入分界线JobManager 会周期性地向每个 TaskManager 发送一条带有新检查点 ID 的消息,通过这种方式来启动检查点。收到指令后,TaskManger 会在所有 Source 任务中插入一个分界线(barrier),并将偏移量保存到远程的持久化存储中。并行的 Source 任务保存的状态为 3 和 1,表示当前的 1 号检查点应该包含:第一条流中截至第三个数据、第二条流中截至第一个数据的所有状态更改。可以发现 Source 任务做这些的时候并不影响后面任务的处理,Sum 任务已经处理完了第一条流中传来的(world, 1),对应的状态也有了更改。

  20. 第二:状态快照保存完成,分界线向下游传递。状态存入持久化存储之后,会返回通知给 Source 任务;Source 任务就会向 JobManager确认检查点完成,然后像数据一样把 barrier 向下游任务传递。

  21. 由于 Source 和 Map 之间是一对一(forward)的传输关系(这里没有考虑算子链 operator chain),所以 barrier 可以直接传递给对应的 Map 任务。之后 Source 任务就可以继续读取新的数据了。与此同时,Sum 1 已经将第二条流传来的(hello,1)处理完毕,更新了状态。

  22. 第三:向下游多个并行子任务广播分界线,执行分界线对齐。Map 任务没有状态,所以直接将 barrier 继续向下游传递。这时由于进行了 keyBy 分区,所以需要将 barrier 广播到下游并行的两个 Sum 任务。同时,Sum 任务可能收到来自上游两个并行 Map 任务的 barrier,所以需要执行“分界线对齐”操作。

  23. 此时的 Sum 2 收到了来自上游两个 Map 任务的 barrier,说明第一条流第三个数据、第二条流第一个数据都已经处理完,可以进行状态的保存了;而 Sum 1 只收到了来自 Map 2 的barrier,所以这时需要等待分界线对齐。在等待的过程中,如果分界线尚未到达的分区任务Map 1 又传来了数据(hello, 1),说明这是需要保存到检查点的,Sum 任务应该正常继续处理数据,状态更新为 3;而如果分界线已经到达的分区任务 Map 2 又传来数据,这已经是下一个检查点要保存的内容了,就不应立即处理,而是要缓存起来、等到状态保存之后再做处理。

  24. 第四:分界线对齐后,保存状态到持久化存储各个分区的分界线都对齐后,就可以对当前状态做快照,保存到持久化存储了。存储完成之后,同样将 barrier 向下游继续传递,并通知 JobManager 保存完毕。

  25. 这个过程中,每个任务保存自己的状态都是相对独立的,互不影响。我们可以看到,当
    Sum 将当前状态保存完毕时,Source 1 任务已经读取到第一条流的第五个数据了。

  26. 第五:先处理缓存数据,然后正常继续处理。完成检查点保存之后,任务就可以继续正常处理数据了。这时如果有等待分界线对齐时缓存的数据,需要先做处理;然后再按照顺序依次处理新到的数据。当 JobManager 收到所有任务成功保存状态的信息,就可以确认当前检查点成功保存。之后遇到故障就可以从这里恢复了。

  27. 由于分界线对齐要求先到达的分区做缓存等待,一定程度上会影响处理的速度;当出现背压(backpressure)时,下游任务会堆积大量的缓冲数据,检查点可能需要很久才可以保存完毕。为了应对这种场景,Flink 1.11 之后提供了不对齐的检查点保存方式,可以将未处理的缓冲数据(in-flight data)也保存进检查点。这样,当我们遇到一个分区 barrier 时就不需等待对齐,而是可以直接启动状态的保存了。

5.3 检查点配置

  1. 检查点的作用是为了故障恢复,我们不能因为保存检查点占据了大量时间、导致数据处理性能明显降低。为了兼顾容错性和处理性能,我们可以在代码中对检查点进行各种配置。
  2. 启用检查点。默认情况下,Flink 程序是禁用检查点的。如果想要为 Flink 应用开启自动保存快照的功能,需要在代码中显式地调用执行环境的.enableCheckpointing()方法:这里需要传入一个长整型的毫秒数,表示周期性保存检查点的间隔时间。如果不传参数直接启用检查点,默认的间隔周期为 500 毫秒,这种方式已经被弃用。
  3. 检查点的间隔时间是对处理性能和故障恢复速度的一个权衡。如果我们希望对性能的影响更小,可以调大间隔时间;而如果希望故障重启后迅速赶上实时的数据处理,就需要将间隔时间设小一些
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 每隔 1 秒启动一次检查点保存
env.enableCheckpointing(1000);
  1. 检查点存储(Checkpoint Storage)检查点具体的持久化存储位置,取决于“检查点存储”(CheckpointStorage)的设置。默认情况下,检查点存储在 JobManager 的堆(heap)内存中。而对于大状态的持久化保存,Flink也提供了在其他存储位置进行保存的接口,这就是 CheckpointStorage。
  2. 具体可以通过调用检查点配置的 .setCheckpointStorage() 来 配置,需要传入一 个
    CheckpointStorage 的实现类。Flink 主要提供了两种 CheckpointStorage:作业管理器的堆内存
    (JobManagerCheckpointStorage)和文件系统(FileSystemCheckpointStorage)。
// 配置存储检查点到 JobManager 堆内存env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
// 配置存储检查点到文件系统env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://namenode:40010/flink/checkpoints"));
对于实际生产应用,我们一般会将 CheckpointStorage  配置为高可用的分布式文件系统(HDFS,S3 等)。
  1. 其他高级配置检查点还有很多可以配置的选项,可以通过获取检查点配置CheckpointConfig来进行设置。
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
  1. 我们这里做一个简单的列举说明:

    • 检查点模式(CheckpointingMode)设置检查点一致性的保证级别,有“精确一次”(exactly-once)和“至少一次”(at-least-once)两个选项。默认级别为 exactly-once,而对于大多数低延迟的流处理程序,at-least-once 就够用了,而且处理效率会更高。
    • 超时时间(checkpointTimeout)用于指定检查点保存的超时时间,超时没完成就会被丢弃掉。传入一个长整型毫秒数作为参数,表示超时时间。
    • 最小间隔时间(minPauseBetweenCheckpoints)用于指定在上一个检查点完成之后,检查点协调器(checkpoint coordinator)最快等多久可以出发保存下一个检查点的指令。这就意味着即使已经达到了周期触发的时间点,只要距离上一个检查点完成的间隔不够,就依然不能开启下一次检查点的保存。这就为正常处理数据留下了充足的间隙。当指定这个参数时,maxConcurrentCheckpoints 的值强制为 1。
    • 最大并发检查点数量(maxConcurrentCheckpoints)用于指定运行中的检查点最多可以有多少个。由于每个任务的处理进度不同,完全可能出现后面的任务还没完成前一个检查点的保存、前面任务已经开始保存下一个检查点了。这个参数就是限制同时进行的最大数量。如果前面设置了 minPauseBetweenCheckpoints,则 maxConcurrentCheckpoints 这个参数就不起作用了。
    • 开启外部持久化存储(enableExternalizedCheckpoints)用于开启检查点的外部持久化,而且默认在作业失败的时候不会自动清理,如果想释放空间需要自己手工清理。里面传入的参数 ExternalizedCheckpointCleanup 指定了当作业取消的时候外部的检查点该如何清理。
      • DELETE_ON_CANCELLATION:在作业取消的时候会自动删除外部检查点,但是如果是作业失败退出,则会保留检查点。
      • RETAIN_ON_CANCELLATION:作业取消的时候也会保留外部检查点。
    • 检查点异常时是否让整个任务失败(failOnCheckpointingErrors)用于指定在检查点发生异常的时候,是否应该让任务直接失败退出。默认为 true,如果设置为 false,则任务会丢弃掉检查点然后继续运行。
    • 不对齐检查点(enableUnalignedCheckpoints)不再执行检查点的分界线对齐操作,启用之后可以大大减少产生背压时的检查点保存时间。这个设置要求检查点模式CheckpointingMode必须为 exctly-once,并且并发的检查点个数为 1。代码中具体设置如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 启用检查点,间隔时间 1 秒env.enableCheckpointing(1000);
CheckpointConfig checkpointConfig = env.getCheckpointConfig(); // 设置精确一次模式
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 最小间隔时间 500 毫秒
checkpointConfig.setMinPauseBetweenCheckpoints(500);
// 超时时间 1 分钟
checkpointConfig.setCheckpointTimeout(60000); // 同时只能有一个检查点
checkpointConfig.setMaxConcurrentCheckpoints(1); // 开启检查点的外部持久化保存,作业取消后依然保留
checkpointConfig.enableExternalizedCheckpoints(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 启用不对齐的检查点保存方式
checkpointConfig.enableUnalignedCheckpoints();// 设置检查点存储,可以直接传入一个 String,指定文件系统的路径
checkpointConfig.setCheckpointStorage("hdfs://my/checkpoint/dir")

5.4 保存点(Savepoint)

  1. 除了检查点(checkpoint)外,Flink 还提供了另一个非常独特的镜像保存功能——保存点(Savepoint)。从名称就可以看出,这也是一个存盘的备份,它的原理和算法与检查点完全相同,只是多了一些额外的元数据。事实上,保存点就是通过检查点的机制来创建流式作业状态的一致性镜像(consistent image)的。
  2. 保存点中的状态快照,是以算子 ID 和状态名称组织起来的,相当于一个键值对。从保存点启动应用程序时,Flink 会将保存点的状态数据重新分配给相应的算子任务。
  3. 保存点的用途。保存点与检查点最大的区别,就是触发的时机。检查点是由 Flink 自动管理的,定期创建,发生故障之后自动读取进行恢复,这是一个“自动存盘”的功能;而保存点不会自动创建,必须由用户明确地手动触发保存操作,所以就是“手动存盘”。因此两者尽管原理一致,但用途就有所差别了:检查点主要用来做故障恢复,是容错机制的核心;保存点则更加灵活,可以用来做有计划的手动备份和恢复。保存点可以当作一个强大的运维工具来使用。我们可以在需要的时候创建一个保存点,然后停止应用,做一些处理调整之后再从保存点重启。它适用的具体场景有:
    • 版本管理和归档存储。对重要的节点进行手动备份,设置为某一版本,归档(archive)存储应用程序的状态。
    • 更新 Flink 版本。目前 Flink 的底层架构已经非常稳定,所以当 Flink 版本升级时,程序本身一般是兼容的。这时不需要重新执行所有的计算,只要创建一个保存点,停掉应用、升级 Flink 后,从保存点重启就可以继续处理了。
    • 更新应用程序。我们不仅可以在应用程序不变的时候,更新 Flink 版本;还可以直接更新应用程序。前提是程序必须是兼容的,也就是说更改之后的程序,状态的拓扑结构和数据类型都是不变的,这样才能正常从之前的保存点去加载。这个功能非常有用。我们可以及时修复应用程序中的逻辑 bug,更新之后接着继续处理;也可以用于有不同业务逻辑的场景,比如 A/B 测试等等。
    • 调整并行度。如果应用运行的过程中,发现需要的资源不足或已经有了大量剩余,也可以通过从保存点重启的方式,将应用程序的并行度增大或减小。
    • 暂停应用程序。有时候我们不需要调整集群或者更新程序,只是单纯地希望把应用暂停、释放一些资源来处理更重要的应用程序。使用保存点就可以灵活实现应用的暂停和重启,可以对有限的集群资源做最好的优化配置。
  4. 需要注意的是,保存点能够在程序更改的时候依然兼容,前提是状态的拓扑结构和数据类型不变。我们知道保存点中状态都是以算子 ID-状态名称这样的 key-value 组织起来的,算子ID 可以在代码中直接调用 SingleOutputStreamOperator 的.uid()方法来进行指定:
DataStream<String> stream = env.addSource(new StatefulSource()).uid("source-id").map(new StatefulMapper()) .uid("mapper-id").print();
  1. 对于没有设置 ID 的算子,Flink 默认会自动进行设置,所以在重新启动应用后可能会导致ID 不同而无法兼容以前的状态。所以为了方便后续的维护,强烈建议在程序中为每一个算子手动指定 ID。
  2. 使用保存点保存点的使用非常简单,我们可以使用命令行工具来创建保存点,也可以从保存点恢复作业。
    • 创建保存点。要在命令行中为运行的作业创建一个保存点镜像。

      • 只需要执行:bin/flink savepoint :jobId [:targetDirectory] 这里 jobId 需要填充要做镜像保存的作业 ID,目标路径 targetDirectory 可选,表示保存点存储的路径。
      • 对于保存点的默认路径,可以通过配置文件 flink-conf.yaml 中的 state.savepoints.dir 项来设定:state.savepoints.dir: hdfs:///flink/savepoints当然对于单独的作业,
      • 我们也可以在程序代码中通过执行环境来设置:env.setDefaultSavepointDir("hdfs:///flink/savepoints");
      • 由于创建保存点一般都是希望更改环境之后重启,所以创建之后往往紧接着就是停掉作业的操作。除了对运行的作业创建保存点,我们也可以在停掉一个作业时直接创建保存点:bin/flink stop --savepointPath [:targetDirectory] :jobId
    • 从保存点重启应用
      • 我们已经知道,提交启动一个 Flink 作业,使用的命令是 flink run;现在要从保存点重启一个应用,其实本质是一样的:bin/flink run -s :savepointPath [:runArgs]
      • 这里只要增加一个-s 参数,指定保存点的路径就可以了,其他启动时的参数还是完全一样的。细心的读者可能还记得我们在第三章使用 web UI 进行作业提交时,可以填入的参数除了入口类、并行度和运行参数,还有一个“Savepoint Path”,这就是从保存点启动应用的配置。

第六节 状态一致性

6.1 一致性的概念和级别

  1. 在分布式系统中,一致性(consistency)是一个非常重要的概念;在事务(transaction)中,一致性也是重要的一个特性。Flink 中一致性的概念,主要用在故障恢复的描述中,所以更加类似于事务中的表述。那到底什么是一致性呢?
  2. 简单来讲,一致性其实就是结果的正确性。对于分布式系统而言,强调的是不同节点中相同数据的副本应该总是“一致的”,也就是从不同节点读取时总能得到相同的值;而对于事务而言,是要求提交更新操作后,能够读取到新的数据。对于 Flink 来说,多个节点并行处理不同的任务,我们要保证计算结果是正确的,就必须不漏掉任何一个数据,而且也不会重复处理同一个数据。流式计算本身就是一个一个来的,所以正常处理的过程中结果肯定是正确的;但在发生故障、需要恢复状态进行回滚时就需要更多的保障机制了。我们通过检查点的保存来保证状态恢复后结果的正确,所以主要讨论的就是“状态的一致性”。
  3. 一般说来,状态一致性有三种级别:
    • 最多一次。(AT-MOST-ONCE)当任务发生故障时,最简单的做法就是直接重启,别的什么都不干;既不恢复丢失的状态,也不重放丢失的数据。每个数据在正常情况下会被处理一次,遇到故障时就会丢掉,所以就是“最多处理一次”。我们发现,如果数据可以直接被丢掉,那其实就是没有任何操作来保证结果的准确性;所以这种类型的保证也叫“没有保证”。尽管看起来比较糟糕,不过如果我们的主要诉求是“快”,而对近似正确的结果也能接受,那这也不失为一种很好的解决方案。
    • 至少一次。(AT-LEAST-ONCE)在实际应用中,我们一般会希望至少不要丢掉数据。这种一致性级别就叫作“至少一次”(at-least-once),就是说是所有数据都不会丢,肯定被处理了;不过不能保证只处理一次,有些数据会被重复处理。
    • 精确一次(EXACTLY-ONCE)最严格的一致性保证,就是所谓的“精确一次”(exactly-once,有时也译作“恰好一次”)。这也是最难实现的状态一致性语义。exactly-once 意味着所有数据不仅不会丢失,而且只被处理一次,不会重复处理。也就是说对于每一个数据,最终体现在状态和输出结果上,只能有一次统计。exactly-once 可以真正意义上保证结果的绝对正确,在发生故障恢复后,就好像从未发生过故障一样。
  4. 在有些场景下,重复处理数据是不影响结果的正确性的,这种操作具有“幂等性”。比如,如果我们统计电商网站的 UV,需要对每个用户的访问数据进行去重处理,所以即使同一个数据被处理多次,也不会影响最终的结果,这时使用 at-least-once 语义是完全没问题的。当然,如果重复数据对结果有影响,比如统计的是 PV,或者之前的统计词频 word count,使用at-least-once 语义就可能会导致结果的不一致了。
  5. 为了保证达到 at-least-once 的状态一致性,我们需要在发生故障时能够重放数据。最常见的做法是,可以用持久化的事件日志系统,把所有的事件写入到持久化存储中。这时只要记录一个偏移量,当任务发生故障重启后,重置偏移量就可以重放检查点之后的数据了。Kafka 就是这种架构的一个典型实现。
  6. 要做的 exactly-once,首先必须能达到 at-least-once 的要求,就是数据不丢。所以同样需要有数据重放机制来保证这一点。另外,还需要有专门的设计保证每个数据只被处理一次。Flink 中使用的是一种轻量级快照机制——检查点(checkpoint)来保证 exactly-once 语义。

6.2 端到端的状态一致性

  1. 我们已经知道检查点可以保证 Flink 内部状态的一致性,而且可以做到精确一次(exactly-once)。那是不是说,只要开启了检查点,发生故障进行恢复,结果就不会有任何问题呢?
  2. 没那么简单。在实际应用中,一般要保证从用户的角度看来,最终消费的数据是正确的。而用户或者外部应用不会直接从 Flink 内部的状态读取数据,往往需要我们将处理结果写入外部存储中。这就要求我们不仅要考虑 Flink 内部数据的处理转换,还涉及从外部数据源读取,以及写入外部持久化系统,整个应用处理流程从头到尾都应该是正确的。
  3. 所以完整的流处理应用,应该包括了数据源、流处理器和外部存储系统三个部分。这个完整应用的一致性,就叫作“端到端(end-to-end)的状态一致性”,它取决于三个组件中最弱的那一环。一般来说,能否达到 at-least-once 一致性级别,主要看数据源能够重放数据;而能否达到 exactly-once 级别,流处理器内部、数据源、外部存储都要有相应的保证机制。在下一节,我们就将详细讨论端到端的 exactly-once 一致性语义如何保证。
  4. 端到端精确一次(end-to-end exactly-once)实际应用中,最难做到、也最希望做到的一致性语义,无疑就是端到端(end-to-end)的“精确一次”(exactly-once)。我们知道,对于 Flink 内部来说,检查点机制可以保证故障恢复后数据不丢(在能够重放的前提下),并且只处理一次,所以已经可以做到 exactly-once 的一致性语义了。
  5. 需要注意的是,我们说检查点能够保证故障恢复后数据只处理一次,并不是说之前统计过某个数据,现在就不能再次统计了;而是要看状态的改变和输出的结果,是否只包含了一次这个数据的处理。由于检查点保存的是之前所有任务处理完某个数据后的状态快照,所以重放的数据引起的状态改变一定不会包含在里面,最终结果中只处理了一次。
  6. 所以,端到端一致性的关键点,就在于输入的数据源端和输出的外部存储端。
  7. 输入端保证输入端主要指的就是 Flink 读取的外部数据源。对于一些数据源来说,并不提供数据的缓冲或是持久化保存,数据被消费之后就彻底不存在了。例如 socket 文本流就是这样, socket服务器是不负责存储数据的,发送一条数据之后,我们只能消费一次,是“一锤子买卖”。对于这样的数据源,故障后我们即使通过检查点恢复之前的状态,可保存检查点之后到发生故障期间的数据已经不能重发了,这就会导致数据丢失。所以就只能保证 at-most-once 的一致性语义,相当于没有保证。
  8. 想要在故障恢复后不丢数据,外部数据源就必须拥有重放数据的能力。常见的做法就是对数据进行持久化保存,并且可以重设数据的读取位置。一个最经典的应用就是 Kafka。在 Flink的 Source 任务中将数据读取的偏移量保存为状态,这样就可以在故障恢复时从检查点中读取出来,对数据源重置偏移量,重新获取数据。
  9. 数据源可重放数据,或者说可重置读取数据偏移量,加上 Flink 的 Source 算子将偏移量作为状态保存进检查点,就可以保证数据不丢。这是达到 at-least-once 一致性语义的基本要求,当然也是实现端到端 exactly-once 的基本要求。
  10. 输出端保证有了 Flink 的检查点机制,以及可重放数据的外部数据源,我们已经能做到 at-least-once了。但是想要实现 exactly-once 却有更大的困难:数据有可能重复写入外部系统。
  11. 因为检查点保存之后,继续到来的数据也会一一处理,任务的状态也会更新,最终通过Sink 任务将计算结果输出到外部系统;只是状态改变还没有存到下一个检查点中。这时如果出现故障,这些数据都会重新来一遍,就计算了两次。我们知道对 Flink 内部状态来说,重复计算的动作是没有影响的,因为状态已经回滚,最终改变只会发生一次;但对于外部系统来说,已经写入的结果就是泼出去的水,已经无法收回了,再次执行写入就会把同一个数据写入两次。所以这时,我们只保证了端到端的 at-least-once 语义。

6.3 幂等写入和事务写入

  1. 为了实现端到端 exactly-once,我们还需要对外部存储系统、以及 Sink 连接器有额外的要求。能够保证 exactly-once 一致性的写入方式有两种:

    • 幂等写入
    • 事务写入
  2. 我们需要外部存储系统对这两种写入方式的支持,而 Flink 也为提供了一些 Sink 连接器接口。接下来我们进行展开讲解。
  3. 幂等(idempotent)写入。所谓“幂等”操作,就是说一个操作可以重复执行很多次,但只导致一次结果更改。也就是说,后面再重复执行就不会对结果起作用了。
  4. 数学中一个典型的例子是,ex 的求导下操作,无论做多少次,得到的都是自身。而在数据处理领域,最典型的就是对 HashMap 的插入操作:如果是相同的键值对,后面的重复插入就都没什么作用了。
  5. 这相当于说,我们并没有真正解决数据重复计算、写入的问题;而是说,重复写入也没关系,结果不会改变。所以这种方式主要的限制在于外部存储系统必须支持这样的幂等写入:比如 Redis 中键值存储,或者关系型数据库(如 MySQL)中满足查询条件的更新操作。
    6、 需要注意,对于幂等写入,遇到故障进行恢复时,有可能会出现短暂的不一致。因为保存点完成之后到发生故障之间的数据,其实已经写入了一遍,回滚的时候并不能消除它们。如果有一个外部应用读取写入的数据,可能会看到奇怪的现象:短时间内,结果会突然“跳回”到之前的某个值,然后“重播”一段之前的数据。不过当数据的重放逐渐超过发生故障的点的时候,最终的结果还是一致的。
  6. 事务(transactional)写入。如果说幂等写入对应用场景限制太多,那么事务写入可以说是更一般化的保证一致性的方式。之前我们提到,输出端最大的问题就是“覆水难收”,写入到外部系统的数据难以撤回。自然想到,那怎样可以收回一条已写入的数据呢?利用事务就可以做到。
  7. 我们都知道,事务(transaction)是应用程序中一系列严密的操作,所有操作必须成功完成,否则在每个操作中所做的所有更改都会被撤消。事务有四个基本特性:原子性(Atomicity)、一致性(Correspondence)、隔离性(Isolation)和持久性(Durability),这就是著名的 ACID。
  8. 在 Flink 流处理的结果写入外部系统时,如果能够构建一个事务,让写入操作可以随着检查点来提交和回滚,那么自然就可以解决重复写入的问题了。所以事务写入的基本思想就是:用一个事务来进行数据向外部系统的写入,这个事务是与检查点绑定在一起的。当 Sink 任务遇到 barrier 时,开始保存状态的同时就开启一个事务,接下来所有数据的写入都在这个事务中;待到当前检查点保存完毕时,将事务提交,所有写入的数据就真正可用了。如果中间过程出现故障,状态会回退到上一个检查点,而当前事务没有正常关闭(因为当前检查点没有保存完),所以也会回滚,写入到外部的数据就被撤销了。
  9. 具体来说,又有两种实现方式:预写日志(WAL)和两阶段提交(2PC)
  10. 预写日志(write-ahead-log,WAL)我们发现,事务提交是需要外部存储系统支持事务的,否则没有办法真正实现写入的回撤。那对于一般不支持事务的存储系统,能够实现事务写入呢?
  11. 预写日志(WAL)就是一种非常简单的方式。具体步骤是:
    • 先把结果数据作为日志(log)状态保存起来
    • 进行检查点保存时,也会将这些结果数据一并做持久化存储
    • 在收到检查点完成的通知时,将所有结果一次性写入外部系统。
  12. 我们会发现,这种方式类似于检查点完成时做一个批处理,一次性的写入会带来一些性能上的问题;而优点就是比较简单,由于数据提前在状态后端中做了缓存,所以无论什么外部存储系统,理论上都能用这种方式一批搞定。在 Flink 中 DataStream API 提供了一个模板类GenericWriteAheadSink,用来实现这种事务型的写入方式。
  13. 需要注意的是,预写日志这种一批写入的方式,有可能会写入失败;所以在执行写入动作之后,必须等待发送成功的返回确认消息。在成功写入所有数据后,在内部再次确认相应的检查点,这才代表着检查点的真正完成。这里需要将确认信息也进行持久化保存,在故障恢复时,只有存在对应的确认信息,才能保证这批数据已经写入,可以恢复到对应的检查点位置。
  14. 但这种“再次确认”的方式,也会有一些缺陷。如果我们的检查点已经成功保存、数据也成功地一批写入到了外部系统,但是最终保存确认信息时出现了故障,Flink 最终还是会认为没有成功写入。于是发生故障时,不会使用这个检查点,而是需要回退到上一个;这样就会导致这批数据的重复写入。
  15. 两阶段提交(two-phase-commit,2PC)前面提到的各种实现 exactly-once 的方式,多少都有点缺陷,有没有更好的方法呢?自然是有的,这就是传说中的两阶段提交(2PC)。顾名思义,它的想法是分成两个阶段:先做“预提交”,等检查点完成之后再正式提交。这种提交方式是真正基于事务的,它需要外部系统提供事务支持。具体的实现步骤为:
    • 当第一条数据到来时,或者收到检查点的分界线时,Sink 任务都会启动一个事务。
    • 接下来接收到的所有数据,都通过这个事务写入外部系统;这时由于事务没有提交,所以数据尽管写入了外部系统,但是不可用,是“预提交”的状态。
    • 当 Sink 任务收到 JobManager 发来检查点完成的通知时,正式提交事务,写入的结果就真正可用了。
  16. 当中间发生故障时,当前未提交的事务就会回滚,于是所有写入外部系统的数据也就实现了撤回。这种两阶段提交(2PC)的方式充分利用了 Flink 现有的检查点机制:分界线的到来,就标志着开始一个新事务;而收到来自 JobManager 的 checkpoint 成功的消息,就是提交事务的指令。每个结果数据的写入,依然是流式的,不再有预写日志时批处理的性能问题;最终提交时,也只需要额外发送一个确认信息。所以 2PC 协议不仅真正意义上实现了 exactly-once,而且通过搭载 Flink 的检查点机制来实现事务,只给系统增加了很少的开销。
  17. Flink 提供了 TwoPhaseCommitSinkFunction 接口,方便我们自定义实现两阶段提交的SinkFunction 的实现,提供了真正端到端的 exactly-once 保证。不过两阶段提交虽然精巧,却对外部系统有很高的要求。这里将 2PC 对外部系统的要求列举如下:
    • 外部系统必须提供事务支持,或者 Sink 任务必须能够模拟外部系统上的事务。
    • 在检查点的间隔期间里,必须能够开启一个事务并接受数据写入。
    • 在收到检查点完成的通知之前,事务必须是“等待提交”的状态。在故障恢复的情况下,这可能需要一些时间。如果这个时候外部系统关闭事务(例如超时了),那么未提交的数据就会丢失。
    • Sink 任务必须能够在进程失败后恢复事务。
    • 提交事务必须是幂等操作。也就是说,事务的重复提交应该是无效的。可见,2PC 在实际应用同样会受到比较大的限制。具体在项目中的选型,最终还应该是一致性级别和处理性能的权衡考量。

6.4 Flink和Kafka连接时的精确一次保证

  1. 在流处理的应用中,最佳的数据源当然就是可重置偏移量的消息队列了;它不仅可以提供数据重放的功能,而且天生就是以流的方式存储和处理数据的。所以作为大数据工具中消息队列的代表,Kafka 可以说与 Flink 是天作之合,实际项目中也经常会看到以 Kafka 作为数据源和写入的外部系统的应用。在本小节中,我们就来具体讨论一下 Flink 和 Kafka 连接时,怎样保证端到端的 exactly-once 状态一致性。
  2. 整体介绍。既然是端到端的 exactly-once,我们依然可以从三个组件的角度来进行分析:
    • Flink 内部。Flink 内部可以通过检查点机制保证状态和处理结果的 exactly-once 语义。
    • 输入端。输入数据源端的 Kafka 可以对数据进行持久化保存,并可以重置偏移量(offset)。所以我们可以在 Source 任务(FlinkKafkaConsumer)中将当前读取的偏移量保存为算子状态,写入到检查点中;当发生故障时,从检查点中读取恢复状态,并由连接器 FlinkKafkaConsumer 向 Kafka重新提交偏移量,就可以重新消费数据、保证结果的一致性了。
    • 输出端输出端保证 exactly-once 的最佳实现,当然就是两阶段提交(2PC)。作为与 Flink 天生一对的 Kafka,自然需要用最强有力的一致性保证来证明自己。
  3. Flink 官方实现的 Kafka 连接器中,提供了写入到 Kafka 的 FlinkKafkaProducer,它就实现
    了 TwoPhaseCommitSinkFunction 接口:
public class FlinkKafkaProducer<IN> extends TwoPhaseCommitSinkFunction<IN, FlinkKafkaProducer.KafkaTransactionState, FlinkKafkaProducer.KafkaTransactionContext> {
...
}
  1. 也就是说,我们写入 Kafka 的过程实际上是一个两段式的提交:处理完毕得到结果,写入Kafka 时是基于事务的“预提交”;等到检查点保存完毕,才会提交事务进行“正式提交”。如果中间出现故障,事务进行回滚,预提交就会被放弃;恢复状态之后,也只能恢复所有已经确认提交的操作。

  2. 具体步骤为了方便说明,我们来考虑一个具体的流处理系统,由 Flink 从 Kafka 读取数据、并将处理结果写入 Kafka。

  3. 这是一个 Flink 与 Kafka 构建的完整数据管道,Source 任务从 Kafka 读取数据,经过一系列处理(比如窗口计算),然后由 Sink 任务将结果再写入 Kafka。Flink 与 Kafka 连接的两阶段提交,离不开检查点的配合,这个过程需要 JobManager 协调各个 TaskManager 进行状态快照,而检查点具体存储位置则是由状态后端(State Backend)来配置管理的。一般情况,我们会将检查点存储到分布式文件系统上。实现端到端 exactly-once 的具体过程可以分解如下:

  4. 第一步:启动检查点保存。检查点保存的启动,标志着我们进入了两阶段提交协议的“预提交”阶段。当然,现在还没有具体提交的数据。JobManager 通知各个 TaskManager 启动检查点保存,Source 任务会将检查点分界线(barrier)注入数据流。这个 barrier 可以将数据流中的数据,分为进入当前检查点的集合和进入下一个检查点的集合。

  5. 第二步:算子任务对状态做快照。分界线(barrier)会在算子间传递下去。每个算子收到 barrier 时,会将当前的状态做个快照,保存到状态后端。Source 任务将 barrier 插入数据流后,也会将当前读取数据的偏移量作为状态写入检查点,存入状态后端;然后把 barrier 向下游传递,自己就可以继续读取数据了。接下来 barrier 传递到了内部的 Window 算子,它同样会对自己的状态进行快照保存,写入远程的持久化存储。

  6. 第三步:Sink 任务开启事务,进行预提交。分界线(barrier)终于传到了 Sink 任务,这时 Sink 任务会开启一个事务。接下来到来的所有数据,Sink 任务都会通过这个事务来写入 Kafka。这里 barrier 是检查点的分界线,也是事务的分界线。由于之前的检查点可能尚未完成,因此上一个事务也可能尚未提交;此时 barrier 的到来开启了新的事务,上一个事务尽管可能没有被提交,但也不再接收新的数据了。对于 Kafka 而言,提交的数据会被标记为“未确认”(uncommitted)。这个过程就是所谓的“预提交”(pre-commit)。

  7. 第四步: 检查点保存完成,提交事务。当所有算子的快照都完成,也就是这次的检查点保存最终完成时,JobManager 会向所有任务发确认通知,告诉大家当前检查点已成功保存

  8. 当 Sink 任务收到确认通知后,就会正式提交之前的事务,把之前“未确认”的数据标为“已确认”,接下来就可以正常消费了。在任务运行中的任何阶段失败,都会从上一次的状态恢复,所有没有正式提交的数据也会回滚。这样,Flink 和 Kafka 连接构成的流处理系统,就实现了端到端的 exactly-once 状态一致性。

  9. 需要的配置。在具体应用中,实现真正的端到端 exactly-once,还需要有一些额外的配置:

    • 必须启用检查点;
    • 在 FlinkKafkaProducer 的构造函数中传入参数 Semantic.EXACTLY_ONCE;
    • 配置 Kafka 读取数据的消费者的隔离级别
  10. 这里所说的 Kafka,是写入的外部系统。预提交阶段数据已经写入,只是被标记为“未提交”(uncommitted),而 Kafka 中默认的隔离级别 isolation.level 是 read_uncommitted,也就是可以读取未提交的数据。这样一来,外部应用就可以直接消费未提交的数据,对于事务性的保证就失效了。所以应该将隔离级别配置为 read_committed,表示消费者遇到未提交的消息时,会停止从分区中消费数据,直到消息被标记为已提交才会再次恢复消费。当然,这样做的话,外部应用消费数据就会有显著的延迟。

  11. 事务超时配置。Flink 的Kafka 连接器中配置的事务超时时间transaction.timeout.ms 默认是1 小时,而Kafka集群配置的事务最大超时时间 transaction.max.timeout.ms 默认是 15 分钟。所以在检查点保存时间很长时,有可能出现 Kafka 已经认为事务超时了,丢弃了预提交的数据;而 Sink 任务认为还可以继续等待。如果接下来检查点保存成功,发生故障后回滚到这个检查点的状态,这部分数据就被真正丢掉了。所以这两个超时时间,前者应该小于等于后者。

第五课 大数据技术之Fink1.13的实战学习-状态编程和容错机制相关推荐

  1. 第三课 大数据技术之Fink1.13的实战学习-时间和窗口

    第三课 大数据技术之Fink1.13的实战学习-时间和窗口 文章目录 第三课 大数据技术之Fink1.13的实战学习-时间和窗口 第一节 时间定义 1.1 Flink中的时间语义 1.2 两种时间语义 ...

  2. 第一课 大数据技术之Fink1.13的实战学习-部署使用和基础概念

    第一课 大数据技术之Fink1.13的实战学习 文章目录 第一课 大数据技术之Fink1.13的实战学习 第一节 Fink介绍 1.1 Flink介绍背景 1.2 Flink 的应用场景 1.3 流式 ...

  3. 第七课 大数据技术之Fink1.13的实战学习-Fink CEP

    第七课 大数据技术之Fink1.13的实战学习-Fink CEP 文章目录 第七课 大数据技术之Fink1.13的实战学习-Fink CEP 第一节 Fink CEP介绍 1.1 Flink CEP背 ...

  4. 第六课 大数据技术之Fink1.13的实战学习-Table Api和SQL

    第六课 大数据技术之Fink1.13的实战学习-Table Api和SQL 文章目录 第六课 大数据技术之Fink1.13的实战学习-Table Api和SQL 第一节 Fink SQL快速上手 1. ...

  5. 第二课 大数据技术之Hadoop3.x的HDFS

    第二课 大数据技术之Hadoop3.x的HDFS 文章目录 第二课 大数据技术之Hadoop3.x的HDFS 第一节 HDFS概述 1.1 HDFS产出背景及定义 1.2 HDFS优缺点 1.3 HD ...

  6. 第三课 大数据技术之Spark-RDD介绍和转换算子

    第三课 大数据技术之Spark-RDD介绍和转换算子 文章目录 第三课 大数据技术之Spark-RDD介绍和转换算子 第一节 RDD相关介绍 1.1 什么是 RDD 1.2 核心属性 1.3 执行原理 ...

  7. 大数据、云计算、物联网相关技术概述——《大数据技术原理与应用》课程学习总结

    在学习大数.云计算以及物联网相关概念之前,先了解一下大数据的背景吧 1.1 大数据时代 1.1.1 第三次信息化浪潮 三次信息化浪潮: 信息化浪潮 发生时间 标志 解决问题 代表企业 第一次信息化浪潮 ...

  8. 大数据技术之HBase原理与实战归纳分享-上

    文章目录 概述 定义 特点 数据模型 概述 逻辑结构 物理存储结构 数据模型 应用场景 基础架构 安装 前置条件 部署 启动服务 高可用 Shell操作 基础操作 命令空间 DDL DML 概述 定义 ...

  9. 三、大数据技术之Linux下篇(linux学习)

    centos镜像地址 网易镜像: http://mirrors.163.com/centos/7/isos/ 搜狐镜像: http://mirrors.sohu.com/centos/7/isos/ ...

  10. 大数据技术之HBase原理与实战归纳分享-下

    文章目录 整合Phoenix 定义 为何要使用 安装 SHELL操作 表的映射 简易JDBC示例 二级索引 二级索引配置文件 全局索引 包含索引 本地索引(local index) HBase与 Hi ...

最新文章

  1. StoryBoard 视图切换和传值
  2. javascript的词法作用域
  3. linux中查看lvm的名称,关于Linux中LVM的使用总结
  4. python骨灰教学_python+mongodb+flask的基本使用
  5. excel表格大学计算机知识,大学计算机基础excel电子表格
  6. 操作系统之常考面试题
  7. codeforces gym-101736 Dessert First Strategy 最小割
  8. 【程序员自救指南】中关村保洁大叔的一句话竟然帮我转正了
  9. Ubuntu14.04如何备份和恢复系统
  10. k8s核心技术-命令行工具kubectl---K8S_Google工作笔记0017
  11. Jquery垂直下拉二级菜单
  12. 第41天:匀速、缓动运动和图片无缝滚动
  13. rm -f .... 恢复
  14. AntiModerate – 渐进式图片加载的 JavaScript 库
  15. 三阶金字塔魔方还原 - 3步无公式
  16. Linux 错误E45,readonly optionisset(add ...)
  17. php 替换表情符号,php怎么实现正则替换特殊符号
  18. 求所有质因子(Java)
  19. 走进小程序【十一】微信小程序【使用Echarts 和 腾讯地图】
  20. R语言快速运行脚本程序

热门文章

  1. win10计算机休眠快捷键,键盘快捷键关闭或休眠Windows 10 | MOS86
  2. java 音频 网络传输_如何流式传输音频?
  3. html广告网页完整代码,HTML5设计网页动态条幅广告(Banner) 已经加上完整源代码 - 伊甸一点...
  4. android logo在线生成工具,在线生成logo
  5. 厦门大学2019年高等代数考研试题
  6. Python双人五子棋
  7. JS实现 b站直播弹幕自动补中括号、一键常用语脚本
  8. android 飞行模式 源代码,android 定时进入飞行模式 例子
  9. ArcGIS地图制图教程——超详细
  10. 【Python】将xls格式转换为xlsx格式