4.1. Mapreduce中的排序初步

4.1.1 需求

对日志数据中的上下行流量信息汇总,并输出按照总流量倒序排序的结果

数据如下:

1363157985066 1372623050300-FD-07-A4-72-B8:CMCC120.196.100.82             2427248124681200

1363157995052 138265441015C-0E-8B-C7-F1-E0:CMCC120.197.40.4402640200

1363157991076 1392643565620-10-7A-28-CC-0A:CMCC120.196.100.99241321512200

1363154400022 139262511065C-0E-8B-8B-B1-50:CMCC120.197.40.4402400200

4.1.2 分析

基本思路:实现自定义的bean来封装流量信息,并将bean作为map输出的key来传输

MR程序在处理数据的过程中会对数据排序(map输出的kv对传输到reduce之前,会排序),排序的依据是map输出的key

所以,我们如果要实现自己需要的排序规则,则可以考虑将排序因素放到key中,让key实现接口:WritableComparable

然后重写key的compareTo方法

4.1.3 实现

1、 自定义的bean

public class FlowBean implements WritableComparable<FlowBean>{

long upflow;

long downflow;

long sumflow;

//如果空参构造函数被覆盖,一定要显示定义一下,否则在反序列时会抛异常

public FlowBean(){}

public FlowBean(long upflow, long downflow) {

super();

this.upflow = upflow;

this.downflow = downflow;

this.sumflow = upflow + downflow;

}

public long getSumflow() {

return sumflow;

}

public void setSumflow(long sumflow) {

this.sumflow = sumflow;

}

public long getUpflow() {

return upflow;

}

public void setUpflow(long upflow) {

this.upflow = upflow;

}

public long getDownflow() {

return downflow;

}

public void setDownflow(long downflow) {

this.downflow = downflow;

}

//序列化,将对象的字段信息写入输出流

@Override

public void write(DataOutput out) throws IOException {

out.writeLong(upflow);

out.writeLong(downflow);

out.writeLong(sumflow);

}

//反序列化,从输入流中读取各个字段信息

@Override

public void readFields(DataInput in) throws IOException {

upflow = in.readLong();

downflow = in.readLong();

sumflow = in.readLong();

}

@Override

public String toString() {

return upflow + "\t" + downflow + "\t" + sumflow;

}

@Override

public int compareTo(FlowBean o) {

//自定义倒序比较规则

return sumflow > o.getSumflow() ? -1:1;

}

}

2、 mapper 和 reducer

public class FlowCount {

static class FlowCountMapper extends Mapper<LongWritable, Text, FlowBean,Text > {

@Override

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

String line = value.toString();

String[] fields = line.split("\t");

try {

String phonenbr = fields[0];

long upflow = Long.parseLong(fields[1]);

long dflow = Long.parseLong(fields[2]);

FlowBean flowBean = new FlowBean(upflow, dflow);

context.write(flowBean,new Text(phonenbr));

} catch (Exception e) {

e.printStackTrace();

}

}

}

static class FlowCountReducer extends Reducer<FlowBean,Text,Text, FlowBean> {

@Override

protected void reduce(FlowBean bean, Iterable<Text> phonenbr, Context context) throws IOException, InterruptedException {

Text phoneNbr = phonenbr.iterator().next();

context.write(phoneNbr, bean);

}

}

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

job.setJarByClass(FlowCount.class);

job.setMapperClass(FlowCountMapper.class);

job.setReducerClass(FlowCountReducer.class);

job.setMapOutputKeyClass(FlowBean.class);

job.setMapOutputValueClass(Text.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(FlowBean.class);

// job.setInputFormatClass(TextInputFormat.class);

FileInputFormat.setInputPaths(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.waitForCompletion(true);

}

}

4.2. Mapreduce中的分区Partitioner

4.2.1 需求

根据归属地输出流量统计数据结果到不同文件,以便于在查询统计结果时可以定位到省级范围进行

4.2.2 分析

Mapreduce中会将map输出的kv对,按照相同key分组,然后分发给不同的reducetask

默认的分发规则为:根据key的hashcode%reducetask数来分发

所以:如果要按照我们自己的需求进行分组,则需要改写数据分发(分组)组件Partitioner

自定义一个CustomPartitioner继承抽象类:Partitioner

然后在job对象中,设置自定义partitioner: job.setPartitionerClass(CustomPartitioner.class)

4.2.3 实现

