reduce端join算法实现

1、需求:

订单数据表t_order:

id

date

pid

amount

1001

20150710

P0001

2

1002

20150710

P0001

3

1002

20150710

P0002

3

商品信息表t_product

id

pname

category_id

price

P0001

小米5

1000

2

P0002

锤子T1

1000

3

假如数据量巨大,两表的数据是以文件的形式存储在HDFS中,需要用mapreduce程序来实现一下SQL查询运算:

select  a.id,a.date,b.name,b.category_id,b.price from t_order a join t_product b on a.pid = b.id

2、实现机制:

通过将关联的条件作为map输出的key,将两表满足join条件的数据并携带数据所来源的文件信息,发往同一个reduce task,在reduce中进行数据的串联

public class OrderJoin {static class OrderJoinMapper extends Mapper<LongWritable, Text, Text, OrderJoinBean> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 拿到一行数据,并且要分辨出这行数据所属的文件String line = value.toString();String[] fields = line.split("\t");// 拿到itemidString itemid = fields[0];// 获取到这一行所在的文件名(通过inpusplit)String name = "你拿到的文件名";// 根据文件名,切分出各字段(如果是a,切分出两个字段,如果是b,切分出3个字段)
OrderJoinBean bean = new OrderJoinBean();bean.set(null, null, null, null, null);context.write(new Text(itemid), bean);}}static class OrderJoinReducer extends Reducer<Text, OrderJoinBean, OrderJoinBean, NullWritable> {@Overrideprotected void reduce(Text key, Iterable<OrderJoinBean> beans, Context context) throws IOException, InterruptedException {//拿到的key是某一个itemid,比如1000//拿到的beans是来自于两类文件的bean//  {1000,amount} {1000,amount} {1000,amount}   ---   {1000,price,name}//将来自于b文件的bean里面的字段,跟来自于a的所有bean进行字段拼接并输出
        }}
}

缺点:这种方式中,join的操作是在reduce阶段完成,reduce端的处理压力太大,map节点的运算负载则很低,资源利用率不高,且在reduce阶段极易产生数据倾斜

解决方案: map端join实现方式

1、原理阐述

适用于关联表中有小表的情形;

可以将小表分发到所有的map节点,这样,map节点就可以在本地对自己所读到的大表数据进行join并输出最终结果,可以大大提高join操作的并发度,加快处理速度

2、实现示例

--先在mapper类中预先定义好小表,进行join

--引入实际场景中的解决方案:一次加载数据库或者用distributedcache

public class TestDistributedCache {static class TestDistributedCacheMapper extends Mapper<LongWritable, Text, Text, Text>{FileReader in = null;BufferedReader reader = null;HashMap<String,String> b_tab = new HashMap<String, String>();String localpath =null;String uirpath = null;//是在map任务初始化的时候调用一次
        @Overrideprotected void setup(Context context) throws IOException, InterruptedException {//通过这几句代码可以获取到cache file的本地绝对路径,测试验证用Path[] files = context.getLocalCacheFiles();localpath = files[0].toString();URI[] cacheFiles = context.getCacheFiles();//缓存文件的用法——直接用本地IO来读取//这里读的数据是map task所在机器本地工作目录中的一个小文件in = new FileReader("b.txt");reader =new BufferedReader(in);String line =null;while(null!=(line=reader.readLine())){String[] fields = line.split(",");b_tab.put(fields[0],fields[1]);}IOUtils.closeStream(reader);IOUtils.closeStream(in);}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//这里读的是这个map task所负责的那一个切片数据(在hdfs上)String[] fields = value.toString().split("\t");String a_itemid = fields[0];String a_amount = fields[1];String b_name = b_tab.get(a_itemid);// 输出结果  1001    98.9    banancontext.write(new Text(a_itemid), new Text(a_amount + "\t" + ":" + localpath + "\t" +b_name ));}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(TestDistributedCache.class);job.setMapperClass(TestDistributedCacheMapper.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);//这里是我们正常的需要处理的数据所在路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));//不需要reducerjob.setNumReduceTasks(0);//分发一个文件到task进程的工作目录job.addCacheFile(new URI("hdfs://hadoop-server01:9000/cachefile/b.txt"));//分发一个归档文件到task进程的工作目录
//        job.addArchiveToClassPath(archive);//分发jar包到task节点的classpath下
//        job.addFileToClassPath(jarfile);
        job.waitForCompletion(true);}
}

web日志预处理

1、需求:

对web访问日志中的各字段识别切分

去除日志中不合法的记录

根据KPI统计需求,生成各类访问请求过滤数据

2、实现代码:

a) 定义一个bean,用来记录日志数据中的各数据字段

