微博内容(如图):ID  content

公式:

TF:词条在某个微博中出现的词频(出现次数).

N:微博总数

DF:词条在多少个微博中出现过

案例用到四个reduceTask,下标计数从0开始,三个统计词频TF,一个统计微博总数N。

FirstMapper.java

对输入文件的每行记录微博内容进行分词,统计微博词频TF及微博总数,每个词条输出词频数1;每个微博输出一个count=1

package com.jeff.mr.tf;import java.io.IOException;
import java.io.StringReader;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;/***  TF:词条在某个微博中出现的词频(出现次数).N:微博总数DF:词条在多少个微博中出现过--------------------------------*   第一个MR,计算TF和计算N(微博总数)* @author root**/
public class FirstMapper extends Mapper<LongWritable, Text, Text, IntWritable>{protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {//value是微博文件每一行以制表符\t隔开String[]  v =value.toString().trim().split("\t");if(v.length>=2){String id=v[0].trim();String content =v[1].trim();//对微博内容进行中文分词处理StringReader sr =new StringReader(content);IKSegmenter ikSegmenter =new IKSegmenter(sr, true);Lexeme word=null;while( (word=ikSegmenter.next()) !=null ){String w= word.getLexemeText();//w就是微博内容的每一个词汇//输出格式为:key为:词汇_微博ID    value是1,出现次数context.write(new Text(w+"_"+id), new IntWritable(1));}//每执行一次这个方法,就表示统计了一条微博数,将来在第四个reduce分区执行,参见FirstPartition,自定义分区规则context.write(new Text("count"), new IntWritable(1));}else{System.out.println(value.toString()+"-------------");}}}

FirstPartition.java

自定义分区,使得key为count的分区到最后一个分区(编号3),其他的分别分区编号为0/1/2三个reduceTask

package com.jeff.mr.tf;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;/*** 第一个MR自定义分区,把key为count的,即用来计算微博总数的数据分区到第四个reduce分区,* 前三个reduce分区用来计算TF,就是单个微博中词汇出现次数* @author root**/
public class FirstPartition extends HashPartitioner<Text, IntWritable>{public int getPartition(Text key, IntWritable value, int reduceCount) {if(key.equals(new Text("count")))return 3;elsereturn super.getPartition(key, value, reduceCount-1);}}

FirstReduce.java

计算单个词条的词频TF,输入数据为FirstMapper.java的输出,key为词条_id.或者count,值为词频个数或者count个数,当key为count时不参与计算只输出查看。

输出格式:词条_ID 词频

package com.jeff.mr.tf;import java.io.IOException;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/*** c1_001,2* c2_001,1* count,10000* @author root**/
public class FirstReduce extends Reducer<Text, IntWritable, Text, IntWritable>{protected void reduce(Text arg0, Iterable<IntWritable> arg1,Context arg2)throws IOException, InterruptedException {int sum =0;for( IntWritable i :arg1 ){sum= sum+i.get();}if(arg0.equals(new Text("count"))){System.out.println(arg0.toString() +"___________"+sum);}arg2.write(arg0, new IntWritable(sum));}}

在dfs-location上新建路径:/usr/input/tf-idf并上传文件微博内容:

接下来就可以执行FirstJob.java来执行第一个MR:

package com.jeff.mr.tf;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class FirstJob {public static void main(String[] args) {Configuration config =new Configuration();config.set("fs.defaultFS", "hdfs://node4:8020");config.set("yarn.resourcemanager.hostname", "node4");try {FileSystem fs =FileSystem.get(config);
//          JobConf job =new JobConf(config);Job job =Job.getInstance(config);job.setJarByClass(FirstJob.class);job.setJobName("weibo1");job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);
//          job.setMapperClass();job.setNumReduceTasks(4);job.setPartitionerClass(FirstPartition.class);job.setMapperClass(FirstMapper.class);job.setCombinerClass(FirstReduce.class);job.setReducerClass(FirstReduce.class);FileInputFormat.addInputPath(job, new Path("/usr/input/tf-idf"));Path path =new Path("/usr/output/weibo1");if(fs.exists(path)){fs.delete(path, true);}FileOutputFormat.setOutputPath(job,path);boolean f= job.waitForCompletion(true);if(f){}} catch (Exception e) {e.printStackTrace();}}
}

执行成功:

刷新DFS-Location,看到在/usr/output/weibo1的目录下生成了四个分区文件,每一个分区文件都是四个reduceTask的输出文件

其中第四个分区文件就是用来计算Count微博总数N的,其他三个都是微博中词汇即出现次数。

比如:0.03元_3824213951437432       1

这个就表示0.03元这个词在ID为3824213951437432微博中出现了1次

TwoMapper.java

统计DF,词条在多少个微博中出现过

输出格式:词条 出现的微博个数

package com.jeff.mr.tf;import java.io.IOException;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
//统计df:词在多少个微博中出现过。
public class TwoMapper extends Mapper<LongWritable, Text, Text, IntWritable> {protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {/*** 1  获取当前 mapper Task的数据片段(split)* 2 当前mapper Task的数据来源于第一个MR输出的四个文件*/FileSplit fs = (FileSplit) context.getInputSplit();//可以从fs获取第一个MR的文件名,除了最后一个文件是用来计算微博总数的,其他都是TFif (!fs.getPath().getName().contains("part-r-00003")) {String[] v = value.toString().trim().split("\t");if (v.length >= 2) {//获取{0.03元_3824213951437432    1},这种第一个MR的输出数据,即每一行String[] ss = v[0].split("_");if (ss.length >= 2) {String w = ss[0];//得到每一个词汇,输出次数1,此处所有微博的词汇都会输出1次context.write(new Text(w), new IntWritable(1));}} else {System.out.println(value.toString() + "-------------");}}}
}

TwoReduce.java

package com.jeff.mr.tf;import java.io.IOException;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;/*** 计算词汇在所有微博中出现的次数* @author jeffSheng* 2018年10月17日*/
public class TwoReduce extends Reducer<Text, IntWritable, Text, IntWritable>{/*** 输入数据:*    key:0.03元     value:1(次)* Iterable<IntWritable> arg1,即key相等的一组数据*/protected void reduce(Text key, Iterable<IntWritable> arg1,Context context)throws IOException, InterruptedException {int sum =0;for( IntWritable i :arg1 ){sum= sum + i.get();}context.write(key, new IntWritable(sum));}}

执行TwoJob.java第二个MR,计算每个词汇在所有微博出现次数即DF

package com.jeff.mr.tf;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class TwoJob {public static void main(String[] args) {Configuration config =new Configuration();config.set("fs.defaultFS", "hdfs://node4:8020");config.set("yarn.resourcemanager.hostname", "node4");try {
//          JobConf job =new JobConf(config);Job job =Job.getInstance(config);job.setJarByClass(TwoJob.class);job.setJobName("weibo2");//设置map任务的输出key类型、value类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);
//          job.setMapperClass();job.setMapperClass(TwoMapper.class);job.setCombinerClass(TwoReduce.class);job.setReducerClass(TwoReduce.class);//mr运行时的输入数据从hdfs的哪个目录中获取FileInputFormat.addInputPath(job, new Path("/usr/output/weibo1"));FileOutputFormat.setOutputPath(job, new Path("/usr/output/weibo2"));boolean f= job.waitForCompletion(true);if(f){System.out.println("执行job成功");}} catch (Exception e) {e.printStackTrace();}}
}

刷新DFS-Location看到/usr/output/weibo2下的DF输出文件:

比如0.03元 在所有微博中出现了1次

根据公式计算微博词汇权重:

LastMapper.java

输入数据为所有词的TF,所有词的DF,微博总数N,根据这三个变量计算词条最终权重。

输出格式:微博ID 词条:权重

package com.jeff.mr.tf;import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Map;import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;/*** 最后计算* @author root**/
public class LastMapper extends Mapper<LongWritable, Text, Text, Text> {//存放微博总数public static Map<String, Integer> cmap = null;//存放dfpublic static Map<String, Integer> df = null;// 在map方法执行之前,即mapperTask初始化的时候执行/*** mapReduce的执行过程回顾:* 比如一个文件被分割成1024个碎片段,则一定有与之对应的1024个mapTask去执行每个碎片段。* mapTask在有碎片段的节点上执行,即 dataNode上有碎片段,在dataNode上执行。所以每个DataNode上就* 有一个NodeManager来执行mapReduce程序,NodeManager里面有一个与之对应的ApplicationMatser* 负责从resourceManager中请求资源即Contianer中文是容器,其实是资源。申请资源后,ApplicationMatser* 则可以通过一个Executor对象执行mapperTask,并监控和记录执行状态、进度等数据汇报给NodeManager,NodeManager* 再汇报给resourceManager。* Executor对象执行mapperTask的时候先初始化对应的MapTask,其实就是我们的LastMapper.* java自定义的xxxMapper,只要初始化成功就调用LastMapper的setUp方法,这个时候map方法还没执行,* map方法是循环调用的,即每一行都调用一次,但是setUp方法只会调用一次。不过1024个碎片段对应1024个mapTask,* 就会执行setup方法1024次,还是狠多次,所以我们可以考虑从共享内存中取得一部分数据,比如微博总数N和DF记录。* 我们使用cmap和df两个Map来存放,判断是否为空,即保证存过就不用再存了。* * */protected void setup(Context context) throws IOException,InterruptedException {System.out.println("******************");if (cmap == null || cmap.size() == 0 || df == null || df.size() == 0) {URI[] ss = context.getCacheFiles();if (ss != null) {for (int i = 0; i < ss.length; i++) {URI uri = ss[i];if (uri.getPath().endsWith("part-r-00003")) {//微博总数Path path =new Path(uri.getPath());
//                      FileSystem fs =FileSystem.get(context.getConfiguration());
//                      fs.open(path);BufferedReader br = new BufferedReader(new FileReader(path.getName()));String line = br.readLine();if (line.startsWith("count")) {String[] ls = line.split("\t");cmap = new HashMap<String, Integer>();cmap.put(ls[0], Integer.parseInt(ls[1].trim()));}br.close();} else if (uri.getPath().endsWith("part-r-00000")) {//词条的DFdf = new HashMap<String, Integer>();Path path =new Path(uri.getPath());BufferedReader br = new BufferedReader(new FileReader(path.getName()));String line;while ((line = br.readLine()) != null) {String[] ls = line.split("\t");df.put(ls[0], Integer.parseInt(ls[1].trim()));}br.close();}}}}}protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {FileSplit fs = (FileSplit) context.getInputSplit();
//      System.out.println("--------------------");if (!fs.getPath().getName().contains("part-r-00003")) {String[] v = value.toString().trim().split("\t");if (v.length >= 2) {int tf =Integer.parseInt(v[1].trim());//tf值String[] ss = v[0].split("_");if (ss.length >= 2) {String w = ss[0];String id=ss[1];//根据公式计算权重,输出:微博Id  词汇1:权重1 词汇2:权重2  double s=tf * Math.log(cmap.get("count")/df.get(w));NumberFormat nf =NumberFormat.getInstance();nf.setMaximumFractionDigits(5);context.write(new Text(id), new Text(w+":"+nf.format(s)));}} else {System.out.println(value.toString() + "-------------");}}}
}

LastReduce.java

计算所有词条的最终权重,相同微博在后边显示其所有的词条:权重,并使用制表符\t隔开。

输出格式:微博ID  词条:权重  词条:权重

package com.jeff.mr.tf;import java.io.IOException;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class LastReduce extends Reducer<Text, Text, Text, Text>{protected void reduce(Text key, Iterable<Text> arg1,Context context)throws IOException, InterruptedException {StringBuffer sb =new StringBuffer();for( Text i :arg1 ){sb.append(i.toString()+"\t");}context.write(key, new Text(sb.toString()));}}

执行LastJob计算最终输出结果:

我们这里采用的是在本地提交到Linux环境下进行执行测试的

package com.jeff.mr.tf;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class LastJob {public static void main(String[] args) {Configuration config =new Configuration();
//      config.set("fs.defaultFS", "hdfs://node1:8020");
//      config.set("yarn.resourcemanager.hostname", "node1");config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\weibo3.jar");try {FileSystem fs =FileSystem.get(config);
//          JobConf job =new JobConf(config);Job job =Job.getInstance(config);job.setJarByClass(LastJob.class);job.setJobName("weibo3");//          DistributedCache.addCacheFile(uri, conf);//2.5/*** 之所以以下两行可以加载到内存因为微博总数的文件和df文件其实都不大,所有可以在任务启动之初先加载到内存*///把微博总数N加载到内存job.addCacheFile(new Path("/usr/output/weibo1/part-r-00003").toUri());//把df加载到内存job.addCacheFile(new Path("/usr/output/weibo2/part-r-00000").toUri());//设置map任务的输出key类型、value类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);
//          job.setMapperClass();job.setMapperClass(LastMapper.class);job.setReducerClass(LastReduce.class);//mr运行时的输入数据从hdfs的哪个目录中获取FileInputFormat.addInputPath(job, new Path("/usr/output/weibo1"));Path outpath =new Path("/usr/output/weibo3");if(fs.exists(outpath)){fs.delete(outpath, true);}FileOutputFormat.setOutputPath(job,outpath );boolean f= job.waitForCompletion(true);if(f){System.out.println("执行job成功");}} catch (Exception e) {e.printStackTrace();}}
}

需要做的是将工程打包放在桌面weibo3.jar,然后在LastJob中添加:

config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\weibo3.jar");

配置文件放在src下:

开始执行:

打开:http://node1:18088/cluster

观察刚开始执行

观察执行完成;

刷新DFS-Location

比如:3823890239358658     继续:4.89035  支持:3.04452

表示在微博ID为3823890239358658微博中,[继续]的全部微博中权重为4.89035,[支持]的全部微博中权重为3.04452

有了这些结果,我们就可以做出一些商业或者其他领域的重要选择!

当然也可以在本地进行测试,就是在LastMapper的setUp中注释掉的代码:

FileSystem fs =FileSystem.get(context.getConfiguration());FSDataInputStream fsdInputStream = fs.open(path);

将输入流封装进BufferedReader即可。

案例5-挖掘微博广告高权重词条相关推荐

  1. 微博广告分布式配置中心的构建与实践

    来自:DBAplus社群 讲师介绍 邸海峰 新浪微博 广告业务部高级运维工程师 主要负责微博广告业务部自动化运维平台设计与开发.微服务体系建设.资源成本优化等工作: 针对微博业务性质,春晚.明星热点事 ...

  2. 第七十六期:3000台服务器不宕机,微博广告系统全景运维大法

    微博现在日活达到了 2 亿,微博广告是微博最重要且稳定的收入来源,没有之一,所以微博广告系统的稳定性是我们广告运维所有工作中的重中之重. 作者:孙燕来源 微博现在日活达到了 2 亿,微博广告是微博最重 ...

  3. maya刷权重时有个叉_运营小白如何注册一个高权重的抖音账号?

    关注▲二月老师▲运营小白的职场引路人 同学们好,我是二月老师.今天给大家分享一个有趣的话题:如何注册一个高权重的账号? 现如今,抖音已经成为当下炙手可热的APP,深受广大年轻群体的喜爱. 我们可以看到 ...

  4. 微博广告投放如何定向推广?在微博推广广告有效果吗?

    新浪微博是一个特别适合推广的平台,微博不仅上边有明星还有各个领域的大神有影响力的媒体,去组成咱们现在看到的一个多姿多彩的微博,在微博上可以了解到每天发生的一些时事,会以最快的速度让用户知道,人流量集大 ...

  5. 教你怎样通过SEO做出高权重网站

    SEO优化工作是一个极具挑战性的工作,因为其过程漫长且包含很多的细节,一个细节考虑不到就可能导致整个网站优化工作功亏一篑:本次更新的课程从域名.服务器选择开始讲起,涵盖SEO网站优化的方方面面. 本教 ...

  6. 如何做一个高权重的网站-成都企业网站建设

    很多站长将网站权重做到1或2 以后,就很难再提高网站权重了.智宇SEO总结9点教你如何打造一个高权重的网站.或许我总结的还不够全面,欢迎大家补充. 1.网站品牌规划. 个人网站我建议是做小而美的,因为 ...

  7. 支持百亿请求的微博广告运维技术实践

    来自:DBAplus社群 本文根据朱伟老师在[2019 Gdevops全球敏捷运维峰会-广州站]现场演讲内容整理而成. 讲师介绍 朱伟,微博广告SRE团队负责人,书籍<智能运维:从0搭建大规模分 ...

  8. 搜索引擎提交软件_增加SEO超级外链须知的高权重网站目录提交方法

    做SEO的朋友对开放式网站目录应该不陌生吧.网站目录就是按一定的分类方法把收录的网站进行分类归档.网站目录本身是不主动抓取网页的,一般只记录网站的名称,网址和有限的说明文字.和网址站.导航站.酷站网址 ...

  9. 修改目录标题层级_关键词所在页面的层级越高权重越大

    查找引擎优化的关键词地点页面的层级越高权重越大,要说查找引擎优化关键词优化,说下关键词吧,拿网站主页为例,比方说主页标题中的58网站目录,在个"网站目录"这个词能够称为关键词. 假 ...

最新文章

  1. java.lang.UnsatisfiedLinkError:no jhdf5 in java.library.path问题的解决
  2. LNMP环境中WordPress程序伪静态解决方案
  3. Intuit的Alex Balazs访谈
  4. 架构语言ArchiMate -业务层(Business Layer)
  5. Inverse of Rows and Columns
  6. 国科大prml15-BP
  7. Ignite Compute helloworld-分布式计算
  8. 印象笔记粘贴HTML,印象笔记剪藏插件
  9. 重磅!2020北京智源大会完整日程公布,4天19场高端AI论坛邀你参加
  10. GreenDotNet0 1 1发布 Net精简环境 及示例 Net在线漫画下载器
  11. LBS计算两坐标的距离
  12. 八皇后问题(详解带注释)
  13. mybatis resulttype
  14. Python中一些少数人知晓且有趣的特性
  15. book--Unix Linux大学教程
  16. javaEE---CSS
  17. GhostNet: More Features from Cheap Operations
  18. 2022CCPC桂林站感想与反思
  19. 2020.03.18模拟赛17(第三题)
  20. EDC大展览,你的EDC是什么呢

热门文章

  1. 【考研数学二——导数与微分知识梳理(思维导图)】
  2. 基于51单片机DHT11的加湿器proteus仿真程序设计
  3. Connectify中文版在Win7下创建免费热点让上网更简单
  4. VMware下载地址
  5. 论文阅读 (86):Normality Guided Multiple Instance Learning for Weakly Supervised Video Anomaly Detection
  6. 订单模块业务处理流程梳理记录
  7. c语言里怎样输出%符号?
  8. java metrics.counter_技术专栏 | 深入理解Metrics(二):Counters
  9. 学习CALCULATE函数(五)修订版
  10. pytorch深度学习实战——预训练网络