概述

本文要说的 TF-IDF 分布式实现,运用了很多之前 MapReduce 的核心知识点。算是 MapReduce 的一个小应用吧。


版权说明

著作权归作者所有。
商业转载请联系作者获得授权,非商业转载请注明出处。
本文作者:Q-WHai
发表日期: 2016年6月24日
本文链接:https://qwhai.blog.csdn.net/article/details/51747801
来源:CSDN
更多内容:分类 >> 大数据之 Hadoop


学前导读

本文并不打算再啰里啰嗦地讲解一大堆 TF-IDF 的概念,以及 TF-IDF 能够做什么。如果你对此还不够了解,可以转到我的另一篇博客《 数据挖掘:基于TF-IDF算法的数据集选取优化 》进行学习。
由于本人的语言表达可能并不十分简单明了,如果你阅读本文的时候遇到一些难以理解的地方,可以点击下面相关的链接进行学习。这些都是本文的基础和前提,当然也可以提交评论与我进行交流。

  • 《 数据挖掘:基于TF-IDF算法的数据集选取优化 》
  • 《 从 WordCount 到 MapReduce 计算模型 》
  • 《 MapReduce 进阶:多 MapReduc e的链式模式 》
  • 《 MapReduce 进阶:多路径输入输出 》
  • 《 MapReduce 进阶:Partitioner 组件 》

算法框架

首先我们来看一下,分布式的 TF-IDF 的算法框架图:

在图中,我们有三个大模块,这三个大模块正是 MapReduce 中的三个 Job。
在学习 TF-IDF 的时候我们就知道了,TF-IDF 的计算可以分成三个部分进行。第一个阶段:计算各个文档中每个单词的 TF 值;第二阶段:计算所有文档中所有单词的 IDF 值;第三个阶段:计算各个文档中各个单词的 TF-IDF 值。在单机的环境下,很容易实现这些计算。可是,分布式环境下要怎么做呢?于是,根据这三个阶段,我设计了上面的架构图。
TFMapReduceCore 类包含的是计算 TF 的核心类,IDFMapReduceCore 中则包含了 IDF 的核心类,IntegrateCore 中包含的是将 TF、IDF 的结果进行整合,从而计算最终的 TF-IDF 结果。且这里还产生了两个中间输出目录,而这两个中间输出目录也正是第三个阶段的输入目录,这一步中,需要用到 MapReduce 的多路径输入。上面也有专门的文章描述了这一块。


代码实现

TFMapReduceCore

这里我将与计算 TF 相关的代码封装在同一个 TFMapReduceCore 类中,其中的 TFMapper, TFReducer 等都是 TFMapReduceCore 类的一个子类。

TFMapper

public static class TFMapper extends Mapper<Object, Text, Text, Text> {private final Text one = new Text("1");private Text label = new Text();private int allWordCount = 0;private String fileName = "";@Overrideprotected void setup(Mapper<Object, Text, Text, Text>.Context context)throws IOException, InterruptedException {fileName = getInputSplitFileName(context.getInputSplit());}@Overrideprotected void map(Object key, Text value, Mapper<Object, Text, Text, Text>.Context context)throws IOException, InterruptedException {StringTokenizer tokenizer = new StringTokenizer(value.toString());while (tokenizer.hasMoreTokens()) {allWordCount++;label.set(String.join(":", tokenizer.nextToken(), fileName));context.write(label, one);}}@Overrideprotected void cleanup(Mapper<Object, Text, Text, Text>.Context context)throws IOException, InterruptedException {context.write(new Text("!:" + fileName), new Text(String.valueOf(allWordCount)));}private String getInputSplitFileName(InputSplit inputSplit) {String fileFullName = ((FileSplit)inputSplit).getPath().toString();String[] nameSegments = fileFullName.split("/");return nameSegments[nameSegments.length - 1];}
}

因为我们输入的源文件是用一个文件表示一个分类,如果你是以其他规则划分,那么可以不必遵从本文的逻辑。上面我首先在 setup() 里获取文件名,这样做的目的在于不用在 map() 中重复获取,从而提升程序的效率。并且在 cleanup() 里把文件名(也就是分类)信息写入到 Mapper 的输出路径中。
大家可能注意到了这里我写入文件名的时候,使用了一个技巧,使用“!”充当了一个单词。因为这个字符的 ASCII 码比所有的字符的 ASCII 码都要小,这样做的目的是可以让这条记录在其他所有记录之前被访问( 这里所指的其他所有记录指的是,同一个分类中的所有记录。因为这里我们有对 Mapper 的输出做 Partitioner 分区 )。

