一、Flink基础API-Flink编程的基本概念

1.1、Flink程序

  • Flink 程序是实现了分布式集合转换(例如过滤、映射、更新状态、join、分组、定义窗口、聚合)的规范化程序。

  • 集合初始创建自 source(例如读取文件、kafka 主题,或本地内存中的集合)。

  • 结果通过 sink 返回,例如,它可以将数据写入(分布式)文件,或标准输出(例如命令行终端)。

  • Flink 程序可以在多种环境中运行,独立运行或嵌入到其他程序中。可以在本地 JVM 中执行,也可以在多台机器的集群上执行

  • 针对有界和无界两种数据 source 类型,可以使用 DataSet API 来编写批处理程序或使用 DataStream API 来编写流处理程序。

  • 对于流处理,使用 StreamingExecutionEnvironment 和 DataStream API。

  • 对于批处理,将他们替换为 ExecutionEnvironment 和 DataSet API 即可,概念是完全相同的。


1.2、DataSet和DataStream

  • Flink 用特有的 DataSet 和 DataStream 类来表示程序中的数据。

  • 可以将他们视为可能包含重复项的不可变数据集合。对于 DataSet,数据是有限的,而对于 DataStream,元素的数量可以是无限的。

  • 这些集合与标准的 Java 集合有一些关键的区别。首先它们是不可变的,也就是说它们一旦被创你就不能添加或删除元素了,同时也不能简单地检查它们内部的元素。

  • 在 Flink 程序中,集合最初通过添加数据 source 来创建,通过使用诸如 map、filter 等 API 方法对数据 source 进行转换从而派生新的集合。


1.3、Flink程序构成

  1. 获取执行环境
  2. 加载/创建初始数据;
  3. 编写对数据的转换操作;
  4. 指定计算结果存放的位置;
  5. 触发程序执行;

1.3.1、获取执行环境

  • Scala DataSet API : org.apache.flink.api.scala ;
  • Scala DataStream API : org.apache.flink.streaming.api.scala ;
  • StreamExecutionEnvironment 是所有 Flink 程序的基础,可以使用它的这些静态方法获取:
  • 一般只需要使用getExecutionEnvironment(),它会根据上下文环境完成正确的工作。
  • 例如,在IDE中执行程序或者作为标准的 Java 程序来执行,它会创建本机执行环境。
  • 如果将程序封装成 JAR 包,然后通过命令行调用,Flink 集群管理器会执行你的 main 方法并且 getExecutionEnvironment() 会返回在集群上执行程序的执行环境。

1.3.2、加载数据集

  • 针对不同的数据 source,执行环境有若干不同的读取文件的方法:你可以逐行读取 CSV 文件,或者使用完全自定义的输入格式。要将文本文件作为一系列行读取,你可以使用:

1.3.3、编写转换操作

  • 通过2.的操作会得到一个DataStream数据流,然后对其应用转换操作就可以创建新的派生 DataStream。

  • 通过调用 DataStream 的转换函数来进行转换。如下是一个映射转换的实例:

  • 通过把原始数据集合的每个字符串转换为一个整数,从而创建出一个新的 DataStream。


1.3.4、指定计算结果存放位置& 触发程序执行

  • 一旦得到了包含最终结果的 DataStream,就可以通过创建 sink 将其写入外部系统。
  • 例如,下面是一些创建 sink 的示例:
  • 当设定好整个程序以后只需要调用 StreamExecutionEnvironment 的 execute() 方法触发程序执行。execute() 方法返回 JobExecutionResult,它包括执行耗时和一个累加器的结果。
  • 如果不需要等待作业的结束,只是想要触发程序执行,你可以调用 StreamExecutionEnvironment 的 executeAsync() 方法。这个方法将返回一个 JobClient 对象,通过 JobClient 能够与程序对应的作业进行交互。

1.4、延迟计算

  • 无论在本地还是集群执行,所有的 Flink 程序都是延迟执行的:

  • 当程序的 main 方法被执行时,并不立即执行数据的加载和转换,而是创建每个操作并将其加入到程序的执行计划中。

  • 当执行环境调用 execute() 方法显式地触发执行的时候才真正执行各个操作。

  • 延迟计算允许你构建复杂的程序,Flink 将其作为整体计划单元来执行。


1.5、指定键、值

  • 一些转换操作(join, coGroup, keyBy, groupBy)要求在元素集合上定义键。
  • 另外一些转换操作 (Reduce, GroupReduce, Aggregate, Windows)允许在应用这些转换之前将数据按键分组。
  • Flink 的数据模型不是基于键值对的。因此不需要将数据集类型物理地打包到键和值中。
  • 键都是“虚拟的”:它们的功能是指导分组算子用哪些数据来分组。

