Flink -- DataStream API

  • 执行环境 Execution Environment
    • 创建执行环境
    • 设置执行模式
    • 触发程序执行
  • 源算子 Source
    • 从集合中读取数据
    • 从文件读取数据
    • 从 Socket 读取数据
    • 自定义数据源
  • 转换算子 Transformation
    • 基本转换算子
      • 映射 map
      • 过滤 filter
      • 扁平映射 flatMap
    • 聚合算子 Aggregation
      • 按键分区 keyBy
      • 简单聚合
      • 规约聚合 reduce
      • 物理分区 Physical Partitioning
        • 随机分区 Random
        • 轮询分区 Round-Robin
        • 重缩放分区 Rescale
        • 广播 Broadcast
        • 全局分区 Global
        • 自定义分区 Custom
  • 输出算子 Sink
    • 输出到文件
    • 输出到 Redis
    • 输出到数据库 MySQL
    • 输出到 ElasticSearch
    • 自定义输出算子 Sink

DataStream 本质上就是 Flink 中用于表示集合的类,其用法类似于 Java 集合,通过 API 定义出一系列的操作进行数据处理。

一个 Flink 程序,实际上就是对 DataStream 的各种转换,代码一般由以下几部分构成:

  • 获取执行环境(execution environment)

  • 读取数据源(source)

  • 定义转换操作(transformations)

  • 定义计算结果的输出位置(sink)

  • 触发程序执行(execute)

执行环境 Execution Environment

创建执行环境

因为 Flink 可以在各种环境中运行,因此在提交作业执行计算时,首先要获取 Flink 的执行环境,从而建立起程序与 Flink 框架的关系。

执行环境的创建,需要调用 StreamExecutionEnvironment 类的静态方法:

方法 概述
getExecutionEnvironment 自动判断当前程序的运行方式,并返回对应的运行环境,开发中最常用
createLocalEnvironment 返回一个本地执行环境。在调用时可以传入一个参数指定并行度;若不传入则默认并行度为本地 CPU 核心数
createRemoteEnvironment 返回集群执行环境。在调用时需要依次传入 JobManager 的主机名、端口号以及要执行 jar 包的路径

设置执行模式

基于执行环境,我们可以设置不同的执行模式让 Flink 程序在流处理与批处理之间进行切换。调用 StreamExecutionEnvironment 类的setRuntimeMode()方法,传入对应的参数即可完成设置。

Flink 存在以下 3 种执行模式:

执行模式 概述
RuntimeExecutionMode.STREAMING 流处理模式,用于需要持续实时处理的无界流数据,程序默认使用该模式
RuntimeExecutionMode.BATCH 批处理模式,用于不会持续计算的有界数据
RuntimeExecutionMode.AUTOMATIC 自动模式,该模式下程序将根据输入数据是否有界来自动选择执行模式

触发程序执行

通过 StreamExecutionEnvironment 类的execute()方法,来触发程序执行。该方法将一直等待作业完成,并返回一个执行结果。

源算子 Source

Flink 可以从各种来源获取数据,然后构建 DataStream 进行转换处理。数据的输入来源称为数据源,读取数据的算子则称为源算子。Flink 代码中添加源算子的方法是调用执行环境的addSource()方法。

Flink 的源可以有多种方式获取,下面介绍几种获取元数据的方式。

本文中所用到的实例对象 Score 如下:

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Score {String className;String name;int score;}

从集合中读取数据

该方法是最简单的读取数据的方式,直接在 Java 中创建一个集合,调用执行环境的fromCollection()方法即可。

public class FromCollectionDemo {public static void main(String[] args) throws Exception {// 1. 环境准备StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();// 模式选择environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 2. 加载数据源ArrayList<Score> scores = new ArrayList<>();scores.add(new Score("一班", "zzz", 89));scores.add(new Score("二班", "qqq", 92));scores.add(new Score("三班", "fff", 97));DataStreamSource<Score> source = environment.fromCollection(scores);// 3. 数据展示source.print();// 4. 执行程序environment.execute();}
}

此外,也可以不构建集合,直接列举元素,并调用fromElements()方法即可。

