之前我们都是基于Idea在本地进行开发,这种方式很适合开发以及测试,但是开发完之后,如何提交到服务器中运行?

Flink单机部署方式

本地开发和测试过程中非常有用,只要把代码放到服务器直接运行。

前置条件

jdk8
maven3
下载解压Flink,这里直接下载源码编译,直接从github上下载源码https://github.com/apache/flink/releases 选择1.8.1
然后解压到本地,解压后的文件夹如下:

编译

接下来我们需要编译这段源码。

mvn clean install -DskipTests -Dfast -Pvendor-repos -Dhadoop.version=2.6.0-cdh5.15.1

第一次编译需要花费很长时间,因为需要从网上下载一些依赖文件。

编译结果

编译成功后,会在当前目录下产生flink-dist/target/flink-1.8.1-bin文件夹,这个文件夹就是我们所需要的东西,把这个文件夹拷贝到服务器中就可以进行部署了。

单机模式部署

将flink-1.8.1-bin文件夹拷贝到服务器中,服务器中的目录如下:

启动服务


当前是stand-alone模式,输入jps可以查看:

可以在UI上面看到,默认ip是8081:

使用Flink跑一个WordCount程序

使用socket方式读入数据,然后统计数据wordcount
在服务器中开启socket:nc -lk 9999
然后运行命令:

./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9999


在WEB界面中可以查看任务:

测试数据

在nc终端输入数据(使用tab分割):

查看运行结果

在flink-1.8.1/log路径下,有一个flink--taskexecutor-.out文件,就是输出结果文件。

停止任务

可以在WEB界面上点击Running JOB,然后点击任务:

点击Cancel就可以取消这个任务了。这时,取消成功之后,可用slot就变为1了:

停止集群

使用命令./bin/stop-cluster.sh就可以了

Flink分布式standalone部署方式

我们在conf/flink-conf.yaml文件中配置主节点(jobmanager.rpc.address)的ip.
在conf/slaves文件中配置从节点(taskmanager)的ip

常用配置

jobmanager.rpc.address参数用来指向master节点的地址
jobmanager.heap.size 表示jobmanager节点可用的内存
taskmanager.heap.size表示taskmanager节点可用的内存
taskmanager.numberOfTaskSlots 每一个机器可用的CPU个数,决定了并行度
paraparallelism.default 表示任务的并行度 可以在代码层面覆盖
taskmanager.tmp.dirs taskmanager的临时数据存储目录

修改配置文件

修改conf/flink-conf.yaml文件:
jobmanager.rpc.address: swarm-manager(修改为本服务器的主机名)
修改conf/slaves文件:swarm-manager
使用命令./bin/start-cluster.sh启动集群,可以正常启动:

修改配置:
taskmanager.heap.size: 2048m
taskmanager.numberOfTaskSlots: 2
再次启动集群:

可以看到配置已经生效了。
这时再次使用命令nc -lk 9999 然后在另一个终端运行./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9999 开启一个任务。这时查看页面:

可以看到之前两个slots,现在已经用了一个,因为当前的并行度是1。

扩展和容错

现在我们输入jps命令时:

我们现在把TaskManagerRunner给Kill掉,然后在查看:

这时在查看Web页面:

可以看到都变成0了。
这时再启动taskmanager.sh

再次查看:

yarn方式部署

搭建Hadoop

下载Hadoop 2.6.0 解压,配置环境变量:

修改配置文件etc/hadoop/hadoop-env.sh ,配置java home export JAVA_HOME=/usr/jdk1.8.0_101
修改配置文件etc/hadoop/core-site.xml,配置:

<configuration><property><name>fs.defaultFS</name><value>hdfs://swarm-manager:9000</value></property>
</configuration>

修改配置文件etc/hadoop/hdfs-site.xml,配置:

<configuration><property><name>dfs.replication</name><value>1</value></property><property><name>hadoop.tmp.dir</name><value>/home/iie4bu/app/tmp</value></property>
</configuration>

修改配置文件slave 内容:swarm-manager
接下来配置yarn-site.xml

修改mapred-site.xml.template文件名为:mapred-site.xml ,内容如下:

<configuration><property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property>
</configuration>

格式化hdfs,使用命令~/app/hadoop-2.6.0/bin$ ./hdfs namenode -format
启动hadoop,使用命令~/app/hadoop-2.6.0/sbin$ ./start-all.sh
使用jps查看启动状态:

使用WEB页面进行查看
首先查看hdfs:http://192.168.170.170:50070


然后查看yarn:http://192.168.170.170:8088

测试一下

测试HDFS :

没有问题!

Flink On Yarn

方式一:Flink初始化的时候就申请一个资源,并且作为一个服务常驻在yarn中。Flink 的job共用一个yarn session。资源不够用时,出现排队情况。
从flink1.8之后hadoop相关jar包需要额外下载:

将下载后的flink-shaded-hadoop-2-uber-2.6.5-7.0.jar文件放到 flink-1.8.1/lib目录下,然后可以输入 ./bin/yarn-session.sh --help
./bin/yarn-session.sh参数:

-n  taskmanager的数量
-jm jobmanager的内存
-tm taskmanager的内存

方式二: 每次提交Flink job都申请一个yarn session,用完之后释放。任务之间不相互影响,任务之间是相互独立的。

使用方式一

启动一个常服务./bin/yarn-session.sh -n 1 -jm 1024m -tm 4096m表示启动job manager是1GB的堆内存,task manager是4GB的堆内存。
使用这个命令之后,打开yarn页面,http://192.168.170.170:8088/cluster

使用方式二:./bin/flink run -m yarn-cluster -p 4 -yjm 1024m -ytm 4096m ./examples/batch/WordCount.jar

Apache Flink 零基础入门(二十)Flink部署与作业的提交相关推荐

  1. Apache Flink 零基础入门(十二)Flink sink

    将DataSet中的数据Sink到哪里去.使用的是对应的OutPutFormat,也可以使用自定义的sink,有可能写到hbase中,hdfs中. writeAsText() / TextOutput ...

  2. Apache Flink 零基础入门(十五)Flink DataStream编程(如何自定义DataSource)

    数据源可以通过StreamExecutionEnvironment.addSource(sourceFunction)方式来创建,Flink也提供了一些内置的数据源方便使用,例如readTextFil ...

  3. Apache Flink 零基础入门(十九)Flink windows和Time操作

    Time类型 在Flink中常用的Time类型: 处理时间 摄取时间 事件时间 处理时间 是上图中,最后一步的处理时间,表示服务器中执行相关操作的处理时间.例如一些算子操作时间,在服务器上面的时间. ...

  4. Apache Flink 零基础入门(十八)Flink Table APISQL

    什么是Flink关系型API? 虽然Flink已经支持了DataSet和DataStream API,但是有没有一种更好的方式去编程,而不用关心具体的API实现?不需要去了解Java和Scala的具体 ...

  5. Apache Flink 零基础入门(十四)Flink 分布式缓存

    Apache Flink 提供了一个分布式缓存,类似于Hadoop,用户可以并行获取数据. 通过注册一个文件或者文件夹到本地或者远程HDFS等,在getExecutionEnvironment中指定一 ...

  6. Apache Flink 零基础入门(十六)Flink DataStream transformation

    Operators transform one or more DataStreams into a new DataStream. Operators操作转换一个或多个DataStream到一个新的 ...

  7. Apache Flink 零基础入门【转】

    Apache Flink 零基础入门(一):基础概念解析 Apache Flink 零基础入门(二):DataStream API 编程 转载于:https://www.cnblogs.com/dav ...

  8. 罗马音平假名中文可复制_日语零基础入门五十音,日语零基础五十音图表

    日语零基础入门五十音,日语入门的最基本要求就是记住五十音图,但是这个记住不仅是你能背下来或是默写下来.而是你需对号入座! 下面是一张五十音图表. 即每个假名单独拿出来你要立马反应出来怎么读.其重要性甚 ...

  9. Apache Flink 零基础入门(一):基础概念解析

    Apache Flink 的定义.架构及原理 Apache Flink 是一个分布式大数据处理引擎,可对有限数据流和无限数据流进行有状态或无状态的计算,能够部署在各种集群环境,对各种规模大小的数据进行 ...

最新文章

  1. 【数学与算法】凸函数、凸集、凸函数的一二阶数学解释
  2. [机器学习收藏] TensorFlow初学者必须了解的55个经典案例
  3. SDUTOJ2828_字典树
  4. python - 定时清理ES 索引
  5. 企业需求的Java程序员是什么样子的
  6. python max((1、2、3)*2)_Python functional.max_pool2d方法代码示例
  7. 彻底解决Spring MVC 中文乱码 问题
  8. 软件架构自学笔记---架构分析
  9. CI -持续集成及相关概念
  10. CentOS 系统sudo命令配置
  11. I will have to learn more skills now , now
  12. ArcGIS 泛克里金插值
  13. 第一个应用成功上架了-武林世界
  14. Latex公式空格输入
  15. 【记录】数控程序的指令代码---标准G代码与标准M代码
  16. pure-ftpd服务器搭建
  17. 无座火车票为什么不能半价?
  18. php修改qq举报按钮,自定义修改QQ在线状态
  19. 自动关闭MessageBox
  20. [生存志] 第53节 晏子春秋录纯臣

热门文章

  1. Python机器学习算法 — 逻辑回归(Logistic Regression)
  2. 大厂测试开发常见面试题收集(python,java,性能等)
  3. PHP is_null,empty以及isset,unset的区别
  4. PHP操作redis
  5. 安装 MongoDB PHP 驱动 在CentOS 6.x和遇到的问题
  6. python 好用的库存尾货女装_女装店主:做尾货有人能赚大钱,新手千万别碰,文茵告诉你原因...
  7. python替换缺失值_python 缺失值处理(Imputation)
  8. php 表单跳转,html - 思路问题:php表单跳转
  9. macappstore登不上去_Mac 链接不上AppStore的解决方法
  10. angularjs 读取mysql_如何使用AngularJS PHP从MySQL获取数据