flink入门案例之WordCount,以下测试代码都是在本地执行的

添加依赖

添加maven依赖

<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.11</artifactId><version>1.7.2</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>1.7.2</version></dependency></dependencies>

build信息

<build><plugins><!-- 该插件用于将Scala代码编译成class文件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.4.6</version><executions><execution><!-- 声明绑定到maven的compile阶段 --><goals><goal>testCompile</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins>
</build>

批处理

import org.apache.flink.api.scala.{AggregateDataSet, DataSet, ExecutionEnvironment, createTypeInformation}object WordCount {def main(args: Array[String]): Unit = {//获取执行环境//创建一个执行环境,该环境代表当前在其中执行程序的上下文。// 如果程序是独立调用的,则此方法返回本地执行环境。// 如果从命令行客户端内部调用该程序以将其提交给集群,则此方法将返回该集群的执行环境。val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment//创建通过逐行读取给定文件而产生的字符串数据集。val inputDS: DataSet[String] = env.readTextFile("D:/test/a.txt")val wordCounts: AggregateDataSet[(String, Int)] = inputDS.flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1)//将DataSet中的元素打印到调用print()方法的JVM的标准输出流System.out中。// 对于在群集中执行的程序,此方法需要将DataSet的内容收集回客户端,以在客户端打印。//为每个元素编写的字符串由AnyRef.toString方法定义。//与()和()方法类似,此方法立即触发程序执行。wordCounts.print()}
}

流处理

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}object StreamWordCount {def main(args: Array[String]): Unit = {//替换为你自己的IPval host = "192.168.xx.1x1"val port = 9000// 创建流处理环境val env = StreamExecutionEnvironment.getExecutionEnvironment// 接收socket文本流val textDstream: DataStream[String] = env.socketTextStream(host, port)// flatMap和Map需要引用的隐式转换import org.apache.flink.api.scala._val value: DataStream[(String, Int)] = textDstream.flatMap(_.split("\\s")).filter((_.nonEmpty)).map((_, 1)).keyBy(0).sum(1)value.print().setParallelism(1)env.execute("Socket WordCountTest")}
}

之后在linux上用如下命令进行测试

[root@mypc01 ~]# nc -lk 9000

总结

  • 导入包的时候一定要留意导入的是java的包还是scala的包,很多函数是两个包都有的,导入错了让你怀疑人生

flink入门案例之WordCount相关推荐

  1. 09_Flink入门案例、word-count程序(java和scala版本)、添加依赖、Flink Streaming和Batch的区别 、在集群上执行程序等

    1.9.Flink入门案例-wordCount 1.9.1.开发工具 1.9.2.编写java版本word-count程序 1.9.2.1.添加Flink Maven依赖 1.9.2.2.编写word ...

  2. 2021年大数据Flink(八):Flink入门案例

    目录 Flink入门案例 前置说明 API 编程模型 准备工程 pom文件 log4j.properties Flink初体验 需求 编码步骤 代码实现 Flink入门案例 前置说明 API API ...

  3. 【Spark分布式内存计算框架——Structured Streaming】3. Structured Streaming —— 入门案例:WordCount

    1.3 入门案例:WordCount 入门案例与SparkStreaming的入门案例基本一致:实时从TCP Socket读取数据(采用nc)实时进行词频统计WordCount,并将结果输出到控制台C ...

  4. Flink教程(04)- Flink入门案例

    文章目录 01 引言 02 开发前准备 2.1 API 2.2 编程模型 03 入门案例 3.1 项目搭建 3.2 代码实现 3.2.1 基于DataSet 3.2.2 基于DataStream 3. ...

  5. SparkStreaming 入门案例之wordcount

    案例概述 以nc作为源发送数据 案例演示 创建nc源,用于发送数据. [root@mypc01 ~]# nc -lk mypc01 10086 创建maven工程,导入依赖 <dependenc ...

  6. updateStateByKey算子入门案例之wordCount

    概念 有一个参数,是个函数,该函数有两个参数,第一个是序列类型,第二个是Option类型 def updateStateByKey[S : ClassTag](updateFunc: (Seq[V], ...

  7. Structured Streaming 入门案例之WordCount

    1.编写一个流式计算的应用, 不断的接收外部系统的消息 2.对消息中的单词进行词频统计 3.统计全局的结果 步骤 Socket Server 等待 Structured Streaming 程序连接 ...

  8. Hadoop入门案例WordCount

    wordcount可以说是hadoop的入门案例,也是基础案例 主要体现思想就是mapreduce核心思想 原始文件为hadoop.txt,内容如下: hello,java hello,java,li ...

  9. Flink——入门WordCount程序

    Flink是什么? Apache Flink is a framework and distributed processing engine for stateful computations ov ...

最新文章

  1. WINCE viewbin命令
  2. matlab批量对图片进行添加椒盐噪声并批量保存到文件夹
  3. [云炬python3玩转机器学习笔记] 3-4创建Numpy数组和矩阵
  4. 使用事务操作SQLite数据批量插入,提高数据批量写入速度,源码讲解
  5. Android之GridLayoutManager.setSpanSizeLookup问题
  6. Android之ButterKnife--View注入框架
  7. Toad for Oracle9.7中导入数据库以后,数据有中文乱码:
  8. 盐城计算机中专学校,盐城市有哪些中专学校?
  9. mysql的基本数据类型总结_Mysql数据类型的详细总结
  10. 用传说中的jquery写的随机
  11. McAfee Host Intrusion Prevention
  12. 关闭迅雷极速版自动更新功能
  13. 计算机控制鼠标,键盘控制鼠标,详细教您如何使用键盘来控制鼠标
  14. 数字电视DVB-T/T2/C/S/S2,ATSC,ISDB-T参数设置
  15. c语言指数公式_c语言指数函数详解
  16. 使用 Microsoft Teams Toolkit for Visual Studio 高效构建一个指示板
  17. 小程序开发API之mDNS
  18. NoSQL 与大数据
  19. Altium Designer 20 (6)——二极管以及光耦元件创建
  20. mac连接蓝牙耳机只有一个有声音

热门文章

  1. 各种接口的硬盘在linux中的文件名
  2. am335x linux修改ip,Linux 修改代码以支持LED 控制(board-am335xevm)
  3. python输出杨辉三角啊二维数组_用Python输出一个杨辉三角的例子
  4. jupyterlab nb_conda 增加 删除_Jupyter lab
  5. 每日三道前端面试题--vue 第四弹
  6. Qt5.12下载和安装教程(图文详解,简单易上手)
  7. 2020年最好用的手机是哪一款_2020年旗舰手机盘点,这七款优点明显,你最喜欢哪一款?...
  8. python客户端与服务器端_Python实现的FTP通信客户端与服务器端功能示例
  9. qtextedit非编辑时去边框_10分钟看懂Photoshop图像的基本编辑方法-数字化图像、图像基础理论知识(置入文件)...
  10. python散点图拟合曲线如何求拟合_python 拟合曲线并求参