文章目录

  • 一 .Zookeeper
    • 1 请简述ZooKeeper的选举机制
    • 假设有五台服务器组成的zookeeper集群,它们的id从1-5,同时它们都是最新启动的,也就是没有历史数据,在存放数据量这一点上,都是一样的。假设这些服务器依序启动,来看看会发生什么。
    • ZooKeeper的监听原理是什么?
    • ZooKeeper的部署方式有哪几种?集群中的角色有哪些?集群最少需要几台机器?
    • ZooKeeper的常用命令
  • 二 Hive
      • 1 Hive表关联查询,如何解决数据倾斜的问题?
      • 2 请谈一下Hive的特点,Hive和RDBMS有什么异同?
      • 3 请说明Hive中 Sort By,Order By,Cluster By,Distrbute By各代表什么意思?
      • 4 Hive有哪些方式保存元数据,各有哪些特点?
      • 5 Hive内部表和外部表的区别?
      • 6 写出将 text.txt 文件放入 Hive中 test 表‘2016-10-10’ 分区的语句,test 的分区字段是 l_date。
      • 7 Hive自定义UDF函数的流程?
      • 8 对于Hive,你写过哪些udf函数,作用是什么?
      • 9 Hive 中的压缩格式TextFile、SequenceFile、RCfile 、ORCfile各有什么区别?
      • 10 Hive join过程中大表小表的放置顺序?
      • 11 Hive的两张表关联,使用MapReduce怎么实现?
      • 12 所有的Hive任务都会有MapReduce的执行吗?
      • 13 Hive的函数:UDF、UDAF、UDTF的区别?
      • 14 说说对Hive桶表的理解?
      • 15 Hive可以像关系型数据库那样建立多个库吗?
      • 16 Hive实现统计的查询语句是什么?
    • 17 Hive优化措施
      • 17.1 Fetch抓取
      • 17.2 本地模式
      • 17.3 表的优化
        • 17.3.1 小表、大表Join
        • 17.3.2 大表Join大表
        • 17.3.3 MapJoin
        • 17.3.4 Group By
        • 17.3.5 Count(Distinct) 去重统计
        • 17.3.6 笛卡尔积
        • 17.3.7 行列过滤
        • 17.3.8 动态分区调整
        • 17.3.9 分桶
        • 17.3.10 分区
      • 17.4 数据倾斜
        • 17.4.1 合理设置Map数
        • 17.4.3 复杂文件增加Map数
        • 17.4.4 合理设置Reduce数
      • 17.5 并行执行
      • 17.6 严格模式
      • 17.7 JVM重用
      • 17.8 推测执行
      • 17.9 压缩
        • 17.9.1 Hadoop源码编译支持Snappy压缩
        • 17.9.2 jar包安装
        • 17.9.3 编译源码
        • 17.9.4 Hadoop压缩配置
        • 17.9.5 开启Map输出阶段压缩
        • 17.9.6 开启Reduce输出阶段压缩
      • 17.10 执行计划(Explain)
      • 18 Hive数据分析面试题
  • 三 Flume
    • 1 flume有哪些组件,flume的source、channel、sink具体是做什么的
    • 2 你是如何实现flume数据传输的监控的
    • 3 flume的source,sink,channel的作用?你们source是什么类型?
  • 四 Kafka
    • 1 kafka的balance是怎么做的
    • 2 kafka的消费者有几种模式
    • 3 为什么kafka可以实现高吞吐?单节点kafka的吞吐量也比其他消息队列大,为什么?
      • kafka主要使用了以下几个方式实现了超高的吞吐率
    • 4 kafka的偏移量offset存放在哪儿,为什么?
    • 5 Kafka消费过的消息如何再消费
    • 6 Kafka里面用的什么方式拉的方式还是推的方式?如何保证数据不会出现丢失或者重复消费的情况?做过哪些预防措施,怎么解决以上问题的?Kafka元数据存在哪?
    • 数据重复消费的情况,如果处理?
    • 7 kafka支不支持事物。
    • 7. 事务实践
      • 7.1. 如何选择事务Id
      • 7.2. 事务性能以及如何优化?
    • 8 Kafka的原理?
  • 五 Hbase
    • 1 Hbase热点问题 读写请求会集中到某一个RegionServer上如何处理
    • 2 hbase查询一条记录的方法是什么?Hbase写入一条记录的方法是什么?
    • 3 Hbase优化及rowkey设计。

一 .Zookeeper

1 请简述ZooKeeper的选举机制

1)半数机制(Paxos 协议):集群中半数以上机器存活,集群可用。所以zookeeper适合装在奇数台机器上。
2)Zookeeper虽然在配置文件中并没有指定master和slave。但是,zookeeper工作时,是有一个节点为leader,其他则为follower,Leader是通过内部的选举机制临时产生的
3)以一个简单的例子来说明整个选举的过程。

假设有五台服务器组成的zookeeper集群,它们的id从1-5,同时它们都是最新启动的,也就是没有历史数据,在存放数据量这一点上,都是一样的。假设这些服务器依序启动,来看看会发生什么。

(1)服务器1启动,此时只有它一台服务器启动了,它发出去的报没有任何响应,所以它的选举状态一直是LOOKING状态。
(2)服务器2启动,它与最开始启动的服务器1进行通信,互相交换自己的选举结果,由于两者都没有历史数据,所以id值较大的服务器2胜出,但是由于没有达到超过半数以上的服务器都同意选举它(这个例子中的半数以上是3),所以服务器1、2还是继续保持LOOKING状态。
(3)服务器3启动,根据前面的理论分析,服务器3成为服务器1、2、3中的老大,而与上面不同的是,此时有三台服务器选举了它,所以它成为了这次选举的leader。
(4)服务器4启动,根据前面的分析,理论上服务器4应该是服务器1、2、 3、4中最大的,但是由于前面已经有半数以上的服务器选举了服务器3,所以它只能接收当小弟的命了。
(5)服务器5启动,同4一样当小弟。

ZooKeeper的监听原理是什么?

1)首先要有一个main()线程
2)在main线程中创建Zookeeper客户端,这时就会创建两个线程,一个负责网络连接通信(connet),一个负责监听(listener)。
3)通过connect线程将注册的监听事件发送给Zookeeper。
4)在Zookeeper的注册监听器列表中将注册的监听事件添加到列表中。
5)Zookeeper监听到有数据或路径变化,就会将这个消息发送给listener线程。
6)listener线程内部调用了proces()方法。

ZooKeeper的部署方式有哪几种?集群中的角色有哪些?集群最少需要几台机器?

1)单机模式、伪集群模式和集群模式
2)Leader和Follower
3)3

ZooKeeper的常用命令

ls create get delete set…

二 Hive

1 Hive表关联查询,如何解决数据倾斜的问题?

1) 过滤掉脏数据:如果大key是无意义的脏数据,直接过滤掉。本场景中大key无实际意义,为非常脏数据,直接过滤掉。
2)数据预处理:数据做一下预处理,尽量保证join的时候,同一个key对应的记录不要有太多。
3) 增加reduce个数:如果数据中出现了多个大key,增加reduce个数,可以让这些大key落到同一个reduce的概率小很多。
4) 转换为mapjoin:如果两个表join的时候,一个表为小表,可以用mapjoin做。
5) 大key单独处理:将大key和其他key分开处理
6)hive.optimize.skewjoin:会将一个join sql 分为两个job。另外可以同时设置下hive.skewjoin.key,默认为10000。参考:
https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties
参数对full outer join无效。
7)调整内存设置:适用于那些由于内存超限内务被kill掉的场景。通过加大内存起码能让任务跑起来,不至于被杀掉。该参数不一定会明显降低任务执行时间。如:
setmapreduce.reduce.memory.mb=5120 ;
setmapreduce.reduce.java.opts=-Xmx5000M -XX:MaxPermSize=128m ;

2 请谈一下Hive的特点,Hive和RDBMS有什么异同?

特点:
1)Hive处理的数据存储在HDFS
2)Hive分析数据底层的实现是MapReduce
3)执行程序运行在Yarn上
异同:
1)查询语言:由于SQL被广泛的应用在数据仓库中,因此,专门针对Hive的特性设计了类SQL的查询语言HQL。熟悉SQL开发的开发者可以很方便的使用Hive进行开发。
2) 数据存储位置:Hive 是建立在 Hadoop 之上的,所有 Hive 的数据都是存储在 HDFS 中的。而数据库则可以将数据保存在块设备或者本地文件系统中。
3)数据更新:由于Hive是针对数据仓库应用设计的,而数据仓库的内容是读多写少的。因此,Hive中不建议对数据的改写,所有的数据都是在加载的时候确定好的。而数据库中的数据通常是需要经常进行修改的,因此可以使用 INSERT INTO … VALUES 添加数据,使用 UPDATE … SET修改数据。
4)索引:Hive在加载数据的过程中不会对数据进行任何处理,甚至不会对数据进行扫描,因此也没有对数据中的某些Key建立索引。Hive要访问数据中满足条件的特定值时,需要暴力扫描整个数据,因此访问延迟较高。由于 MapReduce 的引入, Hive 可以并行访问数据,因此即使没有索引,对于大数据量的访问,Hive 仍然可以体现出优势。数据库中,通常会针对一个或者几个列建立索引,因此对于少量的特定条件的数据的访问,数据库可以有很高的效率,较低的延迟。由于数据的访问延迟较高,决定了 Hive 不适合在线数据查询。
5) 执行:Hive中大多数查询的执行是通过 Hadoop 提供的 MapReduce 来实现的。而数据库通常有自己的执行引擎。
6) 执行延迟:Hive 在查询数据的时候,由于没有索引,需要扫描整个表,因此延迟较高。另外一个导致 Hive 执行延迟高的因素是 MapReduce框架。由于MapReduce 本身具有较高的延迟,因此在利用MapReduce 执行Hive查询时,也会有较高的延迟。相对的,数据库的执行延迟较低。当然,这个低是有条件的,即数据规模较小,当数据规模大到超过数据库的处理能力的时候,Hive的并行计算显然能体现出优势。
7) 可扩展性:由于Hive是建立在Hadoop之上的,因此Hive的可扩展性是和Hadoop的可扩展性是一致的(世界上最大的Hadoop 集群在 Yahoo!,2009年的规模在4000 台节点左右)。而数据库由于 ACID 语义的严格限制,扩展行非常有限。目前最先进的并行数据库 Oracle 在理论上的扩展能力也只有100台左右。
8)数据规模:由于Hive建立在集群上并可以利用MapReduce进行并行计算,因此可以支持很大规模的数据;对应的,数据库可以支持的数据规模较小。

3 请说明Hive中 Sort By,Order By,Cluster By,Distrbute By各代表什么意思?

Sort By:每个Reducer内部有序;
Order By:全局排序,只有一个Reducer;
Cluster By:当distribute by和sorts by字段相同时,可以使用cluster by方式。cluster by除了具有distribute by的功能外还兼具sort by的功能。但是排序只能是倒序排序,不能指定排序规则为ASC或者DESC。
Distrbute By:类似MR中partition,进行分区,结合sort by使用,Hive要求DISTRIBUTE BY语句要写在SORT BY语句之前。

4 Hive有哪些方式保存元数据,各有哪些特点?

1)Single User Mode:默认安装hive,hive是使用derby内存数据库保存hive的元数据,这样是不可以并发调用hive的。
2)User Mode:通过网络连接到一个数据库中,是最经常使用到的模式。假设使用本机mysql服务器存储元数据。这种存储方式需要在本地运行一个mysql服务器,可并发调用
3)Remote Server Mode:在服务器端启动一个 MetaStoreServer,客户端利用 Thrift 协议通过 MetaStoreServer 访问元数据库。

5 Hive内部表和外部表的区别?

1)默认创建的表都是管理表,有时也被称为内部表。因为这种表,Hive会(或多或少地)控制着数据的生命周期。Hive默认情况下会将这些表的数据存储在由配置项hive.metastore.warehouse.dir(例如,/user/hive/warehouse)所定义的目录的子目录下。 当我们删除一个管理表时,Hive也会删除这个表中数据。管理表不适合和其他工具共享数据。
2)Hive并非认为其完全拥有这份数据。删除该表并不会删除掉这份数据,不过描述表的元数据信息会被删除掉。

6 写出将 text.txt 文件放入 Hive中 test 表‘2016-10-10’ 分区的语句,test 的分区字段是 l_date。

load data local inpath ‘/text.txt’ into table test partition(l_date=’ 2016-10-10’);

7 Hive自定义UDF函数的流程?

1)自定义一个Java类
2)继承UDF类
3)重写evaluate方法
2)传入时间,返回年月日。
4)打成jar包
6)在hive执行add jar方法
7)在hive执行创建模板函数
8)hql中使用

8 对于Hive,你写过哪些udf函数,作用是什么?

时间日期类UDF函数,传入时间,返回年,月,日…

9 Hive 中的压缩格式TextFile、SequenceFile、RCfile 、ORCfile各有什么区别?

