需求

词频统计,即给一个文件,统计文件中每个单词出现的次数,分隔符是\t。这个文件内容如下:

hello    world    welcome
hello    welcome

统计结果直接打印在控制台。生产环境下一般Sink到目的地。

使用Flink + java实现需求

环境

JDK:1.8

Maven:3.6.1(最低Maven 3.0.4

创建项目

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.8.1 -DarchetypeCatalog=local

groupId: com.vincent artifactId: springboot-flink-train version:1.0 这样就创建了一个项目,使用Idea导入这个项目,项目结构如下:

里面有两个自动为我们准备好的java类。

开发步骤

第一步:创建批处理上下文环境

// set up the batch execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

第二步:读取数据

env.readTextFile(textPath);

第三步:transform operations,例如 filter()  flatMap()  join()  coGroup(),这是开发的核心所在,一般就是业务逻辑

第四步:execute program

具体操作

第一步:读取数据

hello    welcome

第二步:每一行的数据按照指定的分隔符拆分

hello
welcome

第三步:为每一个单词赋上次数为1

(hello,1)
(welcome,1)

第四步:合并操作

代码实现

/*** 使用Java API来开发Flink的批处理应用程序*/
public class BatchWCJavaApp {public static void main(String[] args) throws Exception {String input = "E:/test/input/test.txt";// step1: 获取运行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// step2: 读取数据DataSource<String> text = env.readTextFile(input);// step3: transform// FlatMapFunction<String, Tuple2<String, Integer>表示进来一个String, 转换成一个<String, Integer>类型text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {/**** @param value 就是一行一行的字符串* @param out 转换成(单词,次数)* @throws Exception*/@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] tokens = value.toLowerCase().split("\t");for(String token: tokens) {if(token.length() > 0) {out.collect(new Tuple2<String, Integer>(token, 1));}}}}).groupBy(0).sum(1).print();}
}

运行结果

(world,1)
(hello,2)
(welcome,2)

使用Flink + scala实现需求

环境

JDK:1.8

Maven:3.6.1(最低Maven 3.0.4

创建项目,跟使用java方式是一样的

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-scala -DarchetypeVersion=1.8.1 -DarchetypeCatalog=local

groupId: com.vincent artifactId: springboot-flink-train-scala version:1.0 这样就创建了一个项目,使用Idea导入这个项目:

接下来的开发步骤与使用java实现的开发步骤是一样的:这里给出

代码实现

import org.apache.flink.api.scala.ExecutionEnvironment/*** 使用Scala开发Flink的批处理应用程序*/
object BatchWCScalaApp {def main(args: Array[String]): Unit = {val input = "E:/test/input/test.txt"val env = ExecutionEnvironment.getExecutionEnvironmentval text = env.readTextFile(input)// 引入隐式转换import org.apache.flink.api.scala._text.flatMap(_.toLowerCase.split("\t")).filter(_.nonEmpty).map((_, 1)).groupBy(0).sum(1).print()}
}

Java与Scala实现方式对比

算子与简洁性

也就是transform部分虽然原理是一样的,但是实现的方式不一样,scala更加简洁

Apache Flink 零基础入门(四)Flink开发批处理应用程序相关推荐

  1. Apache Flink 零基础入门【转】

    Apache Flink 零基础入门(一):基础概念解析 Apache Flink 零基础入门(二):DataStream API 编程 转载于:https://www.cnblogs.com/dav ...

  2. 【Web前端开发】《零基础入门学习Web开发》(HTML5CSS3)(小甲鱼)

    1 P1:凉凉好像挺厉害的奥?      听完了! 2 P2:HTML是用来描述网页的一种语言 官方:超文本标记语言   Hyper Text Markup Language 使用标签来描述网页    ...

  3. 自学也能学得会的《零基础入门学习Web开发》(HTML5 CSS3)

    1 Web开发是什么 很多读者可能还不明白,Web开发到底是什么. 其实,我们所说的Web开发通常相当于前端开发与后端开发的组合. 前端开发主要通过HTML.CSS.JavaScript. AJAX. ...

  4. 【环境篇】ESP-IDF零基础入门 2 —— 搭建开发环境2

    系列文章目录 [文章导航]基于 ESP-IDF 框架的 ESP32 零基础入门系列教程 文章目录 系列文章目录 前言 1. 运行 ESP-IDF 工具安装器 2. 验证 ESP-IDF 3. 安装VS ...

  5. Apache Flink 零基础入门(十八)Flink Table APISQL

    什么是Flink关系型API? 虽然Flink已经支持了DataSet和DataStream API,但是有没有一种更好的方式去编程,而不用关心具体的API实现?不需要去了解Java和Scala的具体 ...

  6. Apache Flink 零基础入门(三)编写最简单的helloWorld

    实验环境 JDK 1.8 IDE Intellij idea Flink 1.8.1 实验内容 创建一个Flink简单Demo,可以从流数据中统计单词个数. 实验步骤 首先创建一个maven项目,其中 ...

  7. 浅谈三个星期零基础入门学习Thinkphp5开发restful-api接口的心得和总结

    一丢丢心得体会: 首先不得不说一下,学习一门知识,真的就像建一栋高楼一样,地基必须的稳固,否则你辛辛苦苦建的楼可能随时会垮掉,这一点在我学习thinkphp5的路上深有体会,同时了自此我也爱上了写博客 ...

  8. C# 在线培训之零基础入门 01:开篇及C#程序、解决方案的结构

    一. 本课程所面向的对象 从今天起,我们开始<零基础c#入门>学习.本课程是一门收费课程,请参见TMJ .NET在线培训. <零基础c#入门>是给那些非常想成为程序员,但是基础 ...

  9. Apache Flink 零基础入门(五)Flink开发实时处理应用程序

    使用Flink + java实现需求 环境 JDK:1.8 Maven:3.6.1(最低Maven 3.0.4) 使用上一节中的springboot-flink-train项目 开发步骤 第一步:创建 ...

最新文章

  1. TCL withSNPS info existscreate_cellcreate_netconnect_net
  2. 2015年百度二面试题
  3. L1,L2正则化分析
  4. PyQt4编程之简短地做出多个选择框
  5. PPT 下载 | 神策数据张涛:企业服务客户全生命周期运营三步曲客情诊断 解决方案库...
  6. Career Service, what skills do you need for career domain?
  7. Session在类库中的使用
  8. mysql数据库优化课程---13、mysql基础操作
  9. found.000文件夹的问题
  10. 服务器节点信息管理,华为云管理节点服务器
  11. LeetCode 1005. K 次取反后最大化的数组和
  12. .第一天.net 学习理论
  13. 树莓派初始化安装与配置
  14. Atitit.dart语言的特性  编译时js语言大总结
  15. Android 4G 模块添加 TV平台Mstar HISI
  16. SEO实战密码:60天网站流量提高20倍(第2版)
  17. outlook 您的组织策略阻止我们为您完成此操作 解决办法
  18. JQuery解析二维码
  19. 红帽子linux9百度云,红帽 Red Hat Linux相关产品iso镜像下载【百度云】
  20. C#对.CSV格式的文件--逗号分隔值文件 的读写操作及上传ftp服务器操作方法总结

热门文章

  1. Java与C#事件处理详细对比
  2. 织梦本地调试运行PHP不显示图片,织梦dedecms不能下载远程图片实现图片本地化解决方法...
  3. Python实现顺序表
  4. php安装openssl 扩展
  5. mysql长连接与短连接
  6. 举例详解PHP归并排序的实现
  7. PHP的函数file_get_contents() 把整个文件读入一个字符串中
  8. Mysql的垂直分表-新建
  9. mysql 中 add2_计算器中的F,4,2,0,ADD2怎么调,MU键有什么用??急急急
  10. CRMEB Dockerfile文件