Hadoop MapReduce 统计汽车销售信息

  • 汽车销售数据文件
  • 统计各城市销售汽车的数量
    • 思路
    • 代码
  • 统计各城市销售品牌的数量
    • 思路
    • 方案1
    • 方案1代码
    • 方案2
    • 方案2代码

本文将讨论如何使用Hadoop MapReduce来统计汽车销售信息。

汽车销售数据文件

汽车销售的记录文件名叫Cars.csv,里面记录了汽车的销售信息,数据内容如下:

山西省,3,朔州市,朔城区,2013,LZW6450PF,上汽通用五菱汽车股份有限公司,五菱,小型普通客车,个人,非营运,1,L3C,8424,79,汽油,4490,1615,1900,,,,2,3050,1386,175/70R14LT,4,2110,1275,,7,,,,,客车,1913,男性
山西省,3,晋城市,城区,2013,EQ6450PF1,东风小康汽车有限公司,东风,小型普通客车,个人,非营运,1,DK13-06,1587,74,汽油,4500,1680,1960,,,,2,3050,1435,185R14LT 6PR,4,1970,1290,,7,,东风小康汽车有限公司,,EQ6440KMF,客车,1929,男性
山西省,12,长治市,长治城区,2013,BJ6440BKV1A,北汽银翔汽车有限公司,北京,小型普通客车,个人,非营运,1,BJ415A,1500,75,,4440,,,,,,,,,,,,,,,,北汽银翔汽车有限公司,北京,BJ6440BKV1A,,1938,男性
山西省,12,长治市,长治城区,2013,DXK6440AF2F,东风小康汽车有限公司,东风,小型普通客车,个人,非营运,1,DK15,1499,85,汽油,4365,1720,1770,,,,2,2725,1425,185/65R14,4,1835,1235,,5,,东风小康汽车有限公司,东风,DXK6440AF2F,多用途乘用车,1926,女性
...

格式为:
第1列:销售的省份
第3列:销售的城市
第7列:汽车生产商
第8列:汽车品牌名
第12列:汽车销售数量

已经将Cars.csv上传到HDFS文件系统的/input目录下。

统计各城市销售汽车的数量

思路

要统计城市销售汽车的数量,由于只涉及到了城市、数量,所以可以可以采取用城市作为Key,用销售数量作为Value,当成Map的输出,再由Reduce对Map的结果按照城市将销售数据进行汇总即可。

代码

