先上代码:

[python] view plain copy  
  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one or more
  3. * contributor license agreements.  See the NOTICE file distributed with
  4. * this work for additional information regarding copyright ownership.
  5. * The ASF licenses this file to You under the Apache License, Version 2.0
  6. * (the "License"); you may not use this file except in compliance with
  7. * the License.  You may obtain a copy of the License at
  8. *
  9. *    http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. import java.util.Arrays;
  18. import java.util.regex.Pattern;
  19. import org.apache.spark.api.java.JavaPairRDD;
  20. import org.apache.spark.api.java.JavaRDD;
  21. import org.apache.spark.api.java.JavaSparkContext;
  22. import org.apache.spark.api.java.function.FlatMapFunction;
  23. import org.apache.spark.api.java.function.Function2;
  24. import org.apache.spark.api.java.function.PairFunction;
  25. import scala.Tuple2;
  26. public final class JavaWordCount {
  27. private static final Pattern SPACE = Pattern.compile(" ");
  28. public static void main(String[] args) throws Exception {
  29. if (args.length < 2) {
  30. System.err.println("Usage: JavaWordCount <master> <file>");
  31. System.exit(1);
  32. }
  33. JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount",
  34. System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaWordCount.class));
  35. ctx.addJar("/home/hadoop/Desktop/JavaSparkT.jar");
  36. JavaRDD<String> lines = ctx.textFile(args[1], 1);
  37. JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
  38. @Override
  39. public Iterable<String> call(String s) {
  40. return Arrays.asList(SPACE.split(s));
  41. }
  42. });
  43. JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
  44. @Override
  45. public Tuple2<String, Integer> call(String s) {
  46. return new Tuple2<String, Integer>(s, 1);
  47. }
  48. });
  49. JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
  50. @Override
  51. public Integer call(Integer i1, Integer i2) {
  52. return i1 + i2;
  53. }
  54. });
  55. counts.saveAsTextFile(args[2]);
  56. //    counts.s
  57. /*List<Tuple2<String, Integer>> output = counts.collect();
  58. for (Tuple2<?,?> tuple : output) {
  59. System.out.println(tuple._1() + ": " + tuple._2());
  60. }*/
  61. System.exit(0);
  62. }
  63. }

这是spark 自带的一个example  之前只能将代码达成jar包然后在spark的bin目录下面通过spark-class来运行,这样我们就没办法将spark的程序你很好的融合到现有的系统中,所以我希望通过java函数调用的方式运行这段程序,在一段时间的摸索和老师的指导下发现根据报错的意思应该是没有将jar包提交到spark的worker上面 导致运行的worker找不到被调用的类,会报如下错误:

[python] view plain copy  
  1. 4/07/07 10:26:10 INFO TaskSetManager: Serialized task 1.0:0 as 2194 bytes in 104 ms
  2. 14/07/07 10:26:11 WARN TaskSetManager: Lost TID 0 (task 1.0:0)
  3. 14/07/07 10:26:11 WARN TaskSetManager: Loss was due to java.lang.ClassNotFoundException
  4. java.lang.ClassNotFoundException: JavaWordCount$1
  5. at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
  6. at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
  7. at java.security.AccessController.doPrivileged(Native Method)
  8. at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
  9. at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
  10. at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
  11. at java.lang.Class.forName0(Native Method)
  12. at java.lang.Class.forName(Class.java:270)
  13. at org.apache.spark.serializer.JavaDeserializationStream$anon$1.resolveClass(JavaSerializer.scala:37)
  14. at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
  15. at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
  16. at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
  17. at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
  18. at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
  19. at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
  20. at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
  21. at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
  22. at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
  23. at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
  24. at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
  25. at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
  26. at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
  27. at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
  28. at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
  29. at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
  30. at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
  31. at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
  32. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  33. at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

解决方案:将要运行的程序达成jar包,然后调用JavaSparkContext的addJar方法将该jar包提交到spark集群中,然后spark的master会将该jar包分发到各个worker上面,
代码如下:
原文中没有给出java的代码,但是在这里根据原文的描述给出scala的代码,亲测可用

比如,我们的IDEA的目录”/root/IdeaProjects/HelloScala“,则其打成jar包后的地址为"HelloScala/out/artifacts/SparkTest_jar/SparkTest.jar",则添加jar包的地址为如下所示:

sc.addJar("/root/IdeaProjects/HelloScala/out/artifacts/SparkTest_jar/SparkTest.jar")

这样再运行程序即可。(程序修改过后要重新打jar包)

