在学习Spark过程中,资料中介绍的提交Spark Job的方式主要有两种(我所知道的):

第一种:

通过命令行的方式提交Job,使用spark 自带的spark-submit工具提交,官网和大多数参考资料都是已这种方式提交的,提交命令示例如下:
./spark-submit --class com.learn.spark.SimpleApp --master yarn --deploy-mode client --driver-memory 2g --executor-memory 2g --executor-cores 3 ../spark-demo.jar
参数含义就不解释了,请参考官网资料。
 第二种:

提交方式是已JAVA API编程的方式提交,这种方式不需要使用命令行,直接可以在IDEA中点击Run 运行包含Job的Main类就行,Spark 提供了以SparkLanuncher 作为唯一入口的API来实现。这种方式很方便(试想如果某个任务需要重复执行,但是又不会写linux 脚本怎么搞?我想到的是以JAV API的方式提交Job, 还可以和Spring整合,让应用在tomcat中运行),官网的示例:http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/launcher/package-summary.html

根据官网的示例,通过JAVA API编程的方式提交有两种方式:

第一种是调用SparkLanuncher实例的startApplication方法,但是这种方式在所有配置都正确的情况下使用运行都会失败的,原因是startApplication方法会调用LauncherServer启动一个进程与集群交互,这个操作貌似是异步的,所以可能结果是main主线程结束了这个进程都没有起起来,导致运行失败。解决办法是调用new SparkLanuncher().startApplication后需要让主线程休眠一定的时间后者是使用下面的例子:

 1 package com.learn.spark;
 2
 3 import org.apache.spark.launcher.SparkAppHandle;
 4 import org.apache.spark.launcher.SparkLauncher;
 5
 6 import java.io.IOException;
 7 import java.util.HashMap;
 8 import java.util.concurrent.CountDownLatch;
 9
10 public class LanuncherAppV {
11     public static void main(String[] args) throws IOException, InterruptedException {
12
13
14         HashMap env = new HashMap();
15         //这两个属性必须设置
16         env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc/overriterHaoopConf");
17         env.put("JAVA_HOME", "/usr/local/java/jdk1.8.0_151");
18         //可以不设置
19         //env.put("YARN_CONF_DIR","");
20         CountDownLatch countDownLatch = new CountDownLatch(1);
21         //这里调用setJavaHome()方法后,JAVA_HOME is not set 错误依然存在
22         SparkAppHandle handle = new SparkLauncher(env)
23         .setSparkHome("/usr/local/spark")
24         .setAppResource("/usr/local/spark/spark-demo.jar")
25         .setMainClass("com.learn.spark.SimpleApp")
26         .setMaster("yarn")
27         .setDeployMode("cluster")
28         .setConf("spark.app.id", "11222")
29         .setConf("spark.driver.memory", "2g")
30         .setConf("spark.akka.frameSize", "200")
31         .setConf("spark.executor.memory", "1g")
32         .setConf("spark.executor.instances", "32")
33         .setConf("spark.executor.cores", "3")
34         .setConf("spark.default.parallelism", "10")
35         .setConf("spark.driver.allowMultipleContexts", "true")
36         .setVerbose(true).startApplication(new SparkAppHandle.Listener() {
37         //这里监听任务状态,当任务结束时(不管是什么原因结束),isFinal()方法会返回true,否则返回false
38          @Override
39         public void stateChanged(SparkAppHandle sparkAppHandle) {
40             if (sparkAppHandle.getState().isFinal()) {
41                 countDownLatch.countDown();
42             }
43             System.out.println("state:" + sparkAppHandle.getState().toString());
44         }
45
46
47         @Override
48         public void infoChanged(SparkAppHandle sparkAppHandle) {
49             System.out.println("Info:" + sparkAppHandle.getState().toString());
50         }
51     });
52     System.out.println("The task is executing, please wait ....");
53     //线程等待任务结束
54     countDownLatch.await();
55     System.out.println("The task is finished!");
56
57
58     }
59 } 

注意:如果部署模式是cluster,但是代码中有标准输出的话将看不到,需要把结果写到HDFS中,如果是client模式则可以看到输出。