package com.wux.labs.hadoop.mr;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class CarSalesCount1 {public static class CarSalesCountMapperextends Mapper<Object, Text, Text, IntWritable> {private Text city = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {// CSV文件,所以这里用逗号分割列String values[] = value.toString().split(",");// 第3列是城市,这里以城市作为Keycity.set(values[2]);// 第12列是销售数量,直接取销售数量,而不是取1context.write(city, new IntWritable(Integer.parseInt(values[11])));}}public static class CarSalesCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;// 对销售数量累加for (IntWritable val : values) {sum += val.get();}result.set(sum);// 根据城市得到累加的结果context.write(key, result);}}public static void main(String[] args) throws Exception {System.setProperty("HADOOP_USER_NAME", "hadoop");Configuration conf = new Configuration();Job job = Job.getInstance(conf, "SalesCount1");job.setJarByClass(CarSalesCount1.class);job.setMapperClass(CarSalesCountMapper.class);job.setReducerClass(CarSalesCountReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path("/input/Cars.csv"));FileOutputFormat.setOutputPath(job, new Path("/output"));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

打包后直接运行:

hadoop jar Hadoop-1.0-SNAPSHOT.jar com.wux.labs.hadoop.mr.CarSalesCount1

运行完成后可查看结果:

统计各城市销售品牌的数量

思路

与销售数量不同,销售数量的话直接将销售记录中的数量相加就可以了,但是销售品牌数,则只统计到汽车品牌的数量,不管一个品牌销售了多少辆汽车,都只算一个品牌,因此,不能按销售数量累加统计。

方案1

在Map阶段,可以按:城市 + 品牌 作为输出的Key,Value任意,在Map之后增加Combiner阶段,因为Map阶段的Key可以保证输出的记录中相同城市的不同记录中品牌都是不重复的,所以Combiner阶段可以按城市作为Key,1作为Value输出,最后由Reduce阶段按城市Key,对Combiner的Value进行汇总即可。

方案1代码

package com.wux.labs.hadoop.mr;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class CarSalesCount2 {public static class CarSalesCountMapperextends Mapper<Object, Text, Text, IntWritable> {private Text city = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {// CSV文件,所以这里用逗号分割列String values[] = value.toString().split(",");// 第3列是城市,第8列是品牌,这里以城市+品牌作为Keycity.set(values[2] + " " + values[7]);// Map阶段的Value不会使用,所以Value可以任意context.write(city, new IntWritable(1));}}public static class CarSalesCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {Text cs = new Text();// Combiner阶段按照Map阶段的Key进行拆分,城市作为Keycs.set(key.toString().split(" ")[0]);// 因为每行记录中同一个城市的品牌不会重复,所以Value取1context.write(cs, new IntWritable(1));}}public static class CarSalesCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;// 对品牌数量累加for (IntWritable val : values) {sum += val.get();}result.set(sum);// 根据城市得到累加的结果context.write(key, result);}}public static void main(String[] args) throws Exception {System.setProperty("HADOOP_USER_NAME", "hadoop");Configuration conf = new Configuration();Job job = Job.getInstance(conf, "SalesCount");job.setJarByClass(CarSalesCount2.class);job.setMapperClass(CarSalesCountMapper.class);job.setCombinerClass(CarSalesCountCombiner.class);job.setReducerClass(CarSalesCountReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path("/input/Cars.csv"));FileOutputFormat.setOutputPath(job, new Path("/output"));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

打包后直接运行:

hadoop jar Hadoop-1.0-SNAPSHOT.jar com.wux.labs.hadoop.mr.CarSalesCount2

运行完成后可查看结果:

方案2

除了可以重组Key,增加Combiner阶段来进行计数,还可以直接在Map阶段用城市作为Key,用品牌作为Value进行输出,Map阶段后不需要Combiner阶段直接到Reduce阶段,Reduce阶段由于接收到的Map的输出不是数字,而是汽车品牌,是字符串,所以可以用Set进行数据保存,最后统计Set中元素的个数即可。

方案2代码

package com.wux.labs.hadoop.mr;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
import java.util.HashSet;
import java.util.Set;public class CarSalesCount3 {public static class CarSalesCountMapperextends Mapper<Object, Text, Text, Text> {private Text city = new Text();private Text brand = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {// CSV文件,所以这里用逗号分割列String values[] = value.toString().split(",");// 第3列是城市,这里以城市作为Keycity.set(values[2]);// 第8列是品牌,直接取品牌作为Valuebrand.set(values[7]);context.write(city, brand);}}public static class CarSalesCountReducer extends Reducer<Text, Text, Text, IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {Set set = new HashSet();// 将销售品牌放入Set集合for (Text val : values) {String brand = val.toString();if (!set.contains(brand)) {set.add(brand);}}result.set(set.size());// 根据城市得到累加的结果context.write(key, result);}}public static void main(String[] args) throws Exception {System.setProperty("HADOOP_USER_NAME", "hadoop");Configuration conf = new Configuration();Job job = Job.getInstance(conf, "SalesCount1");job.setJarByClass(CarSalesCount3.class);job.setMapperClass(CarSalesCountMapper.class);job.setReducerClass(CarSalesCountReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);FileInputFormat.addInputPath(job, new Path("/input/Cars.csv"));FileOutputFormat.setOutputPath(job, new Path("/output"));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

打包后直接运行:

hadoop jar Hadoop-1.0-SNAPSHOT.jar com.wux.labs.hadoop.mr.CarSalesCount3

运行完成后可查看结果:

Hadoop MapReduce 统计汽车销售信息相关推荐

  1. 电信信息日志使用mapreduce统计的两种方式

    信息准备: 数据信息: 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2 ...

  2. hadoop使用mapreduce统计词频_hadoop利用mapreduce运行词频统计(非例程)

    1.运行环境 1.Ubuntu16.04单系统 2.hadoop-3.2.1 2.操作步骤 1.使用eclipse编写map reduce run 函数 2.导出jar包 3.将需要进行词频统计的文件 ...

  3. Hadoop | MapReduce之 WordCount词频统计

    WordCount词频统计 词频统计 WordCountMap.java // Map类,继承于org.apache.hadoop.mapreduce.Mapper; public class Wor ...

  4. hadoop之MapReduce统计选修课程人数,不及格门数,选课人数

    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 目录 前言 一.题目要求 二.数据解析 student.txt文件部分数据 三.需求分析及代码编写 总体的思路: 需求1:求DataB ...

  5. MapReduce统计排序和HDFS的读写

    实验材料及说明 在Ubuntu系统的/学号(每个人之间的学号)/salesInfo目录下,有买家的购买记录文件Sales,该文件记录了买家的id,购买商品的id以及购买日期,文件为名为Sales.Sa ...

  6. hadoop调用python算法_使用Python实现Hadoop MapReduce程序

    根据上面两篇文章,下面是我在自己的ubuntu上的运行过程.文字基本采用博文使用Python实现Hadoop MapReduce程序,  打字很浪费时间滴. 在这个实例中,我将会向大家介绍如何使用Py ...

  7. hadoop MapReduce实例解析

    1.MapReduce理论简介 1.1 MapReduce编程模型 MapReduce采用"分而治之"的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个分节点共同完成,然 ...

  8. 使用Python实现Hadoop MapReduce程序

    根据上面两篇文章,下面是我在自己的ubuntu上的运行过程.文字基本采用博文使用Python实现Hadoop MapReduce程序,  打字很浪费时间滴. 在这个实例中,我将会向大家介绍如何使用Py ...

  9. python - hadoop,mapreduce demo

    Hadoop,mapreduce 介绍 59888745@qq.com 大数据工程师是在Linux系统下搭建Hadoop生态系统(cloudera是最大的输出者类似于Linux的红帽), 把用户的交易 ...

最新文章

  1. Xinlinx 7系列 FPGA 总览
  2. 生成JSON数据--官方方法
  3. CVE-2017-7529Nginx越界读取缓存漏洞POC
  4. Linux 命令(44)—— expand 命令
  5. 8-12 三个定时任务
  6. POJ - 1459 Power Network(最大流)(模板)
  7. c语言 异或_C语言位运算实例讲解
  8. 利用Java发送邮件(含附件)的例子
  9. Kali、linux中安装软件
  10. Hive 知识体系保姆级教程
  11. ETF基金定投策略回测分析
  12. 【笔记】两个根因分析方法:5WHYamp;10WHY
  13. 自动识别查找特定的串口号 比如设备管理器中Modem属性里的串口 按这个方法可以获取设备管理器任意信息。C++
  14. 华硕笔记本X450JB拆机及加装固态硬盘
  15. 服务器系统解决方案,服务器操作系统解决方案
  16. 毗邻华尔街,哥伦比亚大学、纽约大学如何将金融科技的理论与实践结合?
  17. Reality Labs首次向媒体开放,空间音频、EMG腕带体验大公开
  18. CoordinatorLayout + AppBarLayout 实现标题栏置顶
  19. Java数据库部分(MySQL+JDBC)(一、MySQL超详细学习笔记)
  20. IBM AIX操作系统

热门文章

  1. HTTP协议6-HTTP内容类型
  2. 程序员请尊重前辈的代码
  3. 20201120翻译_disba基于Python的面波正演模拟程序包
  4. mysql百万数据join_MySQL百万级、千万级数据多表关联SQL语句调优
  5. 非机动车检测数据集(用于训练目标跟踪)
  6. 回收站是计算机硬盘,如何查找移动硬盘回收站
  7. 黑马培训教学SSM整合中Security遇到的问题org.springframework.security.access.AccessDeniedException: Access is denied
  8. 探讨SEO之项目管理
  9. 移动固态硬盘中安装VMware+Ubuntu失败原因
  10. 大数据集群失联问题解决方案