文章目录

  • 一:hadoop.
    • hadoop是什么
    • 1.1 hadoop发展历史
    • 1.2 hadoop优势
    • 1.3 Hodoop1.x-2.x-3.x的区别
    • 1.4 HDFS简单概述
    • 1.5 YARN简单概述
    • 1.6 MapReduce
    • 1.7 HDFS ,YARN,MapReduce三者关系
    • 1.8 安装虚拟机centos7.5
    • 1.9 安装hadoop
    • 1.10 hadoop的运行模式
    • 1.11 本地运行模式
    • 1.12 hadoop 集群配置
    • 1.13 集群性能测试
      • 1.13.1 数据实际存储的地方
    • 1.14 配置历史服务器
    • 1.15 配置日志的聚集
    • 1.16 集群启动/停止方式总结
    • 1.17 编写 Hadoop 集群常用脚本
    • 1.18 hadoop面试题
  • 二:HDFS
    • 2.1HDFS优点
    • 2.2 HDFS缺点
    • 2.3 HDFS 组成架构
    • 2.4 HDFS文件块大小(面试重点)
    • 2.5 HDFS的shell命令
      • 2.6 HDFS上传
      • 2.7 HDFS下载
      • 2.8 HDFS直接操作
    • 2.9 HDFS的API操作
      • 2.9.1 环境准备
      • 2.9.2 创建文件夹
      • 2.9.3 上传文件
      • 2.9.4 优先级
      • 2.9.5 下载
      • 2.9.6 文件的删除
      • 2.9.7 文件的重命名和移动
      • 2.9.8 查看文件信息
      • 2.9.9 判断是否是文件
    • 2.10 HDFS的读写流程(面试重点)
    • 2.11 距离计算
    • 2.12 副本的选择
    • 2.13 hadoop的数据读取流程(面试重点)
    • 2.14 NameNodo和2nn的工作机制
    • 2.15 Fsimage(镜像文件) 和enits(操作日志)
    • 2.16 chkpoint的检查点时间
    • 2.17 DateNode和NameNode
  • 三:MapReduce
    • 3.1 MapReduce的概述&优缺点
    • 3.2 MapReduce核心思想
    • 观看WordCount源码
    • 3.3 常用序列化类型
    • 3.4 MapReduce编码规范
    • 3.5 WordCount代码示例
      • 3.5.1 编写Mapper
      • 3.5.2 编写Reduce
      • 3.5.3 编写Driver
      • 3.5.4 运行
      • 3.5.5 DeBug运行
      • 3.5.6 集群运行
    • 3.6 序列化
    • 3.7 自定义的bean对象实现序列化接口
    • 3.8 案例练习
    • 3.9 MapperReduce框架原理
    • 3.10 inputFromat数据输入
      • 3.10.1 切片与MapTask并行度决定机制
      • 3.10.2 FileinputFromat 切片源码
      • 3.10.3 FileinputFromat 切片机制
      • 3.10.4 TextinputFrom
      • 3.10.5 combineTextInputFormat
        • 3.10.6 CombineTextInputFormat 案例实操
    • 3.11 MapReduce的工作流程(重要)
    • 3.12shuffe机制
    • 3.13 分区
    • 3.14 排序
      • 3.15 全排序
      • 3.15.1 二次排序
    • 3.16 combiner
    • 3.17 combiner代码实现
    • 3.18 合并2张表的数据
    • 3.19 MapJoin与数据倾斜
    • 3.20数据的压缩
  • 四:Yarn
    • 4.1 Yarn的基础架构
    • 4.2 YARN的工作机制(执行流程)
    • 4.3 Yarn的任务调度器
      • 4.3.1 先进先出调度器(FIFO)
      • 4.3.2 容量调度器
    • 4.3.3 公平资源调度器
    • 4.4 Yarn的常用命令
      • 4.4.1 查看任务
      • 4.4.2 查看容器
      • 4.4.3 查看YarnNode的状态
      • 4.4.4Yarn参数核心配置
    • 4.5 容量调度器提交案例
      • 4.6 配置任务的优先级
    • 4.7 配置多队列的公平调度器
    • 4.8 Yarn的tool接口
      • 4.8.1 代码实现

HADOOP中文文档

一:hadoop.

hadoop是什么

1)Hadoop是一个由Apache基金回收研发的分布式基础架构

  • 所谓分布式计算就是在两个或多个软件互相共享信息,这些软件既可以在同一台计算机上运行,也可以在通过网络连接起来的多台计算机上运行

    2)主要解决海量数据的存储和海量数据的分析问题
    3)广义上来说,Hadoop值一个广泛的概念–生态圈

1.1 hadoop发展历史

1)Hadoop创始人 Doug Cutting,为了实现与Google类似的全文搜索功能,在Lucene框架基础上进行优化和升级,查询引擎和索引擎
2)2001年年底Lucene称为Apache基金会的一个子项目。
3)对于海量数据的场景Lucene框架面对和Google同样的困难,存储海量数据困难,检索海量数据慢。
4)学习和模仿Google解决这些问题的办法:微软版Nutch
5)可以说Google是Hadoop的思想之源,(Google在大数据方面得到3篇论文)
GTS–>HDFS
Map-Reduce–>MR
BigTable–>HBase
6)2003年-2004,Google公开了部分GFS和MapReduce思想的细节,以此为基础Doug Cutting等人用了2年的业余时间实现了DFS和MapReduce机制,使Nutch性能飙升。
7)2005年Hadoop作为Lucene的子项目Nutch的一部分正式引入Apache基金会
8)2006年3月份,Map-Reduce和Nutch Distributed File System (NDFS)分别被纳入到Hadoop项目中,Hadoop就此诞生,标志着大数据的时代来临,

1.2 hadoop优势

高可靠性

  • Hadoop底层维护了多个副本,某个出现故障不会导致数据丢失

    高扩展性
    在集群间分配任务的数据,可方便扩展数以千计的节点,

    高效性
  • 在MapReduce的思想下,Hadoop是并行工作的,以加快数据的处理速度

    高容错性
    能够自动将失败的任务进行重新的分配

1.3 Hodoop1.x-2.x-3.x的区别

  • Hodoop1.x和2.x的区别
  • 1.x 是由common-HDFS-MapReaduce组成
  • 2.x是由common -HSDF-Yam-MapReduce组成,在1X的MapReduce将任务调度拆了出来
  • 3.x和3.x的构成上是没有什么区别,有的是细节的改动

1.4 HDFS简单概述

Hadoop Distributed File System 简称HDFS,是一个分布式文件系统
1)NameNode:存储文件的原数据,如文件名,目录结构,文件属性等(生成副本,副本数,文件权限等)以及每个文件的块列表和每个文件所在的Data Node等,
2)Data Node :在本地系统存储文件块数据,以及块数据的效验和。
3)Secondary NameNode:每隔一段时间对NameNode原数据备份

1.5 YARN简单概述

Yet Another Resource Negotiator简称YARN ,另一种资源协调者,是Hadoop的资源管理器

1.6 MapReduce

MapReduce将计算过程分为2个阶段,Map和Reduce
1)Map 阶段并行处理输入数据
2)Reduce 阶段对Map进行汇总

  • 我们将数据存储好,数据存储在n个服务器上,我们让每隔服务器去检索自己的数据,看看有没有检索到,最后将有没有检索到的数据返回

1.7 HDFS ,YARN,MapReduce三者关系

1)DataNode是每一个数据存储的节点
2)NameNode是存储所有的文件放在了什么位置
3)SecondaryNameNode是对NameNode进行一个数据的部分备份
4)ResourceManager 会随机开启一个任务,AppMstr,告诉ResourceManager 我需要多少内存,CPU等,去开启一个MapTask任务,去检索每个DataNode上是否有这个数据,最后Reduce Task,将检索的结果写入

1.8 安装虚拟机centos7.5

1.在VMware上安装cemtos7.5,配置好其静态ip,让其可以通过xshell连接
2.在root用户下输入命令
yum install -y epel-relase
3.关闭防火墙systemctl stop firewalld
systemctl disable firewalld.service

  • 参考地址
    连接

配置java环境变量

1.9 安装hadoop

将hadoop的压缩包解压缩

  • 配置环境HADOOP的环境变量
    sudo vim /etc/profile.d/my_env.sh

  • export JAVA_HOME=/opt/module/jdk1.8.0_212
    export PATH=PATH:PATH:PATH:JAVA_HOME/bin
    #hadoop
    export HADOOP_HOME=/opt/module/hadoop-3.1.3
    export PATH=PATH:PATH:PATH:HADOOP_HOME/bin
    export PATH=PATH:PATH:PATH:HADOOP_HOME/sbin

配置完成需要进行source /etc/profile

1.10 hadoop的运行模式

1.11 本地运行模式

创建一个wcinput文件,里面创建一个word.text,写入数据
liuxngyu liuxingyu
zhangsn zhangsan
然后执行下面 可以算出,每个名字有多少个
bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount wcinput/ ./wcoutput

1.12 hadoop 集群配置

0.配置好ip地址

sudo vim /etc/hosts — 3台机器配置成一样的ip地址

1.将本机的java和hadoop拷贝到子机上

scp -r jdk1.8.0_212/ liuxingyu@hapool103:/opt/module/ --主机拷贝
rsync -av hadoop-3.1.3/ liuxingyu@hapool103:/opt/module/hadoop-3.1.3/ – 同步和git拉取

xsync ceshi 脚本复制

  • 配置ssh免密登录

cd
cd .shh/
ssh-keygen -t rsa
ssh-copy-id hapool103 给103设置免密登录,102可以免密登录到103中
ssh-copy-id hapool104 给104设置免密登录,102可以免密登录到104中
ssh-copy-id hapool102 给自己设置免密登录,102可以免密登录到102中
exit 退出
ls -al 查看隐藏文件

01.修改配置文件,给3台的集群都配置好
02.配置 workers

hadoop102
hadoop103
hadoop104
注意:该文件中添加的内容结尾不允许有空格,文件中不允许有空行。
同步所有节点配置文件
[atguigu@hadoop102 hadoop]$ xsync /opt/module/hadoop-3.1.3/etc
03.启动集群
(1)如果集群是第一次启动,需要在 hadoop102 节点格式化 NameNode(注意:格式
化 NameNode,会产生新的集群 id,导致 NameNode 和 DataNode 的集群 id 不一致,集群找
不到已往数据。如果集群在运行过程中报错,需要重新格式化 NameNode 的话,一定要先停
止 namenode 和 datanode 进程,并且要删除所有机器的 data 和 logs 目录,然后再进行格式
化。)
[atguigu@hadoop102 hadoop-3.1.3]$ hdfs namenode -format
(2)启动 HDFS
[atguigu@hadoop102 hadoop-3.1.3]$ sbin/start-dfs.sh
(3)在配置了 ResourceManager 的节点(hadoop103)启动 YARN
[atguigu@hadoop103 hadoop-3.1.3]$ sbin/start-yarn.sh
(4)Web 端查看 HDFS 的 NameNode
(a)浏览器中输入:http://hadoop102:9870
(b)查看 HDFS 上存储的数据信息
(5)Web 端查看 YARN 的 ResourceManager
(a)浏览器中输入:http://hadoop103:8088
(b)查看 YARN 上运行的 Job 信息.

1.13 集群性能测试

在hadoop文件下
hadoop fs -mkdir /wcinput
然后此时刷新浏览器就会看到

  • 上传文件
  • hadoop fs -put wcinput/word.text /wcinput
    把wcinput/word.text文件上传到wcinput中 ,然后刷新浏览器即可看到

-踩坑:需要在win10修改
C:\Windows\System32\drivers\etc的host文件
将你虚拟机的名称和ip值设置好
例如192.168.116.122 hadpool 这样即可,3台都在win10设置好

点击这里就可以查看自己上传的数据

1.13.1 数据实际存储的地方

数据是存储在datanode上
data/dfs/data/current/BP-1828537540-192.168.116.132-1665216790015/current/finalized/subdir0/subdir0

  • 查看一下cat blk_1073741825

踩坑
-hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount /input /output
进行一个集群的个数测试

可以去查看我们的任务

1.14 配置历史服务器

为了查看程序的历史运行情况,需要配置一下历史服务器。具体配置步骤如下:

  • 1) vim mapred-site.xml
    在该文件里面增加如下配置。
<!-- 历史服务器端地址 -->
<property><name>mapreduce.jobhistory.address</name><value>hadoop102:10020</value>
</property>
<!-- 历史服务器 web 端地址 -->
<property><name>mapreduce.jobhistory.webapp.address</name><value>hadoop102:19888</value>
</property>
  • 2)分发配置
  • xsync mapred-site.xml
  • 3)在 hadoop102 启动历史服务器
  • mapred --daemon start historyserver
  • 重启yan,重新执行命令即可查看历史

