互联网时代的到来,使得名人的形象变得更加鲜活,也拉近了明星和粉丝之间的距离。歌星、影星、体育明星、作家等名人通过互联网能够轻易实现和粉丝的互动,赚钱也变得前所未有的简单。同时,互联网的飞速发展本身也造就了一批互联网明星,这些人借助新的手段,最大程度发挥了粉丝经济的能量和作用,在互联网时代赚得盆满钵满。

正是基于这样一个大背景,今天我们做一个分析明星微博数据的小项目。

1、项目需求

自定义输入格式,将明星微博数据排序后按粉丝数关注数 微博数分别输出到不同文件中。

2、数据集

明星 明星微博名称 粉丝数 关注数 微博数

俞灏明 俞灏明 10591367 206 558

李敏镐 李敏镐 22898071 11 268

林心如 林心如 57488649 214 5940

黄晓明 黄晓明 22616497 506 2011

张靓颖 张靓颖 27878708 238 3846

李娜 李娜 23309493 81 631

徐小平 徐小平 11659926 1929 13795

唐嫣 唐嫣 24301532 200 2391

有斐君 有斐君 8779383 577 4251

3、分析

自定义InputFormat读取明星微博数据,通过自定义getSortedHashtableByValue方法分别对明星的fan、followers、microblogs数据进行排序,然后利用MultipleOutputs输出不同项到不同的文件中

4、实现

1)、定义WeiBo实体类,实现WritableComparable接口

  1. package com.buaa;
  2. import java.io.DataInput;
  3. import java.io.DataOutput;
  4. import java.io.IOException;
  5. import org.apache.hadoop.io.WritableComparable;
  6. /**
  7. * @ProjectName MicroblogStar
  8. * @PackageName com.buaa
  9. * @ClassName WeiBo
  10. * @Description TODO
  11. * @Author 刘吉超
  12. * @Date 2016-05-07 14:54:29
  13. */
  14. public class WeiBo implements WritableComparable<Object> {
  15. // 粉丝
  16. private int fan;
  17. // 关注
  18. private int followers;
  19. // 微博数
  20. private int microblogs;
  21. public WeiBo(){};
  22. public WeiBo(int fan,int followers,int microblogs){
  23. this.fan = fan;
  24. this.followers = followers;
  25. this.microblogs = microblogs;
  26. }
  27. public void set(int fan,int followers,int microblogs){
  28. this.fan = fan;
  29. this.followers = followers;
  30. this.microblogs = microblogs;
  31. }
  32. // 实现WritableComparable的readFields()方法,以便该数据能被序列化后完成网络传输或文件输入
  33. @Override
  34. public void readFields(DataInput in) throws IOException {
  35. fan  = in.readInt();
  36. followers = in.readInt();
  37. microblogs = in.readInt();
  38. }
  39. // 实现WritableComparable的write()方法,以便该数据能被序列化后完成网络传输或文件输出
  40. @Override
  41. public void write(DataOutput out) throws IOException {
  42. out.writeInt(fan);
  43. out.writeInt(followers);
  44. out.writeInt(microblogs);
  45. }
  46. @Override
  47. public int compareTo(Object o) {
  48. // TODO Auto-generated method stub
  49. return 0;
  50. }
  51. public int getFan() {
  52. return fan;
  53. }
  54. public void setFan(int fan) {
  55. this.fan = fan;
  56. }
  57. public int getFollowers() {
  58. return followers;
  59. }
  60. public void setFollowers(int followers) {
  61. this.followers = followers;
  62. }
  63. public int getMicroblogs() {
  64. return microblogs;
  65. }
  66. public void setMicroblogs(int microblogs) {
  67. this.microblogs = microblogs;
  68. }
  69. }

