官方介绍 

Flink 中的 API

Flink 为流式 / 批式处理应用程序的开发提供了不同级别的抽象。

  • Flink API 最底层的抽象为有状态实时流处理。其抽象实现是 Process Function,并且 Process Function 被 Flink 框架集成到了 DataStream API 中来为我们使用。它允许用户在应用程序中自由地处理来自单流或多流的事件(数据),并提供具有全局一致性和容错保障的状态。此外,用户可以在此层抽象中注册事件时间(event time)和处理时间(processing time)回调方法,从而允许程序可以实现复杂计算。
  • Flink API 第二层抽象是 Core APIs。实际上,许多应用程序不需要使用到上述最底层抽象的 API,而是可以使用 Core APIs 进行编程:其中包含 DataStream API(应用于有界 / 无界数据流场景)和 DataSet API(应用于有界数据集场景)两部分。Core APIs 提供的流式 API(Fluent API)为数据处理提供了通用的模块组件,例如各种形式的用户自定义转换(transformations)、联接(joins)、聚合(aggregations)、窗口(windows)和状态(state)操作等。此层 API 中处理的数据类型在每种编程语言中都有其对应的类。
    Process Function 这类底层抽象和 DataStream API 的相互集成使得用户可以选择使用更底层的抽象 API 来实现自己的需求。DataSet API 还额外提供了一些原语,比如循环 / 迭代(loop/iteration)操作。
  • Flink API 第三层抽象是 Table API。Table API 是以表(Table)为中心的声明式编程(DSL)API,例如在流式数据场景下,它可以表示一张正在动态改变的表。Table API 遵循(扩展)关系模型:即表拥有 schema(类似于关系型数据库中的 schema),并且 Table API 也提供了类似于关系模型中的操作,比如 select、project、join、group-by 和 aggregate 等。Table API 程序是以声明的方式定义应执行的逻辑操作,而不是确切地指定程序应该执行的代码。尽管 Table API 使用起来很简洁并且可以由各种类型的用户自定义函数扩展功能,但还是比 Core API 的表达能力差。此外,Table API 程序在执行之前还会使用优化器中的优化规则对用户编写的表达式进行优化。
    表和 DataStream/DataSet 可以进行无缝切换,Flink 允许用户在编写应用程序时将 Table API 与 DataStream/DataSet API 混合使用。
  • Flink API 最顶层抽象是 SQL。这层抽象在语义和程序表达式上都类似于 Table API,但是其程序实现都是 SQL 查询表达式。SQL 抽象与 Table API 抽象之间的关联是非常紧密的,并且 SQL 查询语句可以在 Table API 中定义的表上执行。

 DataStream/DateSet API

Flink 中的 DataStream 和 DataSet 程序是常规程序,可对数据流实施转换(例如,过滤,更新状态,定义窗口,聚合)。最初从各种来源(例如,消息队列,套接字流,文件)创建数据流。结果通过接收器返回,接收器可以例如将数据写入文件或标准输出(例如命令行终端)。Flink 程序可在各种上下文中运行,独立运行或嵌入其他程序中。执行可以在本地 JVM 或许多计算机的群集中进行。

预定义的 Source 和 Sink

一些比较基本的 Source 和 Sink 已经内置在 Flink 里。 预定义 data sources 支持从文件、目录、socket,以及 collections 和 iterators 中读取数据。 预定义 data sinks 支持把数据写入文件、标准输出(stdout)、标准错误输出(stderr)和 socket。

官方文档

https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/

 DataStream/DateSet API 开发

从本篇开始,增加 DataStream/DateSet API 演示内容,在原有的工程基础上,扩展一个 connectors 模块;此模块会演示以下几个组件简单使用;

  • elasticsearch
  • file(text, csv)
  • kafka
  • jdbc (mysql)
  • rabbitmq
  • redis

新增 connectors 模块

在当前工程中,创建名称为 connectors 的 maven 工程模块

pom.xml

   <artifactId>connectors</artifactId><dependencies><!-- Flink jdbc依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-jdbc_2.11</artifactId><version>1.10.1</version></dependency><!-- mysql驱动包 --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.47</version></dependency><!-- kafka依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>${flink.version}</version></dependency><!-- redis依赖 --><dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version></dependency><!-- rabbitMq依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq_2.11</artifactId><version>${flink.version}</version></dependency><!-- elasticsearch6依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch6_2.11</artifactId><version>${flink.version}</version></dependency></dependencies>

刷新工程 maven,下载相关功能依赖组件包;

创建用户表(演示使用)