/**

* 定义自己的从map到reduce之间的数据(分组)分发规则 按照手机号所属的省份来分发(分组)ProvincePartitioner

* 默认的分组组件是HashPartitioner

*

* @author

*

*/

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

static HashMap<String, Integer> provinceMap = new HashMap<String, Integer>();

static {

provinceMap.put("135", 0);

provinceMap.put("136", 1);

provinceMap.put("137", 2);

provinceMap.put("138", 3);

provinceMap.put("139", 4);

}

@Override

public int getPartition(Text key, FlowBean value, int numPartitions) {

Integer code = provinceMap.get(key.toString().substring(0, 3));

return code == null ? 5 : code;

}

}

4.3. mapreduce数据压缩

4.3.1 概述

这是mapreduce的一种优化策略:通过压缩编码对mapper或者reducer的输出进行压缩,以减少磁盘IO,提高MR程序运行速度(但相应增加了cpu运算负担)

1、 Mapreduce支持将map输出的结果或者reduce输出的结果进行压缩,以减少网络IO或最终输出数据的体积

2、 压缩特性运用得当能提高性能,但运用不当也可能降低性能

3、 基本原则:

运算密集型的job,少用压缩

IO密集型的job,多用压缩

4.3.2 MR支持的压缩编码

4.3.3 Reducer输出压缩

在配置参数或在代码中都可以设置reduce的输出压缩

1、在配置参数中设置

mapreduce.output.fileoutputformat.compress=false

mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DefaultCodec

mapreduce.output.fileoutputformat.compress.type=RECORD

2、在代码中设置

Job job = Job.getInstance(conf);

FileOutputFormat.setCompressOutput(job, true);

FileOutputFormat.setOutputCompressorClass(job, (Class<? extends CompressionCodec>) Class.forName(""));

4.3.4 Mapper输出压缩

在配置参数或在代码中都可以设置reduce的输出压缩

1、在配置参数中设置

mapreduce.map.output.compress=false

mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.DefaultCodec

2、在代码中设置:

conf.setBoolean(Job.MAP_OUTPUT_COMPRESS, true);

conf.setClass(Job.MAP_OUTPUT_COMPRESS_CODEC, GzipCodec.class, CompressionCodec.class);

4.3.5 压缩文件的读取

Hadoop自带的InputFormat类内置支持压缩文件的读取,比如TextInputformat类,在其initialize方法中:

public void initialize(InputSplit genericSplit,

TaskAttemptContext context) throws IOException {

FileSplit split = (FileSplit) genericSplit;

Configuration job = context.getConfiguration();

this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);

start = split.getStart();

end = start + split.getLength();

final Path file = split.getPath();

// open the file and seek to the start of the split

final FileSystem fs = file.getFileSystem(job);

fileIn = fs.open(file);

//根据文件后缀名创建相应压缩编码的codec

CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);

if (null!=codec) {

isCompressedInput = true;

decompressor = CodecPool.getDecompressor(codec);

//判断是否属于可切片压缩编码类型

if (codec instanceof SplittableCompressionCodec) {

final SplitCompressionInputStream cIn =

((SplittableCompressionCodec)codec).createInputStream(

fileIn, decompressor, start, end,

SplittableCompressionCodec.READ_MODE.BYBLOCK);

//如果是可切片压缩编码,则创建一个CompressedSplitLineReader读取压缩数据

in = new CompressedSplitLineReader(cIn, job,

this.recordDelimiterBytes);

start = cIn.getAdjustedStart();

end = cIn.getAdjustedEnd();

filePosition = cIn;

} else {

//如果是不可切片压缩编码,则创建一个SplitLineReader读取压缩数据,并将文件输入流转换成解压数据流传递给普通SplitLineReader读取

in = new SplitLineReader(codec.createInputStream(fileIn,

decompressor), job, this.recordDelimiterBytes);

filePosition = fileIn;

}

} else {

fileIn.seek(start);

//如果不是压缩文件,则创建普通SplitLineReader读取数据

in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes);

filePosition = fileIn;

}

4.4更多MapReduce编程案例

4.4.1 reduce端join算法实现

1、需求:

订单数据表t_order:

id

date

pid

amount

1001

20150710

P0001

2

1002

20150710

P0001

3

1002

20150710

P0002

3

商品信息表t_product

id

name

category_id

price

P0001

小米5

C01

2

P0002

锤子T1

C01

3

假如数据量巨大,两表的数据是以文件的形式存储在HDFS中,需要用mapreduce程序来实现一下SQL查询运算:

select  a.id,a.date,b.name,b.category_id,b.price from t_order a join t_product b on a.pid = b.id

2、实现机制:

通过将关联的条件作为map输出的key,将两表满足join条件的数据并携带数据所来源的文件信息,发往同一个reduce task,在reduce中进行数据的串联

public class OrderJoin {

static class OrderJoinMapper extends Mapper<LongWritable, Text, Text, OrderJoinBean> {

@Override

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

// 拿到一行数据,并且要分辨出这行数据所属的文件

String line = value.toString();

String[] fields = line.split("\t");

// 拿到itemid

String itemid = fields[0];

// 获取到这一行所在的文件名(通过inpusplit)

String name = "你拿到的文件名";

// 根据文件名,切分出各字段(如果是a,切分出两个字段,如果是b,切分出3个字段)

OrderJoinBean bean = new OrderJoinBean();

bean.set(null, null, null, null, null);

context.write(new Text(itemid), bean);

}

}

static class OrderJoinReducer extends Reducer<Text, OrderJoinBean, OrderJoinBean, NullWritable> {

@Override

protected void reduce(Text key, Iterable<OrderJoinBean> beans, Context context) throws IOException, InterruptedException {

//拿到的key是某一个itemid,比如1000

//拿到的beans是来自于两类文件的bean

//  {1000,amount} {1000,amount} {1000,amount}   ---   {1000,price,name}

//将来自于b文件的bean里面的字段,跟来自于a的所有bean进行字段拼接并输出

}

}

}

缺点:这种方式中,join的操作是在reduce阶段完成,reduce端的处理压力太大,map节点的运算负载则很低,资源利用率不高,且在reduce阶段极易产生数据倾斜

解决方案: map端join实现方式

4.4.2 map端join算法实现

1、原理阐述

适用于关联表中有小表的情形;

可以将小表分发到所有的map节点,这样,map节点就可以在本地对自己所读到的大表数据进行join并输出最终结果,可以大大提高join操作的并发度,加快处理速度

2、实现示例

--先在mapper类中预先定义好小表,进行join

--引入实际场景中的解决方案:一次加载数据库或者用distributedcache

public class TestDistributedCache {

static class TestDistributedCacheMapper extends Mapper<LongWritable, Text, Text, Text>{

FileReader in = null;

BufferedReader reader = null;

HashMap<String,String> b_tab = new HashMap<String, String>();

String localpath =null;

String uirpath = null;

//是在map任务初始化的时候调用一次

@Override

protected void setup(Context context) throws IOException, InterruptedException {

//通过这几句代码可以获取到cache file的本地绝对路径,测试验证用

Path[] files = context.getLocalCacheFiles();

localpath = files[0].toString();

URI[] cacheFiles = context.getCacheFiles();

//缓存文件的用法——直接用本地IO来读取

//这里读的数据是map task所在机器本地工作目录中的一个小文件

in = new FileReader("b.txt");

reader =new BufferedReader(in);

String line =null;

while(null!=(line=reader.readLine())){

String[] fields = line.split(",");

b_tab.put(fields[0],fields[1]);

}

IOUtils.closeStream(reader);

IOUtils.closeStream(in);

}

@Override

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

//这里读的是这个map task所负责的那一个切片数据(在hdfs上)

String[] fields = value.toString().split("\t");

String a_itemid = fields[0];

String a_amount = fields[1];

String b_name = b_tab.get(a_itemid);

// 输出结果  100198.9banan

context.write(new Text(a_itemid), new Text(a_amount + "\t" + ":" + localpath + "\t" +b_name ));

}

}

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

job.setJarByClass(TestDistributedCache.class);

job.setMapperClass(TestDistributedCacheMapper.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(LongWritable.class);

//这里是我们正常的需要处理的数据所在路径

FileInputFormat.setInputPaths(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

//不需要reducer

job.setNumReduceTasks(0);

//分发一个文件到task进程的工作目录

job.addCacheFile(new URI("hdfs://hadoop-server01:9000/cachefile/b.txt"));

//分发一个归档文件到task进程的工作目录

//job.addArchiveToClassPath(archive);

//分发jar包到task节点的classpath下

//job.addFileToClassPath(jarfile);

job.waitForCompletion(true);

}

}

4.4.3 web日志预处理

1、需求:

对web访问日志中的各字段识别切分

去除日志中不合法的记录

根据KPI统计需求,生成各类访问请求过滤数据

