如何在Java应用中提交Spark任务?
最近看到有几个Github友关注了Streaming的监控工程——Teddy,所以思来想去还是优化下代码,不能让别人看笑话啊。于是就想改一下之前觉得最丑陋的一个地方——任务提交。
本博客内容基于Spark2.2版本~在阅读文章并想实际操作前,请确保你有:
- 一台配置好Spark和yarn的服务器
- 支持正常
spark-submit --master yarn xxxx
的任务提交
老版本
老版本任务提交是基于 ** 启动本地进程,执行脚本spark-submit xxx
** 的方式做的。其中一个关键的问题就是获得提交Spark任务的Application-id,因为这个id是跟任务状态的跟踪有关系的。如果你的资源管理框架用的是yarn,应该知道每个运行的任务都有一个applicaiton_id,这个id的生成规则是:
appplication_时间戳_数字
老版本的spark通过修改SparkConf参数spark.app.id
就可以手动指定id,新版本的代码是直接读取的taskBackend中的applicationId()方法,这个方法具体的实现是根据实现类来定的。在yarn中,是通过Yarn的YarnClusterSchedulerBackend实现的,具体的实现逻辑可以参考对应的链接。
感兴趣的同学可以看一下,生成applicaiton_id的逻辑在hadoop-yarn工程的ContainerId中定义。
总结一句话就是,想要自定义id,甭想了!!!!
于是当时脑袋瓜不灵光的我,就想到那就等应用创建好了之后,直接写到数据库里面呗。怎么写呢?
- 我事先生成一个自定义的id,当做参数传递到spark应用里面;
- 等spark初始化后,就可以通过sparkContext取得对应的application_id以及url
- 然后再driver连接数据库,插入一条关联关系
新版本
还是归结于互联网时代的信息大爆炸,我看到群友的聊天,知道了SparkLauncer这个东西,调查后发现他可以基于Java代码自动提交Spark任务。SparkLauncher支持两种模式:
- new SparkLauncher().launch() 直接启动一个Process,效果跟以前一样
- new SparkLauncher().startApplicaiton(监听器) 返回一个SparkAppHandler,并(可选)传入一个监听器
当然是更倾向于第二种啦,因为好处很多:
- 自带输出重定向(Output,Error都有,支持写到文件里面),超级爽的功能
- 可以自定义监听器,当信息或者状态变更时,都能进行操作(对我没啥用)
- 返回的SparkAppHandler支持 暂停、停止、断连、获得AppId、获得State等多种功能,我就想要这个!!!!
一步一步,代码展示
首先创建一个最基本的Spark程序:
import org.apache.spark.sql.SparkSession;
import java.util.ArrayList;
import java.util.List;public class HelloWorld {public static void main(String[] args) throws InterruptedException {SparkSession spark = SparkSession.builder()//.master("yarn")//.appName("hello-wrold")//.config("spark.some.config.option", "some-value").getOrCreate();List<Person> persons = new ArrayList<>();persons.add(new Person("zhangsan", 22, "male"));persons.add(new Person("lisi", 25, "male"));persons.add(new Person("wangwu", 23, "female"));spark.createDataFrame(persons, Person.class).show(false);spark.close();}
}
然后创建SparkLauncher类:
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;import java.io.IOException;public class Launcher {public static void main(String[] args) throws IOException {SparkAppHandle handler = new SparkLauncher().setAppName("hello-world").setSparkHome(args[0]).setMaster(args[1]).setConf("spark.driver.memory", "2g").setConf("spark.executor.memory", "1g").setConf("spark.executor.cores", "3").setAppResource("/home/xinghailong/launcher/launcher_test.jar").setMainClass("HelloWorld").addAppArgs("I come from Launcher").setDeployMode("cluster").startApplication(new SparkAppHandle.Listener(){@Overridepublic void stateChanged(SparkAppHandle handle) {System.out.println("********** state changed **********");}@Overridepublic void infoChanged(SparkAppHandle handle) {System.out.println("********** info changed **********");}});while(!"FINISHED".equalsIgnoreCase(handler.getState().toString()) && !"FAILED".equalsIgnoreCase(handler.getState().toString())){System.out.println("id "+handler.getAppId());System.out.println("state "+handler.getState());try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}}}
}
然后打包工程,打包过程可以参考之前的博客:
http://www.cnblogs.com/xing901022/p/7891867.html
打包完成后上传到部署Spark的服务器上。由于SparkLauncher所在的类引用了SparkLauncher,所以还需要把这个jar也上传到服务器上。
[xinghailong@hnode10 launcher]$ ls
launcher_test.jar spark-launcher_2.11-2.2.0.jar
[xinghailong@hnode10 launcher]$ pwd
/home/xinghailong/launcher
由于SparkLauncher需要指定SPARK_HOME,因此如果你的机器可以执行spark-submit,那么就看一下spark-submit里面,SPARK_HOME是在哪
[xinghailong@hnode10 launcher]$ which spark2-submit
/var/lib/hadoop-hdfs/bin/spark2-submit
最后几行就能看到:
export SPARK2_HOME=/var/lib/hadoop-hdfs/app/spark# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0exec "${SPARK2_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
综上,我们需要的是:
- 一个自定义的Jar,里面包含spark应用和SparkLauncher类
- 一个SparkLauncher的jar,spark-launcher_2.11-2.2.0.jar 版本根据你自己的来就行
- 一个当前目录的路径
- 一个SARK_HOME环境变量指定的目录
然后执行命令启动测试:
java -Djava.ext.dirs=/home/xinghailong/launcher -cp launcher_test.jar Launcher /var/lib/hadoop-hdfs/app/spark yarn
说明:
-Djava.ext.dirs
设置当前目录为java类加载的目录- 传入两个参数,一个是SPARK_HOME;一个是启动模式
观察删除发现成功启动运行了:
id null
state UNKNOWN
Mar 10, 2018 12:00:52 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 18/03/10 12:00:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
********** state changed **********
...省略一大堆拷贝jar的日志
********** info changed **********
********** state changed **********
Mar 10, 2018 12:00:55 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 18/03/10 12:00:55 INFO yarn.Client: Application report for application_1518263195995_37615 (state: ACCEPTED)
... 省略一堆重定向的日志
application_1518263195995_37615 (state: ACCEPTED)
id application_1518263195995_37615
state SUBMITTED
Mar 10, 2018 12:01:00 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 18/03/10 12:01:00 INFO yarn.Client: Application report for application_1518263195995_37615 (state: RUNNING)
********** state changed **********
... 省略一堆重定向的日志
INFO: user: hdfs
********** state changed **********
Mar 10, 2018 12:01:08 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 18/03/10 12:01:08 INFO util.ShutdownHookManager: Shutdown hook called
Mar 10, 2018 12:01:08 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 18/03/10 12:01:08 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-f07e0213-61fa-4710-90f5-2fd2030e0701
总结
这样就实现了基于Java应用提交Spark任务,并获得其Appliation_id和状态进行定位跟踪的需求了。
转载于:https://www.cnblogs.com/xing901022/p/8538713.html
如何在Java应用中提交Spark任务?相关推荐
- Android如何在java代码中设置margin
Android如何在java代码中设置margin,也就是组件与组件之间的间距. 代码中设置: LinearLayout.LayoutParams params = new LinearLayout. ...
- java 线程中创建线程_如何在Java 8中创建线程安全的ConcurrentHashSet?
java 线程中创建线程 在JDK 8之前,还没有办法在Java中创建大型的线程安全的ConcurrentHashSet. java.util.concurrent包甚至没有一个名为Concurren ...
- 如何在Java 8中创建线程安全的ConcurrentHashSet?
在JDK 8之前,还没有办法在Java中创建大型的线程安全的ConcurrentHashSet. java.util.concurrent包甚至没有一个名为ConcurrentHashSet的类,但是 ...
- 如何在Java 8中使用LocalDateTime格式化/解析日期-示例教程
Java项目中的常见任务之一是将日期格式化或解析为String,反之亦然. 解析日期表示您有一个表示日期的字符串,例如" 2017-08-3",并且要将其转换为表示Java中日期的 ...
- Java poi 列移动_如何在java xssf中移動列poi
如何將現有列數據和格式化到Apache POI中的下一列並將下一列移到右側.如何在java xssf中移動列poi 我試過了. 讓說我的代碼是這樣... XSSFCell oldCell = work ...
- 如何在Java项目中查找未使用/无效的代码
本文翻译自:How to find unused/dead code in java projects What tools do you use to find unused/dead code i ...
- jfinal 普通java工程_JFinal getModel方法如何在java项目中使用
JFinal getModel方法如何在java项目中使用 发布时间:2020-11-17 15:11:27 来源:亿速云 阅读:94 作者:Leah 今天就跟大家聊聊有关JFinal getMode ...
- mye连接mysql数据库_MySQL_如何在Java程序中访问mysql数据库中的数据并进行简单的操作,在上篇文章给大家介绍了Myeclip - phpStudy...
如何在Java程序中访问mysql数据库中的数据并进行简单的操作 在上篇文章给大家介绍了Myeclipse连接mysql数据库的方法,通过本文给大家介绍如何在Java程序中访问mysql数据库中的数据 ...
- html5如何提交到邮箱,如何在html网页中提交表单直接发送到邮箱
如何在html网页中提交表单直接发送到邮箱?此问题一直困扰了我很久很久,后来了解到jmail组件可以解决此问题,而且现在的付费空间基本都会有此组件.分别建立表单页和asp页面,提交表单后通过asp页面 ...
- 基于Java代码自动提交Spark任务
1.SparkLauncher简介 SparkLauncher支持两种模式: (1).new SparkLauncher().launch(),直接启动一个Process,效果跟Spark submi ...
最新文章
- Cobbler-自动化部署神器
- python 根据判断产生新列_pandas DataFrame 根据多列的值做判断,生成新的列值实例...
- C++ STL (四)set使用
- Docker selenium自动化 - 修改/dev/shm路径大小实例演示,“session deleted because of page crash“问题解决
- JavaScript案例三:动态显示时间
- calibre中的hcell_关于calibre的Hcell你知道多少?
- vbs获取cpu使用率
- qq机器人自动回复带脚本可以实现吗_python一个神奇的第三方库:QQ自动聊天
- asarray java,Java
- Python中使用代码将后缀名doc文件改为docx
- imread函数_MATLAB图像处理:27:使用imtranslate函数平移图像
- 用最简单的操作,做最精准的AI模型!
- 测试-关于Unity获取子层级内容的几种接口(Transform FindChild, Component GetComponentInChildren,...)...
- python 面向对象oop
- 使用jQuery.form插件,实现完美的表单异步提交
- qq android 哪个版本好用吗,Android QQ轻聊版好用吗?
- 五款堪称神奇的手机APP 一定不要错过了
- 攻城掠地服务器维护到几点,攻城掠地玩法全解秘
- 从信息传播角度来看链路预测
- muse-ui.css_Muse UI:适用于Vuejs 2.0的Material Design UI库
热门文章
- 全网首发:为什么依赖库编译时加了-fPIC,还是提示依赖库要使用-fPIC?
- 《TensorFlow深度学习应用实践》学习笔记1
- 银河麒麟双击deb包无法安装
- Ubuntu16.04安装VirtualBox及无法启动的解决办法
- 指向Member Function的指针
- c++ static静态变量、静态函数
- php 公用方法,Laravel配置全局公共函数的方法步骤
- rethat安装MySQL多例_SSM 使用 mybatis 分页插件 pagehepler 实现分页
- java canvas数组_java数组
- 数据上传需要什么硬件_搭建云服务器需要什么硬件配置