前置博客:

搭建Hadoop3.1.2伪分布方式环境
本博客示例中可能出现的错误及解决方案:
Name node is in safe mode.
Container killed on request. Exit code is 143

简介

在开发之初,Avro就是围绕着完善Hadoop生态系统的数据处理而开展的(使用Avro作为Hadoop MapReduce需要处理数据序列化和反序列化的场景),因此Hadoop MapReduce集成Avro也就是自然而然的事情。

在MapReduce中使用Avro可以提升数据的处理性能,主要是以下几点:

  • 向Job提供数据文件时可以使用Avro序列化过的二进制数据文件
  • 在数据解析方面速度比较快
  • 排序功能

本博客用到的软件的版本:

  • CentOS7.0
  • Hadoop的版本是3.1.2
  • Avro的版本是1.9.1

示例1:单词统计

Hadoop MapReduce读取源文件进行计数统计,然后将计算结果作为Avro格式的数据写到目标文件中。

首先将avro-mapred-1.9.1.jar上传到share/hadoop/mapreduce/目录

第一步:创建Maven项目

  • 添加Maven依赖
<dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version>
</dependency>
<dependency><groupId>org.apache.avro</groupId><artifactId>avro</artifactId><version>1.9.1</version>
</dependency>
<dependency><groupId>org.apache.avro</groupId><artifactId>avro-mapred</artifactId><version>1.9.1</version>
</dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.2</version>
</dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version>
</dependency>
  • 配置插件
 <plugins><plugin><groupId>org.apache.avro</groupId><artifactId>avro-maven-plugin</artifactId><version>1.9.1</version><executions><execution><phase>generate-sources</phase><goals><goal>schema</goal></goals><configuration><sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory><outputDirectory>${project.basedir}/src/main/java/</outputDirectory></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-jar-plugin</artifactId><configuration><archive><manifest><mainClass>com.hc.WordCountDriver</mainClass></manifest></archive></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.8</source><target>1.8</target></configuration></plugin>
</plugins>

第二步:在resources目录中提供log4j.properties

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

第三步:创建WordCountSchema

public class WordCountSchema {public static Schema schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"WordCountRecord\",\"fields\":[{\"name\":\"count\",\"type\":\"int\"}]}");
}

上面schema的值是我们写的Avro的字符串:

{"type":"record",
"name":"WordCountRecord",
"fields":[{"name":"count","type":"int"}
]
}

第四步:创建WordCountMapper

public class WordCountMapper extends Mapper<Object, Text, AvroKey<String>, AvroValue<GenericRecord>> {private GenericRecord record = new GenericData.Record(WordCountSchema.schema);@Overrideprotected void map(Object key, Text value, Context context) throws IOException, InterruptedException {String[] words = value.toString().split(" ");for (int i = 0; i < words.length; i++) {if (words[i].length() > 0 && Character.isLetter(words[i].charAt(0))) {AvroKey<String> word = new AvroKey<>(words[i]);record.put("count", 1);context.write(word, new AvroValue<>(record));}}}
}

第五步:创建WordCountReducer

public class WordCountReducer extends Reducer<AvroKey<String>, AvroValue<GenericRecord>, AvroKey<String>, AvroValue<Integer>> {@Overrideprotected void reduce(AvroKey<String> key, Iterable<AvroValue<GenericRecord>> values, Context context) throws IOException, InterruptedException {int sum = 0;for (AvroValue<GenericRecord> value : values) {GenericRecord record = value.datum();sum += Integer.parseInt(record.get("count").toString());}context.write(key, new AvroValue<>(sum));}}

第六步:创建WordCountDriver

public class WordCountDriver {public static void main(String[] args) throws Exception {Configuration cfg = new Configuration();//获取配置信息以及封装任务Job job = Job.getInstance(cfg, "WordCountAvro");job.setJarByClass(WordCountDriver.class);//设置jar加载路径job.setMapperClass(WordCountMapper.class);//设置Mapper类,执行map方法job.setReducerClass(WordCountReducer.class);//设置Reducer类,执行reduce方法AvroJob.setMapOutputKeySchema(job,Schema.create(Schema.Type.STRING));AvroJob.setMapOutputValueSchema(job,WordCountSchema.schema);AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.STRING));AvroJob.setOutputValueSchema(job, Schema.create(Schema.Type.INT));//设置输入和输出路径FileInputFormat.addInputPath(job, new Path(args[0]));//文件输入路径FileOutputFormat.setOutputPath(job, new Path(args[1]));//文件输出路径boolean res = job.waitForCompletion(true);//提交job并等待结束System.exit(res ? 0 : 1); //退出程序}
}

上面代码使用AvroJob来配置作业,AvroJob类主要用来给输入、map输出以及最后输出数据指定Avro模式。

第七步:运行程序

  1. 项目打包

    将打好的包上传到Linux服务器中,重命名为mrad.jar
  2. 在Linux服务器端启动Hadoop,在HDFS根目录创建input目录,同时在里面放置几篇英文文章作为待测试的数据
  3. 运行1步中上传的jar包

