使用Flink + java实现需求

环境

JDK:1.8

Maven:3.6.1(最低Maven 3.0.4

使用上一节中的springboot-flink-train项目

开发步骤

第一步:创建流处理上下文环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

第二步:读取数据,使用socket流方式读取数据

DataStreamSource<String> text = env.socketTextStream("192.168.152.45", 9999);

第三步:transform

        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] tokens = value.toLowerCase().split(",");for(String token: tokens) {if(token.length() > 0) {out.collect(new Tuple2<String, Integer>(token, 1));}}}}).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print();

这里我们使用逗号分隔,然后跟批处理不同的是,这里使用keyBy(0),而不是groupBy(0)。timewindow表示每隔多久执行一次。

第四步:执行

env.execute("StreamingWCJavaApp");

整体代码如下:

/*** 使用Java API来开发Flink的实时处理应用程序* wc统计的数据源自socket*/
public class StreamingWCJava02App {public static void main(String[] args) throws Exception {// 获取参数int port;try{ParameterTool tool = ParameterTool.fromArgs(args);port = tool.getInt("port");} catch (Exception e) {System.out.println("端口未设置, 使用默认端口9999");port = 9999;}// step1: 获取流处理上下文环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// step2: 读取数据DataStreamSource<String> text = env.socketTextStream("192.168.152.45", port);// step3: transformtext.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] tokens = value.toLowerCase().split(",");for(String token: tokens) {if(token.length() > 0) {out.collect(new Tuple2<String, Integer>(token, 1));}}}}).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print();env.execute("StreamingWCJavaApp");}}

运行

首先在192.168.152.45上运行命令

nc -l 9999

然后在运行main方法。在192.168.152.45的nc上输入

abc,def,abc,ddd

在idea控制台输出如下:

4> (abc,2)
1> (def,1)
4> (ddd,1)

这个前面的"4>"表示并行度。我们可以设置setParallelism(1)来忽略这个问题。如下所示:

        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] tokens = value.toLowerCase().split(",");for(String token: tokens) {if(token.length() > 0) {out.collect(new Tuple2<String, Integer>(token, 1));}}}}).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1);

这样控制台的打印结果如下:

(abc,2)
(ddd,1)
(def,1)

这样一个简单的demo就成功了!

重构代码

上面的代码中localhost与port需要用参数传递进来。

代码如下:

        // 获取参数int port;try{ParameterTool tool = ParameterTool.fromArgs(args);port = tool.getInt("port");} catch (Exception e) {System.out.println("端口未设置, 使用默认端口9999");port = 9999;}

使用Flink提供的ParameterTool来接收参数。

我们在运行时就可以指定参数列表了,其中的key必须以“-”或者“--”开头。

在运行时,配置参数:

这样运行就可以从外界传递参数了

使用Flink + Scala实现需求

接下来使用Scala方式实现,在项目springboot-flink-train-scala中新建StreamingWCScalaApp,内容如下:

/*** 使用Scala开发Flink的实时处理应用程序*/
object StreamingWCScalaApp {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment// 引入隐式转换import org.apache.flink.api.scala._val text = env.socketTextStream("192.168.152.45", 9999)text.flatMap(_.split(",")).map((_,1)).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1)env.execute("StreamingWCScalaApp");}
}

这种方式比java实现更加简洁。

