目录

核心思想

Mapper

Reducer

Driver

数据类型

简单案例wordcount

maven工程搭建

代码实现

在集群中测试

序列化

序列化和反序列化

自定义序列化

案例1:计算

案例2:过滤

案例3:排序

案例4:分区

案例5:组合

案例6:Join

mapreduce和mysql

环境准备

案例1:从mysql中读数据

案例2:往mysql中写数据

实际应用场景

案例1:日期转换

案例2:过滤+分区

核心思想

MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。其核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。

分布式的运算程序往往需要分成至少2个阶段。

  • 第一个阶段的MapTask并发实例,完全并行运行,互不相干。
  • 第二个阶段的ReduceTask并发实例互不相干,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出。

MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行。

Mapper

  • 用户自定义的Mapper要继承自己的父类
  • Mapper的输入数据是KV对的形式(KV的类型可自定义)
  • Mapper中的业务逻辑写在map()方法中
  • Mapper的输出数据是KV对的形式(KV的类型可自定义)
  • map()方法(MapTask进程)对每一个<K,V>调用一次

Reducer

  • 用户自定义的Reducer要继承自己的父类
  • Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
  • Reducer的业务逻辑写在Reducer()方法中
  • ReduceTask进程对每一组相k的<k,v>组调用一次reduce()方法

Driver

相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的job对象。

数据类型

简单案例wordcount

需求:使用mapreduce统计分析以下城市出现的次数

maven工程搭建

maven工程搭建我们在上一节的笔记中已经详细介绍,大家可以参考:大数据实训笔记3:hdfs

我们在这里只介绍pom.xml中导入的依赖和log4j.properties日志的内容。

<!-- 自定义属性设置版本号 -->
<properties><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><hadoop-version>3.1.3</hadoop-version>
</properties>
<dependencies><!--junit单元测试--><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>RELEASE</version></dependency><!--slf4j日志--><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>2.0.0-alpha2</version></dependency><!--hadoop中的common工具类--><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>${hadoop-version}</version></dependency><!--hadoop中的client--><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop-version}</version></dependency><!--hadoop中的hdfs--><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>${hadoop-version}</version></dependency>
</dependencies>
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

之后我们创建wordcount包,在这个包下面写Mapper, Reducer和Driver类。

代码实现

代码中有详细的注释,在此不再赘述。后续的mapreduce案例均按照此框架进行代码的编写,大家一定要借助这个简单的案例理解透彻。注意复制代码之后导入的包要正确!

//Mapper类
//<>中的数据类型分别为输入key类型、输入value类型、输出key类型、输入value类型
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {private Text k=new Text();                   //输出keyprivate IntWritable v=new IntWritable(1);    //输出value,这里设置它恒为1//重载map方法,只要输入map后回车就可以自动生成方法框架@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();     //获取一行数据String[] split = line.split(" ");   //切割,以空格为分隔符for (String word : split) {         k.set(word);            //设置输出的key为每一个单词context.write(k,v);     //写出,例如(北京, 1), (上海, 1)}}
}
//Reducer类
//<>中的数据类型分别为:
//输入key类型(Mapper输出key类型)、输入value类型(Mapper输出value类型)
//输出key类型(最终输出key类型)、输出value类型(最终输出value类型)
public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {private IntWritable v=new IntWritable();    //输出value//重载reduce方法@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum=0;    //每一个城市出现的次数for (IntWritable value : values) {  //求和,算出一个城市出现了几次sum+=value.get();}v.set(sum);   //输出value为每一个城市出现的次数context.write(key,v);    //写出}
}
//Driver类,用来调度Mapper和Reducer类,最终执行的就是这个类的main方法
public class WordCountDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//设置输入输出路径//要分析的txt放在input\wordcount文件夹下//输出路径wordcount是不存在的,执行main方法之后自动创建,如果该文件夹已经存在会报错args=new String[]{"D:\\input\\wordcount","D:\\output\\wordcount"};//写一个配置对象Configuration conf=new Configuration();//获取job对象Job job = Job.getInstance(conf);//设置jar位置job.setJarByClass(WordCountDriver.class);//关联mapper和reducerjob.setMapperClass(WordCountMapper.class);job.setReducerClass(WordCountReducer.class);//设置mapper的输出类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);//设置最终输出类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//设置输入输出路径FileInputFormat.setInputPaths(job,new Path(args[0]));FileOutputFormat.setOutputPath(job,new Path(args[1]));//提交boolean result = job.waitForCompletion(true);System.exit(result?0:1);}
}