TextFile: 默认格式,数据不做压缩,磁盘开销大,数据解析开销大。可结合Gzip、Bzip2使用,但使用Gzip这种方式,hive不会对数据进行切分,从而无法对数据进行并行操作。
SequenceFile: SequenceFile是Hadoop API 提供的一种二进制文件,它将数据以<key,value>的形式序列化到文件中。这种二进制文件内部使用Hadoop 的标准的Writable 接口实现序列化和反序列化。它与Hadoop API中的MapFile 是互相兼容的。Hive 中的SequenceFile 继承自Hadoop API 的SequenceFile,不过它的key为空,使用value 存放实际的值, 这样是为了避免MR 在运行map 阶段的排序过程。
RCFile: RCFile是Hive推出的一种专门面向列的数据格式。 它遵循“先按列划分,再垂直划分”的设计理念。当查询过程中,针对它并不关心的列时,它会在IO上跳过这些列。需要说明的是,RCFile在map阶段从 远端拷贝仍然是拷贝整个数据块,并且拷贝到本地目录后RCFile并不是真正直接跳过不需要的列,并跳到需要读取的列, 而是通过扫描每一个row group的头部定义来实现的,但是在整个HDFS Block 级别的头部并没有定义每个列从哪个row group起始到哪个row group结束。所以在读取所有列的情况下,RCFile的性能反而没有SequenceFile高。
ORCfile: ORC是列式存储,有多种文件压缩方式,并且有着很高的压缩比。文件是可切分(Split)的。因此,在Hive中使用ORC作为表的文件存储格式,不仅节省HDFS存储资源,查询任务的输入数据量减少,使用的MapTask也就减少了。提供了多种索引,row group index、bloom filter index。ORC可以支持复杂的数据结构(比如Map等)

10 Hive join过程中大表小表的放置顺序?

小表在前,大表在后。Hive会将小表进行缓存。在0.7版本后。也能够用配置来自己主动优化:set hive.auto.convert.join=true;

11 Hive的两张表关联,使用MapReduce怎么实现?

可以将小表缓存在map端实现map端join防止数据倾斜。并将join字段作为key,在Reducer阶段实际执行相应的业务处理。

12 所有的Hive任务都会有MapReduce的执行吗?

不是。Hive中对某些情况的查询可以不必使用MapReduce计算。例如:SELECT * FROM employees;在这种情况下,Hive可以简单地读取employee对应的存储目录下的文件,然后输出查询结果到控制台。
在hive-default.xml.template文件中hive.fetch.task.conversion默认是more,老版本hive默认是minimal,该属性修改为more以后,在全局查找、字段查找、limit查找等都不走mapreduce。

13 Hive的函数:UDF、UDAF、UDTF的区别?

(1)UDF(User-Defined-Function)
一进一出
(2)UDAF(User-Defined Aggregation Function)
聚集函数,多进一出
类似于:count/max/min
(3)UDTF(User-Defined Table-Generating Functions)
一进多出
如lateral view explore()

14 说说对Hive桶表的理解?

分区针对的是数据的存储路径,分桶针对的是数据文件。分区提供一个隔离数据和优化查询的便利方式。不过,并非所有的数据集都可形成合理的分区,特别是之前所提到过的要确定合适的划分大小这个疑虑。
分桶是将数据集分解成更容易管理的若干部分的另一个技术。

15 Hive可以像关系型数据库那样建立多个库吗?

可以

16 Hive实现统计的查询语句是什么?

聚合函数:count(),sum(),avg(),max(),min()

17 Hive优化措施

17.1 Fetch抓取

Fetch抓取是指,Hive中对某些情况的查询可以不必使用MapReduce计算。例如:SELECT * FROM employees;在这种情况下,Hive可以简单地读取employee对应的存储目录下的文件,然后输出查询结果到控制台。
在hive-default.xml.template文件中hive.fetch.task.conversion默认是more,老版本hive默认是minimal,该属性修改为more以后,在全局查找、字段查找、limit查找等都不走mapreduce。

<property><name>hive.fetch.task.conversion</name><value>more</value><description>Expects one of [none, minimal, more].Some select queries can be converted to single FETCH task minimizing latency.Currently the query should be single sourced not having any subquery and should not haveany aggregations or distincts (which incurs RS), lateral views and joins.0. none : disable hive.fetch.task.conversion1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only2. more  : SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns)</description></property>

案例实操:
1)把hive.fetch.task.conversion设置成none,然后执行查询语句,都会执行mapreduce程序。
hive (default)> set hive.fetch.task.conversion=none;
hive (default)> select * from emp;
hive (default)> select ename from emp;
hive (default)> select ename from emp limit 3;
2)把hive.fetch.task.conversion设置成more,然后执行查询语句,如下查询方式都不会执行mapreduce程序。
hive (default)> set hive.fetch.task.conversion=more;
hive (default)> select * from emp;
hive (default)> select ename from emp;
hive (default)> select ename from emp limit 3;

17.2 本地模式

大多数的Hadoop Job是需要Hadoop提供的完整的可扩展性来处理大数据集的。不过,有时Hive的输入数据量是非常小的。在这种情况下,为查询触发执行任务消耗的时间可能会比实际job的执行时间要多的多。对于大多数这种情况,Hive可以通过本地模式在单台机器上处理所有的任务。对于小数据集,执行时间可以明显被缩短。
用户可以通过设置hive.exec.mode.local.auto的值为true,来让Hive在适当的时候自动启动这个优化。
set hive.exec.mode.local.auto=true; //开启本地mr
//设置local mr的最大输入数据量,当输入数据量小于这个值时采用local mr的方式,默认为134217728,即128M
set hive.exec.mode.local.auto.inputbytes.max=50000000;
//设置local mr的最大输入文件个数,当输入文件个数小于这个值时采用local mr的方式,默认为4
set hive.exec.mode.local.auto.input.files.max=10;
案例实操:
1)开启本地模式,并执行查询语句
hive (default)> set hive.exec.mode.local.auto=true;
hive (default)> select * from emp cluster by deptno;
Time taken: 1.328 seconds, Fetched: 14 row(s)
2)关闭本地模式,并执行查询语句
hive (default)> set hive.exec.mode.local.auto=false;
hive (default)> select * from emp cluster by deptno;
Time taken: 20.09 seconds, Fetched: 14 row(s)

17.3 表的优化

17.3.1 小表、大表Join

将key相对分散,并且数据量小的表放在join的左边,这样可以有效减少内存溢出错误发生的几率;再进一步,可以使用map join让小的维度表(1000条以下的记录条数)先进内存。在map端完成reduce。
实际测试发现:新版的hive已经对小表JOIN大表和大表JOIN小表进行了优化。小表放在左边和右边已经没有明显区别。
案例实操
1.需求
测试大表JOIN小表和小表JOIN大表的效率
2.建大表、小表和JOIN后表的语句
// 创建大表
create table bigtable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by ‘\t’;
// 创建小表
create table smalltable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by ‘\t’;
// 创建join后表的语句
create table jointable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by ‘\t’;
3.分别向大表和小表中导入数据
hive (default)> load data local inpath ‘/opt/module/datas/bigtable’ into table bigtable;
hive (default)>load data local inpath ‘/opt/module/datas/smalltable’ into table smalltable;
4.关闭mapjoin功能(默认是打开的)
set hive.auto.convert.join = false;
5.执行小表JOIN大表语句
insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from smalltable s
left join bigtable b
on b.id = s.id;
Time taken: 35.921 seconds
6.执行大表JOIN小表语句
insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from bigtable b
left join smalltable s
on s.id = b.id;
Time taken: 34.196 seconds

17.3.2 大表Join大表

1.空KEY过滤
有时join超时是因为某些key对应的数据太多,而相同key对应的数据都会发送到相同的reducer上,从而导致内存不够。此时我们应该仔细分析这些异常的key,很多情况下,这些key对应的数据是异常数据,我们需要在SQL语句中进行过滤。例如key对应的字段为空,操作如下:
案例实操
(1)配置历史服务器
配置mapred-site.xml

mapreduce.jobhistory.address
hadoop102:10020

mapreduce.jobhistory.webapp.address
hadoop102:19888

启动历史服务器
sbin/mr-jobhistory-daemon.sh start historyserver
查看jobhistory
http://192.168.1.102:19888/jobhistory
(2)创建原始数据表、空id表、合并后数据表
// 创建原始表
create table ori(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by ‘\t’;
// 创建空id表
create table nullidtable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by ‘\t’;
// 创建join后表的语句
create table jointable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by ‘\t’;
(3)分别加载原始数据和空id数据到对应表中
hive (default)> load data local inpath ‘/opt/module/datas/ori’ into table ori;
hive (default)> load data local inpath ‘/opt/module/datas/nullid’ into table nullidtable;
(4)测试不过滤空id
hive (default)> insert overwrite table jointable
select n.* from nullidtable n left join ori o on n.id = o.id;
Time taken: 42.038 seconds
Time taken: 37.284 seconds
(5)测试过滤空id
hive (default)> insert overwrite table jointable
select n.* from (select * from nullidtable where id is not null ) n left join ori o on n.id = o.id;
Time taken: 31.725 seconds
Time taken: 28.876 seconds
2.空key转换
有时虽然某个key为空对应的数据很多,但是相应的数据不是异常数据,必须要包含在join的结果中,此时我们可以表a中key为空的字段赋一个随机的值,使得数据随机均匀地分不到不同的reducer上。例如:
案例实操:
不随机分布空null值:
(1)设置5个reduce个数
set mapreduce.job.reduces = 5;
(2)JOIN两张表
insert overwrite table jointable
select n.* from nullidtable n left join ori b on n.id = b.id;
结果:如图6-13所示,可以看出来,出现了数据倾斜,某些reducer的资源消耗远大于其他reducer。

图6-13 空key转换
随机分布空null值
(1)设置5个reduce个数
set mapreduce.job.reduces = 5;
(2)JOIN两张表
insert overwrite table jointable
select n.* from nullidtable n full join ori o on
case when n.id is null then concat(‘hive’, rand()) else n.id end = o.id;
结果:如图6-14所示,可以看出来,消除了数据倾斜,负载均衡reducer的资源消耗

图6-14 随机分布空值

17.3.3 MapJoin

如果不指定MapJoin或者不符合MapJoin的条件,那么Hive解析器会将Join操作转换成Common Join,即:在Reduce阶段完成join。容易发生数据倾斜。可以用MapJoin把小表全部加载到内存在map端进行join,避免reducer处理。
1.开启MapJoin参数设置
(1)设置自动选择Mapjoin
set hive.auto.convert.join = true; 默认为true
(2)大表小表的阈值设置(默认25M一下认为是小表):
set hive.mapjoin.smalltable.filesize=25000000;
2.MapJoin工作机制,如图6-15所示

图6-15 MapJoin工作机制
案例实操:
(1)开启Mapjoin功能
set hive.auto.convert.join = true; 默认为true
(2)执行小表JOIN大表语句
insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from smalltable s
join bigtable b
on s.id = b.id;
Time taken: 24.594 seconds
(3)执行大表JOIN小表语句
insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from bigtable b
join smalltable s
on s.id = b.id;
Time taken: 24.315 seconds

17.3.4 Group By

默认情况下,Map阶段同一Key数据分发给一个reduce,当一个key数据过大时就倾斜了。
并不是所有的聚合操作都需要在Reduce端完成,很多聚合操作都可以先在Map端进行部分聚合,最后在Reduce端得出最终结果。
1.开启Map端聚合参数设置
(1)是否在Map端进行聚合,默认为True
hive.map.aggr = true
(2)在Map端进行聚合操作的条目数目
hive.groupby.mapaggr.checkinterval = 100000
(3)有数据倾斜的时候进行负载均衡(默认是false)
hive.groupby.skewindata = true
当选项设定为 true,生成的查询计划会有两个MR Job。第一个MR Job中,Map的输出结果会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的Group By Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MR Job再根据预处理的数据结果按照Group By Key分布到Reduce中(这个过程可以保证相同的Group By Key被分布到同一个Reduce中),最后完成最终的聚合操作。

17.3.5 Count(Distinct) 去重统计

数据量小的时候无所谓,数据量大的情况下,由于COUNT DISTINCT操作需要用一个Reduce Task来完成,这一个Reduce需要处理的数据量太大,就会导致整个Job很难完成,一般COUNT DISTINCT使用先GROUP BY再COUNT的方式替换:
案例实操
1.创建一张大表
hive (default)> create table bigtable(id bigint, time bigint, uid string, keyword
string, url_rank int, click_num int, click_url string) row format delimited
fields terminated by ‘\t’;
2.加载数据
hive (default)> load data local inpath ‘/opt/module/datas/bigtable’ into table
bigtable;
3.设置5个reduce个数
set mapreduce.job.reduces = 5;
4.执行去重id查询
hive (default)> select count(distinct id) from bigtable;
Stage-Stage-1: Map: 1 Reduce: 1 Cumulative CPU: 7.12 sec HDFS Read: 120741990 HDFS Write: 7 SUCCESS
Total MapReduce CPU Time Spent: 7 seconds 120 msec
OK
c0
100001
Time taken: 23.607 seconds, Fetched: 1 row(s)
5.采用GROUP by去重id
hive (default)> select count(id) from (select id from bigtable group by id) a;
Stage-Stage-1: Map: 1 Reduce: 5 Cumulative CPU: 17.53 sec HDFS Read: 120752703 HDFS Write: 580 SUCCESS
Stage-Stage-2: Map: 1 Reduce: 1 Cumulative CPU: 4.29 sec HDFS Read: 9409 HDFS Write: 7 SUCCESS
Total MapReduce CPU Time Spent: 21 seconds 820 msec
OK
_c0
100001
Time taken: 50.795 seconds, Fetched: 1 row(s)
虽然会多用一个Job来完成,但在数据量大的情况下,这个绝对是值得的。

17.3.6 笛卡尔积

尽量避免笛卡尔积,join的时候不加on条件,或者无效的on条件,Hive只能使用1个reducer来完成笛卡尔积。

17.3.7 行列过滤

列处理:在SELECT中,只拿需要的列,如果有,尽量使用分区过滤,少用SELECT *。
行处理:在分区剪裁中,当使用外关联时,如果将副表的过滤条件写在Where后面,那么就会先全表关联,之后再过滤,比如:
案例实操:
1.测试先关联两张表,再用where条件过滤
hive (default)> select o.id from bigtable b
join ori o on o.id = b.id
where o.id <= 10;
Time taken: 34.406 seconds, Fetched: 100 row(s)
2.通过子查询后,再关联表
hive (default)> select b.id from bigtable b
join (select id from ori where id <= 10 ) o on b.id = o.id;
Time taken: 30.058 seconds, Fetched: 100 row(s)

17.3.8 动态分区调整

关系型数据库中,对分区表Insert数据时候,数据库自动会根据分区字段的值,将数据插入到相应的分区中,Hive中也提供了类似的机制,即动态分区(Dynamic Partition),只不过,使用Hive的动态分区,需要进行相应的配置。
1.开启动态分区参数设置
(1)开启动态分区功能(默认true,开启)
hive.exec.dynamic.partition=true
(2)设置为非严格模式(动态分区的模式,默认strict,表示必须指定至少一个分区为静态分区,nonstrict模式表示允许所有的分区字段都可以使用动态分区。)
hive.exec.dynamic.partition.mode=nonstrict
(3)在所有执行MR的节点上,最大一共可以创建多少个动态分区。
hive.exec.max.dynamic.partitions=1000
(4)在每个执行MR的节点上,最大可以创建多少个动态分区。该参数需要根据实际的数据来设定。比如:源数据中包含了一年的数据,即day字段有365个值,那么该参数就需要设置成大于365,如果使用默认值100,则会报错。
hive.exec.max.dynamic.partitions.pernode=100
(5)整个MR Job中,最大可以创建多少个HDFS文件。
hive.exec.max.created.files=100000
(6)当有空分区生成时,是否抛出异常。一般不需要设置。
hive.error.on.empty.partition=false
2.案例实操
需求:将ori中的数据按照时间(如:20111230000008),插入到目标表ori_partitioned_target的相应分区中。
(1)创建分区表
create table ori_partitioned(id bigint, time bigint, uid string, keyword string,
url_rank int, click_num int, click_url string)
partitioned by (p_time bigint)
row format delimited fields terminated by ‘\t’;
(2)加载数据到分区表中
hive (default)> load data local inpath ‘/home/atguigu/ds1’ into table
ori_partitioned partition(p_time=‘20111230000010’) ;
hive (default)> load data local inpath ‘/home/atguigu/ds2’ into table ori_partitioned partition(p_time=‘20111230000011’) ;
(3)创建目标分区表
create table ori_partitioned_target(id bigint, time bigint, uid string,
keyword string, url_rank int, click_num int, click_url string) PARTITIONED BY (p_time STRING) row format delimited fields terminated by ‘\t’;
(4)设置动态分区
set hive.exec.dynamic.partition = true;
set hive.exec.dynamic.partition.mode = nonstrict;
set hive.exec.max.dynamic.partitions = 1000;
set hive.exec.max.dynamic.partitions.pernode = 100;
set hive.exec.max.created.files = 100000;
set hive.error.on.empty.partition = false;

hive (default)> insert overwrite table ori_partitioned_target partition (p_time)
select id, time, uid, keyword, url_rank, click_num, click_url, p_time from ori_partitioned;
(5)查看目标分区表的分区情况
hive (default)> show partitions ori_partitioned_target;

17.3.9 分桶

1)分桶表数据存储
分区针对的是数据的存储路径;分桶针对的是数据文件。
分区提供一个隔离数据和优化查询的便利方式。不过,并非所有的数据集都可形成合理的分区,特别是之前所提到过的要确定合适的划分大小这个疑虑。
分桶是将数据集分解成更容易管理的若干部分的另一个技术。
1.先创建分桶表,通过直接导入数据文件的方式
(1)数据准备

