@R星校长

1 Scala

1.1 【重点】Scala六大特性

 1) 与java无缝整合
 2) 类型自动推断
 3) 并发和分布式
 4) Trait特质特性
 5) Match模式匹配
 6) 高阶函数

1.2 【重点】Scala & Java异同?

 1) 类型定义声明不同,Scala :val/var Java强类型定义。
 2) Scala中有类和对象,Java中有类。
 3) Scala、java都是是基于Jvm,scala面向对象+面向函数编程,java面向对象编程。
 4) Scala语言对于java语言简化。
 5) 语法使用不同:例如for,没有break,java中有swith,Scala中有match。

1.3 Scala类和对象、基本操作

 1) val定义常量,var定义变量,常量不可变,变量可变。
 2) Scala中自动推断类型,类型可以省略,每行后面都有分号,分号自动推断,分号可以省略。
 3) Scala类可以传参,类有了参数就有了默认的构造。object不能传参。class重写构造,this中的第一行需要先调用默认的构造。
 4) Scala类中的属性有getter setter方法。
 5) 当new一个类时,类中除了方法不执行,其他都执行。
 6) 一个scala文件中不能有重名的类和对象。
 7) 如果一个scala文件中类和对象的名称相同,这个类叫做这个对象的伴生类,这个对象叫做这个类的伴生对象,可以访问私有变量。
 8) Scala object对象相当于java中的单例,object中定义的都是静态的。
 9) 可以在object中使用applay方法实现类似object传参功能。
 10) 流程控制if…else…与java if… else…一样。
 11) 流程控制 for :

1 to 10 //1 到10 包含10
1 until (10,3)//1 到10,步长为3,不包含10
for(i <- 1 to 10){...}
for(i <- 1 to 10 ;j <- 1 to 20){...}
for(i <- 1 to 100 ;if (i%10==0))
val vector = for(i <- 1 to 100 if(i%20 ==0)) yield i

 12) 流程控制 while… do…while…

i += 1
i = i+1

1.4 函数和方法

1.4.1 方法定义

 1) 使用def定义方法。
 2) 方法参数必须指定类型。方法体的返回值类型可以省略。
 3) 默认方法将方法体中最后一行的计算结果当做返回值返回。
 4) 可以使用return返回值,但是一定要指定方法体的返回值类型。建议不需要写return。
 5) 如果方法名和方法体之间的“=”省略,无论方法最后一行返回值是什么,都会返回unit。对应的返回值会被丢弃。
 6) 如果方法体中的逻辑可以一行搞定,可以直接去掉方法的“{… …}”

1.4.2 递归方法

 1) 必须显式的指定方法的返回值类型。

1.4.3 参数有默认值的方法

 1) 参数类型后面跟上默认的值。

def fun(a:Int = 100,b:Int = 10) = {.. ...}

 2) 如何使用方法,传参时直接指定对应的变量值,例如:fun(a = 200,b=20)

1.4.4 参数可变长的方法

 1) 参数可变长方法定义:

  def fun(s:String*) = {... ...}

1.4.5 【重点】匿名函数

 1) 匿名函数定义:() => {},(a:Int,b:Int)={a+b}
 2) => :匿名函数
 3) 可以将匿名函数赋值给一个变量,下次直接调用这个变量传参后就是使用匿名函数。

1.4.6 嵌套方法

 1) 嵌套方法是方法中有方法。

1.4.7 偏应用表达式

//偏应用表达式可以使用在频繁调用一个方法,每次调用方法中大部分参数不变,少量参数改变的场景:
def showLog(date:Date,log:String) = {....}
val date = new Date()
val fun = showLog(date,_:String)
fun(“a”)
fun(“b”)

1.4.8 【重点】高阶函数

 1) 【重点】方法的参数是函数。
 2) 【重点】方法的返回是函数。
 3) 方法的参数和返回都是函数。就是前两个的合并。

1.4.9 柯里化函数

 1) 高阶函数的简化版

def fun(a:Int,b:Int)(c:Int,d:Int)={a+b+c+d}
fun(1,2)(3,4)

1.5 Scala String

 1) Scala中的String就是java中的String

1.6 Scala集合

1.6.1 Array

 1) 定义:

val array = new Array[String](长度)
val array = Array[Int](......)

 2) 遍历:for,foreach
 3) 方法:Array.concat(array*),map,flatMap,filter…
 4) 可变长:

val array = new ArrayBuffer[String]()
array.append(... ...)

1.6.2 List

 1) 定义:val list = ListString,有序不去重
 2) 遍历:for,foreach
 3) 方法:map[一对一],flatMap[一对多]
 4) 可变长:

val list = ListBuffer[String](...)
list.append(... ...)

1.6.3 Set

 1) 定义:val set = SetString,无序,去重。
 2) 遍历:for,foreach
 3) 方法:flatMap,map,fitler…
 4) 可变长:

val set = scala.collections.mutable.Set[String]()
set.add(... ...)

1.6.4 Map

 1) 定义:两种初始化值方式。

val map = Map[(String,Int)]("xx"->10,("xs",200))

 2) 遍历:

for,foreach,map.keys,map.values

 3) 方法:

val option:Option[Int] = map.get(key)
val value = option.getOrEles(xxx)
faltMap,map,filter,

 4) 可变长:

val map = scala.collections.mutable.Map[(String,Int)]()
map.put(k,v)

1.6.5 【重点】tuple

 1) tuple与集合一样,但是tuple中每个元素都有一种类型。
 2) tuple最多支持22位。最常用的是tuple2,二元组。
 3) tuple遍历:tuple.productIterator。
 4) 每个tuple都有toString方法,tuple2有swap方法。
 5) tuple取值:tuple._1/tuple._2/tuple._x
 6) tuple定义可以new,可以不new 还可以直接(xx),tuple中的每个位置都是一种类型。

1.7 【重点】Trait特质特性

 1) Trait相当于Java中的抽象类和接口整合。
 2) Trait 中可以定义常量变量,方法的实现和方法的不实现。
 3) Trait 继承多个类时,第一个关键字使用extends,之后使用with。
 4) 一个类如果继承多个Trait,Trait中有相同的方法名和变量名,需要在类中重新定义覆盖。

1.8 【重点】Match模式匹配

 1) 模式匹配格式

xx match {case i:Int =>{... }case “hello” =>{... ..}case _ =>{.... ...}
}

 2) Match不仅可以匹配值还可以匹配类型。
 3) Match从上往下匹配,匹配上之后,自动停止。
 4) Match匹配过程中有数据类型转换。
 5) case _ =>{xxx} 什么都匹配不上才匹配,放在最后一行。
 6) 方法体中只有match模式匹配,可以将xx match {xxx} 整体看成一行

1.9 偏函数

 1) 类似于java中的switch … case … ,是模式匹配的特殊情况,只能匹配一种类型,对应出来一种类型。

def  fun:PartialFunction[Int,String] = {case 1 =>{... ...;“hello”}case 2 =>{... ...;“scala”}case _ =>{“default”}
}

1.10 【重点】样例类

 1) 使用关键字case修饰的类就是样例类
 2) 样例类可以new 可以不用new。
 3) 样例类实现了toString,equles,hash…方法
 4) 样例类参数默认就是类的属性,自带getter,setter方法。

1.11 【重点】隐式转换

1.11.1 隐式值和隐式参数

 1) 使用关键字implicit修饰的值就是隐式值。
 2) 方法中参数使用implicit修饰,这个参数就是隐式参数。
 3) 一个方法中部分参数是隐式的,必须使用柯里化的方式定义,非隐式的参数放在第一个括号,隐式的参数使用implicit修改,放在后面括号中。
 4) 在同一个作用于内只能同种类型的隐式参数只能有一个。

1.11.2 【重点】隐式转换函数

 1) 使用implicit修饰的方法就是隐式转换函数。
 2) 隐式转换函数不污染类的前提下给类增加功能。
 3) 同一个作用域内不能有不同名称相同类型的隐式转换函数。