2)、自定义WeiboInputFormat,继承FileInputFormat抽象类

  1. package com.buaa;
  2. import java.io.IOException;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.fs.FSDataInputStream;
  5. import org.apache.hadoop.fs.FileSystem;
  6. import org.apache.hadoop.fs.Path;
  7. import org.apache.hadoop.io.Text;
  8. import org.apache.hadoop.mapreduce.InputSplit;
  9. import org.apache.hadoop.mapreduce.RecordReader;
  10. import org.apache.hadoop.mapreduce.TaskAttemptContext;
  11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  12. import org.apache.hadoop.mapreduce.lib.input.FileSplit;
  13. import org.apache.hadoop.util.LineReader;
  14. /**
  15. * @ProjectName MicroblogStar
  16. * @PackageName com.buaa
  17. * @ClassName WeiboInputFormat
  18. * @Description TODO
  19. * @Author 刘吉超
  20. * @Date 2016-05-07 10:23:28
  21. */
  22. public class WeiboInputFormat extends FileInputFormat<Text,WeiBo>{
  23. @Override
  24. public RecordReader<Text, WeiBo> createRecordReader(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException {
  25. // 自定义WeiboRecordReader类,按行读取
  26. return new WeiboRecordReader();
  27. }
  28. public class WeiboRecordReader extends RecordReader<Text, WeiBo>{
  29. public LineReader in;
  30. // 声明key类型
  31. public Text lineKey = new Text();
  32. // 声明 value类型
  33. public WeiBo lineValue = new WeiBo();
  34. @Override
  35. public void initialize(InputSplit input, TaskAttemptContext context) throws IOException, InterruptedException {
  36. // 获取split
  37. FileSplit split = (FileSplit)input;
  38. // 获取配置
  39. Configuration job = context.getConfiguration();
  40. // 分片路径
  41. Path file = split.getPath();
  42. FileSystem fs = file.getFileSystem(job);
  43. // 打开文件
  44. FSDataInputStream filein = fs.open(file);
  45. in = new LineReader(filein,job);
  46. }
  47. @Override
  48. public boolean nextKeyValue() throws IOException, InterruptedException {
  49. // 一行数据
  50. Text line = new Text();
  51. int linesize = in.readLine(line);
  52. if(linesize == 0)
  53. return false;
  54. // 通过分隔符'\t',将每行的数据解析成数组
  55. String[] pieces = line.toString().split("\t");
  56. if(pieces.length != 5){
  57. throw new IOException("Invalid record received");
  58. }
  59. int a,b,c;
  60. try{
  61. // 粉丝
  62. a = Integer.parseInt(pieces[2].trim());
  63. // 关注
  64. b = Integer.parseInt(pieces[3].trim());
  65. // 微博数
  66. c = Integer.parseInt(pieces[4].trim());
  67. }catch(NumberFormatException nfe){
  68. throw new IOException("Error parsing floating poing value in record");
  69. }
  70. //自定义key和value值
  71. lineKey.set(pieces[0]);
  72. lineValue.set(a, b, c);
  73. return true;
  74. }
  75. @Override
  76. public void close() throws IOException {
  77. if(in != null){
  78. in.close();
  79. }
  80. }
  81. @Override
  82. public Text getCurrentKey() throws IOException, InterruptedException {
  83. return lineKey;
  84. }
  85. @Override
  86. public WeiBo getCurrentValue() throws IOException, InterruptedException {
  87. return lineValue;
  88. }
  89. @Override
  90. public float getProgress() throws IOException, InterruptedException {
  91. return 0;
  92. }
  93. }
  94. }

3)、编写mr程序

  1. package com.buaa;
  2. import java.io.IOException;
  3. import java.util.Arrays;
  4. import java.util.Comparator;
  5. import java.util.HashMap;
  6. import java.util.Map;
  7. import java.util.Map.Entry;
  8. import org.apache.hadoop.conf.Configuration;
  9. import org.apache.hadoop.conf.Configured;
  10. import org.apache.hadoop.fs.FileSystem;
  11. import org.apache.hadoop.fs.Path;
  12. import org.apache.hadoop.io.IntWritable;
  13. import org.apache.hadoop.io.Text;
  14. import org.apache.hadoop.mapreduce.Job;
  15. import org.apache.hadoop.mapreduce.Mapper;
  16. import org.apache.hadoop.mapreduce.Reducer;
  17. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  18. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  19. import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
  20. import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
  21. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  22. import org.apache.hadoop.util.Tool;
  23. import org.apache.hadoop.util.ToolRunner;
  24. /**
  25. * @ProjectName MicroblogStar
  26. * @PackageName com.buaa
  27. * @ClassName WeiboCount
  28. * @Description TODO
  29. * @Author 刘吉超
  30. * @Date 2016-05-07 09:07:36
  31. */
  32. public class WeiboCount extends Configured implements Tool {
  33. // tab分隔符
  34. private static String TAB_SEPARATOR = "\t";
  35. // 粉丝
  36. private static String FAN = "fan";
  37. // 关注
  38. private static String FOLLOWERS = "followers";
  39. // 微博数
  40. private static String MICROBLOGS = "microblogs";
  41. public static class WeiBoMapper extends Mapper<Text, WeiBo, Text, Text> {
  42. @Override
  43. protected void map(Text key, WeiBo value, Context context) throws IOException, InterruptedException {
  44. // 粉丝
  45. context.write(new Text(FAN), new Text(key.toString() + TAB_SEPARATOR + value.getFan()));
  46. // 关注
  47. context.write(new Text(FOLLOWERS), new Text(key.toString() + TAB_SEPARATOR + value.getFollowers()));
  48. // 微博数
  49. context.write(new Text(MICROBLOGS), new Text(key.toString() + TAB_SEPARATOR + value.getMicroblogs()));
  50. }
  51. }
  52. public static class WeiBoReducer extends Reducer<Text, Text, Text, IntWritable> {
  53. private MultipleOutputs<Text, IntWritable> mos;
  54. protected void setup(Context context) throws IOException, InterruptedException {
  55. mos = new MultipleOutputs<Text, IntWritable>(context);
  56. }
  57. protected void reduce(Text Key, Iterable<Text> Values,Context context) throws IOException, InterruptedException {
  58. Map<String,Integer> map = new HashMap< String,Integer>();
  59. for(Text value : Values){
  60. // value = 名称 + (粉丝数 或 关注数 或 微博数)
  61. String[] records = value.toString().split(TAB_SEPARATOR);
  62. map.put(records[0], Integer.parseInt(records[1].toString()));
  63. }
  64. // 对Map内的数据进行排序
  65. Map.Entry<String, Integer>[] entries = getSortedHashtableByValue(map);
  66. for(int i = 0; i < entries.length;i++){
  67. mos.write(Key.toString(),entries[i].getKey(), entries[i].getValue());
  68. }
  69. }
  70. protected void cleanup(Context context) throws IOException, InterruptedException {
  71. mos.close();
  72. }
  73. }
  74. @SuppressWarnings("deprecation")
  75. @Override
  76. public int run(String[] args) throws Exception {
  77. // 配置文件对象
  78. Configuration conf = new Configuration();
  79. // 判断路径是否存在,如果存在,则删除
  80. Path mypath = new Path(args[1]);
  81. FileSystem hdfs = mypath.getFileSystem(conf);
  82. if (hdfs.isDirectory(mypath)) {
  83. hdfs.delete(mypath, true);
  84. }
  85. // 构造任务
  86. Job job = new Job(conf, "weibo");
  87. // 主类
  88. job.setJarByClass(WeiboCount.class);
  89. // Mapper
  90. job.setMapperClass(WeiBoMapper.class);
  91. // Mapper key输出类型
  92. job.setMapOutputKeyClass(Text.class);
  93. // Mapper value输出类型
  94. job.setMapOutputValueClass(Text.class);
  95. // Reducer
  96. job.setReducerClass(WeiBoReducer.class);
  97. // Reducer key输出类型
  98. job.setOutputKeyClass(Text.class);
  99. // Reducer value输出类型
  100. job.setOutputValueClass(IntWritable.class);
  101. // 输入路径
  102. FileInputFormat.addInputPath(job, new Path(args[0]));
  103. // 输出路径
  104. FileOutputFormat.setOutputPath(job, new Path(args[1]));
  105. // 自定义输入格式
  106. job.setInputFormatClass(WeiboInputFormat.class) ;
  107. //自定义文件输出类别
  108. MultipleOutputs.addNamedOutput(job, FAN, TextOutputFormat.class, Text.class, IntWritable.class);
  109. MultipleOutputs.addNamedOutput(job, FOLLOWERS, TextOutputFormat.class, Text.class, IntWritable.class);
  110. MultipleOutputs.addNamedOutput(job, MICROBLOGS, TextOutputFormat.class, Text.class, IntWritable.class);
  111. // 去掉job设置outputFormatClass,改为通过LazyOutputFormat设置
  112. LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
  113. //提交任务
  114. return job.waitForCompletion(true)?0:1;
  115. }
  116. // 对Map内的数据进行排序(只适合小数据量)
  117. @SuppressWarnings("unchecked")
  118. public static Entry<String, Integer>[] getSortedHashtableByValue(Map<String, Integer> h) {
  119. Entry<String, Integer>[] entries = (Entry<String, Integer>[]) h.entrySet().toArray(new Entry[0]);
  120. // 排序
  121. Arrays.sort(entries, new Comparator<Entry<String, Integer>>() {
  122. public int compare(Entry<String, Integer> entry1, Entry<String, Integer> entry2) {
  123. return entry2.getValue().compareTo(entry1.getValue());
  124. }
  125. });
  126. return entries;
  127. }
  128. public static void main(String[] args) throws Exception {
  129. String[] args0 = {
  130. "hdfs://ljc:9000/buaa/microblog/weibo.txt",
  131. "hdfs://ljc:9000/buaa/microblog/out/"
  132. };
  133. int ec = ToolRunner.run(new Configuration(), new WeiboCount(), args0);
  134. System.exit(ec);
  135. }
  136. }