(2)创建分桶表
create table stu_buck(id int, name string)
clustered by(id)
into 4 buckets
row format delimited fields terminated by ‘\t’;
(3)查看表结构
hive (default)> desc formatted stu_buck;
Num Buckets: 4
(4)导入数据到分桶表中
hive (default)> load data local inpath ‘/opt/module/datas/student.txt’ into table
stu_buck;
(5)查看创建的分桶表中是否分成4个桶,如图6-7所示

图6-7 未分桶
发现并没有分成4个桶。是什么原因呢?
2.创建分桶表时,数据通过子查询的方式导入
(1)先建一个普通的stu表
create table stu(id int, name string)
row format delimited fields terminated by ‘\t’;
(2)向普通的stu表中导入数据
load data local inpath ‘/opt/module/datas/student.txt’ into table stu;
(3)清空stu_buck表中数据
truncate table stu_buck;
select * from stu_buck;
(4)导入数据到分桶表,通过子查询的方式
insert into table stu_buck
select id, name from stu;
(5)发现还是只有一个分桶,如图6-8所示

图6-8 未分桶
(6)需要设置一个属性
hive (default)> set hive.enforce.bucketing=true;
hive (default)> set mapreduce.job.reduces=-1;
hive (default)> insert into table stu_buck
select id, name from stu;

图6-9 分桶
(7)查询分桶的数据
hive (default)> select * from stu_buck;
OK
stu_buck.id stu_buck.name
1004 ss4
1008 ss8
1012 ss12
1016 ss16
1001 ss1
1005 ss5
1009 ss9
1013 ss13
1002 ss2
1006 ss6
1010 ss10
1014 ss14
1003 ss3
1007 ss7
1011 ss11
1015 ss15
2)分桶抽样查询
对于非常大的数据集,有时用户需要使用的是一个具有代表性的查询结果而不是全部结果。Hive可以通过对表进行抽样来满足这个需求。
查询表stu_buck中的数据。
hive (default)> select * from stu_buck tablesample(bucket 1 out of 4 on id);
注:tablesample是抽样语句,语法:TABLESAMPLE(BUCKET x OUT OF y) 。
y必须是table总bucket数的倍数或者因子。hive根据y的大小,决定抽样的比例。例如,table总共分了4份,当y=2时,抽取(4/2=)2个bucket的数据,当y=8时,抽取(4/8=)1/2个bucket的数据。
x表示从哪个bucket开始抽取,如果需要取多个分区,以后的分区号为当前分区号加上y。例如,table总bucket数为4,tablesample(bucket 1 out of 2),表示总共抽取(4/2=)2个bucket的数据,抽取第1(x)个和第4(x+y)个bucket的数据。
注意:x的值必须小于等于y的值,否则
FAILED: SemanticException [Error 10061]: Numerator should not be bigger than denominator in sample clause for table stu_buck

17.3.10 分区

分区表实际上就是对应一个HDFS文件系统上的独立的文件夹,该文件夹下是该分区所有的数据文件。Hive中的分区就是分目录,把一个大的数据集根据业务需要分割成小的数据集。在查询时通过WHERE子句中的表达式选择查询所需要的指定的分区,这样的查询效率会提高很多。
1)分区表基本操作
1.引入分区表(需要根据日期对日志进行管理)
/user/hive/warehouse/log_partition/20170702/20170702.log
/user/hive/warehouse/log_partition/20170703/20170703.log
/user/hive/warehouse/log_partition/20170704/20170704.log
2.创建分区表语法
hive (default)> create table dept_partition(
deptno int, dname string, loc string
)
partitioned by (month string)
row format delimited fields terminated by ‘\t’;
3.加载数据到分区表中
hive (default)> load data local inpath ‘/opt/module/datas/dept.txt’ into table default.dept_partition partition(month=‘201709’);
hive (default)> load data local inpath ‘/opt/module/datas/dept.txt’ into table default.dept_partition partition(month=‘201708’);
hive (default)> load data local inpath ‘/opt/module/datas/dept.txt’ into table default.dept_partition partition(month='201707’);

图6-5 加载数据到分区表

图6-6 分区表
4.查询分区表中数据
单分区查询
hive (default)> select * from dept_partition where month=‘201709’;
多分区联合查询
hive (default)> select * from dept_partition where month=‘201709’
union
select * from dept_partition where month=‘201708’
union
select * from dept_partition where month=‘201707’;

_u3.deptno _u3.dname _u3.loc _u3.month
10 ACCOUNTING NEW YORK 201707
10 ACCOUNTING NEW YORK 201708
10 ACCOUNTING NEW YORK 201709
20 RESEARCH DALLAS 201707
20 RESEARCH DALLAS 201708
20 RESEARCH DALLAS 201709
30 SALES CHICAGO 201707
30 SALES CHICAGO 201708
30 SALES CHICAGO 201709
40 OPERATIONS BOSTON 201707
40 OPERATIONS BOSTON 201708
40 OPERATIONS BOSTON 201709
5.增加分区
创建单个分区
hive (default)> alter table dept_partition add partition(month=‘201706’) ;
同时创建多个分区
hive (default)> alter table dept_partition add partition(month=‘201705’) partition(month=‘201704’);
6.删除分区
删除单个分区
hive (default)> alter table dept_partition drop partition (month=‘201704’);
同时删除多个分区
hive (default)> alter table dept_partition drop partition (month=‘201705’), partition (month=‘201706’);
7.查看分区表有多少分区
hive> show partitions dept_partition;
8.查看分区表结构
hive> desc formatted dept_partition;

# Partition Information
# col_name              data_type               comment
month                   string

2)分区表注意事项
1.创建二级分区表
hive (default)> create table dept_partition2(
deptno int, dname string, loc string
)
partitioned by (month string, day string)
row format delimited fields terminated by ‘\t’;
2.正常的加载数据
(1)加载数据到二级分区表中
hive (default)> load data local inpath ‘/opt/module/datas/dept.txt’ into table
default.dept_partition2 partition(month=‘201709’, day=‘13’);
(2)查询分区数据
hive (default)> select * from dept_partition2 where month=‘201709’ and day=‘13’;
3.把数据直接上传到分区目录上,让分区表和数据产生关联的三种方式
(1)方式一:上传数据后修复
上传数据
hive (default)> dfs -mkdir -p
/user/hive/warehouse/dept_partition2/month=201709/day=12;
hive (default)> dfs -put /opt/module/datas/dept.txt /user/hive/warehouse/dept_partition2/month=201709/day=12;
查询数据(查询不到刚上传的数据)
hive (default)> select * from dept_partition2 where month=‘201709’ and day=‘12’;
执行修复命令
hive> msck repair table dept_partition2;
再次查询数据
hive (default)> select * from dept_partition2 where month=‘201709’ and day=‘12’;
(2)方式二:上传数据后添加分区
上传数据
hive (default)> dfs -mkdir -p
/user/hive/warehouse/dept_partition2/month=201709/day=11;
hive (default)> dfs -put /opt/module/datas/dept.txt /user/hive/warehouse/dept_partition2/month=201709/day=11;
执行添加分区
hive (default)> alter table dept_partition2 add partition(month=‘201709’,
day=‘11’);
查询数据
hive (default)> select * from dept_partition2 where month=‘201709’ and day=‘11’;
(3)方式三:上传数据后load数据到分区
创建目录
hive (default)> dfs -mkdir -p
/user/hive/warehouse/dept_partition2/month=201709/day=10;
上传数据
hive (default)> load data local inpath ‘/opt/module/datas/dept.txt’ into table
dept_partition2 partition(month=‘201709’,day=‘10’);
查询数据
hive (default)> select * from dept_partition2 where month=‘201709’ and day=‘10’;

17.4 数据倾斜

17.4.1 合理设置Map数

1)通常情况下,作业会通过input的目录产生一个或者多个map任务。
主要的决定因素有:input的文件总个数,input的文件大小,集群设置的文件块大小。
2)是不是map数越多越好?
答案是否定的。如果一个任务有很多小文件(远远小于块大小128m),则每个小文件也会被当做一个块,用一个map任务来完成,而一个map任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费。而且,同时可执行的map数是受限的。
3)是不是保证每个map处理接近128m的文件块,就高枕无忧了?
答案也是不一定。比如有一个127m的文件,正常会用一个map去完成,但这个文件只有一个或者两个小字段,却有几千万的记录,如果map处理的逻辑比较复杂,用一个map任务去做,肯定也比较耗时。
针对上面的问题2和3,我们需要采取两种方式来解决:即减少map数和增加map数;
#####17.4.2 小文件进行合并
在map执行前合并小文件,减少map数:CombineHiveInputFormat具有对小文件进行合并的功能(系统默认的格式)。HiveInputFormat没有对小文件合并功能。
set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

17.4.3 复杂文件增加Map数

当input的文件都很大,任务逻辑复杂,map执行非常慢的时候,可以考虑增加Map数,来使得每个map处理的数据量减少,从而提高任务的执行效率。
增加map的方法为:根据computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M公式,调整maxSize最大值。让maxSize最大值低于blocksize就可以增加map的个数。
案例实操:
1.执行查询
hive (default)> select count() from emp;
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2.设置最大切片值为100个字节
hive (default)> set mapreduce.input.fileinputformat.split.maxsize=100;
hive (default)> select count(
) from emp;
Hadoop job information for Stage-1: number of mappers: 6; number of reducers: 1