1.15 配置日志的聚集


-1)配置 yarn-site.xml

<!-- 开启日志聚集功能 -->
<property><name>yarn.log-aggregation-enable</name><value>true</value>
</property>
<!-- 设置日志聚集服务器地址 -->
<property> <name>yarn.log.server.url</name> <value>http://hadoop102:19888/jobhistory/logs</value>
</property>
<!-- 设置日志保留时间为 7 天 -->
<property><name>yarn.log-aggregation.retain-seconds</name><value>604800</value>
</property>
  • 分发xsync yarn-site.xml

  • 关闭historyserver mapred --daemon stop historyserver

  • 重启yare

  • 重新执行任务就可以看到了

1.16 集群启动/停止方式总结

1)各个模块分开启动/停止(配置 ssh 是前提)常用
(1)整体启动/停止 HDFS

start-dfs.sh/stop-dfs.sh

(2)整体启动/停止 YARN

start-yarn.sh/stop-yarn.sh

2)各个服务组件逐一启动/停止
-利用kill -9 2175 杀死data node

hdfs --daemon start datanode    启动datanode

(1)分别启动/停止 HDFS 组件
-hdfs --daemon start/stop namenode/datanode/secondarynamenode
(2)启动/停止 YARN
-yarn --daemon start /stop resourcemanager/nodemanager

1.17 编写 Hadoop 集群常用脚本

在bin 目录创建集群的启动脚本
cd /bin
sudo vim hadoop.sh

#!/bin/bash
if [ $# -lt 1 ]
thenecho "No Args Input..."exit ;
fi
case $1 in
"start")echo " =================== 启动 hadoop 集群 ==================="echo " --------------- 启动 hdfs ---------------"ssh hapool102 "/opt/module/hadoop-3.1.3/sbin/start-dfs.sh"echo " --------------- 启动 yarn ---------------"ssh hapool103 "/opt/module/hadoop-3.1.3/sbin/start-yarn.sh"echo " --------------- 启动 historyserver ---------------"ssh hapool102 "/opt/module/hadoop-3.1.3/bin/mapred --daemon start historyserver"
;;
"stop")echo " =================== 关闭 hadoop 集群 ==================="echo " --------------- 关闭 historyserver ---------------"ssh hapool102 "/opt/module/hadoop-3.1.3/bin/mapred --daemon stop historyserver"echo " --------------- 关闭 yarn ---------------"ssh hapool103 "/opt/module/hadoop-3.1.3/sbin/stop-yarn.sh"echo " --------------- 关闭 hdfs ---------------"ssh hapool102 "/opt/module/hadoop-3.1.3/sbin/stop-dfs.sh"
;;
*)echo "Input Args Error..."
;;
esac

sudo chmod 777 hadoop.sh

  • 然后就可以通过脚本启动和关闭集群了
    hadoop.sh start -->启动
    hadpoop.sh stop -->关闭

配置jps脚本
cd /bin
sudo vim jpsall

#!/bin/bash
for host in hapool102 hapool103 hapool104
doecho =============== $host ===============ssh $host jps
done

sudo chmod 777 jpsall

-然后通过脚本就可以查看3台机器的状况了

1.18 hadoop面试题

  • 常用端口号
  • hadoop3.x

HDFS NameNode 内部常用端口:8020/9000/9820
HDFS NameNode 对用户查询的端口:9870
Yarn查看任务运行情况 8088
历史服务器:19888

  • hadoop2.x

HDFS NameNode 内部常用端口:8020/9000
HDFS NameNode 对用户的查询接口:50070
Yarn 查看历史服务器的端口:8088
历史服务器:19888

  • 2)常用的配置文件
  • 3.x core-site.xml – hdfs-site.xml --yarn-site.xml–mapred-site.xml–worders
  • 3.x core-site.xml – hdfs-site.xml --yarn-site.xml–mapred-site.xml–
  • "

二:HDFS

  • HDFS产生背景

  • 随着数据量的越来越大,在一个操作系统存不下所有的数据,那么就分配到更多的操作系统管理的磁盘中,但是不方便维护和管理,迫切需要一种系统来管理多台机器上的文件,这就是分布式文件管理系统,,HDFS只不过是分布式文件管理系统的一种

  • HDSF的定义

  • HDSF是一个文件系统,用于存储文件,通过目录数来定位文件,其次,它是分布式的,由很多服务器联合起来实现其功能,集群中的服务器有各自的角色。
    使用场景 适合一次写入,多次读出的场景。一个文件经过创建,写入和关闭,之后就不需要改变

2.1HDFS优点

  • 1.高容错性,
  • 数据自动保存多个副本,通过增加副本的方式,提高容错性,
  • 某一个副本丢失后,可以自动的恢复
  • 2)适合处理大数据
  • 数据规模:能够处理数据规模达到GB-TB-甚至PB级别的数据
  • 文件规模:能够处理百万规模以上的文件数量,数量相当庞大
  • 3)可在廉价的机器上,通过多副本机制,提高可靠性

2.2 HDFS缺点

1) 不适合低延迟数据访问,比如毫秒级的数据存储是无法做到的
2) 无法高效的对大量的小文件进行存储

  • 存储大量小文件的话,会占用NameNode大量的内存来存储文件目录和块信息,这样是不可取的,以为Name的存储是有有限的
  • 小文件存储的寻址时间会超过读取的时间,它违反了HDFS的设计目标
  • 3)不支持文件的并发写入,文件的随机修改,
  • 一个文件只能有一个写,不允许有多个线程同时写

2.3 HDFS 组成架构

  • NameNode :就是master 它是一个主管,管理者
  • (1)管理HDFS的名称空间
  • (2)配置副本策略
  • (3)管理数据块(Block)的信息
  • (4)处理客户端的读写请求
  • DataNode 就是Slave NameNode下达命令,DataNode进行实际的操作
  • (1)存储实际的存储块,
  • (2)执行读写的操作
  • Client:就是客户端
  • (1)文件切份,文件上传HDFS的时候,Client将文件切分成一个BLOCK,然后进行上传
  • (2)与NameNode 进行交互,获取文件的位置信息
  • (3)与DataNode 进行交互,读取或写入数据,
  • (4)Client提供一些命令来管理HDFS比如NameNode格式化
  • (5)Client可以通过一些命令来访问HDFS,比如对HDFS的增删改查的操作
  • SecondaeyNameNode,并非NameNode的热备,当NameNode挂掉的时候,它并不能马上替换NameNode并提供服务
  • (1)辅助NameNode,分担其工作量,比如定期合并Simage和Edits,并推送给NameNode;
  • (2)紧急情况下,可辅助NameNode

2.4 HDFS文件块大小(面试重点)

  • HDFS中的文件在物理上是分块存储,(BLOCK),块的大小可以通过配置参数(dsf.blocksize),来规定,默认大小在Hadoop2.x/3.x版本中是128M,1X的版本是64M

如果传过来的数据是1KB,既然默认的是128那么,他不会占用128,只会占用1KB,其他的空间别人可以使用

  • 2)如果寻址的时间为10ms,即查找到目标的Block的时间为10ms
  • 3)寻址时间为传输时间的1%,为最大状态,因为传输时间为10ms/0.01=1000ms=1s
  • 4)目前磁盘的传输数据为100mb/s,

ps:如果买到的服务器就是普通的机械硬盘,那传输率在10mb左右,那么就将块大小设为128mb

  • 总结:HDFS是来进行文件的存储,里面有NameNode,SecondaeyNameNode,DataNode三种,其中NameNode用来管理DataNode相当于一个老板的角色。
  • SecondaeyNameNode是来对NameNode一个“秘书的角色”
  • DataNode的作用是存储文件的存储块,和读取写入的操作(”底层打工人”)
  • 文件进行存储的时候,比如我们的数据很大,一台机器肯定是存储不下的,将数据分为N个Block,数据块,1个Block的默认大小是128,文件块是分布式存储在每一个DataNode上的
  • HDFS块的大小设置取决于磁盘的传输速率

-上传了一个es的压缩包,我们可以看到上传数据的时候,将es压缩包分成5个Block

2.5 HDFS的shell命令

  • 基本语法
  • hadoop fs 具体命令 OR hdfs dfs 具体命令
  • 以上都是一些常用的命令

2.6 HDFS上传

  • (1) mkdir 创建文件夹
  • (示例)hadoop fs -mkdir /sanguo
  • (2)将本地文件剪切上传
  • 2.1: sudo vim shuguo.txt(新建文件)
  • 2.2:hadoop fs -moveFromLocal ./shuguo.txt /sanguo(剪切文件到HDFS上)
  • (3)将本地文件复制到HDFS中(copyFromLocal)
  • 3.1 sudo vim weiguo.txt(创建文件)
  • 3.2hadoop fs -copyFromLocal ./weiguo.txt /sanguo(将本地文件复制上传上去)

    -(4)put(等同于coopyFromLocal)
  • 4.1 sudo vim wuguo.txt
  • 4.2 hadoop fs -put wuguo.txt /sanguo


-(5)数据追加(appendToFile )

  • 5.1 sudo vim liubei.txt
  • 5.2 hadoop fs -appendToFile liubei.txt /sanguo/shuguo.txt
  • 将liubei.txt文件的内容追加到shuguo.txt文件

2.7 HDFS下载

  • (1)下载到本地(copyToLocal )
  • 将HDFS的/sanguo/shuguo.txt文件下载到本地
  • hadoop fs -copyToLocal /sanguo/shuguo.txt ./
  • (2)下载到本地(get)
    • 将HDFS的/sanguo/shuguo.txt文件下载到本地命名为shuguo2.txt
  • hadoop fs -get /sanguo/shuguo.txt ./shuguo2.txt

生产环境下多用于上传(put)下载(get)

2.8 HDFS直接操作

  • (1)直接查看(ls)

  • 等同于虚拟机的ls 只不是是直接查看的hdfs的数据

  • hadoop-3.1.3]$ hadoop fs -ls /sanguo

  • (2)直接查看文件内容(cat)

  • hadoop fs -cat /sanguo/shuguo.txt

  • 修改文件权限(chmod)
  • hadoop fs -chmod 777 /sanguo/shuguo.txt
  • (3) mkdir 创建文件夹
  • hadoop fs -mkdir /jinguo(上面已经用过了)
  • (4)在hdfs的一个文件夹拷贝到另外一个文件夹(cp)
  • hadoop fs -cp /sanguo/shuguo.txt /jinguo

  • (5)移动数据(mv)

  • hadoop fs -mv /sanguo/wuguo.txt /jinguo

  • hadoop fs -mv /sanguo/weiguo.txt /jinguo

  • 将sanguo/wuguo.txt 和weiguo.txt的数据移动到 jinguo下

  • (6)查看文件末尾1kb的数据 (tail)
  • hadoop fs -tail /jinguo/shuguo.txt
  • (7)删除文件(rm)
  • hadoop fs -rm /sanguo/shuguo.txt
  • (8) 递归删除(rm -r )
  • hadoop fs -rm -r /sanguo
  • (9)查询文件夹的大小(du)
  • hadoop fs -du -s -h /jinguo 查询文件夹的总大小,第一个是总的大小,3个文件加一起,第二个是因为有3个副本27*3共占用81
  • hadoop fs -du -h /jinguo 查看的是文件夹的每一个文件的占用大小

    (10)设置副本数量
  • hadoop fs -setrep 10 /jinguo/shuguo.txt

    设置的有10个副本数量,现在只有3个服务器也就是说最大就是3,设置了10个,等以后在添加服务器,到10个服务器,这10个服务器就占满了,超过了10台服务器就不会增加了

2.9 HDFS的API操作

2.9.1 环境准备

将win10所需要的包复制到E:tool下

  • 配置环境变量
  • 变量名:HADOOP_HOME 变量值E:\tool\hadoopAPI\hadoop-3.1.0
  • 在path中%HADOOP_HOME%\bin

打开idea新建Maven项目

pom依赖
<dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.30</version></dependency></dependencies>

在项目的 src/main/resources 目录下,新建一个文件,命名为“log4j.properties”,在文件
中填入

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