2、实现代码:

a) 定义一个bean,用来记录日志数据中的各数据字段

public class WebLogBean {

private String remote_addr;// 记录客户端的ip地址

private String remote_user;// 记录客户端用户名称,忽略属性"-"

private String time_local;// 记录访问时间与时区

private String request;// 记录请求的url与http协议

private String status;// 记录请求状态;成功是200

private String body_bytes_sent;// 记录发送给客户端文件主体内容大小

private String http_referer;// 用来记录从那个页面链接访问过来的

private String http_user_agent;// 记录客户浏览器的相关信息

private boolean valid = true;// 判断数据是否合法

public String getRemote_addr() {

return remote_addr;

}

public void setRemote_addr(String remote_addr) {

this.remote_addr = remote_addr;

}

public String getRemote_user() {

return remote_user;

}

public void setRemote_user(String remote_user) {

this.remote_user = remote_user;

}

public String getTime_local() {

return time_local;

}

public void setTime_local(String time_local) {

this.time_local = time_local;

}

public String getRequest() {

return request;

}

public void setRequest(String request) {

this.request = request;

}

public String getStatus() {

return status;

}

public void setStatus(String status) {

this.status = status;

}

public String getBody_bytes_sent() {

return body_bytes_sent;

}

public void setBody_bytes_sent(String body_bytes_sent) {

this.body_bytes_sent = body_bytes_sent;

}

public String getHttp_referer() {

return http_referer;

}

public void setHttp_referer(String http_referer) {

this.http_referer = http_referer;

}

public String getHttp_user_agent() {

return http_user_agent;

}

public void setHttp_user_agent(String http_user_agent) {

this.http_user_agent = http_user_agent;

}

public boolean isValid() {

return valid;

}

public void setValid(boolean valid) {

this.valid = valid;

}

@Override

public String toString() {

StringBuilder sb = new StringBuilder();

sb.append(this.valid);

sb.append("\001").append(this.remote_addr);

sb.append("\001").append(this.remote_user);

sb.append("\001").append(this.time_local);

sb.append("\001").append(this.request);

sb.append("\001").append(this.status);

sb.append("\001").append(this.body_bytes_sent);

sb.append("\001").append(this.http_referer);

sb.append("\001").append(this.http_user_agent);

return sb.toString();

}

}

b)定义一个parser用来解析过滤web访问日志原始记录

public class WebLogParser {

public static WebLogBean parser(String line) {

WebLogBean webLogBean = new WebLogBean();

String[] arr = line.split(" ");

if (arr.length > 11) {

webLogBean.setRemote_addr(arr[0]);

webLogBean.setRemote_user(arr[1]);

webLogBean.setTime_local(arr[3].substring(1));

webLogBean.setRequest(arr[6]);

webLogBean.setStatus(arr[8]);

webLogBean.setBody_bytes_sent(arr[9]);

webLogBean.setHttp_referer(arr[10]);

if (arr.length > 12) {

webLogBean.setHttp_user_agent(arr[11] + " " + arr[12]);

} else {

webLogBean.setHttp_user_agent(arr[11]);

}

if (Integer.parseInt(webLogBean.getStatus()) >= 400) {// 大于400,HTTP错误

webLogBean.setValid(false);

}

} else {

webLogBean.setValid(false);

}

return webLogBean;

}

public static String parserTime(String time) {

time.replace("/", "-");

return time;

}

}

c) mapreduce程序

public class WeblogPreProcess {

static class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

Text k = new Text();

NullWritable v = NullWritable.get();

@Override

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

String line = value.toString();

WebLogBean webLogBean = WebLogParser.parser(line);

if (!webLogBean.isValid())

return;

k.set(webLogBean.toString());

context.write(k, v);

}

}

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

job.setJarByClass(WeblogPreProcess.class);

job.setMapperClass(WeblogPreProcessMapper.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(NullWritable.class);

FileInputFormat.setInputPaths(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.waitForCompletion(true);

}

}

本文转自yushiwh 51CTO博客,原文链接:http://blog.51cto.com/yushiwh/1913046,如需转载请自行联系原作者

