目录

一、MapReduce组件

1、Combiner-合并

2、InputFormat-输入格式

3、OutputFormat-输出格式

二、Shuffle

1、Map端的Shuffle

2、Reduce端的Shuffle

3、MapReduce执行流程

4、Shuffle优化

三、扩展

1、小文件问题

2、压缩机制

3、推测执行机制

4、数据倾斜

5、join


一、MapReduce组件

1、Combiner-合并

可以在Driver类中通过job.setCombinerClass(XXXReducer.class);来设置Combiner类

Combiner实际上是在不改变计算结果前提的下来减少Reducer的输入数据量

在实际过程中,如果添加Combiner,那么可以有效的提高MapReduce的执行效率,缩短MapReduce的执行时间。

但是需要注意的是,并不是所有的场景都适合于使用Combiner。

可以传递运算的场景,建议使用Combiner,例如求和、求积、最值、去重等;

但是不能传递的运算,不能使用Combiner,例如求平均值

2、InputFormat-输入格式

①、InputFormat发生在MapTask之前。数据由InputFormat来负责进行切分和读取,然后将读取到的数据给MapTask处理,所以InputFormat读取出来的数据是什么类型,MapTask接收的数据就是什么类型

②、作用:

用于对文件进行切片处理

提供输入流用于读取数据

③、在MapReduce中,如果不指定,那么默认使用TextInputFormat。而TextInputFormat继承了FileInputFormat。默认情况下。FileInputFormat负责对文件进行切片处理;TextInputFormat赋值提供输入流来读取数据

④、FileInputFormat在对文件进行切片过程中的注意问题

切片最小是1个字节大小,最大时Long.MAX_VALUE

如果是一个空文件,则整个文件作为一个切片来进行处理

在MapReduce中,文件存在可切与不可切的问题。大多数情况下,默认文件是可切的;但是如果是压缩文件,则不一定可切

如果文件不可切,无论文件多大,都作为一个切片来进行处理

在MapReduce中,如果不指定,Split和Block等大

如果需要调小Split,那么需要调小maxSize;如果需要调大Split,那么需要调大minSize

直接在代码FileInputFormat.setMaxInputSplitSize();就可以调节

在切片过程中,需要注意阈值SPLIT_SLOP=1.1

⑤、 TextInputFormat在读取数据过程中需要注意的问题

TextInputFormat在对文件进行处理之前,会先判断文件是否可切:

先获取文件的压缩编码,然后判断压缩编码是否为空。

如果压缩编码为空,则说明该文件不是压缩文件,那么默认可切;

如果压缩编码不为空,则说明该文件是一个压缩文件,会判断这是否是一个可切的压缩文件

在MapReduce中,默认只有BZip2(.bz2)压缩文件可切

从第二个MapTask开始,会从当前切片的第二行开始处理,处理到下一个切片的第一行;第一个MapTask要多处理一行数据;最后一个MapTask要少处理一行数据。这样做的目的是为了保证数据的完整性

⑥、自定义输入格式:定义一个类继承InputFormat,但是考虑到切片过程相对复杂,所以可以考虑定义一个类继承FileInputFormat,而在FileInputFormat中已经覆盖了切片过程,只需要考虑如何实现读取过程即可

文件数据:(score.txt)

tom
math 90
english 98
jary
math 78
english 87
rose
math 87
english 90
bob
math 67
english 87
alex
math 59
english 80
helen
math 79
english 60

代码如下:

package org.example.authinput;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class AuthDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(AuthDriver.class);job.setMapperClass(AuthMapper.class);job.setReducerClass(AuthReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 指定输入格式类job.setInputFormatClass(AuthInputFormat.class);FileInputFormat.addInputPath(job, new Path("hdfs://hadoop01:9000/txt/score.txt"));FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/result/auth_input"));job.waitForCompletion(true);}}
package org.example.authinput;import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;import java.io.IOException;
import java.net.URI;// 泛型表示读取出来的数据类型
public class AuthInputFormat extends FileInputFormat<Text, Text> {@Overridepublic RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {return new AuthReader();}
}class AuthReader extends RecordReader<Text, Text> {private LineReader reader;private Text key;private Text value;private long length;private float pos = 0;private static final byte[] blank = new Text(" ").getBytes();// 初始化方法,在初始化的时候会被调用一次// 一般会利用这个方法来获取一个实际的流用于读取数据@Overridepublic void initialize(InputSplit split, TaskAttemptContext context) throws IOException {// 转化FileSplit fileSplit = (FileSplit) split;// 获取切片所存储的位置Path path = fileSplit.getPath();// 获取切片大小length = fileSplit.getLength();// 连接HDFSFileSystem fs = FileSystem.get(URI.create(path.toString()), context.getConfiguration());// 获取实际用于读取数据的输入流FSDataInputStream in = fs.open(path);// 获取到的输入流是一个字节流,要处理的文件是一个字符文件// 考虑将字节流包装成一个字符流,最好还能够按行读取reader = new LineReader(in);}// 判断是否有下一个键值对要交给map方法来处理// 试着读取文件。如果读取到了数据,那么说明有数据要交给map方法处理,此时返回true// 反之,如果没有读取到数据,那么说明所有的数据都处理完了,此时返回false@Overridepublic boolean nextKeyValue() throws IOException {// 构建对象来存储数据key = new Text();value = new Text();Text tmp = new Text();// 读取第一行数据// 将读取到的数据放到tmp中// 返回值表示读取到的字节个数if (reader.readLine(tmp) <= 0) return false;key.set(tmp.toString());pos += tmp.getLength();// 读取第二行数据if (reader.readLine(tmp) <= 0) return false;value.set(tmp.toString());pos += tmp.getLength();// 读取第三行数据if (reader.readLine(tmp) <= 0) return false;value.append(blank, 0, blank.length);value.append(tmp.getBytes(), 0, tmp.getLength());pos += tmp.getLength();// key = tom// value = math 90 english 98return true;}// 获取键@Overridepublic Text getCurrentKey() {return key;}// 获取值@Overridepublic Text getCurrentValue() {return value;}// 获取执行进度@Overridepublic float getProgress() {return pos / length;}@Overridepublic void close() throws IOException {if (reader != null)reader.close();}
}
package org.example.authinput;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class AuthMapper extends Mapper<Text, Text, Text, IntWritable> {@Overrideprotected void map(Text key, Text value, Context context) throws IOException, InterruptedException {// key = tom// value = math 90 english 98// 拆分数据String[] arr = value.toString().split(" ");context.write(key, new IntWritable(Integer.parseInt(arr[1])));context.write(key, new IntWritable(Integer.parseInt(arr[3])));}
}
package org.example.authinput;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class AuthReducer extends Reducer<Text, IntWritable, Text, IntWritable> {@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable value : values) {sum += value.get();}context.write(key, new IntWritable(sum));}
}

⑦、多源输入:在MapReduce中,允许同时指定多个文件作为输入源,而且这多个文件可以放在不同的路径下。这多个文件的数据格式可以不同,可以为每一个文件单独指定输入格式

本地数据:/characters.txt   我们可以自己定义测试数据。

package org.example.multipleinputs;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class MultipleDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(MultipleDriver.class);// 如果输入的多个文件的处理方式一致,那么可以统一设置Mapperjob.setMapperClass(MultipleMapper.class);job.setReducerClass(MultipleReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 多源输入MultipleInputs.addInputPath(job, new Path("hdfs://hadoop01:9000/txt/words.txt"),TextInputFormat.class);MultipleInputs.addInputPath(job, new Path("D:/characters.txt"),TextInputFormat.class);FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/result/multiple_inputs"));job.waitForCompletion(true);}}
package org.example.multipleinputs;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class MultipleMapper extends Mapper<LongWritable, Text, Text, IntWritable> {private final IntWritable once = new IntWritable(1);@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 获取到一行数据之后都需要拆分字符char[] cs = value.toString().toCharArray();for (char c : cs) {context.write(new Text(c + ""), once);}}
}
package org.example.multipleinputs;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class MultipleReducer extends Reducer<Text, IntWritable, Text, IntWritable> {@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable value : values) {sum += value.get();}context.write(key, new IntWritable(sum));}
}