1.6、为Tuple定义键

  • 最简单的方式是按照 Tuple 的一个或多个字段进行分组:
  • 按照第一个、第二个字段组合来进行分组
  • 使用字段表达式来定义键

1.7、指定转换函数

  • 匿名函数
  • 富函数

二、Flink基础API-支持的数据类型

  • Java Tuple 和 Scala Case Class

Flink 将满足如下条件的 Java 和 Scala 的类作为特殊的 POJO 数据类型处理

  • 类必须是公有的。

  • 它必须有一个公有的无参构造器(默认构造器)。

  • 所有的字段要么是公有的要么必须可以通过 getter 和 setter 函数访问。例如一个名为 foo 的字段,它的 getter 和 setter 方法必须命名为 getFoo() 和 setFoo()。

  • 字段的类型必须被已注册的序列化程序所支持。

  • 基本数据类型
    Flink 支持所有 Java 和 Scala 的基本数据类型如 Integer、 String、和 Double。

  • 常规的类
    Flink 支持大部分 Java 和 Scala 的类(API 和自定义)。 除了包含无法序列化的字段的类,如文件指针,I / O流或其他本地资源。

  • 值类型
    值类型手工描述其序列化和反序列化。它们不是通过通用序列化框架,而是通过实现 org.apache.flinktypes.Value 接口的 read 和 write 方法来为这些操作提供自定义编码。当通用序列化效率非常低时,使用值类型是合理的。

  • Hadoop Writable
    可以使用实现了 org.apache.hadoop.Writable 接口的类型。它们会使用 write() 和 readFields() 方法中定义的序列化逻辑。

  • 特殊类型
    可以使用特殊类型,包括 Scala 的 Either、Option 和 Try。 Java API 有对 Either 的自定义实现。 类似于 Scala 的 Either,它表示一个具有 Left 或 Right 两种可能类型的值。 Either 可用于错误处理或需要输出两种不同类型记录的算子。


三、DataStream API

3.1、数据源

一般情况下通过StreamExecutionEnvironment.addSource(sourceFunction) 就可以添加数据源。

文件类型数据源

  • readTextFile(path) : 使用TextInputFormat 按行读取文本,每行返回一个字符串。

  • readFile(fileInputFormat, path) : 通过自定义fileInputFormat 来读取数据;

  • readFile(fileInputFormat , path , watchType , interval , pathFilter) : watchType : 新文件数据;

Socket 类型数据源

  • socketTextStream : 通过socket读取数据,可以通过设置分隔符来区分每个数据。

Collection 类型数据源

  • fromCollection(Seq)
  • fromCollection(Iterator)
  • fromElements(elements:_*)
  • fromParallelCollection(SplittableIterator)
  • generateSequence(from, to)

3.2、转换操作

  • 值 -> 值
  • (键,值) -> (键,值)

3.3、输出源

  • writeAsText() / TextOutputFormat : 按行写入数据
  • writeAsCsv() : 把元组数据通过都好进行分割写入文件
  • print() / printToErr() :把数据通过标准输出或异常输出进行打印
  • writeUsingOutputFormat() / FileOutputFormat : 自定义 输出文件类型;
  • writeToSocket : 通过socket 进行数据输出;
  • addSink : 调用自定义输出源

四、DataSet API

4.1、数据源


4.2、输出源


五、Table API

Table API 分为JavaScalaPython三种,Scala API需要导入
org.apache.flink.api.scala._org.apache.flink.table.api.scala._,并且Scala中字段需要使用特殊字符(’)来进行表示。

下面的例子会对Order表进行扫描,过滤null值,把a列变小写,然后针对每个小时对a进行分组,求b的平均值。


六、SQL API

启动SQL client:

./bin/sql-client.sh embedded


测试1

可以看到

设置table模式

SET execution.result-mode=table;

示例:

看到

设置changelog模式

SET execution.result-mode=changelog;

示例:

看到

