1.9.Flink入门案例-wordCount
1.9.1.开发工具
1.9.2.编写java版本word-count程序
1.9.2.1.添加Flink Maven依赖
1.9.2.2.编写wordcount的java代码
1.9.2.3.数据准备
1.9.2.4.执行结果
1.9.3.编写scala版本word-count程序
1.9.3.1.添加Flink依赖
1.9.3.2.编写wordcount的scala程序
1.9.4.Flink StreamingWindowWordCount
1.9.5.Flink程序开发步骤
1.9.6.Flink Streaming和Batch的区别 (API使用层面)
1.9.7.在集群上执行程序
1.9.8.集群跑jar的时候pom文件中需要进行的build配置

1.9.Flink入门案例-wordCount

1.9.1.开发工具

官方建议使用Intellij IDEA,因为它默认集成scala和maven环境,使用更加方便
开发flink程序,可以使用java或者scala语言。
个人建议,使用scala,因为实现起来更加简洁。使用java代码实现函数式编程比较别扭。
建议使用maven国内镜像仓库地址
国外仓库下载较慢,可以使用国内阿里云的maven仓库
注意:如果发现国内源下载提示找不到依赖的时候,记得切换回国外源
国内镜像仓库配置见备注

1.9.2.编写java版本word-count程序

新建一个FLINK IDEA工程,如下:

1.9.2.1.添加Flink Maven依赖

你只要将以下依赖添加到pom.xml中,就能在项目中引入Apache Flink。这些依赖项包含了本地执行环境,因此支持本地测试。
Scala API:为了使用Scala API,将flink-java的artifact id替换为flink-scala_2.11,同时将flink-streaming-java_2.11替换为flink-streaming-scala_2.11。

总的pom.xml文件内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.toto.learn</groupId><artifactId>flink-learn</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.11.1</version></dependency><!--1.compile   : 默认的scope,运行期有效,需要打入包中。2.provided  : 编译器有效,运行期不需要提供,不会打入包中。3.runtime   : 编译不需要,在运行期有效,需要导入包中。(接口与实现分离)4.test      : 测试需要,不会打入包中5.system    : 非本地仓库引入、存在系统的某个路径下的jar。(一般不使用)--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.11.1</version><!--<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.11.1</version></dependency><!-- 使用scala编程的时候使用下面的依赖 start--><!--<dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.12</artifactId><version>1.11.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>1.11.1</version></dependency>--><!-- 使用scala编程的时候使用下面的依赖 end--></dependencies><build><plugins><!-- 编译插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.6.0</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin><!-- scala编译插件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.1.6</version><configuration><scalaCompatVersion>2.11</scalaCompatVersion><scalaVersion>2.11.12</scalaVersion><encoding>UTF-8</encoding></configuration><executions><execution><id>compile-scala</id><phase>compile</phase><goals><goal>add-source</goal><goal>compile</goal></goals></execution><execution><id>test-compile-scala</id><phase>test-compile</phase><goals><goal>add-source</goal><goal>testCompile</goal></goals></execution></executions></plugin><!-- 打jar包插件(会包含所有依赖) --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>2.6</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest><!-- 可以设置jar包的入口类(可选) --><mainClass>com.toto.learn.batch.BatchWordCountJava</mainClass></manifest></archive></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build></project>