public class WebLogBean {private String remote_addr;// 记录客户端的ip地址private String remote_user;// 记录客户端用户名称,忽略属性"-"private String time_local;// 记录访问时间与时区private String request;// 记录请求的url与http协议private String status;// 记录请求状态;成功是200private String body_bytes_sent;// 记录发送给客户端文件主体内容大小private String http_referer;// 用来记录从那个页面链接访问过来的private String http_user_agent;// 记录客户浏览器的相关信息private boolean valid = true;// 判断数据是否合法public String getRemote_addr() {return remote_addr;}public void setRemote_addr(String remote_addr) {this.remote_addr = remote_addr;}public String getRemote_user() {return remote_user;}public void setRemote_user(String remote_user) {this.remote_user = remote_user;}public String getTime_local() {return time_local;}public void setTime_local(String time_local) {this.time_local = time_local;}public String getRequest() {return request;}public void setRequest(String request) {this.request = request;}public String getStatus() {return status;}public void setStatus(String status) {this.status = status;}public String getBody_bytes_sent() {return body_bytes_sent;}public void setBody_bytes_sent(String body_bytes_sent) {this.body_bytes_sent = body_bytes_sent;}public String getHttp_referer() {return http_referer;}public void setHttp_referer(String http_referer) {this.http_referer = http_referer;}public String getHttp_user_agent() {return http_user_agent;}public void setHttp_user_agent(String http_user_agent) {this.http_user_agent = http_user_agent;}public boolean isValid() {return valid;}public void setValid(boolean valid) {this.valid = valid;}@Overridepublic String toString() {StringBuilder sb = new StringBuilder();sb.append(this.valid);sb.append("\001").append(this.remote_addr);sb.append("\001").append(this.remote_user);sb.append("\001").append(this.time_local);sb.append("\001").append(this.request);sb.append("\001").append(this.status);sb.append("\001").append(this.body_bytes_sent);sb.append("\001").append(this.http_referer);sb.append("\001").append(this.http_user_agent);return sb.toString();
}
}

b)定义一个parser用来解析过滤web访问日志原始记录

public class WebLogParser {public static WebLogBean parser(String line) {WebLogBean webLogBean = new WebLogBean();String[] arr = line.split(" ");if (arr.length > 11) {webLogBean.setRemote_addr(arr[0]);webLogBean.setRemote_user(arr[1]);webLogBean.setTime_local(arr[3].substring(1));webLogBean.setRequest(arr[6]);webLogBean.setStatus(arr[8]);webLogBean.setBody_bytes_sent(arr[9]);webLogBean.setHttp_referer(arr[10]);if (arr.length > 12) {webLogBean.setHttp_user_agent(arr[11] + " " + arr[12]);} else {webLogBean.setHttp_user_agent(arr[11]);}if (Integer.parseInt(webLogBean.getStatus()) >= 400) {// 大于400,HTTP错误webLogBean.setValid(false);}} else {webLogBean.setValid(false);}return webLogBean;}public static String parserTime(String time) {time.replace("/", "-");return time;}
}

c) mapreduce程序

public class WeblogPreProcess {static class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {Text k = new Text();NullWritable v = NullWritable.get();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();WebLogBean webLogBean = WebLogParser.parser(line);if (!webLogBean.isValid())return;k.set(webLogBean.toString());context.write(k, v);}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(WeblogPreProcess.class);job.setMapperClass(WeblogPreProcessMapper.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));job.waitForCompletion(true);}
}

流量统计相关需求

1、对流量日志中的用户统计总上、下行流量技术点: 自定义javaBean用来在mapreduce中充当value

注意: javaBean要实现Writable接口,实现两个方法

    //序列化,将对象的字段信息写入输出流
    @Overridepublic void write(DataOutput out) throws IOException {out.writeLong(upflow);out.writeLong(downflow);out.writeLong(sumflow);}//反序列化,从输入流中读取各个字段信息
    @Overridepublic void readFields(DataInput in) throws IOException {upflow = in.readLong();downflow = in.readLong();sumflow = in.readLong();}

1、统计流量且按照流量大小倒序排序

技术点:这种需求,用一个mapreduce -job 不好实现,需要两个mapreduce -job

第一个job负责流量统计,跟上题相同

第二个job读入第一个job的输出,然后做排序

要将flowBean作为map的key输出,这样mapreduce就会自动排序 此时,flowBean要实现接口WritableComparable   要实现其中的compareTo()方法,方法中,我们可以定义倒序比较的逻辑

1、统计流量且按照手机号的归属地,将结果数据输出到不同的省份文件中技术点:自定义Partitioner

    @Overridepublic int getPartition(Text key, FlowBean value, int numPartitions) {String prefix = key.toString().substring(0,3);Integer partNum = pmap.get(prefix);return (partNum==null?4:partNum);}

