Sink

Flink没有类似spark中foreach方法 让用户进行迭代操作

虽有对外的输出操作 都要利用Sink完成 

最后通过类似如下方式完成整个任务最终输出操作
stream.addSink(new MySink(xxxx))
官方提供了一部分框架的Sink 除此之外 需要用户自定义实现sink


Kafka


  • 既然从kafka sensor主题中消费消息 所以需要有一个往该队列中发送消息的生产者
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sensor
  • 往sinkTest主题中生产消息 所以需要有一个监听该主题的消费者
./bin/kafka-console-consumer.sh --bootstrap-sever localhost:9092 --topic sinkTest

具体演示过程在这里面有详细说明Flink原理简介和使用

Redis Sink

  • 源码
https://gitee.com/pingfanrenbiji/Flink-UserBehaviorAnalysis/blob/master/FlinkTutorial/src/main/scala/com/xdl/apitest/sinktest/RedisSinkTest.scala
  • 源码分析

  • 查看redis

  • set和hset比较
set :

1、普通的key-value方式存储数据 2、可以设置过期时间3、时间复杂度为O(1)4、每执行一个set 在redis中就会多一个key

hset:

1、以hash散列表的形式存储2、超时时间只能设置在大key上3、单个filed则不能设置超时时间4、时间复杂度是O(N) N是单个hash上filed的个数5、hash上不适合存储大量的filed 多了比较消耗cpu6、但以散列表存储比较节省内存

使用场景总结:

1、在实际的使用过程中 使用set应该保存单个大文本非结构化数据2、hset则存储结构化数据 一个hash存储一条数据 一个filed存储一条数据中的一个属性 value则是属性对应的值

举例说明:

用户表

id,name,age,sex

1、1,张三,16,1

2、2,李四,22,1

3、3,王五,28,0

4、4,赵六,32,1

如果要整表缓存到 redis 中则使用 hash ,一条数据一个hash 一个hash 里则包含4个filed。

hset user_1 id 1 name 张三 age 16 sex 1

hset user_2 id 2 name 李四 age 16 sex 1

如果用户的某个属性值改变,还可以单个修改

把张三的年龄改为30 则可以使用命令 hset user_1 age 30 

set存储举例:

1、缓存应用整个首页 html 

2、某个商品的详情介绍

  a、一般来说商品的详情介绍是makdown语法的富文本信息  b、html 格式的富文本信息

3、应用中的 某个热点数据

ElasticSearch Sink

安装elasticSearch

  • 下载安装包
https://www.elastic.co/cn/downloads/elasticsearch

elasticsearch-7.10.1-darwin-x86_64.tar.gz

如果想选择其他版本


  • 启动

  • 确认是否启动成功

引入依赖

<dependency>      <groupId>org.apache.flink</groupId>      <artifactId>flink-connector-elasticsearch7_2.11</artifactId>      <version>1.11.0</version></dependency>

代码分析

  • 源码
https://gitee.com/pingfanrenbiji/Flink-UserBehaviorAnalysis/blob/master/FlinkTutorial/src/main/scala/com/xdl/apitest/sinktest/EsSinkTest.scala
  • 分析

  • 程序执行结果

确认是否保存进入es

  • 查询所有索引
curl -XGET 'http://127.0.0.1:9200/_cat/indices?v'

  • 查询指定索引内容
curl -XGET 'http://127.0.0.1:9200/sensor/_search?pretty=true'

Flink中的Windows

Windows概述

一般真实的流都是无解的 怎么处理无解的数据?

可以把无限的流进行切分 得到有限的数据流进行处理 也就是得到了有界流

窗口是将无限流切割为有限流的一种方式

它会将流数据分发到有限大小的通(bucket)中进行分析

Windows类型

  • 时间窗口(Time window)

    • 滚动事件窗口(Tumbling Windows)

      • 将数据依据固定的窗口长度对数据进行切分

      • 时间堆积 窗口长度固定 没有重叠

      滚动窗口分配器将每个元素分配到一个指定窗口的窗口中 滚动窗口有一个固定的大小 并且不会出现重叠
      
      例如指定了5分钟大小的滚动窗口 窗口的创建如何所示

