已知有以下直播平台数据

json格式

{"id":158008900435,"uid":120010010445,"nickname":"jack435","gold":445,"watchnumpv":4350,"watchnumuv":870,"hots":1350,"nofollower":435,"looktime":8700,"smlook":2175,"follower":1740,"gifter":870,"length":2620,"area":"A_US","rating":"B","exp":1305,"type":"video_rating"}

其中,每一行代表一次直播,uid代表主播id,watchnumpv代表观看的次数,follower代表本次直播关注的人数,length代表本次直播时长。
求某一天每个主播的总watchnumpv、follower、length,并求出直播时长排名前十的主播

第一步,对json数据进行数据清洗

public class DataCleanMap extends Mapper<LongWritable, Text, Text, Text> {@Overrideprotected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {String line = v1.toString();JSONObject jsonObject = JSON.parseObject(line);String id = jsonObject.getString("uid");int gold = jsonObject.getIntValue("gold");int watchnumpv = jsonObject.getIntValue("watchnumpv");int follower= jsonObject.getIntValue("follower");int length = jsonObject.getIntValue("length");if (gold >= 0 && watchnumpv >= 0 && follower >= 0 && length >= 0) {Text k2 = new Text();k2.set(id);Text v2 = new Text();v2.set(gold + "\t" + watchnumpv + "\t" + follower + "\t" + length);context.write(k2, v2);}}
}
public class DataCleanJob {public static void main(String[] args) {try {if (args.length != 2) {System.exit(100);}Configuration conf = new Configuration();Job wcjob = Job.getInstance(conf);wcjob.setJarByClass(DataCleanJob.class);wcjob.setMapperClass(DataCleanMap.class);// mapper输出的key类型wcjob.setMapOutputKeyClass(Text.class);// mapper输出的value的类型wcjob.setMapOutputValueClass(Text.class);wcjob.setNumReduceTasks(0);// 输入文件路径FileInputFormat.setInputPaths(wcjob, new Path(args[0]));// 输出路径,路径不能已存在,否则就结出错FileOutputFormat.setOutputPath(wcjob, new Path(args[1]));boolean res = wcjob.waitForCompletion(true);System.out.println(res);} catch (Exception e) {e.printStackTrace();}}
}

第二步,统计主播这天的总watchnumpv,follower,length

public class VideoInfoMap extends Mapper<LongWritable, Text, Text, VideoInfoWritable> {@Overrideprotected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {String line = v1.toString();String[] fields = line.split("\t");String id = fields[0];long gold = Long.parseLong(fields[1]);long watchnumpv = Long.parseLong(fields[2]);long follower = Long.parseLong(fields[3]);long length = Long.parseLong(fields[4]);Text k2 = new Text();k2.set(id);VideoInfoWritable v2 = new VideoInfoWritable();v2.set(gold, watchnumpv, follower, length);context.write(k2, v2);}
}
public class VideoInfoReduce extends Reducer<Text, VideoInfoWritable, Text, VideoInfoWritable> {@Overrideprotected void reduce(Text k2, Iterable<VideoInfoWritable> v2s, Context context) throws IOException, InterruptedException {long goldsum = 0;long watchnumpvsum = 0;long followersum = 0;long lengthsum = 0;for (VideoInfoWritable v2 : v2s) {goldsum += v2.getGold();watchnumpvsum += v2.getWatchnumpv();followersum += v2.getFollower();lengthsum += v2.getLength();}Text k3 = k2;VideoInfoWritable v3 = new VideoInfoWritable();v3.set(goldsum, watchnumpvsum, followersum, lengthsum);context.write(k3, v3);}
}
public class VideoInfoWritable implements Writable {private long gold;private long watchnumpv;private long follower;private long length;public void set(long gold, long watchnumpv, long follower, long length) {this.gold = gold;this.watchnumpv = watchnumpv;this.follower = follower;this.length = length;}@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeLong(gold);dataOutput.writeLong(watchnumpv);dataOutput.writeLong(follower);dataOutput.writeLong(length);}@Overridepublic void readFields(DataInput dataInput) throws IOException {this.gold = dataInput.readLong();this.watchnumpv = dataInput.readLong();this.follower = dataInput.readLong();this.length = dataInput.readLong();}@Overridepublic String toString() {return gold + "\t" + watchnumpv + "\t" + follower + "\t" + length;}...get and set method
}
public class VideoInfoJob {public static void main(String[] args) {try {if (args.length != 2) {System.exit(100);}Configuration conf = new Configuration();Job wcjob = Job.getInstance(conf);wcjob.setJarByClass(VideoInfoJob.class);wcjob.setMapperClass(VideoInfoMap.class);// mapper输出的key类型wcjob.setMapOutputKeyClass(Text.class);// mapper输出的value的类型wcjob.setMapOutputValueClass(VideoInfoWritable.class);wcjob.setReducerClass(VideoInfoReduce.class);// reducer输出的key类型wcjob.setOutputKeyClass(Text.class);// reducer输出的value类型wcjob.setOutputValueClass(VideoInfoWritable.class);// 输入文件路径FileInputFormat.setInputPaths(wcjob, new Path(args[0]));// 输出路径,路径不能已存在,否则就结出错FileOutputFormat.setOutputPath(wcjob, new Path(args[1]));boolean res = wcjob.waitForCompletion(true);System.out.println(res);} catch (Exception e) {e.printStackTrace();}}
}

第三步,排序输出length前十的主播

public class VideoInfoTop10Map extends Mapper<LongWritable, Text, Text, LongWritable> {@Overrideprotected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {String line = v1.toString();String[] fields = line.split("\t");String id = fields[0];long length = Long.parseLong(fields[4]);Text k2 = new Text();k2.set(id);LongWritable v2 = new LongWritable();v2.set(length);context.write(k2, v2);}
}
public class VideoInfoTop10Reduce extends Reducer<Text, LongWritable, Text, LongWritable> {HashMap<String, Long> map = new HashMap<>();@Overrideprotected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException {long lengthsum = 0;for (LongWritable v2 : v2s) {lengthsum += v2.get();}map.put(k2.toString(), lengthsum);}@Overrideprotected void setup(Context context) throws IOException, InterruptedException {super.setup(context);}@Overrideprotected void cleanup(Context context) throws IOException, InterruptedException {Configuration conf = context.getConfiguration();String dt = conf.get("dt");Map<String, Long> sortedMap = MapUtils.sortValue(map);Set<Map.Entry<String, Long>> entries = sortedMap.entrySet();Iterator<Map.Entry<String, Long>> it = entries.iterator();int count = 1;while (count <= 10 && it.hasNext()) {Map.Entry<String, Long> entry = it.next();String key = entry.getKey();Long value = entry.getValue();Text k3 = new Text();k3.set(dt + "\t" + key);LongWritable v3 = new LongWritable();v3.set(value);context.write(k3, v3);count++;}}
}
public class VideoInfoJobTop10 {public static void main(String[] args) {try {if (args.length != 2) {System.exit(100);}String[] fielsd = args[0].split("/");String tmpdt = fielsd[fielsd.length - 1];String dt = DateUtils.transDataFormat(tmpdt);Configuration conf = new Configuration();conf.set("dt", dt);Job wcjob = Job.getInstance(conf);wcjob.setJarByClass(VideoInfoJobTop10.class);wcjob.setMapperClass(VideoInfoTop10Map.class);// mapper输出的key类型wcjob.setMapOutputKeyClass(Text.class);// mapper输出的value的类型wcjob.setMapOutputValueClass(LongWritable.class);wcjob.setReducerClass(VideoInfoTop10Reduce.class);// reducer输出的key类型wcjob.setOutputKeyClass(Text.class);// reducer输出的value类型wcjob.setOutputValueClass(LongWritable.class);// 输入文件路径FileInputFormat.setInputPaths(wcjob, new Path(args[0]));// 输出路径,路径不能已存在,否则就结出错FileOutputFormat.setOutputPath(wcjob, new Path(args[1]));boolean res = wcjob.waitForCompletion(true);System.out.println(res);} catch (Exception e) {e.printStackTrace();}}
}
public class MapUtils {