自定义partition后,要根据自定义partitioner的逻辑设置相应数量的reduce task

job.setNumReduceTasks(5);

注意:如果reduceTask的数量>= getPartition的结果数  ,则会多产生几个空的输出文件part-r-000xx

如果     1<reduceTask的数量<getPartition的结果数 ,则有一部分分区数据无处安放,会Exception!!!

如果 reduceTask的数量=1,则不管mapTask端输出多少个分区文件,最终结果都交给这一个reduceTask,最终也就只会产生一个结果文件 part-r-00000

社交粉丝数据分析

以下是qq的好友列表数据,冒号前是一个用,冒号后是该用户的所有好友(数据中的好友关系是单向的)

A:B,C,D,F,E,O

B:A,C,E,K

C:F,A,D,I

D:A,E,F,L

E:B,C,D,M,L

F:A,B,C,D,E,O,M

G:A,C,D,E,F

H:A,C,D,E,O

I:A,O

J:B,O

K:A,C,D

L:D,E,F

M:E,F,G

O:A,H,I,J

求出哪些人两两之间有共同好友,及他俩的共同好友都有谁?

解题思路:

求出哪些人两两之间有共同好友,及他俩的共同好友都有谁?

解题思路:

第一步

map

读一行   A:B,C,D,F,E,O

输出    <B,A><C,A><D,A><F,A><E,A><O,A>

在读一行   B:A,C,E,K

输出   <A,B><C,B><E,B><K,B>

REDUCE

拿到的数据比如<C,A><C,B><C,E><C,F><C,G>......

输出:

<A-B,C>

<A-E,C>

<A-F,C>

<A-G,C>

<B-E,C>

<B-F,C>.....

第二步

map

读入一行<A-B,C>

直接输出<A-B,C>

reduce

读入数据  <A-B,C><A-B,F><A-B,G>.......

输出: A-B  C,F,G,.....

