hadoop之MapReduce学习

MapReduce概述

MapReduce定义

MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。

MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并行运行在一个Hadoop集群上。

MapReduce优缺点

优点

  • 易于编程

  • 良好的扩展性

    当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。

  • 高容错性

    MapReduce设计的初衷就是使程序能够部署在廉价的PC机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop内部完成的。

  • 适合PB级别以上海量数据的利离线处理

    可以实现上千台服务器集群并发工作,提供数据处理能力

缺点

  • 不擅长实时计算

  • 不擅长流式计算

  • 不擅长DAG(有向无环图)计算

    多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。

MapReduce核心思想

(1)分布式的运算程序往往需要分成至少2个阶段。
(2)第一个阶段的MapTask并发实例,完全并行运行,互不相干。
(3)第二个阶段的ReduceTask并发实例互不相干,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出。
(4)MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个   MapReduce程序,串行运行。
总结:分析WordCount数据流走向深入理解MapReduce核心思想。

MapReduce进程

一个完整的MapReduce程序在分布式运行时有三类实例进程

(1)MrAppMaster:负责整个程序的过程调度及状态协调。

(2)MapTask:负责Map阶段的整个数据处理流程。

(3)ReduceTask:负责Reduce阶段的整个数据处理流程。

hadoop常用数据序列化类型

MapReduce编程规范

用户编写的程序分成三个部分:Mapper、Reducer和Driver。

1.Mapper阶段

2.Reduce阶段

3.Driver阶段

相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的job对象

WordCount案例实操

编写Mapper

package com.pihao.mr;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** 以wordCount案例为例:* 自定义Mapper类,需要继承Hadoop提供的Mapper,并且根据具体业务指定输入数据和输出数据的类型*  输入数据的类型*  KEYIN, 读取文件的偏移量 数字(LongWritable)*  VALUEIN, 读取文件的一行数据 文本(Text)**  输出数据的类型* KEYOUT,  输出数据的key的类型,一个单词(Text)* VALUEOUT 输出数据value的类型,给单词的标记数字(IntWritable)*/
public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {private Text outKey = new Text();private IntWritable outValue = new IntWritable(1);/*** Map阶段的核心业务处理方法,每输入一行数据会调用一次map方法* @param key* @param value* @param context* @throws IOException* @throws InterruptedException*/@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//获取当前输入的数据String line = value.toString();String[] datas = line.split(" ");for (String data : datas) {//遍历集合 封装 输出数据的key,valueoutKey.set(data);context.write(outKey,outValue);}}
}

编写Reduce

package com.pihao.mr;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/*** 以wordCount案例为例:* 自定义Reducer类,需要继承Hadoop提供的Reducer,并且根据具体业务指定输入数据和输出数据的类型*  输入数据的类型*  KEYIN, Map端的输出的Key的输出类型(Text)*  VALUEIN, Map端的输出的Value的输出类型(IntWritable)**  输出数据的类型* KEYOUT,  输出数据的Key的类型,就是一个单词(Text)* VALUEOUT 输出数据的Value的类型,单词出现的总次数(IntWritable)*/
public class WordCountReducer extends Reducer<Text, IntWritable, Text,IntWritable> {private Text outKey = new Text();private IntWritable outValue = new IntWritable();/*** Reduce阶段的核心业务处理方法:一组相同的Key的values会调用一次reduce方法* @param key* @param values* @param context* @throws IOException* @throws InterruptedException*/@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int total = 0;//遍历valuesfor (IntWritable value:values){//累加,输出total += value.get();}//封装key和valueoutKey.set(key);outValue.set(total);context.write(outKey,outValue);}
}

编写Driver

package com.pihao.mr;import com.sun.org.apache.bcel.internal.generic.NEW;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/*** MR程序的驱动类:主要用于MR任务*/
public class WordCountDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration configuration = new Configuration();// 声明一个Job对象Job job = Job.getInstance(configuration);//指定当前job的驱动类job.setJarByClass(WordCountDriver.class);//指定当前Job的Mapper和Reducerjob.setMapperClass(WordCountMapper.class);job.setReducerClass(WordCountReducer.class);//指定Map端输出数据的key,value的数据类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);//指定最终输出结果的key,value的数据类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//指定输入数据的目录和输出数据的目录FileInputFormat.setInputPaths(job,new Path("D:/测试数据/wcinput/hello.txt"));FileOutputFormat.setOutputPath(job,new Path("D:/测试数据/wcoutput"));// FileInputFormat.setInputPaths(job,new Path(args[0])); 打包到linux服务器时用// FileOutputFormat.setOutputPath(job,new Path(args[1]));//提交jobjob.waitForCompletion(true); // true表示可以监控}
}

执行过程

集群测试

本地测试简单,直接在IDEA中执行就能运行

将刚才创建的maven工程打成jar包:wc.jar,放在linux服务器上,群启hadoop集群,执行命令,注意复制主类的类路径:com.pihao.mr.WordCountDriver

[atguigu@hadoop102 software]$ hadoop jar  wc.jar com.pihao.mr.WordCountDriver /wcinput /wcoutput

可以查看YARN web页面监控执行的任务 hadoop103:8080