    private MapUtils() {
        // Utility class; no instances.
    }

    /**
     * Returns a copy of the given map as a LinkedHashMap whose entries are
     * ordered by value, descending. The source map is not modified.
     *
     * <p>The original declared the parameter as a raw {@code Map}, defeating
     * the generic signature; it is now properly typed.
     *
     * @param map source map
     * @param <K> key type
     * @param <V> value type, must be comparable
     * @return new insertion-ordered map sorted by descending value
     */
    public static <K, V extends Comparable<? super V>> Map<K, V> sortValue(Map<K, V> map) {
        List<Map.Entry<K, V>> list = new ArrayList<>(map.entrySet());
        // Descending: compare o2 against o1.
        list.sort((o1, o2) -> o2.getValue().compareTo(o1.getValue()));
        // LinkedHashMap preserves the sorted insertion order.
        Map<K, V> returnMap = new LinkedHashMap<>();
        for (Map.Entry<K, V> entry : list) {
            returnMap.put(entry.getKey(), entry.getValue());
        }
        return returnMap;
    }
}
public class DateUtils {

    // java.time.format.DateTimeFormatter is immutable and thread-safe, unlike
    // the shared static SimpleDateFormat instances it replaces (SimpleDateFormat
    // is documented as NOT thread-safe and corrupts state under concurrency).
    private static final java.time.format.DateTimeFormatter IN_FORMAT =
            java.time.format.DateTimeFormatter.ofPattern("yyyyMMdd");
    private static final java.time.format.DateTimeFormatter OUT_FORMAT =
            java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd");

