一个Flink程序,其实就是对DataStream的各种转换。具体来说,代码基本上都由以下几部分构成

  • 获取执行环境(execution environment)
  • 读取数据源(source)
  • 定义基于数据的转换操作(transformation)
  • 定义计算结果的输出位置(sink)
  • 触发程序的执行(execute)

    文章目录

    • 执行环境(Execution Environment)
      • 1、创建执行环境
      • 2、执行模式
      • 3、触发程序执行
    • 源算子(Source)
      • 准备工作
      • 从集合、文件、SourceSocket、Kafka
      • 自定义Source
      • Flink支持的数据类型

执行环境(Execution Environment)

Flink程序可以在各种上下文环境中运行:本地JVM中执行、远程集群上运行。不同的环境,代码的提交执行过程会有所不同,这就要求我们在提交执行计算时,首先必须获取当前Flink的运行环境,从而建立与Flink框架之间的联系。只有获取了环境上下文信息,才能将具体的任务调度到不同的TaskManager执行。

1、创建执行环境

StreamExecutionEnvironment类对象

1、getExecutionEnvionment
会根据当前运行的上下文直接得到正确的结果:如果程序时独立运行,就返回一个本地执行环境;如果是创建了jar包,从命令行调用它并提交到集群执行,返回集群的执行环境。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();2、createLocalEnvironment
返回一个本地执行环境,可以在调用时传入一个参数,指定默认并行度;如果不传入,则默认并行度就是本地的CPU核心数
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();3、createRemoteEnvironment
返回集群执行环境,在调用时指定JobManager的主机号和端口号,并指定要在集群中运行的jar包
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment("host", // JobManager 主机名1234, // JobManager 进程端口号"path/to/jarFile.jar" // 提交给 JobManager 的 JAR 包);

2、执行模式

StreamExecutionEnvironment是流处理,批处理调用类ExecutionEnvironment 的静态方法。基于 ExecutionEnvironment 读入数据创建的数据集合,就是 DataSet;对应的调用的一整套转换方法,就是 DataSet API。 而从 1.12.0 版本起,Flink 实现了 API 上的流批统一。DataStream API 新增了一个重要特 性:可以支持不同的“执行模式”(execution mode),通过简单的设置就可以让一段 Flink 程序 在流处理和批处理之间切换。这样一来,DataSet API 也就没有存在的必要了

1、流执行模式(STREAMING)
DataStream API最经典的模式,一般用于需要持续实时处理的无界数据流。默认情况下,程序使用STREAMING执行模式
2、批执行模式(BATCH)
专门用于批处理的执行模式,在这种模式下,Flink处理作业的方式类似于MapReduce框架
3、自动模式(AUTOMATIC)
由程序根据输入数据源是否有界,来自动选择执行模式BATCH模式的配置方法
1、命令行配置:bin/flink run -Dexecution.runtime-mode=BATCH ...
2、通过代码配置:StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);什么时候选择BATCH模式?
在数据有界时,用BATCH模式处理批量数据,直接输出结果会更加有效
在数据无界时,用STREAMING模式处理持续的数据流,没得选择。

3、触发程序执行

有了执行环境,就可以构建程序的处理流程了:基于环境读取数据源,进而进行各种转换操作,最后输出结果到外部系统。
写完输出(sink)操作并不代表程序已经结束,Flink是由事件驱动,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或懒执行(lazy execution)。我们需要显示地调用执行环境的execute()方法,来触发程序执行,返回一个执行结果(JobExecutionResult):env.execute();

源算子(Source)

Flink中通用的添加source的方式,是调用执行环境的addSource()方法:

DataStream<String> stream = env.addSource(...);方法传入一个对象参数,需要实现SourceFunction接口,返回DataStreamSource(继承于SingleOutputStreamOperatorl类,进一步继承自DataStream)。读取数据的source操作时一个算子,得到的数一个数据源DataStream

准备工作

创建一个Bean类

