Apache Flink-编程指南-项目设置
Java项目模板
前提条件:Maven3.0.4或更高,Java8
使用如下命令创建Maven项目:
$ mvn archetype:generate \-DarchetypeGroupId=org.apache.flink \-DarchetypeArtifactId=flink-quickstart-java \-DarchetypeVersion=1.6.1
查看项目结构如下:
$ tree quickstart/
quickstart/
├── pom.xml
└── src└── main├── java│ └── org│ └── myorg│ └── quickstart│ ├── BatchJob.java│ └── StreamingJob.java└── resources└── log4j.properties
项目包含两个类,一个用于批处理一个用于流处理。 是程序运行的入口,推荐使用IDE导入工程进行开发和测试,默认的JVM配置对于运行Flink程序来说太小了,在IDE中可以在Help | Edit Custom VM Options
菜单中设置。
如果要构建和打包项目,使用maven命令mvn clean package,会在项目下生成一个包含依赖包的jar包。
Scala项目模板
这里分SBT和Maven两种方式构建,详见官网:https://ci.apache.org/projects/flink/flink-docs-release-1.6/quickstart/scala_api_quickstart.html
配置依赖,连接器,库
Flink核心库和应用依赖
和许多应用一样,Flink依赖库分为两大类:
1.Flink核心依赖:包含flink程序运行时所需要的一些类和依赖,如协调,网络,检查点,容错,APIS,操作,资源管理等等。这些都是Flink运行时所需要的核心类和依赖。这些核心类和依赖打包在flink-dist包中,是flink lib目录下的一部分,类似于Java的核心类库一样。核心包不包含连接器包和其他类库(如CEP,SQL,ML等),这也是必须classpath下有过多的依赖和类。实际上,我们试图尽可能地减少核心依赖项,以使默认类路径尽可能小,并避免依赖项冲突。
2.用户应用程序依赖:应用所需要的依赖,如连接器,格式化等等,这些需要打包到应用程序中去。注入DataSet / DataStream等等flink核心依赖是不需要打的。
项目设置:基础依赖
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.6.1</version><scope>provided</scope>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.6.1</version><scope>provided</scope>
</dependency>
注意,jar包的生命周期是provided,意味着编译时需要,而不应该打到jar包中区。这些是flink核心依赖,已经可用。极力推荐这种方式,因为如果不这样的话,把jar打到依赖中,那么应用的jar包就会很大,而且可能会引起jar包冲突。
要在IDE中运行程序,flink依赖需要声明为compile而不是provided,否则IDE不会把其加入到classpath中执行时会报错NoClassDefFountError,为了避免把依赖的生命周期申明为compile,他们添加了一个配置文件,当应用程序在IntelliJ中运行时,这个配置文件会有选择地激活,然后才会将依赖提升到compile,而不会影响JAR文件的打包。
增加连接器和库依赖
很多应用程序需要添加连接器,如添加kafka,ES等连接器,因为其不是Flink Core依赖的一部分,因此需要打包到项目中去。
如下是添加kafka0.10依赖:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.10_2.11</artifactId><version>1.6.1</version>
</dependency>
我们推荐使用Maven Shade Plugin(见下)打包插件将所有的依赖打进项目中,除了flink core依赖的生命周期是provided之外,其他依赖必须是compile。
Scala版本
Scala各个版本(2.10,2.11,2.12)之间互不兼容。因此使用scala 2.11编写的程序不能使用Scala 2.12.
flink依赖都会在尾部带上版本,如link-streaming-scala_2.11,使用java开发可以选择任何scala版本,使用scala开发必须选择合适的版本。
因为scala2.12的重大调整,flink1.5暂时只支持2.11,未来会增加对scala2.12的支持。
Hadoop依赖
一般性规则:不应该直接向应用程序添加Hadoop依赖。(唯一的例外是当使用现有的Hadoop输入/输出格式和Flink的Hadoop兼容性包装器时)
如果要在hadoop中使用flink,应该在flink配置中包含hadoop依赖,而不是在flink中增加hadoop依赖(配置hadoop_classpath)。可以查 Hadoop Setup Guide看细节。
这样做的原因有两个:
1、一些Hadoop交互发生在Flink的核心中,可能是在用户应用程序启动之前,例如为检查点设置HDFS、通过Hadoop的Kerberos令牌进行身份验证或在YARN上部署
2、Flink的倒置类加载方法从核心依赖项中隐藏了许多传递依赖项。这不仅适用于Flink自己的核心依赖项,也适用于设置中出现的Hadoop依赖项。这样,应用程序就可以使用相同依赖项的不同版本,而不会遇到依赖项冲突(相信我们,这很重要,因为hadoops依赖项很庞大)。
如果再在IDE中开发及测试程序需要hadoop依赖,生命周期为test或provided。
附录:构建包含依赖jar的模板
构建一个包含所有需要的连接器和库依赖的JAR,如下:
<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.0.0</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>my.programs.main.clazz</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins>
</build>
IDE设置
本部分描述,如何把一个flink项目导入IDE以进行开发及测试。若IDE不能工作,请尝试mvn clean package -DskipTests,可能IDE有bug或没有正确设置。
首先从Flink仓库中检出源:
git clone https://github.com/apache/flink.git
IDE设置,该部分略,请查看https://www.jetbrains.com/idea/download/
安装scala插件:
1.IntelliJ IDEA -> Preferences -> Plugins,点击Install Jetbrains plugin
2.选择安装Scala
3.重启IDE
导入Flink:
1.启动IDE,选择"Import Project"
2.选择flink仓库更目录
3.选择Import project from external model 并选择Maven
4.保持默认选项,点击Next知道SDK部分
5.如果没有SDK,使用左上方的+创建一个,然后点击JDK,选择JDK目录并点击OK。否则只选择SDK。
6.一直点击Next直到导入完成。
7.右击导入的Flink工程,project -> Maven -> Generate Sources and Update Folders。注意会在本地仓库安装flink库,即“/home/-your-user-/.m2/repository/org/apache/flink/”或者mvn clean package -DskipTests,也会在IDE工作时创建需要的文件而不安装库。
8.构建项目(Build -> Make Project)
Java代码检查即Scala代码检查
IDE支持使用Checkstyle-IDEA插件检查代码规范,具体使用见:https://ci.apache.org/projects/flink/flink-docs-release-1.6/internals/ide_setup.html
Scala REPL
REPL=Read-Evaluate-Print-Loop
Flink有个集成的交互scala shell,可以用在本地设置中,也可以用在集群中。要使用该REPL,执行如下命令:
bin/start-scala-shell.sh local
本shell支持batch和streaming,启动后,两种类型的环境变量已经装载了,分别为benv和senv。
DataSet API
Scala-Flink> val text = benv.fromElements("To be, or not to be,--that is the question:--","Whether 'tis nobler in the mind to suffer","The slings and arrows of outrageous fortune","Or to take arms against a sea of troubles,")
Scala-Flink> val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
Scala-Flink> counts.print()
print命令会自动向jobManager提交任务并在终端打印结果,当然结果还可以写入到文件中,这种情况下,需要执行:
Scala-Flink> benv.execute("MyProgram")
DataStream API
Scala-Flink> val textStreaming = senv.fromElements("To be, or not to be,--that is the question:--","Whether 'tis nobler in the mind to suffer","The slings and arrows of outrageous fortune","Or to take arms against a sea of troubles,")
Scala-Flink> val countsStreaming = textStreaming.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.keyBy(0).sum(1)
Scala-Flink> countsStreaming.print()
Scala-Flink> senv.execute("Streaming Wordcount")
注意,在这种流式处理情况下,print不会触发执行。
Flink shell有历史命令记录和自动完成功能。
添加外部依赖
scala shell有可能需要添加外部classpath,可以这样:
bin/start-scala-shell.sh [local | remote <host> <port> | yarn] --addclasspath <path/to/jar.jar>
查看scala shell 提供有哪些选择:
bin/start-scala-shell.sh --help
使用flink集成的shell:
bin/start-scala-shell.sh local
远程使用shell:
bin/start-scala-shell.sh remote <hostname> <portnumber>
yarn scala shell集群:
shell可以把flink应用部署到yarn上,可以配置使用的container数量,也可以指定使用的内存,下面是用scala shell启动了一个包含2个taskmanager的yarn集群。
bin/start-scala-shell.sh yarn -n 2
Yarn Session
可以使用flink yarn session提前部署一个flink集群:
bin/start-scala-shell.sh yarn
命令使用指南:
Flink Scala Shell
Usage: start-scala-shell.sh [local|remote|yarn] [options] <args>...Command: local [options]
Starts Flink scala shell with a local Flink cluster-a <path/to/jar> | --addclasspath <path/to/jar>Specifies additional jars to be used in Flink
Command: remote [options] <host> <port>
Starts Flink scala shell connecting to a remote cluster<host>Remote host name as string<port>Remote port as integer-a <path/to/jar> | --addclasspath <path/to/jar>Specifies additional jars to be used in Flink
Command: yarn [options]
Starts Flink scala shell connecting to a yarn cluster-n arg | --container argNumber of YARN container to allocate (= Number of TaskManagers)-jm arg | --jobManagerMemory argMemory for JobManager container with optional unit (default: MB)-nm <value> | --name <value>Set a custom name for the application on YARN-qu <arg> | --queue <arg>Specifies YARN queue-s <arg> | --slots <arg>Number of slots per TaskManager-tm <arg> | --taskManagerMemory <arg>Memory per TaskManager container with optional unit (default: MB)-a <path/to/jar> | --addclasspath <path/to/jar>Specifies additional jars to be used in Flink--configDir <value>The configuration directory.-h | --helpPrints this usage text
Windows上运行Flink:https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/flink_on_windows.html
从源构建Flink :https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html
Apache Flink-编程指南-项目设置相关推荐
- Apache Flink 为什么能够成为新一代大数据计算引擎?
众所周知,Apache Flink(以下简称 Flink)最早诞生于欧洲,2014 年由其创始团队捐赠给 Apache 基金会.如同其他诞生之初的项目,它新鲜,它开源,它适应了快速转的世界中更重视的速 ...
- Apache Spark 3.0 DStreams-Streaming编程指南
目录 总览 一个简单的例子 基本概念 连结中 初始化StreamingContext 离散流(DStreams) 输入DStreams和接收器 基本资料 进阶资源 自订来源 接收器可靠性 DStrea ...
- (十八)Flink Table API SQL 编程指南 Table API 和Datastream API 集成
文章目录 DataStream 和 Table 之间的转换 依赖项和导入 配置 执行行为 datastream API table API 批处理运行时模式 Changelog统一 处理(仅插入)流 ...
- Apache Spark 3.0 GraphX编程指南
学习地址:https://spark.apache.org/docs/latest/graphx-programming-guide.html 目录 总览 入门 属性图 属性图示例 图运算符 运营商摘 ...
- Apache Beam实战指南 | 玩转KafkaIO与Flink
AI前线导读:本文是 Apache Beam实战指南系列文章 的第二篇内容,将重点介绍 Apache Beam与Flink的关系,对Beam框架中的KafkaIO和Flink源码进行剖析,并结合应用示 ...
- lua 给userdata设置元表_UE4热更新:基于UnLua的Lua编程指南
本片文章搬运自我自己的博客:原文链接: UE4热更新:基于UnLua的Lua编程指南 作者: ZhaLiPeng UE使用的是C++这种编译型语言,在编译之后就成了二进制,只有通过玩家重新安装才能打到 ...
- Apache Spark 3.0 结构化Streaming流编程指南
目录 总览 快速范例 Scala语言 Java语言 Python语言 R语言 程式设计模型 基本概念 处理事件时间和延迟数据 容错语义 使用数据集和数据帧的API 创建流数据框架和流数据集 流数据帧/ ...
- Apache Spark【从无到有从有到无】【编程指南】【AS5】结构化流编程指南
目录 1.概观 2.快速示例 3.编程模型 3.1.基本概念 3.2.处理事件时间和延迟数据 3.3.容错语义 4.使用数据集和数据框架的API 4.1.创建streaming DataFrames ...
- Flink入门——DataSet Api编程指南
简介: Flink入门--DataSet Api编程指南 Apache Flink 是一个兼顾高吞吐.低延迟.高性能的分布式处理框架.在实时计算崛起的今天,Flink正在飞速发展.由于性能的优势和兼顾 ...
最新文章
- 排查IDEA 全局搜索快捷键Ctrl +Shift+F不起作用的原因和解决方法
- python 编程入门-python编程入门(第3版)
- php图片上传方案,php图片上传
- Python执行 shell 命令并实时打印输出
- 1.2)深度学习笔记------神经网络的编程基础
- jqGrid 使用案例及笔记
- .NET 垃圾回收与内存泄漏
- addeventlistener监听ajax请求_基于h5的history改善ajax列表请求体验
- 2011/05/19
- python创建列表副本的方法_Python之列表方法
- 一个开源的网页画板,真的太方便了
- 使用 eBPF 技术跟踪 Netfilter 数据流
- 摩擦学类毕业论文文献都有哪些?
- flutter 设置全屏背景图(导航栏)
- 玉米田 炮兵阵地 状态压缩DP
- 使用EFI安装win7-64位,在不能使用U盘的情况下
- minecraft刷怪笼java_Minecraft怪物经验top9!刷怪箱位列第4,杀玩家第2出乎意料
- 全能型终端神器!好用、免费!
- 私人定制情人节告白网站并且部署上线,谁说程序员没有爱!超详细步骤!源码分享!
- R3LIVE开源代码全体验及测试