    4.查看程序运行结果:

示例2:通过MapReduce程序找到各个班级年龄最小的学生

第一步:将avro-1.9.1.jar上传到到share/hadoop/mapreduce/目录


否则的话会报错:

第二步:创建Maven项目

  • 添加Maven依赖
<dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version>
</dependency>
<dependency><groupId>org.apache.avro</groupId><artifactId>avro</artifactId><version>1.9.1</version>
</dependency>
<dependency><groupId>org.apache.avro</groupId><artifactId>avro-mapred</artifactId><version>1.9.1</version>
</dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.2</version>
</dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version>
</dependency>
  • 配置插件
 <plugins><plugin><groupId>org.apache.avro</groupId><artifactId>avro-maven-plugin</artifactId><version>1.9.1</version><executions><execution><phase>generate-sources</phase><goals><goal>schema</goal></goals><configuration><sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory><outputDirectory>${project.basedir}/src/main/java/</outputDirectory></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-jar-plugin</artifactId><configuration><archive><manifest><mainClass>com.hc.StuDriver</mainClass></manifest></archive></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.8</source><target>1.8</target></configuration></plugin>
</plugins>
  • 第二步:在resources目录中
  1. 提供log4j.properties
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
  1. 提供stu.avsc
{"type": "record","name": "StuRecord","fields": [{"name": "name", "type": "string"},{"name": "age", "type": "int"},{"name": "gender", "type": "string"},{"name": "class", "type": "string"}]
}

第三步:创建StuSchema

public class StuSchema {public static Schema schema ;static{InputStream is = StuSchema.class.getClassLoader().getResourceAsStream("stu.avsc");try {schema = new Parser().parse(is);} catch (IOException e) {e.printStackTrace();}}public static void main(String[] args) {System.out.println(schema);}
}

第四步:创建StuMapper

public class StuMapper extends Mapper<LongWritable, Text, AvroKey<String>, AvroValue<GenericRecord>> {private GenericRecord record = new GenericData.Record(StuSchema.schema);@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] strings = value.toString().split("\\s+"); //匹配一个或多个空格if (strings.length < 4) {return;}record.put("name",strings[0]);record.put("age",Integer.parseInt(strings[1]));record.put("gender",strings[2]);record.put("class",strings[3]);context.write(new AvroKey<>(strings[3]),new AvroValue<>(record));}
}

Mapper程序主要做的事情就是将存放在txt中的记录解析成一个个的GenericRecord对戏,然后以班级名称为键,record为值传递给Reducer做进一步处理。

第五步:创建StuReducer

public class StuReducer extends Reducer<AvroKey<String>, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> {@Overrideprotected void reduce(AvroKey<String> key, Iterable<AvroValue<GenericRecord>> values, Context context) throws IOException, InterruptedException {GenericRecord min = null;for (AvroValue<GenericRecord> item : values) {GenericRecord record = item.datum();if (min == null || (Integer.parseInt(min.get("age").toString()) > Integer.parseInt(record.get("age").toString()))) {min = new GenericData.Record(StuSchema.schema);min.put("name", record.get("name"));min.put("age", record.get("age"));min.put("gender", record.get("gender"));min.put("class", record.get("class"));}}context.write(new AvroKey<>(min), NullWritable.get());}
}

Reducer的逻辑其实是通过循环比较的方式找到每个班级年龄最小的学生。

第六步:创建StuDriver

public class StuDriver {public static void main(String[] args) throws Exception {Configuration cfg = new Configuration();// 可以解决在Hadoop集群中运行时使用的Avro版本和集群中Avro版本不一致的问题。//cfg.setBoolean(Job.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);//获取配置信息以及封装任务Job job = Job.getInstance(cfg, "Stu");job.setJarByClass(StuDriver.class);//设置jar加载路径job.setMapperClass(StuMapper.class);//设置Mapper类,执行map方法job.setReducerClass(StuReducer.class);//设置Reducer类,执行reduce方法AvroJob.setMapOutputKeySchema(job,Schema.create(Schema.Type.STRING));;AvroJob.setMapOutputValueSchema(job,StuSchema.schema);AvroJob.setOutputKeySchema(job, StuSchema.schema);job.setInputFormatClass(TextInputFormat.class);job.setOutputFormatClass(AvroKeyOutputFormat.class);//设置输入和输出路径FileInputFormat.addInputPath(job, new Path(args[0]));//文件输入路径FileOutputFormat.setOutputPath(job, new Path(args[1]));//文件输出路径boolean res = job.waitForCompletion(true);//提交job并等待结束System.exit(res ? 0 : 1); //退出程序}
}

第七步:运行程序

  1. 项目打包
    将打好的包上传到Linux服务器中,重命名为mras.jar

  2. 在Linux服务器端启动Hadoop,在HDFS根目录创建input目录,同时在里面创建文件文件stu.txt,在其中添加测试的数据:

zhangsan    13  male    shiziBan
lisi    14  female  musicBan
wanger  19  male    musicBan
mazi    15  male    shiziBan
qianwu  12  female  wudaoBan
zhaoliu 16  female  shiziBan
lisi    18  male    wudaoBan
xiangming   13  female  shiziBan
wangwei 18  female  wudaoBan
ligang  10  male    musicBan
  1. 运行1步中上传的jar包