执行Driver类中的main方法,我们来到输出路径下,就可以查看输出结果了。 如果输出的中文为乱码,把txt文档的字符编码改为utf-8即可。

在集群中测试

除了在本地运行,我们还可以把刚刚编写的程序打包上传到集群中测试。打包之前,记得把自定义的路径(args)注释掉,否则会报错。

首先,我们需要在pom.xml中加入打包插件。其中的com.mr.wordcount.WordCountDriver是我的Driver类所在位置,大家可以选中自己的Driver类,右键-Copy Reference获取。

<build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>3.6.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin </artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest><mainClass>com.mr.wordcount.WordCountDriver</mainClass></manifest></archive></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins>
</build>

然后,点击右边栏的Maven-mapreduce_demo(项目名称)-Lifecycle,依次运行clean, install, package,就可以在项目下的target文件夹找到我们打包好的jar包了。

将其重命名后,上传到集群测试。Alt+P跳转到上传界面。

sftp> cd /opt/module/hadoop-3.1.3

拖拽上传即可。上传成功后,我们就可以测试并查看结果。

//com.mr.wordcount.WordCountDriver是Driver类的位置,记得更改
//将要统计分析的txt放在input文件夹下
//output文件夹必须不存在,执行后自动生成
[hadoop@hadoop101 hadoop-3.1.3]$ hadoop jar wordcount.jar com.mr.wordcount.WordCountDriver /input /output

之后还会介绍很多案例,也可以用同样的方法打包、上传、测试。

序列化

序列化和反序列化

序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。

反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。

一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。然而序列化可以存储”活的”对象,可以将”活的”对象发送到远程计算机。

Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制(Writable)。

自定义序列化

  • 写一个类,必须实现Writable接口
  • 重写序列化方法
  • 重写反序列化方法
  • 写空参构造方法
  • 重写toString()

案例1:计算

需求:对以下学生的语文、数学、英语成绩求每位学生的最高分、最低分、总分、平均分。

//自定义序列化
public class StudentBean implements Writable {//对应txt中的几个变量和我们需要求的变量private String name;private long chinese;private long math;private long english;private long maxScore;private long minScore;private long sumScore;private double avgScore;//序列化@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(name);out.writeLong(chinese);out.writeLong(math);out.writeLong(english);out.writeLong(maxScore);out.writeLong(minScore);out.writeLong(sumScore);out.writeDouble(avgScore);}//反序列化@Overridepublic void readFields(DataInput in) throws IOException {name = in.readUTF();chinese = in.readLong();math = in.readLong();english = in.readLong();maxScore = in.readLong();minScore = in.readLong();sumScore = in.readLong();avgScore = in.readDouble();}//空参构造public StudentBean() {}//有参构造public StudentBean(String name, Long chinese, Long math, Long english) {this.name = name;this.chinese = chinese;this.math = math;this.english = english;this.maxScore = Math.max(Math.max(chinese, math), english);this.minScore = Math.min(Math.min(chinese, math), english);this.sumScore = (chinese + math + english);this.avgScore = this.sumScore / 3.0;}//以下这些右键-Generate-Getter & Setter就可以自动生成public String getName() {return name;}public void setName(String name) {this.name = name;}public Long getChinese() {return chinese;}public void setChinese(Long chinese) {this.chinese = chinese;}public Long getMath() {return math;}public void setMath(Long math) {this.math = math;}public Long getEnglish() {return english;}public void setEnglish(Long english) {this.english = english;}public Long getMaxScore() {return maxScore;}public void setMaxScore(Long maxScore) {this.maxScore = maxScore;}public Long getMinScore() {return minScore;}public void setMinScore(Long minScore) {this.minScore = minScore;}public Long getSumScore() {return sumScore;}public void setSumScore(Long sumScore) {this.sumScore = sumScore;}public Double getAvgScore() {return avgScore;}public void setAvgScore(Double avgScore) {this.avgScore = avgScore;}//重载toString方法,后续Mapper和Reducer会用到//不输出name是因为后续我们用name做key,输出的时候已经有name了,不需要重复输出@Overridepublic String toString() {return chinese +"\t" + math +"\t" + english +"\t" + maxScore +"\t" + minScore +"\t" + sumScore +"\t" + avgScore ;}
}