这样运行时就不会出现 java.lang.ClassNotFoundException: JavaWordCount$1这样的错误了
运行如下:

spark://localhost:7077  hdfs://localhost:9000/input/test.txt  hdfs://localhost:9000/input/result.txt

然后会eclipse控制台中会有如下log

[python] view plain copy  
  1. 14/07/08 16:03:06 INFO Utils: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
  2. 14/07/08 16:03:06 WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 192.168.200.233 instead (on interface eth0)
  3. 14/07/08 16:03:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
  4. 14/07/08 16:03:07 INFO Slf4jLogger: Slf4jLogger started
  5. 14/07/08 16:03:07 INFO Remoting: Starting remoting
  6. 14/07/08 16:03:07 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@192.168.200.233:52469]
  7. 14/07/08 16:03:07 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@192.168.200.233:52469]
  8. 14/07/08 16:03:07 INFO SparkEnv: Registering BlockManagerMaster
  9. 14/07/08 16:03:07 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20140708160307-0a89
  10. 14/07/08 16:03:07 INFO MemoryStore: MemoryStore started with capacity 484.2 MB.
  11. 14/07/08 16:03:08 INFO ConnectionManager: Bound socket to port 47731 with id = ConnectionManagerId(192.168.200.233,47731)
  12. 14/07/08 16:03:08 INFO BlockManagerMaster: Trying to register BlockManager
  13. 14/07/08 16:03:08 INFO BlockManagerMasterActor$BlockManagerInfo: Registering block manager 192.168.200.233:47731 with 484.2 MB RAM
  14. 14/07/08 16:03:08 INFO BlockManagerMaster: Registered BlockManager
  15. 14/07/08 16:03:08 INFO HttpServer: Starting HTTP Server
  16. 14/07/08 16:03:08 INFO HttpBroadcast: Broadcast server started at http://192.168.200.233:58077
  17. 14/07/08 16:03:08 INFO SparkEnv: Registering MapOutputTracker
  18. 14/07/08 16:03:08 INFO HttpFileServer: HTTP File server directory is /tmp/spark-86439c44-9a36-4bda-b8c7-063c5c2e15b2
  19. 14/07/08 16:03:08 INFO HttpServer: Starting HTTP Server
  20. 14/07/08 16:03:08 INFO SparkUI: Started Spark Web UI at http://192.168.200.233:4040
  21. 14/07/08 16:03:08 INFO AppClient$ClientActor: Connecting to master spark://localhost:7077...
  22. 14/07/08 16:03:09 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20140708160309-0000
  23. 14/07/08 16:03:09 INFO AppClient$ClientActor: Executor added: app-20140708160309-0000/0 on worker-20140708160246-localhost-34775 (localhost:34775) with 4 cores
  24. 14/07/08 16:03:09 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140708160309-0000/0 on hostPort localhost:34775 with 4 cores, 512.0 MB RAM
  25. 14/07/08 16:03:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  26. 14/07/08 16:03:09 INFO AppClient$ClientActor: Executor updated: app-20140708160309-0000/0 is now RUNNING
  27. 14/07/08 16:03:10 INFO SparkContext: Added JAR /home/hadoop/Desktop/JavaSparkT.jar at http://192.168.200.233:52827/jars/JavaSparkT.jar with timestamp 1404806590353
  28. 14/07/08 16:03:10 INFO MemoryStore: ensureFreeSpace(138763) called with curMem=0, maxMem=507720499
  29. 14/07/08 16:03:10 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 135.5 KB, free 484.1 MB)
  30. 14/07/08 16:03:12 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@localhost:42090/user/Executor#-1434031133] with ID 0
  31. 14/07/08 16:03:13 INFO BlockManagerMasterActor$BlockManagerInfo: Registering block manager localhost:56831 with 294.9 MB RAM
  32. 14/07/08 16:03:13 INFO FileInputFormat: Total input paths to process : 1
  33. 14/07/08 16:03:13 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
  34. 14/07/08 16:03:13 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
  35. 14/07/08 16:03:13 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
  36. 14/07/08 16:03:13 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
  37. 14/07/08 16:03:13 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
  38. 14/07/08 16:03:13 INFO SparkContext: Starting job: saveAsTextFile at JavaWordCount.java:66
  39. 14/07/08 16:03:13 INFO DAGScheduler: Registering RDD 4 (reduceByKey at JavaWordCount.java:60)
  40. 14/07/08 16:03:13 INFO DAGScheduler: Got job 0 (saveAsTextFile at JavaWordCount.java:66) with 1 output partitions (allowLocal=false)
  41. 14/07/08 16:03:13 INFO DAGScheduler: Final stage: Stage 0 (saveAsTextFile at JavaWordCount.java:66)
  42. 14/07/08 16:03:13 INFO DAGScheduler: Parents of final stage: List(Stage 1)
  43. 14/07/08 16:03:13 INFO DAGScheduler: Missing parents: List(Stage 1)
  44. 14/07/08 16:03:13 INFO DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[4] at reduceByKey at JavaWordCount.java:60), which has no missing parents
  45. 14/07/08 16:03:13 INFO DAGScheduler: Submitting 1 missing tasks from Stage 1 (MapPartitionsRDD[4] at reduceByKey at JavaWordCount.java:60)
  46. 14/07/08 16:03:13 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
  47. 14/07/08 16:03:13 INFO TaskSetManager: Starting task 1.0:0 as TID 0 on executor 0: localhost (PROCESS_LOCAL)
  48. 14/07/08 16:03:13 INFO TaskSetManager: Serialized task 1.0:0 as 2252 bytes in 39 ms
  49. 14/07/08 16:03:17 INFO TaskSetManager: Finished TID 0 in 3310 ms on localhost (progress: 1/1)
  50. 14/07/08 16:03:17 INFO DAGScheduler: Completed ShuffleMapTask(1, 0)
  51. 14/07/08 16:03:17 INFO DAGScheduler: Stage 1 (reduceByKey at JavaWordCount.java:60) finished in 3.319 s
  52. 14/07/08 16:03:17 INFO DAGScheduler: looking for newly runnable stages
  53. 14/07/08 16:03:17 INFO DAGScheduler: running: Set()
  54. 14/07/08 16:03:17 INFO DAGScheduler: waiting: Set(Stage 0)
  55. 14/07/08 16:03:17 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
  56. 14/07/08 16:03:17 INFO DAGScheduler: failed: Set()
  57. 14/07/08 16:03:17 INFO DAGScheduler: Missing parents for Stage 0: List()
  58. 14/07/08 16:03:17 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[7] at saveAsTextFile at JavaWordCount.java:66), which is now runnable
  59. 14/07/08 16:03:17 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[7] at saveAsTextFile at JavaWordCount.java:66)
  60. 14/07/08 16:03:17 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
  61. 14/07/08 16:03:17 INFO TaskSetManager: Starting task 0.0:0 as TID 1 on executor 0: localhost (PROCESS_LOCAL)
  62. 14/07/08 16:03:17 INFO TaskSetManager: Serialized task 0.0:0 as 11717 bytes in 0 ms
  63. 14/07/08 16:03:17 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to spark@localhost:37990
  64. 14/07/08 16:03:17 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 127 bytes
  65. 14/07/08 16:03:18 INFO DAGScheduler: Completed ResultTask(0, 0)
  66. 14/07/08 16:03:18 INFO TaskSetManager: Finished TID 1 in 1074 ms on localhost (progress: 1/1)
  67. 14/07/08 16:03:18 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
  68. 14/07/08 16:03:18 INFO DAGScheduler: Stage 0 (saveAsTextFile at JavaWordCount.java:66) finished in 1.076 s
  69. 14/07/08 16:03:18 INFO SparkContext: Job finished: saveAsTextFile at JavaWordCount.java:66, took 4.719158065 s