2.9.2 创建文件夹

  @Testpublic void mrkder() throws IOException, URISyntaxException, InterruptedException {//连接的集群地址,也就是内部通信的地址//创建一个配置文件Configuration configuration = new Configuration();//获取客户端对象FileSystem fs = FileSystem.get(new URI("hdfs://hapool102:8020"), configuration,"hapool102");fs.mkdirs(new Path("/xiyou/huaguoshan"));fs.close();}

  • 封装一下代码

2.9.3 上传文件

  //上传@Testpublic void copy() throws IOException {//参数一:是否删除源文件//参数二:是否对数据进行覆盖//参数三:源文件地址//参数四:上传的地址fs.copyFromLocalFile(false,false,new Path("E:\\sunwukong.txt"),new Path("/xiyou/huaguoshan"));}

2.9.4 优先级

  • hdfs-default.xml–>hdfs.site.xml
  • 在java的resources目录下新建一个hdfs-site.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>dfs.replication</nme><value>1</value>
</property>
</configuration>

  • 修改默认的副本数是1 说明目前在resources下创建的优先级最大

  • 在Configuration中进行配置
  • 总结:hdfs-default.xml–>hdfs.site.xml–>resoures下配置–>代码配置

2.9.5 下载

 @Testpublic void dowload() throws IOException {//参数一:是否删除源文件//参数二:需要在哪里下载//参数三:下载在哪里//参数四:本地效验fs.copyToLocalFile(false,new Path("/xiyou/huaguoshan/sunwukong.txt"),new Path("E:/"),false);}
  • 开启了本地效验,除了下载一个txt文件,还会下载一个.sunwukong.txt.crc 这个文件,这个文件会在发送之前将数据进行加密,然后发送完毕进行一个加密之前的数据和接受到的数据进行一个效验,如果数据丢包了,下载的文件就不可以用了

2.9.6 文件的删除

@Testpublic void delete() throws IOException {//第一个参数:删除的目录//第二个参数:是否递归删除fs.delete(new Path("/xiyou"),true);}

2.9.7 文件的重命名和移动

public void filemv() throws IOException {//fs.rename(new Path("/jinguo/wuguo.txt"),new Path("/jinguo/xxx.txt"));}

2.9.8 查看文件信息

 @Testpublic void read() throws IOException {//路径//是否递归RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs.listFiles(new Path("/jinguo"), true);while (locatedFileStatusRemoteIterator.hasNext()){LocatedFileStatus next = locatedFileStatusRemoteIterator.next();BlockLocation[] blockLocations = next.getBlockLocations();     //获取文件呢存储的块的信息System.out.println(Arrays.toString(blockLocations));}}

2.9.9 判断是否是文件

  @Testpublic  void fileread() throws IOException {FileStatus[] fileStatuses = fs.listStatus(new Path("/"));for (FileStatus fileStatus : fileStatuses) {if (fileStatus.isFile()) {System.out.println("是文件==="+fileStatus.getPath().getName());}else{System.out.println("不是文件==="+fileStatus.getPath().getName());}}}

2.10 HDFS的读写流程(面试重点)

1.客户端将数据进行发送,先创建一个分布式文件系统,对NameNode进行请求
2.NameNode对本地进行一个效验,检查目录结构,是否有可用节点等
3.检查完毕之后对客户端进行一个响应
4.客户端去上传一个Bock(数据快)到NameNode上
5.NameNode去选择本地的副本节点,考虑本地节点等,然后将可传输的节点返回给客户端
6.客户端去通过接受的可发送节点,去创建写入流,按照最近距离去发送数据
7.给DataNode创建管道
8.在上传文件之前,先创建一个packet(64),pack(64)里面有一个个的pack(chunk512byte +chunksum4byte),组成一个个大小的pack(64)通过缓冲区进行传输
9.按照优先选择建立的管道,去给距离最近的管道发送数据,第一个管道接受其中的一条数据,将其他的数据分发给另外的DataNode,另外的DateNoed写入完成之后会应答成功
10.有 一个ack机制,应答成功删除数据,致此数据写入完成

2.11 距离计算

  • 假设祖先是机架1的n-0,然后
  • 判断集群1的机架3和集群2的机架6的n-1的距离
  • 1.集群1的机架3的n-0找到机架3是1
  • 2.然后机架6的v-0找到机架6是1
  • 然后机架3找到祖先机架1是1
  • 机架6找到集群2是1
  • 集群2找到集群1是1
  • 集群1找到机架1是1
  • 1+1+1+1+1+1=6,这就是6个距离是6

2.12 副本的选择

  • 副本第一个放在距离自己最近的
  • 在另外的机架上方第二个副本
  • 副本3和副本2放在一个机架上
  • 为什么副本3不放在机架3上的?因为本机架传输是比跨机架传输块
  • 为什么机架1和机架2不放在同一个机架上呢,因为保证了数据的

    在默认 3 个副本的情况下,HDFS采用如下的放置策略:在机架 1 上放置第一个副本;在另一个机架 2 上放置第二副本;副本三与副本二放置在同一个机架上;如果有更多的副本,则随机选择机架,每个机架的副本数量有个上限值,计算方式通常是:(replicas - 1) / racks + 2这样放置的好处:避免一个机架出故障,导致所有数据丢失;同一个机架上的节点通信网络会比不同机架节点通信更好,副本二与副本三放置在同一个机架能够节省带宽;从单个文件看来,考虑带宽似乎没有多大意义,但是对于大规模数据的情况下,请求并发量大时,网络是非常重要的一个因素,特别是对于写请求,这里要了解 HDFS 写的流程。因为写副本的过程类似于流水线,先写副本一,但这里写完后就将写成功的结果返回给客户端了。之后由副本一将内容写到副本二,接着由副本二将内容写到副本三。假设副本三和副本一放置在一个机架上,那么就会产生两次不同机架间的写操作。而目前的情况是副本二和副本三在同一个机架,机架间的写操作只会发生在副本一到副本二之间,副本二和副本三的写操作是在同一个机架,节省了网络流量。

2.13 hadoop的数据读取流程(面试重点)


1.首先客户端创建一个分布式文件系统去想NameNode去请求下载数据
2.NameNode去返回元数据
3.客户端去创建流去遵从就近原则去读数据,如果DateNode1达到了极限就去读别的DatNode,达到负载均衡的效果

2.14 NameNodo和2nn的工作机制

我们的数据存储在哪里
存储在内存的话,好处:计算极速快,坏处:可靠性差
存储在磁盘的话:好处:可靠性高,坏处:计算速度慢

用fslmage来存储数据(如果随机读写的效率非常低,假设里面有一个a=10,对数据机械进行操作,a=a+10,去修改这个数据的值肯定是行不通的,但是可以追加数据,不修改元数据,追加最终结果)

内存中有数据
fslmage有数据
将fslmage的数据加上追加的数据,加载到内存,服务器启动的时候就会将fslmage的数据和追加的数据加载到内存中,服务器关机的时候将fslmage的数据和追加的数据进行合并

如果执行了很多的步骤,关机的时候合并很多的数据,效率虎非常低,进行一步操作把数据合并进去,那么需要用到2nn井2个文件进行一个定期的合并

1.首先服务器启动的时候会产生edits_inprogress_01和fsimage链各个文件
fsimage是加载内存上的数据过来成为了镜像文件
edits_inprogress_01是日志文件
2.用户进行删除请求
3.在edits_inprogress_01日志文件进行文件的记录
4.删除操作

snn的触发条件1.定时(一小时)2.关机合并
执行合并操作
5.生成edits_inprogress_02文件,后续的操作在02进行
6.将edits_inprogress_01尽心更名 edits_001
7.将镜像文件和edits_001拉过来拷贝,将镜像文件和edits_001拷贝到内存中
8.通过edits_001对镜像文件进行操作
9.最终生成一个新的fsimage.chkpoint
10.把fsimage.chkpoint拷贝回去
11.然后fsimage.chkpoint和edits_001就是最新的数据

2nn将数据写进去的条件是定时或者Edits的文件满的时候,滚动NameNode的数据加载到内存。

2.15 Fsimage(镜像文件) 和enits(操作日志)

  • NameNode被格式化之后将会在/opt/module/hadoop-3.1.3/data/dfs/name/current
    生成诸多文件(我这个已经有大量的操作所以才会这么多)
  • Fsimage 文件是HDFS对元数据的一个永久性检查,其中包含HDFS文件系统的所有目录和序列化信息
  • enits文件:存放HDFS操作系统的所有的更新的路径进行操作的时候首先会被记录到本文件中
  • sccn_txid文件保存的是一个数字,就是最后一个cdits_的数字,里面有集群id,有这个集群id才能找到你的集群

2.16 chkpoint的检查点时间

1)通常情况下,SecondaryNameNode 每隔一小时执行一次。

[hdfs-default.xml]
<property><name>dfs.namenode.checkpoint.period</name><value>3600s</value>
</property>

2)一分钟检查一次操作次数,当操作次数达到 1 百万时,SecondaryNameNode 执行一次。

<property><name>dfs.namenode.checkpoint.txns</name><value>1000000</value>
<description>操作动作次数</description>
</property>
<property><name>dfs.namenode.checkpoint.check.period</name><value>60s</value>
<description> 1 分钟检查一次操作次数</description>
</property>

2.17 DateNode和NameNode


1.集群启动后,向NameNode发送一次数据,注册成功
2.每隔6个小时上报自己所存储的快信息
3.每隔3秒向NameNode进行心跳检测

三:MapReduce

3.1 MapReduce的概述&优缺点

MapReduce:自己处理业务相关代码+自身的默许代码
优点:
1.易于编程,用户只关心,业务逻辑。实现框架的接口
2.良好的扩展性:可以动态的增加服务器,解决计算资源不够的问题
3.高容错性:任何一台计算机挂掉,可以将任务转移到其他的节点上
4.适合海量数据计算(TB/PB),几千台服务器共同计算,
缺点:
1.不擅长实时计算,mysql
2.不擅长流水式计算,Sparksteaming fink
3.不擅长DAG有向无图计算 Spark

3.2 MapReduce核心思想

1.统计单词的出现频率,a-p的的一个文件q-z的一个文件
2.每一台服务器都创建一个a-p的和q-z的分区
3.进行一个数据的分配:假如A服务器读100m,b读取读200m,c读取器读100m, Map阶段
进行的操作时:
读取数据,按照行处理
按照空格对单词就行切分
KV键值对(在内部会存取比如:hadoop 1 ,hadoop 1)
将所有的KV键值对按照单词的首字母分为2个分区,写到磁盘
4.Reduce阶段:a-p一个文件q-z一个文件
a-p的文件只统计服务器上的a-p分区的数据
q-z的文件只统计服务器上的q-z分区的数据
拿到数据之后,必须(hadoop 1 hadoop1 )一看hadoop都一样,name就会变成hadoop2

观看WordCount源码

在/opt/module/hadoop-3.1.3/share/hadoop/mapreduce
下的hadoop-mapreduce-examples-3.1.3.jar 下载到本地

在反编译软件拖进来

3.3 常用序列化类型

3.4 MapReduce编码规范

1.用户自定义的Mapper要继承父类

2.Mapper的输入数据是kv对的形式(KV的类型可以自己定义)

k放的是偏移量,也就是数据的下标,数据放的时候应该是k=0,v=Hadoop,为什么是7,因为从0开始的,一行的结尾回车也算一个
下一行的第一个就是8,

3.Mapper的业务逻辑是业务逻辑写在map方法中

4.Mapper的输出数据是KV对的形式,(KV可以自己定义)
5.map方法(MapTask)对每一个KV调用一次

  • 一行的内容调用一次,多行内容调用多次
  • Reducer
  • 用户自定义的Reducer要继承自己的父类
  • Reducer的输入类型应该要对应Mapper的输出类型
  • Reducer的业务逻辑写在reduce()方法中
  • ReducerTask的进程对每一组相同的K的<K,V>组调用一次reduce方法

  • Driver阶段:
  • 相当于yarn集群的客户端,用于提交我们整个的程序到yarn集群,提交的是封装了MapReduce程序相关运行的参数的对象

3.5 WordCount代码示例

  • pom
    <dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.30</version></dependency></dependencies>

  • 1)Maper应该干的事情
  • 将MapTask传给我们的文本内容转为字符串,
  • 将一行的数据进行一个空格的拆分
  • 将单词拆分为<K,V>类型的
  • 2)Reducer应该干的事
  • 汇总出Key,统计出个数
  • 3)Driver应该干的事
  • 获取配置信息,获取job对象实例
  • 获取本程序的jar包所在的本地路径
  • 关联Mapper/Reducer业务类
  • 指定Map输出类型的kv,
  • 指定Reduce输出的kv
  • 指定job原始文件所在的路径
  • 指定job所在的输出结果为目录,
  • 提交作业

3.5.1 编写Mapper

Mapper类要继承Mapper,指明输入的数据类型和输出的数据类型