接下来编写Mapper, Reducer和Driver类,与上个案例类似,这里只给出部分代码。

//Mapper类
public class StudentMapper extends Mapper<LongWritable, Text, Text, StudentBean> {private  Text k = new Text();private  StudentBean v = new StudentBean();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString(); //这是分割的另一种方法,可以按照\t,\n,空格等进行分割                    StringTokenizer sti = new StringTokenizer(line);    v.setName(sti.nextToken());v.setChinese(Long.parseLong(sti.nextToken()));v.setMath(Long.parseLong(sti.nextToken()));v.setEnglish(Long.parseLong(sti.nextToken()));k.set(v.getName());context.write(k, v);}
}
//Reducer类
public class StudentReducer extends Reducer<Text, StudentBean, Text, StudentBean> {@Overrideprotected void reduce(Text key, Iterable<StudentBean> values, Context context) throws IOException, InterruptedException {StudentBean bean = values.iterator().next();//计算在这步完成,我们编写的有参构造函数会帮我们完成计算StudentBean v = new StudentBean(bean.getName(), bean.getChinese(), bean.getMath(), bean.getEnglish());context.write(key, v);}
}

运行结果如下:

案例2:过滤

需求:根据日志信息,统计每个联系方式上网流量,上行流量、下行流量、总流量。

这个案例与案例1类似,大家可以自行编写。不过观察数据,我们会发现,有一些数据为空。因此,在Mapper的编写中,要格外注意。

//在Mapper类中,获取上行流量与下行流量时
//因为不一定有ip地址,因此不能直接写split[3]
v.setUpFlow(Long.parseLong(split[split.length - 3]));
v.setDownFlow(Long.parseLong(split[split.length - 2]));
//Reducer类
public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {@Overrideprotected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {long sumUpFlow = 0;long sumDownFlow = 0;//对电话号码相同的用户,计算出其上下行流量的总和for (FlowBean bean : values) {          sumUpFlow += bean.getUpFlow();sumDownFlow += bean.getDownFlow();}FlowBean v = new FlowBean(key.toString(), sumUpFlow, sumDownFlow);context.write(key, v);}
}

运行结果如下:

案例3:排序

需求:将数据先按年龄升序排序,如果年龄相等,再比较身高。

和前面的案例相似,只是需要多写一个MySort类负责比较。

//MySort类
public class MySort implements WritableComparable<MySort2> {private int age;private int height;//相等返回0,小于返回-1,大于返回1@Overridepublic int compareTo(MySort2 other) {if(this.age == other.age) {if(this.height == other.height) {return 0;}else if(this.height < other.height) {return  -1;}else {return 1;}}else if(this.age < other.age) {return -1;}else {return 1;}}//序列化@Overridepublic void write(DataOutput out) throws IOException {out.writeInt(age);out.writeInt(height);}//反序列化@Overridepublic void readFields(DataInput in) throws IOException {age = in.readInt();height = in.readInt();}public MySort() {}public MySort(int age, int height) {this.age = age;this.height = height;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}public int getHeight() {return height;}public void setHeight(int height) {this.height = height;}@Overridepublic String toString() {return age + height + "";}@Overridepublic int hashCode() {return age + height;}
}
//Mapper类中,将输出类型设置为MySort
private MySort k = new MySort();
k.setAge(v.getAge());
k.setHeight(v.getHeight());

案例4:分区

需求:在案例3的基础上,将数据按照年龄分为3个区。

//MyPartioner自定义分区
//泛型参数是mapper输出数据的key和value的数据类型
public class MyPartioner extends Partitioner<MySort, WorkSort> {@Overridepublic int getPartition(MySort key, WorkSort value, int num) {//按照年龄分区//num代表在年龄范围内平均分成几个区return value.getAge()/(100/num);}
}
//在Driver类中
//设置分区
job.setPartitionerClass(MyPartioner.class);
job.setNumReduceTasks(3);

案例5:组合

需求:在案例3的基础上,统计各组中身高超过170的附近人数。

//MyGroupingComparator类
public class MyGroupingComparator extends WritableComparator {@Overridepublic int compare(WritableComparable a, WritableComparable b) {//key1,key2是mapper输出的keyMySort2 key1 = (MySort2) a;MySort2 key2 = (MySort2) b;//身高在170以上的返回0,可以把key所对应的value组合放到同一key的values中if(key1.getHeight() > 170 && key2.getHeight() > 170)return  0;elsereturn -1;}public MyGroupingComparator() {//创建适合key的实例super(MySort2.class, true);}
}
//在Driver类中
//设置看哪些数据可以在reducer端进行一次处理
job.setGroupingComparatorClass(MyGroupingComparator.class);
job.setNumReduceTasks(1);

案例6:Join

需求:统计person表和score表有关联的数据信息。(score表第二列是person表的第一列id)

//Record类
public class Record implements Writable {private int isPerson;   //0表示score, 1表示personprivate String content; //输出内容//......
}

除了像上述几个案例那样将Mapper, Reducer和Driver类分开编写,我们还可以把它们作为一个类下的子类。

//MyJoin类
public class MyJoin {//Mapper类public static class JoinMapper extends Mapper<LongWritable, Text, IntWritable, Record> {private IntWritable k = new IntWritable();  //存放personIdprivate Record v = new Record();            //存放输出内容@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String [] fields = value.toString().split("\t");    //获取一行数据,以tab为分隔符int personId = 0;if(fields.length == 6) {    //如果长度为6,说明为person表personId = Integer.parseInt(fields[0]);     //person表的第一列为personIdv.setIsPerson(1);v.setContent(fields[1] + "\t" + fields[2] + "\t" + fields[3] + "\t" + fields[4] + "\t" + fields[5]);}else {  //否则是score表personId = Integer.parseInt(fields[1]);     //score表的第二列为personIdv.setIsPerson(0);v.setContent(fields[2] + "\t" + fields[3]);}k.set(personId);context.write(k, v);}}//Reducer类public static class JoinReducer extends Reducer<IntWritable, Record, IntWritable, Text> {private Text v = new Text();@Overrideprotected void reduce(IntWritable key, Iterable<Record> values, Context context) throws IOException, InterruptedException {String personStr = null;List<String> scoreList = new ArrayList();    //一个人可能有多门课的成绩for  (Record record: values) {if (record.getIsPerson() == 1){personStr = record.getContent();}else {scoreList.add(record.getContent());}}for (String score: scoreList) {v.set(personStr + "\t" + score);context.write(key, v);}}}//相当于Driver类public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//......Job job = Job.getInstance(conf, "queryMr");//......}
}

mapreduce和mysql

环境准备

自行安装mysql,可以安装Navicat,便于数据库操作。

在mysql中,新建mapreduce数据库。在mapreduce数据库下,新建tb_student表。设计如下:

插入几条数据:

在pom.xml中导入mysql依赖。注意替换成自己的版本。

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.19</version>
</dependency>

案例1:从mysql中读数据

需求:统计学生信息中每个年龄的学生数量。

//Student类,注意一定要DBWritable类
public class Student implements DBWritable, Writable {//存放从mysql中读的数据private int sid;private String sname;private String sex;private int age;private String birthday;//序列化和反序列化的代码自行编写//从数据库中读取数据@Overridepublic void readFields(ResultSet resultSet) throws SQLException {this.sid = resultSet.getInt("sid");this.sname = resultSet.getString("sname");this.sex = resultSet.getString("sex");this.age = resultSet.getInt("age");this.birthday = resultSet.getString("birthday");}//其他方法自行编写
}
//ReadSql类,负责从MySql中读取数据
public class ReadSql {//Mapper类public static class ReadDBMapper extends Mapper<LongWritable, Student, IntWritable, IntWritable> {private IntWritable k = new IntWritable();private IntWritable v = new IntWritable(1);     //默认为1@Overrideprotected void map(LongWritable key, Student value, Context context) throws IOException, InterruptedException {k.set(value.getAge());    //按照需求,以年龄为keycontext.write(k, v);}}//Reducer类public static class ReadDBReducer extends Reducer<IntWritable, IntWritable, Text, NullWritable> {private Text k = new Text();@Overrideprotected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int count = 0;for (IntWritable value: values) {count += value.get();}k.set(key.get() + "岁的学生有" + count + "人");context.write(k, NullWritable.get());}}//相当于Driver类public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();//连接数据库//localhost连接本地数据库,3306为端口,字符集为utf-8,时区为GMT(这一项不设置我会报错,大家可以自行调整)//最后两个参数是用户名和密码DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://localhost:3306/mapreduce?characterEncoding=utf-8&serverTimezone=GMT", "root", "passwd");Job job = Job.getInstance(conf, "mrsql");//......//表中的属性名,不能出错,否则无法读取数据String[] fields = {"sid,", "sname", "sex", "age", "birthday"};//设置输入输出路径DBInputFormat.setInput(job, Student.class, "tb_student", null, "sid", fields);Path outPath = new Path("D:\\output\\sql");FileSystem fs = outPath.getFileSystem(conf);//如果输出路径存在,删除if(fs.exists(outPath)) {fs.delete(outPath, true);}FileOutputFormat.setOutputPath(job, outPath);//......}
}

案例2:往mysql中写数据

需求:把文件中数据写入到mysql中。

//在Student类中
//写入数据库
@Override
public void write(PreparedStatement statement) throws SQLException {statement.setInt(1, sid);statement.setString(2, sname);statement.setString(3, sex);statement.setInt(4, age);statement.setString(5, birthday);
}
//WriteSql类,负责往MySql中写入数据
public class WriteSql {//Mapper类和Reducer类自行编写//相当于Driver类public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//......//设置输入输出路径FileInputFormat.setInputPaths(job, new Path("D:\\input\\sql"));String[] fields = {"sid", "sname", "sex", "age", "birthday"};DBOutputFormat.setOutput(job, "tb_student", fields);//......}
}

在MySql中查看,数据写入成功。

实际应用场景

可以看到,这次的文件中有一万多条,是疫情时期的真实数据。使用mapreduce能很快地对这么多数据进行统计分析。

案例1:日期转换

需求:把文件数据日期转换成xxxx年xx月xx日。

public class Date {//Mapper类public static class DateFormatMapper extends Mapper<LongWritable, Text,Text, NullWritable> {private Text k=new Text();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//获取一行数据String text=new String(value.getBytes(),0,value.getLength(),"GBK");//以逗号分割,因为我们的文件是.csvString[] line = text.split(",", -1);//计算日期String date="2021年"+line[0];k.set(date+","+line[1]+","+line[2]+","+line[3]+","+line[4]+","+
line[5]+","+line[6]+","+line[7]);context.write(k,NullWritable.get());}}//Reducer和Driver类自行编写
}

日期转换成功。把输出的文件后缀改为csv就可以用excel表格更清楚地查看。

案例2:过滤+分区

需求:对于2021年疫情数据,过滤出省份为湖北省的数据,非湖北省的数据单独放在一个文件夹中,只需要前6列。

public class Data implements WritableComparable<Data> {//定义6个变量用来存储6列数据//都设置为String是因为后四个数据都可能为空,不能设置为intprivate String date;private String city;private String district;private String sick;private String discharge;private String death;//重载比较方法,使最后数据能够按顺序输出@Overridepublic int compareTo(Data o) {int i = date.compareTo(o.date);int j = city.compareTo(o.city);int m = district.compareTo(o.district);int n = sick.compareTo(o.sick);int g = discharge.compareTo(o.discharge);int h = death.compareTo(o.death);if (i == 0) {if (j == 0) {if (m == 0) {if (n == 0) {if (g == 0) {return h;} else {return g;}} else {return n;}} else {return m;}} else {return j;}} else {return i;}}@Overridepublic boolean equals(Object o) {if (this == o)return true;if (o == null || getClass() != o.getClass())return false;Data data = (Data) o;return Objects.equals(date, data.date) &&Objects.equals(city, data.city) &&Objects.equals(district, data.district) &&Objects.equals(sick, data.sick) &&Objects.equals(discharge, data.discharge) &&Objects.equals(death, data.death);}@Overridepublic int hashCode() {return Objects.hash(date, city, district, sick, discharge, death);}//序列化和反序列化以及其他方法自行编写
}
public class Hubei {//过滤出湖北省的分区public static class DataPartioner extends Partitioner<Data, NullWritable> {//重写分区方法@Overridepublic int getPartition(Data data, NullWritable nullWritable, int numPartitions) {//比较开头是否为湖北,是湖北就符合条件,筛选出来if(data.getCity().startsWith("湖北")){return 0;}return 1;}}//Mapper, Reducer和Driver类自行编写//删除输出路径下的一切文件夹和文件public static void deleteFile(File file){File[] files = file.listFiles();if(files!=null&&files.length!=0){for (int i = 0; i < files.length; i++) {deleteFile(files[i]);}}file.delete();}public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {args = new String[]{"D:\\input\\covid19","D:\\output\\covid19.2"};File file=new File(args[1]);if(file.exists()){deleteFile(file);initDriver(args);}else{initDriver(args);}}
}

大数据实训笔记4:mapreduce相关推荐