程序执行结果如下:

[python] view plain copy  
  1. [hadoop@localhost sbin]$ hadoop fs -ls hdfs://localhost:9000/input/result.txt
  2. 14/07/08 16:04:22 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  3. Found 2 items
  4. -rw-r--r--   3 hadoop supergroup          0 2014-07-08 16:03 hdfs://localhost:9000/input/result.txt/_SUCCESS
  5. -rw-r--r--   3 hadoop supergroup         56 2014-07-08 16:03 hdfs://localhost:9000/input/result.txt/part-00000
  6. [hadoop@localhost sbin]$ hadoop fs -cat  hdfs://localhost:9000/input/result.txt/part-00000
  7. 14/07/08 16:04:44 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  8. (caozw,1)
  9. (hello,3)
  10. (hadoop,1)
  11. (2.2.0,1)
  12. (world,1)
  13. [hadoop@localhost sbin]$

spark java api通过run as java application运行的方法相关推荐

  1. Elasticsearch Java API 6.2(java client)

    前言 本节描述了Elasticsearch提供的Java API,所有的Elasticsearch操作都使用客户端对象执行,所有操作本质上都是完全异步的(要么接收监听器,要么未来返回). 此外,客户端 ...

  2. kafka java api 删除_使用Java API创建(create),查看(describe),列举(list),删除(delete)Kafka主题(Topic)...

    使用Kafka的同学都知道,我们每次创建Kafka主题(Topic)的时候可以指定分区数和副本数等信息,如果将这些属性配置到server.properties文件中,以后调用Java API生成的主题 ...

  3. ews java api maven_通过 EWS JAVA API读取exchange邮件

    为什么80%的码农都做不了架构师?>>> 第一步,下载EWS JAVA API包 从如下路径下载EWS API包:http://code.msdn.microsoft.com/Exc ...

  4. ArangoDB 学习笔记(二)AQL Java API | AQL语法 | 使用Java连接ArangoDB

    文章目录 参考资料 一.ArangoDB Java Driver 支持的不同类型 1.1 BaseDocument 1.2 XML 1.3 Graph 二.AQL 2.1 AQL 语法 2.1.1 查 ...

  5. Java api 入门教程 之 JAVA的IO处理

    IO是输入和输出的简称,在实际的使用时,输入和输出是有方向的.就像现实中两个人之间借钱一样,例如A借钱给B,相对于A来说是借出,而相对于B来说则是借入.所以在程序中提到输入和输出时,也需要区分清楚是相 ...

  6. COMSOL java API——编译comsol模型java文件

    在Windows系统下编译comsol模型的java文件(确保jdk安装成功,并且设置好环境变量). 本文以comsol案例库文件馈线夹的变形(feeder_clamp)为例. 1.打开feeder_ ...

  7. 达内java api文档,达内JAVA核心API(下)

    中按照该编码将字节数据转换为字符并读取.需要使用字节输入流作为参数构造InputStreamReader对象. 6.下列代码中,能实现按行读取文件的数据的流是:(). A.InputStreamRea ...

  8. java对象名不可以是_java运行一个方法时如何得到该个对象的名字(不是类的名字)....

    展开全部 可以通过StackTrace从栈顶往下62616964757a686964616fe4b893e5b19e31333332623239倒 String clsname = "Cla ...

  9. dita文档_使用DITADoclet和DITA API专业化生成DITA Java™API参考文档

    dita文档 2009年12月11日修订说明:在" 目标"和" 安装org.dita.dost插件 "标题下添加了两个指向可下载资源的链接. 2014年3月7日 ...

