详解MapReduce实现数据去重与倒排索引应用场景案例
Hadoop笔试题: 找出不同人的共同好友(要考虑数据去重)
例子:
张三:李四,王五,赵六
李四:张三,田七,王五
实际工作中,数据去重用的还是挺多的,包括空值的过滤等等,本文就 数据去重 与 倒排索引 详细讲解一下.
一、数据去重[模拟某运营商呼叫详单去重]
项目中统计数据集的种类个数、网站日志文件计算访问地等案例都会涉及到数据去重,重复数据删除等都是经常使用的存储数据缩减技术.通过一个简单案例来说明MapReduce怎么实现数据去重.
①原始模拟数据[c呼出,b呼入]
13711111111 c
13611111111 b
13711111111 b
13722222222 c
13611111111 c
13711111111 b
13611111111 b
13711111111 b
13722222222 b
13611111111 c
将同一条数据所有记录交给一台Reduce,最终结果输出一次即可.
Map阶段采用Hadoop默认作业输入方式后,输入的value作为输出的key.
//Mapper任务
static class DDMap extends Mapper<LongWritable,Text,Text,Text>{
private static Text line = new Text();
protected void map(LongWritable k1,Text v1,Context context){
line = v1;
Text text = new Text(“”);
try {
context.write(line,text);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
};
}
//Reducer任务
static class DDReduce extends Reducer<Text,Text,Text,Text>{
protected void reduce(Text k2,Iterable<Text> v2s,Context context){
Text text = new Text(“”);
try {
context.write(k2, text);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
};
}
}
//初始化参数
public static final String HOST_PATH=”hdfs://v:9000″;
//读取文件路径【需要手动创建】
public static final String INPUT_PATH=HOST_PATH+”/DDin”;
//输出文件路径
public static final String OUTPUT_PATH=HOST_PATH+”/DDout”;
//执行mapreduce任务驱动
public static void main (String[] args) throws Exception{
final Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI(HOST_PATH), conf);
if(fs.exists(new Path(OUTPUT_PATH))){
fs.delete(new Path(OUTPUT_PATH), true);
}
//创建job对象
final Job job = new Job(conf);
//通知job文件输入路径
FileInputFormat.setInputPaths(job, INPUT_PATH);
//通知job文件输出路径
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
//通知job将输入文件解析成键值对的方式【默认可省略】
job.setInputFormatClass(TextInputFormat.class);
//调用自定义的Mapper函数
job.setMapperClass(DDMap.class);
//设置k2,v2类型,如果<k2,v2><k3,v3>类型一致,可以省略
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//调用自定义的Reducer函数
job.setReducerClass(DDReduce.class);
//设置k3,v3类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//通知job将<k3,v3>写入HDFS中的方式[默认值,可省略]
job.setOutputFormatClass(TextOutputFormat.class);
//执行job
job.waitForCompletion(true);
}
13611111111 b
13611111111 c
13711111111 b
13711111111 c
13722222222 b
13722222222 c
二、倒排索引(Inverted Index)
倒排索引是文档检索系统最常用的数据结构,广泛用于 全文搜索引擎 .
主要用来存储某个单词或词组在某个文档或一组文档中存储位置的映射,即提供了一种根据内容来查找文档的方式.
因为不是根据文档来确定文档内容,而是进行相反操作,所以叫做倒排索引.
实际应用中, 每个文档对应一个权重值,此用来指每个文档与搜索内容的相关度.
最常用使用词频作为权重值 ,即记录单词在文档中出现的次数.
更复杂的权重或还要记录单词在多个文档中出现过,以实现TF-IDF(Term Frequency-Inverse Document Frequency)算法,或考虑单词在文档中的位置等等.
File1:MapReduce is simple
File2:MapReduce is powerful is simple
File3:Hello MapReduce bye MapReduce
关注的信息:单词、文档URL、词频.
<key,value>类似:<”MapReduce” File1.txt 1>
<key,value>对只能有两个值,根据需求需要将 File1.txt 1合并作为value .
单词作为key的好处 :利用MR框架默认的排序,将同一文档的相同单词的词频组成列表,传递给Combine过程.
URL与词频合并为value的好处 :利用MR框架默认的HashPartitioner类完成Shuffle过程,将相同单词的所有记录发送给同一个Reducer处理.
通过一个Reduce无法同时完成词频统计与生成文档列表,需要添加Combine过程完成词频统计.
Combine过程将key值相同的value值累加,获取该key(单词)在本文档中的词频数.
将相同key的value组合成倒排索引文件所需的格式即可.
单词文件不宜过大,要保证每个文件对应一个split,否则由于Reduce过程没有进一步统计词频,最终结果可能会出现词频未统计完全的单词,可通过重写InputFormat类将每个文件作为一个split.还可利用复合键值对等实现包含更多信息的倒排索引.
package sort;
import java.io.IOException;
import java.net.URI;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class Test {
// File1:MapReduce is simple
// File2:MapReduce is powerful is simple
// File3:Hello MapReduce bye MapReduce
// Hello File3.txt:1;
// MapReduce File3.txt:2;File1.txt:1;File2.txt:1;
// bye File3.txt:1;
// is File1.txt:1;File2.txt:2;
// powerful File2.txt:1;
// simple File2.txt:1;File1.txt:1;
static class IIMap extends Mapper<LongWritable, Text, Text, Text> {
private static Text key = new Text();// 单词和URL
private static Text value = new Text();// 词频
private FileSplit fileSplit;// Split对象
protected void map(LongWritable k1, Text v1, Context context) {
// 获取<k1,v1>对所属的FileSplit对象
FileSplit fileSplit = (FileSplit) context.getInputSplit();
StringTokenizer stringTokenizer = new StringTokenizer(v1.toString());
while (stringTokenizer.hasMoreTokens()) {
int indexOf = fileSplit.getPath().toString().indexOf("File");
key.set(stringTokenizer.nextToken() + ":"
+ fileSplit.getPath().toString().substring(indexOf));
value.set("1");
try {
context.write(key, value);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
};
}
// Combiner任务
static class IICombiner extends Reducer<Text, Text, Text, Text> {
private Text text = new Text();
protected void reduce(Text key, Iterable<Text> values, Context context) {
// 统计词频
int sum = 0;
for (Text value : values) {
sum += Integer.parseInt(value.toString());
}
int splitIndex = key.toString().indexOf(":");
// 重设value值 URL词频合并
text.set(key.toString().substring(splitIndex + 1) + ":" + sum);
// 重设key为单词
key.set(key.toString().substring(0, splitIndex));
try {
context.write(key, text);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
};
}
// Reducer任务
static class IIReduce extends Reducer<Text, Text, Text, Text> {
private Text v3 = new Text();
protected void reduce(Text k2, Iterable<Text> v2s, Context context) {
// 生成文档列表
String fileList = new String();
for (Text value : v2s) {
fileList += value.toString() + ";";
}
v3.set(fileList);
try {
context.write(k2, v3);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
};
}
// 初始化参数
public static final String HOST_PATH = "hdfs://v:9000";
// 读取文件路径【需要手动创建】
public static final String INPUT_PATH = HOST_PATH + "/IIin";
// 输出文件路径
public static final String OUTPUT_PATH = HOST_PATH + "/IIout";
// 执行mapreduce任务驱动
public static void main(String[] args) throws Exception {
final Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI(HOST_PATH), conf);
if (fs.exists(new Path(OUTPUT_PATH))) {
fs.delete(new Path(OUTPUT_PATH), true);
}
// 创建job对象
final Job job = new Job(conf);
// 通知job文件输入路径
FileInputFormat.setInputPaths(job, INPUT_PATH);
// 通知job文件输出路径
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
// 通知job将输入文件解析成键值对的方式【默认可省略】
job.setInputFormatClass(TextInputFormat.class);
// 调用自定义的Mapper函数
job.setMapperClass(IIMap.class);
// 设置k2,v2类型,如果<k2,v2><k3,v3>类型一致,可以省略
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setCombinerClass(IICombiner.class);
// 调用自定义的Reducer函数
job.setReducerClass(IIReduce.class);
// 设置k3,v3类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 通知job将<k3,v3>写入HDFS中的方式[默认值,可省略]
job.setOutputFormatClass(TextOutputFormat.class);
// 执行job
job.waitForCompletion(true);
}
}
Hello File3.txt:1;
MapReduce File3.txt:2;File1.txt:1;File2.txt:1;
bye File3.txt:1;
is File1.txt:1;File2.txt:2;
powerful File2.txt:1;
simple File2.txt:1;File1.txt:1;
详解MapReduce实现数据去重与倒排索引应用场景案例相关推荐
- MapReduce的数据去重功能
实验材料及说明 现有某电商网站用户对商品的收藏数据,记录了用户收藏的商品id以及收藏日期,文件名为buyer_favorite.buyer_favorite包含:买家id,商品id,收藏日期这三个字段 ...
- 稳扎稳打Silverlight(17) - 2.0数据之详解DataGrid, 绑定数据到ListBox
[索引页] [源码下载] 稳扎稳打Silverlight(17) - 2.0数据之详解DataGrid, 详解ListBox 作者:webabcd 介绍 Silverlight 2.0 详解DataG ...
- 大数据架构详解_【数据如何驱动增长】(3)大数据背景下的数仓建设 amp; 数据分层架构设计...
背景 了解数据仓库.数据流架构的搭建原理对于合格的数据分析师或者数据科学家来说是一项必不可少的能力.它不仅能够帮助分析人员更高效的开展分析任务,帮助公司或者业务线搭建一套高效的数据处理架构,更是能够从 ...
- python归一化处理_详解python实现数据归一化处理的方式:(0,1)标准化
在机器学习过程中,对数据的处理过程中,常常需要对数据进行归一化处理,下面介绍(0, 1)标准化的方式,简单的说,其功能就是将预处理的数据的数值范围按一定关系"压缩"到(0,1)的范 ...
- [转]Hadoop集群_WordCount运行详解--MapReduce编程模型
Hadoop集群_WordCount运行详解--MapReduce编程模型 下面这篇文章写得非常好,有利于初学mapreduce的入门 http://www.nosqldb.cn/1369099810 ...
- mysql c接口返回自增id_详解mysql插入数据后返回自增ID的七种方法
引言 mysql 和 oracle 插入的时候有一个很大的区别是: oracle 支持序列做 id: mysql 本身有一个列可以做自增长字段. mysql 在插入一条数据后,如何能获得到这个自增 i ...
- mysql 新增返回主键自增id_详解mysql插入数据后返回自增ID的七种方法
引言 mysql 和 oracle 插入的时候有一个很大的区别是: oracle 支持序列做 id: mysql 本身有一个列可以做自增长字段. mysql 在插入一条数据后,如何能获得到这个自增 i ...
- mysql影响行数解析_详解MySQL的数据行和行溢出机制
一.行 有哪些格式? 你可以像下面这样看一下你的mysql行格式设置. 其实mysql的数据行有两种格式,一种就是图中的 compact格式,还有一种是redundant格式. compact是一种紧 ...
- 5G QoS控制原理专题详解-SM策略数据的源头
相关文章会在公众号同步更新.公众号:5G通信大家学 持续更新的相关5G内容都是直接根据3GPP整理,保证更新内容的准确性,避免通过二手,甚至多手的资料,以讹传讹误导各位同学.如果大家阅读时发现问题,随 ...
最新文章
- 机器人技术推动工业领域的数字革命
- 中国伺服电机行业运营现状及前景趋势展望报告2022-2028年版
- 【libsvm 错误使用mex】
- mysql master 配置_MySQL双Master配置的方法详解
- C++中cstring和int互换
- ElasticSearch实践(三)Rest API简介
- 【题解】luoguP2680运输计划
- 版本控制工具——subversion
- Linux 教程: (Linux基础+命令大全)
- Wing IDE Pro 6 for Mac(Python开发工具)安装破解图文教程
- 智慧校园生态圈方案介绍
- 树莓派改造无线打印机
- Kaggle天池比赛经验
- 第十七届智能视觉组线上赛比赛流程及相关补充说明
- html文件在Chrome打开中文乱码
- 咸鱼APP产品使用报告体验分析
- 20 October in ss
- html怎么加深字体颜色,我打印网页的字的颜色非常浅,怎样才能加深? – 手机爱问...
- 内网环境能连接数据库 使用vpn用工具能连接数据库但是java驱动连接不了
- php读取excel文件数据
热门文章
- 基于Android的点餐系统设计与实现
- 计算机专业、物联网工程大一寒假规划必备篇
- 支持linux的usb无线网卡芯片,Realtek RTL8188CUSamp;RTL8188ETV 型芯片USB无线网卡驱动程序for Linuxamp;Android...
- 屹立千年,只为你一个回眸
- 电源管理芯片:LED驱动电源芯片的计划及面积
- 计算机机器人游戏教学计划,机器人教学计划.docx
- 《从两月失败职场经历看内部创业四大弊病》有感
- 财路网每日原创推送: 创世区块10年:记住这群加密狂魔
- 《RabbitMQ实战指南》读书笔记
- 什么是IP地址 IP地址的工作原理