业务需求

实现一个根据spark任务的appName来监控任务是否存在,及任务是否卡死的监控。

1)给定一个appName,根据appName从yarn application -list中验证任务是否存在,不存在则调用spark-submit.sh脚本来启动任务;

2)如果任务存在yarn application -list中,则读取‘监控文件(监控文件内容包括:appId,最新活动时间)’,从监控文件中读取出最后活动的日期,计算当前日期与app的最后活动日期相差时间为X,如果X大于30minutes(认为任务处于假死状态[再发布环境发现有的任务DAG抛出OOM,导致app的executor和driver依然存在,当时不执行任务调度,程序卡死。具体错误详情请参考《https://issues.apache.org/jira/browse/SPARK-26452》]),则执行yarn application -kill appId(杀掉任务),等待下次监控脚本执行时重启任务。

监控实现

脚本

#/bin/sh
#LANG=zh_CN.utf8
#export LANG
export SPARK_KAFKA_VERSION=0.10
export LANG=zh_CN.UTF-8
# export env variable
if [ -f ~/.bash_profile ];
thensource ~/.bash_profile
fi
source /etc/profilemyAppName='myBatchTopic'               #这里指定需要监控的spark任务的appName,注意:这名字重复了会导致监控失败。
apps=''for app in `yarn application -list`
doapps=${app},$apps
done
apps=${apps%?}if [[ $apps =~ $myAppName ]];
thenecho "appName($myAppName) exists in yarn application list"#1)运行 hadop fs -cat /目录/appName,读取其中最后更新日期;(如果文件不存在,则跳过等待文件生成。)monitorInfo=$(hadoop fs -cat /user/dx/streaming/monitor/${myAppName})LD_IFS="$IFS"IFS=","array=($monitorInfo)IFS="$OLD_IFS"  appId=${array[0]}monitorLastDate=${array[1]}echo "loading mintor information 'appId:$appId,monitorLastUpdateDate:$monitorLastDate'"current_date=$(date "+%Y-%m-%d %H:%M:%S")echo "loading current date '$current_date'"#2)与当前日期对比:# 如果距离当前日期相差小于30min,则不做处理;# 如果大于30min则kill job,根据上边yarn application -list中能获取对应的appId,运行yarn application -kill appIdt1=`date -d "$current_date" +%s`t2=`date -d "$monitorLastDate" +%s`diff_minute=$(($(($t1-$t2))/60))echo "current date($current_date) over than monitorLastDate($monitorLastDate) $diff_minute minutes"if [ $diff_minute -gt 30 ];thenecho 'over then 30 minutes'$(yarn application -kill ${appId})echo "kill application ${appId}"elseecho 'less than 30 minutes'fi
elseecho "appName($myAppName) not exists in yarn application list"#./submit_x1_x2.sh abc TestRestartDriver #这里指定需要启动的脚本来启动相关任务$(nohup ./submit_checkpoint2.sh >> ./output.log 2>&1 &)
fi

监控脚本业务流程图:

监控文件生成

我这里程序是spark structured streaming,因此可以注册sparkSesssion的streams()的query的监听事件

sparkSession.streams().addListener(new GlobalStreamingQueryListener(sparkSession。。。))

在监听事件中实现如下:

public class GlobalStreamingQueryListener extends StreamingQueryListener {private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(GlobalStreamingQueryListener.class);private static final String monitorBaseDir = "/user/dx/streaming/monitor/";private SparkSession sparkSession = null;private LongAccumulator triggerAccumulator = null;public GlobalStreamingQueryListener(SparkSession sparkSession, LongAccumulator triggerAccumulator) {this.sparkSession = sparkSession;this.triggerAccumulator = triggerAccumulator;}@Overridepublic void onQueryStarted(QueryStartedEvent queryStarted) {System.out.println("Query started: " + queryStarted.id());}@Overridepublic void onQueryTerminated(QueryTerminatedEvent queryTerminated) {System.out.println("Query terminated: " + queryTerminated.id());}@Overridepublic void onQueryProgress(QueryProgressEvent queryProgress) {System.out.println("Query made progress: " + queryProgress.progress());// sparkSession.sql("select * from " +// queryProgress.progress().name()).show();
triggerAccumulator.add(1);System.out.println("Trigger accumulator value: " + triggerAccumulator.value());logger.info("minitor start .... ");try {if (HDFSUtility.createDir(monitorBaseDir)) {logger.info("Create monitor base dir(" + monitorBaseDir + ") success");} else {logger.info("Create monitor base dir(" + monitorBaseDir + ") fail");}} catch (IOException e) {logger.error("An error was thrown while create monitor base dir(" + monitorBaseDir + ")");e.printStackTrace();}// spark.app.id application_1543820999543_0193String appId = this.sparkSession.conf().get("spark.app.id");// spark.app.name myBatchTopicString appName = this.sparkSession.conf().get("spark.app.name");String mintorFilePath = (monitorBaseDir.endsWith(File.separator) ? monitorBaseDir : monitorBaseDir + File.separator) + appName;logger.info("The application's id is " + appId);logger.info("The application's name is " + appName);logger.warn("If the appName is not unique,it will result in a monitor error");try {HDFSUtility.overwriter(mintorFilePath, appId + "," + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));} catch (IOException e) {logger.error("An error was thrown while write info to monitor file(" + mintorFilePath + ")");e.printStackTrace();}logger.info("minitor stop .... ");}}

HDFSUtility.java中方法如下:
public class HDFSUtility {private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HDFSUtility.class);/*** 当目录不存在时,创建目录。* * @param dirPath*            目标目录* @return true-創建成功;false-失敗。* @throws IOException* */public static boolean createDir(String dirPath) throws IOException {FileSystem fs = null;Path dir = new Path(dirPath);boolean success = false;try {fs = FileSystem.get(new Configuration());if (!fs.exists(dir)) {success = fs.mkdirs(dir);} else {success = true;}} catch (IOException e) {logger.error("create dir (" + dirPath + ") fail:", e);throw e;} finally {try {fs.close();} catch (IOException e) {e.printStackTrace();}}return success;}/*** 覆盖文件写入信息* * @param filePath*            目标文件路径* @param content*            被写入内容* @throws IOException* */public static void overwriter(String filePath, String content) throws IOException {FileSystem fs = null;// 在指定路径创建FSDataOutputStream。默认情况下会覆盖文件。FSDataOutputStream outputStream = null;Path file = new Path(filePath);try {fs = FileSystem.get(new Configuration());if (fs.exists(file)) {System.out.println("File exists(" + filePath + ")");}outputStream = fs.create(file);outputStream.write(content.getBytes());} catch (IOException e) {logger.error("write into file(" + filePath + ") fail:", e);throw e;} finally {if (outputStream != null) {try {outputStream.close();} catch (IOException e) {e.printStackTrace();}}try {fs.close();} catch (IOException e) {e.printStackTrace();}}}
}

 

转载于:https://www.cnblogs.com/yy3b2007com/p/10241914.html

Spark2.2(三十九):如何根据appName监控spark任务,当任务不存在则启动(任务存在当超过多久没有活动状态则kill,等待下次启动)...相关推荐

  1. NeHe OpenGL第三十九课:物理模拟

    NeHe OpenGL第三十九课:物理模拟 物理模拟简介: 还记得高中的物理吧,直线运动,自由落体运动,弹簧.在这一课里,我们将创造这一切.   物理模拟介绍 如果你很熟悉物理规律,并且想实现它,这篇 ...

  2. Python编程基础:第三十九节 面向对象编程Object Oriented Programming

    第三十九节 面向对象编程Object Oriented Programming 前言 实践 前言 到目前为止我们都是函数式编程,也即将每一个功能块写为一个函数.其实还有一种更常用的编程方式被称为面向对 ...

  3. javaweb学习总结(三十九)——数据库连接池

    javaweb学习总结(三十九)--数据库连接池 一.应用程序直接获取数据库连接的缺点 用户每次请求都需要向数据库获得链接,而数据库创建连接通常需要消耗相对较大的资源,创建时间也较长.假设网站一天10 ...

  4. 三十九、Java集合中的HashSet和TreeSet

    @Author:Runsen @Date:2020/6/6 作者介绍:Runsen目前大三下学期,专业化学工程与工艺,大学沉迷日语,Python, Java和一系列数据分析软件.导致翘课严重,专业排名 ...

  5. WPF,Silverlight与XAML读书笔记第三十九 - 可视化效果之3D图形

    原文:WPF,Silverlight与XAML读书笔记第三十九 - 可视化效果之3D图形 说明:本系列基本上是<WPF揭秘>的读书笔记.在结构安排与文章内容上参照<WPF揭秘> ...

  6. 【零基础学Java】—List集合(三十九)

    [零基础学Java]-List集合(三十九) java.util.list接口 extends Collection接口 list接口的特点: 1.有序的集合,存储元素和取出元素的顺序是一致的(存储1 ...

  7. JavaScript学习(三十九)—对象中内容的操作

    JavaScript学习(三十九)-对象中内容的操作 一.对象中内容的操作:增.删.改.查 (一).增:给对象添加属性或者方法 1)方式1:对象名称.属性名=属性值: 2)方式2:对象名称['属性名' ...

  8. 【正点原子FPGA连载】第三十九章OV7725摄像头RGB-LCD显示实验 -摘自【正点原子】新起点之FPGA开发指南_V2.1

    1)实验平台:正点原子新起点V2开发板 2)平台购买地址:https://detail.tmall.com/item.htm?id=609758951113 2)全套实验源码+手册+视频下载地址:ht ...

  9. 陈艾盐:《春燕》百集访谈节目第三十九集

    <春燕>访谈节目共120集,每月分10集播出,记录了上百位企业家对"慈善"的各种不同见解,通过讲述社会真善美的故事,让更多的人了解慈善.发扬慈善精神,构建更加美好,和谐 ...

最新文章

  1. 重构是提高可测试性的主要手段 《设计模式》《代码重构》《从重构到模式》 《反模式》 重构时机 编写测试时候 修改BUG时候
  2. LeetCode Text Justification(贪心)
  3. 2014全年目标及执行情况跟踪
  4. 复习笔记(六)——C++运算符重载(难点)
  5. Linux CentOS 6.x设置静态IP(亲测有效)
  6. PyOpenCV 坐标系统
  7. t-sql查询where in_产品操作MySQL第7篇 – 运算符 - IN
  8. 什么样的人不适合当程序员呢?
  9. 腾讯业务架构:六大事业群
  10. 顺序工作流 状态机工作流 数据岛工作流 选择 .
  11. unity 2021.3.6f1 报错 dependencies manifest(Microsoft.NetCore.App.deps.json)was not found
  12. Spring Cloud Alibaba Sentinel(七)受权规则 黑白名单
  13. 在WINDOWS下的Services.mscl里有好几个ORACLE的SERVICES的一些作用
  14. 电车难题和他的n个**变种分享
  15. qq邮件 外发服务器设置,大商创使用教程-大商创邮件服务器设置
  16. 小程序editor富文本编辑使用及rich-text解析富文本
  17. 利用python深度学习神经网络预测五年内糖尿病的发生(全代码)
  18. 考研复试怎么穿搭?看这一篇就够了!
  19. linux java桌面环境_Linux桌面环境玩转BT(转)
  20. Seay代码审计系统审计实战

热门文章

  1. java连接redis不稳定_java相关:jedispool连redis高并发卡死的问题
  2. python视频压缩算法_Python入门到精通视频,阿里巴巴大力推荐,20行Python代码,无损压缩千百张图片!...
  3. JAVA获取安卓系统下usb_Android 获取 usb 权限的两种方法
  4. macbook交叉编译linux,mac交叉编译到Linux报错
  5. spring boot面试_Spring Boot面试问题
  6. java 优先队列_优先队列Java
  7. java8接口写静态方法_Java 8接口更改–静态方法,默认方法
  8. Java ClassNotFoundException – java.lang.ClassNotFoundException
  9. struts2 拦截器_Struts 2拦截器示例
  10. Linux驱动开发经典书籍