Apache Flink 零基础入门(五)Flink开发实时处理应用程序相关推荐

  1. Apache Flink 零基础入门【转】

    Apache Flink 零基础入门(一):基础概念解析 Apache Flink 零基础入门(二):DataStream API 编程 转载于:https://www.cnblogs.com/dav ...

  2. 自学也能学得会的《零基础入门学习Web开发》(HTML5 CSS3)

    1 Web开发是什么 很多读者可能还不明白,Web开发到底是什么. 其实,我们所说的Web开发通常相当于前端开发与后端开发的组合. 前端开发主要通过HTML.CSS.JavaScript. AJAX. ...

  3. 罗马音平假名中文可复制_日语零基础入门五十音,日语零基础五十音图表

    日语零基础入门五十音,日语入门的最基本要求就是记住五十音图,但是这个记住不仅是你能背下来或是默写下来.而是你需对号入座! 下面是一张五十音图表. 即每个假名单独拿出来你要立马反应出来怎么读.其重要性甚 ...

  4. 【Web前端开发】《零基础入门学习Web开发》(HTML5CSS3)(小甲鱼)

    1 P1:凉凉好像挺厉害的奥?      听完了! 2 P2:HTML是用来描述网页的一种语言 官方:超文本标记语言   Hyper Text Markup Language 使用标签来描述网页    ...

  5. 【环境篇】ESP-IDF零基础入门 2 —— 搭建开发环境2

    系列文章目录 [文章导航]基于 ESP-IDF 框架的 ESP32 零基础入门系列教程 文章目录 系列文章目录 前言 1. 运行 ESP-IDF 工具安装器 2. 验证 ESP-IDF 3. 安装VS ...

  6. Apache Flink 零基础入门(四)Flink开发批处理应用程序

    需求 词频统计,即给一个文件,统计文件中每个单词出现的次数,分隔符是\t.这个文件内容如下: hello world welcome hello welcome 统计结果直接打印在控制台.生产环境下一 ...

  7. Apache Flink 零基础入门(十八)Flink Table APISQL

    什么是Flink关系型API? 虽然Flink已经支持了DataSet和DataStream API,但是有没有一种更好的方式去编程,而不用关心具体的API实现?不需要去了解Java和Scala的具体 ...

  8. Apache Flink 零基础入门(三)编写最简单的helloWorld

    实验环境 JDK 1.8 IDE Intellij idea Flink 1.8.1 实验内容 创建一个Flink简单Demo,可以从流数据中统计单词个数. 实验步骤 首先创建一个maven项目,其中 ...

  9. 浅谈三个星期零基础入门学习Thinkphp5开发restful-api接口的心得和总结

    一丢丢心得体会: 首先不得不说一下,学习一门知识,真的就像建一栋高楼一样,地基必须的稳固,否则你辛辛苦苦建的楼可能随时会垮掉,这一点在我学习thinkphp5的路上深有体会,同时了自此我也爱上了写博客 ...

最新文章

  1. dataframe 转json
  2. 微软私有云分享(R2)8-PowerShell下载文件
  3. cas sso单点登录 登录过程和登出过程原理说明
  4. android与mysql的交互,与Android中的外部SQLite数据库进行交互.
  5. centos 查找nginx_centos7肿么查看已经安装nginx
  6. mysql 之jdbc idea版
  7. JavaScript 图片上传预览效果
  8. SQLSERVER字符串截取------STUFF
  9. python对数组的操作_Python对数组的基本操作
  10. 华为交换机配置syslog发送_华为/H3C Syslog配置
  11. C# 使用 WebBrowser 实现 HTML 转图片功能
  12. Leetcode-最长回文子串(包含动态规划以及Manacher算法)
  13. sqlmap指定cookie_sqlmap处理cookie数据
  14. 用人工智能方法计算水果难题------遗传算法篇
  15. mysql field in set_MySQL中的find_in_set()函数使用技巧心得与应用场景总结
  16. Self-supervised Heterogeneous Graph Neural Network with Co-contrastive Learning
  17. 【ACWing】1123. 铲雪车
  18. [bzoj4134]ljw和lzr的hack比赛
  19. 数组中a与a[0]的区别
  20. 箭杆织布机计算机控制系统,高速喷水织布机单片机控制系统设计 毕业设计论文.doc...

热门文章

  1. sql server 2008学习12 事务和锁
  2. oracle 取当天日期减一天 应该如何写
  3. ASP.NET页面级别的事务
  4. 为什么说神经网络可以逼近任意函数?
  5. PHP用单例模式实现一个数据库类
  6. 京东商品详情页碎碎念
  7. PHP的composer dump-autoload
  8. 操作系统的功能和特征
  9. 阿里linux安装mysql_阿里云Linux Ubuntu系统安装mysql完整过程
  10. php 单元测试 静态类,可选的PHP类型提示/检查单元测试或静态分析?