3、OutputFormat-输出格式

①、OutputFormat发生在ReduceTask之后,接收ReduceTask产生的数据,然后将结果按照指定格式来写出

②、作用:

校验输出路径,例如检查输出路径不存在

提供输出流用于将数据写出

③、在MapReduce中,如果不指定,默认使用的是TextOutputFormat。 TextOutputFroamt继承了FileOutputFormat。其中,FileOutputFormat负责对输出路径进行校验,TextOutputFormat则是对数据进行写出

④、在MapReduce中,也支持自定义输出格式以及多源数据,但是注意,实际开发中自定义输出格式以及多源输出用的非常少

二、Shuffle

1、Map端的Shuffle

①、当MapTask调用map方法处理数据之后,会将处理结果进行写出,写出到MapTask自带的缓冲区中。每一个MapTask都会自带一个缓冲区,本质上是一个环形的字节数组,维系在内存中,默认大小是100M

②、数据在缓冲区中会进行分区、排序,如果指定了combiner,那么还会进行合并。这次排序是将完全杂乱没有规律的数据整理成有序的数据,所以使用的是快速排序(Quick Sort)

③、当缓冲区使用达到指定阈值(默认是0.8,即缓冲区使用达到80%)的时候,会进行spill(溢写),产生一个溢写文件。因为数据在缓冲区已经分区且排序,所以产生的单个溢写文件中的数据是分好区且排好序的

④、溢写之后,MapTask产生的数据会继续写道缓冲区中,如果再次达到条件,会再次进行溢写。每一个溢写都会产生一个新的溢写文件。多个溢写文件之间的数据是局部有序但整体无序的。

⑤、当所有数据都处理完之后,那么MapTask会将所有的溢写文件进行合并(merge),合并成一个大的结果文件final out。在merge的时候,如果有数据依然在缓冲区中,那么会将缓冲区中的数据直接merge到final out中

⑥、在merge过程中,数据会再次进行分区且排序,因此final out 中的数据是分好区且排好序的。如果溢写文件个数达到3个及以上,并且指定了Combiner,那么在merge过程中还会进行combine这次排序是将局部有序的数据整理成整体有序的状态,所以采用的是归并排序(Merge Sort)

⑦、注意问题

缓冲区设置为环形的目的减少重复寻址的次数

设置阈值的目的是为了降低阻塞的几率

溢写过程不一定会产生

原始数据的大小并不能决定溢写的次数

溢写文件的大小收序列化因素的影响

2、Reduce端的Shuffle

①、当ReduceTask达到启动阈值(默认是0.05,即当有5%的MapTask结束)的时候,就会启动来抓取数据

②、ReduceTask启动之后,会在当前服务器上来启动多个(默认是5个)fetch线程来抓取数据

③、fetch线程启动之后,会通过HTTP请求中的get请求来获取数据,在发送请求的时候会携带分区号作为参数

④、fetch线程会将抓取来的数据临时存储到本地磁盘上,形成一个个的小文件

⑤、当所有的fetch抓取完数据之后,ReduceTask会将这些小文件进行merge,合并成一个大文件。在merge过程中,会对数据再次进行排序。这次排序是将局部有序的数据整理成整体有序的状态,所以采用的是归并排序

⑥、merge完成之后,ReduceTask会将相同的键对应的值分到一组去,形成一个(伪)迭代器(本质上是一个基于迭代模式实现的流),这个过程称之为分组(group)

⑦、分组之后,每一个键调用一次reduce方法

3、MapReduce执行流程

4、Shuffle优化

①、适当的增大缓冲区。实际过程中,可以缓冲区设置为250M~400M之间

②、增加Combiner,但是不是所有场景都适合于使用Combiner

③、可以考虑对结果进行压缩传输。如果网络条件比较差,那么可以考虑将final out文件压缩之后再传递给ReduceTask,但是ReduceTask收到数据之后需要进行解压,所以这种方案是在网络传输和压缩解压之间的一种取舍

④、适当的考虑fetch线程的数量。

三、扩展

1、小文件问题

①、在大数据环境下,希望所处理的文件都是大文件,但是在生产环境中,依然不可避免的会产生很多小文件

