问题还原:

求被评分次数最多的10部电影,并给出评分次数(电影名,评分次数)

ratings.dat
用户ID,电影ID,评分,评分时间戳
1::1193::5::978300760
movies.dat
电影ID,电影名字,电影类型
2::Jumanji (1995)::Adventure|Children's|Fantasy

数据地址:链接:https://pan.baidu.com/s/1qj7RWDYiVnDKBJFFFcYhlw 密码:katx

思路:

根据数据样例分析,使用电影Id作为连接条件;

select a.*,b.* from a join b on a.id = b.id;

电影信息:movies.dat是个小数据,属于大小表Join,选择使用mapJoin,根据mapJoin的思路,采用缓存小文件的处理方式。

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/*** (1)求被评分次数最多的10部电影,并给出评分次数(电影名,评分次数)*   ratings.dat*   用户ID,电影ID,评分,评分时间戳*   1::1193::5::978300760*   *   movies.dat*   电影ID,电影名字,电影类型*  2::Jumanji (1995)::Adventure|Children's|Fantasy* 思路:*  1、首先将ratings.dat和movies.dat两个数据进行join;*  2、然后对数据进行切分统计:在mapper端输出,movie是key,1是value;*  3、在reducer端进行合并统计*  4、然后在第二个mapreduce程序中,进行排序输出**/
public class Top10_Rating_Movie {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();FileSystem fs = FileSystem.get(conf);Job job = Job.getInstance(conf);job.setJarByClass(Top10_Rating_Movie.class);//job.addArchiveToClassPath(new Path("")); //缓存压缩文件到classPath//job.addCacheArchive(new URI("")); //缓存普通文件到工作目录//job.addFileToClassPath(new Path("")); //缓存普通文件到classPath//这里使用的时候缓存普通文件到工作目录//这里要在本地进行测试,所以使用的本地文件系统的文件路径//如果要打jar包运行在集群上,使用HDFS文件系统的路径job.addCacheFile(new URI("file:/G:/files/mr/day3/q2/input/movies.dat"));job.setMapperClass(StepOneMapper.class);job.setReducerClass(StepOneReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);//因为输入文件夹中有多个文件,为了区分,这里指定到文件名称Path inputPath = new Path("G:/files/mr/day3/q2/input/ratings.dat");Path outputPath = new Path("G:/files/mr/day3/q2/output1_1");//判断输出目录是否存在,如果存在,就删除if(fs.exists(outputPath)){fs.delete(outputPath, true);}//设置数据的输入输出路径FileInputFormat.setInputPaths(job, inputPath);FileOutputFormat.setOutputPath(job, outputPath);//System.exit(job.waitForCompletion(true) ? 0:1);Job job2 = Job.getInstance(conf);job2.setJarByClass(Top10_Rating_Movie.class);job2.setMapperClass(StepTwoMapper.class);job2.setReducerClass(StepTwoReducer.class);job2.setOutputKeyClass(Movie.class);job2.setOutputValueClass(NullWritable.class);//因为输入文件夹中有多个文件,为了区分,这里指定到文件名称Path inputPath2 = new Path("G:/files/mr/day3/q2/output1_1");Path outputPath2 = new Path("G:/files/mr/day3/q2/output1_2");//判断输出目录是否存在,如果存在,就删除if(fs.exists(outputPath2)){fs.delete(outputPath2, true);}//设置数据的输入输出路径FileInputFormat.setInputPaths(job2, inputPath2);FileOutputFormat.setOutputPath(job2, outputPath2);ControlledJob stepOne = new ControlledJob(job.getConfiguration());ControlledJob stepTwo = new ControlledJob(job2.getConfiguration());stepTwo.addDependingJob(stepOne);JobControl jc = new JobControl("MV");jc.addJob(stepOne);jc.addJob(stepTwo);//public class JobControl implements Runnable {}Thread t = new Thread(jc);t.start();while(!jc.allFinished()){Thread.sleep(1000);}System.exit(0);}/***    */public static class StepOneMapper extends Mapper<LongWritable, Text, Text, IntWritable>{//用户ID,电影ID,评分,评分时间戳//1::1193::5::978300760//电影ID,电影名字,电影类型//2::Jumanji (1995)::Adventure|Children's|Fantasy/*** 根据开始的分析,在mapper中的setup方法中,对缓存的小文件进行切分处理* 这里只要电影的Id和电影名称* 使用HashMap进行存储* 在map()方法中进行join操作;*/Map<String,String> movieMap = new HashMap<>();@Overrideprotected void setup(Context context)throws IOException, InterruptedException {/*//使用的时候getLocalCacheArchives()这个方法被标记位过时,但是不影响使用//获取缓存文件的路径Path[] cacheFilePath = context.getLocalCacheArchives();//由于只缓存了一个文件,所以在这里就不进行判断了String path = cacheFilePath[0].toUri().toString();//通过File IO读取指定路径的文件进行处理BufferedReader bf1 = new BufferedReader(new FileReader(new File(path)));//以上这些是为了在集群中运行*///在本地测试,这样读取缓存文件,因为文件已经缓存到工作空间中了,所以直接通过文件名读取即可BufferedReader bf = new BufferedReader(new FileReader(new File("movies.dat")));String line = null;while((line=bf.readLine()) != null){String[] movies = line.toString().split("::");//这里直接将userId看作字符串进行处理String movieId = movies[0];String movieName = movies[1];movieMap.put(movieId, movieName);}//关闭流bf.close();}Text K = new Text();IntWritable V = new IntWritable(1);@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//用户ID,电影ID,评分,评分时间戳//1::1193::5::978300760//逐行读取ratings.dat文件  这里不多说读取的机制,详情见以后的源码分析String[] ratings = value.toString().split("::");String movieId = ratings[1];//这里就不展示mapjoin的输出结果了//如果想要查看 可以在map中拼接输出,然后直接让reducer端原样输出就可以了//判断评分记录中的电影ID是否在电影信息中//如果在就输出if(movieMap.containsKey(movieId)){String outKey = movieId+"::"+movieMap.get(movieId);K.set(outKey);context.write(K, V);}}}public static class StepOneReducer extends Reducer<Text, IntWritable, Text, NullWritable>{Text K = new Text();@Overrideprotected void reduce(Text key, Iterable<IntWritable> values,Context context)throws IOException, InterruptedException {//map端的输出最终可以看作 <k,{v1,v2,v3,...vn}>这种形式//所以在这里迭代统计评分次数,就是一个wordCountint sum = 0;for (IntWritable intWritable : values) {//因为intWritable.get()都是1,所以直接++sum++;}K.set(key.toString()+"::"+sum);context.write(K,NullWritable.get());}}public static class StepTwoMapper extends Mapper<LongWritable, Text, Movie, NullWritable>{@Overrideprotected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {//直接读取第一步的输出结果,构建Movie对象,输出即可String[] line = value.toString().split("::");Movie mv = new Movie(line[0],line[1],Integer.parseInt(line[2]));context.write(mv, NullWritable.get());}}public static class StepTwoReducer extends Reducer<Movie, NullWritable, Movie, NullWritable>{//通过计数器控制输出Top10int count = 0;@Overrideprotected void reduce(Movie key, Iterable<NullWritable> values,Context context)throws IOException, InterruptedException {for (NullWritable nt : values) {if(count < 10){context.write(key,NullWritable.get());}count++;}}}}

对于第二个Mapper和Redcuer中采用自定义Bean的输出格式为了容易排序。

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.WritableComparable;/*** WritableComparable这个接口继承了Writable和Comparable接口* 主要用来做序列化/反序列化和 自定义排序规则使用**/
public class Movie implements WritableComparable<Movie>{private String movieId; //电影idprivate String movieName;//电影名称private int ratingTime;//评分总次数public String getMovieId() {return movieId;}public void setMovieId(String movieId) {this.movieId = movieId;}public String getMovieName() {return movieName;}public void setMovieName(String movieName) {this.movieName = movieName;}public int getRatingTime() {return ratingTime;}public void setRatingTime(int ratingTime) {this.ratingTime = ratingTime;}public Movie() {}public Movie(String movieId, String movieName, int ratingTime) {this.movieId = movieId;this.movieName = movieName;this.ratingTime = ratingTime;}@Overridepublic String toString() {return movieId + ", " + movieName + ", " + ratingTime;}@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(movieId);out.writeUTF(movieName);out.writeInt(ratingTime);}//!!!序列化和反序列化字段要严格对应//!!!序列化和反序列化字段要严格对应//!!!序列化和反序列化字段要严格对应@Overridepublic void readFields(DataInput in) throws IOException {movieId = in.readUTF();movieName = in.readUTF();ratingTime = in.readInt();}@Overridepublic int compareTo(Movie o) {//因为要对电影的总评分次数倒序排列return o.ratingTime - this.ratingTime;}}

不善言辞,有问题大家指出来,共同学习。

电影评分次数Top10问题相关推荐