    /**
     * Converts a date string from "yyyyMMdd" to "yyyy-MM-dd".
     *
     * @param dt date in yyyyMMdd form (e.g. the daily data-file name)
     * @return the reformatted date, or "1970-01-01" if dt cannot be parsed
     */
    public static String transDataFormat(String dt) {
        String res = "1970-01-01";
        try {
            res = java.time.LocalDate.parse(dt, IN_FORMAT).format(OUT_FORMAT);
        } catch (Exception e) {
            System.out.println("日期转换失败:" + dt);
        }
        return res;
    }
}

第四步,用脚本每天执行以上代码

假设每天的数据都放在一个文件中,文件名格式为yyyyMMdd。那么,每天定时执行这个脚本,就能得到昨天直播时长Top10的主播。

#!/bin/bash
# Daily driver: clean -> aggregate -> top10 for one day of live-stream data.
# Usage: daily_top10.sh [yyyyMMdd]; defaults to yesterday's date.

if [ "X$1" = "X" ]
then
    # macOS: yesterday's date
    yes_time=`date -v-1d +"%Y%m%d"`
    # Linux: use this line instead
    # yes_time=`date +%Y%m%d --date="1 days ago"`
else
    yes_time=$1
fi

cleanjob_input=hdfs://localhost:9000/data/videoinfo/${yes_time}
cleanjob_output=hdfs://localhost:9000/data/videoinfo_clean/${yes_time}
videoinfojob_input=${cleanjob_output}
videoinfojob_output=hdfs://localhost:9000/res/videoinfojob/${yes_time}
videoinfojobtop10_input=${cleanjob_output}
videoinfojobtop10_output=hdfs://localhost:9000/res/videoinfojobtop10/${yes_time}
jobs_home=/Users/liangjiepeng/Documents/ideaCode/a/target

# Output paths must not exist, otherwise the jobs would fail.
hdfs dfs -rm -r ${cleanjob_output}
hdfs dfs -rm -r ${videoinfojob_output}
hdfs dfs -rm -r ${videoinfojobtop10_output}

hadoop jar \
${jobs_home}/a-v1.0-jar-with-dependencies.jar \
com.itmayiedu.hadoop.dataClean.DataCleanJob \
${cleanjob_input} \
${cleanjob_output}

# A job that completed successfully leaves a _SUCCESS marker in its output dir.
hdfs dfs -ls ${cleanjob_output}/_SUCCESS
if [ "$?" = 0 ]
then
    echo "cleanJob execute success..."

    hadoop jar \
    ${jobs_home}/a-v1.0-jar-with-dependencies.jar \
    com.itmayiedu.hadoop.dataClean.videoinfo.VideoInfoJob \
    ${videoinfojob_input} \
    ${videoinfojob_output}

    hdfs dfs -ls ${videoinfojob_output}/_SUCCESS
    if [ "$?" != "0" ]
    then
        echo "VideoInfoJob execute faild..."
    fi

    hadoop jar \
    ${jobs_home}/a-v1.0-jar-with-dependencies.jar \
    com.itmayiedu.hadoop.dataClean.top10.VideoInfoJobTop10 \
    ${videoinfojobtop10_input} \
    ${videoinfojobtop10_output}

    hdfs dfs -ls ${videoinfojobtop10_output}/_SUCCESS
    if [ "$?" != "0" ]
    then
        echo "VideoInfoJobTop10 execute faild..."
    fi
else
    # Fixed: was ${year_time}, an undefined variable — the date would print empty.
    echo "cleanJob execute faild... date time is ${yes_time}"
fi

第五步,通过sqoop将数据导入到mysql中,便于web页面读取数据并展示

sqoop export --connect jdbc:mysql://localhost:3306/test --username root --password 123456 --table top10 --export-dir /res/videoinfojobtop10/20200306 --input-fields-terminated-by "\t"

大数据之直播平台数据统计相关推荐

  1. 天池大数据众智平台 - 数据科学家社区

    天池大数据众智平台 - 数据科学家社区 https://tianchi.aliyun.com/

  2. Python爬虫 | 对广州市政府数据统一开放平台数据的爬取