②、小文件的危害

存储:每一个小文件在HDFS上都会对应一条元数据。如果有大量的小文件,那么在HDFS中就会产生大量的元数据。元数据过多,就会占用大量的内存,还会导致查询效率变低

计算:每一个小文件都会对应一个切片,每一个切片会对应一个MapTask(线程)。如果有大量的小文件,就会产生大量的切片,就会导致产生大量的MapTask。如果MapTask过多,那么就会致使服务器的线程的承载压力变大,致使服务器产生卡顿甚至崩溃

③、到目前为止,市面上针对小文件的处理手段无非两种:合并和打包

④、Hadoop针对小文件提供了原生的打包手段:Hadoop Archive,将指定小文件打成一个har包

2、压缩机制

①、MapReduce支持对数据进行压缩:可以对MapTask产生的中间结果(final out)进行压缩,也支持对ReduceTask的输出结果进行压缩

②、在MapReduce中,默认支持的压缩格式有:Default,BZip2,GZip,Lz4,Spappy,ZStandard,其中比较常用的是BZip2

③、主要是在Driver来进行设置,设置语句也比较简单:

package org.example.compress;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
// import org.apache.hadoop.io.compress.CompressionCodec;
// import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class CompressDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();// 设置参数// 开启Mapper结果的压缩机制// conf.set("mapreduce.map.output.compress", "true");// 设置压缩编码类// conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);Job job = Job.getInstance(conf);job.setJarByClass(CompressDriver.class);job.setMapperClass(CompressMapper.class);job.setReducerClass(CompressReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path("hdfs://hadoop01:9000/txt/words.txt"));FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/result/compress"));// 对Reduce结果进行压缩FileOutputFormat.setCompressOutput(job, true);FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);// FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);job.waitForCompletion(true);}}

3、推测执行机制

①、推测执行机制本质上是MapReduce针对慢任务的一种优化。慢任务指的是其他任务都正常执行完,但是其中几个任务依然没有结束,那么这几个任务就称之为慢任务

②、旦出现了慢任务,那么MapReduce会将这个任务拷贝一份放到其他节点上,两个节点同时执行相同的任务,谁先执行完,那么它的结果就作为最终结果;另外一个没有执行完的任务就会被kill掉

③、慢任务出现的场景

任务分配不均匀

节点性能不一致

数据倾斜

④、在实际生产过程中,因为数据倾斜导致慢任务出现的机率更高,此时推测执行机制并没有效果反而会占用更多的集群资源,所以此时一般会考虑关闭推测执行机制

⑤、推测执行机制配置(放在mapred-site.xml文件中)true是开着的,false是关闭

<property>
<name>mapreduce.map.speculative</name>
<value>true</value>
</property><property>
<name>mapreduce.reduce.speculative</name>
<value>true</value>
</property>

4、数据倾斜

①、数据倾斜指的是任务之间处理的数据量不均等。例如统计视频网站上各个视频的播放量,那么此时处理热门视频的任务索要处理的数据量就会比其他的任务要多,此时就产生了数据倾斜

②、Map端的数据倾斜的产生条件:多源输入、文件不可切、文件大小不均等。一般认为Map端的倾斜无法解决

③、实际开发中,有90%的数据倾斜发生在了Reduce端,直接原因就是因为是对数据进行分类,本质原因是因为数据本身就有倾斜的特性,可以考虑使用二阶段聚合的方式来处理Reduce端的数据倾斜

5、join

①、如果在处理数据的时候,需要同时处理多个文件,且文件相互关联,此时可以考虑将主要处理的文件放在输入路径中,将其他关联文件缓存中,需要的时候再从缓存中将文件取出来处理

②、案例:统计每一天卖了多少钱

数据:order.txt 表示订单 订单编号、日期、商品数量、卖出数量

1001 20170710 4 2
1002 20170710 3 100
1003 20170710 2 40
1004 20170711 2 23
1005 20170711 4 55
1006 20170823 3 20
1007 20170823 2 3
1008 20170823 4 23
1009 20170912 2 10
1010 20170912 2 2
1011 20170914 3 14
1012 20170914 3 18

