Hadoop MapReduce 统计汽车销售信息
Hadoop MapReduce 统计汽车销售信息
- 汽车销售数据文件
- 统计各城市销售汽车的数量
- 思路
- 代码
- 统计各城市销售品牌的数量
- 思路
- 方案1
- 方案1代码
- 方案2
- 方案2代码
本文将讨论如何使用Hadoop MapReduce来统计汽车销售信息。
汽车销售数据文件
汽车销售的记录文件名叫Cars.csv,里面记录了汽车的销售信息,数据内容如下:
山西省,3,朔州市,朔城区,2013,LZW6450PF,上汽通用五菱汽车股份有限公司,五菱,小型普通客车,个人,非营运,1,L3C,8424,79,汽油,4490,1615,1900,,,,2,3050,1386,175/70R14LT,4,2110,1275,,7,,,,,客车,1913,男性
山西省,3,晋城市,城区,2013,EQ6450PF1,东风小康汽车有限公司,东风,小型普通客车,个人,非营运,1,DK13-06,1587,74,汽油,4500,1680,1960,,,,2,3050,1435,185R14LT 6PR,4,1970,1290,,7,,东风小康汽车有限公司,,EQ6440KMF,客车,1929,男性
山西省,12,长治市,长治城区,2013,BJ6440BKV1A,北汽银翔汽车有限公司,北京,小型普通客车,个人,非营运,1,BJ415A,1500,75,,4440,,,,,,,,,,,,,,,,北汽银翔汽车有限公司,北京,BJ6440BKV1A,,1938,男性
山西省,12,长治市,长治城区,2013,DXK6440AF2F,东风小康汽车有限公司,东风,小型普通客车,个人,非营运,1,DK15,1499,85,汽油,4365,1720,1770,,,,2,2725,1425,185/65R14,4,1835,1235,,5,,东风小康汽车有限公司,东风,DXK6440AF2F,多用途乘用车,1926,女性
...
格式为:
第1列:销售的省份
第3列:销售的城市
第7列:汽车生产商
第8列:汽车品牌名
第12列:汽车销售数量
已经将Cars.csv上传到HDFS文件系统的/input
目录下。
统计各城市销售汽车的数量
思路
要统计城市销售汽车的数量,由于只涉及到了城市、数量,所以可以可以采取用城市作为Key,用销售数量作为Value,当成Map的输出,再由Reduce对Map的结果按照城市将销售数据进行汇总即可。
代码
package com.wux.labs.hadoop.mr;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class CarSalesCount1 {public static class CarSalesCountMapperextends Mapper<Object, Text, Text, IntWritable> {private Text city = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {// CSV文件,所以这里用逗号分割列String values[] = value.toString().split(",");// 第3列是城市,这里以城市作为Keycity.set(values[2]);// 第12列是销售数量,直接取销售数量,而不是取1context.write(city, new IntWritable(Integer.parseInt(values[11])));}}public static class CarSalesCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;// 对销售数量累加for (IntWritable val : values) {sum += val.get();}result.set(sum);// 根据城市得到累加的结果context.write(key, result);}}public static void main(String[] args) throws Exception {System.setProperty("HADOOP_USER_NAME", "hadoop");Configuration conf = new Configuration();Job job = Job.getInstance(conf, "SalesCount1");job.setJarByClass(CarSalesCount1.class);job.setMapperClass(CarSalesCountMapper.class);job.setReducerClass(CarSalesCountReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path("/input/Cars.csv"));FileOutputFormat.setOutputPath(job, new Path("/output"));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}
打包后直接运行:
hadoop jar Hadoop-1.0-SNAPSHOT.jar com.wux.labs.hadoop.mr.CarSalesCount1
运行完成后可查看结果:
统计各城市销售品牌的数量
思路
与销售数量不同,销售数量的话直接将销售记录中的数量相加就可以了,但是销售品牌数,则只统计到汽车品牌的数量,不管一个品牌销售了多少辆汽车,都只算一个品牌,因此,不能按销售数量累加统计。
方案1
在Map阶段,可以按:城市 + 品牌 作为输出的Key,Value任意,在Map之后增加Combiner阶段,因为Map阶段的Key可以保证输出的记录中相同城市的不同记录中品牌都是不重复的,所以Combiner阶段可以按城市作为Key,1作为Value输出,最后由Reduce阶段按城市Key,对Combiner的Value进行汇总即可。
方案1代码
package com.wux.labs.hadoop.mr;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class CarSalesCount2 {public static class CarSalesCountMapperextends Mapper<Object, Text, Text, IntWritable> {private Text city = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {// CSV文件,所以这里用逗号分割列String values[] = value.toString().split(",");// 第3列是城市,第8列是品牌,这里以城市+品牌作为Keycity.set(values[2] + " " + values[7]);// Map阶段的Value不会使用,所以Value可以任意context.write(city, new IntWritable(1));}}public static class CarSalesCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {Text cs = new Text();// Combiner阶段按照Map阶段的Key进行拆分,城市作为Keycs.set(key.toString().split(" ")[0]);// 因为每行记录中同一个城市的品牌不会重复,所以Value取1context.write(cs, new IntWritable(1));}}public static class CarSalesCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;// 对品牌数量累加for (IntWritable val : values) {sum += val.get();}result.set(sum);// 根据城市得到累加的结果context.write(key, result);}}public static void main(String[] args) throws Exception {System.setProperty("HADOOP_USER_NAME", "hadoop");Configuration conf = new Configuration();Job job = Job.getInstance(conf, "SalesCount");job.setJarByClass(CarSalesCount2.class);job.setMapperClass(CarSalesCountMapper.class);job.setCombinerClass(CarSalesCountCombiner.class);job.setReducerClass(CarSalesCountReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path("/input/Cars.csv"));FileOutputFormat.setOutputPath(job, new Path("/output"));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}
打包后直接运行:
hadoop jar Hadoop-1.0-SNAPSHOT.jar com.wux.labs.hadoop.mr.CarSalesCount2
运行完成后可查看结果:
方案2
除了可以重组Key,增加Combiner阶段来进行计数,还可以直接在Map阶段用城市作为Key,用品牌作为Value进行输出,Map阶段后不需要Combiner阶段直接到Reduce阶段,Reduce阶段由于接收到的Map的输出不是数字,而是汽车品牌,是字符串,所以可以用Set进行数据保存,最后统计Set中元素的个数即可。
方案2代码
package com.wux.labs.hadoop.mr;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
import java.util.HashSet;
import java.util.Set;public class CarSalesCount3 {public static class CarSalesCountMapperextends Mapper<Object, Text, Text, Text> {private Text city = new Text();private Text brand = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {// CSV文件,所以这里用逗号分割列String values[] = value.toString().split(",");// 第3列是城市,这里以城市作为Keycity.set(values[2]);// 第8列是品牌,直接取品牌作为Valuebrand.set(values[7]);context.write(city, brand);}}public static class CarSalesCountReducer extends Reducer<Text, Text, Text, IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {Set set = new HashSet();// 将销售品牌放入Set集合for (Text val : values) {String brand = val.toString();if (!set.contains(brand)) {set.add(brand);}}result.set(set.size());// 根据城市得到累加的结果context.write(key, result);}}public static void main(String[] args) throws Exception {System.setProperty("HADOOP_USER_NAME", "hadoop");Configuration conf = new Configuration();Job job = Job.getInstance(conf, "SalesCount1");job.setJarByClass(CarSalesCount3.class);job.setMapperClass(CarSalesCountMapper.class);job.setReducerClass(CarSalesCountReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);FileInputFormat.addInputPath(job, new Path("/input/Cars.csv"));FileOutputFormat.setOutputPath(job, new Path("/output"));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}
打包后直接运行:
hadoop jar Hadoop-1.0-SNAPSHOT.jar com.wux.labs.hadoop.mr.CarSalesCount3
运行完成后可查看结果:
Hadoop MapReduce 统计汽车销售信息相关推荐
- 电信信息日志使用mapreduce统计的两种方式
信息准备: 数据信息: 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2 ...
- hadoop使用mapreduce统计词频_hadoop利用mapreduce运行词频统计(非例程)
1.运行环境 1.Ubuntu16.04单系统 2.hadoop-3.2.1 2.操作步骤 1.使用eclipse编写map reduce run 函数 2.导出jar包 3.将需要进行词频统计的文件 ...
- Hadoop | MapReduce之 WordCount词频统计
WordCount词频统计 词频统计 WordCountMap.java // Map类,继承于org.apache.hadoop.mapreduce.Mapper; public class Wor ...
- hadoop之MapReduce统计选修课程人数,不及格门数,选课人数
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 目录 前言 一.题目要求 二.数据解析 student.txt文件部分数据 三.需求分析及代码编写 总体的思路: 需求1:求DataB ...
- MapReduce统计排序和HDFS的读写
实验材料及说明 在Ubuntu系统的/学号(每个人之间的学号)/salesInfo目录下,有买家的购买记录文件Sales,该文件记录了买家的id,购买商品的id以及购买日期,文件为名为Sales.Sa ...
- hadoop调用python算法_使用Python实现Hadoop MapReduce程序
根据上面两篇文章,下面是我在自己的ubuntu上的运行过程.文字基本采用博文使用Python实现Hadoop MapReduce程序, 打字很浪费时间滴. 在这个实例中,我将会向大家介绍如何使用Py ...
- hadoop MapReduce实例解析
1.MapReduce理论简介 1.1 MapReduce编程模型 MapReduce采用"分而治之"的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个分节点共同完成,然 ...
- 使用Python实现Hadoop MapReduce程序
根据上面两篇文章,下面是我在自己的ubuntu上的运行过程.文字基本采用博文使用Python实现Hadoop MapReduce程序, 打字很浪费时间滴. 在这个实例中,我将会向大家介绍如何使用Py ...
- python - hadoop,mapreduce demo
Hadoop,mapreduce 介绍 59888745@qq.com 大数据工程师是在Linux系统下搭建Hadoop生态系统(cloudera是最大的输出者类似于Linux的红帽), 把用户的交易 ...
最新文章
- Xinlinx 7系列 FPGA 总览
- 生成JSON数据--官方方法
- CVE-2017-7529Nginx越界读取缓存漏洞POC
- Linux 命令(44)—— expand 命令
- 8-12 三个定时任务
- POJ - 1459 Power Network(最大流)(模板)
- c语言 异或_C语言位运算实例讲解
- 利用Java发送邮件(含附件)的例子
- Kali、linux中安装软件
- Hive 知识体系保姆级教程
- ETF基金定投策略回测分析
- 【笔记】两个根因分析方法:5WHYamp;10WHY
- 自动识别查找特定的串口号 比如设备管理器中Modem属性里的串口 按这个方法可以获取设备管理器任意信息。C++
- 华硕笔记本X450JB拆机及加装固态硬盘
- 服务器系统解决方案,服务器操作系统解决方案
- 毗邻华尔街,哥伦比亚大学、纽约大学如何将金融科技的理论与实践结合?
- Reality Labs首次向媒体开放,空间音频、EMG腕带体验大公开
- CoordinatorLayout + AppBarLayout 实现标题栏置顶
- Java数据库部分(MySQL+JDBC)(一、MySQL超详细学习笔记)
- IBM AIX操作系统
热门文章
- HTTP协议6-HTTP内容类型
- 程序员请尊重前辈的代码
- 20201120翻译_disba基于Python的面波正演模拟程序包
- mysql百万数据join_MySQL百万级、千万级数据多表关联SQL语句调优
- 非机动车检测数据集(用于训练目标跟踪)
- 回收站是计算机硬盘,如何查找移动硬盘回收站
- 黑马培训教学SSM整合中Security遇到的问题org.springframework.security.access.AccessDeniedException: Access is denied
- 探讨SEO之项目管理
- 移动固态硬盘中安装VMware+Ubuntu失败原因
- 大数据集群失联问题解决方案