1.11.3 隐式类

 1) 使用implicit修饰的类就是隐式类。
 2) 隐式类必须定义在object或者class中。
 3) 同一个作用域内不能有相同类型不同名称的隐式类。

1.12 【重点】迭代器处理数据模式

 1) 使用迭代器处理数据模式在多次数据转换过程中可以减少内存的使用,Spark底层采用Scala源码编写,RDD的特征就是迭代器处理数据模式。

1.13 Actor 通信模型

 1) Spark1.6 之前节点底层通信使用的akka,akka底层是actor实现。
 2) Spark2.0+底层通信使用是netty。
 3) 给Actor发送消息代码【了解】
 4) Actor之间发送消息代码【了解】

2 Spark

2.1 【重点】Spark 特点

 1) Spark分布式的可以基于内存的计算框架
 2) 由于可以基于内存计算和有DAG有向无环图,Spark 速度快
 3) Spark 简单易用,java、scala、python、SQL、R
 4) Spark中的各个模块通用。
 5) 部署方式简单,Standalone、yarn、k8s

2.2 【重点】Spark 与 MR 不同点?

 1) Spark可以基于内存处理数据,MR每次落地磁盘,基于磁盘迭代。
 2) Spark有DAG有向无环图给任务做优化。
 3) Spark中有高阶封装的api,MR中只有map端和reduce端,需要手动实现两侧逻辑。
 4) Spark是粗粒度资源申请,MR是细粒度资源申请。
 5) Spark 处理数据时,数据在流转过程中默认只有一份,MR有三份。
 6) Spark Shuffle相对于MR有优化。
 7) Spark支持批处理/SQL处理/流处理,MR仅支持批处理。

2.3 Spark 技术栈

 1) Hadoop 相关:

yarn/hdfs/hive/mapreduce/storm

 2) Spark 相关:

sparkcore/sparksql/sparkstring/spark mechine learning

2.4 【重点】Spark 核心 RDD

2.4.1 RDD 五大特性

 1) RDD由一系列partition组成。
 2) 算子(函数)是作用在partition上的。
 3) RDD之间有依赖关系。
 4) 分区器是作用在K,V格式的RDD上。
 5) partition对外提供最佳的计算位置,利于数据处理的本地化。

2.4.2 RDD 注意问题

 1) 什么是K,V格式的RDD?
 RDD中的元素是一个个tuple2,RDD就是K,V格式的RDD。
 2) sc.textFile…底层读取的就是MR读取hdfs文件的方法,首先Split,每个split对应一个block,这里每个split对应一个partition。
 3) 哪里体现RDD的弹性?
  a. RDD分区数可多可少。
  b. RDD之间有依赖关系。
 4) 哪里体现RDD的分布式?
  a. partition是分布在多个节点上的。

2.5 【重点】Spark 算子

2.5.1 Transformation 类

 1) Transformatioin类算子是懒执行的,也叫作转换算子,需要action算子触发执行。

 2) transformation算子

map,mapToPair,flatMap,filter,sample,reduceByKey,sortBy,sortByKey,mapPartitions,mapPartitionWithIndex,join,leftOuterJoin,rightOuterJoin,fullOuterJoin,union,subtract,intersetion,repartition,coalesce,distinct,groupByKey,zip,zipWithIndex,mean,mapValues,flatMapToPair,mapPartitionsToPair,combineByKey,aggregateByKey,repartitionAndSortWithInPartitions,partitionBy,randomSample

2.5.2 Action 类

 1) action又叫行动算子,每个Spark applicatioin中有一个action算子就有一个job。触发Transformation类算子执行。
 2) action算子:

count,foreach,take,first,collect,reduce,top,takeOrdered,takeSample,foreachPartition,countByKey,countByValue,collectAsMap

2.5.3 【重点】持久化算子

 1) cache

  cache() = persist() = persist(StorageLevel.MEMORY_ONLY),默认将数据存在内存中。

 2) persist

  a. 可以手动指定持久化级别,常用的持久化级别:

MEMORY_ONLY
MEMORY_ONLY_SER
MEMORY_AND_DISK_SER
MEMORY_AND_DISK

  b. 避免使用“_2”级别和“DISK_ONLY”级别。

  c. cache&persist注意:

  cache 和persist都是懒执行,需要action触发执行,持久化的单位是partition。对RDD使用cache或者persist之后,可以赋值给一个变量,下次使用这个变量就是使用的持久化的算子。建议不赋值给变量。如果采用第二种方式,算子后面不能紧跟action算子

 3) checkpoint
  a. 当RDD的lineage非常长,计算逻辑非常复杂时,可以使用对RDD进行checkpoint,checkpoint主要用在保存状态上。
  b. checkpoint & persist(DISK_ONLY)区别:
   ① persist|cache 当application执行完成之后,数据会被清空,由Spark框架管理数据。
   ② checkpoint当application执行完成之后,数据不会被清空,由外部的存储系统管理,所以,checkpoint多用于状态管理。
 4) checkpoint执行流程
  当Spark job执行完成之后,Spark框架会从后往前回溯,找到checkpointRDD进行标记,回溯完成之后,Spark框架重新启动一个job,计算checkpointRDD的结果,将结果持久化到指定的目录中。
优化:对哪个RDD checkpoiont之前先cache下。

2.6 Spark 集群搭建

2.6.1 搭建 Standalone 集群

 1) 不需要重点掌握。由CDH管理。

2.6.2 基于 Yarn 搭建 Spark

 1) 只需要在提交任务的客户端上 $SPARK_HOME/conf/spark-env.sh中配置:export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

2.6.3 客户端搭建

 1) 将Spark安装包原封不动的发送到新的节点,当前节点上提交Spark任务,当前节点就是Spark的客户端。

 2) CDH管理集群时,可以指定每台节点都可以提交Spark任务。

2.7 【重点】Spark任务提交模式

2.7.1 基于Standalone

 1) client模式

  命令:

./spark-submit --master spark://mynode1:7077 --class ...jar ..
./spark-submit --master spark://mynode1:7077 --deploy-mode client --class jar ...

  过程:

  a. 在客户端提交Spark任务,Driver是在客户端启动。
  b. 客户端向Master申请资源,Master收到请求,找到满足资源的Worker节点启动Executor
  c. Executor启动之后,注册给Driver,Driver掌握计算资源。
  d. Driver发送task,监控task执行,回收结果。

  注意:

  standalone-client 模式适用于程序测试,不适用于生产环境,当在客户端提交多个Sparkapplication时,有单节点网卡流量激增问题。在客户端可以看到task的执行和结果。

 2) cluster模式
  命令:

./spark-submit --master spark://mynode1:7077 --deploy-mode cluster --class ....jar ..

  过程:

  a. 在客户端提交Spark任务,首先客户端向Master申请启动Driver。
  b. Master收到请求之后,随机找到满足资源的一台Worker节点启动Driver。
  c. Driver启动之后,向Master申请资源,Master找到满足资源的Worker节点启动Executor。
  d. Executor启动之后注册给Driver,Driver发送task,监控task执行和结果。

  注意:

  standalone-cluster模式适用于生产环境,当在客户端提交多个Spark application时,将client模式的单节点网卡流量激增问题分散到了集群。这种模式在客户端看不到task的执行和结果,要去webui中查看。

2.7.2 【重点】基于 Yarn

 1) client模式
  命令:

./spark-submit --master yarn --class ..jar ...
./spark-submit --master yarn-client --class ..jar ..
./spark-submit --master yarn --deploy-mode client --class ..jar .

  过程:

  a. 在客户端提交Spark application,首先Driver在客户端启动。
  b. 客户端向ResourceManager申请启动ApplicationMaster。
  c. RM收到启动请求之后,随机找到一台NodeManager启动AM。
  d. AM启动之后,向RM申请资源用于启动Executor。
  e. RM收到请求,返回给AM一批NM节点。AM连接NM节点启动Executor。
  f. Executor启动之后,注册给Driver,Driver发送task,监控task执行和回收结果。

  注意:

  yarn-client 模式适用于程序测试,不适用于生产环境,当在客户端提交多个Sparkapplication时,有单节点网卡流量激增问题。在客户端可以看到task的执行和结果。

 2) cluster模式

  命令:

./spark-submit --master yarn-cluster --class ..jar ...
./spark-submit --master yarn --deploy-mode cluster --class ..jar ...

  过程:

  a. 在客户端提交Spark application,客户端向ResourceManager申请启动ApplicationMaster。
  b. RM收到启动请求之后,随机找到一台NodeManager启动AM,AM这里相当于Driver。
  c. AM启动之后,向RM申请资源用于启动Executor。
  d. RM收到请求,返回给AM一批NM节点,AM连接NM节点启动Executor。
  e. Executor启动之后,注册给AM,AM发送task,监控task执行和回收结果。

  注意:

  yarn-cluster模式适用于生产环境,当在客户端提交多个Spark application时,将client模式的单节点网卡流量激增问题分散到了集群。这种模式在客户端看不到task的执行和结果,要去webui中查看。

2.8 Spark中的术语

2.8.1 资源层面

  Standalone:Master->Worker->Executor->ThreadPool
  Yarn:ResourceManager->NodeManager->Executor->ThreadPool

2.8.2 任务层面

  Applicatoin->job->Stage->tasks

2.9 【重点】RDD的宽窄依赖

2.9.1 RDD的宽依赖(shuffle依赖)

  父RDD partition与子RDD partition之间的关系是一对多

2.9.2 RDD的窄依赖

  父RDD partition与子RDD partition之间的关系是一对一
  父RDD partition与子RDD partition之间的关系是多对一

2.10 【重点】Spark Stage

 1) Stage是由一组并行的task组成。
 2) Stage的并行度是由Stage中finalRDD的partition个数决定。
 3) Stage处理数据模式迭代器处理数据模式,也叫pipeline管道处理数据模式。
 4) 管道中的数据何时落地?
  a. shuffle write时
  b. 对RDD进行持久化时
 5) 如何提高Stage中的并行度?
  a. sc.textFile(xx,minnumPartition)
  b. sc.parallelize(xx,num)
  c. sc.makeRDD(num)
  d. sc.parallelizePairs(num)
  e. spark.default.parallelism
  f. spark.sql.shuffle.partitions =200
  g. 自定义分区器:kvRDD.partitionBy(new Partitioner…)
  h. reduceByKey(xx,num)|join(xx,num)|repartition|coalesce
  i. SparkStreaming + Kafak Direct模式:与读取的topic的partition个数一致

2.11 【重点】Spark 资源调度和任务调度

2.11.1 资源调度

 1) 集群启动,Worker向Master汇报资源,Master掌握集群资源信息。
 2) 在客户端提交Spark任务,当new SparkContext时,会创建两个对象DAGScheduler 和 TaskScheduler ,TaskScehduler向Master申请资源。
 3) Master找到满足资源的Worker节点启动Executor。
 4) Executor注册给Driver,Driver掌握一批计算资源节点。

2.11.2 任务调度

 5) 当Spark Application有一个action算子,就有一个job,每个job中都有RDD宽窄依赖关系形成的一个DAG有向无环图。
 6) DAGScheduler负责按照RDD的宽窄依赖关系,切割job 划分stage,将Stage以taskSet的形式提交给TaskScheduler。
 7) TaskScheduler获取TaskSet,遍历出一个个的Task发送到Executor的线程池中执行。
 8) Driver 监控task执行,回收结果。

2.11.3 注意问题

 1) TaskScheduler 可以重试失败的task,重试3次,如果task依然失败,由DAGScheduler重试task所在的Stage,重试4次,如果task执行依然失败,当前stage所在的job就失败了,job失败,application就失败了。
 2) TaskScheduer还可以重试执行缓慢的task,推测执行,默认是关闭的,对于ETL的数据要把推测执行关闭。–了解

2.12 【重点】粗粒度资源申请和细粒度资源申请

2.12.1 粗粒度资源申请-Spark

  Application执行之前首先将所有的资源申请完毕,Application每个job执行时就不需要申请资源,直接使用申请好的资源,job执行快,application也执行快,当最后一个job执行完成之后,这批资源才会被释放。
  优点:job执行快,application执行快。
  缺点:集群的资源不能充分利用

2.12.2 细粒度资源申请-mapreduce

  Application执行后,由每个job自己申请资源,自己释放资源,每个job执行完成之后,资源立即被释放,由于每个job自己申请资源,释放资源,application执行速度相对慢。
  优点:集群资源可以充分利用
  缺点:任务执行相对慢

2.13 Spark-submit 提交参数

 1) --master:指定Spark资源调度模式,yarn,standalone,local
 2) --deploy-mode:指定Spark任务提交的运行的模式,默认是client
 3) --conf:指定Spark运行配置
 4) --name:指定Spark application 运行的名称
 5) --driver-class-path:指定Driver端使用的jar包
 6) --jars:指定Driver端和Executor端使用的jar包,包含–driver-class-path的jar包。
 7) --files:指定Executor端使用的配置文件
例如:./spark-submit --master yarn-cluster --files hive-site.xml --class …jar
 8) --supervise:Driver失败重新启动Driver。
 9) --driver-cores:指定启动一个Driver使用的core。
 10) --driver-memory:指定启动一个Driver使用的内存。
 11) --executor-cores :指定启动一个Executor使用的core。
 12) --executor-memory:指定启动一个Executor使用的内存。
 13) --total-executor-cores:standalone模式下,提交一个Spark application 总共使用的core。
 14) --num-executors:yarn模式下,提交一个Spark application 启动Executor,默认2。

2.14 Spark 源码

2.14.1 源码

 1) Spark Master启动。
 2) Spark Worker启动。
 3) Spark-Submit任务提交。
 4) Spark任务提交,Driver启动。
 5) Spark向Master申请资源进行资源调度。【重点】
 6) Spark action算子进行任务调度。

2.14.2 【重点】资源调度结论

 1) Executor是在集群中分散启动。利于数据处理的本地化。
 2) 如果不指定任何参数,Spark集群为当前的application 每个Worker启动一个Executor,当前Executor使用当前节点的所有的core和1G内存。
 3) 如果想要在一台worker节点上启动多个Executor,需要指定–executor-cores。
 4) 指定–total-executor-core来限制一个application使用的总core。
 5) Executor启动不仅和core有关还和内存有关。

2.15 PV&UV 案例

 pv:page view,页面浏览量。
 uv:unquie vistor 独立访客数。

2.16 【重点】Spark 二次排序

  借助对象,将数据封装对象,对象实现comparable接口,实现compareTo方法,然后使用sortByKey来进行排序。

2.17 【重点】Spark 分组取 topN

 1) 使用原生集合排序。
 2) 使用定长数组的方式。

2.18 【重点】Spark 广播变量

  当Executor端使用到Driver端的变量副本,如果不使用广播变量,每个Executor中有多少task就有多少Driver端的变量副本,如何使用广播变量,每个Executor中只有一份Driver端的变量副本。

  注意问题:

  1) 不能将RDD广播出去,可以将RDD的结果广播出去。
  2) 广播变量只能在Driver端定义初始化。
  3) 在Executor中不能改变广播变量的值。

2.19 【重点】Spark 累加器

 1) 相当于集群中统筹变量。
 2) 累加器只能在Driver端定义,在Executor端使用。
 3) 自定义累加器,自定义类继承AccumulatorV2[IN,OUT]
  iszero()–判断每个partition中的累加器的值是否是初始值,判断结果必须和reset中保持一致。
  copy() --将累加器复制到每个分区中。
  reset() --给每个partition中的累加器做初始值。
  add() --发生在Executor端,每个RDD的partition累加数据。
  merge() --发生在Driver,将每个分区中的累加器结果累加。
  vale() --返回最后的累加器的值。

