DataStream API及源算子
一个Flink程序,其实就是对DataStream的各种转换。具体来说,代码基本上都由以下几部分构成
- 获取执行环境(execution environment)
- 读取数据源(source)
- 定义基于数据的转换操作(transformation)
- 定义计算结果的输出位置(sink)
- 触发程序的执行(execute)
文章目录
- 执行环境(Execution Environment)
- 1、创建执行环境
- 2、执行模式
- 3、触发程序执行
- 源算子(Source)
- 准备工作
- 从集合、文件、SourceSocket、Kafka
- 自定义Source
- Flink支持的数据类型
执行环境(Execution Environment)
Flink程序可以在各种上下文环境中运行:本地JVM中执行、远程集群上运行。不同的环境,代码的提交执行过程会有所不同,这就要求我们在提交执行计算时,首先必须获取当前Flink的运行环境,从而建立与Flink框架之间的联系。只有获取了环境上下文信息,才能将具体的任务调度到不同的TaskManager执行。
1、创建执行环境
StreamExecutionEnvironment类对象
1、getExecutionEnvionment
会根据当前运行的上下文直接得到正确的结果:如果程序时独立运行,就返回一个本地执行环境;如果是创建了jar包,从命令行调用它并提交到集群执行,返回集群的执行环境。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();2、createLocalEnvironment
返回一个本地执行环境,可以在调用时传入一个参数,指定默认并行度;如果不传入,则默认并行度就是本地的CPU核心数
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();3、createRemoteEnvironment
返回集群执行环境,在调用时指定JobManager的主机号和端口号,并指定要在集群中运行的jar包
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment("host", // JobManager 主机名1234, // JobManager 进程端口号"path/to/jarFile.jar" // 提交给 JobManager 的 JAR 包);
2、执行模式
StreamExecutionEnvironment是流处理,批处理调用类ExecutionEnvironment 的静态方法。基于 ExecutionEnvironment 读入数据创建的数据集合,就是 DataSet;对应的调用的一整套转换方法,就是 DataSet API。 而从 1.12.0 版本起,Flink 实现了 API 上的流批统一。DataStream API 新增了一个重要特 性:可以支持不同的“执行模式”(execution mode),通过简单的设置就可以让一段 Flink 程序 在流处理和批处理之间切换。这样一来,DataSet API 也就没有存在的必要了
1、流执行模式(STREAMING)
DataStream API最经典的模式,一般用于需要持续实时处理的无界数据流。默认情况下,程序使用STREAMING执行模式
2、批执行模式(BATCH)
专门用于批处理的执行模式,在这种模式下,Flink处理作业的方式类似于MapReduce框架
3、自动模式(AUTOMATIC)
由程序根据输入数据源是否有界,来自动选择执行模式BATCH模式的配置方法
1、命令行配置:bin/flink run -Dexecution.runtime-mode=BATCH ...
2、通过代码配置:StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);什么时候选择BATCH模式?
在数据有界时,用BATCH模式处理批量数据,直接输出结果会更加有效
在数据无界时,用STREAMING模式处理持续的数据流,没得选择。
3、触发程序执行
有了执行环境,就可以构建程序的处理流程了:基于环境读取数据源,进而进行各种转换操作,最后输出结果到外部系统。
写完输出(sink)操作并不代表程序已经结束,Flink是由事件驱动,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或懒执行(lazy execution)。我们需要显示地调用执行环境的execute()方法,来触发程序执行,返回一个执行结果(JobExecutionResult):env.execute();
源算子(Source)
Flink中通用的添加source的方式,是调用执行环境的addSource()方法:
DataStream<String> stream = env.addSource(...);方法传入一个对象参数,需要实现SourceFunction接口,返回DataStreamSource(继承于SingleOutputStreamOperatorl类,进一步继承自DataStream)。读取数据的source操作时一个算子,得到的数一个数据源DataStream
准备工作
创建一个Bean类
public class Event {public String user;public String url;public Long timestamp;public Event(){}public Event(String user, String url, Long timestamp) {this.user = user;this.url = url;this.timestamp = timestamp;}@Overridepublic String toString() {return "Event{" +"user='" + user + '\'' +", url='" + url + '\'' +", timestamp=" + new Timestamp(timestamp) +'}';}
}
Event的特点:类是公有的、有一个无参的构造方法、所有属性都是公有的、所有属性的类型都是可以序列化的
从集合、文件、SourceSocket、Kafka
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.ArrayList;
import java.util.Properties;public class SourceTest {public static void main(String[] args) throws Exception {//创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//1、从文件中读取数据DataStreamSource<String> stream1 = env.readTextFile("input/clicks.txt");//2、从集合中读取数据ArrayList<Integer> nums = new ArrayList<>();nums.add(2);nums.add(5);DataStreamSource<Integer> numStream = env.fromCollection(nums);ArrayList<Event> events = new ArrayList<>();events.add(new Event("Mary", "./home", 1000L));events.add(new Event("Bob", "./cart", 2000L));DataStreamSource<Event> stream2 = env.fromCollection(events);//3.从元素读取数据DataStreamSource<Event> stream3 = env.fromElements(new Event("Mary", "./home", 1000L),new Event("Bob", "./cart", 2000L));//4.从socket文本流读中读取DataStreamSource<String> stream4 = env.socketTextStream("hadoop102", 7777);//5.从kafka中读取数据Properties properties = new Properties();properties.setProperty("bootstrap.servers","hadoop102:9092");properties.setProperty("group.id", "consumer-group");properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("auto.offset.reset", "latest");DataStreamSource<String> kafkaStream = env.addSource(new FlinkKafkaConsumer<String>("clicks", new SimpleStringSchema(), properties));// stream1.print("1");
// numStream.print("nums");
// stream2.print("2");
// stream3.print("3");kafkaStream.print();env.execute();}
}
从kafka读取数据
自定义Source
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;import java.util.Random;public class SourceCustomTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//DataStreamSource<Event> customStream = env.addSource(new ClickSource());DataStreamSource<Integer> customStream = env.addSource(new ParallelCustomSource()).setParallelism(2);customStream.print();env.execute();}//实现自定义的并行SourceFunctionpublic static class ParallelCustomSource implements ParallelSourceFunction<Integer> {private Boolean running = true;private Random random = new Random();@Overridepublic void run(SourceContext<Integer> ctx) throws Exception {while (running){ctx.collect(random.nextInt());}}@Overridepublic void cancel() {running = false;}}
}
Flink支持的数据类型
1、基本类型
所有Jvav基本类型及其包装类,外加Void、String、Date、BigDecimal、BigInteger2、数组类型
基本数组类型(PRIMITIVE_ARRAY)、对象数组(OBJECT_ARRAY)3、复合数据类型
Java元组类型(TUPLE):Flink内置元组类型,是Java API一部分,最多25个字段,不支持空字段
Scala样例类以及Scala元组:不支持空字段
行类型:具有任意个字段的元素,支持空字段
POJO:Flink自定义的类似于Java bean模式的类4、辅助类型
Option、Either、List、Map等5、泛型类型(GENERIC)
Flink支持所有的Java类和Scala类。如果没有按照要求定义POJO类型,就会被当做泛型,Flink会把泛型类当做黑盒,无法获取它们内部的属性;它们由Kryo序列化
POJO类型要求如下:类是public、standaloe;类有一个public的无参构造方法;类中所有字段是public且非final的;
类型提示(TypeHints)
由于Java中泛型擦除的存在,在某些特殊情况下,自动提取的信息是不够精细的,这时就需要显示地提供类型信息。
Flink专门提供了TypeHint类,它可以捕获泛型的类型信息,并且一直记录下来;我们也可以通过.returns()方法明确转化后的元素类型:returns(new TypeHint<Tuple2<Integer, SomeType>>(){})
DataStream API及源算子相关推荐
- [Flink]Flink DataStream API 概览
目录 什么是 DataStream 什么能被转化为流 流式Flink程序的开发流程 DataStream的数据源 迭代数据流 配置运行时参数 什么是 DataStream Datastream API ...
- 【基础】Flink -- DataStream API
Flink -- DataStream API 执行环境 Execution Environment 创建执行环境 设置执行模式 触发程序执行 源算子 Source 从集合中读取数据 从文件读取数据 ...
- DataStream API【1】
一个Flink程序,就是对DataStream的各种转换.代码基本由以下几部分构成: 获取执行环境 读取数据源 定义基于数据的各种转换操作 定义计算结果的输出位置 触发程序执行 执行环境 创建执行环境 ...
- Flink DataStream API(基础版)
概述 DataStream(数据流)本身是 Flink 中一个用来表示数据集合的类(Class),我们编写的 Flink 代码其实就是基于这种数据类型的处理,所以这套核心API 就以DataStr ...
- 第八章 DataStream API
第八章 DataStream API 一.执行模式 DataStream API 支持不同的运行时执行模式,你可以根据你的用例需要和作业特点进行选择. DataStream API有一种"经 ...
- Flink自带的Source源算子以及自定义数据源Source
文章目录 Flink的DataStream API(基础篇) Source源算子 从集合中读取数据 从文件中读取数据 从Scoket中读取数据 从Kafka中读取数据 自定义Source Flink的 ...
- flink DataStream API使用及原理
传统的大数据处理方式一般是批处理式的,也就是说,今天所收集的数据,我们明天再把今天收集到的数据算出来,以供大家使用,但是在很多情况下,数据的时效性对于业务的成败是非常关键的. Spark 和 Flin ...
- Flink 1.13,面向流批一体的运行时与 DataStream API 优化
简介:在 1.13 中,针对流批一体的目标,Flink 优化了大规模作业调度以及批执行模式下网络 Shuffle 的性能,以及在 DataStream API 方面完善有限流作业的退出语义. 本文由社 ...
- flink fi java_Flink DataStream API编程指南
Flink中的DataStream程序是实现数据流转换的常规程序(例如:filtering, updating state, defining windows, aggregating).数据流最初是 ...
最新文章
- EBS 11i数据库升级(9i-10g)几点事项
- C++——this指针
- eclipse报jvm terminated.exitcode=2错误的解决方法
- mysql开启慢查询
- Random Maze HDU - 4067 费用流/可行流
- 跨域(三)——JSONP
- Android应用程序之间共享文字和图片(一)
- python 日志内容提取
- idea报Can’t Open Local Terminal
- autohold有什么弊端吗_自动驻车AUTO HOLD有啥作用?怎样使用?
- C语言课后习题(26)
- 架构师需要了解的知识
- hadoop集群服务重启后出错
- Python的学习必备基础知识总结
- OCR文字识别,PDF格式转换
- [导入]代理猎手找大学代理
- ”被裁员6次的运营总监分享”总被裁员的运营人到底是为什么?
- 基于JavaScript+css的购物网站项目
- 微信Python自动回复代码
- 直播带货app源码,实现直播连麦和PK
热门文章
- 创建ServiceArea
- andorid 三种方式的练习
- 第一部分 OpenStack及其构成简介
- Rayeager PX2 不能进入烧写模式解决方案
- javascript获取元素样式值
- 在ubuntu10.04安装java5和java6
- Leetcode 561.数组拆分I
- 磁盘的成组与分解技术
- Linux下Qt5: QMediaRecorder的问题,以及使用QCamera相关类进行摄像头视频采集
- Warning: 'https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/' already解决