package bj.sh.gy.MapperReduer.WordCount;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;/*** @author LXY* @desc  继承org.apache.hadoop.mapreduce.Mapper;下的Mapper,填入泛型* KEYIN,:map阶段输入的key类型,是longwritable  。输入的是迁移量* VALUEIN,:mp阶段的输入的values类型,是Text类型* KEYOUT,map阶段输出的key类型是text类型* VALUEOUT:map阶段输出的类型,是intwritable类型** 导入的类型都是 org.apache.hadoop.io包下的数据* @time 2022-10-18  21:01*/
public class WordCountMapper extends Mapper <LongWritable , Text,Text , IntWritable>{
}

重写map对象

public class WordCountMapper extends Mapper <LongWritable , Text,Text , IntWritable>{/**** @param key 是输入数据的key,* @param value 是输入数据的value* @param context  上下文对象* @throws IOException* @throws InterruptedException*/@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {super.map(key, value, context);}
}

在map中写入逻辑

  • context是Mapper和Reduce交互的桥梁
  • 接受行数据分割通过context上下文对象进行交互
public class WordCountMapper extends Mapper <LongWritable , Text,Text , IntWritable>{//private  Text text=new Text();private  IntWritable intWritable=new IntWritable(1);/**** @param key 是输入数据的key,* @param value 是输入数据的value* @param context  上下文对象* @throws IOException* @throws InterruptedException*/@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {//获取一行数据String Link = value.toString();//一行数据进行按照空格分割String[] words = Link.split(" ");//迭代for (String word : words) {text.set(word);  //将切割的每一个数据变为一个text对象context.write(text,intWritable);   //将数据和次数发送 }}
}

3.5.2 编写Reduce

  • 继承Reducer
/*** @author LXY* @desc* Text ,  :接受的k,和mapper发送的保持一致* IntWritable,接受的v,和mapper发送的保持一致* Text, 数出的k* IntWritable  输入的v* @time 2022-10-18  21:01*/
public class WordCountReducer extends Reducer<Text , IntWritable,Text,IntWritable> {}

重写reduce方法

  • Iterable类型是一个集合

  • 写逻辑

package bj.sh.gy.MapperReduer.WordCount;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/*** @author LXY* @desc* Text ,  :接受的k,和mapper发送的保持一致* IntWritable,接受的v,和mapper发送的保持一致* Text, 数出的k* IntWritable  输入的v* @time 2022-10-18  21:01*/
public class WordCountReducer extends Reducer<Text , IntWritable,Text,IntWritable> {private  IntWritable intWritable=new IntWritable();  //定义IntWritable类型@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {int sun=0;  //定义累计和变量for (IntWritable value : values) {sun+=value.get();   //累加数据}intWritable.set(sun);//把int变为intWritablecontext.write(key,intWritable);}
}

3.5.3 编写Driver

1.获取job
2.设置jar包路径
3.关联mapper和Reduce (让这3者关联起来)

package bj.sh.gy.MapperReduer.WordCount;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/*** @author LXY* @desc* @time 2022-10-18  21:02*/
public class WordCountDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {//1.获取jobConfiguration configuration=new Configuration();Job job=Job.getInstance(configuration);//2.设置jar包的路径job.setJarByClass(WordCountDriver.class);//3.关联map和reducejob.setMapperClass(WordCountMapper.class);job.setReducerClass(WordCountReducer.class);//4.设置map输入的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);//5.设置reduce输出的kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//6.设置输入路径和输出路径FileInputFormat.setInputPaths(job,new Path("E:\\input\\count"));FileOutputFormat.setOutputPath(job,new Path("E:\\output\\count"));//  7提交jobboolean b = job.waitForCompletion(true);System.exit(b?0:1);}
}

3.5.4 运行

  • 打开文件就可以看到想要的内容了.默认是排序输出的

3.5.5 DeBug运行

  • 在mapper的初始化方法打上断点,先执行初始化方法

  • 在map方法中

  • 在reduce中

3.5.6 集群运行

  • 修改输入和输出路径让其变成在控制台输入的路径和输出的路径

  • 打包

 <build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>3.6.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
  • 放到桌面,改名

用这个短的

-

  • 将jar包拖进去
    启动集群对文件进行操作

  • 复制路径

  • shell命令

  • hadoop jar wc.jar bj.sh.gy.MapperReduer.WordCount2.WordCountDriver /input /output

3.6 序列化

  • 将内存中的数据转为字节码文件叫序列化,将字节码文件写入叫反序列化
  • Hadoop序列化的好处:存储空间小,快速,传输速度快,互操作性

3.7 自定义的bean对象实现序列化接口

在企业开发过程中,常用的基本序列化不能满足所有的需求,比如Hadoop框架内部传递的一个bean对象,那么该对象就要实现序列化接口
具体实现序列化的步骤

  • 必须实现Writable接口
  • 反序列化时,需要反射调用空参构造器,所以必须有空参构造器
public FlowBean() {
super();
}
  • 重写序列化方法
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
  • 重写反序列化接口
@Override
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}
  • 反序列化的顺序需要和序列化顺序一致

  • 想把结果显示在文件中需要重写tostring()方法

  • 需要将自定西的bean放在key中传输,还需要实现Comparable接口,因为MapReduce框架中的Shuffle过程需要要求对key进行排序

@Override
public int compareTo(FlowBean o) {
// 倒序排列,从大到小
return this.sumFlow > o.getSumFlow() ? -1 : 1;
}

3.8 案例练习

需求分析
1、需求:统计每一个手机号耗费的总上行流量、下行流量、总流量

2.输入数据
1 13930856527 127.0.0.1 1156 666 www.baidu.com 200

3.Mapper阶段,读取一行,对数据进行切割

4.抽取手机的手机号,上行流量和下行流量

5.以手机号为key,bean对象输出values,既:context.write(手机号,bean)

6.reduce阶段,对上行流量和下行流量进行累加操作得到总流量

7.bean对象要想实现传输,需要序列化

  • 编写bean
package bj.sh.gy.MapperReduer.WordCount3;import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;/*** @author LXY* @desc* @time 2022-10-20  17:55*/
public class FlowBean implements Writable  {private long upFlow;  //上行private long downFlow; //下行private long sumFlow;  //上下行public FlowBean() {}public long getUpFlow() {return upFlow;}public void setUpFlow(long upFlow) {this.upFlow = upFlow;}public long getDownFlow() {return downFlow;}public void setDownFlow(long downFlow) {this.downFlow = downFlow;}public long getSumFlow() {return sumFlow;}public void setSumFlow() {this.sumFlow=this.upFlow+this.downFlow;}//添加函数//将数据进行序列化@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeLong(upFlow);dataOutput.writeLong(downFlow);dataOutput.writeLong(sumFlow);}//反序列化@Overridepublic void readFields(DataInput dataInput) throws IOException {this. upFlow = dataInput.readLong();this.  downFlow = dataInput.readLong();this. sumFlow=dataInput.readLong();}@Overridepublic String toString() {return  upFlow +"\t" + downFlow +"\t" + sumFlow;}}
  • 编些mapper
package bj.sh.gy.MapperReduer.WordCount3;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** @author LXY* @desc  继承org.apache.hadoop.mapreduce.Mapper;下的Mapper,填入泛型* KEYIN,:map阶段输入的key类型,是longwritable  。输入的是迁移量* VALUEIN,:mp阶段的输入的values类型,是Text类型* KEYOUT,map阶段输出的key类型是text类型* VALUEOUT:map阶段输出的类型,是intwritable类型** 导入的类型都是 org.apache.hadoop.io包下的数据* @time 2022-10-18  21:01*/
public class WordCountMapper extends Mapper<LongWritable,Text,Text,FlowBean>{FlowBean flowBean=new FlowBean();Text text=new Text();@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {String link = value.toString();  //获取String[] split = link.split("\t"); //分割flowBean.setUpFlow(Long.parseLong(split[split.length-4]));  //赋值flowBean.setDownFlow(Long.parseLong(split[split.length-3]));  //赋值flowBean.setSumFlow();   //相加text.set(split[1]);   //把手机号变为textcontext.write(text,flowBean);}
}
  • 编写Reduce
package bj.sh.gy.MapperReduer.WordCount3;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/*** @author LXY* @desc* Text ,  :接受的k,和mapper发送的保持一致* IntWritable,接受的v,和mapper发送的保持一致* Text, 数出的k* IntWritable  输入的v* @time 2022-10-18  21:01*/
public class WordCountReducer extends Reducer<Text,FlowBean,Text,FlowBean> {FlowBean flowBean=new FlowBean();@Overrideprotected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException {long upFlow = 0;  //上行long downFlow=0; //下行long sumFlow=0;  //上下行for (FlowBean value : values) {   //拿到每个bean对象upFlow+=upFlow+value.getUpFlow();downFlow+=downFlow+value.getDownFlow();}flowBean.setUpFlow(upFlow);flowBean.setDownFlow(downFlow);flowBean.setSumFlow();context.write(key,flowBean);}
}
  • 编写Driver
package bj.sh.gy.MapperReduer.WordCount3;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/*** @author LXY* 21:02*/
public class WordCountDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {//1.获取jobConfiguration configuration=new Configuration();Job job=Job.getInstance(configuration);//2.设置jar包的路径job.setJarByClass(WordCountDriver.class);//3.关联map和reducejob.setMapperClass(WordCountMapper.class);job.setReducerClass(WordCountReducer.class);//4.设置map输入的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);//5.设置reduce输出的kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);//6.设置输入路径和输出路径FileInputFormat.setInputPaths(job,new Path("E:\\input\\count2"));FileOutputFormat.setOutputPath(job,new Path("E:\\output2\\output2"));//  7提交jobboolean b = job.waitForCompletion(true);System.exit(b?0:1);}
}

3.9 MapperReduce框架原理


inputFromat默认是读取一行,有很多的实现类,可以实现不同的读取
Shuffle是最核心的内容,切片,分区等都是在这里实现
outputFormat是写入,可以写到mysql,es等等地方,

3.10 inputFromat数据输入

3.10.1 切片与MapTask并行度决定机制

  • Map阶段是考虑用什么方式来读取你的数据
  • 数据块:Block 是 HDFS 物理上把数据分成一块一块。数据块是 HDFS 存储数据单位。
    数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行
    存储。数据切片是 MapReduce 程序计算输入数据的单位,一个切片会对应启动一个 MapTask。


MapTask是不是越多越好呢?如果只有1kb的数据,开启3台服务器,是不是得不偿失
一个MapTask的初始化等等时间都超过了分析时间,这样是不合理的
将需要存储的数据,进行切片处理

  • 300m数据,按照100进行切,切3个,开3个虚拟机,每一个虚拟机一个MapTask,100来切的话,第一台虚拟机执行100,然后剩余的28要被第二台虚拟机拿去,跨服务器的交互(很慢),因为一个数据块就是128
  • 按照128来切,整好一个MapTask运行一个块(128),这样不会跨服务器交互,是最快的,也不会造成浪费,
  • 多个文件的话,不会将文件集群看成一个整体,而是将文件单独的看成一个整体

3.10.2 FileinputFromat 切片源码

  • 程序先找到数据存储的目录
  • 开始遍历处理目录下的每一个文件
  • 获取第一个文件xxx.txt
  • a) 获取文件大小
  • b)计算切片的大小
  • c)默认情况下切片大小=blocksize(块大小)
  • 提交切片规则到yaRn上,YaRn上的MrAPPMaster根据切片的大小,开启MapTask的个数
  • 集群模式默认一片是128,本地模式是32

3.10.3 FileinputFromat 切片机制

  • 简单的按照文件的长度进行切片,
  • 切片大小默认为Bllock的大小
  • 切片不考虑集群整体数据,而是根据某一个文件

3.10.4 TextinputFrom

  • 是FileInputFormat的实现类
  • 在运行MapReduce的时候,输入的文件格式包括,基于行的日志文件,二进制文件,数据库表等,那么根据不同的数据类型,MapReduce是如何读取这些数据的
  • FileinputFormat ,常见的接口实现类包括,TextinputFormat,KeyValueTextInputFomart ,NLineInputFormat,ComebineTextInputFormat,和自定义InputFormat等

-TextInputFormat是默认的FileInputFormat实现类,按行读取每条记录,键是存储在该整个文件中的起始字节偏移量,LongWritable类型,值是这行的内容,不包括任何行终止行,(换行符和回车符),Text类型

3.10.5 combineTextInputFormat

框架默认的切片机制是按照任务文件来划分切片,不管任务有多小都会是一个单独的切片,都会一个MapTask执行,效率低下

  • 1)应用场景:CombineTexInputFormat ,用于小文件过多的场景,将小文件从逻辑上划分为一个切片中,这样多个小文件就可以交给一个MapTask处理,
  • 2)虚拟存储切片的最大设置,根据实际情况定

3.10.6 CombineTextInputFormat 案例实操

将之前的woedcount复制

  • 修改路径
  • 结果,4个文件切了4片

  • 因为4个文件加起来不足4m,所以只有一个MapTask