适用场景:

适合做BI统计等(每个时间段的聚合计算)

  • 滑动事件窗口

    • 滑动窗口是固定窗口的更广义的一种形式 滑动窗口由固定的窗口长度和滑动间隔组成

    • 窗口的长度固定 可以由重叠

    滑动窗口分配器将元素分配到固定元素的窗口中 与滚动窗口类似 
    
    窗口的大小由窗口大小参数配置
    
    另一个滑动窗口参数控制滑动窗口开始的频率
    
    因此滑动窗口如果滑动参数小于窗口大小的话 窗口是可以重叠的
    
    在这种情况下会被分配到多个窗口中
    
    例如:
    
    有10分钟的窗口和5分钟的滑动 那么窗口中每5分钟的窗口里包含着上个10分钟产生的数据

适用场景:

对最近一段时间内的统计(求某接口最近 5min的失败率来决定是否要报警)

  • 会话窗口

    • 由一系列事件组合一个指定时间长度的timeout间隙组成 类似于web应用的session 也就是一段时间内没有接收到新数据就会生成新窗口

    • 特点:时间无对齐

    session窗口分配器通过session活动来对元素进行分组 
    
    session窗口根滚动窗口和滑动窗口相比
    
    不会有重叠和固定的开始时间和结束时间的情况 
    
    相反 当它在一个固定的时间周期内不再收到元素 即非活动间隔产生
    
    那么这个窗口就会关闭 
    
    一个session窗口通过一个session间隔来配置 
    
    这个session间隔定义了非活跃周期的长度 当这个非活跃周期产生 那么当前session将关闭并且后续的元素将会被分配到新的session窗口中去 

  • 计数窗口

    • 滚动计数窗口

    • 滑动计数窗口

Windows API

  • 窗口分配器 window()
可以用 .window()定义一个窗口 基于这个window去做一些聚合或者其他处理操作

注意:这个方法必须在keyBy之后才能用

Flink提供了更简单的方法 .timeWindow(时间窗口)和.countWindow(计数窗口)

代码

val minTempPerWindow = dataStream    .map(r => (r.id, r.temperature))    .keyBy(_._1)    .timeWindow(Time.seconds(15))    .reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))
  • 窗口分配器(window assigner)

    • window方法接受的输入参数是一个 WindowAssigner

    • WindowAssigner负责将每条输入的数据分发到正确的window中

    • Flink提供了通用的 WindowAssigner

      • 滚动窗口(tumbling window)
      • 滑动窗口(sliding window)
      • 会话窗口(session window)
      • 全局窗口(global window)
  • 创建不同类型的窗口

- 滚动时间窗口(tumbling time window)

  .timeWindow(Time.seconds(15))

- 滑动事件窗口(sliding time window)

  .timeWindow(Time.seconds(15), Time.seconds(5))

- 会话窗口(session window)

  .window(EventTimeSessionWindows.withGap(Time.minutes(10)))

- 滚动计数窗口(tumbling count Window)

  .countWindow(5)

- 滑动计数窗口(sliding count window)

  .countWindow(10,2)

窗口函数

window function 定义了要对窗口中收集的数据做的计算操作

  • 增量聚合函数(incremental aggregation functions)
每条数据到来就进行计算 保持一个简单的状态
  • 全窗口函数(full window functions)
先把窗口所有数据收集起来 等到计算的时候会遍历所有数据

其他可选API

  • .trigger()一触即发
定义了window什么时候关闭 触发计算并输出结果
  • .evitor 移除器
定义了移除某些数据的逻辑
  • .allowLateness()
允许处理迟到的数据
  • .sideOutputLateData()
将迟到的数据放入侧输出流
  • .sideOutputLateData()
将迟到的数据放入侧输出流