TFCombiner & TFReducer

从上面的 Mapper 中可以看到 Mapper 输出的 key 的格式为: : 。如此,只要去解析 key 中的 keyword 就可以了。而在 Mapper 的 cleanup() 方法中还写入文件的信息。这样一来,我们就可以使用这个 “!: allWordCount” 对每个文件进行区分开来。区分的原理之前也说到过了,就是因为 “!” 的 ASCII 码最小的原因。

public static class TFCombiner extends Reducer<Text, Text, Text, Text> {private int allWordCount = 0;@Overrideprotected void reduce(Text key, Iterable<Text> values,Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {if (values == null) {return;}if(key.toString().startsWith("!")) {allWordCount = Integer.parseInt(values.iterator().next().toString());return;}int sumCount = 0;for (Text value : values) {sumCount += Integer.parseInt(value.toString());}double tf = 1.0 * sumCount / allWordCount;context.write(key, new Text(String.valueOf(tf)));}
}

通过上面的 Combiner 的 reduce 操作之后,所有单词的 TF 值都已经计算完成。再通过一次 Reducer 操作就 ok 了。Reducer 的代码如下:

public static class TFReducer extends Reducer<Text, Text, Text, Text> {@Overrideprotected void reduce(Text key, Iterable<Text> values,Reducer<Text, Text, Text, Text>.Context context)throws IOException, InterruptedException {if (values == null) {return;}for (Text value : values) {context.write(key, value);}}
}

TFPartitioner

在 Partitioner 分区这一块,就简单地以自定义的 Hash Partitioner 作为分区类。如果你有更加严格的要求,可以参考我之前的博客《MapReduce 进阶:Partitioner 组件》。

public static class TFPartitioner extends Partitioner<Text, Text> {@Overridepublic int getPartition(Text key, Text value, int numPartitions) {String fileName = key.toString().split(":")[1];return Math.abs((fileName.hashCode() * 127) % numPartitions);}
}

IDFMapReduceCore

这里我将与计算 IDF 相关的代码封装在同一个 IDFMapReduceCore 类中,其中的 IDFMapper, IDFReducer 都是 IDFMapReduceCore 类的一个子类。

IDFMapper

因为 IDF 的计算是针对所有文档的,所以在 IDFMapper 中可以直接按照计算 WordCount 的逻辑来编写就 ok 了。因为在计算 IDF 时,我们不需要关心某一个单词的词频,所以这里统一的使用 1 填充 mapper 的输出 value.

public static class IDFMapper extends Mapper<Object, Text, Text, Text> {private final Text one = new Text("1");private Text label = new Text();@Overrideprotected void map(Object key, Text value, Mapper<Object, Text, Text, Text>.Context context)throws IOException, InterruptedException {StringTokenizer tokenizer = new StringTokenizer(value.toString());label.set(tokenizer.nextToken().split(":")[0]);context.write(label, one);}
}

IDFReducer

在前面我们已经统计了某一个单词在某一个文档(分类)出现的标志,也就是单词 W 在文档 D 中出现过了一次。这样一来,我们就可以统计出单词 W 在全部文档中出现过多少次了。而这一思想,正是计算 WordCount 逻辑。所以代码很好编写。等等,我们还需要计算所有的文档数。是的,在计算 IDF 的公式中,我们需要知道一共有多少个文档。可是,在当前的情况下我们无法获得这个值,因为这是在 Reducer 中。虽然在 Reducer 里面无法计算文档总数,但是在 Reducer 外面却可以。这个过程就是纯粹的 Java 逻辑,很简单,不多说了。
当我们知道了训练文档总数,就可以通过 job 将信息传递给 Reducer。只是这里我们并不是调用 job.setNumReduceTasks(N),而是调用了 job.setProfileParams(msg) 方法。

public static class IDFReducer extends Reducer<Text, Text, Text, Text> {private Text label = new Text();@Overrideprotected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)throws IOException, InterruptedException {if (values == null) {return;}int fileCount = 0;for (Text value : values) {fileCount += Integer.parseInt(value.toString());}label.set(String.join(":", key.toString(), "!"));int totalFileCount = Integer.parseInt(context.getProfileParams()) - 1;double idfValue = Math.log10(1.0 * totalFileCount / (fileCount + 1));context.write(label, new Text(String.valueOf(idfValue)));}
}