Hadoop序列化

序列化概述

自定义bean对象实现序列化接口Writable

在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在Hadoop框架内部传递一个bean对象,那么该对象就需要实现序列化接口Writable

package writable;import org.apache.hadoop.io.Writable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;/*** 流量对象:实现Hadoop的序列化*/
public class FlowBean implements Writable {private Integer upFlow;private Integer downFlow;private Integer sumFlow;/*** 序列化方法* @param out* @throws IOException*/@Overridepublic void write(DataOutput out) throws IOException {out.writeInt(upFlow);out.writeInt(downFlow);out.writeInt(sumFlow);}/*** 反序列化方法,这里的反序列顺序一定要和序列化的保持一致* @param in* @throws IOException*/@Overridepublic void readFields(DataInput in) throws IOException {upFlow = in.readInt();downFlow = in.readInt();sumFlow = in.readInt();}
}

序列化案例实操

需求:统计每一个手机号耗费的总上行流量、总下行流量、总流量

原日志的数据样式

期望输出的数据样式

序列化案例分析

回顾我们之前的wordCount的案例,这里最后的输出值已经不再是一个数字了,而是有多个字段:号码,上传流量,下载流量。hadoop自带的对象已经无法满足我们的需求,因此需要重新创建一个类对象

自定义业务对象,并实现序列化

package writable;import org.apache.hadoop.io.Writable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;/*** 流量对象:实现Hadoop的序列化*/
public class FlowBean implements Writable {private Integer upFlow;private Integer downFlow;private Integer sumFlow;//xxx getter setter 略public void setSumFlow(){this.sumFlow = this.upFlow+this.downFlow;}@Overridepublic String toString() {return upFlow+"\t"+downFlow+"\t"+sumFlow;}/*** 序列化方法* @param out* @throws IOException*/@Overridepublic void write(DataOutput out) throws IOException {out.writeInt(upFlow);out.writeInt(downFlow);out.writeInt(sumFlow);}/*** 反序列化方法* @param in* @throws IOException*/@Overridepublic void readFields(DataInput in) throws IOException {upFlow = in.readInt();downFlow = in.readInt();sumFlow = in.readInt();}
}

编写Mapper

package writable;import com.sun.org.apache.bcel.internal.generic.NEW;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class FlowMapper extends Mapper<LongWritable, Text,Text, FlowBean> {private Text outKey = new Text();private FlowBean outValue = new FlowBean();/*** 核心业务逻辑处理* @param key* @param value* @param context* @throws IOException* @throws InterruptedException*/@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//获取当前行数据String line = value.toString();//切割数据String[] phoneDatas = line.split("\t");//获取输出数据的key(手机号)//1    13736230513 192.196.100.1   www.baidu.com   2481    24681   200outKey.set(phoneDatas[1]);;//获取输出数据的valueoutValue.setUpFlow(Integer.parseInt(phoneDatas[phoneDatas.length-3]));outValue.setDownFlow(Integer.parseInt(phoneDatas[phoneDatas.length-2]));outValue.setSumFlow();//将数据写出context.write(outKey,outValue);}}

编写Reducer

package writable;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class FlowReducer extends Reducer<Text,FlowBean,Text,FlowBean> {private FlowBean outValue = new FlowBean();/*** 核心业务逻辑处理* @param key* @param values* @param context* @throws IOException* @throws InterruptedException*/@Overrideprotected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {//遍历当前一组相同key的valuesint totalUpFlow = 0;int totalDownFlow = 0;for (FlowBean item:values) {totalUpFlow += item.getUpFlow();totalDownFlow += item.getDownFlow();}//封装输出数据的valueoutValue.setUpFlow(totalUpFlow);outValue.setDownFlow(totalDownFlow);outValue.setSumFlow();//结果写出context.write(key,outValue);}
}

编写驱动类

package writable;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class FlowDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration configuration = new Configuration();Job job = Job.getInstance(configuration);job.setJarByClass(FlowDriver.class);job.setMapperClass(FlowMapper.class);job.setReducerClass(FlowReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);FileOutputFormat.setOutputPath(job,new Path("C:\\大数据学习\\result.txt"));FileInputFormat.setInputPaths(job,new Path("D:\\测试数据\\phone_data"));job.waitForCompletion(true);}
}

MapReduce框架原理

InputFormat数据输入

MapReduce数据流

MapTask并行度决定机制

数据切片与MapTask并行度决定机制

MapReduce的执行大致流程

简易版: InputFormat  -->   Mapper     -->     Reducer        --> OutputFormat
详细版: InputFormat  --> map   sort   -->  copy sort reduce  --> OutputFormat

InputFormat过程分析

InputFormat过程主要包括两个阶段:切片和读取数据

切片

1.切片就是从文件的逻辑上进行大小的切分,一个切片多大,将来一个MapTask处理数据就有多大

2.一个切片就会产生一个MapTask

3.切片时只考虑文件本身,不考虑数据的整体性

4.切片大小和切块大小默认时一致的,这样设计目的就是为了将来切片有跨机器的情况

