Java项目模板

前提条件:Maven3.0.4或更高,Java8

使用如下命令创建Maven项目:

    $ mvn archetype:generate                               \-DarchetypeGroupId=org.apache.flink              \-DarchetypeArtifactId=flink-quickstart-java      \-DarchetypeVersion=1.6.1

查看项目结构如下:

$ tree quickstart/
quickstart/
├── pom.xml
└── src└── main├── java│   └── org│       └── myorg│           └── quickstart│               ├── BatchJob.java│               └── StreamingJob.java└── resources└── log4j.properties

项目包含两个类,一个用于批处理一个用于流处理。 是程序运行的入口,推荐使用IDE导入工程进行开发和测试,默认的JVM配置对于运行Flink程序来说太小了,在IDE中可以在Help | Edit Custom VM Options 菜单中设置。

如果要构建和打包项目,使用maven命令mvn clean package,会在项目下生成一个包含依赖包的jar包。

Scala项目模板

这里分SBT和Maven两种方式构建,详见官网:https://ci.apache.org/projects/flink/flink-docs-release-1.6/quickstart/scala_api_quickstart.html

配置依赖,连接器,库

Flink核心库和应用依赖

和许多应用一样,Flink依赖库分为两大类:

1.Flink核心依赖:包含flink程序运行时所需要的一些类和依赖,如协调,网络,检查点,容错,APIS,操作,资源管理等等。这些都是Flink运行时所需要的核心类和依赖。这些核心类和依赖打包在flink-dist包中,是flink lib目录下的一部分,类似于Java的核心类库一样。核心包不包含连接器包和其他类库(如CEP,SQL,ML等),这也是必须classpath下有过多的依赖和类。实际上,我们试图尽可能地减少核心依赖项,以使默认类路径尽可能小,并避免依赖项冲突。

2.用户应用程序依赖:应用所需要的依赖,如连接器,格式化等等,这些需要打包到应用程序中去。注入DataSet / DataStream等等flink核心依赖是不需要打的。

项目设置:基础依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.6.1</version><scope>provided</scope>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.6.1</version><scope>provided</scope>
</dependency>

注意,jar包的生命周期是provided,意味着编译时需要,而不应该打到jar包中区。这些是flink核心依赖,已经可用。极力推荐这种方式,因为如果不这样的话,把jar打到依赖中,那么应用的jar包就会很大,而且可能会引起jar包冲突。

要在IDE中运行程序,flink依赖需要声明为compile而不是provided,否则IDE不会把其加入到classpath中执行时会报错NoClassDefFountError,为了避免把依赖的生命周期申明为compile,他们添加了一个配置文件,当应用程序在IntelliJ中运行时,这个配置文件会有选择地激活,然后才会将依赖提升到compile,而不会影响JAR文件的打包。

增加连接器和库依赖

很多应用程序需要添加连接器,如添加kafka,ES等连接器,因为其不是Flink Core依赖的一部分,因此需要打包到项目中去。

如下是添加kafka0.10依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.10_2.11</artifactId><version>1.6.1</version>
</dependency>

我们推荐使用Maven Shade Plugin(见下)打包插件将所有的依赖打进项目中,除了flink core依赖的生命周期是provided之外,其他依赖必须是compile。

Scala版本

Scala各个版本(2.10,2.11,2.12)之间互不兼容。因此使用scala 2.11编写的程序不能使用Scala 2.12.

flink依赖都会在尾部带上版本,如link-streaming-scala_2.11,使用java开发可以选择任何scala版本,使用scala开发必须选择合适的版本。

因为scala2.12的重大调整,flink1.5暂时只支持2.11,未来会增加对scala2.12的支持。

Hadoop依赖

一般性规则:不应该直接向应用程序添加Hadoop依赖。(唯一的例外是当使用现有的Hadoop输入/输出格式和Flink的Hadoop兼容性包装器时)

如果要在hadoop中使用flink,应该在flink配置中包含hadoop依赖,而不是在flink中增加hadoop依赖(配置hadoop_classpath)。可以查 Hadoop Setup Guide看细节。

这样做的原因有两个:

1、一些Hadoop交互发生在Flink的核心中,可能是在用户应用程序启动之前,例如为检查点设置HDFS、通过Hadoop的Kerberos令牌进行身份验证或在YARN上部署

2、Flink的倒置类加载方法从核心依赖项中隐藏了许多传递依赖项。这不仅适用于Flink自己的核心依赖项,也适用于设置中出现的Hadoop依赖项。这样,应用程序就可以使用相同依赖项的不同版本,而不会遇到依赖项冲突(相信我们,这很重要,因为hadoops依赖项很庞大)。

如果再在IDE中开发及测试程序需要hadoop依赖,生命周期为test或provided。

附录:构建包含依赖jar的模板

构建一个包含所有需要的连接器和库依赖的JAR,如下:

<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.0.0</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>my.programs.main.clazz</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins>
</build>

IDE设置

本部分描述,如何把一个flink项目导入IDE以进行开发及测试。若IDE不能工作,请尝试mvn clean package -DskipTests,可能IDE有bug或没有正确设置。

首先从Flink仓库中检出源:

git clone https://github.com/apache/flink.git

IDE设置,该部分略,请查看https://www.jetbrains.com/idea/download/

安装scala插件:

1.IntelliJ IDEA -> Preferences -> Plugins,点击Install Jetbrains plugin

2.选择安装Scala

3.重启IDE

导入Flink:

1.启动IDE,选择"Import Project"

2.选择flink仓库更目录

3.选择Import project from external model 并选择Maven

4.保持默认选项,点击Next知道SDK部分

5.如果没有SDK,使用左上方的+创建一个,然后点击JDK,选择JDK目录并点击OK。否则只选择SDK。

6.一直点击Next直到导入完成。

7.右击导入的Flink工程,project -> Maven -> Generate Sources and Update Folders。注意会在本地仓库安装flink库,即“/home/-your-user-/.m2/repository/org/apache/flink/”或者mvn clean package -DskipTests,也会在IDE工作时创建需要的文件而不安装库。

8.构建项目(Build -> Make Project)

Java代码检查即Scala代码检查

IDE支持使用Checkstyle-IDEA插件检查代码规范,具体使用见:https://ci.apache.org/projects/flink/flink-docs-release-1.6/internals/ide_setup.html

Scala REPL

REPL=Read-Evaluate-Print-Loop

Flink有个集成的交互scala shell,可以用在本地设置中,也可以用在集群中。要使用该REPL,执行如下命令:

bin/start-scala-shell.sh local

本shell支持batch和streaming,启动后,两种类型的环境变量已经装载了,分别为benv和senv。

DataSet API

Scala-Flink> val text = benv.fromElements("To be, or not to be,--that is the question:--","Whether 'tis nobler in the mind to suffer","The slings and arrows of outrageous fortune","Or to take arms against a sea of troubles,")
Scala-Flink> val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
Scala-Flink> counts.print()

print命令会自动向jobManager提交任务并在终端打印结果,当然结果还可以写入到文件中,这种情况下,需要执行:

Scala-Flink> benv.execute("MyProgram")

DataStream API

Scala-Flink> val textStreaming = senv.fromElements("To be, or not to be,--that is the question:--","Whether 'tis nobler in the mind to suffer","The slings and arrows of outrageous fortune","Or to take arms against a sea of troubles,")
Scala-Flink> val countsStreaming = textStreaming.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.keyBy(0).sum(1)
Scala-Flink> countsStreaming.print()
Scala-Flink> senv.execute("Streaming Wordcount")

注意,在这种流式处理情况下,print不会触发执行。

Flink shell有历史命令记录和自动完成功能。

添加外部依赖

scala shell有可能需要添加外部classpath,可以这样:

bin/start-scala-shell.sh [local | remote <host> <port> | yarn] --addclasspath <path/to/jar.jar>

查看scala shell 提供有哪些选择:

bin/start-scala-shell.sh --help

使用flink集成的shell:

bin/start-scala-shell.sh local

远程使用shell:

bin/start-scala-shell.sh remote <hostname> <portnumber>

yarn scala shell集群:

shell可以把flink应用部署到yarn上,可以配置使用的container数量,也可以指定使用的内存,下面是用scala shell启动了一个包含2个taskmanager的yarn集群。

 bin/start-scala-shell.sh yarn -n 2

Yarn Session

可以使用flink yarn session提前部署一个flink集群:

 bin/start-scala-shell.sh yarn

命令使用指南:

Flink Scala Shell
Usage: start-scala-shell.sh [local|remote|yarn] [options] <args>...Command: local [options]
Starts Flink scala shell with a local Flink cluster-a <path/to/jar> | --addclasspath <path/to/jar>Specifies additional jars to be used in Flink
Command: remote [options] <host> <port>
Starts Flink scala shell connecting to a remote cluster<host>Remote host name as string<port>Remote port as integer-a <path/to/jar> | --addclasspath <path/to/jar>Specifies additional jars to be used in Flink
Command: yarn [options]
Starts Flink scala shell connecting to a yarn cluster-n arg | --container argNumber of YARN container to allocate (= Number of TaskManagers)-jm arg | --jobManagerMemory argMemory for JobManager container with optional unit (default: MB)-nm <value> | --name <value>Set a custom name for the application on YARN-qu <arg> | --queue <arg>Specifies YARN queue-s <arg> | --slots <arg>Number of slots per TaskManager-tm <arg> | --taskManagerMemory <arg>Memory per TaskManager container with optional unit (default: MB)-a <path/to/jar> | --addclasspath <path/to/jar>Specifies additional jars to be used in Flink--configDir <value>The configuration directory.-h | --helpPrints this usage text

