通过API操作之前要先了解几个基本知识

一、hadoop的基本数据类型和java的基本数据类型是不一样的,但是都存在对应的关系

如下图

如果需要定义自己的数据类型,则必须实现Writable

hadoop的数据类型可以通过get方法获得对应的java数据类型

而java的数据类型可以通过hadoop数据类名的构造函数,或者set方法转换

二、hadoop提交作业的的步骤分为八个,可以理解为天龙八步

如下:

map端工作:

1.1 读取要操作的文件--这步会将文件的内容格式化成键值对的形式,键为每一行的起始位置偏移,值为每一行的内容

1.2 调用map进行处理--在这步使用自定义的Mapper类来实现自己的逻辑,输入的数据为1.1格式化的键值对,输入的数据也是键值对的形式

1.3 对map的处理结果进行分区--map处理完毕之后可以根据自己的业务需求来对键值对进行分区处理,比如,将类型不同的结果保存在不同的文件中等。这里设置几个分区,后面就会有对应的几个Reducer来处理相应分区中的内容

1.4 分区之后,对每个分区的数据进行排序,分组--排序按照从小到大进行排列,排序完毕之后,会将键值对中,key相同的选项 的value进行合并。如,所有的键值对中,可能存在

hello 1

hello 1

key都是hello,进行合并之后变成

hello 2

可以根据自己的业务需求对排序和合并的处理进行干涉和实现

1.5 归约(combiner)--简单的说就是在map端进行一次reduce处理,但是和真正的reduce处理不同之处在于:combiner只能处理本地数据,不能跨网络处理。通过map端的combiner处理可以减少输出的数据,因为数据都是通过网络传输的,其目的是为了减轻网络传输的压力和后边reduce的工作量。并不能取代reduce

reduce端工作:

2.1 通过网络将数据copy到各个reduce

2.2 调用reduce进行处理--reduce接收的数据是整个map端处理完毕之后的键值对,输出的也是键值对的集合,是最终的结果

2.3 将结果输出到hdfs文件系统的路径中

新建一个java项目,并导入hadoop包,在项目选项上右键,如图选择

找到hadoop的安装目录,选择所有的包

在找到hadoop安装目录下的lib,导入其中的所有包

新建JMapper类为自定义的Mapper类

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;//自定义的Mapper类必须继承Mapper类,并重写map方法实现自己的逻辑
public class JMapper extends Mapper<LongWritable, Text, Text, LongWritable> {//处理输入文件的每一行都会调用一次map方法,文件有多少行就会调用多少次protected void map(LongWritable key,Text value,org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, LongWritable>.Context context)throws java.io.IOException, InterruptedException {//key为每一行的起始偏移量//value为每一行的内容//每一行的内容分割,如hello  world,分割成一个String数组有两个数据,分别是hello,worldString[] ss = value.toString().toString().split("\t");//循环数组,将其中的每个数据当做输出的键,值为1,表示这个键出现一次for (String s : ss) {//context.write方法可以将map得到的键值对输出context.write(new Text(s), new LongWritable(1));}};
}

新建JReducer类为自定义的Reducer

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;//自定义的Reducer类必须继承Reducer,并重写reduce方法实现自己的逻辑,泛型参数分别为输入的键类型,值类型;输出的键类型,值类型;之后的reduce类似
public class JReducer extends Reducer<Text, LongWritable, Text, LongWritable> {//处理每一个键值对都会调用一次reduce方法,有多少个键值对就调用多少次protected void reduce(Text key,java.lang.Iterable<LongWritable> value,org.apache.hadoop.mapreduce.Reducer<Text, LongWritable, Text, LongWritable>.Context context)throws java.io.IOException, InterruptedException {//key为每一个单独的单词,如:hello,world,you,me等//value为这个单词在文本中出现的次数集合,如{1,1,1},表示总共出现了三次long sum = 0;//循环value,将其中的值相加,得到总次数for (LongWritable v : value) {sum += v.get();}//context.write输入新的键值对(结果)context.write(key, new LongWritable(sum));};
}