IntegrateCore

这里我将与计算 TF-IDF 相关的代码封装在同一个 IntegrateCore 类中,其中的 IntegrateMapper, IntegrateReducer 都是 IntegrateCore 类的一个子类。在计算的最后一步中,没有什么需要说明的地方。只是,前面计算 TF、IDF 产生的中间输出文件的格式并不统一,所以这里需要对不同格式的文件内容进行不同的考虑。
IntegrateMapper

public static class IntegrateMapper extends Mapper<Object, Text, Text, Text> {@Overrideprotected void map(Object key, Text value, Mapper<Object, Text, Text, Text>.Context context)throws IOException, InterruptedException {StringTokenizer tokenizer = new StringTokenizer(value.toString());context.write(new Text(tokenizer.nextToken()), new Text(tokenizer.nextToken()));}
}

IntegrateReducer

public static class IntegrateReducer extends Reducer<Text, Text, Text, Text> {private double keywordIDF = 0.0d;private Text value = new Text();@Overrideprotected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)throws IOException, InterruptedException {if (values == null) {return;}if (key.toString().split(":")[1].startsWith("!")) {keywordIDF = Double.parseDouble(values.iterator().next().toString());return;}value.set(String.valueOf(Double.parseDouble(values.iterator().next().toString()) * keywordIDF));context.write(key, value);}
}

测试运行

数据源

android

android
java
activity
map

hadoop

map
reduce
ssh
mapreduce

ios

ios
iphone
jobs

java

java
code
eclipse
java
map

python

python
pycharm

执行命令

执行此命令之前,请先将测试数据上传到 HDFS 的 /input 目录下。

$ hadoop jar temp/run.jar /input /output

执行结果

activity:android 0.0994850021680094
android:android 0.0994850021680094
code:java   0.07958800173440753
eclipse:java    0.07958800173440753
ios:ios 0.13264666955734586
iphone:ios  0.13264666955734586
java:android    0.0554621874040891
java:java   0.08873949984654256
jobs:ios    0.13264666955734586
map:android 0.024227503252014105
map:hadoop  0.024227503252014105
map:java    0.019382002601611284
mapreduce:hadoop    0.0994850021680094
pycharm:python  0.1989700043360188
python:python   0.1989700043360188
reduce:hadoop   0.0994850021680094
ssh:hadoop  0.0994850021680094

看到这个结果你可能会认为这个结果不一定可靠。如果你怀疑这些结果,你可以自己编写一个单机版的 Java 程序进行验证。当然,我已经验证过了。

Job

此处是浏览器登录 Cluster Metrics 的信息展示。显示的是程序在执行完成之后的内容,看到有三个 Job 参与了 TF-IDF 的计算。


GitHub download

  • https://github.com/Hadoop-league/TF-IDF_MR

征集

如果你也需要使用ProcessOn这款在线绘图工具,可以使用如下邀请链接进行注册:
https://www.processon.com/i/56205c2ee4b0f6ed10838a6d