Windows上运行Flink:https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/flink_on_windows.html

从源构建Flink :https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html

Apache Flink-编程指南-项目设置相关推荐

  1. Apache Flink 为什么能够成为新一代大数据计算引擎?

    众所周知,Apache Flink(以下简称 Flink)最早诞生于欧洲,2014 年由其创始团队捐赠给 Apache 基金会.如同其他诞生之初的项目,它新鲜,它开源,它适应了快速转的世界中更重视的速 ...

  2. Apache Spark 3.0 DStreams-Streaming编程指南

    目录 总览 一个简单的例子 基本概念 连结中 初始化StreamingContext 离散流(DStreams) 输入DStreams和接收器 基本资料 进阶资源 自订来源 接收器可靠性 DStrea ...

  3. (十八)Flink Table API SQL 编程指南 Table API 和Datastream API 集成

    文章目录 DataStream 和 Table 之间的转换 依赖项和导入 配置 执行行为 datastream API table API 批处理运行时模式 Changelog统一 处理(仅插入)流 ...

  4. Apache Spark 3.0 GraphX编程指南

    学习地址:https://spark.apache.org/docs/latest/graphx-programming-guide.html 目录 总览 入门 属性图 属性图示例 图运算符 运营商摘 ...

  5. Apache Beam实战指南 | 玩转KafkaIO与Flink

    AI前线导读:本文是 Apache Beam实战指南系列文章 的第二篇内容,将重点介绍 Apache Beam与Flink的关系,对Beam框架中的KafkaIO和Flink源码进行剖析,并结合应用示 ...

  6. lua 给userdata设置元表_UE4热更新:基于UnLua的Lua编程指南

    本片文章搬运自我自己的博客:原文链接: UE4热更新:基于UnLua的Lua编程指南 作者: ZhaLiPeng UE使用的是C++这种编译型语言,在编译之后就成了二进制,只有通过玩家重新安装才能打到 ...

  7. Apache Spark 3.0 结构化Streaming流编程指南

    目录 总览 快速范例 Scala语言 Java语言 Python语言 R语言 程式设计模型 基本概念 处理事件时间和延迟数据 容错语义 使用数据集和数据帧的API 创建流数据框架和流数据集 流数据帧/ ...

  8. Apache Spark【从无到有从有到无】【编程指南】【AS5】结构化流编程指南

    目录 1.概观 2.快速示例 3.编程模型 3.1.基本概念 3.2.处理事件时间和延迟数据 3.3.容错语义 4.使用数据集和数据框架的API 4.1.创建streaming DataFrames ...

  9. Flink入门——DataSet Api编程指南

    简介: Flink入门--DataSet Api编程指南 Apache Flink 是一个兼顾高吞吐.低延迟.高性能的分布式处理框架.在实时计算崛起的今天,Flink正在飞速发展.由于性能的优势和兼顾 ...

最新文章

  1. 排查IDEA 全局搜索快捷键Ctrl +Shift+F不起作用的原因和解决方法
  2. python 编程入门-python编程入门(第3版)
  3. php图片上传方案,php图片上传
  4. Python执行 shell 命令并实时打印输出
  5. 1.2)深度学习笔记------神经网络的编程基础
  6. jqGrid 使用案例及笔记
  7. .NET 垃圾回收与内存泄漏
  8. addeventlistener监听ajax请求_基于h5的history改善ajax列表请求体验
  9. 2011/05/19
  10. python创建列表副本的方法_Python之列表方法
  11. 一个开源的网页画板,真的太方便了
  12. 使用 eBPF 技术跟踪 Netfilter 数据流
  13. 摩擦学类毕业论文文献都有哪些?
  14. flutter 设置全屏背景图(导航栏)
  15. 玉米田 炮兵阵地 状态压缩DP
  16. 使用EFI安装win7-64位,在不能使用U盘的情况下
  17. minecraft刷怪笼java_Minecraft怪物经验top9!刷怪箱位列第4,杀玩家第2出乎意料
  18. 全能型终端神器!好用、免费!
  19. 私人定制情人节告白网站并且部署上线,谁说程序员没有爱!超详细步骤!源码分享!
  20. R3LIVE开源代码全体验及测试

热门文章

  1. C语言:计算球体积和表面积(含注释,可复制)
  2. BZOJ2283: [Sdoi2011]火星移民
  3. 罚金与罚款的区别 ?
  4. 香港公司逾期报税处罚
  5. Socket实现服务器端与客户端之间通信(输入文字聊天)
  6. 决心为社会贡献一点绵薄之力,特此决定在网上发表博客,贡大家参考学习
  7. 嵌入式 RTP协议详解以及其他相关协议
  8. BQ24295电源管理芯片驱动
  9. 使用scoket发送HTTP请求
  10. PHP and MySQL Web Development 5th Edition