一: Spark任务提交至YARN运行的3种方式

Spark作为新一代计算平台的闪亮明星,在我们的大数据平台中具有非常重要的作用,SQL查询、流计算和机器学习等场景都能见到它的身影,可以说平台应用的数据处理、计算和挖掘等场景都可以使用Spark进行开发。在默认的情况下,如果想向Spark提交计算任务,通常会使用Spark提供的Spark-Submit脚本来提交含有业务逻辑的jar文件程序。这种方式虽然简单,但有悖于服务化的设计理念,所以需要为Spark提供一套任务管理的RESTful服务
在大数据平台中,Spark是以Spark on YARN的方式运行的,在这种模式下,整个集群的资源调度是由YARN统一控制的Spark只是作为运行在YARN上的一个应用客户端而存在。本文将介绍提交Spark任务至YARN运行的3种方式。如下图所示:

1. 第一种方式:使用Spark-Submit脚本提交
Spark本身提供了Spark-Submit脚本用于提交任务,可以借助Java的Process-Builder调用脚本,将其包装成RESTful服务。
 ./spark-submit --class com.learn.spark.SimpleApp --master yarn --deploy-mode client --driver-memory 2g --executor-memory 2g --executor-cores 3 ../spark-demo.jar
2. 第二种方式:使用Spark Client提交
 除了Spark-Submit脚本之外,Spark还提供了一套Java客户端接口用于提交任务。在使用这套接口之后,程序就可以去掉对Spark-Submit脚本的依赖,这样一来提交任务的服务程序就可以运行在应用服务器之上,使得以远程的方式向集群提交任务成为可能。提交方式是已JAVA API编程的方式提交,这种方式不需要使用命令行,直接可以在IDEA中点击Run 运行包含Job的Main类就行。Spark 提供了以SparkLanuncher 作为唯一入口的API来实现。这种方式很方便(试想如果某个任务需要重复执行,但是又不会写linux 脚本怎么搞?我想到的是以JAVA API的方式提交Job, 还可以和Spring整合,让应用在tomcat中运行),官网的示例:[官网示例链接](http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/launcher/package-summary.html)

根据官网的示例,通过JAVA API编程的方式提交有两种方式:
第一种是调用SparkLanuncher实例的startApplication方法,但是这种方式在所有配置都正确的情况下使用运行都会失败的,原因是startApplication方法会调用LauncherServer启动一个进程与集群交互,这个操作貌似是异步的,所以可能结果是main主线程结束了这个进程都没有起起来,导致运行失败。解决办法是调用new SparkLanuncher().startApplication后需要让主线程休眠一定的时间后者是使用下面的例子:

package com.learn.spark; import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher; import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch; public class LanuncherAppV { public static void main(String[] args) throws IOException, InterruptedException { HashMap env = new HashMap(); //这两个属性必须设置 env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc/overriterHaoopConf"); env.put("JAVA_HOME", "/usr/local/java/jdk1.8.0_151"); //可以不设置 //env.put("YARN_CONF_DIR",""); CountDownLatch countDownLatch = new CountDownLatch(1); //这里调用setJavaHome()方法后,JAVA_HOME is not set 错误依然存在 SparkAppHandle handle = new SparkLauncher(env) .setSparkHome("/usr/local/spark") .setAppResource("/usr/local/spark/spark-demo.jar") .setMainClass("com.learn.spark.SimpleApp") .setMaster("yarn") .setDeployMode("cluster") .setConf("spark.app.id", "11222") .setConf("spark.driver.memory", "2g") .setConf("spark.akka.frameSize", "200") .setConf("spark.executor.memory", "1g") .setConf("spark.executor.instances", "32") .setConf("spark.executor.cores", "3") .setConf("spark.default.parallelism", "10") .setConf("spark.driver.allowMultipleContexts", "true") .setVerbose(true).startApplication(new SparkAppHandle.Listener() { //这里监听任务状态,当任务结束时(不管是什么原因结束),isFinal()方法会返回true,否则返回false @Override public void stateChanged(SparkAppHandle sparkAppHandle) { if (sparkAppHandle.getState().isFinal()) { countDownLatch.countDown(); } System.out.println("state:" + sparkAppHandle.getState().toString()); } @Override public void infoChanged(SparkAppHandle sparkAppHandle) { System.out.println("Info:" + sparkAppHandle.getState().toString()); } }); System.out.println("The task is executing, please wait ...."); //线程等待任务结束 countDownLatch.await(); System.out.println("The task is finished!"); }
}

注意:如果部署模式是cluster,但是代码中有标准输出的话将看不到,需要把结果写到HDFS中,如果是client模式则可以看到输出。

第二种方式是:通过SparkLanuncher.lanunch()方法获取一个进程,然后调用进程的process.waitFor()方法等待线程返回结果,但是使用这种方式需要自己管理运行过程中的输出信息,比较麻烦,好处是一切都在掌握之中,即获取的输出信息和通过命令提交的方式一样,很详细,实现如下:

package com.learn.spark; import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher; import java.io.IOException;
import java.util.HashMap; public class LauncherApp { public static void main(String[] args) throws IOException, InterruptedException { HashMap env = new HashMap(); //这两个属性必须设置 env.put("HADOOP_CONF_DIR","/usr/local/hadoop/etc/overriterHaoopConf"); env.put("JAVA_HOME","/usr/local/java/jdk1.8.0_151"); //env.put("YARN_CONF_DIR",""); SparkLauncher handle = new SparkLauncher(env) .setSparkHome("/usr/local/spark") .setAppResource("/usr/local/spark/spark-demo.jar") .setMainClass("com.learn.spark.SimpleApp") .setMaster("yarn") .setDeployMode("cluster") .setConf("spark.app.id", "11222") .setConf("spark.driver.memory", "2g") .setConf("spark.akka.frameSize", "200") .setConf("spark.executor.memory", "1g") .setConf("spark.executor.instances", "32") .setConf("spark.executor.cores", "3") .setConf("spark.default.parallelism", "10") .setConf("spark.driver.allowMultipleContexts","true") .setVerbose(true); Process process =handle.launch(); InputStreamReaderRunnable inputStreamReaderRunnable = new InputStreamReaderRunnable(process.getInputStream(), "input"); Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input"); inputThread.start(); InputStreamReaderRunnable errorStreamReaderRunnable = new InputStreamReaderRunnable(process.getErrorStream(), "error"); Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error"); errorThread.start(); System.out.println("Waiting for finish..."); int exitCode = process.waitFor(); System.out.println("Finished! Exit code:" + exitCode); }
}
package com.learn.spark; import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader; public class InputStreamReaderRunnable implements Runnable { private BufferedReader reader; private String name; public InputStreamReaderRunnable(InputStream is, String name) { this.reader = new BufferedReader(new InputStreamReader(is)); this.name = name; } public void run() {System.out.println("InputStream " + name + ":"); try { String line = reader.readLine(); while (line != null) { System.out.println(line); line = reader.readLine(); } reader.close(); } catch (IOException e) { e.printStackTrace(); } }
}
3. 第三种方式:使用YARN RESTful API提交
除了Spark-Submit和SparkClient两种方法之外,还可以通过YARN提供的RESTful API向其提交Spark任务。但是这种方法十分复杂,并不推荐使用。

Post请求示例: * http:///ws/v1/cluster/apps

官网介绍地址
既然是api,推荐使用postman或者idea,本文使用postman

0.准备MR的jar包并上传到hdfs上

hdfs dfs -put xifan227-1.0-SNAPSHOT.jar /cpos/test/
3.1.生成新的applicationId

请求类型:POST
URL:http://rm-http-address:port/ws/v1/cluster/apps/new-application

3.2.查看MR的jar包信息(查看modificationTime)

请求类型:GET
URL:http://nm-http-address:port/webhdfs/v1/cpos/test/xifan227-1.0-SNAPSHOT.jar?op=GETFILESTATUS

3.3.提交应用 (注意timestamp)

Cluster Applications API(Submit Application)
请求类型:POST
URL:http://rm-http-address:port/ws/v1/cluster/apps
Body:

{"application-id":"application_1586340793234_0125","application-name":"AaronTest","queue":"default","priority":-2,"am-container-spec":{"local-resources":{"entry":[{"key":"xifan227-1.0-SNAPSHOT","value":{"resource":"hdfs://test96.com:8020/cpos/test/xifan227-1.0-SNAPSHOT.jar","type":"FILE","visibility":"APPLICATION","size": 30301,"timestamp": 1586400852288}}]},"commands":{"command":"/opt/modules/hadoop-3.2.1/bin/yarn jar xifan227-1.0-SNAPSHOT mr.WordcountMapreduce 1><LOG_DIR>/WC.stdout 2><LOG_DIR>/WC.stderr"}},"max-app-attempts":1,"resource":{"memory":1024, "vCores":2},"application-type":"MAPREDUCE"
}

4.结果展示

二: Mapreduce任务提交的几种方式

1.问题来源

为什么我们在windows下用编程集成工具或者linux下用编程集成工具来run我们编写的mr程序的时候是在本地jvm中运行,而我们在linux上用命令行:hadoop jar 提交我们的jar文件的时候就提交到集群去运行了呢?
具体原因是:如果是RunJar中包含了和RM通信的rpc客户端就是把jar包提交到集群运行,如果RunJar中包含了和本地机器通讯的客户端的话,就是把jar包提交到本地的jvm中运行。这些都是根据配置文件来配置的。
Configurationconf = new Configuration().是由这个来决定的。如果你conf里面设定了mapreduce.framework.name的value为yarn的话,它就会初始化一个持有和yarn通信的rpc客户端,如果没有设置,则会初始化一个和本地通信的客户端。

2.提交模式分类

本地提交模式
① 在windows或者linux上访问的本地文件系统上的文件,生成的结果也是在本地文件系统上面
这种模式的启动模式是本地启动,并没有将程序上传到集群里面去。
② 在windows或者linux上访问的文件是hdfs上的文件,生成的结果也是输出到hdfs上面。
这种模式虽然读取的是hdfs上的文件,但是也没有将程序上传到集群去执行,最终还是一种本地启动模式。

这个staging的资源文件是在本地文件系统上面,而非是在hdfs上面,所以还是一种本地启动模式。
集群提交模式
③ 把程序打成jar包,上传到服务器用hadoop命令提交 Hadoop jar jar的名字 jar的main方法路径,这样是一种集群提交模式
④ 在linux的eclipse中运行程序,当把配置文件拷进到程序的classpath下面之后(主要是mapred-site.xml文件和yarn-site.xml) ,这也是一种集群提交模式。
先读到mapred-site.xml中的mapreduce.framework.name值,初始化一个和RM通信的rpc客户端,然后将任务提交到RM去。

此时执行程序的时候会报错

说是找不到mapper类,因为在eclipse中没有打jar包,而且没有指定jar在哪,所以找不到。
可以在配置文件中设置这个属性,让程序执行的时候可以找到jar文件。

conf.set("mapreduce.framework.name","wordcount.jar")

然后将jar包打在工程工程目录下就行了。
⑤ 在windows的eclipse中直接运行main方法,配置项和linux中利用eclipse运行main方法差不多,也是一种集群提交模式,但是由于平台不兼容,需要做很多的设置修改,网上也有许多资料介绍怎么修改,可以自行查阅。但是不推荐这种模式。

Spark任务提交至YARN运行的3种方式与Mapreduce提交任务的几种形式相关推荐

  1. 两种方式设置SVN提交代码时必须填写日志

    两种方式设置SVN提交代码时必须填写日志 咱们在使用SVN的时候,团队中难免有同事提交代码时忘记填写日志而直接提交,这样会导致后期维护极不方便,这并不是我们想看到的.于是下面给出两种方式来解决这个问题 ...

  2. form表单提交数据的两种方式——submit直接提交、AJAX提交

    submit提交 form表单本身提供action属性,在action属性中填写数据提交地址后,点击submit类型的按钮即可将数据提交至指定地址,代码如下: <form action=&quo ...

  3. 运行python程序的两种方式交互式和文件式_执行Python程序的两种方式

    交互式(了解) 交互式环境下,敲完一条命令按下enter键马上能看到结果,调试程序方便.程序无法永久保存,关掉cmd窗口数据就消失了. 命令行式(了解) 打开文本编辑器,在文本编辑器中写入一串字符. ...

  4. git 怎么提交忽略文件夹_git 设置忽略文件提交的几种方式

    在使用git进行项目管理的时候,有时候一些安装包之类,或者自己本地项目使用的一些编译文件,在不需要提交到远程仓库时,可以通过以下几种方式设置忽略提交,包括文件夹和单个文件.之前自己项目里面采用了第二种 ...

  5. 安卓下使用GET方式向服务器端提交数据

    从客户端提交数据给服务器端: 使用get方式向服务器端提交数据,把参数组拼到了url地址的后面: http://192.168.0.10:8080/web/servlet/LoginServlet ? ...

  6. CGI提交表单的两种方式POST与GET

    Http定义了与服务器交互的不同方法,最基本的方法有4种,分别是GET,POST,PUT,DELETE.URL全称是资源描述符,我们可以这样认为:一个URL地址,它用于描述一个网络上的资源,而HTTP ...

  7. Spark Standalone -- 独立集群模式、Spark 提交任务的两种模式、spark在yarn上运行的环境搭建、自己写的spark代码如何提交到yarn上并运行...

    目录 Spark Standalone -- 独立集群模式 Standalone 架构图 Standalone 的搭建 1.上传.解压.重命名 2.配置环境变量 3.修改配置文件 conf 4.同步到 ...

  8. spark提交到yarn_详细总结spark基于standalone、yarn集群提交作业流程

    最近总结了一些关于spark core的内容,今天先来和大家分享一下spark的运行模式. spark运行模式 (1)local:在本地eclipse.IDEA中写spark代码运行程序,一般用于测试 ...

  9. Spark On Yarn 运行项目

    在spark中,支持4中运行模式: Local:往往使用本地开发的时候使用. StandAlone:是spark自带的,如果一个集群是StandAlone模式的话,那么就需要在多台机器上同时部署Spa ...

最新文章

  1. RocketMQ--生产者与消费者的简单示例
  2. 线程安全的signals
  3. linux中利用脚本编写数组,shell脚本编程之数组
  4. ElasticSearch插件demo
  5. python中的 if __name__ == '__main_'的作用和原理
  6. 微信android 流畅,【黑科技】微信只需这样操作,立刻提升流畅度和使用效率
  7. pv原语模拟实现_并发编程信号量的使用方法和其实现原理
  8. P1464 Function
  9. ZooKeeper Web UI -- Shovel
  10. 离散数学耿素云计算机,离散数学,屈婉玲,耿素云,张立昂编著_考研教材_考试点...
  11. Win10双网卡上网冲突(内网、外网)
  12. js中的splice方法使用,删除数组中的最大最小值
  13. Three.js图像波动特效
  14. ue5-预计算可视性体积(PVS)
  15. android获取其他app布局,关于android:如何从App的(布局)XML变量中获取Manifest版本号?...
  16. Python基于Django航空飞机票预定网站设计
  17. 笔记本电脑没有鼠标怎么右键_鼠标右键失灵怎么办,你知道原因吗?
  18. 【EasyUI篇】一整套EasyUI示例集锦
  19. MySQL的锁机制 - 记录锁、间隙锁、临键锁
  20. 亚马逊运营最常见家50问(上)

热门文章

  1. 云栖大会首设“科技脱贫”专场 ,20张会场门票等你来拿!
  2. 金蝶EAS应用之登录方式介绍
  3. 管理计算机的应用建立账套,用友软件如何新建账套?
  4. 女孩用自己的×××身来换取男友的健康
  5. 优化核心思想(1):分迭
  6. 冯诺依曼机核心由运算器转变为存储器的原因
  7. 图像特征提取中的一些不变形,平移不变性,旋转不变性 光照不变性
  8. 中小型机器人开发平台apollo的场景应用
  9. 代理arp功能的测试方法
  10. 优营占比双非 > 985,电子科大软工学院不容错过