    Python爬虫 | 对广州市政府数据统一开放平台数据的爬取 简单爬虫 网页分析 爬虫代码 简单爬虫 本次爬虫演示的是对 广州市政府数据统一开放平台 数据的爬取 网页分析 我们先到url=' http ...

  3. 用Python实现原生爬取某牙直播平台数据

    最近学习了一大堆和大数据相关的东西,Hadoop.Elastic.Python等.写一个简单的实战项目贯通一下.爬取一下某牙直播平台的人气排行. 一.确定自己需要的数据,并找到最适合爬取的页面 首先我 ...

  4. selenium抓取斗鱼直播平台数据

    https://www.cnblogs.com/xinyangsdut/p/7617691.html 程序说明: 抓取斗鱼直播平台的直播房间号及其观众人数,最后统计出某一时刻的总直播人数和总观众人数. ...

  5. python抓取直播源 并更新_Python爬虫实例(二)使用selenium抓取斗鱼直播平台数据...

    程序说明:抓取斗鱼直播平台的直播房间号及其观众人数,最后统计出某一时刻的总直播人数和总观众人数. 过程分析: 进入平台首页,来到页面底部点击下一页,发现url地址没有发生变化,这样的话再使用urlli ...

  6. cloud一分钟 | 腾讯云联手斗鱼、虎牙两大头部游戏直播平台开启 定制道具的创新互动...

    Hello,everyone: 9月7日早,星期五,祝大家工作愉快! 一分钟新闻时间: 完 01  微 信 群   添加小编微信:tangguoyemeng,备注"进群"即可,加入 ...

  7. 不同服务器的ps4账号吗,原神PC与PS4互通数据吗 不同平台数据互通分析

    原神最近上线了pc版本,那么,你知道PC与PS4互通吗?这是很多玩家关心的问题,不同的平台,数据能否互通呢?比如说,不同平台是否可以一起玩,不同平台帐号是否可以切换,下面就为大家带来原神不同平台数据互 ...

  8. 数据治理管理平台——数据资产管理

    数据治理中的资产管理是一切治理活动的起点,在数据治理活动中,占据首要地位,只有将数据真正地资产化,才能有序进行后续的深入挖掘与研究. 数据资产管理作为数据治理的重要组成部分,有效地将数据规范管理和数据 ...

  9. LDR6028 OTG取电传数据方案-直播声卡数据充电转接线方案

    TYPE-C手机设备同时支持充电,USB2.0(OTG)传输方案,LDR6028 PD协议芯片可同时实现充电跟传输数据功能. 手机接U盘,接充电 1.概述 LDR6028 SOP8 是乐得瑞科技针对 ...

最新文章

  1. 用AI实现C++、Java、Python代码互译,运行成功率最高达80.9%
  2. Django-Migration admin.0001_initial is applied before its dependency accounts.0001_initial on....
  3. 中心线超出轮廓线多少_激光切割的这些基础知识,你知道多少?
  4. hbase java admin_java连接hbase(一):Admin功能接口表管理
  5. ajax框架怎么学,Ajax框架之DWR学习(文件上传案例)
  6. 电脑文件太多找不到?试试这个免费搜索软件
  7. Win+Tab键实现自定义程序列表间的窗口切换
  8. kotlin界面_Kotlin界面
  9. Hibernate —— 映射关联关系
  10. CMOS摄像头驱动分析笔记1
  11. Python中Image缩放、旋转、翻转等操作
  12. elasticsearch 集群
  13. eggjs 项目实践
  14. 惠斯特电桥平衡条件的证明
  15. Excel 如何合并工作簿中多个工作表
  16. 软硬一体的算法实践,阿里云如何以算法实现场景 “再创新”?
  17. python 战舰_战舰python代码学院
  18. 程序员,你真的会写简历吗?
  19. sudo、sudo -s、sudo -i、su 区别
  20. python 爬取贝壳网小区名称_用Python爬取贝壳网新房和二手房数据

热门文章

  1. python 单行if_python3绘图程序教学:单行If判断式(十三)
  2. hdu 6706 huntian oy 杜教筛
  3. 公共计算机课件,全国计算机等级考试二级ACCESS公共基础知识课件.ppt
  4. 论文阅读:Attention-based Dropout Layer for Weakly Supervised Object Localization
  5. IDEA上连接MYSQL数据库
  6. 判定的测试,语句覆盖,判定覆盖,条件覆盖,判定条件覆盖,组合覆盖,修正的判定条件覆盖
  7. JS window 对象 打开新窗口
  8. 机器学习 - 统计学中的均值、方差、标准差
  9. linux清除文件后df还是满的,Linux系统下如何用du和df命令清除大文件?
  10. 股 市 运 作 原 理