本文是哈工大大数据分析实验1的完整实验记录,包含环境配置,相关知识介绍以及实验解析。希望对后来人有帮助(新手小白没什么头绪,走一步查一步对应的博客o(╥﹏╥)o),博客链接之间会穿插一些我自己的理解,讲的不是太好,介意的话可以顺序点开推荐博客的链接详细阅读

环境配置

基于Linux的Hadoop伪分布式安装

环境配置方面,我使用ubuntu虚拟机进行hadoop伪分布集群搭建(小白主要怕都装在windows后续其他实验可能会出现环境冲突,但我后来发现,直接在windows底下搭建环境流程的复杂程度也差不多,怕麻烦的话直接无脑windows)

这里有个小tip,就是windows上下载的安装包/其他文件,都可以通过拖拽的形式复制到虚拟机里边,可以不需要再配置共享文件夹(当然如果希望虚拟机和本机上同时共享最新版本的文件,还是需要配置一下的)或者通过ssh才能向虚拟机传文件

具体环境配置可以参考这篇博客,我全程跟下来没有发现错误(至于版本问题大家自己再去查一下其他资料,jdk/hadoop版本过高过低都可能有问题)

基于Linux的Hadoop伪分布式安装

修改配置文件的时候会用到vim,相关的操作命令如下,其实只要掌握"i"和":wq"好像也就够用了

vim命令大全

每次启动HDFS伪分布式系统,在控制台输入的指令如下:

# '#'后为注释,使用时可删去
# 启动hdfs
start-all.sh
# 显示进程:有6个进程表示正常
jps
# 退出hdfs
#如果中途停止实验,在关闭虚拟机之前一定一定要先退出hdfs,要不hdfs会出问题,可能需要重新格式化hdfs
stop-all.sh

Idea安装配置

由于编程需要,我在ubantu也装了idea,此处安装专业版本和社区版本对实验没有影响

因为需要调用hadoop的包,所以需要在idea导入相关的jar包

project structure 》module 》dependencies

无脑勾选<hadoop安装路径>/share/hadoop下的6个文件夹

上传实验数据

这里需要学习一部分HDFS的相关指令,上传文件命令可参考这篇博客,同样的也只需要学习其中几个命令,比如"hdfs dfs -mkdir""hadoop fs -put"差不多够用了

如何上传文件到hdfs?

注意Hadoop3.之后,Namenode的web默认访问端口为9870,已经不是博客写的50070,浏览器直接搜"localhost:9870"就会弹出网页,搜索框里搜索新建文件夹的名字就可以知道是否成功上传文件了

至于其他更多指令,可以查看上面这篇博客引用的另外一篇博客

HDFS基本命令行操作及上传文件的简单API

Hadoop相关知识学习

实验根据指导书的提示,需要用到map-reduce,这里可以参考一波官方文档Hadoop Map/Reduce教程,但是从我个人角度,官方文档对我帮助不大,所以我找了其他参考价值更大的博客

MapReduce程序总览

官方文档主要介绍了各个组件的作用,但是并没有太细致的讲解程序的流程(比如参数传递之类的,但是这个对程序的编写很重要,毕竟使用mapper&reducer时是靠重写里边的函数来具体实现功能的),所以我找了其他几篇详细注释了程序流程的博客

MapReduce基础入门(一)

【其他更多博客可见-涂涂的主页-第5页的某些文章】

Mapper / Reducer

从上面那篇博客中,我们可以学习到

