Spark2.2(三十九):如何根据appName监控spark任务,当任务不存在则启动(任务存在当超过多久没有活动状态则kill,等待下次启动)...
业务需求
实现一个根据spark任务的appName来监控任务是否存在,及任务是否卡死的监控。
1)给定一个appName,根据appName从yarn application -list中验证任务是否存在,不存在则调用spark-submit.sh脚本来启动任务;
2)如果任务存在yarn application -list中,则读取‘监控文件(监控文件内容包括:appId,最新活动时间)’,从监控文件中读取出最后活动的日期,计算当前日期与app的最后活动日期相差时间为X,如果X大于30minutes(认为任务处于假死状态[再发布环境发现有的任务DAG抛出OOM,导致app的executor和driver依然存在,当时不执行任务调度,程序卡死。具体错误详情请参考《https://issues.apache.org/jira/browse/SPARK-26452》]),则执行yarn application -kill appId(杀掉任务),等待下次监控脚本执行时重启任务。
监控实现
脚本
#/bin/sh #LANG=zh_CN.utf8 #export LANG export SPARK_KAFKA_VERSION=0.10 export LANG=zh_CN.UTF-8 # export env variable if [ -f ~/.bash_profile ]; thensource ~/.bash_profile fi source /etc/profilemyAppName='myBatchTopic' #这里指定需要监控的spark任务的appName,注意:这名字重复了会导致监控失败。 apps=''for app in `yarn application -list` doapps=${app},$apps done apps=${apps%?}if [[ $apps =~ $myAppName ]]; thenecho "appName($myAppName) exists in yarn application list"#1)运行 hadop fs -cat /目录/appName,读取其中最后更新日期;(如果文件不存在,则跳过等待文件生成。)monitorInfo=$(hadoop fs -cat /user/dx/streaming/monitor/${myAppName})LD_IFS="$IFS"IFS=","array=($monitorInfo)IFS="$OLD_IFS" appId=${array[0]}monitorLastDate=${array[1]}echo "loading mintor information 'appId:$appId,monitorLastUpdateDate:$monitorLastDate'"current_date=$(date "+%Y-%m-%d %H:%M:%S")echo "loading current date '$current_date'"#2)与当前日期对比:# 如果距离当前日期相差小于30min,则不做处理;# 如果大于30min则kill job,根据上边yarn application -list中能获取对应的appId,运行yarn application -kill appIdt1=`date -d "$current_date" +%s`t2=`date -d "$monitorLastDate" +%s`diff_minute=$(($(($t1-$t2))/60))echo "current date($current_date) over than monitorLastDate($monitorLastDate) $diff_minute minutes"if [ $diff_minute -gt 30 ];thenecho 'over then 30 minutes'$(yarn application -kill ${appId})echo "kill application ${appId}"elseecho 'less than 30 minutes'fi elseecho "appName($myAppName) not exists in yarn application list"#./submit_x1_x2.sh abc TestRestartDriver #这里指定需要启动的脚本来启动相关任务$(nohup ./submit_checkpoint2.sh >> ./output.log 2>&1 &) fi
监控脚本业务流程图:
监控文件生成
我这里程序是spark structured streaming,因此可以注册sparkSesssion的streams()的query的监听事件
sparkSession.streams().addListener(new GlobalStreamingQueryListener(sparkSession。。。))
在监听事件中实现如下:
public class GlobalStreamingQueryListener extends StreamingQueryListener {private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(GlobalStreamingQueryListener.class);private static final String monitorBaseDir = "/user/dx/streaming/monitor/";private SparkSession sparkSession = null;private LongAccumulator triggerAccumulator = null;public GlobalStreamingQueryListener(SparkSession sparkSession, LongAccumulator triggerAccumulator) {this.sparkSession = sparkSession;this.triggerAccumulator = triggerAccumulator;}@Overridepublic void onQueryStarted(QueryStartedEvent queryStarted) {System.out.println("Query started: " + queryStarted.id());}@Overridepublic void onQueryTerminated(QueryTerminatedEvent queryTerminated) {System.out.println("Query terminated: " + queryTerminated.id());}@Overridepublic void onQueryProgress(QueryProgressEvent queryProgress) {System.out.println("Query made progress: " + queryProgress.progress());// sparkSession.sql("select * from " +// queryProgress.progress().name()).show(); triggerAccumulator.add(1);System.out.println("Trigger accumulator value: " + triggerAccumulator.value());logger.info("minitor start .... ");try {if (HDFSUtility.createDir(monitorBaseDir)) {logger.info("Create monitor base dir(" + monitorBaseDir + ") success");} else {logger.info("Create monitor base dir(" + monitorBaseDir + ") fail");}} catch (IOException e) {logger.error("An error was thrown while create monitor base dir(" + monitorBaseDir + ")");e.printStackTrace();}// spark.app.id application_1543820999543_0193String appId = this.sparkSession.conf().get("spark.app.id");// spark.app.name myBatchTopicString appName = this.sparkSession.conf().get("spark.app.name");String mintorFilePath = (monitorBaseDir.endsWith(File.separator) ? monitorBaseDir : monitorBaseDir + File.separator) + appName;logger.info("The application's id is " + appId);logger.info("The application's name is " + appName);logger.warn("If the appName is not unique,it will result in a monitor error");try {HDFSUtility.overwriter(mintorFilePath, appId + "," + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));} catch (IOException e) {logger.error("An error was thrown while write info to monitor file(" + mintorFilePath + ")");e.printStackTrace();}logger.info("minitor stop .... ");}}
HDFSUtility.java中方法如下:
public class HDFSUtility {private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HDFSUtility.class);/*** 当目录不存在时,创建目录。* * @param dirPath* 目标目录* @return true-創建成功;false-失敗。* @throws IOException* */public static boolean createDir(String dirPath) throws IOException {FileSystem fs = null;Path dir = new Path(dirPath);boolean success = false;try {fs = FileSystem.get(new Configuration());if (!fs.exists(dir)) {success = fs.mkdirs(dir);} else {success = true;}} catch (IOException e) {logger.error("create dir (" + dirPath + ") fail:", e);throw e;} finally {try {fs.close();} catch (IOException e) {e.printStackTrace();}}return success;}/*** 覆盖文件写入信息* * @param filePath* 目标文件路径* @param content* 被写入内容* @throws IOException* */public static void overwriter(String filePath, String content) throws IOException {FileSystem fs = null;// 在指定路径创建FSDataOutputStream。默认情况下会覆盖文件。FSDataOutputStream outputStream = null;Path file = new Path(filePath);try {fs = FileSystem.get(new Configuration());if (fs.exists(file)) {System.out.println("File exists(" + filePath + ")");}outputStream = fs.create(file);outputStream.write(content.getBytes());} catch (IOException e) {logger.error("write into file(" + filePath + ") fail:", e);throw e;} finally {if (outputStream != null) {try {outputStream.close();} catch (IOException e) {e.printStackTrace();}}try {fs.close();} catch (IOException e) {e.printStackTrace();}}} }
转载于:https://www.cnblogs.com/yy3b2007com/p/10241914.html
Spark2.2(三十九):如何根据appName监控spark任务,当任务不存在则启动(任务存在当超过多久没有活动状态则kill,等待下次启动)...相关推荐
- NeHe OpenGL第三十九课:物理模拟
NeHe OpenGL第三十九课:物理模拟 物理模拟简介: 还记得高中的物理吧,直线运动,自由落体运动,弹簧.在这一课里,我们将创造这一切. 物理模拟介绍 如果你很熟悉物理规律,并且想实现它,这篇 ...
- Python编程基础:第三十九节 面向对象编程Object Oriented Programming
第三十九节 面向对象编程Object Oriented Programming 前言 实践 前言 到目前为止我们都是函数式编程,也即将每一个功能块写为一个函数.其实还有一种更常用的编程方式被称为面向对 ...
- javaweb学习总结(三十九)——数据库连接池
javaweb学习总结(三十九)--数据库连接池 一.应用程序直接获取数据库连接的缺点 用户每次请求都需要向数据库获得链接,而数据库创建连接通常需要消耗相对较大的资源,创建时间也较长.假设网站一天10 ...
- 三十九、Java集合中的HashSet和TreeSet
@Author:Runsen @Date:2020/6/6 作者介绍:Runsen目前大三下学期,专业化学工程与工艺,大学沉迷日语,Python, Java和一系列数据分析软件.导致翘课严重,专业排名 ...
- WPF,Silverlight与XAML读书笔记第三十九 - 可视化效果之3D图形
原文:WPF,Silverlight与XAML读书笔记第三十九 - 可视化效果之3D图形 说明:本系列基本上是<WPF揭秘>的读书笔记.在结构安排与文章内容上参照<WPF揭秘> ...
- 【零基础学Java】—List集合(三十九)
[零基础学Java]-List集合(三十九) java.util.list接口 extends Collection接口 list接口的特点: 1.有序的集合,存储元素和取出元素的顺序是一致的(存储1 ...
- JavaScript学习(三十九)—对象中内容的操作
JavaScript学习(三十九)-对象中内容的操作 一.对象中内容的操作:增.删.改.查 (一).增:给对象添加属性或者方法 1)方式1:对象名称.属性名=属性值: 2)方式2:对象名称['属性名' ...
- 【正点原子FPGA连载】第三十九章OV7725摄像头RGB-LCD显示实验 -摘自【正点原子】新起点之FPGA开发指南_V2.1
1)实验平台:正点原子新起点V2开发板 2)平台购买地址:https://detail.tmall.com/item.htm?id=609758951113 2)全套实验源码+手册+视频下载地址:ht ...
- 陈艾盐:《春燕》百集访谈节目第三十九集
<春燕>访谈节目共120集,每月分10集播出,记录了上百位企业家对"慈善"的各种不同见解,通过讲述社会真善美的故事,让更多的人了解慈善.发扬慈善精神,构建更加美好,和谐 ...
最新文章
- 重构是提高可测试性的主要手段 《设计模式》《代码重构》《从重构到模式》 《反模式》 重构时机 编写测试时候 修改BUG时候
- LeetCode Text Justification(贪心)
- 2014全年目标及执行情况跟踪
- 复习笔记(六)——C++运算符重载(难点)
- Linux CentOS 6.x设置静态IP(亲测有效)
- PyOpenCV 坐标系统
- t-sql查询where in_产品操作MySQL第7篇 – 运算符 - IN
- 什么样的人不适合当程序员呢?
- 腾讯业务架构:六大事业群
- 顺序工作流 状态机工作流 数据岛工作流 选择 .
- unity 2021.3.6f1 报错 dependencies manifest(Microsoft.NetCore.App.deps.json)was not found
- Spring Cloud Alibaba Sentinel(七)受权规则 黑白名单
- 在WINDOWS下的Services.mscl里有好几个ORACLE的SERVICES的一些作用
- 电车难题和他的n个**变种分享
- qq邮件 外发服务器设置,大商创使用教程-大商创邮件服务器设置
- 小程序editor富文本编辑使用及rich-text解析富文本
- 利用python深度学习神经网络预测五年内糖尿病的发生(全代码)
- 考研复试怎么穿搭?看这一篇就够了!
- linux java桌面环境_Linux桌面环境玩转BT(转)
- Seay代码审计系统审计实战
热门文章
- java连接redis不稳定_java相关:jedispool连redis高并发卡死的问题
- python视频压缩算法_Python入门到精通视频,阿里巴巴大力推荐,20行Python代码,无损压缩千百张图片!...
- JAVA获取安卓系统下usb_Android 获取 usb 权限的两种方法
- macbook交叉编译linux,mac交叉编译到Linux报错
- spring boot面试_Spring Boot面试问题
- java 优先队列_优先队列Java
- java8接口写静态方法_Java 8接口更改–静态方法,默认方法
- Java ClassNotFoundException – java.lang.ClassNotFoundException
- struts2 拦截器_Struts 2拦截器示例
- Linux驱动开发经典书籍