3.11 MapReduce的工作流程(重要)


  • 1.有数据200m,在提交任务之前会进行一个切片,0-128,128-200两个
  • 2.提交任务,调用YaRn,将xml文件,jar包,切片数传递过去,
  • 3YaRn会根据传递过来的切片数调用MapperMaster去开启对应的MapTask
  • 4.在一个MapTask中去通过FilrInput(默认是TextFileInput)去调用readr方法去读取文件
  • 5.读取文件到mapper中在到Map方法中
  • 6.数据是写到了环形缓冲区中,缓冲区左边存的是索引,数据在哪个区,数据在哪个位置,右边存的是数据%
  • 7.环形缓冲区会默认写80%,到了80%会倒着写
  • 8.在环形缓冲区中,数据会进行分区,然后对数据进行一个快速排序
  • 9.数据溢出的时候,将数据写到磁盘中(提前在分区排好序了)
  • 10.所有卸载磁盘中的数据进行一个归并排序,进行分区
  • 11.数据合并,比如有数据a 1,a1,就会合并为a(1,1)
  • 12.数据合并完之后的分区MapperMaster会调用ReadueTask去对自己所需要的数据进行一个拉取
  • 13.拉取出来的数据进行合并,归并排序
  • 14.通过outputFormat对数据进行写出

3.12shuffe机制

Map方法之后,Reduce之前的过程是shuffe

  • 1.先将数据写到圆形缓冲区中,左边是索引等文件,右边是真实数据,等进度到了80%的时候的时候会对这80%的分区数据进行写入
  • 重复此步骤,写入完毕会有一次归并排序然后进行大合并,合并完成之后进行压缩,
  • Map的方法进行来了之后,Reduce会主动的拉取数据,将数据拉取到内存中,如果内存放不下了,会放在磁盘中,进行一个归并排序,按照相同的Key分组,进入到Reduce方法中

3.13 分区

要求将手机号按照要求输入到不同的文件中
135一个文件,136一个文件就用到了这个分区

  • 新建文件夹,将之前的wordcount文件复制


  • 实现,运用上面写的bean实例
package bj.sh.gy.MapperReduer.Partitoner2;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;/*** @author LXY* @desc* @time 2022-10-21  21:40*/
//继承Partitioner泛型参数,填写,map阶段传递的key和Value
public class PartitonerUtils extends Partitioner<Text,FlowBean> {int returl;  //定义返回值@Overridepublic int getPartition(Text text, FlowBean flowBean, int i) {String ipthon = text.toString();  //获取每个手机号String substring = ipthon.substring(0, 3);  //截取前三if ("136".equals(substring)){returl=0;}else if ("137".equals(substring)){returl=1;}else if ("138".equals(substring)){returl=2;}else if ("139".equals(substring)){returl=3;}else {returl =4;}return returl;}
}

  • 应用分区
    job.setPartitionerClass(PartitonerUtils.class);  //运用上刚才定义的分区job.setNumReduceTasks(5);  //设置5个分区


  • 分区总结:
  • 如果ReduceTask的数量>getPartition的结果数,则会产生几个空的输出文件
    • 如果1<ReduceTask的数量<getPartition的结果数,则会有数据无处安放,会报错
    • 如果1=ReduceTask,则会走默认的分区算法,最终会产生一个文件

3.14 排序

  • 排序是MapReduce框架最重要的操作之一

  • MapTask和ReduceTask均会对数据按照key进行排序,该操作属于Hadoop的默认行为,任何应用程序中的数据均会被排序,而不管逻辑上是否需要

  • 默认排序是按照字典进行排序,默认的排序方法快排
    MapTask阶段

  • 对于MapTask而言,他会将处理的结果暂时放到圆形缓冲区中,当唤醒缓冲区使用率达到一定的阈值之后,在对缓冲区内的数据进行一次快速排序,并将这些有序数据溢写到磁盘中,当数据处理完毕后,会对磁盘上的所以有数据进行归并排序,
    Reduce阶段
    对于ReduceTask阶段,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定的阈值,则会溢写到磁盘上,否则存储在内存中,如果磁盘上文件数目达到一定的阈值,则会进行一次归并排序生成一个更大的文件,如果内存中的文件大小数量超过一定的阈值后,则进行一次合并后将数据溢写到磁盘中,当所有数据拷贝完成后,ReduceTask统一对内存和磁盘上的数据进行一次归并排序

  • 1)部分排序

  • MapReduce根据输入记录的键对数据进行排序。保证输出每个文件内部有序

  • 2)全排序

  • 最终输出结果之后一个文件,切文件内部有序,实现方式只是设置一个ReduceTask。但该方法在处理大型文件的时候效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构

  • 3)辅助排序(GroupingComparator分组)

  • 在Reduce端对key进行分组,应用于:在接受的key为bean对象时,想让一个或者几个字段相同(全部字段比较不同)的key进入到同一个Reduce方法时,可以采用分组排序

  • 4)二次排序

  • 在自定义排序的过程中,如果CompareTo中的判断条件为两个既为二次排序

-自定义排序WritableComparable原理分析
bean对象作为key传输,需要实现WritableComparable接口重写comparable方法,就可以实现排序

3.15 全排序



知识点1:排序是对key值进行排序的
知识点2:如果key都相同就逐个输出

-实体类实现WritableComparable接口重写排序方法,对于流量综合进行排序

package bj.sh.gy.MapperReduer.WordCount5;import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;/*** @author LXY* @desc* @time 2022-10-20  17:55*/
public class FlowBean implements WritableComparable<FlowBean> {private long upFlow;  //上行private long downFlow; //下行private long sumFlow;  //上下行public FlowBean() {}public long getUpFlow() {return upFlow;}public void setUpFlow(long upFlow) {this.upFlow = upFlow;}public long getDownFlow() {return downFlow;}public void setDownFlow(long downFlow) {this.downFlow = downFlow;}public long getSumFlow() {return sumFlow;}public void setSumFlow(long sumFlow) {this.sumFlow = sumFlow;}//添加函数//将数据进行序列化@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeLong(upFlow);dataOutput.writeLong(downFlow);dataOutput.writeLong(sumFlow);}//反序列化@Overridepublic void readFields(DataInput dataInput) throws IOException {this. upFlow = dataInput.readLong();this.  downFlow = dataInput.readLong();this. sumFlow=dataInput.readLong();}@Overridepublic String toString() {return  upFlow +"\t" + downFlow +"\t" + sumFlow;}@Overridepublic int compareTo(FlowBean o) {if (this.sumFlow > o.sumFlow) {return -1;} else if (this.sumFlow < o.sumFlow) {return 1;} else {return 0;}}
}
  • map分割数据直接输出
package bj.sh.gy.MapperReduer.WordCount5;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** @author LXY* @desc  继承org.apache.hadoop.mapreduce.Mapper;下的Mapper,填入泛型* KEYIN,:map阶段输入的key类型,是longwritable  。输入的是迁移量* VALUEIN,:mp阶段的输入的values类型,是Text类型* KEYOUT,map阶段输出的key类型是text类型* VALUEOUT:map阶段输出的类型,是intwritable类型** 导入的类型都是 org.apache.hadoop.io包下的数据* @time 2022-10-18  21:01*/
public class WordCountMapper extends Mapper<LongWritable,Text, FlowBean,Text>{FlowBean flowBean=new FlowBean();Text text=new Text();@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, FlowBean, Text>.Context context) throws IOException, InterruptedException {String link = value.toString();  //获取String[] split = link.split("\t"); //分割flowBean.setUpFlow(Long.parseLong(split[1]));  //赋值flowBean.setDownFlow(Long.parseLong(split[2]));  //赋值flowBean.setSumFlow(Long.parseLong(split[3]));  //赋值 //相加text.set(split[0]);   //把手机号变为textcontext.write(flowBean,text);}
}
  • reduce直接输出
package bj.sh.gy.MapperReduer.WordCount5;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/*** @author LXY* 21:02*/
public class WordCountDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {//1.获取jobConfiguration configuration=new Configuration();Job job=Job.getInstance(configuration);//2.设置jar包的路径job.setJarByClass(WordCountDriver.class);//3.关联map和reducejob.setMapperClass(WordCountMapper.class);job.setReducerClass(WordCountReducer.class);//4.设置map输入的kv类型job.setMapOutputKeyClass(FlowBean.class);job.setMapOutputValueClass(Text.class);//5.设置reduce输出的kv类型job.setOutputKeyClass(FlowBean.class);job.setOutputValueClass(Text.class);//6.设置输入路径和输出路径FileInputFormat.setInputPaths(job,new Path("E:\\output2\\output3"));FileOutputFormat.setOutputPath(job,new Path("E:\\output3\\output3"));//  7提交jobboolean b = job.waitForCompletion(true);System.exit(b?0:1);}
}
  • Driver类型
package bj.sh.gy.MapperReduer.WordCount5;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/*** @author LXY* 21:02*/
public class WordCountDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {//1.获取jobConfiguration configuration=new Configuration();Job job=Job.getInstance(configuration);//2.设置jar包的路径job.setJarByClass(WordCountDriver.class);//3.关联map和reducejob.setMapperClass(WordCountMapper.class);job.setReducerClass(WordCountReducer.class);//4.设置map输入的kv类型job.setMapOutputKeyClass(FlowBean.class);job.setMapOutputValueClass(Text.class);//5.设置reduce输出的kv类型job.setOutputKeyClass(FlowBean.class);job.setOutputValueClass(Text.class);//6.设置输入路径和输出路径FileInputFormat.setInputPaths(job,new Path("E:\\output2\\output3"));FileOutputFormat.setOutputPath(job,new Path("E:\\output3\\output3"));//  7提交jobboolean b = job.waitForCompletion(true);System.exit(b?0:1);}
}

3.15.1 二次排序

  • 刚才排序的效果不满意,如果结果相同我还要按照上行行排序,那我就这样操作

    在重写判断的方法中在进行二次判断

3.16 combiner

  • combiner是可选流程
  • 在圆形缓冲区溢写数据之前进行的一个一个操作,是可选的

    如果数据写入的时候是(a,1)(a,1),在combiner之后就会变成(a,2),因为MapTask是多数,比Reduce的压力小,(MapTask多)产生的溢写文件默认是一次归并排序是10个,如果是30个文件就要归并排序3次,那么可以再用一次Combiner
  • Combiner是MapReduc中Map和Reduce之之外的一个组件
  • Combiner的组件的父类是Reduce
  • Combiner和Reduce的区分就是执行的位置
  • combiner是每一个MaoTask所在的节点进行
  • Reduce是接收全局所有的MaoTask的输出结果
    -Combiner是多每一个MapTask的数据进行汇总,以减小少网络传输量
  • Combiner能够应用的前提是不能影响最终的业务逻辑,而且Combiner的输出key应该和MapReduc的输入类型要对应起来
  • 使用了cominber之后,Reduce阶段承受的数据,比之前要低(可以理解为负载均衡,以减小少网络传输量)

3.17 combiner代码实现

map

package bj.sh.gy.MapperReduer.Conbiner;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** @author LXY* @desc  继承org.apache.hadoop.mapreduce.Mapper;下的Mapper,填入泛型* KEYIN,:map阶段输入的key类型,是longwritable  。输入的是迁移量* VALUEIN,:mp阶段的输入的values类型,是Text类型* KEYOUT,map阶段输出的key类型是text类型* VALUEOUT:map阶段输出的类型,是intwritable类型** 导入的类型都是 org.apache.hadoop.io包下的数据* @time 2022-10-18  21:01*/
public class WordCountMapper extends Mapper <LongWritable , Text,Text , IntWritable>{//private  Text text=new Text();private  IntWritable intWritable=new IntWritable(1);/**** @param key 是输入数据的key,* @param value 是输入数据的value* @param context  上下文对象* @throws IOException* @throws InterruptedException*/@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {//获取一行数据String Link = value.toString();//一行数据进行按照空格分割String[] words = Link.split(" ");//迭代for (String word : words) {text.set(word);  //将切割的每一个数据变为一个text对象context.write(text,intWritable);   //将数据和次数发送}}
}

reduce

package bj.sh.gy.MapperReduer.Conbiner;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/*** @author LXY* @desc* Text ,  :接受的k,和mapper发送的保持一致* IntWritable,接受的v,和mapper发送的保持一致* Text, 数出的k* IntWritable  输入的v* @time 2022-10-18  21:01*/
public class WordCountReducer extends Reducer<Text , IntWritable,Text,IntWritable> {private  IntWritable intWritable=new IntWritable();  //定义IntWritable类型@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {int sun=0;  //定义累计和变量for (IntWritable value : values) {sun+=value.get();   //累加数据}intWritable.set(sun);//把int变为intWritablecontext.write(key,intWritable);}
}