/*** WordCount程序Mapper重写* Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>** KEYIN 是指框架读取到的数据的key的类型,在默认的InputFormat下,读到的key是一行文本的起始偏移量,所以key的类型是Long* VALUEIN 是指框架读取到的数据的value的类型,在默认的InputFormat下,读到的value是一行文本的内容,所以value的类型是String* KEYOUT 是指用户自定义逻辑方法返回的数据中key的类型,由用户业务逻辑决定,在此wordcount程序中,我们输出的key是单词,所以是String* VALUEOUT 是指用户自定义逻辑方法返回的数据中value的类型,由用户业务逻辑决定,在此wordcount程序中,我们输出的value是单词的数量,所以是Integer** 但是,String ,Long等jdk中自带的数据类型,在序列化时,效率比较低,hadoop为了提高序列化效率,自定义了一套序列化框架* 所以,在hadoop的程序中,如果该数据需要进行序列化(写磁盘,或者网络传输),就一定要用实现了hadoop序列化框架的数据类型** Long ----> LongWritable* String ----> Text* Integer ----> IntWritable* Null ----> NullWritable*/static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{@Overrideprotected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {String[] words = value.toString().split(" ");for(String word: words){context.write(new Text(word), new IntWritable(1));}}}
/*** WordCount程序Reducer重写* 首先,和前面一样,Reducer类也有输入和输出,输入就是Map阶段的处理结果,输出就是Reduce最后的输出* reducetask在调我们写的reduce方法,reducetask应该收到了前一阶段(map阶段)中所有maptask输出的数据中的一部分* (数据的key.hashcode%reducetask数==本reductask号),所以reducetaks的输入类型必须和maptask的输出类型一样** reducetask将这些收到kv数据拿来处理时,是这样调用我们的reduce方法的:* 先将自己收到的所有的kv对按照k分组(根据k是否相同)* 将某一组kv中的第一个kv中的k传给reduce方法的key变量,把这一组kv中所有的v用一个迭代器传给reduce方法的变量values*/static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException {int sum = 0;for(IntWritable v: values){sum += v.get();}context.write(key, new IntWritable(sum));}}

从这里边我们大概知道了Mapper&Reducer类的四个参数分别是:输入key,输入value,输出key,输出value;其中Mapper类输出与Reducer输入类型相对应。具体类型可以根据程序实际编写的内容在main函数中通过setMapOutputKeyClass / setMapOutputValueClass & setOutputKeyClass / setOutputValueClass指定,并且为了程序效率通常使用序列化的类型

map函数中有三个参数,前两个参数与Mapper参数列表的前两个相对应,表示输入数据的key和value;第三个参数Context表示map函数的输出,类型与Mapper参数列表后两个相对应;reduce函数也大概如此,但是第二个参数是Mapper产生的相同key对应的不同value的集合,故具体处理时,通常是一个遍历框架

但是在理解学长火炬的过程中,我发现实现功能时,不仅仅只重写了map/reduce函数,还重写了Mapper/Reducer类下的其他函数,所以我又学习了其他博客

MapReduce学习笔记(二)——Mapper、Reducer和Driver

大家也可以通过在idea中ctrl然后选中Mapper/Reducer查看默认类是怎么编写的

Mapper中一共有4个方法分别为seup、run、map、cleanup。setup在run中最先执行,cleanup在run中最后执行,run方法中会循环执行map方法,map方法是我们需要在现在这个类中重写的,这个方法中主要执行业务相关的内容,当map不符合应用要求时,可以试着通过增加setup和cleanup的内容来满足应用的需求

// Mapper类中默认的run函数
public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {this.setup(context);try {while(context.nextKeyValue()) {this.map(context.getCurrentKey(), context.getCurrentValue(), context);}} finally {this.cleanup(context);}}

Reducer大概情况也与Mapper相似