17.4.4 合理设置Reduce数

1.调整reduce个数方法一
(1)每个Reduce处理的数据量默认是256MB
hive.exec.reducers.bytes.per.reducer=256000000
(2)每个任务最大的reduce数,默认为1009
hive.exec.reducers.max=1009
(3)计算reducer数的公式
N=min(参数2,总输入数据量/参数1)
2.调整reduce个数方法二
在hadoop的mapred-default.xml文件中修改
设置每个job的Reduce个数
set mapreduce.job.reduces = 15;
3.reduce个数并不是越多越好
1)过多的启动和初始化reduce也会消耗时间和资源;
2)另外,有多少个reduce,就会有多少个输出文件,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题;
在设置reduce个数的时候也需要考虑这两个原则:处理大数据量利用合适的reduce数;使单个reduce任务处理数据量大小要合适;

17.5 并行执行

Hive会将一个查询转化成一个或者多个阶段。这样的阶段可以是MapReduce阶段、抽样阶段、合并阶段、limit阶段。或者Hive执行过程中可能需要的其他阶段。默认情况下,Hive一次只会执行一个阶段。不过,某个特定的job可能包含众多的阶段,而这些阶段可能并非完全互相依赖的,也就是说有些阶段是可以并行执行的,这样可能使得整个job的执行时间缩短。不过,如果有更多的阶段可以并行执行,那么job可能就越快完成。
通过设置参数hive.exec.parallel值为true,就可以开启并发执行。不过,在共享集群中,需要注意下,如果job中并行阶段增多,那么集群利用率就会增加。
set hive.exec.parallel=true; //打开任务并行执行
set hive.exec.parallel.thread.number=16; //同一个sql允许最大并行度,默认为8。
当然,得是在系统资源比较空闲的时候才有优势,否则,没资源,并行也起不来。

17.6 严格模式

Hive提供了一个严格模式,可以防止用户执行那些可能意向不到的不好的影响的查询。
通过设置属性hive.mapred.mode值为默认是非严格模式nonstrict 。开启严格模式需要修改hive.mapred.mode值为strict,开启严格模式可以禁止3种类型的查询。

hive.mapred.mode
strict

The mode in which the Hive operations are being performed.
In strict mode, some risky queries are not allowed to run. They include:
Cartesian Product.
No partition being picked up for a query.
Comparing bigints and strings.
Comparing bigints and doubles.
Orderby without limit.

1)对于分区表,除非where语句中含有分区字段过滤条件来限制范围,否则不允许执行。换句话说,就是用户不允许扫描所有分区。进行这个限制的原因是,通常分区表都拥有非常大的数据集,而且数据增加迅速。没有进行分区限制的查询可能会消耗令人不可接受的巨大资源来处理这个表。
2)对于使用了order by语句的查询,要求必须使用limit语句。因为order by为了执行排序过程会将所有的结果数据分发到同一个Reducer中进行处理,强制要求用户增加这个LIMIT语句可以防止Reducer额外执行很长一段时间。
3)限制笛卡尔积的查询。对关系型数据库非常了解的用户可能期望在执行JOIN查询的时候不使用ON语句而是使用where语句,这样关系数据库的执行优化器就可以高效地将WHERE语句转化成那个ON语句。不幸的是,Hive并不会执行这种优化,因此,如果表足够大,那么这个查询就会出现不可控的情况。

17.7 JVM重用

JVM重用是Hadoop调优参数的内容,其对Hive的性能具有非常大的影响,特别是对于很难避免小文件的场景或task特别多的场景,这类场景大多数执行时间都很短。
Hadoop的默认配置通常是使用派生JVM来执行map和Reduce任务的。这时JVM的启动过程可能会造成相当大的开销,尤其是执行的job包含有成百上千task任务的情况。JVM重用可以使得JVM实例在同一个job中重新使用N次。N的值可以在Hadoop的mapred-site.xml文件中进行配置。通常在10-20之间,具体多少需要根据具体业务场景测试得出。

<property><name>mapreduce.job.jvm.numtasks</name><value>10</value><description>How many tasks to run per jvm. If set to -1, there isno limit. </description>
</property>

这个功能的缺点是,开启JVM重用将一直占用使用到的task插槽,以便进行重用,直到任务完成后才能释放。如果某个“不平衡的”job中有某几个reduce task执行的时间要比其他Reduce task消耗的时间多的多的话,那么保留的插槽就会一直空闲着却无法被其他的job使用,直到所有的task都结束了才会释放。

17.8 推测执行

在分布式集群环境下,因为程序Bug(包括Hadoop本身的bug),负载不均衡或者资源分布不均等原因,会造成同一个作业的多个任务之间运行速度不一致,有些任务的运行速度可能明显慢于其他任务(比如一个作业的某个任务进度只有50%,而其他所有任务已经运行完毕),则这些任务会拖慢作业的整体执行进度。为了避免这种情况发生,Hadoop采用了推测执行(Speculative Execution)机制,它根据一定的法则推测出“拖后腿”的任务,并为这样的任务启动一个备份任务,让该任务与原始任务同时处理同一份数据,并最终选用最先成功运行完成任务的计算结果作为最终结果。
设置开启推测执行参数:Hadoop的mapred-site.xml文件中进行配置

<property><name>mapreduce.map.speculative</name><value>true</value><description>If true, then multiple instances of some map tasks may be executed in parallel.</description>
</property><property><name>mapreduce.reduce.speculative</name><value>true</value><description>If true, then multiple instances of some reduce tasks may be executed in parallel.</description>
</property>

不过hive本身也提供了配置项来控制reduce-side的推测执行:

 <property><name>hive.mapred.reduce.tasks.speculative.execution</name><value>true</value><description>Whether speculative execution for reducers should be turned on. </description></property>

关于调优这些推测执行变量,还很难给一个具体的建议。如果用户对于运行时的偏差非常敏感的话,那么可以将这些功能关闭掉。如果用户因为输入数据量很大而需要执行长时间的map或者Reduce task的话,那么启动推测执行造成的浪费是非常巨大大。

17.9 压缩

17.9.1 Hadoop源码编译支持Snappy压缩

1)资源准备
1.CentOS联网
配置CentOS能连接外网。Linux虚拟机ping www.baidu.com 是畅通的
注意:采用root角色编译,减少文件夹权限出现问题
2.jar包准备(hadoop源码、JDK8 、maven、protobuf)
(1)hadoop-2.7.2-src.tar.gz
(2)jdk-8u144-linux-x64.tar.gz
(3)snappy-1.1.3.tar.gz
(4)apache-maven-3.0.5-bin.tar.gz
(5)protobuf-2.5.0.tar.gz

17.9.2 jar包安装

注意:所有操作必须在root用户下完成
1.JDK解压、配置环境变量JAVA_HOME和PATH,验证java-version(如下都需要验证是否配置成功)
[root@hadoop101 software] # tar -zxf jdk-8u144-linux-x64.tar.gz -C /opt/module/
[root@hadoop101 software]# vi /etc/profile
#JAVA_HOME
export JAVA_HOME=/opt/module/jdk1.8.0_144
export PATH=PATH:PATH:PATH:JAVA_HOME/bin
[root@hadoop101 software]#source /etc/profile
验证命令:java -version
2.Maven解压、配置 MAVEN_HOME和PATH
[root@hadoop101 software]# tar -zxvf apache-maven-3.0.5-bin.tar.gz -C /opt/module/
[root@hadoop101 apache-maven-3.0.5]# vi /etc/profile
#MAVEN_HOME
export MAVEN_HOME=/opt/module/apache-maven-3.0.5
export PATH=PATH:PATH:PATH:MAVEN_HOME/bin
[root@hadoop101 software]#source /etc/profile
验证命令:mvn -version

17.9.3 编译源码

1.准备编译环境
[root@hadoop101 software]# yum install svn
[root@hadoop101 software]# yum install autoconf automake libtool cmake
[root@hadoop101 software]# yum install ncurses-devel
[root@hadoop101 software]# yum install openssl-devel
[root@hadoop101 software]# yum install gcc*
2.编译安装snappy
[root@hadoop101 software]# tar -zxvf snappy-1.1.3.tar.gz -C /opt/module/
[root@hadoop101 module]# cd snappy-1.1.3/
[root@hadoop101 snappy-1.1.3]# ./configure
[root@hadoop101 snappy-1.1.3]# make
[root@hadoop101 snappy-1.1.3]# make install
查看snappy库文件
[root@hadoop101 snappy-1.1.3]# ls -lh /usr/local/lib |grep snappy
3.编译安装protobuf
[root@hadoop101 software]# tar -zxvf protobuf-2.5.0.tar.gz -C /opt/module/
[root@hadoop101 module]# cd protobuf-2.5.0/
[root@hadoop101 protobuf-2.5.0]# ./configure
[root@hadoop101 protobuf-2.5.0]# make
[root@hadoop101 protobuf-2.5.0]# make install
查看protobuf版本以测试是否安装成功
[root@hadoop101 protobuf-2.5.0]# protoc --version
4.编译hadoop native
[root@hadoop101 software]# tar -zxvf hadoop-2.7.2-src.tar.gz
[root@hadoop101 software]# cd hadoop-2.7.2-src/
[root@hadoop101 software]# mvn clean package -DskipTests -Pdist,native -Dtar -Dsnappy.lib=/usr/local/lib -Dbundle.snappy
执行成功后,/opt/software/hadoop-2.7.2-src/hadoop-dist/target/hadoop-2.7.2.tar.gz即为新生成的支持snappy压缩的二进制安装包。

17.9.4 Hadoop压缩配置

1)MR支持的压缩编码
表6-8
压缩格式 工具 算法 文件扩展名 是否可切分
DEFAULT 无 DEFAULT .deflate 否
Gzip gzip DEFAULT .gz 否
bzip2 bzip2 bzip2 .bz2 是
LZO lzop LZO .lzo 是
Snappy 无 Snappy .snappy 否
为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器,如下表所示:
表6-9
压缩格式 对应的编码/解码器
DEFLATE org.apache.hadoop.io.compress.DefaultCodec
gzip org.apache.hadoop.io.compress.GzipCodec
bzip2 org.apache.hadoop.io.compress.BZip2Codec
LZO com.hadoop.compression.lzo.LzopCodec
Snappy org.apache.hadoop.io.compress.SnappyCodec
压缩性能的比较:
表6-10
压缩算法 原始文件大小 压缩文件大小 压缩速度 解压速度
gzip 8.3GB 1.8GB 17.5MB/s 58MB/s
bzip2 8.3GB 1.1GB 2.4MB/s 9.5MB/s
LZO 8.3GB 2.9GB 49.3MB/s 74.6MB/s
http://google.github.io/snappy/
On a single core of a Core i7 processor in 64-bit mode, Snappy compresses at about 250 MB/sec or more and decompresses at about 500 MB/sec or more.
2) 压缩参数配置
要在Hadoop中启用压缩,可以配置如下参数(mapred-site.xml文件中):
表6-11
参数 默认值 阶段 建议
io.compression.codecs
(在core-site.xml中配置) org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.BZip2Codec,
org.apache.hadoop.io.compress.Lz4Codec 输入压缩 Hadoop使用文件扩展名判断是否支持某种编解码器
mapreduce.map.output.compress false mapper输出 这个参数设为true启用压缩
mapreduce.map.output.compress.codec org.apache.hadoop.io.compress.DefaultCodec mapper输出 使用LZO、LZ4或snappy编解码器在此阶段压缩数据
mapreduce.output.fileoutputformat.compress false reducer输出 这个参数设为true启用压缩
mapreduce.output.fileoutputformat.compress.codec org.apache.hadoop.io.compress. DefaultCodec reducer输出 使用标准工具或者编解码器,如gzip和bzip2
mapreduce.output.fileoutputformat.compress.type RECORD reducer输出 SequenceFile输出使用的压缩类型:NONE和BLOCK

17.9.5 开启Map输出阶段压缩

开启map输出阶段压缩可以减少job中map和Reduce task间数据传输量。具体配置如下:
案例实操:
1.开启hive中间传输数据压缩功能
hive (default)>set hive.exec.compress.intermediate=true;
2.开启mapreduce中map输出压缩功能
hive (default)>set mapreduce.map.output.compress=true;
3.设置mapreduce中map输出数据的压缩方式
hive (default)>set mapreduce.map.output.compress.codec=
org.apache.hadoop.io.compress.SnappyCodec;
4.执行查询语句
hive (default)> select count(ename) name from emp;

17.9.6 开启Reduce输出阶段压缩

当Hive将输出写入到表中时,输出内容同样可以进行压缩。属性hive.exec.compress.output控制着这个功能。用户可能需要保持默认设置文件中的默认值false,这样默认的输出就是非压缩的纯文本文件了。用户可以通过在查询语句或执行脚本中设置这个值为true,来开启输出结果压缩功能。
案例实操:
1.开启hive最终输出数据压缩功能
hive (default)>set hive.exec.compress.output=true;
2.开启mapreduce最终输出数据压缩
hive (default)>set mapreduce.output.fileoutputformat.compress=true;
3.设置mapreduce最终数据输出压缩方式
hive (default)> set mapreduce.output.fileoutputformat.compress.codec =
org.apache.hadoop.io.compress.SnappyCodec;
4.设置mapreduce最终数据输出压缩为块压缩
hive (default)> set mapreduce.output.fileoutputformat.compress.type=BLOCK;
5.测试一下输出结果是否是压缩文件
hive (default)> insert overwrite local directory
‘/opt/module/datas/distribute-result’ select * from emp distribute by deptno sort by empno desc;