public class Event {public String user;public String url;public Long timestamp;public Event(){}public Event(String user, String url, Long timestamp) {this.user = user;this.url = url;this.timestamp = timestamp;}@Overridepublic String toString() {return "Event{" +"user='" + user + '\'' +", url='" + url + '\'' +", timestamp=" + new Timestamp(timestamp) +'}';}
}

Event的特点:类是公有的、有一个无参的构造方法、所有属性都是公有的、所有属性的类型都是可以序列化的

从集合、文件、SourceSocket、Kafka

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.ArrayList;
import java.util.Properties;public class SourceTest {public static void main(String[] args) throws Exception {//创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//1、从文件中读取数据DataStreamSource<String> stream1 = env.readTextFile("input/clicks.txt");//2、从集合中读取数据ArrayList<Integer> nums = new ArrayList<>();nums.add(2);nums.add(5);DataStreamSource<Integer> numStream = env.fromCollection(nums);ArrayList<Event> events = new ArrayList<>();events.add(new Event("Mary", "./home", 1000L));events.add(new Event("Bob", "./cart", 2000L));DataStreamSource<Event> stream2 = env.fromCollection(events);//3.从元素读取数据DataStreamSource<Event> stream3 = env.fromElements(new Event("Mary", "./home", 1000L),new Event("Bob", "./cart", 2000L));//4.从socket文本流读中读取DataStreamSource<String> stream4 = env.socketTextStream("hadoop102", 7777);//5.从kafka中读取数据Properties properties = new Properties();properties.setProperty("bootstrap.servers","hadoop102:9092");properties.setProperty("group.id", "consumer-group");properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("auto.offset.reset", "latest");DataStreamSource<String> kafkaStream = env.addSource(new FlinkKafkaConsumer<String>("clicks", new SimpleStringSchema(), properties));//        stream1.print("1");
//        numStream.print("nums");
//        stream2.print("2");
//        stream3.print("3");kafkaStream.print();env.execute();}
}

从kafka读取数据

自定义Source

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;import java.util.Random;public class SourceCustomTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//DataStreamSource<Event> customStream = env.addSource(new ClickSource());DataStreamSource<Integer> customStream = env.addSource(new ParallelCustomSource()).setParallelism(2);customStream.print();env.execute();}//实现自定义的并行SourceFunctionpublic static class ParallelCustomSource implements ParallelSourceFunction<Integer> {private Boolean running = true;private Random random = new Random();@Overridepublic void run(SourceContext<Integer> ctx) throws Exception {while (running){ctx.collect(random.nextInt());}}@Overridepublic void cancel() {running = false;}}
}

Flink支持的数据类型

1、基本类型
所有Jvav基本类型及其包装类,外加Void、String、Date、BigDecimal、BigInteger2、数组类型
基本数组类型(PRIMITIVE_ARRAY)、对象数组(OBJECT_ARRAY)3、复合数据类型
Java元组类型(TUPLE):Flink内置元组类型,是Java API一部分,最多25个字段,不支持空字段
Scala样例类以及Scala元组:不支持空字段
行类型:具有任意个字段的元素,支持空字段
POJO:Flink自定义的类似于Java bean模式的类4、辅助类型
Option、Either、List、Map等5、泛型类型(GENERIC)
Flink支持所有的Java类和Scala类。如果没有按照要求定义POJO类型,就会被当做泛型,Flink会把泛型类当做黑盒,无法获取它们内部的属性;它们由Kryo序列化
POJO类型要求如下:类是public、standaloe;类有一个public的无参构造方法;类中所有字段是public且非final的;

类型提示(TypeHints)

由于Java中泛型擦除的存在,在某些特殊情况下,自动提取的信息是不够精细的,这时就需要显示地提供类型信息。
Flink专门提供了TypeHint类,它可以捕获泛型的类型信息,并且一直记录下来;我们也可以通过.returns()方法明确转化后的元素类型:returns(new TypeHint<Tuple2<Integer, SomeType>>(){})