// Reducer类中默认的run函数
public void run(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {this.setup(context);try {while(context.nextKey()) {this.reduce(context.getCurrentKey(), context.getValues(), context);Iterator<VALUEIN> iter = context.getValues().iterator();if (iter instanceof ValueIterator) {((ValueIterator)iter).resetBackupStore();}}} finally {this.cleanup(context);}}

Hadoop数据类型

Hadoop有自己的数据类型,这是因为MapReduce是基于磁盘的计算框架,会产生大量的磁盘IO,从而产生大量的序列化/反序列化操作,而Java自带的Serializable是一个繁杂的完整框架,直接使用它对本就耗费资源的MR来说无疑是雪上加霜,因此Hadoop有着自己的数据类型,分为内建类型和用户自定义类型

内建类型

hadoop包装了java的基本数据类型使他们实现以上的接口而且给予实现细节,这些类都实现了WritableComparable接口,能够在不同的hadoop节点之间毫无障碍的传输了

BooleanWritable : 标准布尔型数值
ByteWritable : 单字节数值
DoubleWritable : 双字节数值
FloatWritable : 浮点数
IntWritable : 整型数
LongWritable : 长整型数
Text : 使用UTF8格式存储的文本
NullWritable : 当<key, value>中的key或value为空时使用

用户自定义

实现WritableComparable或Writable接口即可

但是注意,如果你的自定义类型是要放在value上那么两个接口实现那个都行,但是如果你要放在key上那么必须实现WritableComparable,不然MR分组排序会因为没有比较方法而报错,同时这也是多级排序的实现方法

正式实验

数据抽样

1. 本次实验采取分层抽样的方式,选取D中的某一属性A,按A进行分层抽样,将抽样结果保存至HDFS中命名为D_Sample;
2. Tips:在Map阶段以属性A作为Key,然后在Reduce阶段进行抽样,如果属性是连续属性,可以考虑进行离散化;
【 3.1节中的分层抽样所使用的属性A为user_career属性 】

大致思路是在 Mapper:标出当前行对应的career,输出形式为 (career字段值,本行内容),然后在Reducer:根据不同的career自动分组,每一组按照固定的比例(概率)进行抽样【这部分代码我是照着火炬写的】在编写过程中发现了导包的重要性

import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class Sample {// 本类用于实现分层抽样,使用的属性是user_career(12个属性中的第11个)// Mapper:标出当前行对应的careerpublic static final class SampleMapper extends Mapper<LongWritable,Text, Text,Text> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();// 切割出一个个单词// 注意|前要加转义字符String[] list = line.split("\\|");// career赋值成对应的值Text career = new Text();career.set(list[10]);// 相当于只是在每一行前边加了一个head:careercontext.write(career,value);}}// Reduce:根据不同的career自动分组,每一组按照固定的比例(概率)进行抽样public static final class SampleReducer extends Reducer<Text,Text,Text,Text>{@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {for(Text item:values){// 产生1-100的随机数int num = (int)(Math.random()*100);// 抽样比例为1/5if(num<20){context.write(item,new Text(""));}}}}public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {// 定义文件输入输出路径String input = "hdfs://localhost:9000/dataPreprocess/data.txt";String out = "hdfs://localhost:9000/D_Sample";// 创建conf和jobConfiguration conf=new Configuration();Job job = Job.getInstance(conf,"Sample");// 设置jar包所在路径job.setJarByClass(Sample.class);// 指定Mapper类和Reducer类job.setMapperClass(SampleMapper.class);job.setReducerClass(SampleReducer.class);// 指定maptask输出类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);// 制定reducetask输出类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);// 指定mapreducer程序数据的输入输出路径Path inputPath = new Path(input);Path outputPath = new Path(out);FileInputFormat.setInputPaths(job,inputPath);FileOutputFormat.setOutputPath(job,outputPath);// 提交任务boolean waitForCompletion = job.waitForCompletion(true);System.exit(waitForCompletion?0:1);}}

期间报了好几个错误,在这里大致记录一下【也许有些包不是一定要导入的,大家可以自己判断】:

Exception in thread "main" java.lang.NoClassDefFoundError: org/slf4j/LoggerFactory
at org.apache.hadoop.conf.Configuration.<clinit>(Configuration.java:229)
at Sample.main(Sample.java:57)
Caused by: java.lang.ClassNotFoundException: org.slf4j.LoggerFactory
at java.net .URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 2 more
解决方法:导入commons-logging.jar包,下载网址如下 https://commons.apache.org/proper/commons-logging/download_logging.cgi ,选择bin.tar.gz

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/htrace/core/Tracer$Builder
at org.apache.hadoop.fs.FsTracer.get(FsTracer.java:42)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3460)
at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(FileInputFormat.java:530)
at Sample.main(Sample.java:73)
Caused by: java.lang.ClassNotFoundException: org.apache.htrace.core.Tracer$Builder
at java.net .URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 9 more
解决方法:加入$HADOOP_HOME/share/hadoop/common/lib下的jar包,可以直接添加这个文件作为依赖

最后整个项目的依赖如图:

点击运行就夸夸一顿跑,控制台输出一堆消息,结束后到localhost:9870里查找,可以发现文件成功输出

查找资料后可知,reduce最后输出结果保存在part-r-00000中,后续如果需要利用这一步的结果,就需要用其作为输入文件

MapReduce输出格式:
默认情况下只有一个 Reduce,输出只有一个文件,默认文件名为 part-r-00000,输出文件的个数与 Reduce 的个数一致。 如果有两个Reduce,输出结果就有两个文件,第一个为part-r-00000,第二个为part-r-00001,依次类推

数据过滤

1. 选取D_Sample中的某一连续属性B进行排序,我们认为取值排名在1%~99%之间的值为正常值,
即其它值为奇异值,我们需要过滤掉原始数据集合D中属性B取值为奇异值的数据;
2. 过滤后的数据保存在HDFS中,命名为D_Filter;
【 3.2节中的奇异值过滤所使用的属性B为两个属性,分别是longitude和latitude。为了避免同学们使用额外的流程计算奇异值边界,给定longitude的有效范围为[8.1461259, 11.1993265],latitude的有效范围为[56.5824856, 57.750511],可以在代码中直接使用; 】

做这一步的时候我还以为要调用到排序函数,但是后来发现似乎直接提取有效范围内的数据就可以了(比第一个任务更简单)。因此只需要重写Mapper里的map方法,只有当longitude和latitude取值都落在有效范围内,才保留该条数据(输出给Reducer);对应的main函数只需要在Sample的main函数基础上,改掉一些参数设置就够了

注意,经过我的确认,实验数据是有重复的(我估计可能是同一份数据复制了11份),所以如果重写Reducer没有设置迭代器,而是直接输出(key:""),那么就会达到去重的效果(最终结果只有44.45MB)。如果重写Reducer不希望达到去重效果,那么就得在迭代器里输出,而这种写法就是默认Reducer,所以可以直接不重写,在main中也可不设置 job.setReducerClass (缺省直接调用默认Reducer)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class Filter {// 本类用于实现数据过滤,使用的属性是longitude和latitude(12个属性中的第2/3个)// 指导书已经给出对应属性的有效范围:longitude的有效范围为[8.1461259, 11.1993265]//                             latitude的有效范围为[56.5824856, 57.750511]// 本类只需要重写Mapper,在Mapper中,只有取值在有效范围内的数据才会被保留public static final class FilterMapper extends Mapper<LongWritable, Text, Text, Text> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 分割操作与在Sample相同String line = value.toString();// 切割出一个个单词// 注意|前要加转义字符String[] list = line.split("\\|");// 定义两个double变量存储longitude和latitudedouble longitude = Double.parseDouble(list[1]);double latitude = Double.parseDouble(list[2]);// 只保留对应属性取值在有效范围内的数据if (longitude >= 8.1461259 && longitude <= 11.1993265 && latitude >= 56.5824856 && latitude <= 57.750511) {context.write(value, new Text(""));}}}// 如果像这样重写Reducer 【直接输出(key:"")】,那么就会达到去重的效果// 如果希望不去重,那么应该在迭代器内输出(那么这和默认的Reduce也没区别)public static final class FilterReducer extends Reducer<Text, Text, Text, Text> {@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {context.write(key,new Text(""));}}// main函数几乎都相同public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {// 定义文件输入输出路径String input = "hdfs://localhost:9000/dataPreprocess/data.txt";String out = "hdfs://localhost:9000/D_Filter";// 创建conf和jobConfiguration conf=new Configuration();Job job = Job.getInstance(conf,"Filter");// 设置jar包所在路径job.setJarByClass(Filter.class);// 指定Mapper类和Reducer类job.setMapperClass(FilterMapper.class);//job.setReducerClass(FilterReducer.class);// 指定maptask输出类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);// 制定reducetask输出类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);// 指定mapreducer程序数据的输入输出路径Path inputPath = new Path(input);Path outputPath = new Path(out);FileInputFormat.setInputPaths(job,inputPath);FileOutputFormat.setOutputPath(job,outputPath);// 提交任务boolean waitForCompletion = job.waitForCompletion(true);System.exit(waitForCompletion?0:1);}}

数据格式转换与归一化

1. 一些数据属性可能存在不同的格式,如日期、温度,我们需要转化为统一的格式;另一些属性则往
往需要归一化;
2. 对D_Filter中的数据进行格式转换与归一化,结果保存在HDFS中,仍然命名为D_Filter;
3. 本实验中建议使用的归一化方式为Min-Max归一化
【 3.3中,数据格式属性涉及到属性user_birthday和review_date,这些日期字段可能使用2018-03-21、2018/03/21、March 21, 2019这些格式,转换为哪种格式取决于同学们自己;temperature有华氏与摄氏两种,同样的,目标格式取决于同学们;需要归一化的属性则是rating 】

这一部分其实统共有三个小任务:日期格式统一(可以通过SimpleDateFormat辅助转换),温度格式统一以及rating归一化,其中rating的归一化需要先统计有效rating值的max和min(注意,rating的缺失值为"?"),我一开始觉得这部分至少需要两轮MR来完成,一轮统计rating的max和min,另一轮转化各个属性值。后来发现只要引入public static的全局变量,就可以在一轮之内完成所有任务:即在mapper中第一遍扫描全部数据,统计出rating的max和min,同时完成格式转化;reducer中进行rating的归一化(利用全局max和min)

static:静态修饰符,被static修饰的变量和方法类似于全局变量和全局方法,可以在不创建对象时调用,当然也可以在创建对象之后调用

这一部分我用到了之前不知道的SimpleDateFormat(其实也是从火炬中学习到了,学长都好牛),在这里贴一个博客链接。其实只需要掌握“利用parse提取对应日期信息”以及“利用format转化成相应的日期格式”就够了。当然理论上直接用正则表达式匹配也能完成相应任务,但是显然SimpleDateFormat更省心一些

Java中SimpleDateFormat的用法介绍(代码示例)
官方文档:Class SimpleDateFormat

使用SimpleDateFormat还需注意符号的意义以及不同个数符号对于表义的影响,比如:

y,表示年份,例如yyyy表示4位数字年份,yy表示2位数字年份
M,表示月份,例如MM表示2位数字月份,MMM表示月份的英文缩写,MMMM表示月份的完整英文名称
d,表示月中的日期,例如dd表示2位数字日期
E,表示星期几,例如EEE表示星期的英文缩写,EEEE表示星期的完整英文名称
H,表示小时(24小时制),例如HH表示2位数字小时数,H表示一位或两位数字小时数
h,表示小时(12小时制),例如hh表示2位数字小时数,h表示一位或两位数字小时数
m,表示分钟,例如mm表示2位数字分钟数
s,表示秒钟,例如ss表示2位数字秒数
S,表示毫秒数,例如SSS表示3位数字毫秒数

完整代码如下:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;public class FormatNormalize {// 本类用于进行格式转换以及归一化处理// review_date:12个属性中的第5个// temperature:12个属性中的第6个// rating:12个属性中的第7个// user_birthday:12个属性中的第9个// 初始化max和min(全局变量)public static double max = Double.NEGATIVE_INFINITY;public static double min = Double.POSITIVE_INFINITY;public static  String DateFormatConversion(String s) throws ParseException {// 匹配 "March 10,1979"SimpleDateFormat format1 = new SimpleDateFormat("MMMM d,yyyy", Locale.ENGLISH);// 匹配 ”1975-09-21“SimpleDateFormat format2 = new SimpleDateFormat("yyyy-MM-dd");// 匹配 “1997/12/01”SimpleDateFormat format3 = new SimpleDateFormat("yyyy/MM/dd");// 第1种格式:转化成第3种格式if(s.contains(",")){Date date = format1.parse(s);return format3.format(date);}// 第2种格式:转化成第3种格式else if(s.contains("-")){Date date = format2.parse(s);return format3.format(date);}// 第3种格式:直接输出else{return s;}}public static final class FormatNormalizeMapper extends Mapper<LongWritable, Text, Text,Text> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();// 切割出一个个单词// 注意|前要加转义字符String[] list = line.split("\\|");// 1. 更新rating的最大最小值// 如果此时的rating不缺失if(!list[6].equals("?")){double rating = Double.parseDouble(list[6]);if(rating>max){max = rating;}if(rating<min){min = rating;}}// 2. 统一摄氏度和华氏度if(list[5].contains("℉")){float temperature = Float.parseFloat(list[5].substring(0,list[5].length()-1));temperature = (temperature-32)/1.8f;String t = String.format("%.1f",temperature)+"℃";line = line.replace(list[5],t);}// 3. 统一日期信息try {String review_date = DateFormatConversion(list[4]);String user_birthday = DateFormatConversion(list[8]);line = line.replace(list[4],review_date);line = line.replace(list[8],user_birthday);} catch (ParseException e) {e.printStackTrace();}// 4. 对应字段输出到reduce进行进一步处理context.write(new Text(line),new Text(""));}}public static final class FormatNormalizeReducer extends Reducer<Text,Text,Text,Text>{@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {// System.out.println("max:"+max+"    &    min:"+min+"\n\n\n\n");// 一些重复操作String line = key.toString();// 切割出一个个单词// 注意|前要加转义字符String[] list = line.split("\\|");// rating不缺失if(!list[6].equals("?")){double rating = Double.parseDouble(list[6]);// Min-Max归一化rating = (rating-min)/(max-min);line = line.replace(list[6],String.valueOf(rating));}// 如果不在迭代器中输出,那么就会造成去重的效果// 题目没有要求去重,所以还是写了迭代器for(Text value:values){context.write(new Text(line),new Text(""));}}}public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {// 定义文件输入输出路径String input = "hdfs://localhost:9000/D_Filter/part-r-00000";String out = "hdfs://localhost:9000/D_FormatNormalize";// 创建conf和jobConfiguration conf = new Configuration();Job job = Job.getInstance(conf, "FormatNormalize");// 设置jar包所在路径job.setJarByClass(FormatNormalize.class);// 指定Mapper类和Reducer类job.setMapperClass(FormatNormalizeMapper.class);job.setReducerClass(FormatNormalizeReducer.class);// 指定maptask输出类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);// 制定reducetask输出类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);// 指定mapreducer程序数据的输入输出路径Path inputPath = new Path(input);Path outputPath = new Path(out);FileInputFormat.setInputPaths(job, inputPath);FileOutputFormat.setOutputPath(job, outputPath);// 提交任务boolean waitForCompletion = job.waitForCompletion(true);System.exit(waitForCompletion ? 0 : 1);}}

指导书要求最终把结果也输出到D_Filter,但是实际上这样写代码会报错(似乎不允许输出输出同一个文件夹),所以我把结果输出到另一个新建文件夹下了

数据清洗(缺失值填充)

1. D_Filter中某些数据在属性E上可能存在缺失,我们需要使用某种方法对E进行填充,然后将数据保
存在HDFS中,命名为D_Done;
2. 填充策略可以考虑使用默认值,也可以使用平均值、中位数,还可以利用相似度寻找与缺失数据相
似的其他数据,然后借此对缺失的数据进行填充;
【3.4中存在缺失值的属性为rating和user_income,根据先验知识,rating近似依赖于user_income、longitude、latitude和altitude,user_income近似依赖于user_nationality和user_career。对rating和user_income的填充可以利用这些依赖关系;】

这一部分任务我选择使用平均值来填充。待填充属性值包括rating和user_income,但由于rating依赖于user_income,所以我决定分两轮MR完成(当然直接一轮填充,只统计当前的有效数据也说的过去)。第一轮填充user_income,第二轮填充rating

首先我创建了一个数据结构Record,包含两个成员变量sum和num,统计属性值的总和以及个数,方便后期计算出对应的平均值

public class Record {// 本类用于统计当前属性的和以及个数,方便最终求取平均值private double sum;private int num;public Record() {this.sum = 0;this.num = 0;}public double getSum() {return sum;}public void setSum(double sum) {this.sum = sum;}public int getNum() {return num;}public void setNum(int num) {this.num = num;}
}

填充思路很简单,还是用上了全局变量的办法,全局设置总体统计量xxRecord(只要待填充属性值不缺失就纳入统计)以及分类统计量xxCount(利用HashMap,统计相同依赖值下的sum和num)。在mapper中计算统计量,在reducer中提取出依赖属性的值,与分类统计量对比,如果有相同依赖值的记录,取对应的sum和num计算出平均值填充进去;否则拿总体统计量的平均值填充

【当然听我朋友说可以调一个工具包直接进行多元线性回归,我猜也许是OLSMultipleLinearRegression,大家可以去试一试】

两个属性值填充的思路以及代码高度相似,所以在这里我就贴其中一个代码就好了

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
import java.util.HashMap;public class IncomeFill {// 本类用于填充具有缺失值的属性user_income// 其中user_income近似依赖于user_nationality和user_career//   user_income:12个属性中的第12个//   user_nationality:12个属性中的第10个//   user_career:12个属性中的第11个// 初始化全局变量// 总体统计public static Record incomeRecord = new Record();// 分类统计public static HashMap<String,Record> incomeCount = new HashMap<>();// Mapper:对相关属性值进行统计public static final class IncomeFillMapper extends Mapper<LongWritable, Text, Text,Text> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 老规矩,先切分出各个属性String line = value.toString();// 切割出一个个单词// 注意|前要加转义字符String[] list = line.split("\\|");// 如果此时的income不缺失if(!list[11].contains("?")){// 提取相关属性值String nationality_career = list[10]+list[11];double income = Double.parseDouble(list[11]);// 总体记录先往上添加一个incomeRecord.setNum(incomeRecord.getNum()+1);incomeRecord.setSum(incomeRecord.getSum()+income);// 补充分类记录// 如果之前没有相同nationality_career的记录,则需创建记录if(!incomeCount.containsKey(nationality_career)){Record record = new Record();record.setSum(income);record.setNum(1);incomeCount.put(nationality_career,record);}// 否则只需更新else{Record record = incomeCount.get(nationality_career);record.setSum(record.getSum()+income);record.setNum(record.getNum()+1);}}// 无论如何都无脑输出,不对原数据进行修改context.write(value,new Text(""));}}// Reducer:根据统计信息,填充缺失值public static final class IncomeFillReducer extends Reducer<Text, Text, Text,Text> {@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {// 还是分割字符串的操作String line = key.toString();// 切割出一个个单词// 注意|前要加转义字符String[] list = line.split("\\|");// 只需要对缺失值进行填充修改if(list[11].contains("?")){// 提取相关属性值String nationality_career = list[10]+list[11];// 有相同类的统计数据存在if(incomeCount.containsKey(nationality_career)){// 获取该类的统计平均值Record record = incomeCount.get(nationality_career);double income = record.getSum()/record.getNum();line = line.replace(list[11],String.valueOf(income));}// 否则用全体统计平均值填充else{double income = incomeRecord.getSum()/incomeRecord.getNum();line = line.replace(list[11],String.valueOf(income));}// 修改待输出的keykey = new Text(line);}for(Text value:values){context.write(key,new Text(""));}}}public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {// 定义文件输入输出路径String input = "hdfs://localhost:9000/D_Filter/part-r-00000";String out = "hdfs://localhost:9000/D_IncomeFill";// 创建conf和jobConfiguration conf = new Configuration();Job job = Job.getInstance(conf, "IncomeFill");// 设置jar包所在路径job.setJarByClass(IncomeFill.class);// 指定Mapper类和Reducer类job.setMapperClass(IncomeFillMapper.class);job.setReducerClass(IncomeFillReducer.class);// 指定maptask输出类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);// 制定reducetask输出类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);// 指定mapreducer程序数据的输入输出路径Path inputPath = new Path(input);Path outputPath = new Path(out);FileInputFormat.setInputPaths(job, inputPath);FileOutputFormat.setOutputPath(job, outputPath);// 提交任务boolean waitForCompletion = job.waitForCompletion(true);System.exit(waitForCompletion ? 0 : 1);/*// testRecord record = new Record();record.setSum(88);record.setNum(5);incomeCount.put("trytry",record);Record record1 = incomeCount.get("trytry");System.out.println(record1.getSum()+"   "+record1.getNum());record1.setSum(100);record1.setNum(8);System.out.println(incomeCount.get("trytry").getSum()+"   "+incomeCount.get("trytry").getNum());*/}}

总结

实验到这就结束了,总体来说并不复杂,四个部分的代码都是按要求重写mapper和reducer就够了。整个实验捋下来,反而是配环境最让我痛苦,憨憨配了两回才配成功。

最后得感谢一下学长火炬!!!!!给学长磕磕!如果没有火炬参考,自己从零摸索简直太痛苦了。虽然最后来看几乎就是再复现一遍火炬,但还算是有所收获,也希望大家能自己动手过一遍实验,至少可以稍微学一波知识

学长火炬:哈工大2021年春大数据分析实验整理

最后的最后吐槽一波:CSDN的markdown编辑器也太太太难用了!!!!!!!!生气[○・`Д´・ ○],我调的格式到发布的时候全都没有掉了......

HIT-大数据分析Lab1:数据预处理-实验记录相关推荐

  1. 解读:大数据分析及其数据来源

    当我们谈到大数据分析,首先需要确定数据分析的方向和拟解决的问题,然后才能确定需要的数据和分析范围.大数据驱动的分析主要的挑战不是技术问题,而是方向和组织领导的问题,要确定方向,提出问题,需要对行业做深 ...

  2. 大数据分析的“数据来源”有哪些?

    当我们谈到大数据分析,首先需要确定数据分析的方向和拟解决的问题,然后才能确定需要的数据和分析范围.大数据驱动的分析主要的挑战不是技术问题,而是方向和组织领导的问题,要确定方向,提出问题,需要对行业做深 ...

  3. Python数据分析中数据预处理:编码将文字型数据转换为数值型

    [小白从小学Python.C.Java] [Python-计算机等级考试二级] [Python-数据分析] Python数据分析中 数据预处理:编码 将文字型数据转换为数值型 选择题 对于以下pyth ...

  4. 上海数据分析师培训哪家好,大数据分析与数据分析师有什么区别?

    数据分析可谓是最近的"香饽饽",不少企业在尝过了甜头后,纷纷开始找寻相关的人才,也有许多年轻人看重了数据分析的潜力,想要转行. 但零基础无从下手,这时培训班就成为了一个很好的选择, ...

  5. 淘宝双11大数据分析(数据准备篇)

    文章目录 前言 数据内容分析 `user_log.csv`文件内容含义 `train.csv` 和 `test.csv` 文件内容含义 数据上传到Linux系统并解压 数据集的预处理 文件信息截取 导 ...

  6. 云计算实验4 面向行业背景的大数据分析与处理综合实验

    一. 实验目的 掌握分布式数据库接口Spark SQL基本操作,以及训练综合能力,包括:数据预处理.向量处理.大数据算法.预测和可视化等综合工程能力 二. 实验环境 Linux的虚拟机环境和实验指导手 ...

  7. 【Python数据分析】数据预处理2——数据集成

    数据挖掘需要的数据往往分布在不同的数据源中,数据集成就是将多个数据源合并存放在一个一致的数据存储位置(如数据仓库)中的过程. 一.实体识别 常见的实体识别有: 1.同名异义(两个不同数据源中属性名称相 ...

  8. 【Python数据分析】数据预处理1——数据清洗

    在数据挖掘过程中,数据预处理过程是占比很大的一部分工作 数据预处理过程主要有以下几个部分 1.数据清洗--2.数据集成--3.数据变换--4.数据规约 本文介绍数据清洗部分 一.缺失值分析与处理 1. ...

  9. python数据预处理实验_PimaIndiansdiabetes-数据预处理实验(一)

    有趣的事,Python永远不会缺席! 一.PimaIndiansdiabetes.csv 数据集介绍 1.该数据集最初来自国家糖尿病/消化/肾脏疾病研究所.数据集的目标是基于数据集中包含的某些诊断测量 ...

最新文章

  1. 洛谷专题训练 ——【算法1-1】模拟与高精度
  2. redhad yum 安装mysql_yum安装Mysql
  3. 如何从eclipse迁移到idea
  4. [摘录]高效人士七习惯—从依赖到独立
  5. 数据表_业务地址(Business Address)信息
  6. 用户选择屏幕搜索帮助的代码实现
  7. apache AH01630: client denied by server configuration错误解决方法
  8. 中篇 | 多轮对话机器之话题意图识别
  9. nginx ngx_http_mirror_module模块
  10. 正斜杠( / )和反斜杠( \ )的区别
  11. RPM包实现LAMP
  12. 直播不是造车,特斯拉开店玩直播却栽了个“跟头”
  13. linux下源码安装apache服务
  14. 如何避免_如何避免钢板弹簧受损
  15. Linux中chmod用法
  16. 网卡82546驱动linux,Dell服务器常见Linux驱动选择
  17. myelicpes2019初次使用设置_实况足球2019球员数据编辑器怎么使用
  18. CSS实现折角效果:
  19. 关于段地址和偏移地址的理解
  20. Microsoft Visual Studio 2005中使用水晶报表详细说明

热门文章

  1. (官网)虚幻3--HUD技术指南
  2. js ie8火狐谷歌兼容性问题,不需要判断是哪个浏览器,只需要判断支不支持这个属性
  3. tiup cluster upgrade
  4. 点序AS2258开卡方法,AS2258固态修复指导,AS2258量产工具开卡软件下载
  5. 敏捷CSM认证:什么是产品负责人?
  6. 敏捷CSM认证:Scrum的定义
  7. 视频教程-FFmpeg音视频开发实战6 iOS/Android/windows/Linux-其他
  8. Javascript验证表单身份证号码的合法性
  9. 2018年最新Android的保活方案及效果统计
  10. 量化策略要素_成功的产品发布策略的三个要素