MapReduce 应用:TF-IDF 分布式实现相关推荐

  1. 搜索引擎:文本分类——TF/IDF算法

    原理 TFIDF的主要思想是:如果某个词或短语在一篇文章中出现的频率TF高,并且在其他文章中很少出现,则认为此词或者短语具有很好的类别区分能力,适合用来分类.TFIDF实际上是:TF * IDF,TF ...

  2. MapReduce基础开发之五分布式下载ftp文件到本地再迁移到hdfs

    为利用Hadoop集群平台的分布存储和计算能力,基于MapReduce将ftp文件分布式下载并上传到HDFS中. 1.文件移动流程:ftp服务器->datanode本地目录->HDFS目录 ...

  3. 关键词提取算法—TF/IDF算法

    关键词提取算法一般可分为有监督学习和无监督学习两类. 有监督的关键词提取方法可以通过分类的方式进行,通过构建一个较为完善的词表,然后判断每个文档与词表中的每个词的匹配程度,以类似打标签的方式,达到关键 ...

  4. tf/idf_Neo4j:带密码的TF / IDF(和变体)

    tf/idf 几周前,我写了一篇博客文章,介绍了如何使用scikit-learn在HIMYM成绩单上运行TF / IDF,以按情节找到最重要的短语,然后我很好奇在Neo4j中很难做到. 我首先将Wik ...

  5. Neo4j:带密码的TF / IDF(和变体)

    几周前,我写了一篇博客文章,介绍了如何使用scikit-learn在HIMYM成绩单上运行TF / IDF,以按情节找到最重要的短语,然后我很好奇在Neo4j中很难做到. 我首先将Wikipedia的 ...

  6. scitkit-learn:计算机科学论文的TF / IDF和余弦相似度

    几个月前,我下载了数千篇计算机科学论文的元数据,这样我就可以尝试编写一个迷你推荐引擎来告诉我接下来应该读什么论文. 由于我没有任何人可以阅读每篇论文的数据,因此排除了协作过滤方法,所以我认为我可以尝试 ...

  7. 随笔_从《芳华》影评到TF/IDF算法

     前两天看好多<芳华>的影评说:为什么好人没好报?于是感叹一堆世态炎凉,人性丑陋什么的.我觉得这问题是:为什么中央空调(对谁都温暖)往往不被看好.  先说说TF/IDF算法,这是一种信息处 ...

  8. 文本特征抽取的向量空间模型(VSM)和TF/IDF方法

    文本特征抽取 两组小说,一组是爱情的,另一组是科幻的.我们能否用支持向量机训练一个模型,用来识别小说类型呢? 这个并不容易.因为支持向量机这类机器学习算法只能接受数学里面的向量作为输入.如果用它来做文 ...

  9. 使用lingpipe自然语言处理包进行文本分类/** * 使用 lingpipe的tf/idf分类器训练语料 * * @author laigood */ public class trai

    /**  * 使用 lingpipe的tf/idf分类器训练语料  *   * @author laigood  */ public class traintclassifier { //训练语料文件 ...

  10. tf-idf词向量和bow_使用词袋Bow和TF IDF进行多标签分类

    tf-idf词向量和bow 1.加载数据 (1. Load the data) For this study, we are using Kaggle data for Toxic Comment C ...

最新文章

  1. 同时使用多网卡提升树莓派网络利用率
  2. IDEA Properties中文unicode转码问题
  3. Oracle EBS-SQL (PO-13):检查采购物料无一揽子协议价格.sql
  4. FMS3系列(二):创建可交互的FMS连接--I can say:Hello World
  5. css字体自定义,bootstrap自定义字体
  6. bigdecimal 加法_巧用加法的运算律,简化有理数的加法运算
  7. (25)FPGA乘法器设计(第5天)
  8. 区块链如何击败 AI、云计算成为最受欢迎技能?
  9. 【Struts】ActionForm
  10. 【学习笔记】广义逆矩阵及共轭转置求解
  11. 【分享】系统启动项大全
  12. 驱动仿真物品移动乌龟\Gazebo\cmd_vel和/gazebo/set_model_state
  13. 前端性能和加载体验优化实践(附:PWA、离线包、内存优化、预渲染)
  14. 使用Cplex求解均值方差模型
  15. java商城系统和php商城系统哪个好?
  16. IBM大连工作室成立 Design thinking实现快速创新
  17. Android Studio 获取经纬度
  18. OpenCV读取AVI文件
  19. 时钟的时针、分针和秒针重合问题
  20. 如何解决word文档中英文单词在每行末尾处自动换行,留有空白,不美观问题?

热门文章

  1. Bitcoin 中的挖矿算法(6) 难度调整
  2. IPFS (1) 初步简介
  3. 《研磨设计模式》chap20 享元模式 Flyweight (3)重写应用场景
  4. C++ Primer 5th笔记(chap 14 重载运算和类型转换)可调用对象与function
  5. 在ubuntu20.10上搭建SVN Server
  6. pipe读写前设置O_NOATIME
  7. redis 附近的人_Redis GEO地理位置信息,查看附近的人
  8. 【攻防世界008】answer_to_everything
  9. java项目(注册和登录(成功后查看商品的信息))
  10. 2020-11-23(“花式扫雷” 辅助制作)