17.10 执行计划(Explain)

1.基本语法
EXPLAIN [EXTENDED | DEPENDENCY | AUTHORIZATION] query
2.案例实操
(1)查看下面这条语句的执行计划
hive (default)> explain select * from emp;
hive (default)> explain select deptno, avg(sal) avg_sal from emp group by deptno;
(2)查看详细执行计划
hive (default)> explain extended select * from emp;
hive (default)> explain extended select deptno, avg(sal) avg_sal from emp group by deptno;

18 Hive数据分析面试题

场景举例.北京市学生成绩分析.
成绩的数据格式:时间,学校,年级,姓名,科目,成绩
样例数据如下:
2013,北大,1,裘容絮,语文,97
2013,北大,1,庆眠拔,语文,52
2013,北大,1,乌洒筹,语文,85
2012,清华,0,钦尧,英语,61
2015,北理工,3,冼殿,物理,81
2016,北科,4,况飘索,化学,92
2014,北航,2,孔须,数学,70
2012,清华,0,王脊,英语,59
2014,北航,2,方部盾,数学,49
2014,北航,2,东门雹,数学,77
问题:
18.1 情景题:分组TOPN
1.分组TOPN选出 今年每个学校,每个年级,分数前三的科目.
hive -e "

select t.*
from
(
selectschool,class,subjects,score,row_number() over (partition by school,class,subjects order by score desc) rank_code
from spark_test_wx
where partition_id = "2017"
) t
where t.rank_code <= 3;

18.2 情景题:where与having
– 今年 清华 1年级 总成绩大于200分的学生 以及学生数

SELECTschool,class,NAME,sum(score) AS total_score,count(1) over (PARTITION BY school, class) nct
FROMspark_test_wx
WHEREpartition_id = "2017"
AND school = "清华"
AND class = 1
GROUP BYschool,class,NAME
HAVINGtotal_score > 200;

having是分组(group by)后的筛选条件,分组后的数据组内再筛选,也就是说HAVING子句可以让我们筛选成组后的各组数据。
where则是在分组,聚合前先筛选记录。也就是说作用在GROUP BY子句和HAVING子句前。

18.3 情景题:数据倾斜
今年加入进来了10个学校,学校数据差异很大计算每个学校的平均分。简述需要注意的点。
该题主要是考察数据倾斜的处理方式。
Group by 方式很容易产生数据倾斜。需要注意一下几点
1)Map端部分聚合
hive.map.aggr=true(用于设定是否在 map 端进行聚合,默认值为真,相当于combine)
hive.groupby.mapaggr.checkinterval=100000(用于设定 map 端进行聚合操作的条数)
2)有数据倾斜时进行负载均衡
设定hive.groupby.skewindata,当选项设定为true是,生成的查询计划有两个MapReduce任务。
在第一个MapReduce中,map的输出结果集合会随机分布到reduce中, 每个reduce做部分聚合操作,并输出结果。这样处理的结果是,相同的Group By Key有可能分发到不同的reduce中,从而达到负载均衡的目的;
第二个MapReduce任务再根据预处理的数据结果按照Group By Key分布到reduce 中(这个过程可以保证相同的Group By Key分布到同一个reduce 中),最后完成最终的聚合操作。
18.4 情景题:分区表
假设我创建了一张表,其中包含了2016年客户完成的所有交易的详细信息:CREATE TABLE transaction_details (cust_id INT, amount FLOAT, month STRING, country STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘,’ ;
现在我插入了100万条数据,我想知道每个月的总收入。
问:如何高效的统计出结果。写出步骤即可。

19 Hive中,建的表为压缩表,但是输入文件为非压缩格式,会产生怎样的现象或者结果?
使用load加载数据的时候,不会是压缩文件。使用查询插入数据的时候,存储的数据是压缩的。

20 已知表a是一张内部表,如何将它转换成外部表?请写出相应的hive语句。
alter table a set tblproperties(‘EXTERNAL’=‘TRUE’);

21 Hive中mapjoin的原理和实际应用?
原理就是在Map阶段将小表读入内存,顺序扫描大表完成Join。
通过MapReduce Local Task,将小表读入内存,生成HashTableFiles上传至Distributed Cache中,这里会对HashTableFiles进行压缩。
MapReduce Job在Map阶段,每个Mapper从Distributed Cache读取HashTableFiles到内存中,顺序扫描大表,在Map阶段直接进行Join,将数据传递给下一个MapReduce任务。
作用:优化hive语句,避免出现在Reducer端的数据倾斜。
22 订单详情表ord_det(order_id订单号,sku_id商品编号,sale_qtty销售数量,dt日期分区)任务计算2016年1月1日商品销量的Top100,并按销量降级排序

 SELECTsku_id,sale_qtty
FROMord_det PARTITION (dt = ’20160101’)
ORDER BYsale_qtty DESC
LIMIT 100;

23 某日志的格式如下:

pin|-|request_tm|-url|-|sku_id|-|amount

分隔符为‘|-|’,
数据样例为:
张三|-|q2013-11-23 11:59:30|-|www.jd.com|-|100023|-|110.15
假设本地数据文件为sample.txt,先将其导入到hive的test库的表t_sample中,并计算每个用户的总消费金额,写出详细过程包括表结构。

1.https://www.cnblogs.com/cxchanpin/p/6911286.html
org.apache.hadoop.hive.serde2.RegexSerDe

create table t_sample1(pin String,
request_tm String,
url String,
sku_id bigint,
amount double)
ROW FORMAT  SERDE 'org.apache.hadoop.hive.contrib.serde2.MultiDelimitSerDe'
WITH  SERDEPROPERTIES ("field.delim"="|-|")
stored as  textfile ;

hive-site.xml

<property><name>hive.aux.jars.path</name><value>file:////opt/module/hive-1.2.1/lib/hive-contrib-1.2.1.jar</value><description>Added by tiger.zeng on 20120202.These JAR file are available to all users for all jobs</description>
</property>

3.先清洗数据,改变其分割符变为‘\t’,存入本地文件jd.txt中(注意split()方法也是需要转义的)

CREATE external TABLE t_sample (pin string,request_tm string,url string,sku_id INT,amount DOUBLE
) ROW format delimited FIELDS TERMINATED BY "\t";
LOAD DATA LOCAL inpath '/opt/module/datas/jd.txt' INTO TABLE t_sample;
SELECT*
FROMt_sample;SELECTpin,sum(amount)
FROMt_sample
GROUP BYpin;

24 有一张很大的表:TRLOG,该表大概有2T左右

CREATE TABLE TRLOG
(   PLATFORM string,
USER_ID int,
CLICK_TIME string,
CLICK_URL string)
row format delimited fields terminated by ‘\t’;

数据:
PLATFORM USER_ID CLICK_TIME CLICK_URL
WEB 12332321 2013-03-21 13:48:31.324 /home/
WEB 12332321 2013-03-21 13:48:32.954 /selectcat/er/
WEB 12332321 2013-03-21 13:48:46.365

/er/viewad/12.html
WEB             12332321        2013-03-21 13:48:53.651     /er/viewad/13.html
……  ……  ……  ……

把上述数据处理为如下结构的表ALLOG:

CREATE TABLE ALLOG
(PLATFORM string,
USER_ID int,
SEQ int,
FROM_URL string,
TO_URL string)
row format delimited fields terminated by ‘\t’;

整理后的数据结构:
PLATFORM USER_ID SEQ FROM_URL TO_URL
WEB 12332321 1 NULL /home/
WEB 12332321 2 /home/ /selectcat/er/
WEB 12332321 3 /selectcat/er/ /er/viewad/12.html
WEB 12332321 4 /er/viewad/12.html /er/viewad/13.html
WEB 12332321 1 NULL /m/home/
WEB 12332321 2 /m/home/ /m/selectcat/fang/ PLATFORM和USER_ID还是代表平台和用户ID:SEQ字段代表用户按时间排序后的访问顺序,FROM_URL和TO_URL分别代表用户从哪一页跳转到哪一页。某个用户的第一条访问记录的FROM_URL是NULL(空值)。实现基于纯Hive SQL的ETL过程,从TRLOG表生成ALLOG表:(结果是一套SQL)

INSERT INTO TABLE allog SELECTplatform,user_id,row_number () over (PARTITION BY user_idORDER BYclick_time) seq,lag (click_url, 1) over (PARTITION BY user_idORDER BYclick_time) AS from_url,click_url AS to_url
FROMtrlog;

25 已知一个表STG.ORDER,有如下字段:Date,Order_id,User_id,amount。请给出sql进行统计:数据样例:2017-01-01,10029028,1000003251,33.57。
1)给出2017年每个月的订单数、用户数、总成交金额。

SELECTcount(Order_id) order_count,count(DISTINCT(User_id)) user_count,sum(amount) amount_sum,substring(Date, 1, 7) MONTH
FROMSTG. ORDER
WHEREsubstring(Date, 1, 4) = ’2017’
GROUP BYMONTH;

2)给出2017年11月的新客数(指在11月才有第一笔订单)。

SELECTcount(1)
FROM(SELECTOrder_id,DATE,LAG (DATE, 1) over(partition by User_id order by Date) firstOrderFROMSTG. ORDER) t1
WHEREfirstOrder IS NULL
AND SUBSTRING(DATE, 1, 7) = '2017-11';

三 Flume

1 flume有哪些组件,flume的source、channel、sink具体是做什么的

source组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy
source组件把数据收集来以后,临时存放在channel中,即channel组件在agent中是专门用来存放临时数据的——对采集到的数据进行简单的缓存,可以存放在memory、jdbc、file等等。
sink组件是用于把数据发送到目的地的组件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、Hbase、solr、自定义。

2 你是如何实现flume数据传输的监控的

使用第三方框架Ganglia实时监控flume。

3 flume的source,sink,channel的作用?你们source是什么类型?

source组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy
source组件把数据收集来以后,临时存放在channel中,即channel组件在agent中是专门用来存放临时数据的——对采集到的数据进行简单的缓存,可以存放在memory、jdbc、file等等。
sink组件是用于把数据发送到目的地的组件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、Hbase、solr、自定义。
监控后台日志:exec
监控后台产生日志的端口:netcat

四 Kafka

1 kafka的balance是怎么做的

分区,副本

2 kafka的消费者有几种模式

自动提交offset
手动提交offset
手动提交partition的offset

3 为什么kafka可以实现高吞吐?单节点kafka的吞吐量也比其他消息队列大,为什么?

Kafka是分布式消息系统,需要处理海量的消息,Kafka的设计是把所有的消息都写入速度低容量大的硬盘,以此来换取更强的存储能力,但实际上,使用硬盘并没有带来过多的性能损失

kafka主要使用了以下几个方式实现了超高的吞吐率

1)顺序读写
kafka的消息是不断追加到文件中的,这个特性使kafka可以充分利用磁盘的顺序读写性能,顺序读写不需要硬盘磁头的寻道时间,只需很少的扇区旋转时间,所以速度远快于随机读写
Kafka官方给出了测试数据(Raid-5,7200rpm):
顺序 I/O: 600MB/s
随机 I/O: 100KB/s
2)零拷贝
先简单了解下文件系统的操作流程,例如一个程序要把文件内容发送到网络
这个程序是工作在用户空间,文件和网络socket属于硬件资源,两者之间有一个内核空间
在操作系统内部,整个过程为:

在Linux kernel2.2 之后出现了一种叫做"零拷贝(zero-copy)"系统调用机制,就是跳过“用户缓冲区”的拷贝,建立一个磁盘空间和内存的直接映射,数据不再复制到“用户态缓冲区”
系统上下文切换减少为2次,可以提升一倍的性能

3)文件分段
kafka的队列topic被分为了多个区partition,每个partition又分为多个段segment,所以一个队列中的消息实际上是保存在N多个片段文件中

通过分段的方式,每次文件操作都是对一个小文件的操作,非常轻便,同时也增加了并行处理能力
4)批量发送
Kafka允许进行批量发送消息,先将消息缓存在内存中,然后一次请求批量发送出去
,比如可以指定缓存的消息达到某个量的时候就发出去,或者缓存了固定的时间后就发送出去 ,如100条消息就发送,或者每5秒发送一次,这种策略将大大减少服务端的I/O次数
5)数据压缩
Kafka还支持对消息集合进行压缩,Producer可以通过GZIP或Snappy格式对消息集合进行压缩,压缩的好处就是减少传输的数据量,减轻对网络传输的压力,Producer压缩之后,在Consumer需进行解压,虽然增加了CPU的工作,但在对大数据处理上,瓶颈在网络上而不是CPU,所以这个成本很值得

4 kafka的偏移量offset存放在哪儿,为什么?

