flink 入门(一)
flink 入门(一)
简介
阅读目标:
本文为入门级别文章,即阅读完下文你需要简单的知道 flink 是做什么用的,他的主要特点是什么。工欲善其事必先利其器更深入的了解,待熟练后再回头看看。
简而言之flink就是一个框架,你在框架里面编写代码(接收从某处来的数据->数据处理/转换->将处理好的数据输出到某地),将编写好的代码交给flink集群,由集群取调度任务去处理
阅读并实践本文可能会存在某些问题,你还需要阅读其他文章/博客加深对flink的理解(如下文中提到的某些概念:有界、无界等等
实际是因为我懒得写了。。。
Flink 起源于一个叫作 Stratosphere 的项目,它是由 3 所地处柏林的大学和欧洲其他一些大 学在 2010~2014 年共同进行的研究项目,由柏林理工大学的教授沃克尔·马尔科(Volker Markl) 领衔开发。2014 年 4 月,Stratosphere 的代码被复制并捐赠给了 Apache 软件基金会,Flink 就 是在此基础上被重新设计出来的。 在德语中,“flink”一词表示“快速、灵巧”。项目的 logo 是一只彩色的松鼠,当然了, 这不仅是因为 Apache 大数据项目对动物的喜好(是否联想到了 Hadoop、Hive?),更是因为 松鼠这种小动物完美地体现了“快速、灵巧”的特点。关于 logo 的颜色,还一个有趣的缘由: 柏林当地的松鼠非常漂亮,颜色是迷人的红棕色;而 Apache 软件基金会的 logo,刚好也是一 根以红棕色为主的渐变色羽毛。于是,Flink 的松鼠 Logo 就设计成了红棕色,而且拥有一个漂 亮的渐变色尾巴,尾巴的配色与 Apache 软件基金会的 logo 一致。这只松鼠色彩炫目,既呼应 了 Apache 的风格,似乎也预示着 Flink 未来将要大放异彩。
Flink 的官网主页地址:https://flink.apache.org/ 在 Flink 官网主页的顶部可以看到,项目的核心目标,是“数据流上的有状态计算”(Stateful Computations over Data Streams)。
很多专业词汇,我们从中至少可以提炼出一些容易理解的信息:Flink 是一个“框 架”,是一个数据处理的“引擎”;既然是“分布式”,当然是为了应付大规模数据的应用场景 了;另外,Flink 处理的是数据流。所以,Flink 是一个流式大数据处理引擎。 而“内存执行速度”和“任意规模”,突出了 Flink 的两个特点:速度快、可扩展性强— —这说的自然就是小松鼠的“快速”和“灵巧”了。
java 开发案例
以下案例为环境jdk1.8,且以下案例均为展示使用,目的是为了明白这两种方式的区别以及基本使用
- jdk 1.8
- maven
- win10
- flink 1.15.2
以下示例代码仅做入门级别使用,非生产可用。
pom文件
<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><java.version>1.8</java.version><flink.version>1.15.2</flink.version><target.java.version>1.8</target.java.version><scala.binary.version>2.12</scala.binary.version><log4j.version>2.17.1</log4j.version> </properties><dependencies><!-- Apache Flink dependencies --><!-- These dependencies are provided, because they should not be packaged into the JAR file. --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- connector kafka--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency><!-- 在此处 我添加了分词 --><dependency><groupId>org.ansj</groupId><artifactId>ansj_seg</artifactId><version>5.1.6</version></dependency><!-- Add logging framework, to produce console output when running in the IDE. --><!-- These dependencies are excluded from the application JAR by default. --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency></dependencies>
Streaming(无界)
这里可以简单的理解为源源不断的数据,需要不断监听某个消息队列(kafka)或者其他来源。
public static final String HOST = "192.168.20.127";public static final Integer PORT = 8888;public static void main(String[] args) throws Exception {final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = environment.socketTextStream(HOST, PORT);SingleOutputStreamOperator<Tuple2<String, Long>> wordsCollector = source.flatMap((String line, Collector<Tuple2<String, Long>> collector) -> {String[] words = line.split(" ");for (String word : words) {collector.collect(new Tuple2<String, Long>(word, 1L));}}).returns(Types.TUPLE(Types.STRING, Types.LONG));SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordsCollector.keyBy(0).sum(1);sum.print();environment.execute();}
Batch(有界)
这里可以简单的理解为批量数据处理。
kafka
运行类
public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//new 一个实例!Properties properties = new Properties();//告诉程序我们要接收那台机器上生产的数据properties.setProperty("bootstrap.servers", "master:9092");//告诉程序开启分区,已经分区名称properties.setProperty("group.id", "temp-1");//属性key.serializer和value.serializer就是key和value指定的序列化方式。properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//读取kafka数据的时候需要指定消费策略,如果不指定会使用auto.offset.reset设置//earliest当各分区下有已提交的offset时,从提交的offset开始消费;//无提交的offset时,从头开始消费;//latest,当各分区下有已提交的offset时,从提交的offset开始消费;//无提交的offset时,消费新产生的该分区下的数据;//none,topic各分区都存在已提交的offset时,从offset后开始消费;//只要有一个分区不存在已提交的offset,则抛出异常properties.setProperty("auto.offset.reset", "earliest");//enable.auto.commit 的默认值是 true;就是默认采用自动提交的机制。properties.setProperty("enable.auto.commit", "false");//如果FlinkKafkaConsumer没有开启checkpoint功能,为了不重复读取//这种方式无法实现Exactly-Once(只执行一次)FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer("test_topic", new SimpleStringSchema(), properties);DataStreamSource<String> lines = environment.addSource(flinkKafkaConsumer);SingleOutputStreamOperator<Tuple2<String, Long>> sum = lines.flatMap((String line, Collector<Tuple2<String, Long>> collector) -> {List<Term> terms = ToAnalysis.parse(line).getTerms();terms.forEach(item -> {collector.collect(new Tuple2<>(item.getName(), 1L));});}).returns(Types.TUPLE(Types.STRING, Types.LONG)).keyBy(0).sum(1);sum.print();environment.execute("word-coun-kafka");}
任务提交
提交有两种方式
web-ui界面
访问部署服务器 ip:8081
点击 Submit new Job
点击Add new
编辑Entry class与Parallelism等
- Entry class 为入口类 即为上文中的运行
main()
函数的类的全限定名
- Entry class 为入口类 即为上文中的运行
点击Submit
点击Jobs -> Running Jobs 查看
命令行
如果要把job提交到jobmanager,应该在jobmanager服务器上提交
flink 安装与部署
Flink的安装和部署主要分为本地模式和集群模式,其中本地模式只需直接解压就可以使用,不以修改任何参数,一般在做一些简单测试的时候使用。
集群模式包含Standalone、Flink on Yarn等模式,适合在生产环境下面使用,且需要修改对应的配置 参数。
flink 下载
## 官方版本(可能下载速度慢)
curl -O https://dlcdn.apache.org/flink/flink-1.15.2/flink-1.15.2-bin-scala_2.12.tgz
## 腾讯云镜像(推荐,国内速度快)
curl -O http://mirrors.cloud.tencent.com/apache/flink/flink-1.15.2/flink-1.15.2-bin-scala_2.12.tgz
下载完成解压,解压后目录如下
CentOS/Kernel环境
系统环境
以下均基于 Kernel
- CentOS Linux release 7.9.2009 (Core)
- Linux version 3.10.0-1160.el7.x86_64
- gcc version 4.8.5 20150623 (Red Hat 4.8.5-44) (GCC)
- open-jdk 11
- 大部分过程中使用root用户。请在生产环境或特殊环境注意用户切换。本文不在linux用户做过多赘述。
本地模式
自己是jobmanager也是taskmanager(会话模式)
配置文件详解
- 修改
conf/flink-conf.yaml
cd conf vim flink-conf.yaml
# 此处修改集群时需要修改 jobmanager.rpc.address: localhost # 默认1623 jobmanager.rpc.port: 6123 # 任务管理默认 jobmanager.memory.process.size: 1600m taskmanager.memory.process.size: 1728m # 任务槽 资源(并行执行 相当于 组) taskmanager.numberOfTaskSlots: 1 # 默认并行度 parallelism.default: 1 # web界面默认端口 需要修改时 解开注释 #rest.port: 8081
master
当前jobmanager(默认localhost)以及webui端口(默认8081)
works
单节点启动默认这里面没有东西
- 修改
启动脚本
# 进入flink bin目录 cd bin # 单节点集群启动 ./start-cluster.sh
访问服务器ip加8081(默认)
停止服务
# 进入flink bin目录 cd bin # 单节点集群启动 ./stop-cluster.sh
集群
至少需要三台服务器。一台jobmanager
,两台taskmanager
,三台服务器之间需要配置免密登录,这里为了方便,我修改了hosts文件,三台服务器分别为
master
、slave0
、slave1
。(会话模式)
修改hosts(ip地址 主机名/域名 (主机别名))
自己的服务器IP-1 master 自己的服务器IP-2 slave0 自己的服务器IP-3 slave1
使配置文件生效请参考 CentOS修改hosts
服务器之间免密登录
请自行百度/google(master 最好也将自身产生的秘钥导入自身,不导也可以会导致每次启动flink需要输入本机密码)
修改配置文件
master 服务器
flink-conf.yaml
# 用于节点间通信 jobmanager.rpc.address: 0.0.0.0
master
master:8081
works
# 另外两台机器 slave0 slave1
slave0 服务器
flink-conf.yaml
jobmanager.rpc.address: master # 不改此处 集群运行后 solt为0 jobmanager.bind-host: 0.0.0.0
master
master:8081
works
slave0 slave1
slave1 服务器
flink-conf.yaml
jobmanager.rpc.address: master # 不改此处 集群运行后 solt为0 jobmanager.bind-host: 0.0.0.0
master
master:8081
works
slave0 slave1
修改环境变量
master/slave0/slave1 分别执行以下操作(因文件都是由master分发,所以目录位置应都一致,当然可自行修改)
## 修改环境变量 vim /etc/profile## 新增以下内容export FLINK_HOME=/software/flink-cluster/flink/ export PATH=$PATH:$FLINK_HOME/bin## 使环境变量生效 source /etc/profile
运行集群
在
master
bin目录下执行,看到以下几截图后集群启动成功,即可访问webUI界面./start-cluster.sh
且执行
jps
命令后且
slave0
与slave1
执行jps
后web ui 界面
jdk安装(多版本切换)
## 下载openjdkjdk
curl -O https://download.java.net/openjdk/jdk11/ri/openjdk-11+28_linux-x64_bin.tar.gz## 解压
tar zxf openjdk-11+28_linux-x64_bin.tar.gz## 添加jdk11 /opt/openjdk11/jdk-11/ 应为压缩包实际解压路径
sudo update-alternatives --install /usr/bin/java java /home/flink/opt/jdk-11/bin/java 1## 添加jdk11 /opt/openjdk11/jdk-11/ 应为压缩包实际解压路径
sudo update-alternatives --install /usr/bin/javac javac /home/flink/opt/jdk-11/bin/javac 1## 切换
sudo update-alternatives --config java
sudo update-alternatives --config javac
docker for windows
windows 10 专业版 21H2 WSL2
下载docker-desktop docker 历史版本
运行Docker Desktop Installer.exe
参考链接
用户提权
su
chmod -v u+w /etc/sudoers
vim /etc/sudoersroot ALL=(ALL) ALLchmod -v u-w /etc/sudoersexit
centos镜像
阿里云centos镜像
北京外国语大学开源镜像
Vmware
VMWARE
VMWARE 秘钥以及安装
VMWARE TOOLS
其他参考
Storm入门 3
Flink从入门到入土(详细教程)
JDK11下载界面
flink下载界面
flink官方安装教程
flink-streaming-platform-web
flink国内镜像 腾讯云
flink-kafka
flink 入门(一)相关推荐
- 2021年大数据Flink(八):Flink入门案例
目录 Flink入门案例 前置说明 API 编程模型 准备工程 pom文件 log4j.properties Flink初体验 需求 编码步骤 代码实现 Flink入门案例 前置说明 API API ...
- flink入门案例之WordCount
flink入门案例之WordCount,以下测试代码都是在本地执行的 添加依赖 添加maven依赖 <dependencies><dependency><groupId& ...
- Flink入门——DataSet Api编程指南
简介: Flink入门--DataSet Api编程指南 Apache Flink 是一个兼顾高吞吐.低延迟.高性能的分布式处理框架.在实时计算崛起的今天,Flink正在飞速发展.由于性能的优势和兼顾 ...
- Flink入门(一)(Java和scala)
Flink批处理 从文件中读取单词,计算频次 Scala版本 import org.apache.flink.api.scala.ExecutionEnvironmentobject BatchWor ...
- Flink入门技术分享PPT之一
今天为小伙伴们做了Flink入门的技术分享,把做的PPT贴在下面当做今日份吧. 多图预警~ 紫薯布丁紫薯布丁紫薯布丁紫薯布丁紫薯布丁紫薯布丁紫薯布丁紫薯布丁紫薯布丁紫薯布丁紫薯布丁紫薯布丁紫薯布丁紫薯 ...
- Flink教程(04)- Flink入门案例
文章目录 01 引言 02 开发前准备 2.1 API 2.2 编程模型 03 入门案例 3.1 项目搭建 3.2 代码实现 3.2.1 基于DataSet 3.2.2 基于DataStream 3. ...
- flink入门_flink简单学习_flink初识
时刻记住自己要成为什么样的人.--你 flink入门基础 1.项目前提:设置maven[配置pom.xml文件] <dependencies><dependency><g ...
- flink入门_阿里巴巴为何选择Flink?20年大佬分11章讲解Flink从入门到实践!
前言 Apache Flink 是德国柏林工业大学的几个博士生和研究生从学校开始做起来的项目,之前叫做 Stratosphere.他们在2014 年开源了这个项目,起名为 Flink. Apache ...
- flink入门_Flink入门:读取Kafka实时数据流,实现WordCount
本文主要介绍Flink接收一个Kafka文本数据流,进行WordCount词频统计,然后输出到标准输出上.通过本文你可以了解如何编写和运行Flink程序. 代码拆解 首先要设置Flink的执行环境: ...
最新文章
- Application Installation Failed
- 【SQL】IS NULL and = NULL 在 sql server 中的区别
- 第十五届全国大学生智能车竞赛车模技术检查表格
- 计算网络经典书籍--计算机网络:自顶向下方法
- 一个C/C++程序从编译到最终生成可执行文件的全过程分析
- Android studio 混淆打包 proguard-rules.pro 与 bulid.gradle 配置总结
- 方法的重写-扩展父类方法,super对象调用父类方法
- 修改wordpress上传文件大小限制
- Cmake安装遇到问题
- SpringMVC中controller的跳转
- 半波对称振子方向图_画好服装款式图的五个要点
- 从Unix开源开发学习应对大型复杂项目开发
- mysql unsigend_创建表 查询数据
- ENVI国产卫星插件
- 晋南讲堂之持久层框架ORM简介
- 互补滤波算法及理论推导
- 《SteamVR2.2.0快速入门》(Yanlz+Unity+XR+OpenVR+OpenXR+SteamVR+Valve+Vive+Oculus+Quickstart+HMD+立钻哥哥++ok++)
- 2022年备考[嵌入式系统设计师]你准备好了吗?
- 他把科学,放进几代人的中二梦
- TODA SMT上料防错系统