Submitting Jobs to Spark from Java Web
http://blog.csdn.net/fansy1990/article/details/48001013
Software versions:
Spark 1.4.1, Hadoop 2.6, Scala 2.10.5, MyEclipse 2014, IntelliJ IDEA 14, JDK 1.8, Tomcat 7
Machines:
Windows 7 (with JDK 1.8, MyEclipse 2014, IntelliJ IDEA 14, Tomcat 7);
CentOS 6.6 VM (pseudo-distributed Hadoop cluster, Spark standalone cluster, JDK 1.8);
CentOS 7 VM (Tomcat, JDK 1.8);
1. Scenarios:
1. A plain Java program on Windows calls Spark to run a Spark job written in Scala, in two modes:
1> submit the job to a Spark cluster and run it in standalone mode;
2> submit the job to a YARN cluster in yarn-client mode;
2. A Java web application developed on Windows calls Spark to run the same Scala job, again in both modes of 1.
3. The Java web application runs on Linux and calls Spark to run the Scala job, in both modes of 1.
2. Implementation:
1. A simple Scala program. It reads a log file from HDFS, extracts the WARN and ERROR records, and writes the filtered records back to HDFS. The code is as follows:
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by Administrator on 2015/8/23.
 */
object Scala_Test {
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: Scala_Test <input> <output>")
      System.exit(1) // exit here instead of running on with missing arguments
    }
    // initialize SparkConf
    val conf = new SparkConf().setAppName("Scala filter")
    val sc = new SparkContext(conf)
    // read the input
    val lines = sc.textFile(args(0))
    // transformations: keep only the ERROR and WARN lines
    val errorsRDD = lines.filter(line => line.contains("ERROR"))
    val warningsRDD = lines.filter(line => line.contains("WARN"))
    val badLinesRDD = errorsRDD.union(warningsRDD)
    // write the result
    badLinesRDD.saveAsTextFile(args(1))
    // shut down the SparkContext
    sc.stop()
  }
}
Build it in IntelliJ IDEA and package it into a jar for later use (named spark_filter.jar here);
2. Java calls Scala_Test in spark_filter.jar, using Spark standalone mode. The Java code is as follows:
package test;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.spark.deploy.SparkSubmit;

/**
 * @author fansy
 */
public class SubmitScalaJobToSpark {
    public static void main(String[] args) {
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss");
        String filename = dateFormat.format(new Date());
        // Resolve the web app's WEB-INF directory: the context classloader points at
        // .../WEB-INF/classes/, so stripping the trailing 8 characters ("classes/")
        // leaves .../WEB-INF/, under which lib/spark_filter.jar lives.
        String tmp = Thread.currentThread().getContextClassLoader().getResource("").getPath();
        tmp = tmp.substring(0, tmp.length() - 8);
        String[] arg0 = new String[]{
                "--master", "spark://node101:7077",
                "--deploy-mode", "client",
                "--name", "test java submit job to spark",
                "--class", "Scala_Test",
                "--executor-memory", "1G",
                // "spark_filter.jar",
                tmp + "lib/spark_filter.jar",
                "hdfs://node101:8020/user/root/log.txt",
                "hdfs://node101:8020/user/root/badLines_spark_" + filename
        };
        SparkSubmit.main(arg0);
    }
}
In practice: create a Java web project in MyEclipse and copy spark_filter.jar and spark-assembly-1.4.1-hadoop2.6.0.jar (found in the lib directory of the Spark distribution; the file is large, so copying takes a while) into WebRoot/WEB-INF/lib. The assembly jar is needed on the classpath because with --deploy-mode client the driver runs inside the calling JVM. (Note: you can create the Java web project right away; to test the plain-Java call, just run the Java class directly, and to test the web application, start Tomcat.)
Java calls Scala_Test in spark_filter.jar, this time in YARN mode. For YARN it is not enough to simply change the master to "yarn-client" or "yarn-cluster": that works with spark-shell or spark-submit, where HADOOP_CONF_DIR supplies the Hadoop configuration, but here the Hadoop configuration cannot be picked up that way. So a different approach is used: submit through yarn.Client, with the following Java code:
package test;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.deploy.yarn.Client;
import org.apache.spark.deploy.yarn.ClientArguments;

public class SubmitScalaJobToYarn {
    public static void main(String[] args) {
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss");
        String filename = dateFormat.format(new Date());
        // As in SubmitScalaJobToSpark: strip the trailing "classes/" to get .../WEB-INF/
        String tmp = Thread.currentThread().getContextClassLoader().getResource("").getPath();
        tmp = tmp.substring(0, tmp.length() - 8);
        String[] arg0 = new String[]{
                "--name", "test java submit job to yarn",
                "--class", "Scala_Test",
                "--executor-memory", "1G",
                // "WebRoot/WEB-INF/lib/spark_filter.jar",
                "--jar", tmp + "lib/spark_filter.jar",
                "--arg", "hdfs://node101:8020/user/root/log.txt",
                "--arg", "hdfs://node101:8020/user/root/badLines_yarn_" + filename,
                "--addJars", "hdfs://node101:8020/user/root/servlet-api.jar",
                "--archives", "hdfs://node101:8020/user/root/servlet-api.jar"
        };
        // SparkSubmit.main(arg0);
        Configuration conf = new Configuration();
        String os = System.getProperty("os.name");
        boolean cross_platform = false;
        if (os.contains("Windows")) {
            cross_platform = true;
        }
        conf.setBoolean("mapreduce.app-submission.cross-platform", cross_platform); // allow submitting from a different platform than the cluster
        conf.set("fs.defaultFS", "hdfs://node101:8020");           // namenode
        conf.set("mapreduce.framework.name", "yarn");              // use the YARN framework
        conf.set("yarn.resourcemanager.address", "node101:8032");  // resourcemanager
        conf.set("yarn.resourcemanager.scheduler.address", "node101:8030"); // scheduler
        conf.set("mapreduce.jobhistory.address", "node101:10020");

        System.setProperty("SPARK_YARN_MODE", "true");

        SparkConf sparkConf = new SparkConf();
        ClientArguments cArgs = new ClientArguments(arg0, sparkConf);

        new Client(cArgs, conf, sparkConf).run();
    }
}
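Client.run() blocks until the application finishes. If you would rather fire off the job and track it yourself, the yarn Client in Spark 1.4 also exposes submitApplication(), which returns the YARN ApplicationId. The following is a minimal sketch under that assumption, not part of the original project:

package test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.spark.SparkConf;
import org.apache.spark.deploy.yarn.Client;
import org.apache.spark.deploy.yarn.ClientArguments;

// Hedged sketch: submit without blocking and return the YARN application id
// for monitoring. Assumes Spark 1.4's yarn.Client#submitApplication().
public class SubmitAndTrack {
    public static ApplicationId submit(String[] arg0, Configuration conf, SparkConf sparkConf) {
        ClientArguments cArgs = new ClientArguments(arg0, sparkConf);
        Client client = new Client(cArgs, conf, sparkConf);
        // run() would block until the application finishes; submitApplication()
        // returns as soon as YARN accepts the application.
        return client.submitApplication();
    }
}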
3. Java web test of both submission modes. The simplest setup is used here: plain servlets configured in web.xml, which looks as follows:
<?xml version="1.0" encoding="UTF-8"?>
<web-app version="3.0"
    xmlns="http://java.sun.com/xml/ns/javaee"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd">
  <servlet>
    <description>This is the description of my J2EE component</description>
    <display-name>This is the display name of my J2EE component</display-name>
    <servlet-name>SparkServlet</servlet-name>
    <servlet-class>servlet.SparkServlet</servlet-class>
  </servlet>
  <servlet>
    <description>This is the description of my J2EE component</description>
    <display-name>This is the display name of my J2EE component</display-name>
    <servlet-name>YarnServlet</servlet-name>
    <servlet-class>servlet.YarnServlet</servlet-class>
  </servlet>
  <servlet-mapping>
    <servlet-name>SparkServlet</servlet-name>
    <url-pattern>/servlet/SparkServlet</url-pattern>
  </servlet-mapping>
  <servlet-mapping>
    <servlet-name>YarnServlet</servlet-name>
    <url-pattern>/servlet/YarnServlet</url-pattern>
  </servlet-mapping>
</web-app>
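With these mappings, requesting /servlet/SparkServlet or /servlet/YarnServlet under the application's context path (for this project presumably something like http://localhost:8080/SparkWebTest/servlet/SparkServlet, judging by the workspace paths in the logs below) triggers the corresponding submission.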
The SparkServlet is as follows:
package servlet;

import java.io.IOException;
import java.io.PrintWriter;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import test.SubmitScalaJobToSpark;

public class SparkServlet extends HttpServlet {
    /**
     * Constructor of the object.
     */
    public SparkServlet() {
        super();
    }

    /**
     * Destruction of the servlet. <br>
     */
    public void destroy() {
        super.destroy(); // Just puts "destroy" string in log
        // Put your code here
    }

    /**
     * The doGet method of the servlet. <br>
     *
     * This method is called when a form has its tag value method equal to get.
     *
     * @param request the request sent by the client to the server
     * @param response the response sent by the server to the client
     * @throws ServletException if an error occurred
     * @throws IOException if an error occurred
     */
    public void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        this.doPost(request, response);
    }

    /**
     * The doPost method of the servlet. <br>
     *
     * This method is called when a form has its tag value method equal to post.
     *
     * @param request the request sent by the client to the server
     * @param response the response sent by the server to the client
     * @throws ServletException if an error occurred
     * @throws IOException if an error occurred
     */
    public void doPost(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        System.out.println("Starting the SubmitScalaJobToSpark call......");
        SubmitScalaJobToSpark.main(null);
        // YarnServlet differs only in this call
        System.out.println("Finished the SubmitScalaJobToSpark call!");
        response.setContentType("text/html");
        PrintWriter out = response.getWriter();
        out.println("<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.01 Transitional//EN\">");
        out.println("<HTML>");
        out.println("  <HEAD><TITLE>A Servlet</TITLE></HEAD>");
        out.println("  <BODY>");
        out.print("    This is ");
        out.print(this.getClass());
        out.println(", using the POST method");
        out.println("  </BODY>");
        out.println("</HTML>");
        out.flush();
        out.close();
    }

    /**
     * Initialization of the servlet. <br>
     *
     * @throws ServletException if an error occurs
     */
    public void init() throws ServletException {
        // Put your code here
    }
}
This does nothing more than invoke the job-submission class written earlier; SparkServlet and YarnServlet differ only in the class they invoke.
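Accordingly, a minimal sketch of YarnServlet (only the invoked class changes; the HTML response is abbreviated here):

package servlet;

import java.io.IOException;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import test.SubmitScalaJobToYarn;

public class YarnServlet extends HttpServlet {
    public void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        this.doPost(request, response);
    }

    public void doPost(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        System.out.println("Starting the SubmitScalaJobToYarn call......");
        SubmitScalaJobToYarn.main(null); // the only line that differs from SparkServlet
        System.out.println("Finished the SubmitScalaJobToYarn call!");
        response.setContentType("text/html");
        // abbreviated response; SparkServlet's full HTML page would go here
        response.getWriter().println("This is " + this.getClass() + ", using the POST method");
    }
}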
For the web test, first test directly from MyEclipse; then copy the project's WebRoot to the CentOS 7 machine, start Tomcat there, and test again.
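One caveat, not from the original post: doPost as written holds a Tomcat worker thread until the Spark application completes. A hedged sketch of handing the submission to a background executor instead (the AsyncSubmitter class and its single-thread pool are illustrative):

package test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper: runs the (blocking) submission on a background thread
// so the servlet can respond immediately.
public class AsyncSubmitter {
    private static final ExecutorService pool = Executors.newSingleThreadExecutor();

    public static void submitToSparkInBackground() {
        pool.submit(() -> SubmitScalaJobToSpark.main(null));
    }
}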
3. Summary and problems
1. Test results:
1> Plain Java code submitting the job to Spark and to YARN to filter the log file runs successfully. The relevant information can be seen in the YARN and Spark monitoring UIs:
Meanwhile, the output files can be seen in HDFS:
2> For the Java web submission to Spark and to YARN, the javax.servlet directory must first be deleted from spark-assembly-1.4.1-hadoop2.6.0.jar, because it conflicts with Tomcat's servlet-api.jar (see the note after this list).
a. Starting Tomcat on Windows and on Linux and submitting to Spark standalone: the test runs successfully;
b. Starting Tomcat on Windows and on Linux and submitting to YARN: the test fails;
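(On the deletion step in 2>: a hedged way to strip the conflicting package, assuming the standard zip utility is available, is zip -d spark-assembly-1.4.1-hadoop2.6.0.jar 'javax/servlet/*'.)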
2. Problems encountered:
1> The Java web submission to YARN fails; the key failure log lines are:
15/08/25 11:35:48 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse
java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse
This happens because the javax.servlet package was deleted from the assembly jar to avoid the conflict with Tomcat.
The log also shows:
15/08/26 12:39:27 INFO Client: Setting up container launch context for our AM
15/08/26 12:39:27 INFO Client: Preparing resources for our AM container
15/08/26 12:39:27 INFO Client: Uploading resource file:/D:/workspase_scala/SparkWebTest/WebRoot/WEB-INF/lib/spark-assembly-1.4.1-hadoop2.6.0.jar -> hdfs://node101:8020/user/Administrator/.sparkStaging/application_1440464833795_0012/spark-assembly-1.4.1-hadoop2.6.0.jar
15/08/26 12:39:32 INFO Client: Uploading resource file:/D:/workspase_scala/SparkWebTest/WebRoot/WEB-INF/lib/spark_filter.jar -> hdfs://node101:8020/user/Administrator/.sparkStaging/application_1440464833795_0012/spark_filter.jar
15/08/26 12:39:33 INFO Client: Uploading resource file:/C:/Users/Administrator/AppData/Local/Temp/spark-46820caf-06e0-4c51-a479-3bb35666573f/__hadoop_conf__5465819424276830228.zip -> hdfs://node101:8020/user/Administrator/.sparkStaging/application_1440464833795_0012/__hadoop_conf__5465819424276830228.zip
15/08/26 12:39:33 INFO Client: Source and destination file systems are the same. Not copying hdfs://node101:8020/user/root/servlet-api.jar
15/08/26 12:39:33 WARN Client: Resource hdfs://node101:8020/user/root/servlet-api.jar added multiple times to distributed cache.
During environment initialization, two jars are uploaded: spark-assembly-1.4.1-hadoop2.6.0.jar and our own jar. The uploaded spark-assembly-1.4.1-hadoop2.6.0.jar no longer contains the javax.servlet directory, hence the error. The same upload appears in the log when calling directly from Java (before javax.servlet was deleted), and the job worked then, which confirms that the deleted package directory is indeed the cause. So how can this be fixed?
Upload servlet-api.jar to HDFS and pass the relevant parameters when submitting through yarn.Client. Inspecting the available options turns up two likely candidates, --addJars and --archives. With both of them added, the log does show the jar being shared with the job, but the Java web submission to YARN still fails with the same class-not-found error, so this approach does not work either. (The deployment section of http://blog.csdn.net/fansy1990/article/details/52289826 shows how to solve this problem.)
For submitting jobs to YARN through yarn.Client, see http://blog.sequenceiq.com/blog/2014/08/22/spark-submit-in-java/ .