spark提交任务的两种的方法
在学习Spark过程中,资料中介绍的提交Spark Job的方式主要有两种(我所知道的):
第一种:
通过命令行的方式提交Job,使用spark 自带的spark-submit工具提交,官网和大多数参考资料都是已这种方式提交的,提交命令示例如下:
./spark-submit --class com.learn.spark.SimpleApp --master yarn --deploy-mode client --driver-memory 2g --executor-memory 2g --executor-cores 3 ../spark-demo.jar
参数含义就不解释了,请参考官网资料。
第二种:
提交方式是已JAVA API编程的方式提交,这种方式不需要使用命令行,直接可以在IDEA中点击Run 运行包含Job的Main类就行,Spark 提供了以SparkLanuncher 作为唯一入口的API来实现。这种方式很方便(试想如果某个任务需要重复执行,但是又不会写linux 脚本怎么搞?我想到的是以JAV API的方式提交Job, 还可以和Spring整合,让应用在tomcat中运行),官网的示例:http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/launcher/package-summary.html
根据官网的示例,通过JAVA API编程的方式提交有两种方式:
第一种是调用SparkLanuncher实例的startApplication方法,但是这种方式在所有配置都正确的情况下使用运行都会失败的,原因是startApplication方法会调用LauncherServer启动一个进程与集群交互,这个操作貌似是异步的,所以可能结果是main主线程结束了这个进程都没有起起来,导致运行失败。解决办法是调用new SparkLanuncher().startApplication后需要让主线程休眠一定的时间后者是使用下面的例子:
1 package com.learn.spark; 2 3 import org.apache.spark.launcher.SparkAppHandle; 4 import org.apache.spark.launcher.SparkLauncher; 5 6 import java.io.IOException; 7 import java.util.HashMap; 8 import java.util.concurrent.CountDownLatch; 9 10 public class LanuncherAppV { 11 public static void main(String[] args) throws IOException, InterruptedException { 12 13 14 HashMap env = new HashMap(); 15 //这两个属性必须设置 16 env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc/overriterHaoopConf"); 17 env.put("JAVA_HOME", "/usr/local/java/jdk1.8.0_151"); 18 //可以不设置 19 //env.put("YARN_CONF_DIR",""); 20 CountDownLatch countDownLatch = new CountDownLatch(1); 21 //这里调用setJavaHome()方法后,JAVA_HOME is not set 错误依然存在 22 SparkAppHandle handle = new SparkLauncher(env) 23 .setSparkHome("/usr/local/spark") 24 .setAppResource("/usr/local/spark/spark-demo.jar") 25 .setMainClass("com.learn.spark.SimpleApp") 26 .setMaster("yarn") 27 .setDeployMode("cluster") 28 .setConf("spark.app.id", "11222") 29 .setConf("spark.driver.memory", "2g") 30 .setConf("spark.akka.frameSize", "200") 31 .setConf("spark.executor.memory", "1g") 32 .setConf("spark.executor.instances", "32") 33 .setConf("spark.executor.cores", "3") 34 .setConf("spark.default.parallelism", "10") 35 .setConf("spark.driver.allowMultipleContexts", "true") 36 .setVerbose(true).startApplication(new SparkAppHandle.Listener() { 37 //这里监听任务状态,当任务结束时(不管是什么原因结束),isFinal()方法会返回true,否则返回false 38 @Override 39 public void stateChanged(SparkAppHandle sparkAppHandle) { 40 if (sparkAppHandle.getState().isFinal()) { 41 countDownLatch.countDown(); 42 } 43 System.out.println("state:" + sparkAppHandle.getState().toString()); 44 } 45 46 47 @Override 48 public void infoChanged(SparkAppHandle sparkAppHandle) { 49 System.out.println("Info:" + sparkAppHandle.getState().toString()); 50 } 51 }); 52 System.out.println("The task is executing, please wait ...."); 53 //线程等待任务结束 54 countDownLatch.await(); 55 System.out.println("The task is finished!"); 56 57 58 } 59 }
注意:如果部署模式是cluster,但是代码中有标准输出的话将看不到,需要把结果写到HDFS中,如果是client模式则可以看到输出。
第二种方式是:通过SparkLanuncher.lanunch()方法获取一个进程,然后调用进程的process.waitFor()方法等待线程返回结果,但是使用这种方式需要自己管理运行过程中的输出信息,比较麻烦,好处是一切都在掌握之中,即获取的输出信息和通过命令提交的方式一样,很详细,实现如下:
1 package com.learn.spark; 2 3 import org.apache.spark.launcher.SparkAppHandle; 4 import org.apache.spark.launcher.SparkLauncher; 5 6 import java.io.IOException; 7 import java.util.HashMap; 8 9 public class LauncherApp { 10 11 public static void main(String[] args) throws IOException, InterruptedException { 12 13 HashMap env = new HashMap(); 14 //这两个属性必须设置 15 env.put("HADOOP_CONF_DIR","/usr/local/hadoop/etc/overriterHaoopConf"); 16 env.put("JAVA_HOME","/usr/local/java/jdk1.8.0_151"); 17 //env.put("YARN_CONF_DIR",""); 18 19 SparkLauncher handle = new SparkLauncher(env) 20 .setSparkHome("/usr/local/spark") 21 .setAppResource("/usr/local/spark/spark-demo.jar") 22 .setMainClass("com.learn.spark.SimpleApp") 23 .setMaster("yarn") 24 .setDeployMode("cluster") 25 .setConf("spark.app.id", "11222") 26 .setConf("spark.driver.memory", "2g") 27 .setConf("spark.akka.frameSize", "200") 28 .setConf("spark.executor.memory", "1g") 29 .setConf("spark.executor.instances", "32") 30 .setConf("spark.executor.cores", "3") 31 .setConf("spark.default.parallelism", "10") 32 .setConf("spark.driver.allowMultipleContexts","true") 33 .setVerbose(true); 34 35 36 Process process =handle.launch(); 37 InputStreamReaderRunnable inputStreamReaderRunnable = new InputStreamReaderRunnable(process.getInputStream(), "input"); 38 Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input"); 39 inputThread.start(); 40 41 InputStreamReaderRunnable errorStreamReaderRunnable = new InputStreamReaderRunnable(process.getErrorStream(), "error"); 42 Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error"); 43 errorThread.start(); 44 45 System.out.println("Waiting for finish..."); 46 int exitCode = process.waitFor(); 47 System.out.println("Finished! Exit code:" + exitCode); 48 49 } 50 }
使用的自定义InputStreamReaderRunnable类实现如下:
1 package com.learn.spark; 2 3 import java.io.BufferedReader; 4 import java.io.IOException; 5 import java.io.InputStream; 6 import java.io.InputStreamReader; 7 8 public class InputStreamReaderRunnable implements Runnable { 9 10 private BufferedReader reader; 11 12 private String name; 13 14 public InputStreamReaderRunnable(InputStream is, String name) { 15 this.reader = new BufferedReader(new InputStreamReader(is)); 16 this.name = name; 17 } 18 19 public void run() { 20 21 System.out.println("InputStream " + name + ":"); 22 try { 23 String line = reader.readLine(); 24 while (line != null) { 25 System.out.println(line); 26 line = reader.readLine(); 27 } 28 reader.close(); 29 } catch (IOException e) { 30 e.printStackTrace(); 31 } 32 } 33 }
转载于:https://www.cnblogs.com/lyy-blog/p/8522616.html
spark提交任务的两种的方法相关推荐
- Spark Standalone -- 独立集群模式、Spark 提交任务的两种模式、spark在yarn上运行的环境搭建、自己写的spark代码如何提交到yarn上并运行...
目录 Spark Standalone -- 独立集群模式 Standalone 架构图 Standalone 的搭建 1.上传.解压.重命名 2.配置环境变量 3.修改配置文件 conf 4.同步到 ...
- Spark提交代码的两种方式
基于spark1.6测试(虽然很多公司都已经在用2.X了,但是1.6我认为是最经典的版本,CDH最新版本至今默认的spark版本依然是1.6,不过2.X提交方式是基本没有变的) Standalone ...
- SPARK官方实例:两种方法实现随机森林模型(ML/MLlib)
在spark2.0以上版本中,存在两种对机器学习算法的实现库MLlib与ML,比如随机森林: org.apache.spark.mllib.tree.RandomForest 和 org.apache ...
- 快速排序的两种实现方法(c语言版本)
经过调研发现,对任意无序整数数组,快速排序有两种实现方法,这里简单阐述下思路: 思路一:随意选择一个基准元,一般选择数组的起始元或末尾元,Weiss这本书上特意搞了个算法来选择基准元,--,总之就是基 ...
- R语言生存分析COX回归分析实战:两种治疗方法发生肾功能损害的情况
R语言生存分析COX回归分析实战:两种治疗方法发生肾功能损害的情况 目录
- mysql workbench kernelbase.dll_电脑出现kernelbase.dll错误的两种解决方法
KernelBase.dll是Windows操作系统的重要文件,它为各种应用程序提供服务.如果电脑提示kernelbase.dll错误,这该怎么处理?大家可以用电脑自带的防火墙或者是第三方软件来进行故 ...
- 使用定制的NSDictionary的方法,对NSArray进行排序(附:数组排序两种常见方法)
NSArray中存放的是NSDictionary,可以使用策略的方法对NSDictionary进行定制,增加比较的方法.然后调用NSArray的sortUsingSelector方法对数组进行排序,这 ...
- java代码二进制转为十六进制_Java 中二进制转换成十六进制的两种实现方法
Java 中二进制转换成十六进制的两种实现方法 每个字节转成16进制,方法1 /** * 每个字节转成16进制,方法1 * * @param result */ private static Stri ...
- python ioc di_Spring介绍,IOC(控制反转),DI(依赖注入)介绍及两种注入方法
Spring介绍,IOC(控制反转),DI(依赖注入)介绍及两种注入方法 第一中方法:在xml文件中注入: (1)开源的轻量级的应用开发框架 特点:a.简化开发:b.解耦:c.集成: 原理对象与对象之 ...
- Json返回时间中出现乱码问题的两种解决方法
Json返回时间中出现乱码问题的两种解决方法 参考文章: (1)Json返回时间中出现乱码问题的两种解决方法 (2)https://www.cnblogs.com/hanyinglong/archiv ...
最新文章
- FPGA之道(24)VHDL数据类型
- 并发编程-03线程安全性之原子性(Atomic包)及原理分析
- JQUERY获取各种HTML控件的值
- 利用solr实现商品的搜索功能
- 在Java项目中整合Scala
- 《人民日报》专访姚期智院士:AI是历史性的起跑线机遇
- 消息队列面试 - 如何保证消息队列的高可用?
- 使用pinyin4j将中文转换为拼音
- Python自动发送邮件提示:smtplib.SMTPServerDisconnected: please run connect() first
- 4g网络什么时候淘汰_4G升级5G,4G网络不会被淘汰,与5G继续共存
- 复杂网络社区划分方法综述
- c51单片机汇编语言指令,51单片机汇编指令详解
- html语言的特殊符号,特殊符号
- 财帮子(caibangzi.com)网站架构
- 工业级交换机级联介绍
- 对计算机应用领域的CAE,我对CAE的了解和想要进行研究的方面
- nginx启动失败nginx: [emerg] bind() to 0.0.0.0:7001 failed (98: Address already in use)
- 【附下载】手摸手带你搭建广告需求平台DSP
- 儿时经典电影回顾,你看过几部?
- 红米k40游戏加速开启方法分享