
In a project I needed to submit Spark jobs in YARN mode to a Hadoop cluster, and found that the job's current OS user had no permission to access the cluster's HDFS. After some digging it turned out the cluster has Kerberos authentication enabled, so the job must log in with a Kerberos principal, just like any other Hadoop client. Searching for documentation was disheartening: almost nothing covers this case. All I could find was that, when submitting from a server with the spark-submit command, you add the parameters --keytab /Kerberos/user.keytab --principal user. But I was using the Java API's SparkLauncher, which has no setKeytab or setPrincipal method, so I was stuck. The next step was to read the SparkLauncher source.

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.launcher;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.spark.launcher.CommandBuilderUtils.*;

/**
 * Launcher for Spark applications.
 * <p>
 * Use this class to start Spark applications programmatically. The class uses a builder pattern
 * to allow clients to configure the Spark application and launch it as a child process.
 * </p>
 */
public class SparkLauncher extends AbstractLauncher<SparkLauncher> {

  /** The Spark master. */
  public static final String SPARK_MASTER = "spark.master";

  /** The Spark deploy mode. */
  public static final String DEPLOY_MODE = "spark.submit.deployMode";

  /** Configuration key for the driver memory. */
  public static final String DRIVER_MEMORY = "spark.driver.memory";

  /** Configuration key for the driver class path. */
  public static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath";

  /** Configuration key for the driver VM options. */
  public static final String DRIVER_EXTRA_JAVA_OPTIONS = "spark.driver.extraJavaOptions";

  /** Configuration key for the driver native library path. */
  public static final String DRIVER_EXTRA_LIBRARY_PATH = "spark.driver.extraLibraryPath";

  /** Configuration key for the executor memory. */
  public static final String EXECUTOR_MEMORY = "spark.executor.memory";

  /** Configuration key for the executor class path. */
  public static final String EXECUTOR_EXTRA_CLASSPATH = "spark.executor.extraClassPath";

  /** Configuration key for the executor VM options. */
  public static final String EXECUTOR_EXTRA_JAVA_OPTIONS = "spark.executor.extraJavaOptions";

  /** Configuration key for the executor native library path. */
  public static final String EXECUTOR_EXTRA_LIBRARY_PATH = "spark.executor.extraLibraryPath";

  /** Configuration key for the number of executor CPU cores. */
  public static final String EXECUTOR_CORES = "spark.executor.cores";

  static final String PYSPARK_DRIVER_PYTHON = "spark.pyspark.driver.python";

  static final String PYSPARK_PYTHON = "spark.pyspark.python";

  static final String SPARKR_R_SHELL = "spark.r.shell.command";

  /** Logger name to use when launching a child process. */
  public static final String CHILD_PROCESS_LOGGER_NAME = "spark.launcher.childProcLoggerName";

  /**
   * A special value for the resource that tells Spark to not try to process the app resource as a
   * file. This is useful when the class being executed is added to the application using other
   * means - for example, by adding jars using the package download feature.
   */
  public static final String NO_RESOURCE = "spark-internal";

  /**
   * Maximum time (in ms) to wait for a child process to connect back to the launcher server
   * when using @link{#start()}.
   */
  public static final String CHILD_CONNECTION_TIMEOUT = "spark.launcher.childConectionTimeout";

  /** Used internally to create unique logger names. */
  private static final AtomicInteger COUNTER = new AtomicInteger();

  /** Factory for creating OutputRedirector threads. */
  static final ThreadFactory REDIRECTOR_FACTORY = new NamedThreadFactory("launcher-proc-%d");

  static final Map<String, String> launcherConfig = new HashMap<>();

  /**
   * Set a configuration value for the launcher library. These config values do not affect the
   * launched application, but rather the behavior of the launcher library itself when managing
   * applications.
   *
   * @since 1.6.0
   * @param name Config name.
   * @param value Config value.
   */
  public static void setConfig(String name, String value) {
    launcherConfig.put(name, value);
  }

  // Visible for testing.
  File workingDir;
  boolean redirectErrorStream;
  ProcessBuilder.Redirect errorStream;
  ProcessBuilder.Redirect outputStream;

  public SparkLauncher() {
    this(null);
  }

  /**
   * Creates a launcher that will set the given environment variables in the child.
   *
   * @param env Environment variables to set.
   */
  public SparkLauncher(Map<String, String> env) {
    if (env != null) {
      this.builder.childEnv.putAll(env);
    }
  }

  /**
   * Set a custom JAVA_HOME for launching the Spark application.
   *
   * @param javaHome Path to the JAVA_HOME to use.
   * @return This launcher.
   */
  public SparkLauncher setJavaHome(String javaHome) {
    checkNotNull(javaHome, "javaHome");
    builder.javaHome = javaHome;
    return this;
  }

  /**
   * Set a custom Spark installation location for the application.
   *
   * @param sparkHome Path to the Spark installation to use.
   * @return This launcher.
   */
  public SparkLauncher setSparkHome(String sparkHome) {
    checkNotNull(sparkHome, "sparkHome");
    builder.childEnv.put(ENV_SPARK_HOME, sparkHome);
    return this;
  }

  /**
   * Sets the working directory of spark-submit.
   *
   * @param dir The directory to set as spark-submit's working directory.
   * @return This launcher.
   */
  public SparkLauncher directory(File dir) {
    workingDir = dir;
    return this;
  }

  /**
   * Specifies that stderr in spark-submit should be redirected to stdout.
   *
   * @return This launcher.
   */
  public SparkLauncher redirectError() {
    redirectErrorStream = true;
    return this;
  }

  /**
   * Redirects error output to the specified Redirect.
   *
   * @param to The method of redirection.
   * @return This launcher.
   */
  public SparkLauncher redirectError(ProcessBuilder.Redirect to) {
    errorStream = to;
    return this;
  }

  /**
   * Redirects standard output to the specified Redirect.
   *
   * @param to The method of redirection.
   * @return This launcher.
   */
  public SparkLauncher redirectOutput(ProcessBuilder.Redirect to) {
    outputStream = to;
    return this;
  }

  /**
   * Redirects error output to the specified File.
   *
   * @param errFile The file to which stderr is written.
   * @return This launcher.
   */
  public SparkLauncher redirectError(File errFile) {
    errorStream = ProcessBuilder.Redirect.to(errFile);
    return this;
  }

  /**
   * Redirects standard output to the specified File.
   *
   * @param outFile The file to which stdout is written.
   * @return This launcher.
   */
  public SparkLauncher redirectOutput(File outFile) {
    outputStream = ProcessBuilder.Redirect.to(outFile);
    return this;
  }

  /**
   * Sets all output to be logged and redirected to a logger with the specified name.
   *
   * @param loggerName The name of the logger to log stdout and stderr.
   * @return This launcher.
   */
  public SparkLauncher redirectToLog(String loggerName) {
    setConf(CHILD_PROCESS_LOGGER_NAME, loggerName);
    return this;
  }

  // The following methods just delegate to the parent class, but they are needed to keep
  // binary compatibility with previous versions of this class.

  @Override
  public SparkLauncher setPropertiesFile(String path) {
    return super.setPropertiesFile(path);
  }

  @Override
  public SparkLauncher setConf(String key, String value) {
    return super.setConf(key, value);
  }

  @Override
  public SparkLauncher setAppName(String appName) {
    return super.setAppName(appName);
  }

  @Override
  public SparkLauncher setMaster(String master) {
    return super.setMaster(master);
  }

  @Override
  public SparkLauncher setDeployMode(String mode) {
    return super.setDeployMode(mode);
  }

  @Override
  public SparkLauncher setAppResource(String resource) {
    return super.setAppResource(resource);
  }

  @Override
  public SparkLauncher setMainClass(String mainClass) {
    return super.setMainClass(mainClass);
  }

  @Override
  public SparkLauncher addSparkArg(String arg) {
    return super.addSparkArg(arg);
  }

  @Override
  public SparkLauncher addSparkArg(String name, String value) {
    return super.addSparkArg(name, value);
  }

  @Override
  public SparkLauncher addAppArgs(String... args) {
    return super.addAppArgs(args);
  }

  @Override
  public SparkLauncher addJar(String jar) {
    return super.addJar(jar);
  }

  @Override
  public SparkLauncher addFile(String file) {
    return super.addFile(file);
  }

  @Override
  public SparkLauncher addPyFile(String file) {
    return super.addPyFile(file);
  }

  @Override
  public SparkLauncher setVerbose(boolean verbose) {
    return super.setVerbose(verbose);
  }

  /**
   * Launches a sub-process that will start the configured Spark application.
   * <p>
   * The {@link #startApplication(SparkAppHandle.Listener...)} method is preferred when launching
   * Spark, since it provides better control of the child application.
   *
   * @return A process handle for the Spark app.
   */
  public Process launch() throws IOException {
    ProcessBuilder pb = createBuilder();

    boolean outputToLog = outputStream == null;
    boolean errorToLog = !redirectErrorStream && errorStream == null;

    String loggerName = getLoggerName();
    if (loggerName != null && outputToLog && errorToLog) {
      pb.redirectErrorStream(true);
    }

    Process childProc = pb.start();
    if (loggerName != null) {
      InputStream logStream = outputToLog ? childProc.getInputStream() : childProc.getErrorStream();
      new OutputRedirector(logStream, loggerName, REDIRECTOR_FACTORY);
    }

    return childProc;
  }

  /**
   * Starts a Spark application.
   *
   * <p>
   * Applications launched by this launcher run as child processes. The child's stdout and stderr
   * are merged and written to a logger (see <code>java.util.logging</code>) only if redirection
   * has not otherwise been configured on this <code>SparkLauncher</code>. The logger's name can be
   * defined by setting {@link #CHILD_PROCESS_LOGGER_NAME} in the app's configuration. If that
   * option is not set, the code will try to derive a name from the application's name or main
   * class / script file. If those cannot be determined, an internal, unique name will be used.
   * In all cases, the logger name will start with "org.apache.spark.launcher.app", to fit more
   * easily into the configuration of commonly-used logging systems.
   *
   * @since 1.6.0
   * @see AbstractLauncher#startApplication(SparkAppHandle.Listener...)
   * @param listeners Listeners to add to the handle before the app is launched.
   * @return A handle for the launched application.
   */
  @Override
  public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) throws IOException {
    LauncherServer server = LauncherServer.getOrCreateServer();
    ChildProcAppHandle handle = new ChildProcAppHandle(server);
    for (SparkAppHandle.Listener l : listeners) {
      handle.addListener(l);
    }

    String secret = server.registerHandle(handle);

    String loggerName = getLoggerName();
    ProcessBuilder pb = createBuilder();

    boolean outputToLog = outputStream == null;
    boolean errorToLog = !redirectErrorStream && errorStream == null;

    // Only setup stderr + stdout to logger redirection if user has not otherwise configured output
    // redirection.
    if (loggerName == null && (outputToLog || errorToLog)) {
      String appName;
      if (builder.appName != null) {
        appName = builder.appName;
      } else if (builder.mainClass != null) {
        int dot = builder.mainClass.lastIndexOf(".");
        if (dot >= 0 && dot < builder.mainClass.length() - 1) {
          appName = builder.mainClass.substring(dot + 1, builder.mainClass.length());
        } else {
          appName = builder.mainClass;
        }
      } else if (builder.appResource != null) {
        appName = new File(builder.appResource).getName();
      } else {
        appName = String.valueOf(COUNTER.incrementAndGet());
      }
      String loggerPrefix = getClass().getPackage().getName();
      loggerName = String.format("%s.app.%s", loggerPrefix, appName);
    }

    if (outputToLog && errorToLog) {
      pb.redirectErrorStream(true);
    }

    pb.environment().put(LauncherProtocol.ENV_LAUNCHER_PORT, String.valueOf(server.getPort()));
    pb.environment().put(LauncherProtocol.ENV_LAUNCHER_SECRET, secret);
    try {
      Process child = pb.start();
      InputStream logStream = null;
      if (loggerName != null) {
        logStream = outputToLog ? child.getInputStream() : child.getErrorStream();
      }
      handle.setChildProc(child, loggerName, logStream);
    } catch (IOException ioe) {
      handle.kill();
      throw ioe;
    }

    return handle;
  }

  private ProcessBuilder createBuilder() throws IOException {
    List<String> cmd = new ArrayList<>();
    cmd.add(findSparkSubmit());
    cmd.addAll(builder.buildSparkSubmitArgs());

    // Since the child process is a batch script, let's quote things so that special characters are
    // preserved, otherwise the batch interpreter will mess up the arguments. Batch scripts are
    // weird.
    if (isWindows()) {
      List<String> winCmd = new ArrayList<>();
      for (String arg : cmd) {
        winCmd.add(quoteForBatchScript(arg));
      }
      cmd = winCmd;
    }

    ProcessBuilder pb = new ProcessBuilder(cmd.toArray(new String[cmd.size()]));
    for (Map.Entry<String, String> e : builder.childEnv.entrySet()) {
      pb.environment().put(e.getKey(), e.getValue());
    }

    if (workingDir != null) {
      pb.directory(workingDir);
    }

    // Only one of redirectError and redirectError(...) can be specified.
    // Similarly, if redirectToLog is specified, no other redirections should be specified.
    checkState(!redirectErrorStream || errorStream == null,
      "Cannot specify both redirectError() and redirectError(...) ");
    checkState(getLoggerName() == null ||
      ((!redirectErrorStream && errorStream == null) || outputStream == null),
      "Cannot used redirectToLog() in conjunction with other redirection methods.");

    if (redirectErrorStream) {
      pb.redirectErrorStream(true);
    }
    if (errorStream != null) {
      pb.redirectError(errorStream);
    }
    if (outputStream != null) {
      pb.redirectOutput(outputStream);
    }

    return pb;
  }

  @Override
  SparkLauncher self() {
    return this;
  }

  // Visible for testing.
  String findSparkSubmit() {
    String script = isWindows() ? "spark-submit.cmd" : "spark-submit";
    return join(File.separator, builder.getSparkHome(), "bin", script);
  }

  private String getLoggerName() throws IOException {
    return builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
  }

}

Then I noticed the method addSparkArg(String name, String value), which looked promising. Tracing further, the implementation lives in the parent class AbstractLauncher:

  /**
   * Adds an argument with a value to the Spark invocation. If the argument name corresponds to
   * a known argument, the code validates that the argument actually expects a value, and throws
   * an exception otherwise.
   * <p>
   * It is safe to add arguments modified by other methods in this class (such as
   * {@link #setMaster(String)} - the last invocation will be the one to take effect.
   * <p>
   * Use this method with caution. It is possible to create an invalid Spark command by passing
   * unknown arguments to this method, since those are allowed for forward compatibility.
   *
   * @since 1.5.0
   * @param name Name of argument to add.
   * @param value Value of the argument.
   * @return This launcher.
   */
  public T addSparkArg(String name, String value) {
    SparkSubmitOptionParser validator = new ArgumentValidator(true);
    if (validator.MASTER.equals(name)) {
      setMaster(value);
    } else if (validator.PROPERTIES_FILE.equals(name)) {
      setPropertiesFile(value);
    } else if (validator.CONF.equals(name)) {
      String[] vals = value.split("=", 2);
      setConf(vals[0], vals[1]);
    } else if (validator.CLASS.equals(name)) {
      setMainClass(value);
    } else if (validator.JARS.equals(name)) {
      builder.jars.clear();
      for (String jar : value.split(",")) {
        addJar(jar);
      }
    } else if (validator.FILES.equals(name)) {
      builder.files.clear();
      for (String file : value.split(",")) {
        addFile(file);
      }
    } else if (validator.PY_FILES.equals(name)) {
      builder.pyFiles.clear();
      for (String file : value.split(",")) {
        addPyFile(file);
      }
    } else {
      validator.parse(Arrays.asList(name, value));
      builder.sparkArgs.add(name);
      builder.sparkArgs.add(value);
    }
    return self();
  }

The arguments end up in the final SparkSubmitCommandBuilder builder field, so at this point it was fairly certain this method could carry the Kerberos options.
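To see why this matters for --keytab and --principal, note the final else branch above: argument names the validator does not route to a dedicated setter are validated and then appended verbatim to sparkArgs, which later becomes part of the spark-submit command line. A stripped-down sketch of that fall-through behavior (this is an illustration, not the real Spark class; the class name SparkArgSketch and the hard-coded list of known switches are invented for the example):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified sketch of addSparkArg(name, value): names with a dedicated setter
// are routed there (omitted here); anything else is appended verbatim to the
// sparkArgs list, which is what makes --keytab/--principal pass through.
public class SparkArgSketch {
    private final List<String> sparkArgs = new ArrayList<>();

    public SparkArgSketch addSparkArg(String name, String value) {
        // In the real implementation these switches call setMaster(), setConf(), etc.
        List<String> known = Arrays.asList("--master", "--properties-file", "--conf",
                "--class", "--jars", "--files", "--py-files");
        if (!known.contains(name)) {
            // Fall-through branch: store the name/value pair as-is.
            sparkArgs.add(name);
            sparkArgs.add(value);
        }
        return this;
    }

    public List<String> buildArgs() {
        return sparkArgs;
    }

    public static void main(String[] args) {
        List<String> built = new SparkArgSketch()
            .addSparkArg("--keytab", "/Kerberos/user.keytab")
            .addSparkArg("--principal", "user")
            .buildArgs();
        System.out.println(built);  // [--keytab, /Kerberos/user.keytab, --principal, user]
    }
}
```

The pass-through is exactly what allows options the builder API has no setter for to reach spark-submit unchanged.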

So I added the following to the code:

SparkAppHandle handle = new SparkLauncher()
    .setSparkHome("/**/spark-2.2.0")
    .setAppResource("/**/spark-2.2.0/lib/spark.jar")
    .setMainClass("***.SimpleApp")
    .setMaster("yarn")
    .setDeployMode("cluster")
    .addSparkArg("keytab", "/**/user.keytab")  // the keytab file Kerberos generated for the user
    .addSparkArg("principal", "user")          // the user name the keytab was generated for
    .....................

After this change the submission broke and everything went haywire. Another round of stepping through the source revealed an embarrassingly simple mistake: I had left off the leading -- of the option names, assuming it could be omitted, but the API does not add it for you. Corrected version:
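A toy illustration of why the leading -- matters (this is not Spark's actual parser; DashDemo and its parsing rules are invented for the example): command-line parsers dispatch on the -- prefix, so a bare keytab token is not recognized as an option at all and gets treated as positional data, scrambling the rest of the command line.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy option parser: tokens are only treated as options when they carry the
// "--" prefix; anything else falls through as a positional argument.
public class DashDemo {
    static Map<String, String> options = new LinkedHashMap<>();
    static List<String> positional = new ArrayList<>();

    static void parse(List<String> args) {
        for (int i = 0; i < args.size(); i++) {
            String a = args.get(i);
            if (a.startsWith("--")) {
                options.put(a, args.get(++i));  // option consumes the next token as its value
            } else {
                positional.add(a);              // bare token: treated as data, not an option
            }
        }
    }

    public static void main(String[] args) {
        parse(Arrays.asList("keytab", "/k/user.keytab", "--principal", "user"));
        System.out.println(options);     // {--principal=user}
        System.out.println(positional);  // [keytab, /k/user.keytab]
    }
}
```

With the dashes omitted, "keytab" and its path silently become positional arguments instead of an option, which matches the scrambled behavior described above.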

SparkAppHandle handle = new SparkLauncher()
    .setSparkHome("/**/spark-2.2.0")
    .setAppResource("/**/spark-2.2.0/lib/spark.jar")
    .setMainClass("***.SimpleApp")
    .setMaster("yarn")
    .setDeployMode("cluster")
    .addSparkArg("--keytab", "/**/user.keytab")  // the keytab file Kerberos generated for the user
    .addSparkArg("--principal", "user")          // the user name the keytab was generated for
    .....................

Kerberos authentication now passes cleanly, and the job runs under the Kerberos-authenticated user name as well.

Reposted from: https://my.oschina.net/u/3398895/blog/2247092


