MapReduce中Shuffle机制的学习案例——房屋租赁信息

由于在《自己动手搭建一个简单的基于Hadoop的离线分析系统》系列中直接将清洗后的数据导入Hive中进行分析,没有使用到Hadoop中的MapReduce框架,因此这篇文章将通过该框架对输入数据进行清洗,并对清洗后的数据经行分析,数据源仍来源于同一网站的网络爬虫。
Hadoop版本:2.6.5

Shuffle机制

  上图是MapReduce框架的原理图,其中Shuffle框架指的是“如何将Map阶段处理的数据传递给Reduce阶段”,具体过程是:

  1. 首先,经由Map处理过的信息不是直接写入文件中,而是写入本地的环形缓冲区中。
  2. 当缓冲区发生溢出时,通过键的compareTo方法对键-值对进行快速排序,并进行分区操作(如果配置了Reduce个数和自定义的Partitioner类,默认根据键执行Hash分组),然后写入到本地文件中,这个文件是不断进行滚动的,当产生新的溢出时,将会滚动到下一个文件进行上述操作。
  3. 缓冲区每次产生新的溢出时,新文件将会和旧文件一起进行合并,这里的“合并”是针对各个分区进行归并排序,这个操作在Map执行任务期间会运行多次。
  4. Reduce端将根据自己所属的分区号,从各个Map端的输出中提取出属于自己的一部分键-值对,并对这些来自于不同Map的键-值对进行归并排序,这个过程将会在内存与本地磁盘上执行多次。
  5. 归并排序后的键-值对集合中,相同键(可以根据Grouping进行自定义)的键-值对将作为一个输入被Reduce端进行读取,并做执行后续操作。

案例任务与相关类

1. 输入数据
房屋详细租赁信息,例如:

北京市  北京市 海淀区 338(总租金)    45(房屋面积)    2(居住人数) 北京市海淀区复兴路61号院   地铁万寿路直达天安门、301医院、美尔目医院

2. 输出数据
各个区/县的最高租金与详细租赁信息、最低租金与详细租赁信息、平均租金(元/人),并按照所属省进行归类输出,例如:

MAXINFO:北京市 北京市 东城区 398(总租金) 46(房屋面积) 2(居住人数) 北京市东城区广渠门夕照寺中街绿景苑小区六号一单元四楼 199.0(人均租金)
MININFO:北京市 北京市 东城区 520(总租金) 90(房屋面积) 7(居住人数) 北京市东城区冠城名敦道 74.28571(人均租金)
MEANPRICE:115.69858(人均租金)

3. Map类(M3RentinfoMapper)
将输入数据根据所需字段进行切分,切分后的信息封装为一个自定义类(RentinfoBean),自定义键(ComparableBean)并实现排序方法,将ComparableBean与RentinfoBean作为键-值对进行输出,例如:

ComparableBean [city=上海市, zone=浦东新区, priceforone=91.6]    上海市 上海市 浦东新区 458(总租金) 60(房屋面积) 5(居住人数) 上海市浦东新区陆家嘴街道钱仓路313弄

4. Reduce类(M3RentinfoReducer)
将输入数据进行相应处理,得到输出数据。

5. Shuffle相关类
自定义分区Partitioner类(PartitionBean)
自定义GroupingComparator类(GroupingBean)

自定义键的排序方法、Partitioner、GroupingComparator

自定义键的排序方法

  MapReduce程序在处理数据的过程中会对数据排序(例如,Map输出的键-值对传输到Reduce端之前,会进行排序),排序的依据是Map输出的键,所以,我们如果要自定义排序方法,可以考虑将排序因素放到键中,让键实现WritableComparableWritableComparableWritableComparable接口。
  基本思路:自定义的类ComparableBeanComparableBeanComparableBean实现WritableComparableWritableComparableWritableComparable接口并封装排序所需信息,然后重写键的compareTocompareTocompareTo方法,并将该类作为Map输出的键。由于需要根据不同省对各自区\县的租金(元/人)进行分析,因此需要将市(city)、区/县(zone)和租金(priceforone)封装到ComparableBeanComparableBeanComparableBean中。

