Running a Spark Java API program with Run As Java Application
Here is the code first:
- /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- import java.util.Arrays;
- import java.util.regex.Pattern;
- import org.apache.spark.api.java.JavaPairRDD;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.FlatMapFunction;
- import org.apache.spark.api.java.function.Function2;
- import org.apache.spark.api.java.function.PairFunction;
- import scala.Tuple2;
- public final class JavaWordCount {
- private static final Pattern SPACE = Pattern.compile(" ");
- public static void main(String[] args) throws Exception {
- if (args.length < 3) {
- System.err.println("Usage: JavaWordCount <master> <file> <output>");
- System.exit(1);
- }
- JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount",
- System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaWordCount.class));
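- // Ship the application jar to the cluster so the workers can load JavaWordCount's anonymous classes.
- ctx.addJar("/home/hadoop/Desktop/JavaSparkT.jar");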
- JavaRDD<String> lines = ctx.textFile(args[1], 1);
- JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public Iterable<String> call(String s) {
- return Arrays.asList(SPACE.split(s));
- }
- });
- JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(String s) {
- return new Tuple2<String, Integer>(s, 1);
- }
- });
- JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer i1, Integer i2) {
- return i1 + i2;
- }
- });
- counts.saveAsTextFile(args[2]);
- // Alternative to saving: collect the counts on the driver and print them.
- /*List<Tuple2<String, Integer>> output = counts.collect();
- for (Tuple2<?,?> tuple : output) {
- System.out.println(tuple._1() + ": " + tuple._2());
- }*/
- System.exit(0);
- }
- }
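This is one of the examples that ships with Spark. Previously the only way to run it was to package the code as a jar and launch it with spark-class from Spark's bin directory, which makes it hard to integrate a Spark program into an existing system. I therefore wanted to run the program through an ordinary Java call. After some experimenting, and with guidance from my teacher, the error message pointed to the cause: the jar had not been shipped to the Spark workers, so the worker running the task could not find the class it was asked to load. The error looks like this: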
- 14/07/07 10:26:10 INFO TaskSetManager: Serialized task 1.0:0 as 2194 bytes in 104 ms
- 14/07/07 10:26:11 WARN TaskSetManager: Lost TID 0 (task 1.0:0)
- 14/07/07 10:26:11 WARN TaskSetManager: Loss was due to java.lang.ClassNotFoundException
- java.lang.ClassNotFoundException: JavaWordCount$1
- at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
- at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
- at java.security.AccessController.doPrivileged(Native Method)
- at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
- at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
- at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
- at java.lang.Class.forName0(Native Method)
- at java.lang.Class.forName(Class.java:270)
- at org.apache.spark.serializer.JavaDeserializationStream$anon$1.resolveClass(JavaSerializer.scala:37)
- at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
- at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
- at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
- at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
- at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
- at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
- at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
- at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
- at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
- at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
- at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
- at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
- at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
- at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
- at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
- at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
- at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
- at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
- at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
- at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
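Solution: package the program you want to run as a jar, then call the addJar method of JavaSparkContext to register that jar with the Spark application. The driver then serves the jar over HTTP (see the "Added JAR ... at http://..." line in the log further down) and the executors on each worker fetch it from there.
The code is as follows:
The original article did not include the Java code; based on its description, here is the Scala equivalent, which I have tested and found to work.
For example, if the IDEA project directory is "/root/IdeaProjects/HelloScala", the packaged jar ends up at "HelloScala/out/artifacts/SparkTest_jar/SparkTest.jar", so the jar is added like this: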
sc.addJar("/root/IdeaProjects/HelloScala/out/artifacts/SparkTest_jar/SparkTest.jar")
Then run the program again. (After changing the program, rebuild the jar.)
With this in place, the java.lang.ClassNotFoundException: JavaWordCount$1 error no longer appears at run time.
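Because the fix depends on the jar actually containing the compiled anonymous classes (and on rebuilding it after every change), it can help to check the jar before passing it to addJar. The following is a minimal sketch that uses only the JDK. The class name `JarClassCheck` and its helper methods are hypothetical and not part of Spark; the default path and class name are simply the ones used in this post.

```java
import java.io.File;
import java.io.IOException;
import java.util.jar.JarFile;

// Hypothetical helper (not part of Spark): checks that a jar contains a given class
// before the jar is handed to JavaSparkContext.addJar.
final class JarClassCheck {
    private JarClassCheck() {}

    /** Converts a binary class name such as "JavaWordCount$1" to its jar entry name. */
    static String entryNameFor(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Returns true if jarPath is an existing file and contains the given class. */
    static boolean jarContainsClass(String jarPath, String className) throws IOException {
        File jar = new File(jarPath);
        if (!jar.isFile()) {
            return false;
        }
        try (JarFile jarFile = new JarFile(jar)) {
            return jarFile.getJarEntry(entryNameFor(className)) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Defaults are the jar path and the missing class from this post.
        String jarPath = args.length > 0 ? args[0] : "/home/hadoop/Desktop/JavaSparkT.jar";
        String className = args.length > 1 ? args[1] : "JavaWordCount$1";
        System.out.println(className + " in " + jarPath + ": "
                + jarContainsClass(jarPath, className));
    }
}
```

If the check reports false for `JavaWordCount$1`, the jar is probably stale or was built without the anonymous inner classes, and rebuilding it is the first thing to try.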
Run it with the following program arguments (master URL, input file, output path):
spark://localhost:7077 hdfs://localhost:9000/input/test.txt hdfs://localhost:9000/input/result.txt
The Eclipse console then shows the following log:
- 14/07/08 16:03:06 INFO Utils: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
- 14/07/08 16:03:06 WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 192.168.200.233 instead (on interface eth0)
- 14/07/08 16:03:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
- 14/07/08 16:03:07 INFO Slf4jLogger: Slf4jLogger started
- 14/07/08 16:03:07 INFO Remoting: Starting remoting
- 14/07/08 16:03:07 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@192.168.200.233:52469]
- 14/07/08 16:03:07 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@192.168.200.233:52469]
- 14/07/08 16:03:07 INFO SparkEnv: Registering BlockManagerMaster
- 14/07/08 16:03:07 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20140708160307-0a89
- 14/07/08 16:03:07 INFO MemoryStore: MemoryStore started with capacity 484.2 MB.
- 14/07/08 16:03:08 INFO ConnectionManager: Bound socket to port 47731 with id = ConnectionManagerId(192.168.200.233,47731)
- 14/07/08 16:03:08 INFO BlockManagerMaster: Trying to register BlockManager
- 14/07/08 16:03:08 INFO BlockManagerMasterActor$BlockManagerInfo: Registering block manager 192.168.200.233:47731 with 484.2 MB RAM
- 14/07/08 16:03:08 INFO BlockManagerMaster: Registered BlockManager
- 14/07/08 16:03:08 INFO HttpServer: Starting HTTP Server
- 14/07/08 16:03:08 INFO HttpBroadcast: Broadcast server started at http://192.168.200.233:58077
- 14/07/08 16:03:08 INFO SparkEnv: Registering MapOutputTracker
- 14/07/08 16:03:08 INFO HttpFileServer: HTTP File server directory is /tmp/spark-86439c44-9a36-4bda-b8c7-063c5c2e15b2
- 14/07/08 16:03:08 INFO HttpServer: Starting HTTP Server
- 14/07/08 16:03:08 INFO SparkUI: Started Spark Web UI at http://192.168.200.233:4040
- 14/07/08 16:03:08 INFO AppClient$ClientActor: Connecting to master spark://localhost:7077...
- 14/07/08 16:03:09 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20140708160309-0000
- 14/07/08 16:03:09 INFO AppClient$ClientActor: Executor added: app-20140708160309-0000/0 on worker-20140708160246-localhost-34775 (localhost:34775) with 4 cores
- 14/07/08 16:03:09 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140708160309-0000/0 on hostPort localhost:34775 with 4 cores, 512.0 MB RAM
- 14/07/08 16:03:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
- 14/07/08 16:03:09 INFO AppClient$ClientActor: Executor updated: app-20140708160309-0000/0 is now RUNNING
- 14/07/08 16:03:10 INFO SparkContext: Added JAR /home/hadoop/Desktop/JavaSparkT.jar at http://192.168.200.233:52827/jars/JavaSparkT.jar with timestamp 1404806590353
- 14/07/08 16:03:10 INFO MemoryStore: ensureFreeSpace(138763) called with curMem=0, maxMem=507720499
- 14/07/08 16:03:10 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 135.5 KB, free 484.1 MB)
- 14/07/08 16:03:12 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@localhost:42090/user/Executor#-1434031133] with ID 0
- 14/07/08 16:03:13 INFO BlockManagerMasterActor$BlockManagerInfo: Registering block manager localhost:56831 with 294.9 MB RAM
- 14/07/08 16:03:13 INFO FileInputFormat: Total input paths to process : 1
- 14/07/08 16:03:13 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
- 14/07/08 16:03:13 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
- 14/07/08 16:03:13 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
- 14/07/08 16:03:13 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
- 14/07/08 16:03:13 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
- 14/07/08 16:03:13 INFO SparkContext: Starting job: saveAsTextFile at JavaWordCount.java:66
- 14/07/08 16:03:13 INFO DAGScheduler: Registering RDD 4 (reduceByKey at JavaWordCount.java:60)
- 14/07/08 16:03:13 INFO DAGScheduler: Got job 0 (saveAsTextFile at JavaWordCount.java:66) with 1 output partitions (allowLocal=false)
- 14/07/08 16:03:13 INFO DAGScheduler: Final stage: Stage 0 (saveAsTextFile at JavaWordCount.java:66)
- 14/07/08 16:03:13 INFO DAGScheduler: Parents of final stage: List(Stage 1)
- 14/07/08 16:03:13 INFO DAGScheduler: Missing parents: List(Stage 1)
- 14/07/08 16:03:13 INFO DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[4] at reduceByKey at JavaWordCount.java:60), which has no missing parents
- 14/07/08 16:03:13 INFO DAGScheduler: Submitting 1 missing tasks from Stage 1 (MapPartitionsRDD[4] at reduceByKey at JavaWordCount.java:60)
- 14/07/08 16:03:13 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
- 14/07/08 16:03:13 INFO TaskSetManager: Starting task 1.0:0 as TID 0 on executor 0: localhost (PROCESS_LOCAL)
- 14/07/08 16:03:13 INFO TaskSetManager: Serialized task 1.0:0 as 2252 bytes in 39 ms
- 14/07/08 16:03:17 INFO TaskSetManager: Finished TID 0 in 3310 ms on localhost (progress: 1/1)
- 14/07/08 16:03:17 INFO DAGScheduler: Completed ShuffleMapTask(1, 0)
- 14/07/08 16:03:17 INFO DAGScheduler: Stage 1 (reduceByKey at JavaWordCount.java:60) finished in 3.319 s
- 14/07/08 16:03:17 INFO DAGScheduler: looking for newly runnable stages
- 14/07/08 16:03:17 INFO DAGScheduler: running: Set()
- 14/07/08 16:03:17 INFO DAGScheduler: waiting: Set(Stage 0)
- 14/07/08 16:03:17 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
- 14/07/08 16:03:17 INFO DAGScheduler: failed: Set()
- 14/07/08 16:03:17 INFO DAGScheduler: Missing parents for Stage 0: List()
- 14/07/08 16:03:17 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[7] at saveAsTextFile at JavaWordCount.java:66), which is now runnable
- 14/07/08 16:03:17 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[7] at saveAsTextFile at JavaWordCount.java:66)
- 14/07/08 16:03:17 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
- 14/07/08 16:03:17 INFO TaskSetManager: Starting task 0.0:0 as TID 1 on executor 0: localhost (PROCESS_LOCAL)
- 14/07/08 16:03:17 INFO TaskSetManager: Serialized task 0.0:0 as 11717 bytes in 0 ms
- 14/07/08 16:03:17 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to spark@localhost:37990
- 14/07/08 16:03:17 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 127 bytes
- 14/07/08 16:03:18 INFO DAGScheduler: Completed ResultTask(0, 0)
- 14/07/08 16:03:18 INFO TaskSetManager: Finished TID 1 in 1074 ms on localhost (progress: 1/1)
- 14/07/08 16:03:18 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
- 14/07/08 16:03:18 INFO DAGScheduler: Stage 0 (saveAsTextFile at JavaWordCount.java:66) finished in 1.076 s
- 14/07/08 16:03:18 INFO SparkContext: Job finished: saveAsTextFile at JavaWordCount.java:66, took 4.719158065 s
The program's output is as follows:
- [hadoop@localhost sbin]$ hadoop fs -ls hdfs://localhost:9000/input/result.txt
- 14/07/08 16:04:22 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
- Found 2 items
- -rw-r--r-- 3 hadoop supergroup 0 2014-07-08 16:03 hdfs://localhost:9000/input/result.txt/_SUCCESS
- -rw-r--r-- 3 hadoop supergroup 56 2014-07-08 16:03 hdfs://localhost:9000/input/result.txt/part-00000
- [hadoop@localhost sbin]$ hadoop fs -cat hdfs://localhost:9000/input/result.txt/part-00000
- 14/07/08 16:04:44 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
- (caozw,1)
- (hello,3)
- (hadoop,1)
- (2.2.0,1)
- (world,1)
- [hadoop@localhost sbin]$