        DataStreamSource<Score> source = environment.fromElements(new Score("一班", "zzz", 76),new Score("二班", "qqq", 94),new Score("三班", "fff", 99));

从文件读取数据

实际开发应用中,一般不会通过代码将数据写在代码中。通常需要从文件中读取数据进行解析和处理,如读取日志文件。调用执行环境的readTextFile()方法即可读取文件,方法中需要传入文件的相对路径或绝对路径。

public class ReadTextFileDemo {public static void main(String[] args) throws Exception {// 1. 环境准备StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 2. 加载数据源DataStreamSource<String> source = environment.readTextFile("D:/work/my_project/FlinkDemo/src/main/resources/test.txt");// 3. 数据展示source.print();// 4. 执行程序environment.execute();}
}

从 Socket 读取数据

从集合和文件中获取的数据都是有界数据,而在流处理的场景中,数据一般是无界的。我们可以简单的通过 Socket 的方式进行无界数据的获取测试。

测试代码的远程 Socket 采用阿里云服务器,开放端口 8080 作为 Socket 文本流端口。

nc -l 8080

运行程序,从 Socket 中读取无界数据。

public class SocketTextStreamDemo {public static void main(String[] args) throws Exception {// 1. 环境准备StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 2. socketTextStream 配置数据源String address = "47.92.146.85";int port = 8080;DataStreamSource<String> source = environment.socketTextStream(address, port);// 3. 数据输出source.print();// 4. 执行数据environment.execute();}
}

在 Socket 端输入数据,可以看到数据被读取进程序。

自定义数据源

在日常开发中,我们可以自定义数据源以获取来自各种数据库以及中间件的数据。自定义数据源需要编写自定义数据源类并继承SourceFunction接口,实现接口中的run()以及cancel()方法。

public class MySource implements SourceFunction<String> {/*** 实现数据的获取逻辑并通过 sourceContext 进行转发* @param sourceContext source 函数用于发出数据的接口*/@Overridepublic void run(SourceContext<String> sourceContext) throws Exception {while (true) {sourceContext.collect(String.valueOf(new Random().nextInt(100)));Thread.sleep(1000);}}/*** 取消数据源,用于终止循环获取数据的逻辑*/@Overridepublic void cancel() {}
}

在使用自定义的数据源时,只需要调用执行环境的addSource()方法,将自定义的数据源对象传入即可。

public class MySourceDemo {public static void main(String[] args) throws Exception {// 1. 环境准备StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);environment.enableCheckpointing(5000);// 2. addSource 配置自定义数据源DataStreamSource<String> source = environment.addSource(new MySource());// 3. 数据输出source.print();// 4. 执行程序environment.execute();}
}

转换算子 Transformation

在使用源算子将数据读取到程序之后,我们便可以使用各种转换算子,将一个或多个 DataStream 转换为新的 DataStream。Flink 程序的核心就是各种转换操作,它规定了数据处理转换的逻辑。

基本转换算子

映射 map

map 是一一映射的转换算子,即消费一个元素便产出一个元素。

map 算子的使用只需要调用 DataStream 对象的map()方法即可,方法需要传入的参数是 MapFunction 接口的实现类。map()方法的返回值仍然为 DataStream,不过泛型可能改变。

下列代码从 Socket 中读取数据,并根据输入数据将 1 转换为 ”男“,将 2 转换为 ”女“.

public class MapDemo {public static void main(String[] args) throws Exception {// 1. 环境准备StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 2. socketTextStream 配置数据源String address = "47.92.146.85";int port = 8080;DataStreamSource<String> source = environment.socketTextStream(address, port);// 3. 定义数据转换规则SingleOutputStreamOperator<String> outputStreamOperator = source.map(new MapFunction<String, String>() {@Overridepublic String map(String s) throws Exception {if ("1".equals(s)) {return "男";} else if ("2".equals(s)) {return "女";} else {return "输入有误!";}}});// 4. 数据输出outputStreamOperator.print();// 5. 执行数据environment.execute();}
}

代码测试:

过滤 filter

filter 操作实际上是对一个数据流按规定的方式进行过滤,通过一个布尔表达式设置一个过滤条件,对流内的每一个因素进行判断。若返回 true 则元素正常通过;若返回 false 则元素被过滤掉。

filter 算子的使用只需要调用 DataStream 对象的filter()方法即可,方法需要传入 FilterFunction 接口的实现类。

下列代码从 Socket 中读取数据,并过滤掉所有值小于等于 100 的数据。

public class FilterDemo {public static void main(String[] args) throws Exception {// 1. 环境准备StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 2. socketTextStream 配置数据源String address = "47.92.146.85";int port = 8080;DataStreamSource<String> source = environment.socketTextStream(address, port);// 3. 定义数据转换规则SingleOutputStreamOperator<String> outputStreamOperator = source.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String s) throws Exception {if (Integer.parseInt(s) > 100) {return true;}return false;}});// 4. 数据输出outputStreamOperator.print();// 5. 执行数据environment.execute();}
}

代码测试:

扁平映射 flatMap

flatMap 扁平映射可以将数据流中的数据拆分成多个个体处理,即消费一个元素,可以获得 0 个、1 个或者多个数据。

flatMap 算子的使用只需要调用 DataStream 对象的flapMap()方法即可,方法需要传入 FlatMapFunction 接口的实现类。

下列方法实现了将输入的数据按照空格进行划分,获得多个数据。

public class FlatMapDemo {public static void main(String[] args) throws Exception {// 1. 环境准备StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 2. socketTextStream 配置数据源String address = "47.92.146.85";int port = 8080;DataStreamSource<String> source = environment.socketTextStream(address, port);// 3. 定义数据转换规则SingleOutputStreamOperator<String> outputStreamOperator = source.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String s, Collector<String> collector) throws Exception {String[] strings = s.split(" ");for (String string:strings) {collector.collect(string);}}});// 4. 数据输出outputStreamOperator.print();// 5. 执行数据environment.execute();}
}

测试代码:

聚合算子 Aggregation

聚合算子,顾名思义,就是将一系列的数据按照某种规则进行统计和整合,从而提炼出更有用的信息的算子。

按键分区 keyBy

在 Flink 中,DataStream 对象没有直接进行聚合的 API,因为我们需要对海量的数据进行分区,然后并行处理数据以提高效率。因此,若要对数据进行聚合,首先需要对数据进行分区,keyBy 就是用来做分区处理的。

keyBy 可以通过指定一个 key 作为分区的依据,将一条数据流从逻辑上划分为不同的分区 partitions。在内部,是通过计算 key 的哈希值(hash code),对分区数进行取模运算来实现的。所以这里 key 如果是 POJO 的话,必须要重写hashCode()方法。

keyBy 的使用需要调用 DataSource 的keyBy()方法,传入的参数为 KeySelector 接口的实现类。

需要注意的是,keyBy 得到的结果将不再是 DataStream,而是会将 DataStream 转换为
KeyedStream,即”分区流“或”键控流“。

下列代码通过 Score 类的 className 属性对数据流进行分区。

public class KeyByDemo {public static void main(String[] args) throws Exception {// 1. 环境准备StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 2. 加载数据源DataStreamSource<Score> source = environment.fromElements(new Score("一班", "zzz", 76),new Score("一班", "zzz1", 77),new Score("二班", "qqq", 94),new Score("二班", "qqq1", 76),new Score("三班", "fff", 99),new Score("三班", "fff1", 81));// 3. 数据分区KeyedStream<Score, String> keyedStream = source.keyBy(new KeySelector<Score, String>() {@Overridepublic String getKey(Score score) throws Exception {return score.getClassName();}});// 4. 打印数据keyedStream.print();// 5. 执行程序environment.execute();}
}

测试代码:可以看到,班级相同的对象在最前面所对应的分区号也相同,即进入一个分区进行处理。

简单聚合

有了分区流 KeyedStream 之后,我们就可以根据它进行数据的聚合操作了。Flink 内置实现了一些简单的聚合 API:

  • sum():对指定字段做叠加求和;

  • min():对指定字段求最小值;

  • max():对指定字段求最大值;

  • minBy():对指定字段求最小值并保留含最小字段的整条数据;

  • maxBy():对指定字段求最大值并保留含最大子段的整条数据;

测试代码:(注意打印数据时每条聚合算子单独使用,否则结果不容易观察)

public class SimpleAggregationDemo {public static void main(String[] args) throws Exception {// 1. 环境准备StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 2. 加载数据源DataStreamSource<Score> source = environment.fromElements(new Score("一班", "zzz01", 76),new Score("一班", "zzz02", 89),new Score("一班", "zzz03", 84),new Score("二班", "qqq01", 94),new Score("二班", "qqq02", 74),new Score("二班", "qqq03", 64),new Score("三班", "fff01", 84),new Score("三班", "fff02", 94),new Score("三班", "fff03", 96));// 3. 数据分区KeyedStream<Score, String> keyedStream = source.keyBy(new KeySelector<Score, String>() {@Overridepublic String getKey(Score score) throws Exception {return score.getClassName();}});// 4. 打印数据keyedStream.max("score").print();keyedStream.min("score").print();keyedStream.sum("score").print();keyedStream.maxBy("score").print();keyedStream.minBy("score").print();// 5. 执行程序environment.execute();}
}

规约聚合 reduce

reduce 可以对已有的数据进行归约处理,把每一个新输入的数据和当前已经归约出来的值,再做一个聚合计算。该操作也会将 KeyedStream 转换为 DataStream。它不会改变流的元
素数据类型,所以输出类型和输入类型是一样的。

调用 KeyedStream 的reduce()方法时,需要传入一个参数,实现 ReduceFunction 接口。

下述代码利用 reduce 算子以及 Tuple 数据结构,同时计算出了 max 与 sum 的值

public class ReduceDemo {public static void main(String[] args) throws Exception {// 1. 环境准备StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 2. 加载数据源DataStreamSource<Score> source = environment.fromElements(new Score("一班", "zzz01", 76),new Score("一班", "zzz02", 89),new Score("一班", "zzz03", 84),new Score("二班", "qqq01", 94),new Score("二班", "qqq02", 74),new Score("二班", "qqq03", 64),new Score("三班", "fff01", 84),new Score("三班", "fff02", 94),new Score("三班", "fff03", 96));// 3. 数据分区source.map(new MapFunction<Score, Tuple3<String, Integer, Integer>>() {@Overridepublic Tuple3<String, Integer, Integer> map(Score score) throws Exception {return new Tuple3<String, Integer, Integer>(score.getClassName(), score.getScore(), score.getScore());}}).keyBy(new KeySelector<Tuple3<String, Integer, Integer>, String>() {@Overridepublic String getKey(Tuple3<String, Integer, Integer> tuple3) throws Exception {return tuple3.f0;}}).reduce(new ReduceFunction<Tuple3<String, Integer, Integer>>() {@Overridepublic Tuple3<String, Integer, Integer> reduce(Tuple3<String, Integer, Integer> t1,Tuple3<String, Integer, Integer> t2) throws Exception {return Tuple3.of(t1.f0, t1.f1 > t2.f1 ? t1.f1 : t2.f1, t1.f2 + t2.f2);}})// 4. 打印数据.print();// 5. 执行程序environment.execute();}
}

物理分区 Physical Partitioning

分区操作即将数据进行重新分布,传递到不同的分区中进行下一步的操作。比如之前介绍过的 keyBy 按键分区,就是通过指定 key 的哈希值对数据进行分区的。对于 keyBy 而言,无法确定数据到底分到哪个区,也不会考虑数据的分区是否均匀,它是一种逻辑分区(logical partitioning)。

若我们想精确的对数据进行分区,即真正的控制分区策略,那我们就需要物理分区策略(physical partitioning)。物理分区策略就类似消息队列和 Nginx 做的那样,常见的物理分区策略包括随机分配(Random)、轮询分配(Round-Robin)、重缩放(Rescale)、以及广播(Broadcast)。

随机分区 Random

通过调用 DataStream 对象的shuffle()方法,即可对数据进行“洗牌”,将数据随机分配到下游算子的并行任务当中。经过随机分区后,我们得到的仍然是一个 DataStream 对象。

下列代码将任务流从一个分区拓展为 4 个分区,并进行随机分区。

public class ShuffleDemo {public static void main(String[] args) throws Exception {// 1. 环境准备StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);environment.setParallelism(1);// 2. 加载数据源DataStreamSource<String> source = environment.addSource(new MySource());// 3. 打印输出source.shuffle().print("shuffle").setParallelism(4);// 4. 执行程序environment.execute();}
}

测试结果如下,可以看到,数据被随机分配到不同的分区执行打印的操作。

轮询分区 Round-Robin

轮询分区按照顺序依次将数据分发到不同的分区,通过调用 DataStream 对象的rebalance()方法即可实现数据的轮询分区。

下列代码将任务流从一个分区拓展为 4 个分区,并进行轮询分区。

public class RebalanceDemo {public static void main(String[] args) throws Exception {// 1. 环境准备StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);environment.setParallelism(1);// 2. 加载数据源DataStreamSource<String> source = environment.addSource(new MySource());// 3. 打印输出source.rebalance().print("rebalance").setParallelism(4);// 4. 执行程序environment.execute();}
}

代码输出结果如下,可以看到,数据按照分区编号 2-3-4-1 的顺序被依次分发到对应的分区。

重缩放分区 Rescale

重缩放分区与轮询分区类似,只不过两者的作用范围不同。轮询分区针对所有的上游任务和下游任务进行重新分区;而重缩放仅对部分上游任务和下游任务之间进行重新分区,节省更多资源。

当下游任务的数量是上游任务数量的整数倍时,rescale 的效率明显会更高。比如当上游任务数量是 2,下游任务数量是 6 时,上游任务其中一个分区的数据就将会平均分配到下游任务的 3 个分区中。

通过调用 DataStream 对象的rescale()方法可以实现数据的重缩放分区。

下列代码采用并行数据源的富函数,将奇数发送到索引为 1 的并行子任务;将偶数发送到索引为 0 的并行子任务。随后将任务流从 2 个分区拓展为 4 个分区,并进行重缩放分区。

public class RescaleDemo {public static void main(String[] args) throws Exception {// 1. 环境准备StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);environment.setParallelism(1);// 2. 加载数据源DataStreamSource<Integer> source = environment.addSource(new RichParallelSourceFunction<Integer>() {@Overridepublic void run(SourceContext<Integer> sourceContext) throws Exception {for (int i = 0; i < 8; i++) {if ((i + 1) % 2 == getRuntimeContext().getIndexOfThisSubtask()) {sourceContext.collect(i + 1);}}}@Overridepublic void cancel() {}}).setParallelism(2);// 3. 打印输出source.rescale().print("rescale").setParallelism(4);// 4. 执行程序environment.execute();}
}

执行结果如下,可以观察到,奇数数据在重分区时,被轮流分发到 3 和 4 两个子分区(对应索引为 1 的原子任务);而偶数数据在重分区时,被轮流分发到 1 和 2 两个子分区(对应索引为 0 的原子任务)。因此,重缩放分区实际上就是局部上的轮询分区。

广播 Broadcast

广播即将数据重新分发到所有的下游子任务当中,数据将存在在每一个子分区。通过调用 DataStream 对象的broadcast()方法,可以实现数据的广播。注意,该方法可能会导致数据的重复处理。

下列代码将任务流从一个分区拓展为 4 个分区,并进行数据广播分区。

public class BroadcastDemo {public static void main(String[] args) throws Exception {// 1. 环境准备StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);environment.setParallelism(1);// 2. 加载数据源DataStreamSource<Score> source = environment.fromElements(new Score("一班", "zqf", 100));// 3. 打印输出source.broadcast().print("broadcast").setParallelism(4);// 4. 执行程序environment.execute();}
}

代码结果如下,可以看到,数据被广播到 4 个子分区当中。

全局分区 Global

全局分区是一种特殊的分区方式。通过调用 DataStream 对象的global()方法,可以强行将下游任务的并行度变为 1,因此使用该操作需要特别谨慎,可能会对程序造成较大压力。

下列代码将任务流从 2 个分区通过全局分区的方式修改为 1 个分区。

public class GlobalDemo {public static void main(String[] args) throws Exception {// 1. 环境准备StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);environment.setParallelism(2);// 2. 加载数据源DataStreamSource<String> source = environment.addSource(new MySource());// 3. 打印输出source.global().print("global");// 4. 执行程序environment.execute();}
}

代码结果如下。

自定义分区 Custom

若 Flink 提供的分区策略均不能满足我们的需求,此时可以通过partitionCustom()方法来自定义分区策略,该方法在调用时需要传入两个参数:

  • 第一个参数为自定义分区器对象;

  • 第二个参数为应用自定义分区器的字段;

下列代码演示了如何通过数值的奇偶性进行分区。

public class CustomDemo {public static void main(String[] args) throws Exception {// 1. 环境准备StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);environment.setParallelism(1);// 2. 加载数据源DataStreamSource<String> source = environment.addSource(new MySource());// 3. 打印输出source.partitionCustom(new Partitioner<Integer>() {@Overridepublic int partition(Integer key, int numPartitions) {return key % 2;}}, new KeySelector<String, Integer>() {@Overridepublic Integer getKey(String s) throws Exception {return Integer.valueOf(s);}}).print().setParallelism(2);// 4. 执行程序environment.execute();}
}

代码结果如下,可以看到,所有奇数被分配到分区 2;所有偶数被分配到分区 1。

输出算子 Sink

Flink 作为数据处理框架,最终还需要把处理的结果写入外部系统。这个过程主要通过 Sink 算子实现,我们可以使用 Flink 提供的 Sink 算子,也可以自定义 Sink 算子。

之前的代码中我们一直使用的print()方法就是一种 Sink 算子,它表示将数据流写入标准控制台打印输出。

输出到文件

Flink 专门提供了一个流式文件系统的连接器 StreamingFileSink,其为流处理和批处理提供了统一的 Sink,可以将分区文件写入 Flink 支持的文件系统。

StreamingFileSink 支持行编码和批量编码两种方式,可以直接调用静态方法构建:

  • 行编码:StreamingFileSink.forRowFormat(basePath,rowEncoder)

  • 批量编码:StreamingFileSink.forBulkFormat(basePath,bulkWriterFactory)

在创建行或批量编码 Sink 时,我们需要传入两个参数,用来指定存储桶的基本路径
(basePath)和数据的编码逻辑(rowEncoder 或 bulkWriterFactory)。

下列代码创建了一个简单的文件 Sink,并制定了文件写入的滚动策略,滚动策略即我们开启新文件记录数据的标准,下列代码设置的滚动策略为:

  • 至少包含 15 分钟的数据;

  • 最近 5 分钟内没有收到新的数据;

  • 文件大小已经达到 1 GB;

public class SinkToFileDemo {public static void main(String[] args) throws Exception {// 1. 环境准备StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 2. socketTextStream 配置数据源DataStreamSource<Score> source = environment.fromElements(new Score("一班", "zzz01", 76),new Score("二班", "qqq03", 64),new Score("三班", "fff01", 84));// 3. 定义数据转换规则SingleOutputStreamOperator<String> outputStreamOperator = source.map(new MapFunction<Score, String>() {@Overridepublic String map(Score score) throws Exception {return score.toString();}});// 4. 数据输出StreamingFileSink<String> fileSink = StreamingFileSink.<String>forRowFormat(new Path("D:/work/my_project/FlinkDemo/src/main/resources"),new SimpleStringEncoder<>("utf-8")).withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(TimeUnit.MINUTES.toMillis(15)).withInactivityInterval(TimeUnit.MINUTES.toMillis(5)).withMaxPartSize(1024 * 1024 * 1024).build()).build();outputStreamOperator.addSink(fileSink);// 5. 执行数据environment.execute();}
}

输出到 Redis

Bahir 项目为我们提供了 Flink 和 Redis 的连接器,首先我们导入连接器的依赖

