flink流处理示例开发

  • 一、版本和开发工具
  • 二、Flink 程序开发步骤
  • 三、开发示例
  • 四、运行

flink处理分流处理(streaming)和批处理(batch)。本示例来展示flink流处理开发的一般步骤和方法。

一、版本和开发工具

flink版本:1.13.3
开发工具:Intellij IDEA
Java版本:1.8.0_261

二、Flink 程序开发步骤

1、获得一个执行环境
2、加载或者创建初始化数据
3、指定操作数据的Transaction算子
4、指定计算好的数据的存放位置
5、调用execute()触发程序执行

三、开发示例

1、创建项目,填写项目名称,存放路径、包名、版本号等

2、添加依赖,在本机注释掉scope,当发布到flink集群时,需要添加scope,因为集群已经存在这些依赖

<properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.13.3</flink.version><scala.version>2.12</scala.version>
</properties>
<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><!--<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.version}</artifactId><version>${flink.version}</version><!--<scope>provided</scope>--></dependency>
</dependencies>

3、开发需求:通过Socket模拟产生单词,实现每隔1s对最近2s内的数据进行汇总计算
4、java程序结构如下,代码下载地址:Flink开发示例源代码

5、核心代码如下

public static void main(String[] args) throws Exception {// 获取socket端口号int port;try {ParameterTool parameterTool = ParameterTool.fromArgs(args);port = parameterTool.getInt("port");} catch (Exception e) {System.out.println("没有设置端口号。使用默认端口号9109");port = 9109;}// 获取Flink 运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 获取数据String hostname = "node01";String delimiter = "\n";DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);// timeWindow 已过时,建议使用windowDataStream<WordCount> wordCounts = text.flatMap(new FlatMapFunction<String, WordCount>() {@Overridepublic void flatMap(String s, Collector<WordCount> collector) throws Exception {String[] splits = s.split("\\s");for (String word : splits) {collector.collect(new WordCount(word, 1L));}}}).keyBy(new StreamWordCoutKeySelector()).window(SlidingProcessingTimeWindows.of(Time.seconds(2),Time.seconds(1))).sum("count");// 把数据打印到控制台并且设置并行度wordCounts.print().setParallelism(1);// 触发执行程序env.execute("Socket window count");}

四、运行

1、打开终端,输入命令:nc -l 9109

2、运行SocketWindowWorldCount,会出现异常,直接退出。如下图所示,查看日志有两处错误:
1)是由于依赖中缺少slf4j的依赖,添加依赖slf4j-simple
2)缺少flink执行环境的依赖,添加flink客户端依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.version}</artifactId><version>${flink.version}</version><!--<scope>provided</scope>-->
</dependency>
<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.32</version>
</dependency>

3、再次运行,在终端输入单词,可以看到统计的单词个数

flink流处理示例开发相关推荐

  1. Flink流计算WordCount代码示例

    代码 package com.zxl.flinkimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironment/*** f ...

  2. Flink 流式计算在节省资源方面的简单分析

    本文由小米的王加胜同学分享,文章介绍了 Apache Flink 在小米的发展,从 Spark Streaming 迁移到 Flink ,在调度计算与调度数据.Mini batch 与 streami ...

  3. Flink流式计算从入门到实战 三

    文章目录 四.Flink DataStream API 1.Flink程序的基础运行模型 2.Environment 运行环境 3.Source 3.1 基于File的数据源 3.2 基于Socket ...

  4. Flink流式计算从入门到实战 二

    文章目录 三.Flink运行架构 1.JobManager和TaskManager 2.并发度与Slots 3.开发环境搭建 4.提交到集群执行 5.并行度分析 6.Flink整体运行流程 Flink ...

  5. Flink 流批一体一站式平台 StreamX 来袭

    背景 实时即未来,在实时处理流域 Apache Spark 和 Apache Flink 是一个伟大的进步,尤其是Apache Flink被普遍认为是下一代大数据流计算引擎, 我们在使用 Flink  ...

  6. Flink流处理核心编程

    Flink流处理核心编程 5.1 Environment //批执行环境 ExecutionEnvironment benv = ExecutionEnvironment.getExecutionEn ...

  7. Flink流处理Demo(含源码)

    Flink流处理的Source 基于集合 基于文件 基于Socket 自定义数据源 使用Kafka作为数据源 使用MySql作为数据源 Flink流处理的Transformation keyby co ...

  8. flink 流式处理中如何集成mybatis框架

    flink 中自身虽然实现了大量的connectors,如下图所示,也实现了jdbc的connector,可以通过jdbc 去操作数据库,但是flink-jdbc包中对数据库的操作是以ROW来操作并且 ...

  9. Flink流式计算从入门到实战 一

    文章目录 一.理解Flink与流计算 1.初识Flink 2.Flink的适用场景 3.流式计算梳理 二.Flink安装部署 1.Flink的部署方式 2.获取Flink 3.实验环境与前置软件 4. ...

  10. Flink流式计算从入门到实战 四

    文章目录 六.Flink Table API 和Flink SQL 1.Table API和SQL是什么? 2.如何使用Table API 3.基础编程框架 3.1 创建TableEnvironmen ...

最新文章

  1. 图解Oracle 12c 手动建库
  2. 1491. [NOI2007]社交网络【最短路计数】
  3. 空间统计分析_(案例)空间分析6.4江西省地级市社会经济统计分析
  4. java addfirst_java – ArrayDeque类的addFirst方法
  5. 挂“洋头”卖奶粉,澳优还要欺骗好久
  6. python内存管理机制错误_Python内存管理机制和垃圾回收机制的简单理解
  7. Python数模笔记-StatsModels 统计回归(3)模型数据的准备
  8. mysql basic_MySQL Basic Learning (二)
  9. mysql安全实验测验答案_实验四∶数据库安全性实验报告.doc
  10. Bootstrap Table API 中文版 说明文档
  11. eclipse 导入maven项目_一文轻松学会:从GitHub下载项目到eclispe
  12. 再也不见,Itchat!
  13. vwf活性_血管性血友病因子(VWF)应该针对血型设置参考范围吗?
  14. C++ 已知两个时间(年月日)求日期差
  15. android 解决微信登录白屏样式问题
  16. 如何避免装修风格跑偏
  17. 【练习赛】2022年高教杯数学建模C题(第一题的第二小问)
  18. JAVA飞机大战网络联机对战
  19. python中rect函数_使用类和函数的面向对象Python-rectangle
  20. Linux下Oracle 11g安装(2)—— 系统准备篇

热门文章

  1. 仿蓝色理想的“运行代码”功能
  2. 中文编程发展与兴起的重要意义
  3. 2022/11/6周报
  4. C++(数据结构与算法):56---无权图与有权图的编码实现
  5. linu重置root密码(CentOS7)
  6. 仿绚丽彩虹播放器程序源码
  7. 上古卷轴5传奇版LE与重制版SE的区别(LE Vs SE)
  8. oracle用户授权只读,只读权限oracle用户
  9. JAVA游戏土行孙_《封神榜》土行孙,被誉为国内最知名矮星,现惨淡靠低保度日...
  10. 基于SpringBoot的毕业设计题目