  1. 大数据实训笔记10:hive的应用

    目录 数据定义 数据仓库操作 数据仓库的创建 数据仓库的查询 数据仓库的修改 数据仓库的删除 表操作 内部表 外部表 分区表 桶表 表的修改 表的删除 视图 数据操作 数据导入 加载数据 插入数据 数 ...

  2. 大数据实训笔记1:hadoop环境搭建及单机模式

    必备环境 VMware Centos 7 SwitchHosts!(可以不用) SecureCRT hadoop-3.1.3.tar jdk-8u212-linux-x64.tar hadoop环境搭 ...

  3. 搭建高校AI大数据实训室,2019高校大数据科研教学整体解决方案,数道云

    伴随着互联网技术的迅猛发展,正在逐步改变传统的高校教育模式,以大数据.云计算.AI等等技术为核心的教育模式正在逐步发展. 高校实行AI大数据实训室有何实质性的效果呢? 大数据的出现催生出产业人才缺口瓶 ...

  4. 大数据实训报告_教学大数据实训平台解决方案_德拓信息_上海市徐汇区

    大数据实训 方案简介 德拓开发出多套专业的教学体系,为大数据.云计算.AI等专业的师生提供全套完整的教学实训实战方案.DSight智慧实验室包括教学实训.项目实战.科研应用三大模块,同时还提供对应的课 ...