        <dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version></dependency>

连接器为我们提供了一个 RedisSink,创建对象时需要输入两个参数:

  • 第一个参数是 Jedis 连接池的配置信息;

  • 第二个参数需要传入 Redis 映射类的接口,需要我们写一个类继承 RedisMapper 接口,并实现相应的方法,定义数据转换成 Redis 数据格式的逻辑;

Redis 的映射类代码如下,此处我们保存的数据类型是 hash,表名设置为 score;每条数据的 name 字段作为 key,score 字段作为 value。

public class MyRedisMapper implements RedisMapper<Score> {@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.HSET, "score");}@Overridepublic String getKeyFromData(Score score) {return score.getName();}@Overridepublic String getValueFromData(Score score) {return Integer.toString(score.getScore());}
}

完整的实现数据 sink 到 redis 的代码如下。

public class SinkToRedisDemo {public static void main(String[] args) throws Exception {// 1. 环境准备StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 2. socketTextStream 配置数据源DataStreamSource<Score> source = environment.fromElements(new Score("一班", "zzz01", 76),new Score("二班", "qqq03", 64),new Score("三班", "fff01", 84));// 3. 创建一个 redis 的连接FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("47.92.146.85").setPort(6379).build();// 4. 数据输出到 redissource.addSink(new RedisSink<Score>(jedisPoolConfig, new MyRedisMapper()));// 5. 执行数据environment.execute();}
}

运行代码,然后使用 redis-cli 连接 redis,查看结果。

root@2d1c2701081c:/data# redis-cli
127.0.0.1:6379> hgetall score
1) "fff01"
2) "84"
3) "zzz01"
4) "76"
5) "qqq03"
6) "64"
127.0.0.1:6379>

输出到数据库 MySQL

本小节将介绍如何将数据 sink 到 MySQL 数据库进行持久化存储。

首先我们需要引入相应的依赖,如下所示。

        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.0.0-1.16</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.47</version></dependency>

根据要存储的信息在数据库中创建相应的表结构。

编写代码,对数据库的连接、sql 语句进行配置,完整代码如下。

public class SinkToMySQLDemo {public static void main(String[] args) throws Exception {// 1. 环境准备StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 2. socketTextStream 配置数据源DataStreamSource<Score> source = environment.fromElements(new Score("一班", "zzz01", 76),new Score("二班", "qqq03", 64),new Score("三班", "fff01", 84));// 3. 数据输出source.addSink(JdbcSink.sink("INSERT INTO score (class_name, name, score) VALUES (?, ?, ?)",(statement, score) -> {statement.setString(1, score.getClassName());statement.setString(2, score.getName());statement.setInt(3, score.getScore());},JdbcExecutionOptions.builder().withBatchIntervalMs(1000).withBatchIntervalMs(200).withMaxRetries(5).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://47.92.146.85:3306/SysManage?useUnicode=true&useSSL=false&characterEncoding=utf8&serverTimezone=GMT%2B8&allowMultiQueries=true&rewriteBatchedStatements=true").withDriverName("com.mysql.jdbc.Driver").withUsername("root").withPassword("XXXXXXXXX").build()));// 5. 执行程序environment.execute();}
}

运行程序,连接数据库进行数据查看。

输出到 ElasticSearch

学习完 ES 相关知识后补充…

自定义输出算子 Sink

自定义输出算子需要我们编写自定义 Sink 类,并继承 RichSinkFunction 类,重写该类的invoke()方法和finish()方法,定义数据 sink 的处理逻辑。

下列自定义的 sink 算子将数据按照标准格式进行打印。

public class MySink extends RichSinkFunction<String> {/*** 每条读取到的记录都会调用该方法* @param value 获取到的值* @param context 可用于获取有关输入记录的附加数据的上下文*/@Overridepublic void invoke(String value, Context context) throws Exception {super.invoke(value, context);System.out.println("{data=" + value + "}");}/*** 任务完成后调用该方法*/@Overridepublic void finish() throws Exception {System.out.println("mission complete~");}
}

在使用自定义的 sink 算子时,只需要将实例化的对象传入addSink()方法即可。

public class MySinkDemo {public static void main(String[] args) throws Exception {// 1. 环境准备StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 2. socketTextStream 配置数据源String address = "47.92.146.85";int port = 8080;DataStreamSource<String> source = environment.socketTextStream(address, port);// 3. 定义数据转换规则SingleOutputStreamOperator<String> outputStreamOperator = source.map(new MapFunction<String, String>() {@Overridepublic String map(String s) throws Exception {return s;}});// 4. 数据输出outputStreamOperator.addSink(new MySink());// 5. 执行数据environment.execute();}
}

代码测试结果如下。

【基础】Flink -- DataStream API相关推荐