Flink window 用法介绍相关推荐

  1. 1.16.Flink Window和Time详解、TimeWindow的应用、Window聚合分类之全量聚合、全量聚合状态变化过程-求最大值、Time介绍、EventTime和Watermarks等

    1.16.Flink Window和Time详解 1.16.1.Window(窗口) 1.16.2.Window的类型 1.16.3.Window类型汇总 1.16.4.TimeWindow的应用 1 ...

  2. Flink Window基本概念与实现原理

    Window意为窗口.在流处理系统中数据源源不断流入到系统,我们可以逐条处理流入的数据,也可以按一定规则一次处理流中的多条数据.当处理数据时程序需要知道什么时候开始处理.处理哪些数据.窗口提供了这样一 ...

  3. MATLAB基本用法介绍

    MATLAB基本用法介绍 最近由于学习算法又开始使用MATLAB了,因此记录一些MATLAB常用的用法,便于日后的复习 %% I. 清空环境变量及命令 clear all % 清除Workspace中 ...

  4. Flink DataStream API 介绍

    Flink DataStream API 介绍 StreamExecutionEnvironment #mermaid-svg-JKeWa22W2vWA4zBS {font-family:" ...

  5. javascript中in用法介绍

    <!DOCTYPE html> <html> <head> <meta charset="UTF-8"> <title> ...

  6. Flink专题四:Flink DataStream 窗口介绍及使用

    由于工作需要最近学习flink 现记录下Flink介绍和实际使用过程 这是flink系列的第四篇文章 Flink DataStream 窗口介绍及使用 窗口介绍 时间窗口 翻滚窗口(数据以一个时间断为 ...

  7. Flink Window机制详解

    Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理.而窗口(window)就是从 Streaming 到 Batch ...

  8. Flink框架的介绍和实现原理(一)

    一.Flink是什么 Apache Flik 是一个面向分布式数据流处理和批量数据处理的开源计算平台,提供支持流处理和批处理两种类型应用的功能. 二.Flink特点 现在的开源方案,会把流处理和批处理 ...

  9. awk 和sed的用法介绍

    awk 和sed的用法介绍 一.awk的介绍 1. 作用及用法要求 2. 使用方法 (1)使用的命令: (2)举例说明: 二.sed的介绍 1. 用法介绍及常用命令 2. 具体使用 一.awk的介绍 ...

最新文章

  1. linux实验试题 cp,51CTO博客-专业IT技术博客创作平台-技术成就梦想
  2. ARM64的启动过程之(二):创建启动阶段的页表
  3. php面向对象及tp框架初识,thinkphp面向对象的问题
  4. scrapy 安装流程和启动
  5. jwt同一会话_在会话中使用JWT
  6. sort()函数、C++
  7. matlab 等高线_MATLAB作图实例:39:更改等高线图的填充颜色
  8. java - 条件嵌套
  9. vim可以用来编译python吗_Linux下编译Vim以支持python2.x
  10. c#定时备份mysql数据库_C# 定时备份数据库工具源码下载
  11. tornado Python mysql_python tornado mysql 内容管理后台部署
  12. 【转】VC6下安装与配置OpenCV1.0
  13. GIS专业/GIS方向需要考那些证书
  14. 能领拼多多优惠券的app
  15. 2020-03-31
  16. 数据库——T-SQL方式创建数据库
  17. 计算机课对小学生的作用,小学信息技术课的最重要性
  18. android里面的USB功能-----Accessory模式
  19. 推荐几个下载英文原版电子书的网站-PDF
  20. 【答读者问43】再谈不复权、前复权、后复权、定点复权在回测与实盘中的应用

热门文章

  1. LabVIEW自带函数实现SQL Server操作(上)
  2. 微信小程序 全局共享数据
  3. 20181016-10 每周例行报告
  4. Linux下OneinStack一键安装JAVA+PHP+Tomcat+Nginx+MySQL网站环
  5. 第四次作业——测试作业
  6. struts2、hibernate工作原理和流程
  7. SQL Server 自动化运维系列
  8. MySQL数据库的基本操作命令
  9. 我的Linux折腾史
  10. mac下使用Charles抓chrome包