  5. 湖北高校实用的大数据平台,专业的高校大数据实训平台解决方案,波若高校实训平台...

    大数据工程教学实训平台又称大数据挖掘实战中心,将云存储资源.服务器资源和网络资源整合,然后通过Vmware等虚拟化搭建私有云平台,在私有云平台上搭建统一的数据挖掘平台和基于Hadoop的大数据分析平台 ...

  6. 大数据实训室课程体系设计案例分享

    大数据课程体系 大数据实训课程体系设计依据 一.培养目标:大数据技术与应用专业主要培养大数据应用与工程技术领域的复合型高级技术人才.毕业生具有信息科学和数据科学基础知识与基本技能,掌握大数据技术所需要 ...

  7. 大数据实训基地建设方案分享

    大数据实训室建设方案 中高职及本科在大数据专业建设所遇到的困难 大数据.信息安全.人工智能等新信息技术产业发展迅猛,人才极其匮乏,各个本科及职业院校纷纷开设相应的专业方向.但是,绝大多数院校因为师资和 ...

  8. 大数据实训室助力国家高校人才梯队建设

    5天,60小时,不间断的持续学习,是什么内容使得已经工作多年的教师还能如此热情高涨的学习?答案就是:新华三大数据教师培训. 2016年11月21~25日,新华三集团在杭州总部举办"2016新 ...