新建执行提交作业的类,取名JSubmit

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;public class JSubmit {public static void main(String[] args) throws IOException,URISyntaxException, InterruptedException, ClassNotFoundException {//Path类为hadoop API定义,创建两个Path对象,一个输入文件的路径,一个输入结果的路径Path outPath = new Path("hdfs://localhost:9000/out");//输入文件的路径为本地linux系统的文件路径Path inPath = new Path("/home/hadoop/word");//创建默认的Configuration对象Configuration conf = new Configuration();//根据地址和conf得到hadoop的文件系统独享//如果输入路径已经存在则删除FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf);if (fs.exists(outPath)) {fs.delete(outPath, true);}//根据conf创建一个新的Job对象,代表要提交的作业,作业名为JSubmit.class.getSimpleName()Job job = new Job(conf, JSubmit.class.getSimpleName());//1.1//FileInputFormat类设置要读取的文件路径FileInputFormat.setInputPaths(job, inPath);//setInputFormatClass设置读取文件时使用的格式化类job.setInputFormatClass(TextInputFormat.class);//1.2调用自定义的Mapper类的map方法进行操作//设置处理的Mapper类job.setMapperClass(JMapper.class);//设置Mapper类处理完毕之后输出的键值对 的 数据类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(LongWritable.class);//1.3分区,下面的两行代码写和没写都一样,默认的设置
<span style="white-space:pre">     </span>job.setPartitionerClass(HashPartitioner.class);
<span style="white-space:pre">     </span>job.setNumReduceTasks(1);//1.4排序,分组//1.5归约,这三步都有默认的设置,如果没有特殊的需求可以不管
       //2.1将数据传输到对应的Reducer//2.2使用自定义的Reducer类操作//设置Reducer类job.setReducerClass(JReducer.class);//设置Reducer处理完之后 输出的键值对 的数据类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);//2.3将结果输出//FileOutputFormat设置输出的路径FileOutputFormat.setOutputPath(job, outPath);//setOutputFormatClass设置输出时的格式化类job.setOutputFormatClass(TextOutputFormat.class);//将当前的job对象提交job.waitForCompletion(true);}
}

运行java程序,可以再控制台看到提交作业的提示

在hdfs中查看输出的文件

运行成功!

转载于:https://www.cnblogs.com/jchubby/p/4429702.html

通过java api提交自定义hadoop 作业相关推荐

  1. HDFS中JAVA API的使用(hadoop的文件上传和下载)

    HDFS是一个分布式文件系统,既然是文件系统,就可以对其文件进行操作,比如说新建文件.删除文件.读取文件内容等操作.下面记录一下使用JAVA API对HDFS中的文件进行操作的过程. 对分HDFS中的 ...

  2. Spark Launcher Java API提交Spark算法

    在介绍之前,我先附上spark 官方文档地址: http://spark.apache.org/docs/latest/api/java/org/apache/spark/launcher/packa ...

  3. 作业优先调度java代码_如何在触发它时在java代码中为hadoop作业设置优先级?

    这是我的工作人员的外表.如何在触发这项工作时设定低优先级? public int run(String[] args) throws Exception { this.initJob(); Path ...

  4. aws上部署hadoop_在AWS Elastic MapReduce上运行PageRank Hadoop作业

    aws上部署hadoop 在上一篇文章中,我描述了一个执行PageRank计算的示例,该示例是使用Apache Hadoop进行Mining Massive Dataset课程的一部分. 在那篇文章中 ...

  5. 在AWS Elastic MapReduce上运行PageRank Hadoop作业

    在上一篇文章中,我描述了执行PageRank计算的示例,该示例是使用Apache Hadoop进行Mining Massive Dataset课程的一部分. 在那篇文章中,我接受了Java中现有的Ha ...

  6. Hadoop作业提交分析(三)

    http://www.cnblogs.com/spork/archive/2010/04/12/1710294.html 通过前面两篇文章的分析,对Hadoop的作业提交流程基本明了了,下面我们就可以 ...

  7. hadoop_单元测试Java Hadoop作业

    hadoop 在我以前的文章中,我展示了如何设置一个完整的基于Maven的项目,以用Java创建Hadoop作业. 当然并没有完成,因为它缺少单元测试部分. 在这篇文章中,我将展示如何将MapRedu ...

  8. Linux下Hadoop hdfs Java API使用

    0 前言 搞了大约2天时间终于把Linux下面Java API的使用给弄清楚了.做个笔记方便以后参考.环境如下所示 Hadoop:2.5.1 Linux:Ubuntu kylin eclipse:lu ...

  9. 【hadoop2.6.0】利用Hadoop的 Java API

    Hadoop2.6.0的所有Java API都在 http://hadoop.apache.org/docs/r2.6.0/api/overview-summary.html 里. 下面实现一个利用J ...

最新文章

  1. 微信支付的坑 返回值 -1
  2. 2017 Multi-University Training Contest - Team 2——HDU6045HDU6047HDU6055
  3. matlab dtft的函数,DTFT的Matlab矩阵计算的理解
  4. 第2课 桐桐的运输方案《聪明人的游戏 信息学探秘.提高篇》
  5. 【英语学习】【Level 07】U05 Best Destination L3 An Australian Adventure
  6. 【AI面试题】逻辑回归和线性回归的区别
  7. 避免线上故障的10条建议
  8. CentOS下MySQL主从同步配置
  9. android布局时长分析,Android性能优化:布局优化 详细解析(含、、讲解 )
  10. Android-- Dialog对话框的使用方法
  11. Atitit nodejs js 获取图像分辨率 尺寸 大小 宽度 高度
  12. 超好用的富文本编辑器froalaEditor(方便传图片和视频等)
  13. java 加水印_Java添加水印(图片水印,文字水印)
  14. windows7装python哪个版本好_[合集] 弱弱的问下Windows10和7,装Python的哪个版本
  15. 计算机革命的主角和英雄——十大超级老牌黑客
  16. 你一定会用到的SolidWorks快捷键汇总大全
  17. html5 移动端手写签名,H5移动端项目实现手写签名功能 vue实现手写签名
  18. 网恋中在拿谁的情感玩游戏
  19. python--data.dropna
  20. IDEA服务器端JQuery框架加载失败--已解决

热门文章

  1. ubuntu下面解决mysqld_safe Directory '/var/run/mysqld' for UNIX socket file don't exists
  2. pycharm建立启动器命令
  3. springboot入门(项目)
  4. Windows10系统安装 .NET Framework 3.5
  5. devexpress PivotGrid Grand Total
  6. 有关字符编码学习记录
  7. ​Linux下C如何调用PCI Lib函数
  8. RFID位置数据这么多,企业应该怎么利用?
  9. SQL Server 数据库关键知识点详解(优秀经典)
  10. Windows开启远程桌面服务(Win10)