第二种方式是:通过SparkLanuncher.lanunch()方法获取一个进程,然后调用进程的process.waitFor()方法等待线程返回结果,但是使用这种方式需要自己管理运行过程中的输出信息,比较麻烦,好处是一切都在掌握之中,即获取的输出信息和通过命令提交的方式一样,很详细,实现如下:

 1 package com.learn.spark;
 2
 3 import org.apache.spark.launcher.SparkAppHandle;
 4 import org.apache.spark.launcher.SparkLauncher;
 5
 6 import java.io.IOException;
 7 import java.util.HashMap;
 8
 9 public class LauncherApp {
10
11 public static void main(String[] args) throws IOException, InterruptedException {
12
13     HashMap env = new HashMap();
14     //这两个属性必须设置
15     env.put("HADOOP_CONF_DIR","/usr/local/hadoop/etc/overriterHaoopConf");
16     env.put("JAVA_HOME","/usr/local/java/jdk1.8.0_151");
17     //env.put("YARN_CONF_DIR","");
18
19     SparkLauncher handle = new SparkLauncher(env)
20         .setSparkHome("/usr/local/spark")
21         .setAppResource("/usr/local/spark/spark-demo.jar")
22         .setMainClass("com.learn.spark.SimpleApp")
23         .setMaster("yarn")
24         .setDeployMode("cluster")
25         .setConf("spark.app.id", "11222")
26         .setConf("spark.driver.memory", "2g")
27         .setConf("spark.akka.frameSize", "200")
28         .setConf("spark.executor.memory", "1g")
29         .setConf("spark.executor.instances", "32")
30         .setConf("spark.executor.cores", "3")
31         .setConf("spark.default.parallelism", "10")
32         .setConf("spark.driver.allowMultipleContexts","true")
33         .setVerbose(true);
34
35
36     Process process =handle.launch();
37     InputStreamReaderRunnable inputStreamReaderRunnable = new InputStreamReaderRunnable(process.getInputStream(), "input");
38     Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input");
39     inputThread.start();
40
41     InputStreamReaderRunnable errorStreamReaderRunnable = new InputStreamReaderRunnable(process.getErrorStream(), "error");
42     Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error");
43     errorThread.start();
44
45     System.out.println("Waiting for finish...");
46     int exitCode = process.waitFor();
47     System.out.println("Finished! Exit code:" + exitCode);
48
49     }
50 }

使用的自定义InputStreamReaderRunnable类实现如下:
 1 package com.learn.spark;
 2
 3 import java.io.BufferedReader;
 4 import java.io.IOException;
 5 import java.io.InputStream;
 6 import java.io.InputStreamReader;
 7
 8 public class InputStreamReaderRunnable implements Runnable {
 9
10   private BufferedReader reader;
11
12   private String name;
13
14   public InputStreamReaderRunnable(InputStream is, String name) {
15     this.reader = new BufferedReader(new InputStreamReader(is));
16     this.name = name;
17   }
18
19   public void run() {
20
21     System.out.println("InputStream " + name + ":");
22     try {
23         String line = reader.readLine();
24         while (line != null) {
25            System.out.println(line);
26            line = reader.readLine();
27         }
28         reader.close();
29       } catch (IOException e) {
30         e.printStackTrace();
31       }
32    }
33 } 

转载于:https://www.cnblogs.com/lyy-blog/p/8522616.html