  1. flink DataStream API使用及原理

    传统的大数据处理方式一般是批处理式的,也就是说,今天所收集的数据,我们明天再把今天收集到的数据算出来,以供大家使用,但是在很多情况下,数据的时效性对于业务的成败是非常关键的. Spark 和 Flin ...

  2. Flink DataStream API 介绍

    Flink DataStream API 介绍 StreamExecutionEnvironment #mermaid-svg-JKeWa22W2vWA4zBS {font-family:" ...

  3. Flink DataStream API(基础版)

    概述   DataStream(数据流)本身是 Flink 中一个用来表示数据集合的类(Class),我们编写的 Flink 代码其实就是基于这种数据类型的处理,所以这套核心API 就以DataStr ...

  4. [Flink]Flink DataStream API 概览

    目录 什么是 DataStream 什么能被转化为流 流式Flink程序的开发流程 DataStream的数据源 迭代数据流 配置运行时参数 什么是 DataStream Datastream API ...

  5. Flink DataStream API 中的多面手——Process Function详解

    https://mp.weixin.qq.com/s/SOCAE-t25DPVlQMxuOT0jw 引言 在Flink的时间与watermarks详解这篇文章中,阐述了Flink的时间与水位线的相关内 ...

  6. flink DataStream API(三)事件时间-生成水印