数据:product.txt  表示商品 商品编号、商品名、商品单价

1 xiaomi 3899
2 huawei 5899
3 opper 2999
4 apple 5999
package org.example.join;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
import java.net.URI;public class JoinDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(JoinDriver.class);job.setMapperClass(JoinMapper.class);job.setReducerClass(JoinReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Order.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 确定一个主要的处理文件-统计每一天卖了多少钱,键是日期,值是钱// 主要处理文件 -> order.txtFileInputFormat.addInputPath(job,new Path("hdfs://hadoop01:9000/txt/order.txt"));//将文件关联的路径放到缓存中,需要使用的时候再从缓存中取出来处理即可URI[] files = {URI.create("hdfs://hadoop01:9000/txt/product.txt")};job.setCacheFiles(files);FileOutputFormat.setOutputPath(job,new Path("hdfs://hadoop01:9000/result/join"));job.waitForCompletion(true);}
}
package org.example.join;import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;public class JoinMapper extends Mapper<LongWritable, Text,Text,Order> {private final Map<String,Order> map = new ConcurrentHashMap<>();@Overrideprotected void setup(Context context) throws IOException, InterruptedException {//获取文件路径URI file = context.getCacheFiles()[0];//连接hdfsFileSystem fs = FileSystem.get(file,context.getConfiguration());//获取输入流FSDataInputStream in = fs.open(new Path(file.toString()));//获取到的输入流是一个字节流,但要处理的文件是一个字符文件,考虑将字节流转换为字符流BufferedReader re = new BufferedReader(new InputStreamReader(in));String line;while ((line=re.readLine()) != null){// 拆分字段// 2 huawei 5899String[] arr = line.split(" ");Order o = new Order();o.setProductId(arr[0]);o.setPrice(Double.parseDouble(arr[2]));map.put(o.getProductId(),o);}re.close();}//处理主要文件 order.txt@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 1001 20170710 4 2String[] arr = value.toString().split(" ");Order o = new Order();o.setProductId(arr[2]);o.setNum(Integer.parseInt(arr[3]));o.setPrice(map.get(o.getProductId()).getPrice());context.write(new Text(arr[1]),o);}
}
package org.example.join;import org.apache.hadoop.io.Writable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class Order implements Writable {private String productId="";private int num;private double price;public String getProductId() {return productId;}public void setProductId(String productId) {this.productId = productId;}public int getNum() {return num;}public void setNum(int num) {this.num = num;}public double getPrice() {return price;}public void setPrice(double price) {this.price = price;}@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeUTF(getProductId());dataOutput.writeInt(getNum());dataOutput.writeDouble(getPrice());}@Overridepublic void readFields(DataInput dataInput) throws IOException {setProductId(dataInput.readUTF());setNum(dataInput.readInt());setPrice(dataInput.readDouble());}
}
package org.example.join;import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class JoinReducer extends Reducer<Text,Order,Text, DoubleWritable> {@Overrideprotected void reduce(Text key, Iterable<Order> values, Context context) throws IOException, InterruptedException {double sum =0;for (Order value : values) {sum += value.getNum()*value.getPrice();}context.write(key,new DoubleWritable(sum));}
}