最新文章

  1. 不占用多余空间实现值的交换——异或运算
  2. 漫话:如何给女朋友解释为什么计算机中 0.2 + 0.1 不等于 0.3 ?
  3. Java 基础 之 常量
  4. 电热耦合_教育部关于发布电热原子吸收光谱分析方法通则等30个教育行业标准的通知...
  5. valgrind 详解
  6. 3.将maven项目jar纳入maven仓库,Mave项目依赖另外一个Maven项目的案例
  7. html金额自动换算成大写,[求助]word文档中金额数字自动转换为大写
  8. squid 日志清理
  9. 如何利用回调模式去解决问题
  10. 【无广告】一位算法工程师从30+场秋招面试中总结出的超强面经——目标检测篇...
  11. MySQL查询语句格式总结
  12. Win10 LSTC与Ubuntu18.04LTS双系统安装详细流程
  13. 163邮箱如何开启pop服务器端口,pop3端口号详情介绍
  14. 输入起止坐标,返回途径网格。
  15. 罗克韦尔AB PLC安装Studio5000提示未安装Microsoft .NET Framework 3.5的解决方法
  16. PLSQL Developer13.0.4安装破解教程
  17. 前端工程师高手说说CSS学习中的瓶颈
  18. Codeforces118D Caesar's Legions(DP)
  19. (转)关于无良培训机构的恶意抹黑疯狂Java的反驳(v2)
  20. PDF文件打开密码忘记了可以取消吗

热门文章

  1. tomcat安全认证
  2. Fresco使用及问题
  3. 偶然搜索看到的杂谈——什麼東西是.NET程序員可以掌握並且可倚仗十年而不管微軟存在與否的技術呢?...
  4. scss提取 vue_vue 中使用sass实现主体换肤
  5. 计算机二级-JAVA基础知识1
  6. mysql私房菜_老男孩MySQL私房菜深入浅出精品视频第7章备份与恢复基础实践视频课程...
  7. 明晚直播丨基于IB网络的Oracle Extend RAC最佳实践
  8. 2020 从新开始:你应该知道的Oracle认证新变化
  9. 一条SQL语句的千回百转
  10. 解密GaussDB(for Influx)时序洞察