2.20 SparkWebUI

 1) Spark-shell 提交Spark任务。
 2) Spark WEBUI
  a. 如何查看任务有数据倾斜
  b. 如何查看task的gc情况
  c. 如何查看task的执行时间。
  d. 查看executor的执行日志。
  e. 查看持久化的数据。

2.21 Spark 历史日志服务器搭建

  历史日志服务器的端口:http://node:18080

2.22 【重点】端口

50070  -- HDFS
8088    --Yarn
8020    --HDFS RPC
9000    --非高可用RPC
2181    --ZOOKEEPER
6379    --redis
9092    --kafka
8080    --spark Master
18080   --spark历史日志服务器
8081    --Spark worker | flink | azkaban
4040    --Spark application
60010   --HBase 1.0版本之前 webui
16010   --HBase 1.0+ 之后webui
19888   --mr历史日志服务器
9083    --hive metastore
7077    --Spark 提交任务
7070    --kylin webui
7180    --cloudera manager WEBUI
21000   --impala 端口
8888    --hue的通信端口

2.23 Master HA -针对 Standalone

  只是针对Standalone集群,yarn集群中RS有高可用。使用zookeeper来协调搭建。

  搭建步骤:

 1) 在每台节点的 spark-env.sh 中配置:

export SPARK_DAEMON_JAVA_OPTS =
"-Dspark.deploy.recoveryMode=xxx        指定HA恢复模式
-Dspark.deploy.zookeeper.url=xxx   指定zooekeeper的集群节点
-Dspark.deploy.zookeeper.dir=xxx" zookeeper存储数据的路径

 2) 选择一台standby-Master配置spark-env.sh

SPARK_MASTER_HOST=XXX

 3) 在Master-Alive节点启动Spark集群 : start-all.sh
 4) 在Master-standby节点启动Master : start-master.sh
 5) 切换验证Master-HA

 注意:

 1) Spark MasterHA切换影响提交Driver。对用户透明。
 2) Spark MasterHA切换不影响集群中运行的任务。

2.24 【重点】SparkShuffle-SortShuffleManager

2.24.1 普通机制:

  产生磁盘小文件个数 : 2*M(map task 个数)

  过程:

  a. map task处理完的数据首先写往一个5M的内存数据结构
  b. SortShuffle中有估算内存机制,如果下次写入的数据大于了内存结构中剩余的内存,会申请内存,申请内存 = 2*估算 - 当前
  c. 如何申请到内存就继续写往内存,如果申请不到,就溢写磁盘,溢写磁盘有排序。
  d. 多次溢写磁盘的文件,最终会合并成一个文件和一个索引文件,合并过程中有排序。

2.24.2 bypass 机制:

 产生磁盘小文件个数:2*M

 过程:

  与sortShuffleManager中的普通机制一样,只是溢写磁盘没有排序,线性的将数据写往一个文件,没有排序。

 对于只是将数据打散的场景可以使用bypass机制,触发条件:
  a. spark.shuffle.sort.bypassMergeThreshold=200 ,需要reduce task个数小于等于spark.shuffle.sort.bypassMergeThreshold值。
  b. map端不能有预聚合操作。

2.25 【重点】shuffle文件寻址

2.25.1 对象:

 MapOutputTracker 管理磁盘小文件,主从结构对象
  MapOutputTrackerMaster – Driver
  MapOutputTrackerWorker – Executor
 BlockManager 块管理,主从结构对象
  BlockManagerMaster – Driver
   DiskStore:管理磁盘数据
   MemoryStore:管理内存数据
   BlockTransferService:负责拉取数据
  BlockManagerSlaves – Executor
   DiskStore:管理磁盘数据
   MemoryStore:管理内存数据
   BlockTransferService:负责拉取数据

2.25.2 【重点】Shuffle 文件寻址过程

 1) Map task处理完成数据之后,将数据结果和位置信息封装到一个MapStatus对象中,通过MapOutputTrackerWoker汇报给Driver中的MapOutputTrackerMaster,Driver掌握了一批数据文件的信息。
 2) Reduce task处理数据之前,首先向Driver获取数据位置信息,由blockManager连接数据所在的节点。
 3) 连接上之后,由blockTransferService启动5个线程拉取数据,默认一次拉取48M。拉取的数据存放在Executor的shuffle内存中。

2.25.3 reduce oom 问题:

 1) 减少拉取的数据量
 2) 增加Executor内存
 3) 增加Executor总体内存

2.26 【重点】Spark内存管理-统一内存管理

 300M:预留内存
 (总-300M):
  0.4:task运行内存 (Spark1.6 0.25)
  0.6:–spark.memory.fraction (Spark1.6 0.75)
   0.5:shuffle聚合内存
   0.5:RDD缓存+广播变量 --spark.memory.storageFraction

2.27 Spark Shuffle参数

 以下参数可以设置在SparkConf/spark-defaults.xml中/设置–conf ,建议spark-submit 配置。
 1) spark.reducer.maxSizeInFlight 48M
  blockTransferService一次拉去数据量。可以增加拉去缓存量,减少节点之间拉取数据次数。
 2) spark.shuffle.io.maxRetries 3
  拉取数据失败时,重试task次数。
 3) spark.shuffle.io.retryWait 5s
  两次task重试的间隔。
 4) spark.shuffle.sort.bypassMergeThreshold 200
  reduce task个数小于此值,开启bypass机制。

3 Kylin 配置

 1. kylin.metadata.url
  指定元数据库路径,默认值为 kylin_metadata@hbase

 2. kylin.metadata.sync-retries
  指定元数据同步重试次数,默认值为 3

 3. kylin.env.hdfs-working-dir
  指定 Kylin 服务所用的 HDFS 路径,默认值为 /kylin,请确保启动 Kylin 实例的用户有读写该目录的权限

 4. kylin.env
  指定 Kylin 部署的用途,参数值可选DEV,QA,PROD,默认值为DEV,在 DEV 模式下一些开发者功能将被启用。开发环境为DEV,测试环境为QA,生产环境为PROD

 5. kylin.env.zookeeper-base-path
  指定Kylin服务所用的ZooKeeper路径,默认值为/kylin。kylin构建cube的一些字典和任务元数据会存放在Hbase依赖的zookeeper中

 6. kylin.server.mode
  指定Kylin实例的运行模式,参数值可选 all,job,query,默认值为all,job 模式代表该服务仅用于任务调度,不用于查询;query 模式代表该服务仅用于查询,不用于构建任务的调度;all 模式代表该服务同时用于任务调度和 SQL 查询。

 7. kylin.web.query-timeout
  设置webui提交任务超时时间,默认为300秒

 8. kylin.source.hive.client
  指定 Hive 命令行类型,参数值可选 cli 或 beeline,默认值为 cli

 9. kylin.storage.hbase.table-name-prefix
  指定向Hbase中存储结果数据表前缀,默认值为 KYLIN_

 10. kylin.storage.hbase.namespace
  指定 HBase 存储默认的 namespace,默认值为 default

4 Kylin 使用

4.1 名词解释

  cuboid:维度的任意组合。
  cube:所有的维度组合,包含所有的cuboid。

4.2 创建 cube

4.2.1 新建项目

 1. 由顶部菜单栏进入 Model 页面,然后点击 Manage Projects。

 2. 点击“+Project”按钮添加一个新的项目。
 3. 填写下列表单并点击submit按钮提交请求。
 4. 成功后,底部会显示通知。

4.2.2 同步 Hive 表

 1. 在顶部菜单栏点击 Model,然后点击左边的Data Source标签,它会列出所有加载进 Kylin 的表,点击 Load Table 按钮。
 2. 输入表名并点击 Sync 按钮提交请求。
 3. 【可选】如果你想要浏览 hive 数据库来选择表,点击 Load Table From Tree 按钮。
 4. 【可选】展开数据库节点,点击选择要加载的表,然后点击 Sync 按钮。
 5. 成功的消息将会弹出,在左边的 Tables 部分,新加载的表已经被添加进来。点击表将会展开列。
 6. 在后台,Kylin 将会执行 MapReduce 任务计算新同步表的基数(cardinality),任务完成后,刷新页面并点击表名,基数值将会显示在表信息中。

