MAPREDUCE的实战案例
reduce端join算法实现
1、需求:
订单数据表t_order:
id |
date |
pid |
amount |
1001 |
20150710 |
P0001 |
2 |
1002 |
20150710 |
P0001 |
3 |
1002 |
20150710 |
P0002 |
3 |
商品信息表t_product
id |
pname |
category_id |
price |
P0001 |
小米5 |
1000 |
2 |
P0002 |
锤子T1 |
1000 |
3 |
假如数据量巨大,两表的数据是以文件的形式存储在HDFS中,需要用mapreduce程序来实现一下SQL查询运算:
select a.id,a.date,b.name,b.category_id,b.price from t_order a join t_product b on a.pid = b.id
2、实现机制:
通过将关联的条件作为map输出的key,将两表满足join条件的数据并携带数据所来源的文件信息,发往同一个reduce task,在reduce中进行数据的串联
public class OrderJoin {static class OrderJoinMapper extends Mapper<LongWritable, Text, Text, OrderJoinBean> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 拿到一行数据,并且要分辨出这行数据所属的文件String line = value.toString();String[] fields = line.split("\t");// 拿到itemidString itemid = fields[0];// 获取到这一行所在的文件名(通过inpusplit)String name = "你拿到的文件名";// 根据文件名,切分出各字段(如果是a,切分出两个字段,如果是b,切分出3个字段) OrderJoinBean bean = new OrderJoinBean();bean.set(null, null, null, null, null);context.write(new Text(itemid), bean);}}static class OrderJoinReducer extends Reducer<Text, OrderJoinBean, OrderJoinBean, NullWritable> {@Overrideprotected void reduce(Text key, Iterable<OrderJoinBean> beans, Context context) throws IOException, InterruptedException {//拿到的key是某一个itemid,比如1000//拿到的beans是来自于两类文件的bean// {1000,amount} {1000,amount} {1000,amount} --- {1000,price,name}//将来自于b文件的bean里面的字段,跟来自于a的所有bean进行字段拼接并输出 }} }
缺点:这种方式中,join的操作是在reduce阶段完成,reduce端的处理压力太大,map节点的运算负载则很低,资源利用率不高,且在reduce阶段极易产生数据倾斜
解决方案: map端join实现方式
1、原理阐述
适用于关联表中有小表的情形;
可以将小表分发到所有的map节点,这样,map节点就可以在本地对自己所读到的大表数据进行join并输出最终结果,可以大大提高join操作的并发度,加快处理速度
2、实现示例
--先在mapper类中预先定义好小表,进行join
--引入实际场景中的解决方案:一次加载数据库或者用distributedcache
public class TestDistributedCache {static class TestDistributedCacheMapper extends Mapper<LongWritable, Text, Text, Text>{FileReader in = null;BufferedReader reader = null;HashMap<String,String> b_tab = new HashMap<String, String>();String localpath =null;String uirpath = null;//是在map任务初始化的时候调用一次 @Overrideprotected void setup(Context context) throws IOException, InterruptedException {//通过这几句代码可以获取到cache file的本地绝对路径,测试验证用Path[] files = context.getLocalCacheFiles();localpath = files[0].toString();URI[] cacheFiles = context.getCacheFiles();//缓存文件的用法——直接用本地IO来读取//这里读的数据是map task所在机器本地工作目录中的一个小文件in = new FileReader("b.txt");reader =new BufferedReader(in);String line =null;while(null!=(line=reader.readLine())){String[] fields = line.split(",");b_tab.put(fields[0],fields[1]);}IOUtils.closeStream(reader);IOUtils.closeStream(in);}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//这里读的是这个map task所负责的那一个切片数据(在hdfs上)String[] fields = value.toString().split("\t");String a_itemid = fields[0];String a_amount = fields[1];String b_name = b_tab.get(a_itemid);// 输出结果 1001 98.9 banancontext.write(new Text(a_itemid), new Text(a_amount + "\t" + ":" + localpath + "\t" +b_name ));}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(TestDistributedCache.class);job.setMapperClass(TestDistributedCacheMapper.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);//这里是我们正常的需要处理的数据所在路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));//不需要reducerjob.setNumReduceTasks(0);//分发一个文件到task进程的工作目录job.addCacheFile(new URI("hdfs://hadoop-server01:9000/cachefile/b.txt"));//分发一个归档文件到task进程的工作目录 // job.addArchiveToClassPath(archive);//分发jar包到task节点的classpath下 // job.addFileToClassPath(jarfile); job.waitForCompletion(true);} }
web日志预处理
1、需求:
对web访问日志中的各字段识别切分
去除日志中不合法的记录
根据KPI统计需求,生成各类访问请求过滤数据
2、实现代码:
a) 定义一个bean,用来记录日志数据中的各数据字段
public class WebLogBean {private String remote_addr;// 记录客户端的ip地址private String remote_user;// 记录客户端用户名称,忽略属性"-"private String time_local;// 记录访问时间与时区private String request;// 记录请求的url与http协议private String status;// 记录请求状态;成功是200private String body_bytes_sent;// 记录发送给客户端文件主体内容大小private String http_referer;// 用来记录从那个页面链接访问过来的private String http_user_agent;// 记录客户浏览器的相关信息private boolean valid = true;// 判断数据是否合法public String getRemote_addr() {return remote_addr;}public void setRemote_addr(String remote_addr) {this.remote_addr = remote_addr;}public String getRemote_user() {return remote_user;}public void setRemote_user(String remote_user) {this.remote_user = remote_user;}public String getTime_local() {return time_local;}public void setTime_local(String time_local) {this.time_local = time_local;}public String getRequest() {return request;}public void setRequest(String request) {this.request = request;}public String getStatus() {return status;}public void setStatus(String status) {this.status = status;}public String getBody_bytes_sent() {return body_bytes_sent;}public void setBody_bytes_sent(String body_bytes_sent) {this.body_bytes_sent = body_bytes_sent;}public String getHttp_referer() {return http_referer;}public void setHttp_referer(String http_referer) {this.http_referer = http_referer;}public String getHttp_user_agent() {return http_user_agent;}public void setHttp_user_agent(String http_user_agent) {this.http_user_agent = http_user_agent;}public boolean isValid() {return valid;}public void setValid(boolean valid) {this.valid = valid;}@Overridepublic String toString() {StringBuilder sb = new StringBuilder();sb.append(this.valid);sb.append("\001").append(this.remote_addr);sb.append("\001").append(this.remote_user);sb.append("\001").append(this.time_local);sb.append("\001").append(this.request);sb.append("\001").append(this.status);sb.append("\001").append(this.body_bytes_sent);sb.append("\001").append(this.http_referer);sb.append("\001").append(this.http_user_agent);return sb.toString(); } }
b)定义一个parser用来解析过滤web访问日志原始记录
public class WebLogParser {public static WebLogBean parser(String line) {WebLogBean webLogBean = new WebLogBean();String[] arr = line.split(" ");if (arr.length > 11) {webLogBean.setRemote_addr(arr[0]);webLogBean.setRemote_user(arr[1]);webLogBean.setTime_local(arr[3].substring(1));webLogBean.setRequest(arr[6]);webLogBean.setStatus(arr[8]);webLogBean.setBody_bytes_sent(arr[9]);webLogBean.setHttp_referer(arr[10]);if (arr.length > 12) {webLogBean.setHttp_user_agent(arr[11] + " " + arr[12]);} else {webLogBean.setHttp_user_agent(arr[11]);}if (Integer.parseInt(webLogBean.getStatus()) >= 400) {// 大于400,HTTP错误webLogBean.setValid(false);}} else {webLogBean.setValid(false);}return webLogBean;}public static String parserTime(String time) {time.replace("/", "-");return time;} }
c) mapreduce程序
public class WeblogPreProcess {static class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {Text k = new Text();NullWritable v = NullWritable.get();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();WebLogBean webLogBean = WebLogParser.parser(line);if (!webLogBean.isValid())return;k.set(webLogBean.toString());context.write(k, v);}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(WeblogPreProcess.class);job.setMapperClass(WeblogPreProcessMapper.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));job.waitForCompletion(true);} }
流量统计相关需求
1、对流量日志中的用户统计总上、下行流量技术点: 自定义javaBean用来在mapreduce中充当value
注意: javaBean要实现Writable接口,实现两个方法
//序列化,将对象的字段信息写入输出流 @Overridepublic void write(DataOutput out) throws IOException {out.writeLong(upflow);out.writeLong(downflow);out.writeLong(sumflow);}//反序列化,从输入流中读取各个字段信息 @Overridepublic void readFields(DataInput in) throws IOException {upflow = in.readLong();downflow = in.readLong();sumflow = in.readLong();}
1、统计流量且按照流量大小倒序排序
技术点:这种需求,用一个mapreduce -job 不好实现,需要两个mapreduce -job
第一个job负责流量统计,跟上题相同
第二个job读入第一个job的输出,然后做排序
要将flowBean作为map的key输出,这样mapreduce就会自动排序 此时,flowBean要实现接口WritableComparable 要实现其中的compareTo()方法,方法中,我们可以定义倒序比较的逻辑
1、统计流量且按照手机号的归属地,将结果数据输出到不同的省份文件中技术点:自定义Partitioner
@Overridepublic int getPartition(Text key, FlowBean value, int numPartitions) {String prefix = key.toString().substring(0,3);Integer partNum = pmap.get(prefix);return (partNum==null?4:partNum);}
自定义partition后,要根据自定义partitioner的逻辑设置相应数量的reduce task
job.setNumReduceTasks(5); |
注意:如果reduceTask的数量>= getPartition的结果数 ,则会多产生几个空的输出文件part-r-000xx
如果 1<reduceTask的数量<getPartition的结果数 ,则有一部分分区数据无处安放,会Exception!!!
如果 reduceTask的数量=1,则不管mapTask端输出多少个分区文件,最终结果都交给这一个reduceTask,最终也就只会产生一个结果文件 part-r-00000
社交粉丝数据分析
以下是qq的好友列表数据,冒号前是一个用,冒号后是该用户的所有好友(数据中的好友关系是单向的)
A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J
求出哪些人两两之间有共同好友,及他俩的共同好友都有谁?
解题思路:
求出哪些人两两之间有共同好友,及他俩的共同好友都有谁?
解题思路:
第一步 map 读一行 A:B,C,D,F,E,O 输出 <B,A><C,A><D,A><F,A><E,A><O,A> 在读一行 B:A,C,E,K 输出 <A,B><C,B><E,B><K,B> REDUCE 拿到的数据比如<C,A><C,B><C,E><C,F><C,G>...... 输出: <A-B,C> <A-E,C> <A-F,C> <A-G,C> <B-E,C> <B-F,C>..... 第二步 map 读入一行<A-B,C> 直接输出<A-B,C> reduce 读入数据 <A-B,C><A-B,F><A-B,G>....... 输出: A-B C,F,G,..... |
package cn.itcast.bigdata.mr.fensi;import java.io.IOException;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class SharedFriendsStepOne {static class SharedFriendsStepOneMapper extends Mapper<LongWritable, Text, Text, Text> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// A:B,C,D,F,E,OString line = value.toString();String[] person_friends = line.split(":");String person = person_friends[0];String friends = person_friends[1];for (String friend : friends.split(",")) {// 输出<好友,人>context.write(new Text(friend), new Text(person));}}}static class SharedFriendsStepOneReducer extends Reducer<Text, Text, Text, Text> {@Overrideprotected void reduce(Text friend, Iterable<Text> persons, Context context) throws IOException, InterruptedException {StringBuffer sb = new StringBuffer();for (Text person : persons) {sb.append(person).append(",");}context.write(friend, new Text(sb.toString()));}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(SharedFriendsStepOne.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);job.setMapperClass(SharedFriendsStepOneMapper.class);job.setReducerClass(SharedFriendsStepOneReducer.class);FileInputFormat.setInputPaths(job, new Path("D:/srcdata/friends"));FileOutputFormat.setOutputPath(job, new Path("D:/temp/out"));job.waitForCompletion(true);}}
package cn.itcast.bigdata.mr.fensi;import java.io.IOException; import java.util.Arrays;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class SharedFriendsStepTwo {static class SharedFriendsStepTwoMapper extends Mapper<LongWritable, Text, Text, Text> {// 拿到的数据是上一个步骤的输出结果// A I,K,C,B,G,F,H,O,D,// 友 人,人,人 @Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String[] friend_persons = line.split("\t");String friend = friend_persons[0];String[] persons = friend_persons[1].split(",");Arrays.sort(persons);for (int i = 0; i < persons.length - 1; i++) {for (int j = i + 1; j < persons.length; j++) {// 发出 <人-人,好友> ,这样,相同的“人-人”对的所有好友就会到同1个reduce中去context.write(new Text(persons[i] + "-" + persons[j]), new Text(friend));}}}}static class SharedFriendsStepTwoReducer extends Reducer<Text, Text, Text, Text> {@Overrideprotected void reduce(Text person_person, Iterable<Text> friends, Context context) throws IOException, InterruptedException {StringBuffer sb = new StringBuffer();for (Text friend : friends) {sb.append(friend).append(" ");}context.write(person_person, new Text(sb.toString()));}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(SharedFriendsStepTwo.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);job.setMapperClass(SharedFriendsStepTwoMapper.class);job.setReducerClass(SharedFriendsStepTwoReducer.class);FileInputFormat.setInputPaths(job, new Path("D:/temp/out/part-r-00000"));FileOutputFormat.setOutputPath(job, new Path("D:/temp/out2"));job.waitForCompletion(true);}}
转载于:https://www.cnblogs.com/duan2/p/7538049.html
MAPREDUCE的实战案例相关推荐
- 【MapReduce】分区(分区实战案例)、Combiner、Shuffer
分区(分区实战案例).Combiner.Shuffer 1 分区 2 根据部门号建立分区 3 Combiner 4 Shuffer 手动反爬虫,禁止转载: 原博地址 https://blog.csdn ...
- 2021年大数据Spark(四十一):SparkStreaming实战案例六 自定义输出 foreachRDD
目录 SparkStreaming实战案例六 自定义输出-foreachRDD 需求 注意: 代码实现 SparkStreaming实战案例六 自定义输出-foreachRDD 需求 对上述案例的结果 ...
- Impala内存优化实战案例
Impala内存优化实战案例 李珂 畅游DT时代 2016-03-25 文章来源:中国联通网研院网优网管部--IT技术研究团队 作者:李珂 一. 引言 Hadoop生态中的NoSQL数据分析三剑客Hi ...
- 【大数据AI人工智能】大数据处理实战案例汇总
大数据处理实战案例汇总 本文总结了一系列大数据处理相关的实战案例,让你一目了然地了解大数据处理技术. 文章目录 大数据处理实战案例汇总 1. 谷歌搜索引擎 2. Facebook实时动态 3. Net ...
- 2021年大数据Spark(三十九):SparkStreaming实战案例四 窗口函数
目录 SparkStreaming实战案例四 窗口函数 需求 代码实现 SparkStreaming实战案例四 窗口函数 需求 使用窗口计算: 每隔5s(滑动间隔)计算最近10s(窗口长度)的数据! ...
- 2021年大数据Spark(三十七):SparkStreaming实战案例二 UpdateStateByKey
目录 SparkStreaming实战案例二 UpdateStateByKey 需求 1.updateStateByKey 2.mapWithState 代码实现 SparkStreaming实战案例 ...
- 2021年大数据Spark(三十六):SparkStreaming实战案例一 WordCount
目录 SparkStreaming实战案例一 WordCount 需求 准备工作 代码实现 第一种方式:构建SparkConf对象 第二种方式:构建SparkContext对象 完整代码如下所示: 应 ...
- 【分布式事务】tcc-transaction分布式TCC型事务框架搭建与实战案例(基于Dubbo/Dubbox)...
一.背景 有一定分布式开发经验的朋友都知道,产品/项目/系统最初为了能够快速迭代上线,往往不太注重产品/项目/系统的高可靠性.高性能与高扩展性,采用单体应用和单实例数据库的架构方式快速迭代开发:当产品 ...
- 7个实战案例、24个学习视频、12G干货资料...今天带你免费入门Python数据分析!...
相信许多做数据的都有这样的经历: 你花了大半天整合了一张数据表,却因为其他部门的错误,导致表格结构全错了!于是你又要吭哧吭哧重新来过... 每次数据都重复洗一遍,还这么慢,要是有一劳永逸的方法就好了. ...
最新文章
- 怎么学python-没有任何基础的人,该如何学习Python?「附具体步骤」
- php网站制作商品结算怎么做,一种以让产品、信息快速同步多网站销售并结算的技术的制作方法...
- python祝福祖国代码_国庆节踩空间留言代码_国庆节祝福正在加载中
- 文本编辑软件_IDE与文本编辑器的比较
- python spark视频_Spark2.x+Python大数据机器学习视频课程
- div中直接绑定富文本值
- Microsoft Enterprise Library 5.0 系列(四) Logging Application Block
- DEDE内容页调用栏目的SEO标题、描述、关键字的方法
- Delphi之Exception获得错误信息
- python算法常用技巧与内置库
- 关于地统计的一些知识点
- 使用c++ winhttp实现post请求
- photoshop cs5 中 复制图层的快捷键是那个
- 小武实习的debug日记2
- JS async库:parallel, series, waterfall, whilst用法
- 职场上的“打工者心态”,正在悄悄毁掉你
- Collections常用功能、Set与Map集合
- 网络天才网页中文版_网络天才电脑版
- ASP音乐网站的设计与实现
- 《坤之色——楚雄》孙溟㠭中国艺术
热门文章
- 企业信息门户与办公自动化的集成应用
- Hadoop计算中的Shuffle过程
- Cubieboard2裸机开发之(四)定时器操作
- int b = 1;int c = b^0xff;求C
- pigeon hole
- TIPS FOR LIVING AT CAMBRIDGE
- advanced search at idiscover
- web of science patent search
- gscatter in matlab Scatter plot by group machine learning and statics tool box required
- TypeSprict -- 基础类型