combiner

package bj.sh.gy.MapperReduer.Conbiner;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/*** @author LXY* @desc  接受的key是map阶段传递的key,values是ma阶段传递的values,怎么把数据拿过来的就什么格式传回去* @time 2022-10-26  22:37*/
public class Combiner extends Reducer<Text, IntWritable,Text , IntWritable> {private  IntWritable intWritable=new IntWritable();  //定义IntWritable类型@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {int sun=0;  //定义累计和变量for (IntWritable value : values) {sun+=value.get();   //累加数据}intWritable.set(sun);//把int变为intWritablecontext.write(key,intWritable);}}

driver

package bj.sh.gy.MapperReduer.Conbiner;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/*** @author LXY* 21:02*/
public class WordCountDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {//1.获取jobConfiguration configuration=new Configuration();Job job=Job.getInstance(configuration);//2.设置jar包的路径job.setJarByClass(WordCountDriver.class);//3.关联map和reducejob.setMapperClass(WordCountMapper.class);job.setReducerClass(WordCountReducer.class);//4.设置map输入的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);//5.设置reduce输出的kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);job.setCombinerClass(Combiner.class);//6.设置输入路径和输出路径FileInputFormat.setInputPaths(job,new Path("E:\\input\\count"));FileOutputFormat.setOutputPath(job,new Path("E:\\output\\count6"));//  7提交jobboolean b = job.waitForCompletion(true);System.exit(b?0:1);}
}
  • Cominber的运用前提是能够应用的前提是不能影响最终的业务逻辑
  • 不使用自定义的Combiner也可以,在运用一次Cominber即可,效果是一样的

3.18 合并2张表的数据

  • 将订单表中的oder中的pid让其pd的数据进行替换(主外键关系),需要同时读两张表
  • 思路:
  • 将2张表的数据全部读取到实体类中,通过实体类的flag变量来区分哪张表
  • 对实体类赋值,将pid进行一个key,作为排序
  • 在reduce中,捕获key的迭代器,将订单放到list集合中,将水果名字放到实体类(一对多的关系)
  • 一个实体类,多个实体类(list<实体类>)
  • 实体类
package bj.sh.gy.MapperReduer.ReduceJoin;import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;/*** @author LXY* @desc* @time 2022-10-30  17:36*/
public class TableBean  implements Writable {private String sid; //订单的Idprivate String SPID;  //商品的idprivate int amount;   //商品数量private String name; //商品的名字private String flag; //标识位,指明是哪张表public TableBean() {}public String getSid() {return sid;}public void setSid(String sid) {this.sid = sid;}public String getSPID() {return SPID;}public void setSPID(String SPID) {this.SPID = SPID;}public int getAmount() {return amount;}public void setAmount(int amount) {this.amount = amount;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getFlag() {return flag;}public void setFlag(String flag) {this.flag = flag;}//序列化和反序列化@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeUTF(sid);dataOutput.writeUTF(SPID);dataOutput.writeInt(amount);dataOutput.writeUTF(name);dataOutput.writeUTF(flag);}@Overridepublic void readFields(DataInput dataInput) throws IOException {this.sid=dataInput.readUTF();this. SPID  =dataInput.readUTF();this. amount =dataInput.readInt();this. name=dataInput.readUTF();this. flag=dataInput.readUTF();}//输出订单id+m商品名+商品的数量@Overridepublic String toString() {return sid+"\t"+name+"\t"+amount;}
}
  • map
package bj.sh.gy.MapperReduer.ReduceJoin;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;/*** @author LXY* @desc* @time 2022-10-30  17:36*/
public class WordMap extends Mapper<LongWritable, Text,Text,TableBean> {String pathname; //文件名TableBean tableBean=new TableBean();  //实例化名字Text text=new Text();  //key值//初始化方法,每个文件进来调用一次@Overrideprotected void setup(Mapper<LongWritable, Text, Text, TableBean>.Context context) throws IOException, InterruptedException {//获取对应文件名称FileSplit inputSplit = (FileSplit) context.getInputSplit();pathname = inputSplit.getPath().getName();  //获取文件名}@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, TableBean>.Context context) throws IOException, InterruptedException {String link = value.toString();String[] split = link.split("\t");if (pathname.contains("oder")){text.set(split[1]);tableBean.setSid(split[0]);tableBean.setName("");tableBean.setSPID(split[1]);tableBean.setAmount(Integer.parseInt(split[2]));tableBean.setFlag("oder");}else {text.set(split[0]);tableBean.setSid("");tableBean.setName(split[1]);tableBean.setSPID(split[0]);tableBean.setAmount(0);tableBean.setFlag("pd");}context.write(text,tableBean);}}
  • reduce
package bj.sh.gy.MapperReduer.ReduceJoin;import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;/*** @author LXY* @desc* @time 2022-10-30  17:55*/
public class WodeReduce extends Reducer<Text,TableBean,TableBean, NullWritable> {@Overrideprotected void reduce(Text key, Iterable<TableBean> values, Reducer<Text, TableBean, TableBean, NullWritable>.Context context) throws IOException, InterruptedException {List<TableBean>list=new ArrayList<>();TableBean tableBean=new TableBean();for (TableBean value : values) {if (value.getFlag().equals("oder")){//创建一个临时 TableBean 对象接收 valueTableBean tmpOrderBean = new TableBean();try {BeanUtils.copyProperties(tmpOrderBean,value);} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}list.add(tmpOrderBean);}else {try {BeanUtils.copyProperties(tableBean,value);} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}}}System.out.println(list);for (TableBean bean : list) {bean.setName(tableBean.getName());context.write(bean,NullWritable.get());}}}
  • driver
package bj.sh.gy.MapperReduer.ReduceJoin;import bj.sh.gy.MapperReduer.WordCount.WordCountMapper;
import bj.sh.gy.MapperReduer.WordCount.WordCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/*** @author LXY* 21:02*/
public class WordCountDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {//1.获取jobConfiguration configuration=new Configuration();Job job=Job.getInstance(configuration);//2.设置jar包的路径job.setJarByClass(WordCountDriver.class);//3.关联map和reducejob.setMapperClass(WordMap.class);job.setReducerClass(WodeReduce.class);//4.设置map输入的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(TableBean.class);//5.设置reduce输出的kv类型job.setOutputKeyClass(TableBean.class);job.setOutputValueClass(NullWritable.class);//6.设置输入路径和输出路径FileInputFormat.setInputPaths(job,new Path("E:\\input\\cout4"));FileOutputFormat.setOutputPath(job,new Path("E:\\output\\cout7"));//  7提交jobboolean b = job.waitForCompletion(true);System.exit(b?0:1);}
}

3.19 MapJoin与数据倾斜

-数据倾斜就是,多个mapreduce的多个分区,其中一个分区1条数据,另外一条数据100w数据,1条的执行玩了,100w的执行还要很久,这就是数据倾斜

  • mapjoin就是将数据在map阶段执行,就不需要reduce阶段了,因为多个maptask对上一个reduce执行效率太慢,加入一个maptask执行128的数据,就让这一个maptask去获取数据和对数据进行操作
  • 实现步骤,读取本地的缓存文件,进行匹配
  • MapJoin 适用于关联表中有小表的情形。
  • 2)优点
    思考:在 Reduce 端处理过多的表,非常容易产生数据倾斜。怎么办?
    在 Map 端缓存多张表,提前处理业务逻辑,这样增加 Map 端业务,减少 Reduce 端数
    据的压力,尽可能的减少数据倾斜。
    3)具体办法:采用 DistributedCache
    (1)在 Mapper 的 setup 阶段,将文件读取到缓存集合中。
    (2)在 Driver 驱动类中加载缓存。
//缓存普通文件到 Task 运行节点。
job.addCacheFile(new URI("file:///e:/cache/pd.txt"));
//如果是集群运行,需要设置 HDFS 路径
job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));

具体实现:

  • deiver
package bj.sh.gy.MapperReduer.each;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;/*** @author LXY* @desc* @time 2022-10-30  19:09*/
public class Driver {public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException {// 1 获取 job 信息Configuration conf = new Configuration();Job job = Job.getInstance(conf);// 2 设置加载 jar 包路径job.setJarByClass(Driver.class);// 3 关联 mapperjob.setMapperClass(Mapper.class);// 4 设置 Map 输出 KV 类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);// 5 设置最终输出 KV 类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);// 加载缓存数据job.addCacheFile(new URI("file:///E:/input/tablecache/pd.txt"));// Map 端 Join 的逻辑不需要 Reduce 阶段,设置 reduceTask 数量为 0job.setNumReduceTasks(0);// 6 设置输入输出路径FileInputFormat.setInputPaths(job, new Path("E:\\input\\cout4"));FileOutputFormat.setOutputPath(job, new Path("E:\\output\\cout8"));// 7 提交boolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);}
}
  • map
package bj.sh.gy.MapperReduer.each;import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.commons.lang.StringUtils;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;/*** @author LXY* @desc* @time 2022-10-30  19:08*/
public class Mapper extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text,Text, NullWritable> {Map<String,String>map=new HashMap<>();Text text=new Text();@Overrideprotected void setup(org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {//通过缓存文件得到小表数据 pd.txtURI[] cacheFiles = context.getCacheFiles();Path path = new Path(cacheFiles[0]);//获取文件系统对象,并开流FileSystem fs = FileSystem.get(context.getConfiguration());FSDataInputStream fis = fs.open(path);//通过包装流转换为 reader,方便按行读取BufferedReader reader = new BufferedReader(newInputStreamReader(fis, "UTF-8"));//逐行读取,按行处理String line;while (StringUtils.isNotEmpty(line = reader.readLine())) {//切割一行
//01 小米String[] split = line.split("\t");map.put(split[0], split[1]);}//关流IOUtils.closeStream(reader);}@Overrideprotected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {System.out.println(map.toString());String link = value.toString();String[] split = link.split("\t");text.set(split[0]+"\t"+map.get(split[1])+"\t"+split[2]);context.write(text,NullWritable.get());}
}

3.20数据的压缩

  • 压缩的好处
  • 减少磁盘IO,减少磁盘的存储空间
  • 缺点:
  • 增加cpu运作开销
  • 压缩原则:
    • 运算密集型的job,少用压缩
    • io密集的job,多用压缩
  • MR支持的压缩编码

  • 压缩方式选择
  • 压缩方式选择时重点考虑:压缩/解压缩速度、压缩率(压缩后存储大小)、压缩后是否
    可以支持切片。
  • Gzip 压缩
    • 优点:压缩率比较高;
      缺点:不支持 Split;压缩/解压速度一般;
  • Bzip2 压缩
    • 优点:压缩率高;支持 Split;
      缺点:压缩/解压速度慢。
  • Lzo 压缩
    • 优点:压缩/解压速度比较快;支持 Split;
      缺点:压缩率一般;想支持切片需要额外创建索引。
  • Snappy 压缩
    • 优点:压缩和解压缩速度快;
      缺点:不支持 Split;压缩率一般;
  • 压缩选择的位置


  • 实操
  • 压缩map-reduce阶段的压缩
  • 在driver中设置,数据发送reduce的时候进行数据的压缩,到了reduce的时候进行解压
    // 开启 map 端输出压缩conf.setBoolean("mapreduce.map.output.compress", true);
// 设置 map 端输出压缩方式conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);

  • 设置输出压缩
  // 设置 reduce 端输出压缩开启FileOutputFormat.setCompressOutput(job, true);
// 设置压缩的方式FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
// FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
// FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);


四:Yarn

  • Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作平台,而MapReduce等运算程序则相当于运算之操作系统之上的程序,

4.1 Yarn的基础架构

YARN是一个由ResourceManager,NodeManager,ApplicationMaster和Container等组件构成

4.2 YARN的工作机制(执行流程)

  • 0:程序提交任务到节点上
  • 1.向ResourceManager申请一个application
  • 2.将任务所需要的资源提交到指定的路径
  • 3.提交job所需要的资源
  • 4.提交完毕之后申请运行MrAPPMaster
  • 5.将任务的请求初始化为一个Task,放入到一个调度队列中
  • 6.一个NodeManager领取到任务
  • 7之后开启一个容器
  • 8将资源从HDFS上拷贝资源到本地
  • 9通过切片个数入去申请运行对应的MapTask
  • 10.MrAppMast对领取到的NodeManager去发送任务运行脚本并且创建容器
  • 11.两个NodeManager同时领取到任务,去运行对应的MapTask,执行任务,进行分区,排序
  • 12.Mr等所有的MapTask运行完毕之后,向ResourceManager申请容器运行ReduceTask
  • 13:ReduceTask向执行完毕的MapTask领取对应的分区数据
  • 14.程序执行完毕之后MrAppMaster向ResourceManager申请注销自己

4.3 Yarn的任务调度器

  • Yarn的任务调度器有三种FIFO,容量(Capacity Scheduler)和公平(Fair Scheduler),Hadoop3.x的默认的任务调度器是容量(Capacity Scheduler)

4.3.1 先进先出调度器(FIFO)

FIFO调度器(First In Out)单队列,根据提交作业的先后顺序,先来先服务

4.3.2 容量调度器

  • 容量调度器是YaHoo开发的多任务调度器
  • 1.多队列:每个队列有一定的资源,队列内部采用FIFO的先进先出的策略
  • 2.容量保障:管理员可以为每个队列设置最低资源保障和资源使用上线
  • 3.灵活性:如果一个度列中资源有剩余,可以将资源共享给那些需要资源的队列,而一旦队列有了新的程序提交,则其他借调的资源则会归还给该队列
  • 4.多租户
  • 支持多影虎共享集群的方式同时运行
  • 为了防止同一个用户的作用独占资源中的队列,该调度器会对统一用户提交的资源进行一个资源限定
    ps:有3个队列队列A独占资源的20%,队列B独占资源的50%,队列C独占资源的30%
  • 多用户和多队列指的是:用户1提交给队列1的任务,给定其分配内存,用户2提交任务不会等待队列1执行完毕,因为用户1已经分配了内存了,用剩下的内存给队列2进行内存的分配
  • 最低资源保障:假如我只能提供60%的容量,那么这内存分配的原则是最少不能超过队列的%几进行分配
    -灵活性:某一队列资源富裕很多,就会借给别的队列,等自己有任务的时候就去将资源收回来

  • 资源的分配:
    • 一个用户提交的代码会使用深度算法,优先选择资源占比最低的队列去进行分配资源
  • 作业的执行流程:
    • 按照资源的优先级,如果没有按照优先级按照提交的时间去分配资源
  • 容器资源的分配
    • 按照容器的优先级进行分配资源
    • 如果优先级相同则先考虑任务和数据是否在同一个节点上
    • 考虑任务和数据是否在同一个机架上

4.3.3 公平资源调度器

  • 字面意思,为了公平
  • 何为公平:占用20%的资源,有4个任务,那么每个资源的分配是5%…
  • 1.与容量调度器的相同点:
  • (1)多队列:支持多队列操作
  • (2)容量保障:管理员可以为每个队列设置资源针对地保障和资源使用上限
  • (2)对租户:支持多用户共享集群和多应用的同时运行,为了防止用一个用户独占资源,调度器会对同一个用户提交的作业进行一个资源的限定
  • 2.与容量调度器不同的是:
  • (1)核心的调度策略不同
  • 容量调度器是按照资源使用率最低的队列进行
  • 公平调度器是选择对资源缺额比例最大的
  • (2)每个队列可以设置资源分配的方式
  • 容量调度器是可以配置的资源分配方式是(FIFO,DRF)
  • 公平调度器是可以设置资源分配的方式是(FIFO,FAIR,DRF)




4.4 Yarn的常用命令

4.4.1 查看任务

  • 之前是在命令行查看yarn的任务运行情况
#如果任务运行的时间比较长,可以通过如下命令查看
yarn application -list
  • (2)根据 Application 状态过滤:yarn application -list -appStates (所有状态:ALL、NEW、
    NEW_SAVING、SUBMITTED、ACCEPTED、RUNNING、FINISHED、FAILED、KILLED)
#查看运行结束的任务
yarn application -list -appStates FINISHED


  • 查看尝试提交的application的信息
#任务的id(最后)
yarn applicationattempt -list  application_1667396169845_0001
#可通过获得到的容器id和任务id查看容器的日志


4.4.2 查看容器

  • 查看容器的运行情况(任务未结束的时候才可以查看,结束了容器就释放了)
yarn container -list appattempt_1667396169845_0001_000001
  • 容器的状态
#后面跟的是容器的id(任务未结束的时候才可以查看,结束了容器就释放了)yarn container -status container_1667396169845_0001_01_00000

4.4.3 查看YarnNode的状态

#列出所有的节点yarn node -list -all

  • 刷新队列的配置文件(可以做到不停机刷新)
 yarn rmadmin -refreshQueues
  • 查看队列的信息
- yarn queue -status default

  • 从这也可以看到

4.4.4Yarn参数核心配置

-搭建完成之后需要配置和yarn相关的参数

4.5 容量调度器提交案例

  • 在生产环境如何创建队列
  • (1)调度器默认是一个defute队列,不能满足生产需求
  • (2)按照框架:hive,spark,fink每个框架的任务放入到指定的队列(企业用的不是特别多)
  • (3)按照业务模块,登录注册,购物车,下单,业务部门一,业务部门2,
  • 创建多队列的好糊
  • (1)因为员工不小心,写了一个递归死循环的代码,把所有的资源耗尽
  • (2)实现任务的降级使用,特殊时期保证重要的任务队列资源充足,11.11 6.18
  • 业务部门1(重要)》 业务部门2(比较重要)》下单==》(一般 )》购物车(一般)》登录注册(次要)

-需求:defaule队列占总容量的40%,最大资源容量占总容量的60%,hive队列占总内存的60%,最大资源内存占总容量的80%

  • 一个任务 最多占用总资源的%多少 1就是百分之百
  • 最大资源容量占总资源的%多少
  • 两个队列的运行状态
  • 设置什么人可以提交任务,默认是所有人都可以提交任务,可以配置root用户提交任务,或者liuxingyu用户才可以提交任务
  • 对队列的资源进行操作,默认是所有用户
  • 设置队列的优先级

    -任务最大生命周期

  • 在hadoop中的etc/hapool里面sz capacity-scheduler.xml
  • 将文件下载到桌面进行修改
<!--Licensed under the Apache License, Version 2.0 (the "License");you may not use this file except in compliance with the License.You may obtain a copy of the License athttp://www.apache.org/licenses/LICENSE-2.0Unless required by applicable law or agreed to in writing, softwaredistributed under the License is distributed on an "AS IS" BASIS,WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.See the License for the specific language governing permissions andlimitations under the License. See accompanying LICENSE file.
-->
<configuration><property><name>yarn.scheduler.capacity.maximum-applications</name><value>10000</value><description>Maximum number of applications that can be pending and running.</description></property><property><name>yarn.scheduler.capacity.maximum-am-resource-percent</name><value>0.1</value><description>Maximum percent of resources in the cluster which can be used to run application masters i.e. controls number of concurrent runningapplications.</description></property><property><name>yarn.scheduler.capacity.resource-calculator</name><value>org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator</value><description>The ResourceCalculator implementation to be used to compare Resources in the scheduler.The default i.e. DefaultResourceCalculator only uses Memory whileDominantResourceCalculator uses dominant-resource to compare multi-dimensional resources such as Memory, CPU etc.</description></property><!-- 指定多队列,增加 hive 队列 --><property><name>yarn.scheduler.capacity.root.queues</name><value>default,hive</value><description>The queues at the this level (root is the root queue).</description></property><!-- 降低 default 队列资源额定容量为 40%,默认 100% -->
<property><name>yarn.scheduler.capacity.root.default.capacity</name><value>40</value>
</property><property><name>yarn.scheduler.capacity.root.hive.capacity</name><value>60</value>
</property>
<!-- 用户提交占用多少 --><property><name>yarn.scheduler.capacity.root.default.user-limit-factor</name><value>1</value><description>Default queue user limit a percentage from 0.0 to 1.0.</description></property><property><name>yarn.scheduler.capacity.root.hive.user-limit-factor</name><value>1</value><description>Default queue user limit a percentage from 0.0 to 1.0.</description></property><!-- 降低 default 队列资源最大容量为 60%,默认 100% -->
<property><name>yarn.scheduler.capacity.root.default.maximum-capacity</name><value>60</value>
</property><!-- 降低 hive 队列资源最大容量为 60%,默认 100% -->
<property><name>yarn.scheduler.capacity.root.hive.maximum-capacity</name><value>80</value>
</property><!-- 设置为运行状态 --><property><name>yarn.scheduler.capacity.root.default.state</name><value>RUNNING</value><description>The state of the default queue. State can be one of RUNNING or STOPPED.</description></property><property><name>yarn.scheduler.capacity.root.hive.state</name><value>RUNNING</value><description>The state of the default queue. State can be one of RUNNING or STOPPED.</description></property><!-- 所有用户都可以提交 --><property><name>yarn.scheduler.capacity.root.default.acl_submit_applications</name><value>*</value><description>The ACL of who can submit jobs to the default queue.</description></property><property><name>yarn.scheduler.capacity.root.hive.acl_submit_applications</name><value>*</value><description>The ACL of who can submit jobs to the default queue.</description></property><!-- 对队列进行操作权限 --><property><name>yarn.scheduler.capacity.root.default.acl_administer_queue</name><value>*</value><description>The ACL of who can administer jobs on the default queue.</description></property><property><name>yarn.scheduler.capacity.root.hive.acl_administer_queue</name><value>*</value><description>The ACL of who can administer jobs on the default queue.</description></property><!-- 提交任务的优先级 --><property><name>yarn.scheduler.capacity.root.default.acl_application_max_priority</name><value>*</value><description>The ACL of who can submit applications with configured priority.For e.g, [user={name} group={name} max_priority={priority} default_priority={priority}]</description></property><property><name>yarn.scheduler.capacity.root.hive.acl_application_max_priority</name><value>*</value><description>The ACL of who can submit applications with configured priority.For e.g, [user={name} group={name} max_priority={priority} default_priority={priority}]</description></property><!-- 任务最大 的生命周期 --><property><name>yarn.scheduler.capacity.root.default.maximum-application-lifetime</name><value>-1</value><description>Maximum lifetime of an application which is submitted to a queuein seconds. Any value less than or equal to zero will be considered asdisabled.This will be a hard time limit for all applications in thisqueue. If positive value is configured then any application submittedto this queue will be killed after exceeds the configured lifetime.User can also specify lifetime per application basis inapplication submission context. But user lifetime will beoverridden if it exceeds queue maximum lifetime. It is point-in-timeconfiguration.Note : Configuring too low value will result in killing applicationsooner. This feature is applicable only for leaf queue.</description></property><property><name>yarn.scheduler.capacity.root.hive.maximum-application-lifetime</name><value>-1</value><description>Maximum lifetime of an application which is submitted to a queuein seconds. Any value less than or equal to zero will be considered asdisabled.This will be a hard time limit for all applications in thisqueue. If positive value is configured then any application submittedto this queue will be killed after exceeds the configured lifetime.User can also specify lifetime per application basis inapplication submission context. But user lifetime will beoverridden if it exceeds queue maximum lifetime. It is point-in-timeconfiguration.Note : Configuring too low value will result in killing applicationsooner. This feature is applicable only for leaf queue.</description></property><property><name>yarn.scheduler.capacity.root.default.default-application-lifetime</name><value>-1</value><description>Default lifetime of an application which is submitted to a queuein seconds. Any value less than or equal to zero will be considered asdisabled.If the user has not submitted application with lifetime value then thisvalue will be taken. It is point-in-time configuration.Note : Default lifetime can't exceed maximum lifetime. This feature isapplicable only for leaf queue.</description></property><property><name>yarn.scheduler.capacity.root.hive.default-application-lifetime</name><value>-1</value><description>Default lifetime of an application which is submitted to a queuein seconds. Any value less than or equal to zero will be considered asdisabled.If the user has not submitted application with lifetime value then thisvalue will be taken. It is point-in-time configuration.Note : Default lifetime can't exceed maximum lifetime. This feature isapplicable only for leaf queue.</description></property><!-- 下面是yarn的配置不需要动--><property><name>yarn.scheduler.capacity.node-locality-delay</name><value>40</value><description>Number of missed scheduling opportunities after which the CapacityScheduler attempts to schedule rack-local containers.When setting this parameter, the size of the cluster should be taken into account.We use 40 as the default value, which is approximately the number of nodes in one rack.Note, if this value is -1, the locality constraint in the container requestwill be ignored, which disables the delay scheduling.</description></property><property><name>yarn.scheduler.capacity.rack-locality-additional-delay</name><value>-1</value><description>Number of additional missed scheduling opportunities over the node-locality-delayones, after which the CapacityScheduler attempts to schedule off-switch containers,instead of rack-local ones.Example: with node-locality-delay=40 and rack-locality-delay=20, the scheduler willattempt rack-local assignments after 40 missed opportunities, and off-switch assignmentsafter 40+20=60 missed opportunities.When setting this parameter, the size of the cluster should be taken into account.We use -1 as the default value, which disables this feature. In this case, the numberof missed opportunities for assigning off-switch containers is calculated based onthe number of containers and unique locations specified in the resource request,as well as the size of the cluster.</description></property><property><name>yarn.scheduler.capacity.queue-mappings</name><value></value><description>A list of mappings that will be used to assign jobs to queuesThe syntax for this list is [u|g]:[name]:[queue_name][,next mapping]*Typically this list will be used to map users to queues,for example, u:%user:%user maps all users to queues with the same nameas the user.</description></property><property><name>yarn.scheduler.capacity.queue-mappings-override.enable</name><value>false</value><description>If a queue mapping is present, will it override the value specifiedby the user? This can be used by administrators to place jobs in queuesthat are different than the one specified by the user.The default is false.</description></property><property><name>yarn.scheduler.capacity.per-node-heartbeat.maximum-offswitch-assignments</name><value>1</value><description>Controls the number of OFF_SWITCH assignments allowedduring a node's heartbeat. Increasing this value can improvescheduling rate for OFF_SWITCH containers. Lower values reduce"clumping" of applications on particular nodes. The default is 1.Legal values are 1-MAX_INT. This config is refreshable.</description></property><property><name>yarn.scheduler.capacity.application.fail-fast</name><value>false</value><description>Whether RM should fail during recovery if previous applications'queue is no longer valid.</description></property></configuration>
  • 修改好了之后执行rm -rf capacity-scheduler.xml删除1002上的文件,然后将修改好的文件放进来,将文件分发到另外两台服务器上,然后启动hadoop
  • 测试
  • 执行wordcount 通过-D指定运行在那个队列
  • hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount -D mapreduce.job.queuename=hive /input /output3

  • 在代码中也可以配置

4.6 配置任务的优先级

4.7 配置多队列的公平调度器

  • 在yarn-size中配置
<property><name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairS
cheduler</value><description>配置使用公平调度器</description>
</property>
<property><name>yarn.scheduler.fair.allocation.file</name><value>/opt/module/hadoop-3.1.3/etc/hadoop/fair-scheduler.xml</value><description>指明公平调度器队列分配配置文件</description>
</property>
<property><name>yarn.scheduler.fair.preemption</name><value>false</value><description>禁止队列间资源抢占</description>
</property>
  • 在指定的目录创建fair-scheduler.xml文件
<?xml version="1.0"?>
<allocations><!-- 单个队列中 Application Master 占用资源的最大比例,取值 0-1 ,企业一般配置 0.1
--><queueMaxAMShareDefault>0.5</queueMaxAMShareDefault><!-- 单个队列最大资源的默认值 test atguigu default --><queueMaxResourcesDefault>4096mb,4vcores</queueMaxResourcesDefault><!-- 增加一个队列 test --><queue name="test"><!-- 队列最小资源 --><minResources>2048mb,2vcores</minResources><!-- 队列最大资源 --><maxResources>4096mb,4vcores</maxResources><!-- 队列中最多同时运行的应用数,默认 50,根据线程数配置 --><maxRunningApps>4</maxRunningApps><!-- 队列中 Application Master 占用资源的最大比例 --><maxAMShare>0.5</maxAMShare><!-- 该队列资源权重,默认值为 1.0 --><weight>1.0</weight><!-- 队列内部的资源分配策略 --><schedulingPolicy>fair</schedulingPolicy></queue><!-- 增加一个队列 liuxingyu --><queue name="liuxingyu" type="parent"><!-- 队列最小资源 --><minResources>2048mb,2vcores</minResources><!-- 队列最大资源 --><maxResources>4096mb,4vcores</maxResources><!-- 队列中最多同时运行的应用数,默认 50,根据线程数配置 --><maxRunningApps>4</maxRunningApps><!-- 队列中 Application Master 占用资源的最大比例 --><maxAMShare>0.5</maxAMShare><!-- 该队列资源权重,默认值为 1.0 --><weight>1.0</weight><!-- 队列内部的资源分配策略 --><schedulingPolicy>fair</schedulingPolicy></queue><!-- 任务队列分配策略,可配置多层规则,从第一个规则开始匹配,直到匹配成功 --><queuePlacementPolicy><!-- 提交任务时指定队列,如未指定提交队列,则继续匹配下一个规则; false 表示:如果指
定队列不存在,不允许自动创建--><rule name="specified" create="false"/><!-- 提交到 root.group.username 队列,若 root.group 不存在,不允许自动创建;若
root.group.user 不存在,允许自动创建 --><rule name="nestedUserQueue" create="true"><rule name="primaryGroup" create="false"/></rule><!-- 最后一个规则必须为 reject 或者 default。Reject 表示拒绝创建提交失败,
default 表示把任务提交到 default 队列 --><rule name="reject" /></queuePlacementPolicy>
</allocations>
  • 重启,看效果

    -执行
    hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar pi -Dmapreduce.job.queuename=root.test 1 1

-走的是test队列

hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar pi 1 1

  • 如果不指定队列,默认就是走的liuxingyu队列

4.8 Yarn的tool接口

  • 之前自定义的mapreduce的har包,运行自定义的jar包,可以指定控制台输入路径和输出路径,执行发现没问题

    -接下来让jar包运行在指定的队列直接抛异常

  • 采用tool接口的方式来解决问题

4.8.1 代码实现

package com.rj.bd.Tool;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;import java.io.IOException;
import java.util.Arrays;/*** @author LXY* 21:02*/public class WordCountDriver {private static Tool tool;public static void main(String[] args) throws Exception {// 1. 创建配置文件Configuration conf = new Configuration();// 2. 判断是否有 tool 接口switch (args[0]){case "wordcount":tool = new WordCoutTool();break;default:throw new RuntimeException(" No such tool: "+args[0] );}// 3. 用 Tool 执行程序// Arrays.copyOfRange 将老数组的元素放到新数组里面int run = ToolRunner.run(conf, tool, Arrays.copyOfRange(args, 1, args.length));System.exit(run);}
}
package com.rj.bd.Tool;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;import java.io.IOException;/*** @author lxy* @desc* @time 2022--11--06--17:35*/
public class WordCoutTool implements Tool {private Configuration conf;public int run(String[] args) throws Exception {Job job = Job.getInstance(conf);job.setJarByClass(WordCountDriver.class);job.setMapperClass(WordCountMapper.class);job.setReducerClass(WordCountReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));return job.waitForCompletion(true) ? 0 : 1;}public void setConf(Configuration configuration) {this.conf = conf;}public Configuration getConf() {return conf;}public static class WordCountMapper extends Mapper<LongWritable, Text,Text , IntWritable> {//private  Text text=new Text();private  IntWritable intWritable=new IntWritable(1);/**** @param key 是输入数据的key,* @param value 是输入数据的value* @param context  上下文对象* @throws IOException* @throws InterruptedException*/@Overrideprotected  void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {//获取一行数据String Link = value.toString();//一行数据进行按照空格分割String[] words = Link.split(" ");//迭代for (String word : words) {text.set(word);  //将切割的每一个数据变为一个text对象context.write(text,intWritable);   //将数据和次数发送}}}public static class WordCountReducer extends Reducer<Text , IntWritable,Text,IntWritable> {private  IntWritable intWritable=new IntWritable();  //定义IntWritable类型@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {int sun=0;  //定义累计和变量for (IntWritable value : values) {sun+=value.get();   //累加数据}intWritable.set(sun);//把int变为intWritablecontext.write(key,intWritable);}}}
  • 将jar包运行到yarn集群中
    hadoop-3.1.3]$ yarn jar YarnDemo.jar
    com.atguigu.yarn.WordCountDriver wordcount -
    Dmapreduce.job.queuename=root.test /input /output1

Hadoop 入门笔记相关推荐

  1. Hadoop学习笔记(1) ——菜鸟入门

     Hadoop学习笔记(1) --菜鸟入门 Hadoop是什么?先问一下百度吧: [百度百科]一个分布式系统基础架构,由Apache基金会所开发.用户能够在不了解分布式底层细节的情况下.开发分布式 ...

  2. [学习笔记]黑马程序员-Hadoop入门视频教程

    文章目录 参考资料 第一章:大数据导论与Linux基础(p1-p17) 1.1 大数据导论 1.1.1 企业数据分析方向 1.1.2 数据分析基本流程步骤 明确分析的目的和思路 数据收集 数据处理 数 ...

  3. Hadoop学习笔记(1)

    原文:http://www.cnblogs.com/zjfstudio/p/3859704.html Hadoop学习笔记(1) --菜鸟入门 Hadoop是什么?先问一下百度吧: [百度百科]一个分 ...

  4. Hadoop学习笔记—4.初识MapReduce

    一.神马是高大上的MapReduce MapReduce是Google的一项重要技术,它首先是一个 编程模型 ,用以进行大数据量的计算.对于大 数据量的计算,通常采用的处理手法就是并行计算.但对许多开 ...

  5. Hadoop学习笔记—15.HBase框架学习(基础知识篇)

    Hadoop学习笔记-15.HBase框架学习(基础知识篇) HBase是Apache Hadoop的数据库,能够对大型数据提供随机.实时的读写访问.HBase的目标是存储并处理大型的数据.HBase ...

  6. 大数据框架Hadoop篇之Hadoop入门

    1. 写在前面 今天开始,想开启大数据框架学习的一个新系列,之前在学校的时候就会大数据相关技术很是好奇,但苦于没有实践场景,对这些东西并没有什么体会,到公司之后,我越发觉得大数据的相关知识很重要,不管 ...

  7. 【大数据入门笔记系列】第六节 分布式计算框架MapReduce的工作流程

    [大数据入门笔记系列]第六节 分布式计算框架MapReduce的工作流程 前言 MapReduce分布式运算 MapReduceApplication MapTask ReduceTask split ...

  8. 【大数据入门笔记系列】第一节 大数据常用组件

    [大数据入门笔记系列]第一节 大数据常用组件 大数据释义 大数据组件 跳转 大数据释义 近些年来,坊间一直流传着这样的言论:"大数据时代,人人都在裸奔".对于外行人来说,对于&qu ...

  9. 【大数据入门笔记系列】第三节 Hdfs读、写数据处理流程

    [大数据入门笔记系列]第三节 Hdfs读.写数据处理流程 Hdfs简介 写数据处理流程 读数据处理流程 后记 跳转 Hdfs简介 一般而言,Hdfs是由一个NameNode节点和若干个DataNode ...

  10. 尚硅谷大数据技术Hadoop教程-笔记02【Hadoop-入门】

    视频地址:尚硅谷大数据Hadoop教程(Hadoop 3.x安装搭建到集群调优) 尚硅谷大数据技术Hadoop教程-笔记01[大数据概论] 尚硅谷大数据技术Hadoop教程-笔记02[Hadoop-入 ...

最新文章

  1. STM32单片机硬件I2C驱动程序(查询方式)
  2. 对于redis底层框架的理解(一)
  3. DotNet中配置文件的使用(一)
  4. 今日头条Web HTTP请求的白名单
  5. 更改应用程序图标_苹果更新 TestFlight 应用图标,增加更多拟真细节
  6. Java基础学习总结(97)——合格的Java的架构师需要具备的技术知识
  7. 使用Python搭建人脸识别考勤系统
  8. (完全解决)为什么二阶行列式的绝对值为面积
  9. TMS320F280049C 学习笔记4 Led_ex1_blinky 初始化函数及系统时钟
  10. Neverland Test 2.0
  11. key too large to index, failing 3346解决
  12. win10添加打印机失败,无法正常使用打印机的解决办法
  13. 介绍一个浏览器被劫持后不能跳转到主页的处理方法
  14. 【测试开发】Python—logging日志封装
  15. Springboot集成datax方案小记
  16. STM32与OLED显示姓名学号
  17. Idea打字变成繁体
  18. 随感 | 毕设风雨 | 前端之路 | 暗夜与旅人 | 早槐与龙鲤
  19. 仿QQ对话列表滑动删除与置顶的原理及实现
  20. AN OPEN-SOURCE SPEAKER GENDER DETECTION FRAMEWORK FOR MONITORING GENDER EQUALITY

热门文章

  1. 2021第四届全国大学生IT技能大赛“传智杯”AK
  2. 「Java工具类」Apache的Beanutils和PropertyUtils工具类
  3. 税控接口 - 模拟录入
  4. 城市道路5G智慧监控方案 让感知为安全护航
  5. python画3D图
  6. mx播放器有没有投屏功能_无线投屏、即插即用,投影仪其实可以更智能:明基 E580T...
  7. python制作一个超强的加密软件
  8. tps协议和onvif协议_ONVIF协议解读
  9. mysql怎么导出insert语句_mysql导出insert语句
  10. 如何正确设置同时使用内外网