InputFormat的体系结构

  • FileInputFormat InputFormat的子实现类,实现切片逻辑, 实现了getSplits() 负责切片

  • TextInputFormat FileInputFormat的子实现类, 实现读取数据的逻辑,createRecordReader() 返回一个RecordReader,在RecordReader中实现了读取数据的方式:安行读取。

  • CombineFileInputFormat FileInputFormat的子实现类,此类中也实现了 一套切片逻辑 (处理:适用于小文件计算场景。)

Hadoop默认的切片规则(源码角度分析)

// 切片源码
public List<InputSplit> getSplits(JobContext job) throws IOException {StopWatch sw = new StopWatch().start();// minSize = 1(默认情况) // 但是我们也可以通过改变mapreduce.input.fileinputformat.split.minsize 配置项来改变minSize大小long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));// maxSize = Long的最大值(默认情况)// 但是我们也可以通过改变mapreduce.input.fileinputformat.split.maxsize 配置项来改变maxSize大小long maxSize = getMaxSplitSize(job);// 管理最终切完片的对象的集合 最终返回的就是此集合List<InputSplit> splits = new ArrayList<InputSplit>();// 获取当前文件的详情List<FileStatus> files = listStatus(job);boolean ignoreDirs = !getInputDirRecursive(job)&& job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);// 遍历获取到的文件列表,一次按照文件为单位进行切片  for (FileStatus file: files) {// 如果是忽略文件以及是文件夹就不进行切片if (ignoreDirs && file.isDirectory()) {continue;}// 获取文件的路径Path path = file.getPath();// 获取文件的内容大小long length = file.getLen();// 如果不是空文件 继续切片if (length != 0) {// 获取文件的具体的块信息BlockLocation[] blkLocations;if (file instanceof LocatedFileStatus) {blkLocations = ((LocatedFileStatus) file).getBlockLocations();} else {FileSystem fs = path.getFileSystem(job.getConfiguration());blkLocations = fs.getFileBlockLocations(file, 0, length);}// 判断是否要进行切片(主要判断当前文件是否是压缩文件,有一些压缩文件时不能够进行切片)if (isSplitable(job, path)) {// 获取HDFS中的数据块的大小long blockSize = file.getBlockSize();// 计算切片的大小--> 128M 默认情况下永远都是块大小long splitSize = computeSplitSize(blockSize, minSize, maxSize);-- 内部方法:protected long computeSplitSize(long blockSize, long minSize,long maxSize) {return Math.max(minSize, Math.min(maxSize, blockSize));}long bytesRemaining = length;// 判断当前的文件的剩余内容是否要继续切片 SPLIT_SLOP = 1.1// 判断公式:bytesRemaining)/splitSize > SPLIT_SLOP// 用文件的剩余大小/切片大小 > 1.1 才继续切片(这样做的目的是为了让我们每一个MapTask处理的数据更加均衡)while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);splits.add(makeSplit(path, length-bytesRemaining, splitSize,blkLocations[blkIndex].getHosts(),blkLocations[blkIndex].getCachedHosts()));bytesRemaining -= splitSize;}// 如果最后文件还有剩余且不足一个切片大小,最后再形成最后的一个切片if (bytesRemaining != 0) {int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,blkLocations[blkIndex].getHosts(),blkLocations[blkIndex].getCachedHosts()));}} else { // not splitableif (LOG.isDebugEnabled()) {// Log only if the file is big enough to be splittedif (length > Math.min(file.getBlockSize(), minSize)) {LOG.debug("File is not splittable so no parallelization "+ "is possible: " + file.getPath());}}splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),blkLocations[0].getCachedHosts()));}} else { //Create empty hosts array for zero length filessplits.add(makeSplit(path, 0, length, new String[0]));}}// Save the number of input files for metrics/loadgenjob.getConfiguration().setLong(NUM_INPUT_FILES, files.size());sw.stop();if (LOG.isDebugEnabled()) {LOG.debug("Total # of splits generated by getSplits: " + splits.size()+ ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));}return splits;
}

FileInputFormat

TextInputFormat

CombineTextInutFormat

框架默认的FileInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。

应用场景

CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理。

虚拟存储切片最大值设置

//指定InputFormat的具体实现
job.setInputFormatClass(CombineTextInputFormat.class);
//指定CombineTextInputFormat中切片的最大值
CombineTextInputFormat.setMaxInputSplitSize(job,4194304); //4M

切片机制

生成切片过程包括:虚拟存储过程和切片过程二部分