  9. 【数据清洗预处理——四】使用大数据实训云

    数据清洗与预处理--四 大数据实训云 1.登录实训云 2 .进入实训云 创建网络 查看网络拓扑 创建路由 查看网络拓扑 连接网络 创建端口 添加安全组 创建实例 1.实例规划 2.开始创建 测试连通性 ...

最新文章

  1. 一个简单的freemark输入输出的案例(一)
  2. 我擦!没想到你们都是这样 “劝退” 员工的!
  3. java1.8 新特性
  4. 蓝桥杯小朋友排队java_[蓝桥杯][历届试题]小朋友排队 (C++代码)
  5. mysql float 误差_mysql下float类型使用一些误差详解
  6. 【第四篇章-android平台MediaCodec】推断是否支持硬件解码码
  7. xUtils3 图片加载模块
  8. c语言- 负号运算符,C语言运算符盘点,C语言运算符知识点讲解
  9. 关于印发医疗联合体管理办法(试行)的通知
  10. CSMA/CD协议详解
  11. 关于 国产麒麟系统使用killall命令杀死模糊匹配进程失败“未找到该进程” 的解决方法
  12. 【解决方案】谈公众号红包的正确打开方式--传奇创世
  13. appearance
  14. LeetCode #743 Network Delay Time
  15. qt 三方源码 画饼图_[源码和文档分享]基于VC++和QT实现的图的可视化工具
  16. html+css实战174-SEO
  17. 小程序 引入computed报错:Behaviors should be constructed with Behavior()
  18. 2010年IT薪酬:Java,Python,Window…
  19. 《PyInstaller打包实战指南》第十一节 其他进阶命令
  20. Java第三章刘珊珊同学参加了Java课程的学习,她父亲和母亲承诺。

热门文章

  1. mybatis-generator-gui使用详解
  2. 不仅仅是程序本身——基础IO
  3. 单电源运放电路的基本偏置方法
  4. 电脑将图片转为excel表格的几种常用方法
  5. spring boot 整合 spring cache 简单使用
  6. 手把手教你制作车载DVD音乐光盘
  7. 银行结算户和储蓄户区别
  8. Nginx系列--介绍/官网等
  9. 我拿到了腾讯最爱考的数据分析面试题
  10. 搭建自己的https服务器详解