从kafka-0.9版本及以后,kafka的消费者组和offset信息就不存zookeeper了,而是存到broker服务器上,所以,如果你为某个消费者指定了一个消费者组名称(group.id),那么,一旦这个消费者启动,这个消费者组名和它要消费的那个topic的offset信息就会被记录在broker服务器上
1)概述
Kafka版本[0.10.1.1],已默认将消费的 offset 迁入到了 Kafka 一个名为 __consumer_offsets 的Topic中。其实,早在 0.8.2.2 版本,已支持存入消费的 offset 到Topic中,只是那时候默认是将消费的 offset 存放在 Zookeeper 集群中。那现在,官方默认将消费的offset存储在 Kafka 的Topic中,同时,也保留了存储在 Zookeeper 的接口,通过 offsets.storage 属性来进行设置。
2)内容
其实,官方这样推荐,也是有其道理的。之前版本,Kafka其实存在一个比较大的隐患,就是利用 Zookeeper 来存储记录每个消费者/组的消费进度。虽然,在使用过程当中,JVM帮助我们完成了一些优化,但是消费者需要频繁的去与 Zookeeper 进行交互,而利用ZKClient的API操作Zookeeper频繁的Write其本身就是一个比较低效的Action,对于后期水平扩展也是一个比较头疼的问题。如果期间 Zookeeper 集群发生变化,那 Kafka 集群的吞吐量也跟着受影响。在此之后,官方其实很早就提出了迁移到 Kafka 的概念,只是,之前是一直默认存储在 Zookeeper集群中,需要手动的设置,如果,对 Kafka 的使用不是很熟悉的话,一般我们就接受了默认的存储(即:存在 ZK 中)。在新版 Kafka 以及之后的版本,Kafka 消费的offset都会默认存放在 Kafka 集群中的一个叫 __consumer_offsets 的topic中。

5 Kafka消费过的消息如何再消费

1)修改offset
2)通过使用不同的group来消费

6 Kafka里面用的什么方式拉的方式还是推的方式?如何保证数据不会出现丢失或者重复消费的情况?做过哪些预防措施,怎么解决以上问题的?Kafka元数据存在哪?

pull拉的方式
使用同步模式的时候,有3种状态保证消息被安全生产,在配置为1(只保证写入leader成功)的话,如果刚好leader partition挂了,数据就会丢失。
还有一种情况可能会丢失消息,就是使用异步模式的时候,当缓冲区满了,如果配置为0(还没有收到确认的情况下,缓冲池一满,就清空缓冲池里的消息),
数据就会被立即丢弃掉。
在数据生产时避免数据丢失的方法:
只要能避免上述两种情况,那么就可以保证消息不会被丢失。
就是说在同步模式的时候,确认机制设置为-1,也就是让消息写入leader和所有的副本。
还有,在异步模式下,如果消息发出去了,但还没有收到确认的时候,缓冲池满了,在配置文件中设置成不限制阻塞超时的时间,也就说让生产端一直阻塞,这样也能保证数据不会丢失。
在数据消费时,避免数据丢失的方法:如果使用了storm,要开启storm的ackfail机制;如果没有使用storm,确认数据被完成处理之后,再更新offset值。低级API中需要手动控制offset值。

数据重复消费的情况,如果处理?

(1)去重:将消息的唯一标识保存到外部介质中,每次消费处理时判断是否处理过;
(2)不管:大数据场景中,报表系统或者日志信息丢失几条都无所谓,不会影响最终的统计分析结果。

7 kafka支不支持事物。

支持

  1. 为什么要支持事务
    我们在Kafka中设计事务的目的主要是为了满足“读取-处理-写入”这种模式的应用程序。这种模式下数据的读写是异步的,比如Kafka的Topics。这种应用程序更广泛的被称之为流处理应用程序。
    第一代流处理应用程序可以容忍不准确的数据处理,比如,查看网页点击数量的应用程序,能够允许计数器存在一些错误(多算或者漏算)。
    然而,随着这些应用的普及,对于流处理计算语义有更多要求的需求也在增多。比如,一些金融机构部使用流处理应用来处理用户账户的借贷方,这种情况下,处理中的错误是不能容忍的,我们需要每一条消息都被处理一次,没有任何例外。
    更正式的说,如果流处理应用程序消费消息A并产生消息B,使得B=F(A),则exactlyonce则意味着仅当B成功时才认为A被消耗,反之亦然。
    当在Kafka的producer和consumer的配置属性中使用at-least-once传入语义的时候,一个流处理应用程序能够处理下面的场景:
  2.    由于内部重试,producer.send()方法使得消息B可能被重复写入。这将由Procedure的幂等特性解决,不是这篇文章其余部分的重点。
    
  3.    我们可能会对消息A进行重新处理,这会导致重复的消息B被写入,违背了exactly once的处理语义,如果流处理应用程序在B写入成功但是在A被成功标记之前崩溃,则可能会被重新处理,因此,当它恢复时,他将在此消费A并再次写入B,导致重复。
    
  4.    最后,在分布式环境中,应用程序会崩溃或者更糟,一旦和系统其它部分连接丢失,通常情况下,新的实例会自动启动以取代丢失实例。通过这个过程,可能会有多个实例处理相同的输入topic并写入相同的输出topic,从而导致重复的输出并违背exactly once的处理语义,这个我们称之为“僵尸实例”的问题。
    

我们在Kafka中设计了事务API来解决第二个和第三个问题,事务能够保证这些“读取-处理-写入”操作成为一个与那字操作并且在一个周期中保证精确处理,满足exactly once处理语义。
2. 事务语义
2.1. 多分区原子写入
事务能够保证Kafka topic下每个分区的原子写入。事务中所有的消息都将被成功写入或者丢弃。例如,处理过程中发生了异常并导致事务终止,这种情况下,事务中的消息都不会被Consumer读取。现在我们来看下Kafka是如何实现原子的“读取-处理-写入”过程的。
首先,我们来考虑一下原子“读取-处理-写入”周期是什么意思。简而言之,这意味着如果某个应用程序在某个topic tp0的偏移量X处读取到了消息A,并且在对消息A进行了一些处理(如B = F(A))之后将消息B写入topic tp1,则只有当消息A和B被认为被成功地消费并一起发布,或者完全不发布时,整个读取过程写入操作是原子的。
现在,只有当消息A的偏移量X被标记为消耗时,消息A才被认为是从topic tp0消耗的,消费到的数据偏移量(record offset)将被标记为提交偏移量(Committing offset)。在Kafka中,我们通过写入一个名为offsets topic的内部Kafka topic来记录offset commit。消息仅在其offset被提交给offsets topic时才被认为成功消费。
由于offset commit只是对Kafkatopic的另一次写入,并且由于消息仅在提交偏移量时被视为成功消费,所以跨多个主题和分区的原子写入也启用原子“读取-处理-写入”循环:提交偏移量X到offset topic和消息B到tp1的写入将是单个事务的一部分,所以整个步骤都是原子的。
2.2. 粉碎“僵尸实例”
我们通过为每个事务Producer分配一个称为transactional.id的唯一标识符来解决僵尸实例的问题。在进程重新启动时能够识别相同的Producer实例。
API要求事务性Producer的第一个操作应该是在Kafka集群中显示注册transactional.id。 当注册的时候,Kafka broker用给定的transactional.id检查打开的事务并且完成处理。 Kafka也增加了一个与transactional.id相关的epoch。Epoch存储每个transactional.id内部元数据。
一旦这个epoch被触发,任何具有相同的transactional.id和更旧的epoch的Producer被视为僵尸,并被围起来, Kafka会拒绝来自这些Procedure的后续事务性写入。
2.3. 读事务消息
现在,让我们把注意力转向数据读取中的事务一致性。
Kafka Consumer只有在事务实际提交时才会将事务消息传递给应用程序。也就是说,Consumer不会提交作为整个事务一部分的消息,也不会提交属于中止事务的消息。
值得注意的是,上述保证不足以保证整个消息读取的原子性,当使用Kafka consumer来消费来自topic的消息时,应用程序将不知道这些消息是否被写为事务的一部分,因此他们不知道事务何时开始或结束;此外,给定的Consumer不能保证订阅属于事务一部分的所有Partition,并且无法发现这一点,最终难以保证作为事务中的所有消息被单个Consumer处理。
简而言之:Kafka保证Consumer最终只能提供非事务性消息或提交事务性消息。它将保留来自未完成事务的消息,并过滤掉已中止事务的消息。
3. 事务处理Java API
事务功能主要是一个服务器端和协议级功能,任何支持它的客户端库都可以使用它。 一个Java编写的使用Kafka事务处理API的“读取-处理-写入”应用程序示例:

[java] view plain copy
KafkaProducer producer = createKafkaProducer(  “bootstrap.servers”, “localhost:9092”,  “transactional.id”, “my-transactional-id”);  producer.initTransactions();  KafkaConsumer consumer = createKafkaConsumer(  “bootstrap.servers”, “localhost:9092”,  “group.id”, “my-group-id”,  "isolation.level", "read_committed");  consumer.subscribe(singleton(“inputTopic”));  while (true) {  ConsumerRecords records = consumer.poll(Long.MAX_VALUE);  producer.beginTransaction();  for (ConsumerRecord record : records)  producer.send(producerRecord(“outputTopic”, record));  producer.sendOffsetsToTransaction(currentOffsets(consumer), group);   producer.commitTransaction();
}

第7-10行指定KafkaConsumer只应读取非事务性消息,或从其输入主题中提交事务性消息。流处理应用程序通常在多个读取处理写入阶段处理其数据,每个阶段使用前一阶段的输出作为其输入。通过指定read_committed模式,我们可以在所有阶段完成一次处理。
第1-5行通过指定transactional.id配置并将其注册到initTransactionsAPI来设置Procedure。在producer.initTransactions()返回之后,由具有相同的transactional.id的Producer的另一个实例启动的任何事务将被关闭和隔离。
第14-21行显示了“读取-处理-写入”循环的核心:读取一部分记录,启动事务,处理读取的记录,将处理的结果写入输出topic,将消耗的偏移量发送到offset topic,最后提交事务。有了上面提到的保证,我们就知道offset和输出记录将作为一个原子单位。
4. 事务工作原理
在本节中,我们将简要介绍上面介绍的事务API引入的新组件和新数据流。更详细的信息,你可以阅读原始设计文档,或观看介绍Kafka MeetUp的Sliders。
下面示例的目标是在调试使用了事务的应用程序时,如何对事务进行优化以获得更好的性能。

  1. 事务协调器和事务日志
    在Kafka 0.11.0中与事务API一起引入的组件是上图右侧的事务Coordinator和事务日志。
    事务Coordinator是每个KafkaBroker内部运行的一个模块。事务日志是一个内部的Kafka Topic。每个Coordinator拥有事务日志所在分区的子集,即, 这些borker中的分区都是Leader。
    每个transactional.id都通过一个简单的哈希函数映射到事务日志的特定分区。这意味着只有一个Broker拥有给定的transactional.id。
    通过这种方式,我们利用Kafka可靠的复制协议和Leader选举流程来确保事务协调器始终可用,并且所有事务状态都能够持久存储。
    值得注意的是,事务日志只保存事务的最新状态而不是事务中的实际消息。消息只存储在实际的Topic的分区中。事务可以处于诸如“Ongoing”,“prepare commit”和“Completed”之类的各种状态中。正是这种状态和关联的元数据存储在事务日志中。
  2. 数据流
    数据流在抽象层面上有四种不同的类型。
    A. producer和事务coordinator的交互
    执行事务时,Producer向事务协调员发出如下请求:
  3.    initTransactions API向coordinator注册一个transactional.id。 此时,coordinator使用该transactional.id关闭所有待处理的事务,并且会避免遇到僵尸实例。 每个Producer会话只发生一次。
    
  4.    当Producer在事务中第一次将数据发送到分区时,首先向coordinator注册分区。
    
  5.    当应用程序调用commitTransaction或abortTransaction时,会向coordinator发送一个请求以开始两阶段提交协议。
    

B. Coordinator和事务日志交互
随着事务的进行,Producer发送上面的请求来更新Coordinator上事务的状态。事务Coordinator会在内存中保存每个事务的状态,并且把这个状态写到事务日志中(这是以三种方式复制的,因此是持久保存的)。
事务Coordinator是读写事务日志的唯一组件。如果一个给定的Borker故障了,一个新的Coordinator会被选为新的事务日志的Leader,这个事务日志分割了这个失效的代理,它从传入的分区中读取消息并在内存中重建状态。
C. Producer将数据写入目标Topic所在分区
在Coordinator的事务中注册新的分区后,Producer将数据正常地发送到真实数据所在分区。这与producer.send流程完全相同,但有一些额外的验证,以确保Producer不被隔离。
D. Topic分区和Coordinator的交互
在Producer发起提交(或中止)之后,协调器开始两阶段提交协议。
在第一阶段,Coordinator将其内部状态更新为“prepare_commit”并在事务日志中更新此状态。一旦完成了这个事务,无论发生什么事,都能保证事务完成。
Coordinator然后开始阶段2,在那里它将事务提交标记写入作为事务一部分的Topic分区。
这些事务标记不会暴露给应用程序,但是在read_committed模式下被Consumer使用来过滤掉被中止事务的消息,并且不返回属于开放事务的消息(即那些在日志中但没有事务标记与他们相关联)。
一旦标记被写入,事务协调器将事务标记为“完成”,并且Producer可以开始下一个事务。

7. 事务实践

现在我们已经理解了事务的语义以及它们是如何工作的,我们将注意力转向利用事务编写实际应用方面。