1.9.2.2.编写wordcount的java代码

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;/*** @author tuzuoquan* @version 1.0* @ClassName BatchWordCountJava* @description TODO* @date 2020/9/11 14:03**/
public class BatchWordCountJava {public static void main(String[] args) throws Exception {String inputPath = "E:/workspace/wordcount/input";String outputPath = "E:/workspace/wordcount/output";// 获取运行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//获取文件中的内容DataSource<String> text = env.readTextFile(inputPath);//groupBy(num) :按照第几列进行排序;sum(num):排序后将第二列的值进行求和DataSet<Tuple2<String,Integer>> counts = text.flatMap(new Tokenizer()).groupBy(0).sum(1);counts.writeAsCsv(outputPath,"\n"," ").setParallelism(1);env.execute("batch word count");}public static class Tokenizer implements FlatMapFunction<String, Tuple2<String,Integer>>{@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {//\w 匹配字母或数字或下划线或汉字  等价于[^A-Za-z0-9_]//\W 非数字字母下划线String[] tokens = value.toLowerCase().split("\\W+");for (String token: tokens) {if (token.length() > 0) {//转变成 word   1的格式。每个新的单词字数都是1out.collect(new Tuple2<String,Integer>(token,1));}}}}}

1.9.2.3.数据准备

words.txt中的内容如下:

1.9.2.4.执行结果


output的内容如下:

1.9.3.编写scala版本word-count程序

1.9.3.1.添加Flink依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.toto.learn</groupId><artifactId>flink-learn</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.11.1</version></dependency><!--1.compile   : 默认的scope,运行期有效,需要打入包中。2.provided  : 编译器有效,运行期不需要提供,不会打入包中。3.runtime   : 编译不需要,在运行期有效,需要导入包中。(接口与实现分离)4.test      : 测试需要,不会打入包中5.system    : 非本地仓库引入、存在系统的某个路径下的jar。(一般不使用)--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.11.1</version><!--<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.11.1</version></dependency><!-- 使用scala编程的时候使用下面的依赖 start--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.12</artifactId><version>1.11.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>1.11.1</version></dependency><!-- 使用scala编程的时候使用下面的依赖 end--></dependencies><build><plugins><!-- 编译插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.6.0</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin><!-- scala编译插件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.1.6</version><configuration><scalaCompatVersion>2.11</scalaCompatVersion><scalaVersion>2.11.12</scalaVersion><encoding>UTF-8</encoding></configuration><executions><execution><id>compile-scala</id><phase>compile</phase><goals><goal>add-source</goal><goal>compile</goal></goals></execution><execution><id>test-compile-scala</id><phase>test-compile</phase><goals><goal>add-source</goal><goal>testCompile</goal></goals></execution></executions></plugin><!-- 打jar包插件(会包含所有依赖) --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>2.6</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest><!-- 可以设置jar包的入口类(可选) --><mainClass>com.toto.learn.batch.BatchWordCountJava</mainClass></manifest></archive></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build></project>

主要是添加了:

<!-- 使用scala编程的时候使用下面的依赖 start-->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.12</artifactId><version>1.11.1</version>
</dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>1.11.1</version>
</dependency>
<!-- 使用scala编程的时候使用下面的依赖 end-->

1.9.3.2.编写wordcount的scala程序

import org.apache.flink.api.scala.ExecutionEnvironmentobject BatchWordCountScala {def main(args: Array[String]): Unit = {val inputPath = "E:/workspace/wordcount/input"val outPut = "E:/workspace/wordcount/output"val env = ExecutionEnvironment.getExecutionEnvironmentval text = env.readTextFile(inputPath)//引入隐式转换import org.apache.flink.api.scala._val counts = text.flatMap(_.toLowerCase.split("\\W")).filter(_.nonEmpty).map((_,1)).groupBy(0).sum(1)counts.writeAsCsv(outPut,"\n"," ").setParallelism(1)env.execute("batch word count")}}

运行程序,结果如上java版本的运行结果一样

1.9.4.Flink StreamingWindowWordCount

需求分析
手工通过socket实时产生一些单词,使用flink实时接收数据,对指定时间窗口内(例如:2秒)的数据进行聚合统计,并且把时间窗口计算的结果打印出来。
代码开发
添加对应的java依赖或者scala依赖
执行
1:在hadoop上执行nc -l 9000
2:在本机启动idea中的代码

编写程序:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;/*** @author tuzuoquan* @version 1.0* @ClassName SocketWindowWordCountJava* @description TODO* @date 2020/9/11 17:08**/
public class SocketWindowWordCountJava {public static void main(String[] args) throws Exception {//获取需要的端口号int port;try {ParameterTool parameterTool = ParameterTool.fromArgs(args);port = parameterTool.getInt("port");}catch (Exception e){System.err.println("No port set. use default port 9000--java");port = 9000;}//1、获取flink的运行环境org.apache.flink.streaming.api.environment.StreamExecutionEnvironment env = org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment();String hostname = "127.0.0.1";String delimiter = "\n";//2、获取数据源,连接socket获取输入的数据DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);//3、指定算子进行运算,通过flatMap将数据打平DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {@Overridepublic void flatMap(String value, Collector<WordWithCount> out) throws Exception {String[] splits = value.split("\\s");for (String word : splits) {//默认每个单词出现1次out.collect(new WordWithCount(word,1L));}}}).keyBy("word")    //根据key进行分组,它会找word中的值.timeWindow(Time.seconds(2),Time.seconds(1)) //指定时间窗口大小为2秒,指定时间间隔为1秒.sum("count");    //这里可以使用sum或reduce//4、指定数据存储位置,这里是把它打印到控制台。把数据打印到控制台并设置并行度windowCounts.print().setParallelism(1);//5、执行,并且给它起个名字。这一行代码一定要实现,否则程序不执行。env.execute("Socket window count");}public static class WordWithCount {public String word;public long count;public  WordWithCount(){}public WordWithCount(String word,long count){this.word = word;this.count = count;}@Overridepublic String toString() {return "WordWithCount{" +"word='" + word + '\'' +", count=" + count +'}';}}
}

Scala的代码如下:

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time/*** 滑动窗口计算** 每隔1秒统计最近2秒内的数据,打印到控制台** Created by xxx.xxx on 2018/10/8.*/
object SocketWindowWordCountScala {def main(args: Array[String]): Unit = {//获取socket端口号val port: Int = try {ParameterTool.fromArgs(args).getInt("port")}catch {case e: Exception => {System.err.println("No port set. use default port 9000--scala")}9000}//获取运行环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//链接socket获取输入数据val text = env.socketTextStream("127.0.0.1",port,'\n')//解析数据(把数据打平),分组,窗口计算,并且聚合求sum//注意:必须要添加这一行隐式转行,否则下面的flatmap方法执行会报错import org.apache.flink.api.scala._val windowCounts = text.flatMap(line => line.split("\\s"))//打平,把每一行单词都切开.map(w => WordWithCount(w,1))//把单词转成word , 1这种形式.keyBy("word")//分组.timeWindow(Time.seconds(2),Time.seconds(1))//指定窗口大小,指定间隔时间.sum("count");// sum或者reduce都可以//.reduce((a,b)=>WordWithCount(a.word,a.count+b.count))//打印到控制台windowCounts.print().setParallelism(1);//执行任务env.execute("Socket window count");}case class WordWithCount(word: String,count: Long)
}

1.9.5.Flink程序开发步骤

1:获得一个执行环境
2:加载/创建 初始化数据
3:指定操作数据的transaction算子
4:指定把计算好的数据放在哪
5:调用execute()触发执行程序
注意:Flink程序是延迟计算的,只有最后调用execute()方法的时候才会真正触发执行程序。
延迟计算好处:你可以开发复杂的程序,但是Flink可以将复杂的程序转成一个Plan,将Plan作为一个整体单元执行!

1.9.6.Flink Streaming和Batch的区别 (API使用层面)

流处理Streaming
StreamExecutionEnvironment
DataStreaming
批处理Batch
ExecutionEnvironment
DataSet

1.9.7.在集群上执行程序

编译
需要在pom文件中添加build配置,打包时指定入口类全类名【或者在运行时动态指定】
provided
mvn clean package
执行
1:在flink机器上启动local flink集群
2:在flink机器上执行nc -l 9000
3: 在flink机器上执行./bin/flink run -c xxx.xxx.xxx.MainClass FlinkExample-xxxxx.jar --port 9000
4: 在flink机器上执行tail -f log/flink--taskexecutor-.out查看日志输出
5: 停止任务
A:web ui界面停止
B:命令执行bin/flink cancel

1.9.8.集群跑jar的时候pom文件中需要进行的build配置

<build><plugins><!-- 编译插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.6.0</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin><!-- scala编译插件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.1.6</version><configuration><scalaCompatVersion>2.11</scalaCompatVersion><scalaVersion>2.11.12</scalaVersion><encoding>UTF-8</encoding></configuration><executions><execution><id>compile-scala</id><phase>compile</phase><goals><goal>add-source</goal><goal>compile</goal></goals></execution><execution><id>test-compile-scala</id><phase>test-compile</phase><goals><goal>add-source</goal><goal>testCompile</goal></goals></execution></executions></plugin><!-- 打jar包插件(会包含所有依赖) --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>2.6</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest><!-- 可以设置jar包的入口类(可选) --><mainClass>xxx.xxx.SocketWindowWordCountJava</mainClass></manifest></archive></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>

09_Flink入门案例、word-count程序(java和scala版本)、添加依赖、Flink Streaming和Batch的区别 、在集群上执行程序等相关推荐

  1. 使用Python+jieba和java+庖丁分词在Spark集群上进行中文分词统计

    写在前边的话: 本篇博客也是在做豆瓣电影数据的分析过程中,需要对影评信息和剧情摘要信息进行分析而写的一篇博客 以前学习hadoop时,感觉做中文分词也没那么麻烦,但是到了Spark,却碰到了诸多困难, ...

  2. 在local模式下的spark程序打包到集群上运行

    一.前期准备 前期的环境准备,在Linux系统下要有Hadoop系统,spark伪分布式或者分布式,具体的教程可以查阅我的这两篇博客: Hadoop2.0伪分布式平台环境搭建 Spark2.4.0伪分 ...

  3. Spark在集群上执行代码案例(中文切词)

    Spark在集群上执行代码案例 java的切词使用案例(Demo) Spark中文切词代码 需求:利用jieba进行中文分词,并打包上传到集群进行执行 java的切词使用案例(Demo) @Test ...

  4. apache ignite_Kubernetes集群上的Apache Ignite和Spring第1部分:Spring Boot应用程序

    apache ignite 在之前的一系列博客中,我们在Kubernetes集群上启动了一个Ignite集群. 在本教程中,我们将使用先前在Spring Boot Application上创建的Ign ...

  5. Kubernetes集群上的Apache Ignite和Spring第1部分:Spring Boot应用程序

    在之前的一系列博客中,我们在Kubernetes集群上启动了一个Ignite集群. 在本教程中,我们将使用先前在Spring Boot Application上创建的Ignite集群. 让我们使用Sp ...

  6. 从认证到调度,K8s 集群上运行的小程序到底经历了什么?

    作者 | 声东  阿里云售后技术专家 导读:不知道大家有没有意识到一个现实:大部分时候,我们已经不像以前一样,通过命令行,或者可视窗口来使用一个系统了. 前言 现在我们上微博.或者网购,操作的其实不是 ...

  7. SparkSQL 内置函数的使用(JAVA与Scala版本)

    SparkSQL 内置函数的使用(JAVA与Scala版本) agg的使用(根据时间,去重id相同,统计相同时间内的id个数) Scala版本! package com.bynear.Scalaimp ...

  8. Java的不同版本:J2SE、J2EE、J2ME的区别

    一. Java概述 1. Java语言概述 2. Java虚拟机以及跨平台原理 3. Java的主要就业方向 4. Java的不同版本 5. Java开发环境搭建 6. 第一个Java程序示例 7.  ...

  9. apache ignite_Kubernetes集群上的Apache Ignite和Spring第3部分:测试应用程序

    apache ignite 在上一个博客中,我们为Ignite应用程序创建了Kubernetes部署文件. 在此博客上,我们将在Kubernetes上部署Ignite应用程序. 我将在此使用minik ...

最新文章

  1. java 拉姆表达式_Java8 lambda表达式10个示例
  2. 专访阿里云域名与网站业务总经理宋瑛桥:域名未来将更加个性化、生态化和规范化...
  3. PostgreSQL入门篇学习笔记(七)
  4. 连接MySQL数据库时常见故障问题的分析与解决
  5. 如何有效解决C与C++的相互调用问题
  6. delphi中的函数传参如何传枚举参数_我是这样使用SpringBoot(API传参)
  7. 请查收,一份让你年薪突破20W的Python爬虫笔记!
  8. java 包错_以下关于Java包的描述中,错误的是()
  9. STL之set_union、set_intersection、set_difference、set_symmetric_difference
  10. 系统描述符类型,段描述符类型和段描述符表
  11. python构造callable_Python callable内置函数原理解析
  12. Filter过滤器输出HelloFilter
  13. 文件二维码、社交媒体二维码如何制作?
  14. qq机器人php源码_基于PHP开发的QQ多功能机器人,小明同学机器人V4.0开源
  15. android修行之路----经典书籍
  16. 阿里云服务器租赁注意点
  17. 使用腾讯云服务器搭建个人网盘
  18. 项目反应理论 matlab,IRTPRO | 项目反应理论软件
  19. 这些坑别踩!游戏随机地图生成开发经验分享
  20. 阿里研究院启动2018年度淘宝村辅助认证活动

热门文章

  1. delphi xe http 收不到反馈消息_20款途乐4.0 xE 办公室开票
  2. java图遍历求最长路径_如何在Java中使用递归实现矩阵中最长路径的返回
  3. Shell 编程进阶笔记
  4. VTK:选择可见点用法实战
  5. boost::system模块实现动态链接库的测试程序
  6. boost::remove相关的测试程序
  7. boost::mp11::mp_remove_if_q相关用法的测试程序
  8. boost::hana::value用法的测试程序
  9. GDCM:gdcm::EnumeratedValues的测试程序
  10. ITK:创建一个高斯分布