问题描述:

当按照Hadoop实战上讲述的用eclipse提交作业,其实作业是运行在eclipse虚拟的一个云环境中,而不是真正提交到Hadoop云端运行。在50030上也看不到job的运行记录,此时的代码如下:

package com.spork.hadoop.jobutil.test;import java.io.File;
import java.io.IOException;
import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;import com.spork.hadoop.jobutil.EJob;public class WordCountTest {public static class TokenizerMapper extendsMapper<Object, Text, Text, IntWritable> {private final static IntWritable one = new IntWritable(1);private Text word = new Text();public void map(Object key, Text value, Context context)throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {word.set(itr.nextToken());context.write(word, one);}}}public static class IntSumReducer extendsReducer<Text, IntWritable, Text, IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] inAndOut={"hdfs://localhost:9000/bin/in/1","hdfs://localhost:9000/out"};Job job = new Job(conf, "word count");job.setJarByClass(WordCountTest.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(inAndOut[0]));FileOutputFormat.setOutputPath(job, new Path(inAndOut[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

如果要将作业真正提交到云端,一种是用命令行打jar包,然后运行、提交作业,这方式比较笨,而程序员都是些懒人啊,能让机器做干嘛自己动手。具体实现可以参考这个博客:http://weixiaolu.iteye.com/blog/1402919

这里介绍一个更加智能的方式,其主要思路是将jar打包的工作放在了java代码中来完成。其中使用了一个工具类ejob.java。

示例代码如下:

package com.spork.hadoop.jobutil.test;import java.io.File;
import java.io.IOException;
import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;import com.spork.hadoop.jobutil.EJob;public class WordCountTest {public static class TokenizerMapper extendsMapper<Object, Text, Text, IntWritable> {private final static IntWritable one = new IntWritable(1);private Text word = new Text();public void map(Object key, Text value, Context context)throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {word.set(itr.nextToken());context.write(word, one);}}}public static class IntSumReducer extendsReducer<Text, IntWritable, Text, IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);}}public static void main(String[] args) throws Exception {// Add these statements. XXXFile jarFile = EJob.createTempJar("bin");EJob.addClasspath("/usr/hadoop-1.2.1/conf");ClassLoader classLoader = EJob.getClassLoader();Thread.currentThread().setContextClassLoader(classLoader);Configuration conf = new Configuration();String[] inAndOut={"hdfs://localhost:9000/bin/in/1","hdfs://localhost:9000/out"};Job job = new Job(conf, "word count");// And add this statement. XXX((JobConf) job.getConfiguration()).setJar(jarFile.toString());job.setJarByClass(WordCountTest.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(inAndOut[0]));FileOutputFormat.setOutputPath(job, new Path(inAndOut[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

ejob类的代码如下:

/*** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package com.spork.hadoop.jobutil;import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Array;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;public class EJob {private static ArrayList<URL> classPath = new ArrayList<URL>();/** Unpack a jar file into a directory. */public static void unJar(File jarFile, File toDir) throws IOException {JarFile jar = new JarFile(jarFile);try {Enumeration entries = jar.entries();while (entries.hasMoreElements()) {JarEntry entry = (JarEntry) entries.nextElement();if (!entry.isDirectory()) {InputStream in = jar.getInputStream(entry);try {File file = new File(toDir, entry.getName());if (!file.getParentFile().mkdirs()) {if (!file.getParentFile().isDirectory()) {throw new IOException("Mkdirs failed to create "+ file.getParentFile().toString());}}OutputStream out = new FileOutputStream(file);try {byte[] buffer = new byte[8192];int i;while ((i = in.read(buffer)) != -1) {out.write(buffer, 0, i);}} finally {out.close();}} finally {in.close();}}}} finally {jar.close();}}/*** Run a Hadoop job jar. If the main class is not in the jar's manifest, then* it must be provided on the command line.*/public static void runJar(String[] args) throws Throwable {String usage = "jarFile [mainClass] args...";if (args.length < 1) {System.err.println(usage);System.exit(-1);}int firstArg = 0;String fileName = args[firstArg++];File file = new File(fileName);String mainClassName = null;JarFile jarFile;try {jarFile = new JarFile(fileName);} catch (IOException io) {throw new IOException("Error opening job jar: " + fileName).initCause(io);}Manifest manifest = jarFile.getManifest();if (manifest != null) {mainClassName = manifest.getMainAttributes().getValue("Main-Class");}jarFile.close();if (mainClassName == null) {if (args.length < 2) {System.err.println(usage);System.exit(-1);}mainClassName = args[firstArg++];}mainClassName = mainClassName.replaceAll("/", ".");File tmpDir = new File(System.getProperty("java.io.tmpdir"));tmpDir.mkdirs();if (!tmpDir.isDirectory()) {System.err.println("Mkdirs failed to create " + tmpDir);System.exit(-1);}final File workDir = File.createTempFile("hadoop-unjar", "", tmpDir);workDir.delete();workDir.mkdirs();if (!workDir.isDirectory()) {System.err.println("Mkdirs failed to create " + workDir);System.exit(-1);}Runtime.getRuntime().addShutdownHook(new Thread() {public void run() {try {fullyDelete(workDir);} catch (IOException e) {}}});unJar(file, workDir);classPath.add(new File(workDir + "/").toURL());classPath.add(file.toURL());classPath.add(new File(workDir, "classes/").toURL());File[] libs = new File(workDir, "lib").listFiles();if (libs != null) {for (int i = 0; i < libs.length; i++) {classPath.add(libs[i].toURL());}}ClassLoader loader = new URLClassLoader(classPath.toArray(new URL[0]));Thread.currentThread().setContextClassLoader(loader);Class<?> mainClass = Class.forName(mainClassName, true, loader);Method main = mainClass.getMethod("main", new Class[] { Array.newInstance(String.class, 0).getClass() });String[] newArgs = Arrays.asList(args).subList(firstArg, args.length).toArray(new String[0]);try {main.invoke(null, new Object[] { newArgs });} catch (InvocationTargetException e) {throw e.getTargetException();}}/*** Delete a directory and all its contents. If we return false, the directory* may be partially-deleted.*/public static boolean fullyDelete(File dir) throws IOException {File contents[] = dir.listFiles();if (contents != null) {for (int i = 0; i < contents.length; i++) {if (contents[i].isFile()) {if (!contents[i].delete()) {return false;}} else {// try deleting the directory// this might be a symlinkboolean b = false;b = contents[i].delete();if (b) {// this was indeed a symlink or an empty directorycontinue;}// if not an empty directory or symlink let// fullydelete handle it.if (!fullyDelete(contents[i])) {return false;}}}}return dir.delete();}/*** Add a directory or file to classpath.* * @param component*/public static void addClasspath(String component) {if ((component != null) && (component.length() > 0)) {try {File f = new File(component);if (f.exists()) {URL key = f.getCanonicalFile().toURL();if (!classPath.contains(key)) {classPath.add(key);}}} catch (IOException e) {}}}/*** Add default classpath listed in bin/hadoop bash.* * @param hadoopHome*/public static void addDefaultClasspath(String hadoopHome) {// Classpath initially contains conf dir.addClasspath(hadoopHome + "/conf");// For developers, add Hadoop classes to classpath.addClasspath(hadoopHome + "/build/classes");if (new File(hadoopHome + "/build/webapps").exists()) {addClasspath(hadoopHome + "/build");}addClasspath(hadoopHome + "/build/test/classes");addClasspath(hadoopHome + "/build/tools");// For releases, add core hadoop jar & webapps to classpath.if (new File(hadoopHome + "/webapps").exists()) {addClasspath(hadoopHome);}addJarsInDir(hadoopHome);addJarsInDir(hadoopHome + "/build");// Add libs to classpath.addJarsInDir(hadoopHome + "/lib");addJarsInDir(hadoopHome + "/lib/jsp-2.1");addJarsInDir(hadoopHome + "/build/ivy/lib/Hadoop/common");}/*** Add all jars in directory to classpath, sub-directory is excluded.* * @param dirPath*/public static void addJarsInDir(String dirPath) {File dir = new File(dirPath);if (!dir.exists()) {return;}File[] files = dir.listFiles();if (files == null) {return;}for (int i = 0; i < files.length; i++) {if (files[i].isDirectory()) {continue;} else {addClasspath(files[i].getAbsolutePath());}}}/*** Create a temp jar file in "java.io.tmpdir".* * @param root* @return* @throws IOException*/public static File createTempJar(String root) throws IOException {if (!new File(root).exists()) {return null;}Manifest manifest = new Manifest();manifest.getMainAttributes().putValue("Manifest-Version", "1.0");final File jarFile = File.createTempFile("EJob-", ".jar", new File(System.getProperty("java.io.tmpdir")));Runtime.getRuntime().addShutdownHook(new Thread() {public void run() {jarFile.delete();}});JarOutputStream out = new JarOutputStream(new FileOutputStream(jarFile),manifest);createTempJarInner(out, new File(root), "");out.flush();out.close();return jarFile;}private static void createTempJarInner(JarOutputStream out, File f,String base) throws IOException {if (f.isDirectory()) {File[] fl = f.listFiles();if (base.length() > 0) {base = base + "/";}for (int i = 0; i < fl.length; i++) {createTempJarInner(out, fl[i], base + fl[i].getName());}} else {out.putNextEntry(new JarEntry(base));FileInputStream in = new FileInputStream(f);byte[] buffer = new byte[1024];int n = in.read(buffer);while (n != -1) {out.write(buffer, 0, n);n = in.read(buffer);}in.close();}}/*** Return a classloader based on user-specified classpath and parent* classloader.* * @return*/public static ClassLoader getClassLoader() {ClassLoader parent = Thread.currentThread().getContextClassLoader();if (parent == null) {parent = EJob.class.getClassLoader();}if (parent == null) {parent = ClassLoader.getSystemClassLoader();}return new URLClassLoader(classPath.toArray(new URL[0]), parent);}}

修改之后,50030就可以监测到job的运行状况了,而且看cpu的使用状态,各个cpu的占用率也上去了。

若想更加详细了解ejob的实现机理,可以参考下面的博客。

http://www.cnblogs.com/spork/archive/2010/04/21/1717592.html

转载于:https://www.cnblogs.com/lovecreatemylife/p/4370333.html

hadoop提交作业到云端问题解决相关推荐

  1. 在win7上的eclipse向hadoop提交作业异常-权限/设置调度器

    第一个问题,在win7上的eclipse向hadoop提交作业时,没有权限,异常信息如下: Java代码   Caused by: org.apache.hadoop.ipc.RemoteExcept ...

  2. hadoop客户端提交作业代码解读

    客户端提交作业: YarnRunner.submitApplication()    YarnClientImpl.submitApplication       ApplicationClientP ...

  3. spark提交到yarn_详细总结spark基于standalone、yarn集群提交作业流程

    最近总结了一些关于spark core的内容,今天先来和大家分享一下spark的运行模式. spark运行模式 (1)local:在本地eclipse.IDEA中写spark代码运行程序,一般用于测试 ...

  4. oozie 重新提交作业

    在oozie的运行过程当中可能会出现错误,比如数据库连接不上,或者作业执行报错导致流程进入suspend或者killed状态,这个时候我们就要分析了,如果确实是数据或者是网络有问题,我们比如把问题解决 ...

  5. Hadoop平台作业参数设置关于mapreduce.job.split.metainfo.maxsize的说明

    Hadoop平台作业参数设置关于mapreduce.job.split.metainfo.maxsize的说明 1.MR程序时执行时报错:    YarnRuntimeException: java. ...

  6. P2339 提交作业usaco

    P2339 提交作业usaco 题目背景 usaco 题目描述 贝西在哞哞大学选修了 C 门课,她要把所有作业分别交给每门课的老师,然后去车站和同学们一起回家.每个老师在各自的办公室里,办公室要等他们 ...

  7. 使用MRUnit,Mockito和PowerMock进行Hadoop MapReduce作业的单元测试

    0.preliminary 环境搭建 Setup development environment Download the latest version of MRUnit jar from Apac ...

  8. hadoop emr_在Amazon EMR上运行Hadoop MapReduce作业

    hadoop emr 不久前,我发布了如何使用CLI设置EMR群集的信息. 在本文中,我将展示如何使用适用于AWS的Java SDK来设置集群. 展示如何使用Java AWS开发工具包执行此操作的最佳 ...

  9. 在Amazon EMR上运行Hadoop MapReduce作业

    不久前,我发布了如何使用CLI设置EMR群集的信息. 在本文中,我将展示如何使用适用于AWS的Java SDK来设置集群. 展示如何使用Java AWS开发工具包执行此操作的最佳方法是展示完整的示例, ...

  10. pbs 写matlab作业,pbs提交作业

    如何如何使用使用pbs 提交作业 1 串行任务提交 用户通过qsub 命令来向系统提交任务,有两种方式提交:脚本方式和命令行方式.(一 般情况下,不允许root 用户使用qsub 命令提交作业) 1. ...

最新文章

  1. iOS开发系列--UITableView全面解析
  2. JAVA中字符集详解
  3. java cxf服务端代码_【JAVA】 cxf 生成 webservice 服务端代码
  4. 【数据分析】Python :知乎数据清洗整理和结论研究
  5. UI5 plugin - uploadCollection
  6. 在Eclipse中查看JDK类库的源代码
  7. 源码剖析 Netty 服务启动 NIO
  8. 71. Simplify Path
  9. java最好性能手机_企业级Java应用最重要的4个性能指标
  10. AJAX编写用户注册实例及技术小结
  11. 7模型集成:细粒度用户评论情感分析冠军思路及源码
  12. 全排列 流程图_[分享]给排水工程全流程施工详解,看看精品工程是怎样施工的!...
  13. Clion配置Ros环境
  14. [体验编译原理]编写简易计算器
  15. 容器化之路:谁偷走了我的构建时间
  16. Debian 10截图小工具 flameshot
  17. dell服务器重装iso系统,戴尔R620安装windows2012R2过程和方法
  18. containsAll和contains
  19. BUG InvalidArgumentError (see above for traceback): You must feed a value for placeholder tensor 'Pl
  20. 【Matlab】简单的滑模控制程序及Simulink仿真

热门文章

  1. Qt:#pragma comment(lib,“ws2_32.lib“) 报错
  2. 常见设计规范与 Sketch 源文件下载集合
  3. 【Adobe美术基础】字体安装
  4. Excel 脚本编写
  5. Chromeedge好用扩展插件分享
  6. -1岁的产品经理日记(20年秋招产品经理经历分享,含简历、笔经、面经)
  7. appinfo.json
  8. 之前做设计收集的部分网站
  9. Geoserver+postSQL+openlayer实现路径规划
  10. 软件项目开发流程以及人员职责,软件工程中五种常用的软件开发模型整理