(1)虚拟存储过程:
将输入目录下所有文件大小,依次和设置的setMaxInputSplitSize值比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)。
例如setMaxInputSplitSize值为4M,输入文件大小为8.02M,则先逻辑上分成一个4M。剩余的大小为4.02M,如果按照4M逻辑划分,就会出现0.02M的小的虚拟存储文件,所以将剩余的4.02M文件切分成(2.01M和2.01M)两个文件。
(2)切片过程:
(a)判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片。
(b)如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。
(c)测试举例:有4个小文件大小分别为1.7M、5.1M、3.4M以及6.8M这四个小文件,则虚拟存储之后形成6个文件块,大小分别为:
1.7M,(2.55M、2.55M),3.4M以及(3.4M、3.4M)
最终会形成3个切片,大小分别为:
(1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M

MapReduce工作流程

Shuffle机制

Shuffle

Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle

Partition分区

问题:就拿我们上面那个电话号码的例子为例,如果要统计136,137,138的号码,分别放在不同的文件中,这个时候会发现Hadoop提供的默认的功能好像就不能满足了,这就引出了分区的意义。

Partitioner 分区器对象

Partitioner是Hadoop的分区器对象:负责给Map阶段输出数据选择分区的功能,默认实现HashPartitioner类
按照 输出的key的hashCode值 和 ReduceTask的数量 进行取余操作会得到一个数字,这个数字就是当前kv 所属分区的编号,分区编号在Job提交的时候就已经 根据指定ReduceTask的数量定义好了

Hadoop默认的分区规则源码解析

//定位MapTask的map方法中 context.write(outk, outv);
//跟到write(outk, outv)中 进入到 ChainMapContextImpl类的实现中
public void write(KEYOUT key, VALUEOUT value) throws IOException,InterruptedException {output.write(key, value);
}//跟到 output.write(key, value) 内部 NewOutputCollector
public void write(K key, V value) throws IOException, InterruptedException {collector.collect(key, value, partitioner.getPartition(key, value, partitions));
}//重点理解 partitioner.getPartition(key, value, partitions);
//跟进默认的分区规则实现 HashPartitioner类
public int getPartition(K key, V value, int numReduceTasks) {// 根据当前的key的hashCode值和ReduceTask的数量进行取余操作// 获取到的值就是当前kv所属的分区编号。return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}

自定义分区器对象

自定一个分区器类,继承Hadoop提供的Partitioner类,实现 getPartition() 方法,在方法中编写自己的业务逻辑,最终 给当前kv返回所属的分区编号

以刚才的问题为例子: 136,137,138开头的号码输出到不同的文件:

package com.pihao.mr.partition;import com.pihao.mr.writable.FlowBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;/*** 自定义分区器对象,需要继承Hadoop提供的Partitioner对象*/
public class PhonePartitioner extends Partitioner<Text, FlowBean> {/*** 重新定义当前kv所属分区的规则(4个分区)* 136 --> 0* 137 --> 1* 138 --> 2* 其他--> 3*/@Overridepublic int getPartition(Text text, FlowBean flowBean, int numPartitions) {int phonePartitioner = 0;//获取手机号String phoneNum = text.toString();if(phoneNum.startsWith("137")){phonePartitioner =1;}else if (phoneNum.startsWith("138")){phonePartitioner =2;}else if (phoneNum.startsWith("136")){phonePartitioner =0;}else{phonePartitioner =3;}return phonePartitioner;}
}
//驱动类中的配置自定义的分区类//指定ReduceTask的数量
job.setNumReduceTasks(4);
//指定分区规则
job.setPartitionerClass(PhonePartitioner.class);

自定义分区总结

// 当ReduceTask的数量设置 > 实际用到的分区数 此时会生成空的分区文件
// 当ReduceTask的数量设置 < 实际用到的分区数 此时会报错
// 当ReduceTask的数量设置 = 1 结果文件会输出到一个文件中,有以下源码可以论证:
// 获取当前ReduceTask的数量
partitions = jobContext.getNumReduceTasks();
// 判断ReduceTask的数量 是否大于1,找指定分区器对象
if (partitions > 1) {partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
} else {// 执行默认的分区规则,最终返回一个唯一的0号分区partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {@Overridepublic int getPartition(K key, V value, int numPartitions) {return partitions - 1;}};
}//分区编号生成的规则:根据指定的ReduceTask的数量 从0开始,依次累加。

WritableComparable排序

排序是MapReduce框架中最重要的操作之一

排序案例实现

bean对象做为key传输,需要实现WritableComparable接口重写compareTo方法,就可以实现排序。

需求:手机的案例,按总流量倒叙排序

第一种方式实现WritableComparable接口

package com.pihao.mr.writablecomparable;import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;/*** 流量对象:实现Hadoop的序列化*/
public class FlowBean implements WritableComparable<FlowBean> {private Integer upFlow;private Integer downFlow;private Integer sumFlow;xxxxgetter,settertoString序列化反序列化/*** 根据总流量倒叙排序* @param o* @return*/@Overridepublic int compareTo(FlowBean o) {return -this.getSumFlow().compareTo(o.getSumFlow());}
}

第二种方式自定义比较器对象

package com.pihao.mr.writablecomparable;import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;/*** 自定义比较器对象,继承Hadoop提供的比较器对象 WritableComparable*/
public class FlowBeanComparable extends WritableComparator {//指定当前比较器对象为谁服务public FlowBeanComparable(){super(FlowBean.class,true);  //注意:这里的FlowBean需要实现 WritableComparable接口}/*** 按总流量升序* @param a* @param b* @return*/@Overridepublic int compare(WritableComparable a, WritableComparable b) {FlowBean bean1 = (FlowBean) a;FlowBean bean2 = (FlowBean) b;return bean1.getSumFlow().compareTo(bean2.getSumFlow());}
}
//Driver类
//设置自定义比较器对象
job.setSortComparatorClass(FlowBeanComparable.class);

Hadoop中实现排序比较的方式

 ① 直接让参与比较的对象上实现WritableComparable 接口,并在该类中实现compareTo,在compareTo中定义自己的比较规则。这种情况 当运行的的时候Hadoop会帮助我们生成 比较器对象WritableComparator。② 自定一个比较器对象需要继承Hadoop提供的WritableComparator类,重写该类compare() 方法,在该方法中定义比较规则,注意在自定义的比较器对象中通过调用父类的super方法将自定义的比较器对象和要参与比较的对象进行关联。
后再Driver类中指定自定义的比较器对象。

Hadoop中获取比较器对象的规则是什么

//定位到 MapTask 类中的init() 方法
//获取比较器对象
comparator = job.getOutputKeyComparator();
//定位 JobConf 类中  getOutputKeyComparator()
//job提交的时候 获取当前MR程序输出数据key的比较器对象
public RawComparator getOutputKeyComparator() {Class<? extends RawComparator> theClass = getClass(JobContext.KEY_COMPARATOR, null, RawComparator.class);if (theClass != null){// 如果通过配置获取到指定的比较器对象的class 直接通过反射实例化(自定义比较器的方式)return ReflectionUtils.newInstance(theClass, this);}  // 如果通过配置没获取到指定的比较器对象,接着判断 (实现WritableComparable接口的方式)// 当前参与比较的对象是否实现了WritableComparable接口return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
}// get()方法就是实现获取比较器对象的逻辑
public static WritableComparator get(Class<? extends WritableComparable> c, Configuration conf) {// 根据当前传入的class文件到 comparators的Map中获取比较器对象// 这种情况是 当前参与比较的对象的类型是Hadoop自身的数据类型WritableComparator comparator = comparators.get(c); //获取Hadoop对象自带的的比较器if (comparator == null) {// 考虑到一些极端情况,可能发生GC垃圾回收,导致比较器没了// 为了万无一失 再次让类加载一遍forceInit(c);// 重新加载后再次获取comparator = comparators.get(c);// 此时还没获取到,那就说明当前参与比较的对象的不是Hadoop自身的数据类型if (comparator == null) {// Hadoop 会给当前参与比较的对象生成比较器对象comparator = new WritableComparator(c, conf, true); //这中就是我们自定义java对象}}// Newly passed Configuration objects should be used.ReflectionUtils.setConf(comparator, conf);return comparator;
}

Hadoop自身的数据类型是如何拥有比较器对象,以Text为例

//Text源码
//1. 当前Text实现了WritableComparable接口
public static class Comparator extends WritableComparator {}//2. 在该类中 定义了自己的比较器对象
public Comparator() {super(Text.class);
}//3. 该类中还包含一个静态代码快
static {// register this comparatorWritableComparator.define(Text.class, new Comparator());
}//4. Text类它的比较器对象被管理到一个Map中,以当前类的class文件为key 当前类的比价器对象为value
public static void define(Class c, WritableComparator comparator) {comparators.put(c, comparator);
}

Combine合并

使用场景:为了提升MR程序的运行效率,为了减轻ReduceTask的压力,另外减少了IO的开销

combiner实例

package com.pihao.mr.combine;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;/*** 自定义Combiner类需要继承Hadoop提供的Reducer类(其实写法和Reducer类类似)* TODO 注意:Combine流程一定发生在Map阶段**/
public class WordCountCombiner extends Reducer<Text, IntWritable,Text,IntWritable> {private Text outKey = new Text();private IntWritable outValue = new IntWritable();@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int total = 0;//遍历valuesfor (IntWritable value:values){//累加,输出total += value.get();}//封装key和valueoutKey.set(key);outValue.set(total);context.write(outKey,outValue);}
}
//Driver类
//指定自定义的Combiner
job.setCombinerClass(WordCountCombiner.class);

Combiner不适用的场景:Reduce端处理的数据考虑到多个MapTask的数据的整体集时 就不能提前合并了。

MapTask工作机制

(1)Read阶段:MapTask通过InputFormat获得的RecordReader,从输入InputSplit中解析出一个个 key/value。(2)Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。(3)Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。(4)Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。溢写阶段详情:步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。(5)Merge阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index。在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并mapreduce.task.io.sort.factor(默认10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。 让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。

ReduceTask工作机制

(1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。(2)Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。(3)Sort阶段:按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。(4)Reduce阶段:reduce()函数将计算结果写到HDFS上。

OutputFormat数据输出

OutputFormat主要负责最终数据的写出

OutputFormat类的体系结构

  • FileOutputFormat (OutputFormat子类),对 checkOutputSpecs() 做了具体的实现
  • TextOutputFormat(FileOutputFomat子类),对 getRecordWriter 做了具体实现

Output使用场景

当我们对MR最终的结果有个性化制定的需求,就可以通过自定义OutputFormat来实现

自定义OutputFormat案例

过滤输入的log日志,包含baidu的网站输到 /dask/baidu.log,不包含的输出到/dask/other.log

步骤

① 自定一个 OutputFormat 类,继承Hadoop提供的OutputFormat,在该类中实现getRecordWriter() ,返回一个RecordWriter② 自定义一个 RecordWriter 并且继承Hadoop提供的RecordWriter类,在该类中重写 write()  和 close()  在这些方法中完成自定义输出。

实例

/*** 自定义OutputFormat,需要继承hadoop提供的OutputFormat,* 这里的k,v就是reducer端的输出类型*/
public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {/*** 返回一个RecordWriter对象*/@Overridepublic RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {//还需要再定义一个RecordWriter类,来定义输出规则LogRecordWriter logRecordWriter = new LogRecordWriter(job); return logRecordWriter;}
}
/*** 自定义LogRecordWriter,需要继承Hadoop提供的RecordWriter*/
public class LogRecordWriter extends RecordWriter<Text, NullWritable> {//定义输出路径private static final String baiduPath = "D:\\测试数据\\log\\baidu.txt";private static final String otherPath = "D:\\测试数据\\log\\other.txt";private FileSystem fileSystem;private FSDataOutputStream baiduOut;private FSDataOutputStream otherOut;// 初始化工作public LogRecordWriter() throws IOException {//获取文件系统fileSystem = FileSystem.get(new Configuration());baiduOut = fileSystem.create(new Path(baiduPath));otherOut = fileSystem.create(new Path(otherPath));}/*** 实现数据写出的逻辑*/@Overridepublic void write(Text key, NullWritable value) throws IOException, InterruptedException {//获取当前输入的数据String logData = key.toString();if(logData.contains("baidu")){baiduOut.writeBytes(logData+"\n");}else{otherOut.writeBytes(logData+"\n");}}// 关闭资源@Overridepublic void close(TaskAttemptContext context) throws IOException, InterruptedException {IOUtils.closeStream(baiduOut);IOUtils.closeStream(otherOut);}
}
//Mapper
public class LogMapper extends Mapper<LongWritable, Text,Text, NullWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 直接写出context.write(value,NullWritable.get());}
}
//Reducer
public class LogReducer extends Reducer<Text, NullWritable,Text,NullWritable> {@Overrideprotected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {//遍历直接写出for (NullWritable value : values) {context.write(key,NullWritable.get());}}
}
//Driver类
public class LogDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration configuration = new Configuration();Job job = Job.getInstance(configuration);job.setJarByClass(LogDriver.class);job.setMapperClass(LogMapper.class);job.setReducerClass(LogReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);//指定自定义的OutputFormat类job.setOutputFormatClass(LogOutputFormat.class);FileInputFormat.setInputPaths(job,new Path("D:\\测试数据\\log"));FileOutputFormat.setOutputPath(job,new Path("D:\\测试数据\\log_out"));//没有实际意义job.waitForCompletion(true);}
}

Join多种应用

Reduce Join实现逻辑

Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出Reduce端的主要工作:在Reduce端以连接字段作为key分组已经完成,我们只需要在每一个分组中将那些来源于不同文件的记录分开,最后进行合并就行

需求案例

id pid amount
1001 01 1
1002 02 2
1003 03 3
1004 01 4
1005 02 5
1006 03 6
pid pname
01 小米
02 华为
03 格力

最终想要的效果

id pname amount
1001 小米 1
1004 小米 4
1002 华为 2
1005 华为 5
1003 格力 3
1006 格力 6

案例实操

package com.pihao.mr.join;import org.apache.hadoop.io.Writable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class Orderpd implements Writable {// 第一张表数据private String orderId;private String pid;private Integer amount;//第二张表数据private String pname;private String source; //区分数据来源//xxx getter ,setter方法@Overridepublic String toString() {return orderId+"\t"+pname+"\t"+amount;}// 序列化@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(orderId);out.writeUTF(pid);out.writeInt(amount);out.writeUTF(pname);out.writeUTF(source);}//反序列化@Overridepublic void readFields(DataInput in) throws IOException {orderId = in.readUTF();pid = in.readUTF();amount = in.readInt();pname = in.readUTF();source = in.readUTF();}
}
//Mapper
public class ReduceJoinMapper extends Mapper<LongWritable, Text,Text,Orderpd> {private Text outK = new Text();private Orderpd outV= new Orderpd();private FileSplit inputSplit;@Overrideprotected void setup(Context context) throws IOException, InterruptedException {inputSplit = (FileSplit) context.getInputSplit();}// 将两个需要做关联的文件数据进行收集@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//获取当前行数据String line = value.toString();//进行切割String[] datas = line.split("\t");//将数据封装到对象orderpd中//判断数据读的是哪个文件的if (inputSplit.getPath().getName().contains("order")) {//当前数据来源于第一张表:order.txt   1001    01  1//封装输出数据的keyoutK.set(datas[1]);//封装输出数据的valueoutV.setOrderId(datas[0]);outV.setPid(datas[1]);outV.setAmount(Integer.parseInt(datas[2]));outV.setPname("");outV.setSource("order");}else{// 当前数据来源第二张表  01小米//封装输出数据的keyoutK.set(datas[0]);//封装输出数据的valueoutV.setOrderId("");outV.setPid(datas[0]);outV.setAmount(0);outV.setPname(datas[1]);outV.setSource("pd");}context.write(outK,outV);}
}
//Reducer,join就是在这里面实现的
public class ReduceJoinReducer extends Reducer<Text,Orderpd,Orderpd, NullWritable> {/*** 接受map端整合好的数据,进行最终的join操作*/@Overrideprotected void reduce(Text key, Iterable<Orderpd> values, Context context) throws IOException, InterruptedException {List<Orderpd> orderList = new ArrayList();Orderpd pd = new Orderpd();// 遍历当前相同key的一组valuefor (Orderpd orderpd : values) {//将order的数据管理到一个集合中,注意这里有坑,一定要重新new一个对象Orderpd thisOrderPd = new Orderpd();//判断数据来源if (orderpd.getSource().equals("order")) {try {BeanUtils.copyProperties(thisOrderPd,orderpd);orderList.add(thisOrderPd);} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}}else{//pd数据try {BeanUtils.copyProperties(pd,orderpd);} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}}}// 进行Join操作for (Orderpd op : orderList) {op.setPname(pd.getPname());//将数据写出context.write(op,NullWritable.get());}}
}
//Driver
public class ReduceJoinDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration configuration = new Configuration();Job job = Job.getInstance(configuration);job.setJarByClass(ReduceJoinDriver.class);job.setMapperClass(ReduceJoinMapper.class);job.setReducerClass(ReduceJoinReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Orderpd.class);job.setOutputKeyClass(Orderpd.class);job.setOutputValueClass(NullWritable.class);FileInputFormat.setInputPaths(job,new Path("D:\\测试数据\\reducejoin"));FileOutputFormat.setOutputPath(job,new Path("D:\\测试数据\\join_result_out"));job.waitForCompletion(true);}
}

执行测试ReduceJoin汇总成功

ReduceJoin缺点及解决方案

MapJoin

使用场景

Map Join适用于一张表十分小、一张表很大的场景,考虑MR整体的执行效率,且业务场景是一个大文件和一个小文件进行关联操作, 可以使用MapJoin来实现。另外MapJoin也是解决ReduceJoin数据倾斜问题很有效
的办法

优点

思考:在Reduce端处理过多的表,非常容易产生数据倾斜。怎么办?

在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。

具体实现:采用DistributedCache

(1)在Mapper的setup阶段,将文件读取到缓存集合中。使用hashMap
(2)在Driver驱动类中加载缓存。

案例实操

还是以上面ReduceJoin的需求为例

// Driver
public class MapJoinDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration configuration = new Configuration();Job job = Job.getInstance(configuration);job.setJarByClass(MapJoinDriver.class);job.setMapperClass(MapJoinMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);//设置Reducer的数量为0job.setNumReduceTasks(0);//设置缓存文件路径pd表(坑:这里注意将该文件的编码设置成UTF-8)job.addCacheFile(URI.create("file:///D:/测试数据/cachefile/pd.txt"));FileInputFormat.setInputPaths(job,new Path("D:\\测试数据\\mapjoin"));FileOutputFormat.setOutputPath(job,new Path("D:\\测试数据\\mapjoin_result"));job.waitForCompletion(true);}
}
//Mapper/*** 1.处理缓存文件:将job中设置的缓存路径获取到* 2.根据缓存路径再结合输入流把pd.txt的内容写入到内存中维护*/
public class MapJoinMapper extends Mapper<LongWritable, Text,Text, NullWritable> {private Map<String,String> pdMap = new HashMap<>();private Text outK = new Text();//处理缓存文件@Overrideprotected void setup(Context context) throws IOException {URI[] uris = context.getCacheFiles();URI cacheFile =  uris[0];FileSystem fs = FileSystem.get(new Configuration());FSDataInputStream pdInputStream = fs.open(new Path(cacheFile));//将数据读入内存中 (可以优化)BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(pdInputStream, "UTF-8"));//按行读取String line;while ((line = bufferedReader.readLine()) != null){//将数据保存到Map中 01 小米String[] datas = line.split("\t");pdMap.put(datas[0],datas[1]);}//关闭资源IOUtils.closeStream(pdInputStream);bufferedReader.close();}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//获取当前行数据String line = value.toString();String[] orderDatas = line.split("\t");//进行关联 1001  01  1String pname = pdMap.get(orderDatas[1]);//封装结果集对象String result = orderDatas[0]+"\t"+pname+"\t"+orderDatas[2];outK.set(result);//将结果写出context.write(outK,NullWritable.get());}
}

MapJoin的思想

①:分析文件之间的关系,然后定位关联字段
②:将小文件的数据映射到内存中的一个容器维护起来。
③:当MapTask处理大文件的数据时,每读取一行数据,就根据当前行中的关联字段到内存的容器里获取对象的信息。
④:封装结果将其输出

计数器应用

//添加计数器,到时程序启动就会在控制台输出
context.getCounter("MapTask","method").increment(1);

数据清洗(ETL)

在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序,一般数据清洗在公司中都有专门的人操作

需求

去除日志中字段个数小于等于11的日志行内容

案例实操

就是在Map端的map方法中对数据进行过滤操作

MapReduce工作总结

分析Job提交流程的源码

//定位 job.waitForCompletion(true);  跟到 waitForCompletion() 中
public boolean waitForCompletion(boolean verbose ) throws IOException, InterruptedException,ClassNotFoundException {// 判断当前Job的状态是否为定义阶段                                        if (state == JobState.DEFINE) {// 提交方法submit();}// xxx}
//进入submit() 方法
public void submit()  throws IOException, InterruptedException, ClassNotFoundException {// 确认当前Job的状态ensureState(JobState.DEFINE);// 新老API的兼容setUseNewAPI();// 连接集群(如果是本地模式结果就是LocalRunner, 如果Yarn集群结果就是YARNRuuner)connect();// 开始提交Jobfinal JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {// 提交Jobreturn submitter.submitJobInternal(Job.this, cluster);}});state = JobState.RUNNING;LOG.info("The url to track the job: " + getTrackingURL());
}

hadoop之MapReduce学习教程相关推荐

  1. Hadoop之MapReduce学习笔记(二)

    主要内容: mapreduce编程模型再解释: ob提交方式: windows->yarn windows->local : linux->local linux->yarn: ...

  2. hadoop之mapreduce教程+案例学习(二)

    第3章 MapReduce框架原理 目录 第3章 MapReduce框架原理 3.1 InputFormat数据输入 3.1.1 切片与MapTask并行度决定机制 3.1.2 Job提交流程源码和切 ...

  3. hadoop之mapreduce教程+案例学习(一)

    第1章 MapReduce概述 目录 第1章 MapReduce概述 1.1 MapReduce定义 MapReduce是一个分布式运算程序的编程框架,是用户开发"基于Hadoop的数据分析 ...

  4. Hadoop3.x学习教程(二)

    Hadoop3.x学习教程(二) 1.完全分布式运行模式(开发重点) 1.1.编写集群分发脚本 1.2.SSH免密登陆配置 1.3.集群配置 1.4.群起集群 1.完全分布式运行模式(开发重点) 1. ...

  5. 图解大数据 | 分布式平台Hadoop与Map-Reduce详解

    作者:韩信子@ShowMeAI 教程地址:https://www.showmeai.tech/tutorials/84 本文地址:https://www.showmeai.tech/article-d ...

  6. Hadoop 新 MapReduce 框架 Yarn 详解

    Hadoop MapReduceV2(Yarn) 框架简介 原 Hadoop MapReduce 框架的问题 对于业界的大数据存储及分布式处理系统来说,Hadoop 是耳熟能详的卓越开源分布式文件存储 ...

  7. 什么是Hadoop,怎样学习Hadoop

    Hadoop实现了一个分布式文件系统(Hadoop Distributed File System),简称HDFS.HDFS有高容错性的特点,并且设计用来部署在低廉的(low-cost)硬件上;而且它 ...

  8. Hadoop权威指南学习笔记一

    Hadoop简单介绍 声明:本文是本人基于Hadoop权威指南学习的一些个人理解和笔记,仅供学习參考,有什么不到之处还望指出.一起学习一起进步. 转载请注明:http://blog.csdn.net/ ...

  9. 第11期:Hadoop零基础学习路线

    大家好,我是你们的老朋友老王随聊,今天和大家讨论的话题--Hadoop零基础应该怎么学? 通过这段时间和群里同学们交流,发现很多大学生甚至职场小白对Hadoop学习路线不是很清晰,所以我花了一些时间给 ...

最新文章

  1. AngularJS 指令
  2. python log函数怎么打_Python的log日志功能及设置方法
  3. 解决element-ui table show-summary合计行不显示问题
  4. eclipse报错资料备份
  5. 《构建之法》第十三章学习总结
  6. springMVC视频教程
  7. 垃圾收集六大算法全面理解
  8. RTSP的音频视频要各SETUP一次
  9. linux shell 编程 14 删除日志文件
  10. 1.命令行窗口(小黑屏)、CMD窗口、终端、shell、DOS窗口
  11. 【机器学习】LDA算法 (主题模型算法)
  12. Android怎么更换背景色,Android App更改背景颜色
  13. Unity3D | 经典小游戏Battle City
  14. 图像的幅度谱与相位谱
  15. 使用html表单制作简单网页(加表单详细知识点)
  16. dy极速版-艳云脚本云控系统
  17. MFC自绘带背景颜色标题栏
  18. “电脑族”保健 六项注意
  19. android pie mi 3 tab,前沿科技:三星Galaxy Tab S3和Tab A(2017)正在获得Android 9.0 Pie更新
  20. Java入土---面向对象(OOP)

热门文章

  1. KBU808-ASEMI适配大功率开关电源整流桥
  2. 有关是否携带首部的随笔
  3. 图像处理------图像加噪
  4. su 和 sudo su 的区别
  5. Android中级面筋:开发2年的程序员如何短期突击面试?跟着这几步去准备,大厂也不远了
  6. littlevgl教程 Linux,树莓派littlevGL系列教程:容器控件(lv_cont)
  7. Excel - 快速找出数据差异
  8. matlab极坐标系给定圆心画圆,matlab画极坐标
  9. 1646 获取生成数组中的最大值(模拟)
  10. 为什么说 Ubuntu 22.04 LTS 是史诗级的版本