    4.查看程序运行结果:

    上传avro-tools-1.9.1.jar到Linux服务器的test目录,


运行命令:

MapReduce整合Avro相关推荐

  1. 大数据分析工程师大纲

    大数据分析工程师大纲 阶段一.业务数据分析师 课程一.数据挖掘/分析师之硬技能 - 必备常用工具使用与高级技巧 本部分内容主要介绍了数据挖掘.分析师.数据产品经理必备的常用工具的,主要有 Excel, ...

  2. HBase 常用操作

    hbase只支持行级事务,不支持多行事务. 进入shell:hbase shell: 配置完分布式zk后: 单启Hmaster:hbase-daemon.sh start master HFile默认 ...

  3. BigBrother的大数据之旅Day 13 hbase(2)

    HBase(2) 详述人员角色表的设计思路以及实现 思路:两个部分的信息分别保存到两张表中,因为hbase是列存储的表,一般存储非关系数据,就像记笔记一样,把关键点写上. 第一张表: 个人信息表 ro ...

  4. Spark性能优化--如何解决数据倾斜

    1 Data Skew 数据倾斜 1.1 数据倾斜概念 对Hive.Spark.Flink等大数据计算框架来讲,数据量大并不可怕,可怕的是数据倾斜. 数据倾斜是指并行处理的数据集中某一部分的数据显著多 ...

  5. 大数据开发工程师学习路线分享

      大数据是对海量数据存储.计算.统计.分析等一系列处理手段,处理的数据量是TB级,甚至是PB或EB级的数据,是传统数据处理手段无法完成的,大数据涉及分布式计算.高并发处理.高可用处理.集群.实时性计 ...

  6. 山东大学大数据管理与分析知识点总结

    大数据概述 大数据(big data),或称巨量资料,指的是需要新处理模式才能具有更强的决策力.洞察发现力和流程优化能力的海量.高增长率和多样化的信息资产 大数据指不用随机分析法(抽样调查)这样的捷径 ...

  7. 北京哪里大数据培训好?

    大数据是对海量数据存储.计算.统计.分析等一系列处理手段,处理的数据量是TB级,甚至是PB或EB级的数据,是传统数据处理手段无法完成的,大数据涉及分布式计算.高并发处理.高可用处理.集群.实时性计算等 ...

  8. 基于Hadoop技术实现的离线电商分析平台(Flume、Hadoop、Hbase、SpringMVC、highcharts)- 驴妈妈旅游项目

    离线数据分析平台是一种利用hadoop集群开发工具的一种方式,主要作用是帮助公司对网站的应用有一个比较好的了解.尤其是在电商.旅游.银行.证券.游戏等领域有非常广泛,因为这些领域对数据和用户的特性把握 ...

  9. HBase vs Cassandra: 我们迁移系统的原因

    HBase vs Cassandra: 我们迁移系统的原因 HBase vs Cassandra: 我们迁移系统的原因 » 我有分寸 HBase vs Cassandra: 我们迁移系统的原因 Mar ...

最新文章

  1. 关于xshell连接虚拟机和虚拟机连网
  2. /etc/bashrc和/etc/profile傻傻分不清楚?
  3. 使用 json_serializable (flutter packages pub run build_runner build) 问题
  4. helm发布自定义Chart:指定namespace、设置NodePort
  5. 电气6机30节点数据介绍(常适用于优化调度)
  6. geoprocessor and georocessing 的关系
  7. MIT Kimera阅读笔记
  8. 大数据技术之Hadoop3.1.2版本完全分布式部署搭建
  9. Android TextView滚动的两种方案
  10. java博弈,人机博弈小游戏(Java)
  11. RecycleView嵌套滑动机制
  12. linux界面唤醒,Linux计算机实现自动唤醒和关闭的方法步骤详解
  13. System.Diagnostics.debug.Assert(条件)的使用
  14. Javascript特效:照片墙
  15. 传智播客 sklearn数据集与机器学习组成
  16. 【Codeforces Round #555 (Div. 3) G】Inverse of Rows and Columns【bitset优化暴力...】
  17. 港台术语与内地术语之对照
  18. 免费在线 Logo生成器
  19. java将多张图片合成视频
  20. ANSYS 添加PCB板材料 FR-4

热门文章

  1. html一天一次的弹窗,信息弹窗提示一天只弹出一次js代码
  2. clientX,clientY,offsetY,offsetX,screenX,screenY区分
  3. 北境之地服务器没有响应,《北境之地》基础问题解决方法 常规问题怎么解决?...
  4. 用biomaRt进行基因ID转换
  5. BGP服务器在什么业务场景会被用到?
  6. 在线Java 动态运行Java源代码-编译器
  7. matplotlib绘制折线图
  8. HTML中怎么使文字各种居中对齐?(代码示例)
  9. i5-1135g7和i5 -1035g1差别大吗 i51135g7和i51035g1哪个性能好
  10. Ubuntu16.4安装显卡驱动