DataStream API及源算子相关推荐

  1. [Flink]Flink DataStream API 概览

    目录 什么是 DataStream 什么能被转化为流 流式Flink程序的开发流程 DataStream的数据源 迭代数据流 配置运行时参数 什么是 DataStream Datastream API ...

  2. 【基础】Flink -- DataStream API

    Flink -- DataStream API 执行环境 Execution Environment 创建执行环境 设置执行模式 触发程序执行 源算子 Source 从集合中读取数据 从文件读取数据 ...

  3. DataStream API【1】

    一个Flink程序,就是对DataStream的各种转换.代码基本由以下几部分构成: 获取执行环境 读取数据源 定义基于数据的各种转换操作 定义计算结果的输出位置 触发程序执行 执行环境 创建执行环境 ...

  4. Flink DataStream API(基础版)

    概述   DataStream(数据流)本身是 Flink 中一个用来表示数据集合的类(Class),我们编写的 Flink 代码其实就是基于这种数据类型的处理,所以这套核心API 就以DataStr ...

  5. 第八章 DataStream API

    第八章 DataStream API 一.执行模式 DataStream API 支持不同的运行时执行模式,你可以根据你的用例需要和作业特点进行选择. DataStream API有一种"经 ...

  6. Flink自带的Source源算子以及自定义数据源Source

    文章目录 Flink的DataStream API(基础篇) Source源算子 从集合中读取数据 从文件中读取数据 从Scoket中读取数据 从Kafka中读取数据 自定义Source Flink的 ...

  7. flink DataStream API使用及原理

    传统的大数据处理方式一般是批处理式的,也就是说,今天所收集的数据,我们明天再把今天收集到的数据算出来,以供大家使用,但是在很多情况下,数据的时效性对于业务的成败是非常关键的. Spark 和 Flin ...

  8. Flink 1.13,面向流批一体的运行时与 DataStream API 优化

    简介:在 1.13 中,针对流批一体的目标,Flink 优化了大规模作业调度以及批执行模式下网络 Shuffle 的性能,以及在 DataStream API 方面完善有限流作业的退出语义. 本文由社 ...

  9. flink fi java_Flink DataStream API编程指南

    Flink中的DataStream程序是实现数据流转换的常规程序(例如:filtering, updating state, defining windows, aggregating).数据流最初是 ...

最新文章

  1. EBS 11i数据库升级(9i-10g)几点事项
  2. C++——this指针
  3. eclipse报jvm terminated.exitcode=2错误的解决方法
  4. mysql开启慢查询
  5. Random Maze HDU - 4067 费用流/可行流
  6. 跨域(三)——JSONP
  7. Android应用程序之间共享文字和图片(一)
  8. python 日志内容提取
  9. idea报Can’t Open Local Terminal
  10. autohold有什么弊端吗_自动驻车AUTO HOLD有啥作用?怎样使用?
  11. C语言课后习题(26)
  12. 架构师需要了解的知识
  13. hadoop集群服务重启后出错
  14. Python的学习必备基础知识总结
  15. OCR文字识别,PDF格式转换
  16. [导入]代理猎手找大学代理
  17. ”被裁员6次的运营总监分享”总被裁员的运营人到底是为什么?
  18. 基于JavaScript+css的购物网站项目
  19. 微信Python自动回复代码
  20. 直播带货app源码,实现直播连麦和PK

热门文章

  1. 创建ServiceArea
  2. andorid 三种方式的练习
  3. 第一部分 OpenStack及其构成简介
  4. Rayeager PX2 不能进入烧写模式解决方案
  5. javascript获取元素样式值
  6. 在ubuntu10.04安装java5和java6
  7. Leetcode 561.数组拆分I
  8. 磁盘的成组与分解技术
  9. Linux下Qt5: QMediaRecorder的问题,以及使用QCamera相关类进行摄像头视频采集
  10. Warning: 'https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/' already解决