4.2.3 新建 Data Model

  创建 cube 前,需定义一个数据模型。数据模型定义了一个星型(star schema)或雪花(snowflake schema)模型。一个模型可以被多个 cube 使用。
 1.点击顶部的 Model ,然后点击 Models 标签。点击 +New 按钮,在下拉框中选择 New Model。

 2.输入 model 的名字和可选的描述。
 3.在 Fact Table 中,为模型选择事实表。
 4.【可选】点击 Add Lookup Table 按钮添加一个 lookup 表。选择表名和关联类型(内连接或左连接)
 5.点击 New Join Condition 按钮,左边选择事实表的外键,右边选择 lookup 表的主键。如果有多于一个 join 列重复执行。
 6.点击 “OK”,重复4,5步来添加更多的 lookup 表。完成后,点击 “Next”。

 7.Dimensions 页面允许选择在子 cube 中用作维度的列,然后点击Columns列,在下拉框中选择需要的列。
 8.点击 “Next” 到达 “Measures” 页面,选择作为 measure 的列,其只能从事实表中选择。
 9.点击 “Next” 到达 “Settings” 页面,如果事实表中的数据每日增长,选择 Partition Date Column 中相应的 日期列以及日期格式,否则就将其留白。

 10.【可选】选择是否需要 “time of the day” 列,默认情况下为 No。如果选择 Yes, 选择 Partition Time Column 中相应的 time 列以及 time 格式
 11.【可选】如果在从 hive 抽取数据时候想做一些筛选,可以在 Filter 中输入筛选条件。

 12.点击 Save 然后选择 Yes 来保存 data model。创建完成,data mod就会列在左边 Models 列表中。

4.2.4 新建 Cube

  创建完 data model,可以开始创建 cube。点击顶部 Model,然后点击Models 标签。点击 +New 按钮,在下拉框中选择 New Cube。

步骤1. Cube 信息

  选择 data model,输入 cube 名字;点击 Next 进行下一步。cube 名字可以使用字母,数字和下划线(空格不允许)。Notification Email List 是运用来通知job执行成功或失败情况的邮箱列表。Notification Events 是触发事件的状态。
步骤2. 维度

  点击 Add Dimension,在弹窗中显示的事实表和 lookup 表里勾选输入需要的列。Lookup 表的列有2个选项:“Normal” 和 “Derived”(默认)。“Normal” 添加一个普通独立的维度列,“Derived” 添加一个 derived 维度,derived 维度不会计算入 cube,将由事实表的外键推算出。
  选择所有维度后点击 “Next”。

步骤3. 度量

 1.点击 +Measure 按钮添加一个新的度量。
 2.根据它的表达式共有8种不同类型的度量:SUM、MAX、MIN、COUNT、COUNT_DISTINCT TOP_N, EXTENDED_COLUMN 和 PERCENTILE。请合理选择 COUNT_DISTINCT 和 TOP_N 返回类型,它与 cube 的大小相关。

  • SUM
  • MIN
  • MAX
  • COUNT
  • DISTINCT_COUNT

 这个度量有两个实现:

 1) 近似实现 HyperLogLog,选择可接受的错误率,低错误率需要更多存储

 2) 精确实现 bitmap
  注意:distinct 是一种非常重的数据类型,和其他度量相比构建和查询会更慢。

  • TOP_N

  TopN 度量在每个维度结合时预计算,它比未预计算的在查询时间上性能更好;需要两个参数:一是被用来作为 Top 记录的度量列,Kylin 将计算它的 SUM 值并做倒序排列;二是 literal ID,代表最 Top 的记录,例如 seller_id;

  合理的选择返回类型,将决定多少 top 记录被监察:top 10, top 100, top 500, top 1000, top 5000 or top 10000。

  注意:如果您想要使用 TOP_N,您需要为 “ORDER | SUM by Column” 添加一个 SUM 度量。例如,如果您创建了一个根据价格的总和选出 top100 的卖家的度量,那么也应该创建一个 SUM(price) 度量。

  • EXTENDED_COLUMN

  Extended_Column 作为度量比作为维度更节省空间。一列和另一列可以生成新的列。

  • PERCENTILE

  Percentile 代表了百分比。100为最合适的值。
步骤4. 更新设置

  这一步骤是为增量构建 cube 而设计的。

  • Auto Merge Thresholds: 自动合并小的 segments 到中等甚至更大的 segment。如果不想自动合并,删除默认2个选项。

  • Volatile Range: 默认为0,会自动合并所有可能的 cube segments,或者用 ‘Auto Merge’ 将不会合并最新的 [Volatile Range] 天的 cube segments。

  • Retention Threshold: 只会保存 cube 过去几天的 segment,旧的 segment 将会自动从头部删除;0表示不启用这个功能。

  • Partition Start Date: cube 的开始日期.
    步骤5. 高级设置

  • Aggregation Groups: Cube 中的维度可以划分到多个聚合组中。默认 kylin 会把所有维度放在一个聚合组,当维度较多时,产生的组合数可能是巨大的,会造成 Cube 爆炸;如果你很好的了解你的查询模式,那么你可以创建多个聚合组。在每个聚合组内,使用 “Mandatory Dimensions”, “Hierarchy Dimensions” 和 “Joint Dimensions” 来进一步优化维度组合。

  • Mandatory Dimensions: 必要维度,用于总是出现的维度。例如,如果你的查询中总是会带有 “ORDER_DATE” 做为 group by 或 过滤条件, 那么它可以被声明为必要维度。这样一来,所有不含此维度的 cuboid 就可以被跳过计算。

  • Hierarchy Dimensions: 层级维度,例如 “国家” -> “省” -> “市” 是一个层级;不符合此层级关系的 cuboid 可以被跳过计算,例如 [“省”], [“市”]. 定义层级维度时,将父级别维度放在子维度的左边。

  • Joint Dimensions:联合维度,有些维度往往一起出现,或者它们的基数非常接近(有1:1映射关系)。例如 “user_id” 和 “email”。把多个维度定义为组合关系后,所有不符合此关系的 cuboids 会被跳过计算。

  • Rowkeys: 是由维度编码值组成。”Dictionary” (字典)是默认的编码方式; 字典只能处理中低基数(少于一千万)的维度;如果维度基数很高(如大于1千万), 选择 “false” 然后为维度输入合适的长度,通常是那列的最大长度值; 如果超过最大值,会被截断。请注意,如果没有字典编码,cube 的大小可能会非常大。
    你可以拖拽维度列去调整其在 rowkey 中位置; 位于rowkey前面的列,将可以用来大幅缩小查询的范围。通常建议将 mandantory强制维度放在开头,然后是在过滤 ( where 条件)中起到很大作用的维度;如果多个列都会被用于过滤,将高基数的维度(如 user_id)放在低基数的维度(如 age)的前面, 这样防止数据在Hbase中有倾斜。

  • Mandatory Cuboids: 维度组合白名单。确保你想要构建的 cuboid 能被构建。

  • Cube Engine: cube 构建引擎。有两种:MapReduce 和 Spark。如果你的 cube 只有简单度量(SUM, MIN, MAX),建议使用 Spark。如果 cube 中有复杂类型度量(COUNT DISTINCT, TOP_N),建议使用 MapReduce。

  • Advanced Dictionaries: “Global Dictionary” 是用于精确计算 COUNT DISTINCT 的字典, 它会将一个非 integer的值转成 integer,以便于 bitmap 进行去重。如果你要计算 COUNT DISTINCT 的列本身已经是 integer 类型,那么不需要定义Global Dictionary。Global Dictionary 会被所有 segment 共享,因此支持在跨 segments 之间做上卷去重操作。请注意,Global Dictionary 随着数据的加载,可能会不断变大。
    “Segment Dictionary” 是另一个用于精确计算 COUNT DISTINCT 的字典,与 Global Dictionary 不同的是,它是基于一个 segment 的值构建的,因此不支持跨 segments 的汇总计算。如果你的 cube 不是分区的或者能保证你的所有 SQL 按照 partition_column 进行 group by, 那么你应该使用 “Segment Dictionary” 而不是 “Global Dictionary”,这样可以避免单个字典过大的问题。
    请注意:”Global Dictionary” 和 “Segment Dictionary” 都是单向编码的字典,仅用于 COUNT DISTINCT 计算(将非 integer 类型转成 integer 用于 bitmap计算),他们不支持解码,因此不能为普通维度编码。

  • Advanced Snapshot Table: 为全局 lookup 表而设计,提供不同的存储类型。

  • Advanced ColumnFamily: 如果有超过一个的COUNT DISTINCT 或 TopN 度量, 你可以将它们放在更多列簇中,以优化与HBase 的I/O。