大数据笔记--Hadoop(第五篇)相关推荐

  1. 大数据笔记9—java基础篇5(API)

    java_API API String类 创建字符串对象的区别对比 String字符串的特点 字符串的比较 例题(String) 例题1. 模拟登陆 例题2遍历字符串 例题3.案例 :统计字符次数 例 ...

  2. 大数据笔记--SparkSQL(第一篇)

    目录 一.Spark Sql 1.概述 2.由来 3.Spark SQL特点 4.为什么SparkSQL的性能会的得到这么大的提升? Ⅰ.内存列存储 二.SparkSql入门 1.概述 2.创建Dat ...

  3. 大数据笔记8—java基础篇4(面向对象-封装-继承-多态)

    面向对象 一.面向对象 1.面向过程 1.2.举例 1.3.总结 二.面向对象 1.简述 2.举例 3.思想特点 2.1.类的定义格式 2.1.1.简述 2.2.2.格式 2.3.3.示例 三.类的使 ...

  4. 大数据笔记10—java基础篇6(集合1-Collection)

    集合 集合(Collection) 一.迭代器<iterator> 案例一 二.并发修改异常 三.Collection集合 案例一(Collection练习) 案例二(Collection ...

  5. 大数据笔记16—java基础篇12(JDBC 、连接池、事务)

    目录 JDBC jdbc概述 jdbc入门案例 API详解 jdbc工具类 预编译执行平台 1.SQL注入问题(安全问题) 2API详解:预处理对象(PreparedStatement) 使用连接池重 ...

  6. 大数据笔记11—java基础篇7(集合2-Map)

    目录Map Map集合 2.1 Map入门 2.2 Map集合的成员方法 Collections类 斗地主案例 苟有恒,何必三更眠五更起: 最无益,莫过一日曝十日寒. Map集合 概述: Map集合是 ...

  7. 好程序员大数据笔记之:Hadoop集群搭建

    好程序员大数据笔记之:Hadoop集群搭建在学习大数据的过程中,我们接触了很多关于Hadoop的理论和操作性的知识点,尤其在近期学习的Hadoop集群的搭建问题上,小细节,小难点拼频频出现,所以,今天 ...

  8. 2017年全球大数据产业报告之海外篇(第五集)

    本文作者│吴极 微信号│wujiwuji1023 本文转载自公众号星河融快(rongkuai888)  ,作者吴极(微信ID:wujiwuji1023)   中国软件网获授权转载. " 在过 ...

  9. 大数据框架Hadoop篇之Hadoop入门

    1. 写在前面 今天开始,想开启大数据框架学习的一个新系列,之前在学校的时候就会大数据相关技术很是好奇,但苦于没有实践场景,对这些东西并没有什么体会,到公司之后,我越发觉得大数据的相关知识很重要,不管 ...

  10. 尚硅谷大数据技术Hadoop教程-笔记02【Hadoop-入门】

    视频地址:尚硅谷大数据Hadoop教程(Hadoop 3.x安装搭建到集群调优) 尚硅谷大数据技术Hadoop教程-笔记01[大数据概论] 尚硅谷大数据技术Hadoop教程-笔记02[Hadoop-入 ...

最新文章

  1. 大写的服!中科大博士写20万字论文:如何给女朋友送礼物
  2. 领歌leangoo敏捷工具个人工作台功能
  3. 按钮点击_如何设置微信小程序按钮点击事件?
  4. 简单的网络图片加载工具类
  5. 利用js代码引入其他js文件到页面中
  6. Java8中list转map方法总结
  7. 手机端车牌识别软件下载
  8. 程序测试包含哪些内容?
  9. EditPlus--用法--快捷键/配置/背景色/字体大小
  10. “多模态视频人物识别”课程分享学习总结
  11. V2X车联网-学习整理笔记
  12. layim手机版嵌入app
  13. win10找不到便签(便利贴)怎么办,Win10找回便签功能的方法
  14. 第十二章 软件壳(四)(代码抽取型壳)
  15. wcdma码片速率_WCDMA中3.84M码片速率的由来
  16. 鸿蒙负责人王成录被曝已离职:华为技术元老,1998年哈工大博士毕业后加入
  17. xcode mac app_IOS苹果APP签名详解
  18. murmurHash使用方法
  19. c#: 线程状态和管理之线程的休眠、挂起和中断
  20. Python爬虫汉字乱码问题

热门文章

  1. Oracle 后台进程初探
  2. 飞鱼星流控王VF12路由器使用体验
  3. java面向对象数组实现家庭收支记账软件_golang实战--家庭收支记账软件(面向过程)...
  4. 无线宽带接入802.16技术简介(转)
  5. uc3854 matlab仿真,基于UC3854硬开关PFC变换电路设计课程设计.doc
  6. “但问耕耘,莫问收获”才有机会“碰运气”——新书《成功与运气:好运和精英社会的神话》解读
  7. DNA损伤修复基因数据库
  8. Linux下Tomcat官网下载安装详细教程+域名绑定访问
  9. Linux学习16-磁盘分区MSDOS与GPT的区别
  10. Windows组策略禁止广告弹窗