学习笔记Flink(四)—— Flink基础API及核心数据结构相关推荐

  1. Vue基础学习笔记Day02_vue-cli脚手架_基础API

    今日目标: 能够理解vue的概念和作用 能够理解vuecli脚手架工程化开发 能够使用vue指令 学习内容: 学习目录: Vue基本概念 @vue/cli脚手架 Vue指令 今日总结 面试题 铺垫 必 ...

  2. 小迪渗透测试学习笔记(四)基础入门-WEB源码拓展

    前言: WEB源码在安全测试中是非常重要的信息来源,可以用来代码审 计漏洞也可以用来做信息突破口,其中WEB源码有很多技术需要简明分析. 比如:获取某ASP源码后可以采用默认数据库下载为突破,获取某其 ...

  3. OpenCV学习笔记(四十一)——再看基础数据结构core OpenCV学习笔记(四十二)——Mat数据操作之普通青年、文艺青年、暴力青年 OpenCV学习笔记(四十三)——存取像素值操作汇总co

    OpenCV学习笔记(四十一)--再看基础数据结构core 记得我在OpenCV学习笔记(四)--新版本的数据结构core里面讲过新版本的数据结构了,可是我再看这部分的时候,我发现我当时实在是看得太马 ...

  4. OpenCV学习笔记(四十二)——Mat数据操作之普通青年、文艺青年、暴力青年

    首先还是要感谢箫鸣朋友在我<OpenCV学习笔记(四十)--再谈OpenCV数据结构Mat详解>的留言,告诉我M.at<float>(3, 3)在Debug模式下运行缓慢,推荐 ...

  5. PyTorch学习笔记(四):PyTorch基础实战

    PyTorch实战:以FashionMNIST时装分类为例: 往期学习资料推荐: 1.Pytorch实战笔记_GoAI的博客-CSDN博客 2.Pytorch入门教程_GoAI的博客-CSDN博客 本 ...

  6. 《Go语言圣经》学习笔记 第四章 复合数据类型

    <Go语言圣经>学习笔记 第四章 复合数据类型 目录 数组 Slice Map 结构体 JSON 文本和HTML模板 注:学习<Go语言圣经>笔记,PDF点击下载,建议看书. ...

  7. JDBC 学习笔记(一)—— 基础知识 + 分页技术

    2019独角兽企业重金招聘Python工程师标准>>> 本文查阅方法:     1.查阅目录 -- 查阅本文目录,确定想要查阅的目录标题     2.快捷"查找" ...

  8. Docker 学习笔记 - 进阶四 Docker网络和 Docker compose

    Docker 学习笔记 - 进阶四 Docker网络和 Docker compose 4. docker 网络 4.1 docker network是什么 4.1.1 docker不启动,默认网络情况 ...

  9. Spring Cloud 学习笔记(四)-Spring Cloud Hystrix

    Spring Cloud 学习笔记(四)-Spring Cloud Hystrix 由于前一阵子项目的原因,今天才继续弄上,今天想学习一下Hystrix组件 这个组件还挺抽象的,最开始我一直没太明白, ...

最新文章

  1. Android 图片(ImageView)设置透明的方法
  2. 一文看懂怎样用 Python 创建比特币交易
  3. 何恺明、吴育昕最新成果:用组归一化替代批归一化
  4. 数字电路时钟问题——Jitter与Skew区别
  5. chrome调试手机webview中页面
  6. Http怎么处理长连接
  7. 第六届中国开源年会(COSCon'21)开心开源精彩收官
  8. 前后落差大用什么词语_语文考题一共有五类:汉字类、词语类、句子类、阅读类、作文类,如果基础扎实,答题技巧弄懂了,哪一类都能拿高分!...
  9. 两种最短路径(测地距离)的算法——Dijkstra和Floyd
  10. 基于 SpringMvc + OpenCV 实现的答题卡识别系统(附源码)
  11. 上海“随申码”研发幕后::倒排期的3天!
  12. 《线性代数:行列式》: 克拉默法则
  13. SICP练习题1.14
  14. 卡在硬盘启动计算机,插硬盘启动卡死了,怎么办?电脑维修方法
  15. F-Pairwise Modulo
  16. SAP资产负债表实现方案探索 - 基于 Excel-DNA 自定义函数方法
  17. 暗色html模板,暗色个人主页网站模板
  18. 一个好用的项目工时管理系统
  19. 提笔忘字低龄化 电脑难辞其咎
  20. Linux中如何启动redis服务

热门文章

  1. python全局变量定义_Python 3 实现定义跨模块的全局变量和使用
  2. 67. Leetcode 450. 删除二叉搜索树中的节点 (二叉搜索树-基本操作类)
  3. 判断两个字符串是否为旋转词
  4. Django startproject的问题
  5. emacs python plugin_使用 python 扩展 emacs
  6. 怎么样设置关闭网页再次登录网页是正常登录状态_学籍系统出现“该账号已登录,不能重复登录”怎么办?...
  7. arp欺骗攻击——获取内网中用户浏览的图片信息
  8. itemcf的hadoop实现优化(Python)
  9. wiki维基百科上logo中字母的意思是什么?
  10. Python编程基础:第十三节 循环控制语句Loop Control Statements