步骤6. 重写配置

  Kylin 允许在 Cube 级别覆盖部分 kylin.properties 中的配置,你可以在这里定义覆盖的属性。如果你没有要配置的,点击 Next 按钮。
步骤7. 概览 & 保存

  你可以概览你的 cube 并返回之前的步骤进行修改。点击 Save 按钮完成 cube 创建。
  恭喜,到此为止,cube 创建完成。

4.3 构建 cube

  我们这里以sample.sh导入的“learn-kylin”工程为例构建cube。

4.3.1 构建 cube

 1. 在Models页面中,点击 cube 栏右侧的Action下拉按钮并选择Build操作。
 2. 选择后会出现一个弹出窗口,点击 Start Date 或者 End Date 输入框选择这个增量 cube 构建的起始日期。
  可以在hive中查询下kylin_sales表中的最大最小时间:

1.   select min(part_dt) from kylin_sales; #结果为:2012-01-01
2.  select max(part_dt) from kylin_sales; #结果为:2014-01-01

  这里设置Start Date为2012-01-01,End Date为2013-01-01,并提交任务,提交之后,在monitor中可以看到可以看到job执行。
 3. 新建的 job 是 “pending” 状态;一会儿,它就会开始运行并且你可以通过刷新 web 页面或者点击刷新按钮来查看进度。
 4. 等待 job 完成。期间如要放弃这个 job,点击 Actions->Discard 按钮。
 5. 等到job 100%完成,cube 的状态就会变为 “Ready”,意味着它已经准备好进行 SQL 查询。在Model页,找到 cube,然后点击 cube 名展开消息,在 “Storage” 标签下,列出 cube segments。每一个 segment 都有 start/end 时间;Hbase 表的信息也会列出。如果有更多的源数据,重复以上的步骤将它们构建进 cube。每次构建会生成一个Segment。
 6. 查询SQL,测试速度

  分别在Hive和Kylin中执行如下SQL语句:

1.   select part_dt, sum(price) as total_sold, count(distinct seller_id) as sellers
2.  from kylin_sales
3.  group by part_dt
4.  order by part_dt

  Hive中执行时,需要加上对应的时间,SQL语句如下:

select part_dt, sum(price) as total_sold, count(distinct seller_id) as sellers
from kylin_sales
where part_dt >=to_date('2012-01-01 00:00:00') and part_dt < to_date('2013-01-01 00:00: 00')
group by part_dt
order by part_dt;

  结果和时间如下:
  在Kylin中执行如下语句,结果如下:

4.3.2 cube 执行流程

  • 构建一个中间平表(Hive Table):将Model中的fact表和look up表构建成一个大的Flat Hive Table。
  • 重新分配Flat Hive Tables。
  • 从事实表中抽取维度的Distinct值。
  • 对所有维度表进行压缩编码,生成维度字典。
  • 计算和统计所有的维度组合,并保存,其中,每一种维度组合,称为一个Cuboid。
  • 创建HTable。
  • 构建最基础的Cuboid数据。
  • 利用算法构建N维到0维的Cuboid数据。
  • 构建Cube。
  • 将Cuboid数据转换成HFile。
  • 将HFile直接加载到HBase Table中。
  • 更新Cube信息。
  • 清理Hive。

4.3.3 合并 Segment

  这里重新build cube,选择时间为2013-01-01 00:00:00 至2014-01-01 00:00:00。如下图示:
  会生成新的job,可以在monitor页面查看,等待新的任务执行完成之后,可以点击Cube查看当前cube有两个Segment,这里每个Segment对应Hbase中的一张表。
  再次执行查询SQL时,将会扫描这两个Segment,也就是会扫描Hbase中两张表,这样如果Segment很多时,需要将多个Segment进行合并,会使多个Hbase表进行合并到一张Hbase表中,这样再次查询时,只需要扫描一张Hbase表即可。

  合并Segment:

 1. 点击Cube对应的Action -> Merge:
 2. 选择需要合并的Segment片段,点击Submit
 3. submit之后生成新的job,执行完成之后,查看新的Segment。
  Segment合并之后,会将原来的合并的Segment段全部删除。

4.3.4 job 监控

  在Monitor页面,点击job详情按钮查看显示于右侧的详细信息。
  job 详细信息为跟踪一个 job 提供了它的每一步记录。你可以将光标停放在一个步骤状态图标上查看基本状态和信息。

5 Kylin 构建 Cube 算法

  Kylin中Cube的思想是用空间换时间, 通过预先的计算,把索引及结果存储起来,以换取查询时候的高性能。在Kylin v1.5以前,Kylin中的Cube只有一种算法:layered cubing,也称逐层算法,它是逐层由底向上,把所有组合算完的过程。Kylin v1.5以后,推出Fast Cubing,也称快速数据立方算法,是一个新的Cube算法。

5.1 layered cubing

5.1.1 基于 MR

  这个算法的对cube的计算就像它的名字一样是按layer进行的。
  以一个n维cube(即事实表有n个维度)为例:
  player-1:以source data(源数据)为基础计算出一个n维的cuboid;
  player-2:以上一层的n维cuboid维基础,计算出n个n-1维的cuboid;
  … …
  player-k+1:以上一层的n-k+1维cuboid为基础,计算出n!/[(n-k)!k!]=个n-k维的cuboid;
  … …
  player-n+1:以上一层的1维cuboid为基础,计算出1个0维的cuboid。
  如下图:
  用官网上一个4维cube的例子来说明一下具体过程。

  在player-1,根据源数据得到1个4-D的cuboid;然后cong中任意取出三个维度得到4个3-D cuboids;接着从3-D cuboids出发,任意取出其中两个维度得到6个2-D cuboids;再以2-D cuboids为基础,任意取出其中一个维度得到4个1-D cuboids;最后根据1-D cuboids 计算出一个0-D cuboid。

  此算法的 MapperReducer 都比较简单。Mapper以上一层Cuboid的结果(Key-Value对)作为输入。由于Key是由各维度值拼接在一起,从其中找出要聚合的维度,去掉它的值成新的Key,并对Value进行操作,然后把新Key和Value输出,进而Hadoop MapReduce对所有新Key进行排序、洗牌(shuffle)、再送到Reducer处;Reducer的输入会是一组有相同Key的Value集合,对这些Value做聚合计算,再结合Key输出就完成了一轮计算。

  优点:这个算法的原理很清晰,主要就是利用了MR,sorting、grouping、shuffing全部由MR完成,开发人员只需要关注cubing的逻辑,由于hadoop的成熟,该算法的运行很稳定。

  缺点:cube的维度越高,需要的MR任务越多(n-D cube 至少需要n 个MR)太多的shuffing操作(mapper端不做聚合,所有在下一层中具有相同维度的值有combiner 和reducer聚合),对hdfs读写比较多(每一层MR的结果会写到hdfs然后下一层MR从hdfs 读取数据)。

5.1.2 基于 Spark

  “by-layer” Cubing把一个大任务划分为许多步骤,每一步骤的计算依赖于上一个步骤的输出结果,所以当某一个步骤的计算出现问题时,可以再次读取上一步骤的结果重新计算,而不用从头开始。使得“by-layer” Cubing算法稳定可靠,当换到spark上时,便保留了这个算法。因此在spark上这个算法也被称为“By layer Spark Cubing”。
  如上图所示,与在MR上相比,每一层的计算结果不再输出到hdfs,而是放在RDD中。由于RDD存储在内存中,从而有效避免了MR上过多的读写操作。

  性能对比:

5.2 Fast cubing

  快速 Cube 算法(Fast Cubing)是麒麟团队对新算法的一个统称,它还被称作“逐段” (By Segment) 或“逐块” (By Split) 算法。

  该算法的主要思想是,对Mapper所分配的数据块,将它计算成一个完整的小Cube 段(包含所有Cuboid);每个Mapper将计算完的Cube段输出给Reducer做合并,生成大Cube,也就是最终结果;下图解释了此流程。新算法的核心思想是清晰简单的,就是最大化利用Mapper端的CPU和内存,对分配的数据块,将需要的组合全都做计算后再输出给Reducer;由Reducer再做一次合并(merge),从而计算出完整数据的所有组合。如此,经过一轮Map-Reduce就完成了以前需要N轮的Cube计算。
  在Mapper内部也可以有一些优化,下图是一个典型的四维Cube的生成树;第一步会计算Base Cuboid(所有维度都有的组合),再基于它计算减少一个维度的组合。基于parent节点计算child节点,可以重用之前的计算结果;当计算child节点时,需要parent节点的值尽可能留在内存中; 如果child节点还有child,那么递归向下,所以它是一个深度优先遍历。当有一个节点没有child,或者它的所有child都已经计算完,这时候它就可以被输出,占用的内存就可以释放。
  如果内存够的话,可以多线程并行向下聚合。如此可以最大限度地把计算发生在 Mapper 这一端,一方面减少 shuffle 的数据量,另一方面减少 Reducer 端的计算量。

  优点:总的IO量比以前大大减少。此算法可以脱离Map-Reduce而对数据做Cube计算,故可以很容易地在其它场景或框架下执行,例如Streaming 和Spark。

  缺点:代码比以前复杂了很多,由于要做多层的聚合,并且引入多线程机制,同时还要估算JVM可用内存,当内存不足时需要将数据暂存到磁盘,所有这些都增加复杂度。对 Hadoop 资源要求较高,用户应尽可能在 Mapper 上多分配内存;如果内存很小,该算法需要频繁借助磁盘,性能优势就会较弱。在极端情况下(如数据量很大同时维度很多),任务可能会由于超时等原因失败。

5.3 算法选择

  用户无需担心使用什么算法构建cube,Kylin会自动选择合适的算法。Kylin在计算Cube之前对数据进行采样,在“fact distinct”步,利用HyperLogLog模拟去重,估算每种组合有多少不同的key,从而计算出每个Mapper输出的数据大小,以及所有Mapper之间数据的重合度,据此来决定采用哪种算法更优。

  • 如果每个Mapper之间的key交叉重合度较低,fast cubing更适合;因为Mapper端将这块数据最终要计算的结果都达到了,Reducer只需少量的聚合。另一个极端是,每个Mapper计算出的key跟其它 Mapper算出的key深度重合,这意味着在reducer端仍需将各个Mapper的数据抓取来再次聚合计算;如果key的数量巨大,该过程IO开销依然显著。对于这种情况,Layered-Cubing更适合。
  • 在对上百个Cube任务的时间做统计分析后,Kylin选择了7做为默认的算法选择阀值(参数kylin.cube.algorithm.auto.threshold):如果各个Mapper的小Cube的行数之和,大于reduce后的Cube行数的8倍(各个Mapper的小Cube的行数之和 / reduce后的Cube行数 > 7),采用Layered Cubing, 反之采用Fast Cubing(本质就是各个Mapper之间的key重复度越小,就用Fast Cubing,重复度越大,就用Layered Cubing)

6 Kylin 构建 Cube 实战

6.1 向 Hive 中导入数据

  根据“CreateData.java”文件生成对应的事实表,然后执行SQL语句将对应的维度表和事实表导入到Hive中。

  维度表region_tbl,数据如下:

1.   G01|北京
2.  G02|江苏
3.  G03|浙江
4.  G04|上海
5.  G05|广州
6.

  维度表city_tbl,数据如下:

1.   G01|G0101|朝阳
2.  G01|G0102|海淀
3.  G02|G0201|南京
4.  G02|G0202|宿迁
5.  G03|G0301|杭州
6.  G03|G0302|嘉兴
7.  G04|G0401|徐汇
8.  G04|G0402|虹口
9.  G05|G0501|广州
10. G05|G0502|珠海
11.

  事实表web_access_fact_tbl数据内容举例如下:

1.   2019-08-01|L3H1WG60WD9IELX6YL|G04|G0401|2187|Mac OS|4
2.  2019-08-29|9MOYSWCVLBJ7E3GB6J|G05|G0501|1729|Android 5.0|4
3.  2019-08-18|EGCC46W3OO5CW2Q9W8|G05|G0502|3968|Android 5.0|2
4.  2019-08-22|8MFUQGRE028ZR304FT|G03|G0302|7327|Android 5.0|3
5.  2019-08-26|YB0GXNMF59CBC49CZ3|G05|G0502|8874|Android 5.0|9
6.  2019-08-22|X9MPZ2OX2U10S3DO17|G01|G0101|4771|Mac OS|9
7.  2019-08-07|BYXUH2FZQ36ZNM6YJS|G04|G0402|6601|Mac OS|9
8.  ... ...
9.

  在Hive中创建对应的表,将数据导入到Hive中,执行如下命令:

1.   create table web_access_fact_tbl
2.  (
3.  day date,
4.  cookieid string,
5.  regionid string,
6.  cityid string,
7.  siteid string,
8.  os string,
9.  pv bigint
10. )
11. row format delimited fields terminated by '|' stored as textfile;
12.
13. load data local inpath '/software/test/fact_data.txt' into table web_access_fact_tbl;
14.
15. create table region_tbl
16. (
17. regionid string,
18. regionname string
19. )
20. row format delimited fields terminated by '|' stored as textfile;
21.
22. load data local inpath '/software/test/region.txt' into table region_tbl;
23.
24. create table city_tbl
25. (
26. regionid string,
27. cityid string,
28. cityname string
29. )
30. row format delimited fields terminated by '|' stored as textfile;
31.
32. load data local inpath '/software/test/city.txt' into table city_tbl;
33.

6.2 在 Kylin 中创建项目构建 cub

 1. 在Kylin中创建project:
  点击页面左上方“+”号,add project,创建项目:
 2. 给项目DataSource 导入数据:
 3. 创建新的model

  在kylin页面左侧点击“new”->“New Model”,输入model名称,点击“Next”:
  在DataModel中,选择事实表,添加维度表(lookup table),同时指定关联关系,点击“Next”:
  在“Dimensions”页面中,选择每个表的维度信息,注意:在事实表中要预留出某些列是度量信息列,某些列不能即是维度列又是度量列,如下图,选择完成后,点击“Next”:
  在“Measures”页面中,选择度量信息列,这些列必须出现在事实表中,并且不能是维度列,如下图:
  在“settings”页面中,选择分区信息和过滤条件。这里的分区信息指的是按照时间分区,如果没有可以跳过页面设置,点击“Save”保存模型。
 4. 模型创建完成之后,创建Cube

  点击Kylin页面左上角“New”->“New Cube”,在页面中选择刚刚创建的Model,同时输入Cube名称,点击“Next”:
  在“Dimensions”页面中,设置维度。点击“Add Dimensions”,注意,在这里如果维度表中的某些列是可以由事实表关联字段推测出来,也就是关联字段,可以选择成“Derived”列,这样,这个列不参与构建维度,可以减少维度。
  在“Measures”页面中,默认有Count聚合,可以添加其他聚合函数,点击“Next”:
  在“Refresh Setting”页面中可以设置segment合并规则,点击“Next”:
  在“Advanced Setting”高级设置中,可以设置必要维度、层级维度、联合维度、执行引擎及rowkey顺序和列族等信息,点击“Next”,在“Configuration Overwrites”中设置配置项,点击“Next”->”Save”,完成Cube构建。

 5. 构建cube

  点击对应cube的“action”->“build”。