7.1. 如何选择事务Id

transactional.id在屏蔽僵尸中扮演着重要的角色。但是在一个保持一个在Producer会话中保持一致的标识符并且正确地屏蔽掉僵尸实例是有点棘手的。
正确隔离僵尸实例的关键在于确保读取进程写入周期中的输入Topic和分区对于给定的transactional.id总是相同的。如果不是这样,那么有可能丢失一部分消息。
例如,在分布式流处理应用程序中,假设Topic分区tp0最初由transactional.idT0处理。如果在某个时间点之后,它可以通过transactional.id T1映射到另一个Producer,那么T0和T1之间就不会有栅栏了。所以tp0的消息可能被重新处理,违反了一次处理保证。
实际上,可能需要将输入分区和transactional.id之间的映射存储在外部存储中,或者对其进行静态编码。Kafka Streams选择后一种方法来解决这个问题。

7.2. 事务性能以及如何优化?

Ø Producer打开事务之后的性能
让我们把注意力转向事务如何执行。
首先,事务只造成中等的写入放大。额外的写入在于:
对于每个事务,我们都有额外的RPC向Coordinator注册分区。这些是批处理的,所以我们比事务中的partition有更少的RPC。
在完成事务时,必须将一个事务标记写入参与事务的每个分区。同样,事务Coordinator在单个RPC中批量绑定到同一个Borker的所有标记,所以我们在那里保存RPC开销。但是在事务中对每个分区进行额外的写操作是无法避免的。
最后,我们将状态更改写入事务日志。这包括写入添加到事务的每批分区,“prepare_commit”状态和“complete_commit”状态。
我们可以看到,开销与作为事务一部分写入的消息数量无关。所以拥有更高吞吐量的关键是每个事务包含更多的消息。
实际上,对于Producer以最大吞吐量生产1KB记录,每100ms提交消息导致吞吐量仅降低3%。较小的消息或较短的事务提交间隔会导致更严重的降级。
增加事务时间的主要折衷是增加了端到端延迟。回想一下,Consum阅读事务消息不会传递属于公开传输的消息。因此,提交之间的时间间隔越长,消耗的应用程序就越需要等待,从而增加了端到端的延迟。
Ø Consumer打开之后的性能
Consumer在开启事务的场景比Producer简单得多,它需要做的是:

  1.    过滤掉属于中止事务的消息。
    
  2.    不返回属于公开事务一部分的事务消息。
    

因此,当以read_committed模式读取事务消息时,事务Consumer的吞吐量没有降低。这样做的主要原因是我们在读取事务消息时保持零拷贝读取。
此外,Consumer不需要任何缓冲等待事务完成。相反,Broker不允许提前抵消包括公开事务。
因此,Consumer是非常轻巧和高效的。

8 Kafka的原理?

1) Kafka架构组件
Kafka中发布订阅的对象是topic。我们可以为每类数据创建一个topic,把向topic发布消息的客户端称作producer,从topic订阅消息的客户端称作consumer。Producers和consumers可以同时从多个topic读写数据。一个kafka集群由一个或多个broker服务器组成,它负责持久化和备份具体的kafka消息。
topic:消息存放的目录即主题
Producer:生产消息到topic的一方
Consumer:订阅topic消费消息的一方
Broker:Kafka的服务实例就是一个broker
2)Kafka Topic&Partition
消息发送时都被发送到一个topic,其本质就是一个目录,而topic由是由一些Partition Logs(分区日志)组成, 每个Partition中的消息都是有序的,生产的消息被不断追加到Partition log上,其中的每一个消息都被赋予了一个唯一的offset值。
Kafka集群会保存所有的消息,不管消息有没有被消费;我们可以设定消息的过期时间,只有过期的数据才会被自动清除以释放磁盘空间。比如我们设置消息过期时间为2天,那么这2天内的所有消息都会被保存到集群中,数据只有超过了两天才会被清除。
Kafka需要维持的元数据只有一个–消费消息在Partition中的offset值,Consumer每消费一个消息,offset就会加1。其实消息的状态完全是由Consumer控制的,Consumer可以跟踪和重设这个offset值,这样的话Consumer就可以读取任意位置的消息。
把消息日志以Partition的形式存放有多重考虑,第一,方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了;第二就是可以提高并发,因为可以以Partition为单位读写了。
3)Kafka 核心组件
(1) Replications、Partitions 和Leaders
通过上面介绍的我们可以知道,kafka中的数据是持久化的并且能够容错的。Kafka允许用户为每个topic设置副本数量,副本数量决定了有几个broker来存放写入的数据。如果你的副本数量设置为3,那么一份数据就会被存放在3台不同的机器上,那么就允许有2个机器失败。一般推荐副本数量至少为2,这样就可以保证增减、重启机器时不会影响到数据消费。如果对数据持久化有更高的要求,可以把副本数量设置为3或者更多。
Kafka中的topic是以partition的形式存放的,每一个topic都可以设置它的partition数量,Partition的数量决定了组成topic的log的数量。Producer在生产数据时,会按照一定规则(这个规则是可以自定义的)把消息发布到topic的各个partition中。上面将的副本都是以partition为单位的,不过只有一个partition的副本会被选举成leader作为读写用。
关于如何设置partition值需要考虑的因素。一个partition只能被一个消费者消费(一个消费者可以同时消费多个partition),因此,如果设置的partition的数量小于consumer的数量,就会有消费者消费不到数据。所以,推荐partition的数量一定要大于同时运行的consumer的数量。另外一方面,建议partition的数量大于集群broker的数量,这样leader partition就可以均匀的分布在各个broker中,最终使得集群负载均衡。在Cloudera,每个topic都有上百个partition。需要注意的是,kafka需要为每个partition分配一些内存来缓存消息数据,如果partition数量越大,就要为kafka分配更大的heap space。
(2)Producers
Producers直接发送消息到broker上的leader partition,不需要经过任何中介一系列的路由转发。为了实现这个特性,kafka集群中的每个broker都可以响应producer的请求,并返回topic的一些元信息,这些元信息包括哪些机器是存活的,topic的leader partition都在哪,现阶段哪些leader partition是可以直接被访问的。
Producer客户端自己控制着消息被推送到哪些partition。实现的方式可以是随机分配、实现一类随机负载均衡算法,或者指定一些分区算法。Kafka提供了接口供用户实现自定义的分区,用户可以为每个消息指定一个partitionKey,通过这个key来实现一些hash分区算法。比如,把userid作为partitionkey的话,相同userid的消息将会被推送到同一个分区。
以Batch的方式推送数据可以极大的提高处理效率,kafka Producer 可以将消息在内存中累计到一定数量后作为一个batch发送请求。Batch的数量大小可以通过Producer的参数控制,参数值可以设置为累计的消息的数量(如500条)、累计的时间间隔(如100ms)或者累计的数据大小(64KB)。通过增加batch的大小,可以减少网络请求和磁盘IO的次数,当然具体参数设置需要在效率和时效性方面做一个权衡。
Producers可以异步的并行的向kafka发送消息,但是通常producer在发送完消息之后会得到一个future响应,返回的是offset值或者发送过程中遇到的错误。这其中有个非常重要的参数“acks”,这个参数决定了producer要求leader partition 收到确认的副本个数,如果acks设置数量为0,表示producer不会等待broker的响应,所以,producer无法知道消息是否发送成功,这样有可能会导致数据丢失,但同时,acks值为0会得到最大的系统吞吐量。
若acks设置为1,表示producer会在leader partition收到消息时得到broker的一个确认,这样会有更好的可靠性,因为客户端会等待直到broker确认收到消息。若设置为-1,producer会在所有备份的partition收到消息时得到broker的确认,这个设置可以得到最高的可靠性保证。
Kafka 消息有一个定长的header和变长的字节数组组成。因为kafka消息支持字节数组,也就使得kafka可以支持任何用户自定义的序列号格式或者其它已有的格式如Apache Avro、protobuf等。Kafka没有限定单个消息的大小,但我们推荐消息大小不要超过1MB,通常一般消息大小都在1~10kB之前。
(3) Consumers
Kafka提供了两套consumer api,分为high-level api和sample-api。Sample-api 是一个底层的API,它维持了一个和单一broker的连接,并且这个API是完全无状态的,每次请求都需要指定offset值,因此,这套API也是最灵活的。
在kafka中,当前读到消息的offset值是由consumer来维护的,因此,consumer可以自己决定如何读取kafka中的数据。比如,consumer可以通过重设offset值来重新消费已消费过的数据。不管有没有被消费,kafka会保存数据一段时间,这个时间周期是可配置的,只有到了过期时间,kafka才会删除这些数据。
High-level API封装了对集群中一系列broker的访问,可以透明的消费一个topic。它自己维持了已消费消息的状态,即每次消费的都是下一个消息。
High-level API还支持以组的形式消费topic,如果consumers有同一个组名,那么kafka就相当于一个队列消息服务,而各个consumer均衡的消费相应partition中的数据。若consumers有不同的组名,那么此时kafka就相当与一个广播服务,会把topic中的所有消息广播到每个consumer。

五 Hbase

1 Hbase热点问题 读写请求会集中到某一个RegionServer上如何处理

对表采用预分区,将数据分散到不同的region中,region交给不同的regionserver来维护

2 hbase查询一条记录的方法是什么?Hbase写入一条记录的方法是什么?

Get get = new Get(rowkey);
Put put = new Put (rowkey)
Put.add…

3 Hbase优化及rowkey设计。

1) 高可用
在HBase中Hmaster负责监控RegionServer的生命周期,均衡RegionServer的负载,如果Hmaster挂掉了,那么整个HBase集群将陷入不健康的状态,并且此时的工作状态并不会维持太久。所以HBase支持对Hmaster的高可用配置。
1.关闭HBase集群(如果没有开启则跳过此步)
[atguigu@hadoop102 hbase]$ bin/stop-hbase.sh
2.在conf目录下创建backup-masters文件
[atguigu@hadoop102 hbase]$ touch conf/backup-masters
3.在backup-masters文件中配置高可用HMaster节点
[atguigu@hadoop102 hbase]$ echo hadoop103 > conf/backup-masters
4.将整个conf目录scp到其他节点
[atguigu@hadoop102 hbase]$ scp -r conf/ hadoop103:/opt/module/hbase/
[atguigu@hadoop102 hbase]$ scp -r conf/ hadoop104:/opt/module/hbase/
5.打开页面测试查看
http://hadooo102:16010
2)预分区
每一个region维护着startRow与endRowKey,如果加入的数据符合某个region维护的rowKey范围,则该数据交给这个region维护。那么依照这个原则,我们可以将数据所要投放的分区提前大致的规划好,以提高HBase性能。
1.手动设定预分区
hbase> create ‘staff1’,‘info’,‘partition1’,SPLITS => [‘1000’,‘2000’,‘3000’,‘4000’]
2.生成16进制序列预分区
create ‘staff2’,‘info’,‘partition2’,{NUMREGIONS => 15, SPLITALGO => ‘HexStringSplit’}
3.按照文件中设置的规则预分区
创建splits.txt文件内容如下:
aaaa
bbbb
cccc
dddd
然后执行:
create ‘staff3’,‘partition3’,SPLITS_FILE => ‘splits.txt’
4.使用JavaAPI创建预分区
//自定义算法,产生一系列Hash散列值存储在二维数组中
byte[][] splitKeys = 某个散列值函数
//创建HBaseAdmin实例
HBaseAdmin hAdmin = new HBaseAdmin(HBaseConfiguration.create());
//创建HTableDescriptor实例
HTableDescriptor tableDesc = new HTableDescriptor(tableName);
//通过HTableDescriptor实例和散列值二维数组创建带有预分区的HBase表
hAdmin.createTable(tableDesc, splitKeys);
3)RowKey设计
一条数据的唯一标识就是rowkey,那么这条数据存储于哪个分区,取决于rowkey处于哪个一个预分区的区间内,设计rowkey的主要目的 ,就是让数据均匀的分布于所有的region中,在一定程度上防止数据倾斜。接下来我们就谈一谈rowkey常用的设计方案。
1.生成随机数、hash、散列值
比如:
原本rowKey为1001的,SHA1后变成:dd01903921ea24941c26a48f2cec24e0bb0e8cc7
原本rowKey为3001的,SHA1后变成:49042c54de64a1e9bf0b33e00245660ef92dc7bd
原本rowKey为5001的,SHA1后变成:7b61dec07e02c188790670af43e717f0f46e8913
在做此操作之前,一般我们会选择从数据集中抽取样本,来决定什么样的rowKey来Hash后作为每个分区的临界值。
2.字符串反转
20170524000001转成10000042507102
20170524000002转成20000042507102
这样也可以在一定程度上散列逐步put进来的数据。
3.字符串拼接
20170524000001_a12e
20170524000001_93i7
4)内存优化
HBase操作过程中需要大量的内存开销,毕竟Table是可以缓存在内存中的,一般会分配整个可用内存的70%给HBase的Java堆。但是不建议分配非常大的堆内存,因为GC过程持续太久会导致RegionServer处于长期不可用状态,一般16~48G内存就可以了,如果因为框架占用内存过高导致系统内存不足,框架一样会被系统服务拖死。
5)基础优化
1.允许在HDFS的文件中追加内容
hdfs-site.xml、hbase-site.xml
属性:dfs.support.append
解释:开启HDFS追加同步,可以优秀的配合HBase的数据同步和持久化。默认值为true。
2.优化DataNode允许的最大文件打开数
hdfs-site.xml
属性:dfs.datanode.max.transfer.threads
解释:HBase一般都会同一时间操作大量的文件,根据集群的数量和规模以及数据动作,设置为4096或者更高。默认值:4096
3.优化延迟高的数据操作的等待时间
hdfs-site.xml
属性:dfs.image.transfer.timeout
解释:如果对于某一次数据操作来讲,延迟非常高,socket需要等待更长的时间,建议把该值设置为更大的值(默认60000毫秒),以确保socket不会被timeout掉。
4.优化数据的写入效率
mapred-site.xml
属性:
mapreduce.map.output.compress
mapreduce.map.output.compress.codec
解释:开启这两个数据可以大大提高文件的写入效率,减少写入时间。第一个属性值修改为true,第二个属性值修改为:org.apache.hadoop.io.compress.GzipCodec或者其他压缩方式。
5.优化DataNode存储
属性:dfs.datanode.failed.volumes.tolerated
解释: 默认为0,意思是当DataNode中有一个磁盘出现故障,则会认为该DataNode shutdown了。如果修改为1,则一个磁盘出现故障时,数据会被复制到其他正常的DataNode上,当前的DataNode继续工作。
6.设置RPC监听数量
hbase-site.xml
属性:hbase.regionserver.handler.count
解释:默认值为30,用于指定RPC监听的数量,可以根据客户端的请求数进行调整,读写请求较多时,增加此值。
7.优化HStore文件大小
hbase-site.xml
属性:hbase.hregion.max.filesize
解释:默认值10737418240(10GB),如果需要运行HBase的MR任务,可以减小此值,因为一个region对应一个map任务,如果单个region过大,会导致map任务执行时间过长。该值的意思就是,如果HFile的大小达到这个数值,则这个region会被切分为两个Hfile。
8.优化hbase客户端缓存
hbase-site.xml
属性:hbase.client.write.buffer
解释:用于指定HBase客户端缓存,增大该值可以减少RPC调用次数,但是会消耗更多内存,反之则反之。一般我们需要设定一定的缓存大小,以达到减少RPC次数的目的。
9.指定scan.next扫描HBase所获取的行数
hbase-site.xml
属性:hbase.client.scanner.caching
解释:用于指定scan.next方法获取的默认行数,值越大,消耗内存越大。
10.flush、compact、split机制
当MemStore达到阈值,将Memstore中的数据Flush进Storefile;compact机制则是把flush出来的小文件合并成大的Storefile文件。split则是当Region达到阈值,会把过大的Region一分为二。
涉及属性:
即:128M就是Memstore的默认阈值
hbase.hregion.memstore.flush.size:134217728
即:这个参数的作用是当单个HRegion内所有的Memstore大小总和超过指定值时,flush该HRegion的所有memstore。RegionServer的flush是通过将请求添加一个队列,模拟生产消费模型来异步处理的。那这里就有一个问题,当队列来不及消费,产生大量积压请求时,可能会导致内存陡增,最坏的情况是触发OOM。
hbase.regionserver.global.memstore.upperLimit:0.4
hbase.regionserver.global.memstore.lowerLimit:0.38
即:当MemStore使用内存总量达到hbase.regionserver.global.memstore.upperLimit指定值时,将会有多个MemStores flush到文件中,MemStore flush 顺序是按照大小降序执行的,直到刷新到MemStore使用内存略小于lowerLimit