package cn.itcast.bigdata.mr.fensi;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class SharedFriendsStepOne {static class SharedFriendsStepOneMapper extends Mapper<LongWritable, Text, Text, Text> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// A:B,C,D,F,E,OString line = value.toString();String[] person_friends = line.split(":");String person = person_friends[0];String friends = person_friends[1];for (String friend : friends.split(",")) {// 输出<好友,人>context.write(new Text(friend), new Text(person));}}}static class SharedFriendsStepOneReducer extends Reducer<Text, Text, Text, Text> {@Overrideprotected void reduce(Text friend, Iterable<Text> persons, Context context) throws IOException, InterruptedException {StringBuffer sb = new StringBuffer();for (Text person : persons) {sb.append(person).append(",");}context.write(friend, new Text(sb.toString()));}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(SharedFriendsStepOne.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);job.setMapperClass(SharedFriendsStepOneMapper.class);job.setReducerClass(SharedFriendsStepOneReducer.class);FileInputFormat.setInputPaths(job, new Path("D:/srcdata/friends"));FileOutputFormat.setOutputPath(job, new Path("D:/temp/out"));job.waitForCompletion(true);}}

package cn.itcast.bigdata.mr.fensi;import java.io.IOException;
import java.util.Arrays;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class SharedFriendsStepTwo {static class SharedFriendsStepTwoMapper extends Mapper<LongWritable, Text, Text, Text> {// 拿到的数据是上一个步骤的输出结果// A I,K,C,B,G,F,H,O,D,// 友 人,人,人
        @Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String[] friend_persons = line.split("\t");String friend = friend_persons[0];String[] persons = friend_persons[1].split(",");Arrays.sort(persons);for (int i = 0; i < persons.length - 1; i++) {for (int j = i + 1; j < persons.length; j++) {// 发出 <人-人,好友> ,这样,相同的“人-人”对的所有好友就会到同1个reduce中去context.write(new Text(persons[i] + "-" + persons[j]), new Text(friend));}}}}static class SharedFriendsStepTwoReducer extends Reducer<Text, Text, Text, Text> {@Overrideprotected void reduce(Text person_person, Iterable<Text> friends, Context context) throws IOException, InterruptedException {StringBuffer sb = new StringBuffer();for (Text friend : friends) {sb.append(friend).append(" ");}context.write(person_person, new Text(sb.toString()));}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(SharedFriendsStepTwo.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);job.setMapperClass(SharedFriendsStepTwoMapper.class);job.setReducerClass(SharedFriendsStepTwoReducer.class);FileInputFormat.setInputPaths(job, new Path("D:/temp/out/part-r-00000"));FileOutputFormat.setOutputPath(job, new Path("D:/temp/out2"));job.waitForCompletion(true);}}

转载于:https://www.cnblogs.com/duan2/p/7538049.html

MAPREDUCE的实战案例相关推荐

  1. 【MapReduce】分区(分区实战案例)、Combiner、Shuffer

    分区(分区实战案例).Combiner.Shuffer 1 分区 2 根据部门号建立分区 3 Combiner 4 Shuffer 手动反爬虫,禁止转载: 原博地址 https://blog.csdn ...

  2. 2021年大数据Spark(四十一):SparkStreaming实战案例六 自定义输出 foreachRDD

    目录 SparkStreaming实战案例六 自定义输出-foreachRDD 需求 注意: 代码实现 SparkStreaming实战案例六 自定义输出-foreachRDD 需求 对上述案例的结果 ...

  3. Impala内存优化实战案例

    Impala内存优化实战案例 李珂 畅游DT时代 2016-03-25 文章来源:中国联通网研院网优网管部--IT技术研究团队 作者:李珂 一. 引言 Hadoop生态中的NoSQL数据分析三剑客Hi ...

  4. 【大数据AI人工智能】大数据处理实战案例汇总

    大数据处理实战案例汇总 本文总结了一系列大数据处理相关的实战案例,让你一目了然地了解大数据处理技术. 文章目录 大数据处理实战案例汇总 1. 谷歌搜索引擎 2. Facebook实时动态 3. Net ...

  5. 2021年大数据Spark(三十九):SparkStreaming实战案例四 窗口函数

    目录 SparkStreaming实战案例四 窗口函数 需求 代码实现 SparkStreaming实战案例四 窗口函数 需求 使用窗口计算: 每隔5s(滑动间隔)计算最近10s(窗口长度)的数据! ...

  6. 2021年大数据Spark(三十七):SparkStreaming实战案例二 UpdateStateByKey

    目录 SparkStreaming实战案例二 UpdateStateByKey 需求 1.updateStateByKey 2.mapWithState 代码实现 SparkStreaming实战案例 ...

  7. 2021年大数据Spark(三十六):SparkStreaming实战案例一 WordCount

    目录 SparkStreaming实战案例一 WordCount 需求 准备工作 代码实现 第一种方式:构建SparkConf对象 第二种方式:构建SparkContext对象 完整代码如下所示: 应 ...

  8. 【分布式事务】tcc-transaction分布式TCC型事务框架搭建与实战案例(基于Dubbo/Dubbox)...

    一.背景 有一定分布式开发经验的朋友都知道,产品/项目/系统最初为了能够快速迭代上线,往往不太注重产品/项目/系统的高可靠性.高性能与高扩展性,采用单体应用和单实例数据库的架构方式快速迭代开发:当产品 ...

  9. 7个实战案例、24个学习视频、12G干货资料...今天带你免费入门Python数据分析!...

    相信许多做数据的都有这样的经历: 你花了大半天整合了一张数据表,却因为其他部门的错误,导致表格结构全错了!于是你又要吭哧吭哧重新来过... 每次数据都重复洗一遍,还这么慢,要是有一劳永逸的方法就好了. ...

最新文章

  1. 怎么学python-没有任何基础的人,该如何学习Python?「附具体步骤」
  2. php网站制作商品结算怎么做,一种以让产品、信息快速同步多网站销售并结算的技术的制作方法...
  3. python祝福祖国代码_国庆节踩空间留言代码_国庆节祝福正在加载中
  4. 文本编辑软件_IDE与文本编辑器的比较
  5. python spark视频_Spark2.x+Python大数据机器学习视频课程
  6. div中直接绑定富文本值
  7. Microsoft Enterprise Library 5.0 系列(四) Logging Application Block
  8. DEDE内容页调用栏目的SEO标题、描述、关键字的方法
  9. Delphi之Exception获得错误信息
  10. python算法常用技巧与内置库
  11. 关于地统计的一些知识点
  12. 使用c++ winhttp实现post请求
  13. photoshop cs5 中 复制图层的快捷键是那个
  14. 小武实习的debug日记2
  15. JS async库:parallel, series, waterfall, whilst用法
  16. 职场上的“打工者心态”,正在悄悄毁掉你
  17. Collections常用功能、Set与Map集合
  18. 网络天才网页中文版_网络天才电脑版
  19. ASP音乐网站的设计与实现
  20. 《坤之色——楚雄》孙溟㠭中国艺术

热门文章

  1. 企业信息门户与办公自动化的集成应用
  2. Hadoop计算中的Shuffle过程
  3. Cubieboard2裸机开发之(四)定时器操作
  4. int b = 1;int c = b^0xff;求C
  5. pigeon hole
  6. TIPS FOR LIVING AT CAMBRIDGE
  7. advanced search at idiscover
  8. web of science patent search
  9. gscatter in matlab Scatter plot by group machine learning and statics tool box required
  10. TypeSprict -- 基础类型