6.3 结果 SQL 查询

  进入Kylin页面中的“Insight”,查询如下业务,统计每天每个地区每个城市总的pv量和访问系统类别总数

1.   select a."DAY",b.regionname,c.cityname,sum(a.pv) as totol_pv,count(distinct a.os) as os_count
2.  from web_access_fact_tbl a
3.  join region_tbl b on a.regionid=b.regionid
4.  join city_tbl c on a.cityid=c.cityid
5.  group by a."DAY",b.regionname,c.cityname
6.  order by a."DAY",b.regionname

  查询结果如下:
  注意:kylin建cube所用的字段最好不要采用kylin 关键字,例如:year, month, day, hour等。否则写SQL时,不太友好。关键词必须全部大写,且被双引号(必须是双引号,单引是自定义常量)包住。
  注意构建kylin中model时,事实表和维度表之间的关联是使用的left join 还是 inner join,如果使用left join 在使用kylin时,也应使用left join 进行关联。

7 JDBC 访问 Kylin

  使用Java api连接kylin url格式为:

  jdbc:kylin://:/<kylin_project_name>,必须指定“kylin_project_name”并且用户需要确保它在Kylin server上存在。默认的port为7070

7.1 使用 Statement 查询

1.   public class KylinDemo {2.
3.    public static void main(String[] args) throws Exception {4.      Driver driver = (Driver)   Class.forName("org.apache.kylin.jdbc.Driver").newInstance();
5.      Properties info = new Properties();
6.      info.put("user", "ADMIN");
7.      info.put("password", "KYLIN");
8.      Connection conn = driver.connect("jdbc:kylin://mynode3:7070/learn_kylin", info);
9.      Statement state = conn.createStatement();
10.     ResultSet rs = state.executeQuery(
11.       "select part_dt,sum(price) as total_selled,count(distinct seller_id) as sellers       from kylin_sales group by part_dt order by part_dt limit 5 ");
12.     while (rs.next()) {13.       System.out.println(rs.getString(1) + "\t" + rs.getString(2) + "\t" + rs.getString(3));
14.     }
15.     conn.close();
16.   }
17. }
18.

7.2 使用 PreparedStatement 查询

19.  public class KylinPreparedDemo {20.
21.   public static void main(String[] args) throws Exception {22.     Driver driver = (Driver)    Class.forName("org.apache.kylin.jdbc.Driver").newInstance();
23.     Properties info = new Properties();
24.     info.put("user", "ADMIN");
25.     info.put("password", "KYLIN");
26.     Connection conn = driver.connect("jdbc:kylin://mynode3:7070/learn_kylin", info);
27.     PreparedStatement pstmt = conn
28.       .prepareStatement("select * from kylin_category_groupings where leaf_categ_id =?");
29.     pstmt.setLong(1, 10058);
30.     ResultSet rs = pstmt.executeQuery();
31.     while (rs.next()) {32.       System.out.println(rs.getString(1) + "\t" + rs.getString(2) + "\t" +       rs.getString(3));
33.     }
34.   }
35. }

2021-03-19~20 大数据课程笔记 day58day59相关推荐

  1. 2021-02-26~27~28 大数据课程笔记 day37day38day39

    @R星校长 音乐数据中心平台 1.1 数据库与ER建模 1.1.1 数据库(DataBase)   数据库是按照数据结构来组织.存储和管理数据的仓库,是一个长期存储在计算机内的.有组织的.可共享的.统 ...

  2. 2021-03-08~09~10~11~12 大数据课程笔记 day47day48day49day50day51

    @R星校长 大数据技术之Flink 第一章 初识Flink   在当前数据量激增的时代,各种业务场景都有大量的业务数据产生,对于这些不断产生的数据应该如何进行有效的处理,成为当下大多数公司所面临的问题 ...

  3. 2021-03-17~18 大数据课程笔记 day56day57

    @R星校长 1 基础概念和Kylin简介 1.1 OLTP与OLAP   数据处理大致可以分成两大类:联机事务处理OLTP(on-line transaction processing).联机分析处理 ...

  4. 大数据课程笔记3:Kolmogorov Complexity

    这是大数据算法的课程笔记,这节讲的是Kolmogorov Complexity的定义以及三个性质. 定义 先有个图灵机的定义,然后有了一个Universal Turing Machine (UTM)的 ...

  5. 数据结构大作业_大数据课程笔记

    前言: 到目前为止有了一个月的时间,学习了python基础及算法.常用计算库numpy和pandas数据的导入和各种处理.matplotlib和seaborn做数据可视化 以及上周的大数据框架hado ...

  6. 2021-02-12 大数据课程笔记 day23

    @R星校长 redis 概述 为什么使用 redis? 什么是 Redis? Redis是用C语言开发的一个开源的高性能键值对(key-value)内存数据库. 它提供六种数据类型来存储值:str ...

  7. 2021-03-13 大数据课程笔记 day52

    @R星校长 基于Flink的城市交通监控平台 1.1 项目整体介绍   近几年来,随着国内经济的快速发展,高速公路建设步伐不断加快,全国机动车辆.驾驶员数量迅速增长,交通管理工作日益繁重,压力与日俱增 ...

  8. 2021-01-30 大数据课程笔记 day10

    @R星校长 课程重点 HDFS 完全分布式搭建(熟练) Hadoop 3.x 新特性(了解) Hadoop Federation(了解) Hadoop HA(掌握) Hadoop HA 集群搭建(熟练 ...

  9. 2021-01-27 大数据课程笔记 day7

    @R星校长 Nginx 问题引入 单个 tomcat 支持最高并发 怎么解决高并发问题,解决单个服务器过载问题? Nginx概述 Nginx 介绍 1. Nginx ("engine x&q ...

最新文章

  1. map函数的简单用法。
  2. 基于OpenLayers+rbush实现高德轨迹样式
  3. Spinner弹出框遮挡住显示框的解决办法
  4. linux cdig 工具,linux常用工具su与su -
  5. 利用函数wavread对语音信号进行采样_语音识别第4讲:语音特征参数MFCC
  6. 对datatable操作经验-排序和分页
  7. SAP CRM email office integration
  8. 滨江机器人餐厅_餐厅来了机器人服务员
  9. gem install sass 本地配置和淘宝源无效的解决办法
  10. 非root安装php nginx,非root用户安装nginx
  11. BZOJ 4538: [Hnoi2016]网络
  12. Bailian3711 字符串移位包含问题【字符串循环匹配】(POJ NOI0107-19)
  13. JS === 实现通过点击td 跳转相应的图片
  14. 使用jstack和TDA进行java线程dump分析
  15. 通俗理解什么是隐马尔科夫模型(hmm)
  16. 我们如何学习:学会学习再学习
  17. Linux系列:linux中查看文件时显示行号
  18. 16k a4_A4纸和16K的纸张大小有没有区别
  19. nett服务器接收消息的方法,C#(一沙框架) .net core3.1 SignalR 服务端推送消息至客户端的实现方法,用弹窗插件进行显示,非常美观实用...
  20. 抖音热搜 API数据接口

热门文章

  1. linux sd卡启动盘制作工具,fedora liveusb creator linux u盘启动盘制作工具
  2. Linux(Centos/Redhat/ubuntu)安装WPS办公软件 *详细
  3. Jetpack-Lifecycle
  4. mac最美的鸟瞰屏幕保护APP:Aerial for Mac(最新)
  5. 50首音乐,喝茶时听一听,身心舒畅
  6. [python]使用pyinstaller打包带界面的Pytorch程序的多个问题
  7. 利用VBA编程制作互动效果的PPT
  8. 点击上传图片/上传视频
  9. 钉钉windows端多开软件_抖音很火的备忘录软件是什么?比较火的备忘录便签软件...
  10. 看书标记【R语言 商务数据分析实战9】