-- 数所据库 flink 下创建用户表
CREATE TABLE `t_user` (`id` int(8) NOT NULL AUTO_INCREMENT,`name` varchar(40) DEFAULT NULL,`age` int(3) DEFAULT NULL,`sex` int(2) DEFAULT NULL,`address` varchar(40) DEFAULT NULL,`createTime` timestamp NULL DEFAULT NULL,`createTimeSeries` bigint(20) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

创建实体 Bean(演示使用)

TUser.java

package com.flink.examples;/*** @Description t_user表数据封装类*/
public class TUser {private Integer id;private String name;private Integer age;private Integer sex;private String address;private Long createTimeSeries;public TUser(){}public Integer getId() {return id;}public void setId(Integer id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public Integer getAge() {return age;}public void setAge(Integer age) {this.age = age;}public Integer getSex() {return sex;}public void setSex(Integer sex) {this.sex = sex;}public String getAddress() {return address;}public void setAddress(String address) {this.address = address;}public Long getCreateTimeSeries() {return createTimeSeries;}public void setCreateTimeSeries(Long createTimeSeries) {this.createTimeSeries = createTimeSeries;}@Overridepublic String toString() {return "TUser{" +"id=" + id +", name='" + name + '\'' +", age=" + age +", sex=" + sex +", address='" + address + '\'' +", createTimeSeries=" + createTimeSeries +'}';}
}

TCount.java

package com.flink.examples;/*** @Description 统计表*/
public class TCount {/*** 性别*/private Integer sex;/*** 数量*/private Integer num;public TCount(){}public TCount(Integer sex, Integer num){this.sex = sex;this.num = num;}public Integer getSex() {return sex;}public void setSex(Integer sex) {this.sex = sex;}public Integer getNum() {return num;}public void setNum(Integer num) {this.num = num;}
}

工程模块

后续关于 DataStream/DateSet API 演示示例均在此 connectors 模块下进行基础上开发;

源码下载

Gitee:flink-examples: 基于flink.1.11.1版本的工程示例,此示例包含大部份算子、窗口、中间件连接器、tables&sql的用法,适合新人学习使用;

Flink 系例 之 DataStream Connectors 与 示例模块相关推荐

  1. Flink 系例 之 Connectors 连接 Redis

    通过使用 Flink DataStream Connectors 数据流连接器连接到 Redis 缓存数据库,并提供数据流输入与输出操作: 示例环境 java.version: 1.8.x flink ...

  2. Flink 系例 之 Connectors 连接 Kafka

    通过使用 Flink DataStream Connectors 数据流连接器连接到 ElasticSearch 搜索引擎的文档数据库 Index,并提供数据流输入与输出操作: 示例环境 java.v ...

  3. Flink 系例 之 Connectors 连接 ElasticSearch

    通过使用 Flink DataStream Connectors 数据流连接器连接到 ElasticSearch 搜索引擎的文档数据库 Index,并提供数据流输入与输出操作: 示例环境 java.v ...

  4. Flink 系例 之 Connectors 连接 MySql

    通过使用 Flink DataStream Connectors 数据流连接器连接到 Mysql 数据源,并基于 JDBC 提供数据流输入与输出操作: 示例环境 java.version: 1.8.x ...

  5. Flink 系例 之 CountWindowAll

    countWindowAll 数量窗口 (不分区数量滚动窗口[滑动窗口与滚动窗口的区别,在于滑动窗口会有数据元素重叠可能,而滚动窗口不存在元素重叠]) 示例环境 java.version: 1.8.x ...

  6. Flink 系例 之 Project

    Project算子:从数据流的元数组中,重新排例参数并指定不同的下标位,返回新的数据流 示例环境 java.version: 1.8.x flink.version: 1.11.1 示例数据源 (项目 ...

  7. Flink DataStream Connectors 之 Apache Kafka 连接器

    文章目录 依赖 Kafka Source 使用方法 Topic / Partition 订阅 消息解析 起始消费位点 有界 / 无界模式 其他属性 动态分区检查 事件时间和水印 空闲 消费位点提交 监 ...

  8. Flink教程(07)- Flink批流一体API(Transformation示例)

    文章目录 01 引言 02 Transformation 2.1 基本操作 2.1.1 API 解析 2.1.2 示例代码 2.2 合并 2.2.1 union 2.2.2 connect 2.2.3 ...

  9. flink使用DataStreamUtils将DataStream的数据转成Iterator迭代器的数据(如数组、列表、集合等)

    1.scala代码如下 import org.apache.flink.streaming.experimental.DataStreamUtils import scala.collection.J ...

最新文章

  1. python codecs.open()及文件操作-文本处理 with open
  2. 安卓StepView事件进度条的简单实现
  3. java.io.IOException cannot be resolved
  4. lua去掉字符串中的UTF-8的BOM三个字节
  5. Android 动态设置 layout_centerInParent
  6. 第十周 11.1-11.7
  7. java 嵌套事务_Java事务以及嵌套事务
  8. mysql longblob_Mysql LONGBLOB 类型存储二进制数据 (修改+调试+整理)
  9. matlab重要性采样,Importance Sampling (重要性采样)介绍 | 文艺数学君
  10. ubuntu下安装PCL并测试(含视频安装过程记录)
  11. 点云边界提取方法总结
  12. 第1章 初始JAVA
  13. Nebula Graph - 基于Docker 安装 及 Studio
  14. 此文对你人生会有莫大好处的,建议永久保存
  15. 不同收入水平职工家庭 申请公积金贷款情况
  16. Dynamics CRM2013 在Visual Studio中开启脚本的Xrm.Page智能提示
  17. 如何有效开展小组教学_如何有效开展小组合作学习
  18. 无线网络安全————2、无线路由器配置和选择测试环境
  19. TI DSP芯片SCI模块的波特率自适应
  20. 一个帖子引爆流行?wappblog助阵

热门文章

  1. TensorFlow入门(五)多层 LSTM 通俗易懂版
  2. jmeter学习笔记(三):jmeter参数化
  3. 童装行业调研报告 - 市场现状分析与发展前景预测
  4. [转帖]FreeBSD、OpenBSD、NetBSD的区别
  5. vue渲染时数据对象里面的子对象的属性报错undefined,但页面正常渲染
  6. iis搭建网站教程(iis搭建网站详细步骤)
  7. HARVEST基音检测算法
  8. 门掩黄昏,无计留春住【城南旧事】
  9. 神相的‘敏捷项目管理’图
  10. 智能客服机器人系统的优势及提供哪些服务?