  1. 机器学习工程师 — Udacity 电影评分的 k 均值聚类

    电影评分的 k 均值聚类 假设你是 Netflix 的一名数据分析师,你想要根据用户对不同电影的评分研究用户在电影品位上的相似和不同之处.了解这些评分对用户电影推荐系统有帮助吗?我们来研究下这方面的数 ...

  2. 无监督学习 | KMeans之Sklearn实现:电影评分聚类

    文章目录 1. KMeans in Sklearn 2. Sklearn 实例:电影评分的 k 均值聚类 2.1 数据集概述 2.2 二维 KMeans 聚类 3. 肘部法选取最优 K 值 4. 多维 ...

  3. 【机器学习】k-Means Clustering_电影评分与推荐实例

    因本人刚开始写博客,学识经验有限,如有不正之处望读者指正,不胜感激:也望借此平台留下学习笔记以温故而知新. 电影评分的 k 均值聚类 假设你是 Netflix 的一名数据分析师,你想要根据用户对不同电 ...

  4. 大数据Spark电影评分数据分析

    目录 1 数据 ETL 2 使用 SQL 分析 3 使用 DSL 分析 4 保存结果数据 5 案例完整代码 6 Shuffle 分区数目问题 1 数据 ETL 使用电影评分数据进行数据分析,分别使用D ...