    文章目录 生成水印 水印策略介绍 使用水印策略 处理空闲源 编写 `WatermarkGenerators` 编写周期 WatermarkGenerator 编写标点WatermarkGenerato ...

  7. 学习笔记Flink(六)—— Flink DataStream API编程

    一.Flink程序构成 获取执行环境: 加载/创建初始数据: 编写对数据的转换操作: 指定计算结果存放的位置: 触发程序执行: 二.数据源 Collection 类型数据源 fromCollectio ...

  8. flink DataStream API(三)事件时间-内置水印生成器

    文章目录 内置水印生成器 单调递增的时间戳 固定的延迟时间 内置水印生成器 如生成水印中所述,Flink提供了抽象,允许程序员分配他们自己的时间戳并发出他们自己的水印.更具体地说,可以通过实现Wate ...

  9. DataStream API:Overview

    Flink DataStream API Programming Guide Flink DataStream API编程指南 DataStream programs in Flink are reg ...

最新文章

  1. C++ 中隐藏DOS调用的命令行窗口
  2. 报名丨24小时创新挑战:数字时代的人类健康与福祉
  3. 检索数据_13_从表中查询空值
  4. 关于linux LVM的好图 (Logic Volume Management,逻辑卷管理)
  5. twisted 网络通信的简单例子
  6. 【opencv】图像处理之开闭运算
  7. Oracle统计信息锁,oracle 由于impdp 引起的表统计信息被锁 ORA-20005: object statistics are locked...
  8. 怎样使绝对定位的子元素铺满父元素并且子元素可以设置padding
  9. HDOJ---1232 畅通工程[并查集]
  10. 改变系统TCP默认 MSS
  11. 6天掌握记忆宫殿,你就是记忆大师
  12. 安装软件提示计算机管理员权限,Win7安装软件需要管理员权限的解决方法
  13. Java 菜鸟入门 | Java中的静态变量、实例变量、局部变量和成员变量
  14. 网页格式html转换成pdf的方法,将网页内容转化为PDF的三种方法
  15. 《现代汉语》北大公开课
  16. 少儿编程家长一定要看看这些值得参加的青少年专业权威赛考
  17. sofa与springboot的入门案例
  18. Java实例类中的切面_Spring进行面向切面编程的一个简单例子
  19. 【学习笔记】git配置到本地
  20. CleanMyMac最新版本下载体验心得及使用感受评价

热门文章

  1. ubuntu18.04系统中 matlab2019 和 sougou拼音冲突
  2. 常见android手机分辨率(xxhdpi,xhdpi)
  3. CF 1646C Factorials and Powers of Two
  4. 模拟电子技术/国外电子与通信教材系列
  5. Java的堆内存和栈内存
  6. Windows 10 ISO 官方镜像下载
  7. 梅特勒电子天平维修触摸屏维修XS205
  8. Arcgis小技巧【1】——地理配准
  9. 智能算法---蚁群算法介绍
  10. mac安装java开发环境-包含JDK、Maven、Svn、Idea