5、运行结果

本文作者:刘超ljc

来源:51CTO

利用 MapReduce分析明星微博数据实战相关推荐

  1. MapReduce分析明星微博数据

    互联网时代的到来,使得名人的形象变得更加鲜活,也拉近了明星和粉丝之间的距离.歌星.影星.体育明星.作家等名人通过互联网能够轻易实现和粉丝的互动,赚钱也变得前所未有的简单.同时,互联网的飞速发展本身也造 ...

  2. Hadoop实战系列之MapReduce 分析 Youtube视频数据

    Hadoop实战系列之MapReduce 分析 Youtube视频数据 一.实战介绍 MapReduce 是 Hadoop 的计算框架. 在运行一个 MR 程序时,任务过程被分为两个阶段:Map 阶段 ...

  3. 利用ELK分析Nginx日志生产实战(高清多图)

    本文以api.mingongge.com.cn域名为测试对象进行统计,日志为crm.mingongge.com.cn和risk.mingongge.com.cn请求之和(此二者域名不具生产换环境统计意 ...

  4. 利用机器学习分析脑电数据(原理分析+示例代码+快速上手)

    由于本人对于脑机接口以及脑电技术的极度爱好(其实目的是:是把U盘插到大脑里,然后就不用学习了哈哈哈哈),近几月看了较多这方面的内容,变打算写下博客总结分析一下. 目录 一.  机器学习分析简介 二.机 ...

  5. 爬虫实例 利用Ajax爬取微博数据

    随着代理IP技术的普及,爬虫的使用也变得简单起来,许多企业和个人都开始用爬虫技术来抓取数据.那么今天就来分享一个爬虫实例,帮助你们更好的理解爬虫.下面我们用程序模拟Ajax请求,将我的前10页微博全部 ...

  6. 进阶分享 | 7000字,利用Python分析泰坦尼克数据

    目录 排名 数据探索 导入库 导入数据 字段信息 字段分类 缺失值 数据假设 删除字段 修改.增加字段 猜想 统计分析 可视化分析 年龄与生还 舱位与生还 登船地点.性别与生还的关系 票价.舱位与生还 ...

  7. 利用Spring Boot处理JSON数据实战(包括jQuery,html,ajax)附源码 超详细

    在Spring Boot的Web应用中 内置了JSON数据的解析功能,默认使用Jackson自动完成解析(不需要解析加载Jackson依赖包)当控制器返回一个Java对象或集合数据时 Spring B ...

  8. CentOS 7.2下ELK分析Nginx日志生产实战(高清多图)

    注:本文系原创投稿 本文以api.mingongge.com.cn域名为测试对象进行统计,日志为crm.mingongge.com.cn和risk.mingongge.com.cn请求之和(此二者域名 ...

  9. python处理excel表格数据-利用Python处理和分析Excel表中数据实战.doc

    利用Python处理和分析Excel表中数据实战 [利用python进行数据分析--基础篇]利用Python处理和分析Excel表中数据实战 原创 2017年06月28日 15:09:32 标签: p ...

最新文章

  1. odoo pivot中去掉求和_一文读懂深度学习中的卷积运算与图像处理
  2. linux kernel 三次握手建立TCP链接的实现
  3. Python基础教程笔记——条件,循环和其他语句
  4. 命令行重启Oracle数据库
  5. 部份API学习笔记(Math,System,Object,Date,SimpleDateFormat)
  6. 辽宁计算机专业院校排名2015,liaoning高校排行榜_辽宁高校排名 2015年辽宁省最佳大学排行榜...
  7. Python Logging Loggers
  8. Chrome浏览器插件之---AdBlock和Adblock Plus
  9. 一些牛人博客,值得收藏和学习
  10. HD地址批量生成 java
  11. 便利蜂创始人数字化经验分享:如何用全链路数字化 重塑零售业
  12. 案例:微博传播引爆点
  13. 2022年度Top9的任务管理系统
  14. win10“我们找不到你的相机“,错误代码0xA00F4244<NoCamerasAttached>解决方法
  15. 升平,景玉军.计算机虚拟技术在高职汽车维修教学中的应用研究[j].,汽车新技术教学方法探讨...
  16. ocx 加载 页面卡死
  17. JS(五):JS的window对象之window相关方法、定时器
  18. XP电脑桌面图标文字带上颜色不透明 解决办法
  19. IOI2018退役记 + NOIP2018游记
  20. java动态代理(AOP)

热门文章

  1. 糟糕的设计会为我们的工作带来什么启发?
  2. 如何使用gitHub进行合作开发
  3. 西门子PLC几种常见的通讯协议分享
  4. 云豹源码php mysql_如何进行直播搭建,云豹直播源码搭建
  5. 速腾M1激光雷达调试
  6. linux中more命令的意思,linux中的more命令的详细解释
  7. 创意黑板学术汇报毕业答辩PPT模板分享
  8. python人名查电话(字典)_python检索用人名查电话_创建一个将人名用作键的字典后,输入姓名查找值,返回错误......
  9. 深度学习系列:全连接神经网络和BP算法
  10. GPS人员定位系统:一种更安全高效的人员定位管理系统