  5. Spark综合练习——电影评分数据分析

    我正在参加年度博客之星评选,请大家帮我投票打分,您的每一分都是对我的支持与鼓励. 2021年「博客之星」参赛博主:Maynor大数据 (感谢礼品.红包免费送!) https://bbs.csdn.ne ...

  6. SQL语句查询电影评分案例分析

    SQL语句查询电影评分案例分析 部分数据:(全数据有100万条) {"movie":"1193","rate":"5", ...

  7. 案例分析-电影评分分析

    电影评分分析 数据 原始数据展示 json格式说明 数据字段含义 需求 1.每个用户评分最高的10部电影评分信息(用户最喜爱的十部电影) 2.每个用户的uid和评分的平均值. 3.最大方(评分平均值高 ...

  8. 大数据Hive集成python分析框架—搜狗实验室(用户查询日志)—电影评分分析

    一.Spark 大数据分析框架 1.1 数据结构 1.2 SQL语句简介 二.搜狗实验室(用户查询日志)数据分析 2.1获取数据集并初步分析: 2.2 创建数据库/表--导入数据分析 三.电影评分分析 ...

  9. python数据分析案例2:电影评分数据集的分析

    这里是南京财经大学的Mooc课程的个人学习笔记,课程网址是:https://www.icourse163.org/course/NJUE-1458311167,课程是免费的,老师讲的很好很认真,欢迎学 ...

最新文章

  1. 切换分支 更改只影响当前分支代码_idea 中分支管理操作的意思
  2. The Code Is The Model
  3. 页面性能优化参考建议
  4. 单例在多线程中的使用
  5. (草稿)如何判断一名UiPath开发人员是否合格?
  6. java点名程序界面设计_用Java语言编写一个班级点名的程序
  7. AutoCAD2020简体中文语言包
  8. 小猫跳圈-第12届蓝桥杯Scratch省赛3真题第1题
  9. Unity翻译工具(使用百度翻译)
  10. 什么是决策!决策的定义!决策的本质!大数据决策定义!
  11. 使用keycloak自定义SPI接入外部用户登录
  12. STM32L4时钟系统(转载)
  13. Yann LeCun:大模型方向错了,智力无法接近人类
  14. RAID技术详解与总结
  15. jQ选择器与常用的方法归纳
  16. 文章转载---西工大博士生文言文答辩致谢
  17. Flatpak 不是未来
  18. git重置用户git账号金与密码
  19. Elasticsearch实战 | term: xxx was completely eliminated by analyzer
  20. oracle 首位是字母,Oracle 10g如何对用户姓名,按首字母排序、查询

热门文章

  1. 计算机操作系统第四版题库,汤子瀛计算机操作系统第4版配套题库(含名校考研真题)...
  2. SQL Server 外围应用配置器
  3. java中的IO流之序列化与反序列化(对象数据和文件的读写交互)
  4. 广告行业中那些趣事系列2:BERT实战NLP文本分类任务(附github源码)
  5. Android 获取本地闹钟铃声
  6. scrollTop滚动的时候一直是0
  7. 中学生应该如何选择零食
  8. 永久代退出舞台,元空间腾空出世
  9. PTA 补充题库 7-18 冒泡法排序
  10. byte[]数组相等比较