spark提交任务的两种的方法相关推荐

  1. Spark Standalone -- 独立集群模式、Spark 提交任务的两种模式、spark在yarn上运行的环境搭建、自己写的spark代码如何提交到yarn上并运行...

    目录 Spark Standalone -- 独立集群模式 Standalone 架构图 Standalone 的搭建 1.上传.解压.重命名 2.配置环境变量 3.修改配置文件 conf 4.同步到 ...

  2. Spark提交代码的两种方式

    基于spark1.6测试(虽然很多公司都已经在用2.X了,但是1.6我认为是最经典的版本,CDH最新版本至今默认的spark版本依然是1.6,不过2.X提交方式是基本没有变的) Standalone ...

  3. SPARK官方实例:两种方法实现随机森林模型(ML/MLlib)

    在spark2.0以上版本中,存在两种对机器学习算法的实现库MLlib与ML,比如随机森林: org.apache.spark.mllib.tree.RandomForest 和 org.apache ...

  4. 快速排序的两种实现方法(c语言版本)

    经过调研发现,对任意无序整数数组,快速排序有两种实现方法,这里简单阐述下思路: 思路一:随意选择一个基准元,一般选择数组的起始元或末尾元,Weiss这本书上特意搞了个算法来选择基准元,--,总之就是基 ...

  5. R语言生存分析COX回归分析实战:两种治疗方法发生肾功能损害的情况

    R语言生存分析COX回归分析实战:两种治疗方法发生肾功能损害的情况 目录

  6. mysql workbench kernelbase.dll_电脑出现kernelbase.dll错误的两种解决方法

    KernelBase.dll是Windows操作系统的重要文件,它为各种应用程序提供服务.如果电脑提示kernelbase.dll错误,这该怎么处理?大家可以用电脑自带的防火墙或者是第三方软件来进行故 ...

  7. 使用定制的NSDictionary的方法,对NSArray进行排序(附:数组排序两种常见方法)

    NSArray中存放的是NSDictionary,可以使用策略的方法对NSDictionary进行定制,增加比较的方法.然后调用NSArray的sortUsingSelector方法对数组进行排序,这 ...

  8. java代码二进制转为十六进制_Java 中二进制转换成十六进制的两种实现方法

    Java 中二进制转换成十六进制的两种实现方法 每个字节转成16进制,方法1 /** * 每个字节转成16进制,方法1 * * @param result */ private static Stri ...

  9. python ioc di_Spring介绍,IOC(控制反转),DI(依赖注入)介绍及两种注入方法

    Spring介绍,IOC(控制反转),DI(依赖注入)介绍及两种注入方法 第一中方法:在xml文件中注入: (1)开源的轻量级的应用开发框架 特点:a.简化开发:b.解耦:c.集成: 原理对象与对象之 ...

  10. Json返回时间中出现乱码问题的两种解决方法

    Json返回时间中出现乱码问题的两种解决方法 参考文章: (1)Json返回时间中出现乱码问题的两种解决方法 (2)https://www.cnblogs.com/hanyinglong/archiv ...

最新文章

  1. FPGA之道(24)VHDL数据类型
  2. 并发编程-03线程安全性之原子性(Atomic包)及原理分析
  3. JQUERY获取各种HTML控件的值
  4. 利用solr实现商品的搜索功能
  5. 在Java项目中整合Scala
  6. 《人民日报》专访姚期智院士:AI是历史性的起跑线机遇
  7. 消息队列面试 - 如何保证消息队列的高可用?
  8. 使用pinyin4j将中文转换为拼音
  9. Python自动发送邮件提示:smtplib.SMTPServerDisconnected: please run connect() first
  10. 4g网络什么时候淘汰_4G升级5G,4G网络不会被淘汰,与5G继续共存
  11. 复杂网络社区划分方法综述
  12. c51单片机汇编语言指令,51单片机汇编指令详解
  13. html语言的特殊符号,特殊符号
  14. 财帮子(caibangzi.com)网站架构
  15. 工业级交换机级联介绍
  16. 对计算机应用领域的CAE,我对CAE的了解和想要进行研究的方面
  17. nginx启动失败nginx: [emerg] bind() to 0.0.0.0:7001 failed (98: Address already in use)
  18. 【附下载】手摸手带你搭建广告需求平台DSP
  19. 儿时经典电影回顾,你看过几部?
  20. 红米k40游戏加速开启方法分享

热门文章

  1. opengl 如何加阴影_一步步学OpenGL(23) -《阴影贴图1》
  2. Mysql之各种各样的函数啦
  3. python @符号_注意!大佬提醒你python初学者这几个很难绕过的坑,附教程资料
  4. 【学习笔记】堆的定义及其建立、排序等基本操作的实现
  5. 十大排序算法——插入排序法(C语言)
  6. 算法:Jump Game
  7. 关于Tricomi方程的类型分析与标准型求解
  8. xgboost与LightGBM的区别
  9. Dijkstra最短路径算法
  10. pandas实现分类汇总,查找不重复的一 一对应数据