package bean;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.WritableComparable;public class ComparableBean implements WritableComparable<ComparableBean> {//设置key中包含的字段:市、区、价格/人private String city;private String zone;private String priceforone;/*** @return the city*/public String getCity() {return city;}/*** @param city the city to set*/public void setCity(String city) {this.city = city;}/*** @return the zone*/public String getZone() {return zone;}/*** @param zone the zone to set*/public void setZone(String zone) {this.zone = zone;}/*** @return the priceforone*/public String getPriceforone() {return priceforone;}/*** @param priceforone the priceforone to set*/public void setPriceforone(String priceforone) {this.priceforone = priceforone;}/* (non-Javadoc)* @see java.lang.Object#toString()*/@Overridepublic String toString() {return "ComparableBean [city=" + city + ", zone=" + zone + ", priceforone=" + priceforone + "]";}@Overridepublic void readFields(DataInput input) throws IOException {this.city = input.readUTF();this.zone = input.readUTF();this.priceforone = input.readUTF();}@Overridepublic void write(DataOutput output) throws IOException {output.writeUTF(this.city);output.writeUTF(this.zone);output.writeUTF(this.priceforone);}//实现自定义compareTo方法@Overridepublic int compareTo(ComparableBean o) {int samecity = this.city.compareTo(o.city);if (samecity == 0) {int samezone = this.zone.compareTo(o.zone);if (samezone == 0) {return -Float.compare(Float.parseFloat(this.getPriceforone()), Float.parseFloat(o.getPriceforone()));} else {return samezone;}} else {return samecity;}}}

自定义Partitioner

  Mapreduce中会将Map输出的键-值对,按照相同键进行分组,然后分发给不同的Reduce,默认的分发规则是根据键的hashcode%reduce数量来进行分发,因此,如果要按照我们自己的需求进行分区,则需要改写数据分发(分区)组件Partitioner。
  基本思路:自定义一个PartitionBeanPartitionBeanPartitionBean继承抽象类PartitionerPartitionerPartitioner,实现getPartitiongetPartitiongetPartition方法并根据Map输出值RentinfoBeanRentinfoBeanRentinfoBean的provinceprovinceprovince划分到对应分区中,并将该类作为任务所需的Partitioner类:job.setPartitionerClass(PartitionBean.class)。

package bean;import java.util.HashMap;import org.apache.hadoop.mapreduce.Partitioner;public class PartitionBean extends Partitioner<ComparableBean, RentinfoBean> {static HashMap<String, Integer> provincemap = new HashMap<String, Integer>();static {provincemap.put("北京市", 0);provincemap.put("天津市", 1);provincemap.put("重庆市", 2);provincemap.put("上海市", 3);}//实现自定义getPartition方法@Overridepublic int getPartition(ComparableBean kBean, RentinfoBean vBean, int numofpart) {Integer partcode = provincemap.get(vBean.getProvince());return partcode == null?numofpart:partcode;}}

自定义GroupingComparator

  由于在Reduce端归并排序后的键-值对集合中,相同键的键-值对将作为一个输入被Reduce端进行读取,因此,我们可以通过自定义GroupingComparatorGroupingComparatorGroupingComparator类,实现自定义集合划分。
  基本思路:自定义一个GroupingBeanGroupingBeanGroupingBean继承WritableComparatorWritableComparatorWritableComparator类,并根据集合划分所需要的信息重写该类的comparecomparecompare方法,将该类作为任务所需的GroupingComparator类:job.setGroupingComparatorClass(GroupingBean.class)。由于我们需要将具有相同的区/县信息划分为一个集合,因此需要将Map输出键ComparableBeanComparableBeanComparableBean中的city和zone字段作为comparecomparecompare方法的依据,忽略不同租金对集合划分的影响。

package bean;import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;public class GroupingBean extends WritableComparator {protected GroupingBean() {super(ComparableBean.class, true);}@SuppressWarnings("rawtypes")@Overridepublic int compare(WritableComparable a, WritableComparable b) {ComparableBean aBean = (ComparableBean) a;ComparableBean bBean = (ComparableBean) b;int samecity = aBean.getCity().compareTo(bBean.getCity());if (samecity == 0) {return aBean.getZone().compareTo(bBean.getZone());} else {return samecity;}}
}

Mapper与Reducer

 static class M3RentinfoMapper extends Mapper<LongWritable, Text, ComparableBean, RentinfoBean> {@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, ComparableBean, RentinfoBean>.Context context)throws IOException, InterruptedException {rentinfoBean = RentinfoPaser.parse(value.toString());//获取房租价格/人priceforonetemp = (Float.parseFloat(rentinfoBean.getPrice()) / Float.parseFloat(rentinfoBean.getNumofpp()));comparableBean.setCity(rentinfoBean.getCity());comparableBean.setZone(rentinfoBean.getZone());comparableBean.setPriceforone(String.valueOf(priceforonetemp));context.write(comparableBean, rentinfoBean);}}static class M3RentinfoReducer extends Reducer<ComparableBean, RentinfoBean, NullWritable, M3InfoBean> {@Overrideprotected void reduce(ComparableBean key, Iterable<RentinfoBean> values,Reducer<ComparableBean, RentinfoBean, NullWritable, M3InfoBean>.Context context)throws IOException, InterruptedException {float sum_priceforonetemp = 0;float priceforone = 0;M3InfoBean bean = new M3InfoBean();int infoBeannum = 1;RentinfoBean rentinfoBean = new RentinfoBean();for (RentinfoBean value : values) {priceforone = Float.parseFloat(value.getPrice()) / Float.parseFloat(value.getNumofpp());sum_priceforonetemp += priceforone;if (infoBeannum == 1) {bean.setMaxrentinfo(value.toString().concat("\001").concat(String.valueOf(priceforone)));}rentinfoBean = value;infoBeannum ++;}bean.setMinrentinfo(rentinfoBean.toString().concat("\001").concat(String.valueOf(priceforone)));bean.setMeanrentinfo(String.valueOf(sum_priceforonetemp / (infoBeannum - 1)));context.write(NullWritable.get(), bean);}}

程序完整代码与各阶段输出

源数据、代码与输出文件
链接:https://pan.baidu.com/s/1UOH4_vP53OjI5LVmQ3rgiQ
提取码:vkcp

MapReduce中Shuffle机制的学习案例——房屋租赁信息相关推荐

  1. mapreduce的shuffle机制(来自学习笔记)

    3. MAPREDUCE原理篇(2) 3.1 mapreduce的shuffle机制 3.1.1 概述: MapReduce中,mapper阶段处理的数据如何传递给reduce阶段,是MapReduc ...

  2. 数据库中锁机制的学习

           我们在做很多项目时都要涉及到数据库,特别是一些比较大型的web项目,更是有较大的并发处理,所以对数据库的操作有可能会产生死锁,对于数据库的死锁,一般数据库系统都会有一套机制去解锁,一般不 ...

  3. 什么是MapReduce?MapReduce的运行机制是什么?MapReduce的实现过程

    1. MAPREDUCE原理篇(1) Mapreduce是一个分布式运算程序的编程框架,是用户开发"基于hadoop的数据分析应用"的核心框架: Mapreduce核心功能是将用户 ...

  4. spark基础之shuffle机制和原理分析

    一 概述 Shuffle就是对数据进行重组,由于分布式计算的特性和要求,在实现细节上更加繁琐和复杂 在MapReduce框架,Shuffle是连接Map和Reduce之间的桥梁,Map阶段通过shuf ...

  5. Vue3+TypeScript从入门到进阶(六)——TypeScript知识点——附沿途学习案例及项目实战代码

    文章目录 一.简介 二.Vue2和Vue3区别 三.Vue知识点学习 四.TypeScript知识点 一.JavaScript和TypeScript 二.TypeScript的安装和使用 1.Type ...

  6. Vue3+TypeScript从入门到进阶(八)——项目打包和自动化部署——附沿途学习案例及项目实战代码

    文章目录 一.简介 二.Vue2和Vue3区别 三.Vue知识点学习 四.TypeScript知识点 五.项目实战 六.项目打包和自动化部署 一. 项目部署和DevOps 1.1. 传统的开发模式 1 ...

  7. shuffle机制和原理分析

    Shuffle简介 Shuffle描述着数据从map task输出到reduce task输入的这段过程.shuffle是连接Map和Reduce之间的桥梁,Map的输出要用到Reduce中必须经过s ...

  8. 第三节 Hadoop学习案例——MapReduce课程设计 好友推荐功能

    提示:文章内容主要以案例为主 目录 前言 项目说明 一,程序需求 1.需求 2.数据 二,编码操作 1.项目建包目录 2.FriendsRecommend.java 3.FriendsRecommen ...

  9. Hadoop学习(16)-MapReduce的shuffle详解

    原文来自:扎心了,老铁的<Hadoop学习之路(二十三)MapReduce中的shuffle详解>

  10. Hadoop学习笔记—11.MapReduce中的排序和分组

    Hadoop学习笔记-11.MapReduce中的排序和分组 一.写在之前的 1.1 回顾Map阶段四大步骤 首先,我们回顾一下在MapReduce中,排序和分组在哪里被执行: 从上图中可以清楚地看出 ...

最新文章

  1. dataframe for 循环 数据格式 python_Python中的for循环
  2. python读什么英文-django的英文读法是什么
  3. 2018.09.16模拟总结
  4. 软件项目组织管理(七)项目成本管理
  5. 句向量的表示(上)—无监督
  6. 数组排列组合算法汇总
  7. 解决applet覆盖遮罩层div的问题
  8. Linux(CentOS7.1)修改默认yum源为国内的阿里云yum源
  9. Linux-iptables初识
  10. MVC实用架构设计(三)——EF-Code First(4):数据查询
  11. KVC、谓词、单例设计模式
  12. 迅为IMX6ULL开发板Linux RS232/485驱动实验(上)
  13. 6.4.3.4 -排除默认网关故障
  14. icon php图标集合,iconfont字体图标和各种css小图标的详解
  15. JavaScript基础知识总结 18:dom基本操作
  16. opc是什么? opc ua是什么?
  17. 山东 计算机专业,山东省内计算机专业大学排名?
  18. fiddler mac
  19. 仙剑三功略(结局,冶炼,魔剑养成)
  20. [论文翻译]基于图像自适应GAN重建

热门文章

  1. 最简单的解释 webdriver的工作原理
  2. STM32F4+W25Q64实现一个U盘
  3. TypeError: can‘t convert cuda:0 device type tensor to numpy. Use Tensor.cpu()
  4. SolidWorks DocumentManager LicenseKey
  5. 在美女如云的时尚公司当程序员是怎样的体验?
  6. kafka 报错 SyntaxError: invalid syntax
  7. Github骚操作绑定中国+86手机号码实现两步验证
  8. 关于删除/卸载win10自带IE 11浏览器后恢复问题
  9. 什么是WiFi无缝漫游(即无线AP自动切换)?
  10. java汤姆猫安装_汤姆猫跑酷安装