MAPREDUCE实践篇(2)相关推荐

  1. MAPREDUCE实践篇(1)

    2.1 MAPREDUCE 示例编写及编程规范 2.1.1 编程规范 (1)用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行mr程序的客户端) (2)Mapper的输入 ...

  2. 什么是MapReduce?MapReduce的运行机制是什么?MapReduce的实现过程

    1. MAPREDUCE原理篇(1) Mapreduce是一个分布式运算程序的编程框架,是用户开发"基于hadoop的数据分析应用"的核心框架: Mapreduce核心功能是将用户 ...

  3. Hadoop详解手册.pdf

    1 HADOOP背景 1 什么是HADOOP 1. HADOOP是apache旗下的一套开源软件平台 2. HADOOP提供的功能:利用服务器集群,根据用户的自定义业务逻辑,对海量数据进行分布式处理 ...

  4. mapreduce理解_大数据

    map:对不同的数据进行同种操作 reduce:按keys 把数据规约到一起 看这篇文章请出去跑两圈,然后泡一壶茶,边喝茶,边看,看完你就对hadoop 与MapReduce的整体有所了解了. [前言 ...

  5. 2021年大数据Hadoop(二十二):MapReduce的自定义分组

    全网最详细的Hadoop文章系列,强烈建议收藏加关注! 后面更新文章都会列出历史文章目录,帮助大家回顾知识重点. 目录 本系列历史文章 前言 MapReduce的自定义分组 需求 分析 实现 第一步: ...

  6. 第2节 mapreduce深入学习:4, 5

    第2节 mapreduce深入学习:4.mapreduce的序列化以及自定义排序 序列化(Serialization)是指把结构化对象转化为字节流. 反序列化(Deserialization)是序列化 ...

  7. 第一个MapReduce程序

    计算文件中每个单词的频数 wordcount 程序调用 wordmap 和 wordreduce 程序. 1 import org.apache.hadoop.conf.Configuration; ...

  8. hadoop程序MapReduce之SingletonTableJoin

    需求:单表关联问题.从文件中孩子和父母的关系挖掘出孙子和爷奶关系 样板:child-parent.txt xiaoming daxiong daxiong alice daxiong jack 输出: ...

  9. 初学Hadoop之图解MapReduce与WordCount示例分析

    Hadoop的框架最核心的设计就是:HDFS和MapReduce.HDFS为海量的数据提供了存储,MapReduce则为海量的数据提供了计算. HDFS是Google File System(GFS) ...

最新文章

  1. Mysql中分页查询两个方法比较
  2. http、TCP/IP协议与socket之间的区别
  3. NYOJ 275 队花的烦恼一
  4. os模块中的shutil的使用方式与方法
  5. Java黑皮书课后题第4章:*4.11(十进制转十六进制)编写程序,提示用户输入0~15之间的一个整数,显示其对应的十六进制数。对于不正确的输入数字,提示非法输入
  6. 关于星空的java小程序_[Java教程]小程序使用Canvas画饼图_星空网
  7. android 电源管理 wakelock 唤醒锁机制
  8. Hive ROW_NUMBER,RANK(),DENSE_RANK()
  9. 纸上谈兵:数学归纳法,递归,栈
  10. 玩转springboot2.x之异步调用@Async
  11. flex学习笔记 数据验证
  12. Python多任务之多进程开发
  13. BIM族库下载——BIM项目停车场管理系统常用族库
  14. B2B,B2C,C2C,C2B,B2G
  15. google scholar 使用不了的问题——已解决
  16. 微博音视频下载与合并
  17. Unity3d+GameFramework:资源分析,资源依赖,循环依赖检测
  18. 【100%通过率】华为OD机试真题 Python 实现【完美走位】【2022.11 Q4新题】
  19. 2021年“创客广东”粤港澳新一代信息技术中小企业创新创业大赛决赛
  20. Android/Java中okhttp用法介绍

热门文章

  1. linux命令作为子进程标记,Linux基础命令---显示进程ps
  2. c语言程序设计 中南大学,中南大学-C语言程序设计试卷.docx
  3. mysql yearweek 日期不准_Mysql 中,WEEK 与YEARWEEK函数的参数问题
  4. php点击字切换验证码,PHP生成图片验证码、点击切换实例 Web程序 - 贪吃蛇学院-专业IT技术平台...
  5. linux内核centos6.9,CentOS6.9手动编译并更新Kernel内核版本
  6. android 9.0室内定位方案,Android GPS室内定位问题的解决方法(location为null)
  7. 应用在机器学习中的聚类数据集产生方法
  8. 电容触摸按键IC AT42QT1070
  9. SP-45ML光电二极管放大电路及其动态特性
  10. 东芝打印机cd40故障_东芝小尺寸UV平板机介绍