① Rowkey长度原则
Rowkey 是一个二进制码流,Rowkey 的长度被很多开发者建议说设计在10~100 个字节,不过建议是越短越好,不要超过16 个字节。
原因如下:
(1)数据的持久化文件HFile 中是按照KeyValue 存储的,如果Rowkey 过长比如100 个字节,1000 万列数据光Rowkey 就要占用100*1000 万=10 亿个字节,将近1G 数据,这会极大影响HFile 的存储效率;
(2)MemStore 将缓存部分数据到内存,如果Rowkey 字段过长内存的有效利用率会降低,系统将无法缓存更多的数据,这会降低检索效率。因此Rowkey 的字节长度越短越好。
(3)目前操作系统是都是64 位系统,内存8 字节对齐。控制在16 个字节,8 字节的整数倍利用操作系统的最佳特性。
② Rowkey散列原则
如果Rowkey 是按时间戳的方式递增,不要将时间放在二进制码的前面,建议将Rowkey的高位作为散列字段,由程序循环生成,低位放时间字段,这样将提高数据均衡分布在每个Regionserver 实现负载均衡的几率。如果没有散列字段,首字段直接是时间信息将产生所有新数据都在一个 RegionServer 上堆积的热点现象,这样在做数据检索的时候负载将会集中在个别RegionServer,降低查询效率。
③ Rowkey唯一原则
必须在设计上保证其唯一性。

六 Oozie
1 有没有试过OZ调度Sqoop
将每天的MR分析结果导入mysql中。
2 有没有使用OZ调度hadoop任务
定时每天分析前一天指标MR。
七 Hadoop HA
1 HAnamenode是如何工作的?
1) HDFS-HA工作机制
通过双namenode消除单点故障
2)HDFS-HA工作要点
(1)元数据管理方式需要改变:
内存中各自保存一份元数据;
Edits日志只有Active状态的namenode节点可以做写操作;
两个namenode都可以读取edits;
共享的edits放在一个共享存储中管理(qjournal和NFS两个主流实现);
(2)需要一个状态管理功能模块
实现了一个zkfailover,常驻在每一个namenode所在的节点,每一个zkfailover负责监控自己所在namenode节点,利用zk进行状态标识,当需要进行状态切换时,由zkfailover来负责切换,切换时需要防止brain split现象的发生。
(3)必须保证两个NameNode之间能够ssh无密码登录。
(4)隔离(Fence),即同一时刻仅仅有一个NameNode对外提供服务
3) HDFS-HA自动故障转移工作机制
前面学习了使用命令hdfs haadmin -failover手动进行故障转移,在该模式下,即使现役NameNode已经失效,系统也不会自动从现役NameNode转移到待机NameNode,下面学习如何配置部署HA自动进行故障转移。自动故障转移为HDFS部署增加了两个新组件:ZooKeeper和ZKFailoverController(ZKFC)进程。ZooKeeper是维护少量协调数据,通知客户端这些数据的改变和监视客户端故障的高可用服务。HA的自动故障转移依赖于ZooKeeper的以下功能:
(1)故障检测:集群中的每个NameNode在ZooKeeper中维护了一个持久会话,如果机器崩溃,ZooKeeper中的会话将终止,ZooKeeper通知另一个NameNode需要触发故障转移。
(2)现役NameNode选择:ZooKeeper提供了一个简单的机制用于唯一的选择一个节点为active状态。如果目前现役NameNode崩溃,另一个节点可能从ZooKeeper获得特殊的排外锁以表明它应该成为现役NameNode。
ZKFC是自动故障转移中的另一个新组件,是ZooKeeper的客户端,也监视和管理NameNode的状态。每个运行NameNode的主机也运行了一个ZKFC进程,ZKFC负责:
(1)健康监测:ZKFC使用一个健康检查命令定期地ping与之在相同主机的NameNode,只要该NameNode及时地回复健康状态,ZKFC认为该节点是健康的。如果该节点崩溃,冻结或进入不健康状态,健康监测器标识该节点为非健康的。
(2)ZooKeeper会话管理:当本地NameNode是健康的,ZKFC保持一个在ZooKeeper中打开的会话。如果本地NameNode处于active状态,ZKFC也保持一个特殊的znode锁,该锁使用了ZooKeeper对短暂节点的支持,如果会话终止,锁节点将自动删除。
(3)基于ZooKeeper的选择:如果本地NameNode是健康的,且ZKFC发现没有其它的节点当前持有znode锁,它将为自己获取该锁。如果成功,则它已经赢得了选择,并负责运行故障转移进程以使它的本地NameNode为active。故障转移进程与前面描述的手动故障转移相似,首先如果必要保护之前的现役NameNode,然后本地NameNode转换为active状态。

大数据技术之大数据基础阶段考试题(二)相关推荐

  1. 大数据技术之_23_Python核心基础学习_01_计算机基础知识 + Python 入门 (9.5小时)

    大数据技术之_23_Python核心基础学习_01 第一章 计算机基础知识 1.1 课程介绍 1.2 计算机是什么 1.3 计算机的组成 1.4 计算机的使用方式 1.5 windows 的命令行 1 ...

  2. 大数据技术之_23_Python核心基础学习_04_ 异常 + 文件(3.5小时)

    大数据技术之_23_Python核心基础学习_04 第七章 异常 7.1 异常的简介和异常的处理 7.2 异常的传播 7.3 异常对象 7.4 自定义异常对象 第八章 文件 8.1 文件--打开 8. ...

  3. 大数据是什么和大数据技术十大核心原理详解

     一.数据核心原理   从"流程"核心转变为"数据"核心   大数据时代,计算模式也发生了转变,从"流程"核心转变为"数据&quo ...

  4. 大数据技术十大核心原理

    一.数据核心原理--从"流程"核心转变为"数据"核心 大数据时代,计算模式也发生了转变,从"流程"核心转变为"数据"核心 ...

  5. 数据科学与大数据技术和大数据管理与应用哪个好

    学计算机学与技术好还是学大数据好? 本人认为学大数据好一些.首先,当前计算机科学与技术和大数据这两个专业的热度都比较高,这两个专业本身也没有所谓的好坏之分,而且这两个专业本身也有非常紧密的联系,当前计 ...

  6. 大数据技术十大核心原理详解

    一.数据核心原理--从"流程"核心转变为"数据"核心 大数据时代,计算模式也发生了转变,从"流程"核心转变为"数据"核心 ...

  7. 后Hadoop时代的大数据技术思考:数据即服务

    1. Hadoop 的神话正在破灭 IBM leads BigInsights for Hadoop out behind barn. Shots heard IBM has announced th ...

  8. mongodb的delete_大数据技术之MongoDB数据删除

    本篇文章探讨了大数据技术之MongoDB数据删除,希望阅读本篇文章以后大家有所收获,帮助大家对相关内容的理解更加深入. 1.查看集合 show collections 2.向集合中添加数据 db.ru ...

  9. 2021年全国职业院校技能大赛 “大数据技术与应用”—模拟赛题(二)

    2021年全国职业院校技能大赛 "大数据技术与应用" --模拟赛题(二) 文章适合了解大数据技术与应用技能大赛 赛题.文章在编写过程中难免有疏漏和错误,欢迎大佬指出文章的不足之处: ...

  10. 【云计算与大数据技术】大数据概念和发展背景讲解(图文解释 超详细)

    一.什么是大数据 大数据是一个不断发展的概念,可以指任何体量或负载下那个超出常规数据处理方法和处理能力的数据,数据本身可以是结构化,半结构化甚至是非结构化的,随着物联网技术与可穿戴设备的飞速发展,数据 ...

最新文章

  1. python 句子中没有中文_AI伪原创,我们是认真的。[Python实现]
  2. 解决win7 64位操作系统下安装PL/SQL后连接报错问题: make sure you have the 32 bits oracle client installed
  3. GitHub被“中介”攻击了?啥是中间人攻击?
  4. 启明云端分享| RK3568核心板到底有哪些吸引眼球的地方呢
  5. Linux 下的 AddressSanitizer
  6. HDU ACM 1181 变形课 (广搜BFS + 动态数组vector)-------第一次使用动态数组vector
  7. 创建链表和遍历链表算法演示
  8. TensorFlow第八步 Nesterov's accelerated gradient descent+L2 regularization
  9. 1008.protobuf语句内存对齐的错误
  10. python连载第12篇 for循环 源码+ 答案
  11. 测试工程师职业素养:懂得拒绝无效工作
  12. 脉歌蓝牙耳机线评测_漂亮的高音质蓝牙耳机 脉歌MACAW TX-90评测
  13. 一个小问题:深度学习模型如何处理大小可变的输入
  14. 《软件工程》总结——第四章
  15. Eclipse中在refernced Libraries里面加jar包与在WEB-INF/lib下加jar包的区别
  16. android 屏蔽焦点,android – 如何在视图失去焦点时屏蔽EditText中的文本.
  17. Java开发必备软件安装大全(建议学生党初学Java开发收藏)
  18. Epicor 客制化 - 常用对象
  19. React-组件公共逻辑抽离的两种方式
  20. switch按钮文字切换

热门文章

  1. kali linux超级用户_Kali Linux操作系统将从本月发布的新版本开始默认不再使用root账户...
  2. 有一楼梯共M级,刚开始时你在第一级,若每次只能跨上一级或二级,要走上第M级,共有多少种走法?请编程实现。
  3. Python学习FAQ
  4. 记录一下,Unity多个场景来回切换
  5. rh5885v3服务器查看配置信息,rh5885 v3 服务器配置
  6. 计算机组成原理数据流通图作用,计算机组成原理-第14-15讲(第5章).ppt
  7. WIN10 电脑设置自动拨号联网任务,实现开机、断网自动重连
  8. 数据仓库---JPivot连接MySQL VS PostgreSQL
  9. 地震勘探基础(十三)之地震资料解释
  10. 深富策略:权重调整拖累指数 下周操作要谨慎