大数据数仓项目第05天

课程目标

  • 能够整合Phoenix、HBase实现订单明细查询
  • 掌握使用Phoenix创建二级索引提高查询效率
  • 掌握Flink程序优化
  • 完成 Imply 安装
  • 能够使用 Druid 完成数据的摄取以及数据查询
  • 能够使用JDBC查询Druid中的数据
  • 使用Druid进行OLAP分析
  • 搭建实时数仓数据可视化项目
  • 理解Druid架构原理
  • 掌握使用 Superset 进行BI分析

Phoenix

什么是Phoenix

  • Phoenix是一个基于HBase的开源SQL引擎,可以使用标准的JDBC API代替HBase客户端API来创建表,插入数据,查询你的HBase数据
  • Phoenix完全使用Java编写,作为HBase内嵌的JDBC驱动。Phoenix查询引擎会将SQL查询转换为一个或多个HBase扫描,并编排执行以生成标准的JDBC结果集。直接使用HBase API、协同处理器与自定义过滤器,对于简单查询来说,其性能量级是毫秒,对于百万级别的行数来说,其性能量级是秒
  • Phoenix性能
    • Phoenix是构建在HBase之上的SQL引擎
    • Phoenix通过以下方式实现高性能操作HBase
      • 编译你的SQL查询为原生HBase的scan语句
      • 检测scan语句最佳的开始和结束的key
      • 精心编排你的scan语句让他们并行执行
      • 推送你的WHERE子句的谓词到服务端过滤器处理
      • 执行聚合查询通过服务端钩子(称为协同处理器)
      • 实现了二级索引来提升非主键字段查询的性能
      • 统计相关数据来提高并行化水平,并帮助选择最佳优化方案
      • 跳过扫描过滤器来优化IN,LIKE,OR查询
      • 优化主键的来均匀分布写压力

官网地址:

http://phoenix.apache.org/

Phoenix的安装部署

1、准备工作

  • 提前安装好ZK集群、hadoop集群、Hbase集群

2、安装包
下载地址:https://mirrors.cnnic.cn/apache/phoenix/apache-phoenix-4.14.0-cdh5.14.2/bin/

资料\安装\apache-phoenix-4.14.0-HBase-1.1-bin.tar.gz

3、上传、解压

  • 将对应的安装包上传到对应的Hbase集群其中一个服务器的一个目录下,并解压
tar -xvzf apache-phoenix-4.14.0-HBase-1.1-bin.tar.gz -C ../servers/

4、拷贝Phoenix整合HBase所需JAR包

将phoenix目录下的 phoenix-4.14.0-HBase-1.1-server.jar(phoenix-4.14.0-cdh5.14.2-server.jar)、phoenix-core-4.14.0-HBase-1.1.jar(phoenix-core-4.14.0-cdh5.14.2.jar)拷贝到各个 hbase的lib目录下

scp phoenix-4.14.0-HBase-1.1-server.jar phoenix-core-4.14.0-HBase-1.1.jar node1:/export/servers/hbase-1.1.1/lib
scp phoenix-4.14.0-HBase-1.1-server.jar phoenix-core-4.14.0-HBase-1.1.jar node2:/export/servers/hbase-1.1.1/lib
scp phoenix-4.14.0-HBase-1.1-server.jar phoenix-core-4.14.0-HBase-1.1.jar node3:/export/servers/hbase-1.1.1/lib

5、在Phoenix中配置HADOOP、配置HBASE

将hbase的配置文件hbase-site.xml、 hadoop/etc/hadoop下的core-site.xml 、hdfs-site.xml放到phoenix/bin/下,替换phoenix原来的配置文件

# 进入到 hbase bin目录
cd /export/servers/apache-phoenix-4.14.0-HBase-1.1-bin/bin# 备份原先的 hbase-site.xml文件
mv hbase-site.xml hbase-site.xml.bakln -s $HBASE_HOME/conf/hbase-site.xml .
ln -s $HADOOP_HOME/etc/hadoop/core-site.xml .
ln -s $HADOOP_HOME/etc/hadoop/hdfs-site.xml .

6、重启hbase集群,使Phoenix的jar包生效

7、验证是否成功

./sqlline.py node1:2181

出现如下界面说明启动成功

Setting property: [incremental, false]
Setting property: [isolation, TRANSACTION_READ_COMMITTED]
issuing: !connect jdbc:phoenix:node1:2181 none none org.apache.phoenix.jdbc.PhoenixDriver
Connecting to jdbc:phoenix:node1:2181
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/export/servers/apache-phoenix-4.14.0-HBase-1.1-bin/phoenix-4.14.0-HBase-1.1-client.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/export/servers/hadoop-2.6.0-cdh5.14.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
19/10/18 09:58:06 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Connected to: Phoenix (version 4.14)
Driver: PhoenixEmbeddedDriver (version 4.14)
Autocommit status: true
Transaction isolation: TRANSACTION_READ_COMMITTED
Building list of tables and columns for tab-completion (set fastconnect to true to skip)...
133/133 (100%) Done
Done
sqlline version 1.2.0

8、查看当前有哪些表

  • 输入!tables 查看都有哪些表
  • 红框部分是用户建的表,其他为Phoenix系统表,系统表中维护了用户表的元数据信息
+--------------+-------------+---------------+-+-----------------+---------------+---------+
| TABLE_SCHEM  | TABLE_NAME  |  TABLE_TYPE   | | IMMUTABLE_ROWS  | SALT_BUCKETS  | MULTI_T |
+--------------+-------------+---------------+-+-----------------+---------------+---------+
| SYSTEM       | CATALOG     | SYSTEM TABLE  | | false           | null          | false   |
| SYSTEM       | FUNCTION    | SYSTEM TABLE  | | false           | null          | false   |
| SYSTEM       | LOG         | SYSTEM TABLE  | | true            | 32            | false   |
| SYSTEM       | SEQUENCE    | SYSTEM TABLE  | | false           | null          | false   |
| SYSTEM       | STATS       | SYSTEM TABLE  | | false           | null          | false   |
+--------------+-------------+---------------+-+-----------------+---------------+---------+

9、退出Phoenix,输入!quit

Phoenix入门案例

需求一:

  • 使用SQL语句在Phoenix中,创建一个用户表。该用户表有以下列
ID 姓名 年龄 性别 地址
1 张三 30 北京西城区
2 李四 20 上海闵行区
  • 往表中插入两条数据,查询数据,并查看HBase中的数据

需求分析:

  • 直接在 Phoenix 中,使用 create table 语法创建表结构
  • 因为数据最终都需要保存在HBase中,故创建表的时候需要指定 HBase 中的列蔟名称

参考代码:

-- 创建表
create table if not exists "user_info"("id" varchar primary key,"cf"."name" varchar,"cf"."age" integer,"cf"."sex" varchar,"cf"."address" varchar
);-- 新增数据
upsert into "user_info" values('1', '张三', 30, '男', '北京市西城区');
upsert into "user_info" values('2', '李四', 20, '女', '上海市闵行区');

需求二:

  • 修改 id为1 用户的年龄为 35
-- 修改数据
upsert into "user_info"("id", "age") values('1', 35);

需求三:

  • 删除 id为2 用户数据
-- 删除数据
delete from "user_info" where "id" = '2';

建立与HBase表映射

在HBase已经存在表,需要使用 Phoenix 建立与 HBase的映射,从而以SQL的方式,通过Phoenix 操作HBase。

案例:

1、在HBase中,建立employee的映射表—数据准备

create 'employee','company','family'put 'employee','row1','company:name','ted'
put 'employee','row1','company:position','worker'
put 'employee','row1','family:tel','13600912345'put 'employee','row2','company:name','michael'
put 'employee','row2','company:position','manager'
put 'employee','row2','family:tel','1894225698'scan 'employee'

2、建立映射视图

  • 在HBase中已有表,在Phoenix中建立映射,必须要使用 create view

  • Phoenix是大小写敏感的

  • 所有命令都是大写

  • 如果表名不用双引号括起来,无论输入大写或小写,建立的表名都是大写

  • 如果要建立同时包含大写和小写的表名和字段名,用双引号把表名或者字段名括起来

在Phoenix中打开命令行

CREATE VIEW IF NOT EXISTS "employee" ("rowid" VARCHAR NOT NULL PRIMARY KEY, "company"."name" VARCHAR,"company"."position" VARCHAR, "family"."tel" VARCHAR
);

这个语句有几个注意点

  • IF NOT EXISTS可以保证如果已经有建立过这个表,配置不会被覆盖

  • 作为rowkey的字段用 PRIMARY KEY标定

  • 列簇用 columnFamily.columnName 来表示

  • 建立好后,查询一下数据

3、 查询所有映射表数据

0: jdbc:phoenix:node01> SELECT * FROM "employee";
+-------+----------+-----------+--------------+-------+
|  no   |   name   | position  |     tel      |  age  |
+-------+----------+-----------+--------------+-------+
| row1  | ted      | worker    | 13600912345  | null  |
| row2  | michael  | manager   | 1894225698   | null  |
+-------+----------+-----------+--------------+-------+

4、查询职位为 ‘worker’ 的所有员工数据

select * from "employee" where "position" = 'worker'

使用Phoenix构建二级索引加快查询效率

  • HBase通过rowkey来查询,否则就必须逐行地比较每一列的值,即全表扫瞄
  • 数据量较大的表,全表扫描的性能很差
  • 如果需要从多个角度查询数据,不可能使用 rowkey 来实现查询。此时可使用secondary index(二级索引)来完成这件事
  • Phoenix提供了对HBase secondary index的支持

配置HBase支持Phoenix二级索引

1、在每一个 HRegionServce的 hbase-site.xml 加入以下属性

<property> <name>hbase.regionserver.wal.codec</name> <value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
</property>

2、重启HBase集群使配置生效

使用Phoenix创建二级索引

1、创建索引

create local index "idx_tel" on "employee"("family"."tel");

2、查看执行计划,检查是否查询二级索引

explain select * from "employee" where "name" = 'ted';
explain select  * from "employee" where "tel" = '13600912345';

3、删除索引

drop index "idx_tel" on "employee";

4、查看表上的所有索引

!indexes "employee"

订单明细创建Phoenix映射表

建立映射表

实现步骤:

  • 在Phoenix中建立映射表
  • 实现各种查询

1、建表SQL语句:

create view "dwd_order_detail"("rowid" varchar primary key,"detail"."ogId" varchar,"detail"."orderId" varchar,"detail"."goodsId" varchar,"detail"."goodsNum" varchar,"detail"."goodsPrice" varchar,"detail"."goodsName" varchar,"detail"."shopId" varchar,"detail"."goodsThirdCatId" varchar,"detail"."goodsThirdCatName" varchar,"detail"."goodsSecondCatId" varchar,"detail"."goodsSecondCatName" varchar,"detail"."goodsFirstCatId" varchar,"detail"."goodsFirstCatName" varchar,"detail"."areaId" varchar,"detail"."shopName" varchar,"detail"."shopCompany" varchar,"detail"."cityId" varchar,"detail"."cityName" varchar,"detail"."regionId" varchar,"detail"."regionName" varchar
);

2、在表上的以下列创建索引

列名 说明
goodsThirdCatName 三级分类
goodsSecondCatName 二级分类
goodsFirstCatName 一级分类
cityName 城市名称
regionName 大区名称

参考代码:

-- 创建索引
create local index "idx_dwd_order_detail" on "dwd_order_detail"("detail"."goodsThirdCatName", "detail"."goodsSecondCatName", "detail"."goodsFirstCatName", "detail"."cityName", "detail"."regionName");explain select * from "dwd_order_detail" where "goodsThirdCatName" = '其他蔬果' and "cityName" = '景德镇市分公司';

使用编写JDBC程序查询Phoenix数据

需求:

  • 编写Java 代码查询 订单明细

1、导入依赖

<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties><dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version></dependency><dependency><groupId>org.apache.phoenix</groupId><artifactId>phoenix-core</artifactId><version>4.14.0-HBase-1.1</version></dependency>
</dependencies>

2、编写JDBC程序

URL: jdbc:phoenix:node1:2181

3、新建一个 模块,注意不要从 itcast_shop_parent 继承(否则,会依赖CDH)

参考代码:

Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");Connection connection = DriverManager.getConnection("jdbc:phoenix:node1:2181", "", "");
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery("select * from \"dwd_order_detail\" limit 10 ");while(resultSet.next()) {String rowid = resultSet.getString("rowid");String ogId = resultSet.getString("ogId");String orderId = resultSet.getString("orderId");String goodsId = resultSet.getString("goodsId");String goodsNum = resultSet.getString("goodsNum");String goodsPrice = resultSet.getString("goodsPrice");String goodsName = resultSet.getString("goodsName");String shopId = resultSet.getString("shopId");String goodsThirdCatId = resultSet.getString("goodsThirdCatId");String goodsThirdCatName = resultSet.getString("goodsThirdCatName");String goodsSecondCatId = resultSet.getString("goodsSecondCatId");String goodsSecondCatName = resultSet.getString("goodsSecondCatName");String goodsFirstCatId = resultSet.getString("goodsFirstCatId");String goodsFirstCatName = resultSet.getString("goodsFirstCatName");String areaId = resultSet.getString("areaId");String shopName = resultSet.getString("shopName");String shopCompany = resultSet.getString("shopCompany");String cityId = resultSet.getString("cityId");String cityName = resultSet.getString("cityName");String regionId = resultSet.getString("regionId");String regionName = resultSet.getString("regionName");System.out.print(rowid);System.out.print(ogId);System.out.print(orderId);System.out.print(goodsId);System.out.print(goodsNum);System.out.print(goodsPrice);System.out.print(goodsName);System.out.print(shopId);System.out.print(goodsThirdCatId);System.out.print(goodsThirdCatName);System.out.print(goodsSecondCatId);System.out.print(goodsSecondCatName);System.out.print(goodsFirstCatId);System.out.print(goodsFirstCatName);System.out.print(areaId);System.out.print(shopName);System.out.print(shopCompany);System.out.print(cityId);System.out.print(cityName);System.out.print(regionId);System.out.print(regionName);System.out.println();
}resultSet.close();
statement.close();
connection.close();
}

Flink程序优化

使用Flink Checkpoint进行容错处理

checkpoint是Flink容错的核心机制。它可以定期地将各个Operator处理的数据进行快照存储( Snapshot )。如果Flink程序出现宕机,可以重新从这些快照中恢复数据。

  1. checkpoint coordinator(协调器)线程周期生成 barrier (栅栏),发送给每一个source
  2. source将当前的状态进行snapshot(可以保存到HDFS)
  3. source向coordinator确认snapshot已经完成
  4. source继续向下游transformation operator发送 barrier
  5. transformation operator重复source的操作,直到sink operator向协调器确认snapshot完成
  6. coordinator确认完成本周期的snapshot

配置以下checkpoint:

1、开启 checkpoint

2、设置 checkpoint 保存HDFS的位置

3、配置 checkpoint 的最小时间间隔(1秒)

4、配置 checkpoint 最大线程数 (1)

5、配置 checkpoint 超时时间 (60秒)

6、配置程序关闭,额外触发 checkpoint

7、配置重启策略 (尝试1次,延迟1秒启动)

8、给两个 source 添加 checkpoint 容错支持

  • 给需要进行checkpoint的operator设置 uid

参考代码

// 配置Checkpoint
env.enableCheckpointing(5000)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// checkpoint的HDFS保存位置
env.setStateBackend(new FsStateBackend("hdfs://node1:8020/flink/checkpoint/"))
// 配置两次checkpoint的最小时间间隔
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000)
// 配置最大checkpoint的并行度
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
// 配置checkpoint的超时时长
env.getCheckpointConfig.setCheckpointTimeout(60000)
// 当程序关闭,触发额外的checkpoint
env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1000))
// 3. 将配置添加到数据流中
val clickLogJSONDataStream: DataStream[String] = env.addSource(finkKafkaConsumer)
.uid(UUID.randomUUID().toString)
.setParallelism(3)
// clickLogJSONDataStream.print()val canalJsonDataStream: DataStream[String] = env.addSource(flinkKafkaCanalConsumer).uid(UUID.randomUUID().toString)
// canalJsonDataStream.print()

使用Flink时间窗口

生成watermark(水印)

1、实现 extractTimestamp 获取水印时间

设置 EventTime

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

2、获取当前的水印时间

val canalEntityWithWarterMark: DataStream[CanalEntity] = canalEntityDataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[CanalEntity] {var currentMaxTimestamp = 0Lvar maxOutOfOrderness = 10 * 1000L // 最大允许的乱序时间是10soverride def getCurrentWatermark: Watermark = {return new Watermark(currentMaxTimestamp - maxOutOfOrderness)}override def extractTimestamp(t: CanalEntity, l: Long): Long = {currentMaxTimestamp = t.exe_timecurrentMaxTimestamp}
})

3、修改使用 apply 方法

// 设置5s的时间窗口
val windowDataStream: AllWindowedStream[CanalEntity, TimeWindow] = orderGoodsCanalEntityDataStream.
timeWindowAll(Time.seconds(5))                               // 设置5秒时间窗口
.allowedLateness(Time.seconds(10))                           // 设置最大延迟时间
.sideOutputLateData(new OutputTag[CanalEntity]("outlateData"))    // 设置延迟的数据存放地方val orderGoodsWideEntityDataStream: DataStream[OrderGoodsWideEntity] = windowDataStream.apply((timeWindow, iter, collector: Collector[OrderGoodsWideEntity]) => {var jedis = RedisUtil.getJedis()val iterator = iter.iterator// ... 此处省略 ...collector.collect(orderGoodsWideEntity)val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")println(sdf.format(new Date(timeWindow.getStart)) + " " + sdf.format(new Date(timeWindow.getEnd)))}
})

并行度优化

1、调整Kafka topic的分区数量

2、设置Kafka DataStream并行度和Kafka的分区一致

Flink反压原理

什么是背压问题

  • 流系统中消息的处理速度跟不上消息的发送速度,会导致消息的堆积
  • 许多日常问题都会导致背压
    • 垃圾回收卡顿可能会导致流入的数据快速堆积
    • 一个数据源可能生产数据的速度过快
  • 背压如果不能得到正确地处理,可能会导致 资源被耗尽 或者甚至出现更糟的情况导致数据丢失

在同一时间点,不管是流处理job还是sink,如果有1秒的卡顿,那么将导致至少500万条记录的积压。换句话说,source可能会产生一个脉冲,在一秒内数据的生产速度突然翻倍。

举例说明

1、正常情况

  • 消息处理速度 >= 消息的发送速度,不发生消息拥堵,系统运行流畅

2、异常情况

  • 消息处理速度< 消息的发送速度,发生了消息拥堵,系统运行不畅。

背压问题解决方案

可以采取三种方案:

  • 将拥堵的消息直接删除

    • 会导致数据丢失,许多流处理程序而言是不可接受的
  • 将缓冲区持久化,以方便在处理失败的情况下进行数据重放
    • 会导致缓冲区积压的数据越来越多
  • 将拥堵的消息缓存起来,并告知消息发送者减缓消息发送的速度
    • 对source进行限流来适配整个pipeline中最慢组件的速度,从而获得稳定状态

Flink如何解决背压问题

Flink内部自动实现数据流自然降速,而无需担心数据丢失。Flink所获取的最大吞吐量是由pipeline中最慢的组件决定

Flink解决背压问题的原理

1、TaskManager(TM)启动时,会初始化网络缓冲池(NetworkBufferPool)

  • 默认生成 2048 个内存块(MemorySegment)
  • 网络缓冲池是Task之间共享的

2、Task线程启动时,Flink 会为Task的 Input Gate(IG)和 ResultPartion(RS)分别创建一个 LocationBufferPool

  • LocationBufferPool的内存数量由Flink分配
  • 为了系统更容易应对瞬时压力,内存数量是动态分配的

3、Task线程执行时,Netty接收端接收到数据时,为了将数据保存拷贝到Task中

  • Task线程需要向本地缓冲池(LocalBufferPool)申请内存
  • 若本地缓冲池没有可用内存,则继续向网络缓冲池(NetworkBufferPool)申请内存
  • 内存申请成功,则开始从Netty中拷贝数据
  • 若缓冲池已申请的数量达到上限,或网络缓冲池(NetworkerBufferPool)也没有可用内存时,该Task的Netty Channel会暂停读取,上游的发送端会立即响应停止发送,Flink流系统进入反压状态

4、经过 Task 处理后,由 Task 写入到 ResultPartion(RS)中

  • 当Task线程写数据到ResultPartion(RS)时,也会向网络缓冲池申请内存
  • 如果没有可用内存块,也会阻塞Task,暂停写入

5、Task处理完毕数据后,会将内存块交还给本地缓冲池(LocalBufferPool)

  • 如果本地缓冲池申请内存的数量超过池子设置的数量,将内存块回收给 网络缓冲池。如果没超过,会继续留在池子中,减少反复申请开销

Druid简介

大数据分析平台架构分类

数据分析的基础架构可以分为以下几类:

  • 使用Hadoop/Spark进行分析
  • 将Hadoop/Spark的结果导入 RDBMS 中提供数据分析
  • 将结果注入到容量更大的 NoSQL中,解决数据分析的存储瓶颈,例如:HBase
  • 将数据源进行流式处理,对接流式计算框架,例如:Flink、Spark Streaming,结果保存到 RDBMS、NoSQL中
  • 将数据源进行流式处理,对接分析数据库,例如:Druid

为什么会有Druid

基于Hadoop大数据平台的问题

基于 Hadoop 的大数据平台,有如下一些问题:

  • 无法保障查询性能

    • 对于Hadoop使用的MapReduce批处理框架,数据何时能够查询没有性能保证
  • 随机IO问题
    • HDFS以集群硬盘作为存储资源池的分布式文件系统
    • 在海量数据的处理过程中,会引起大量的读写操作,随机IO是高并发场景下的性能瓶颈
  • 数据可视化问题
    • HDFS对于数据分析以及数据的即席查询,HDFS并不是最优的选择

传统的Hadoop大数据处理架构更倾向于一种“后台批处理的数据仓库系统”,其作为海量历史数据保存、冷数据分析,确实是一个优秀的通用解决方案,但

  • 无法保证高并发环境下海量数据的查询分析性能
  • 无法实现海量实时数据的查询分析与可视化

Druid的诞生

  • Druid是由一个名为 MetaMarket 的公司开发的
  • 2011年,MetaMarket 开始研发自己的"轮子"Druid,将Druid定义为“开源、分布式、面向列式存储的实时分析数据存储系统”
  • 要解决的"痛点"是
    • 在高并发环境下,保证海量数据查询分析性能
    • 同时提供海量实时数据的查询、分析与可视化功能

Druid是什么

Druid是面向海量数据的、用于实时查询与分析的OLAP存储系统。Druid的四大关键特性如下:

  • 亚秒级的OLAP查询分析

    • 采用了列式存储、倒排索引、位图索引等关键技术
  • 在亚秒级别内完成海量数据的过滤、聚合以及多维分析等操作
  • 实时流数据分析
    • 传统分析型数据库采用的批量导入数据,进行分析的方式
    • Druid提供了实时流数据分析,以及高效实时写入
  • 实时数据在亚秒级内的可视化
  • 丰富的数据分析功能
    • Druid提供了友好的可视化界面
  • SQL查询语言
    • REST查询接口
  • 高可用性与高可拓展性
    • Druid工作节点功能单一,不相互依赖
    • Druid集群在管理、容错、灾备、扩容都很容易

阿里巴巴也曾创建过一个开源项目叫 Druid (简称阿里 Druid),它是一个数据库连接池项目。阿里 Druid 和 我们要讨论的 Druid 没有任何关系,它们解决完全不同的问题

Druid的典型应用架构

国内哪些公司在使用Druid

  • 腾讯

    • 腾讯企点采用Druid用于分析大量的用户行为,帮助提升客户价值
  • 阿里巴巴
    • 阿里搜索组使用Druid的实时分析功能用于获取用户交互行为
  • 新浪微博
    • 新浪广告团队使用Druid构建数据洞察系统的实时分析部分,每天处理数十亿的消息
  • 小米
    • Druid用于小米统计的后台数据收集和分析
    • 也用于广告平台的数据分析
  • 滴滴打车
    • Druid是滴滴实时大数据处理的核心模块,用于滴滴实时监控系统,支持数百个关键业务指标
    • 通过Druid,滴滴能够快速得到各种实时的数据洞察
  • 优酷土豆
    • Druid用于其广告的数据处理和分析

Druid 对比其他OLAP

Druid vs. Elasticsearch

  • Druid在导入过程会对原始数据进行Rollup,而ES会保存原始数据
  • Druid专注于OLAP,针对数据导入以及快速聚合操作做了优化
  • Druid不支持全文检索

Druid vs. Key/Value Stores (HBase/Cassandra/OpenTSDB)

  • Druid采用列式存储,使用倒排和bitmap索引,可以做到快速扫描相应的列

Druid vs. Spark

  • Spark SQL的响应还不做到亚秒
  • Druid可以做到超低的响应时间,例如亚秒,而且高并发面向用户的应用。

Druid vs SQL-on-Hadoop (Impala/Drill/Spark SQL/Presto)

  • Driud查询速度更快
  • 数据导入,Druid支持实时导入,SQL-on-Hadoop一般将数据存储在Hdfs上,Hdfs的写入速度有可能成为瓶颈
  • SQL支持,Druid也支持SQL,但Druid不支持Join操作

Druid vs. Kylin

  • Kylin不支持实时查询,Druid支持,但是Kylin3.0已经支持实时查询
  • Kylin支持表连接(Join),Druid不支持
  • Druid可以进行数据的存储,Kylin只能进行数据的OLAP分析

安装imply

Imply介绍

  • Imply也是Druid的核心团队开发的,它基于Apache Druid开发了一整套大数据分析解决方案
  • Imply基于Druid进行了一些组件开发,提供开源社区版本和商业版,简化了部署

集群规划

主机名称 IP地址 角色 数据库
node1 192.168.88.120 zk、kafka、druid(overlord、coordinator) MySQL
node2 192.168.88.121 zk、kafka、druid(middleManager、historical)
node3 192.168.88.122 zk、kafka、druid(broker、router)

下载imply

Imply-3.0.4 基于 apache-druid-0.15.0-Incubating

1、下载imply

cd /export/softwares/
wget https://static.imply.io/release/imply-3.0.4.tar.gz

2、直接使用资料 imply安装包jps’

将该 imply安装包\imply-3.0.4.tar.gz 安装包上传到 /exports/softwares

解压imply-3.0.4

tar -xvzf imply-3.0.4.tar.gz -C ../servers
cd ../servers/imply-3.0.4

配置imply-3.0.4

mysql中创建imply相关的数据库

CREATE DATABASE `druid` DEFAULT CHARACTER SET utf8;
CREATE DATABASE `pivot` DEFAULT CHARACTER SET utf8;

注意事项

  • MySQL版本必须使用5.5及以上版本(Druid和Pivot使用utf8字符集)

修改并上传配置文件

1、将 imply 安装目录的 conf 目录重命名为 conf.bak

mv conf conf.bak

1、上传 imply配置文件\conf.zip 到 imply-3.0.4 安装目录

2、解压缩

unzip conf.zip

3、修改 conf/druid/_common/common.runtime.properties 文件

  • 修改zookeeper的配置
druid.zk.service.host=node1:2181,node2:2181,node3:2181
  • 修改MySQL的配置
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://node1:3306/druid
druid.metadata.storage.connector.user=root
druid.metadata.storage.connector.password=123456

4、修改 conf/pivot/config.yaml 配置文件

  • 修改mysql的配置
stateStore:type: mysqllocation: mysqlconnection: 'mysql://root:123456@node1:3306/pivot'

将配置好的 imply 分发到不同节点

scp -r imply-3.0.4/ node2:$PWD
scp -r imply-3.0.4/ node3:$PWD

配置环境变量

在每台服务器上配置DRUID_HOME环境变量

# DRUID
export DRUID_HOME=/export/servers/imply-3.0.4

source /etc/profile 重新加载环境变量

启动 imply 集群

1、启动zk集群

2、node1节点(使用外部zk而不使用imply自带zk启动overlord和coordinator)

# 使用外部zk而不使用imply自带zk启动overlord和coordinator
/export/servers/imply-3.0.4/bin/supervise -c /export/servers/imply-3.0.4/conf/supervise/master-no-zk.conf

3、node2节点(启动historical和middlemanager)

/export/servers/imply-3.0.4/bin/supervise -c /export/servers/imply-3.0.4/conf/supervise/data.conf

4、node3节点(启动broker和router)

/export/servers/imply-3.0.4/bin/supervise -c /export/servers/imply-3.0.4/conf/supervise/query.conf

注意事项

  • 如果希望imply运行在后台,在每个执行命令后面加 --daemonize,

访问WebUI

组件名 URL
broker http://node3:8888
coordinator、overlord http://node1:8081/index.html
middleManager、historical http://node1:8090/console.html

Druid入门案例

需求:

  • 使用Druid 分析 2019年5月8日 按照商品分类、商品区域的产品订单总额

要实现该入门案例:

1、上传测试数据到每个Linux服务器

  • 在一台Druid服务器中创建 /root/druid/data1/ 目录

    mkdir -p /root/druid/data1

  • druid测试数据源\商品订单数据\order.json到服务器的 /root/druid/data1/ 目录中

  • 将 /root/druid/data1 分发到每一台服务器

2、摄取数据到Druid中

2.1 打开postman,请求地址设置为 http://node1:8090/druid/indexer/v1/task

2.2 请求方式选择为POST

2.3 body > raw > JSON(application/json)

2.4 将 资料中的index_ad_event.json文件 粘贴到 postman中

2.5 发送请求

3、执行 SQL 查询

3.1 打开 Druid 控制台 http://node3:8888

3.2 打开 Query 选项卡,执行以下SQL实现 按照商品分类、商品区域的产品订单总额

-- 分析2019年5月8日,按照商品分类、商品区域的产品订单总额
SELECTcategory,areaName,SUM(money) AS total_money,SUM("count") AS total_count
FROM "demo_order"
WHERE TIME_FORMAT("__time", 'yyyyMMdd') = '20190508'
GROUP BY category, areaName

Druid数据摄取

Druid支持流式和批量两种方式的数据摄入,针对不同类型的数据,Druid将外部数据源分为两种形式:

  • 流式数据源

    • 指的是持续不断地生产数据的数据源。例如:消息队列、日志、文件等
  • 静态数据源
    • 指的是数据已经生产完毕,不会有新数据产生的数据源。例如:文件系统的文件

批量(离线)数据摄取

批量数据可以通过两种方式来摄入:

摄取本地文件

需求:

  • 将摄取服务器本地上的 ad_event.json 数据到Druid中

操作步骤:

1、在某一个服务器节点中创建 /root/druid/data2 文件夹

2、上传数据文件和摄取配置文件

  • 将资料文件夹中的 druid测试数据源\广告点击数据中的 ad_event.json 上传到 /root/druid/data2目录中
  • 将 /root/druid/data2 目录分发到每个服务器节点

3、使用postman提交本地批量索引任务

  • 将index_ad_event.json文件中的内容拷贝到 postman 中
  • 发送post请求到http://node1:8090/druid/indexer/v1/task

4、可以在Overlord(http://node1:8090/console.html) 中查看到任务信息

6、在 http://node3:8888中测试查询数据

SELECT *
FROM "ad_event"
LIMIT 1

摄取HDFS文件

Druid支持加载HDFS上的数据。它会使用 HadoopDruidIndexer 加载批量数据,将数据生成 segments 文件,存放在HDFS上,再从HDFS下载 segments 文件到本地。然后便可从Druid中查询数据。

需求:

  • 摄取HDFS上的wikiticker-2015-09-12-sampled.json文件到Druid中

操作步骤:

1、启动HDFS集群、YARN集群

2、上传 druid测试数据源\维基百科访问日志数据到任意服务器 /root/druid/data3 目录,再将 wikiticker-2015-09-12-sampled.json 文件上传到HDFS

hadoop fs -put wikiticker-2015-09-12-sampled.json /

3、修改 index_wikiticker-2015-9-12-sample.json 文件中配置 HDFS 的地址

4、使用 postman 提交索引任务

  • 将index_wikiticker-2015-9-12-sample.json文件中的内容拷贝到 postman 中
  • 发送post请求到http://node1:8090/druid/indexer/v1/task

5、到 Druid控制台中执行SQL查询

SELECT *
FROM "wikiticker"
LIMIT 1

流式(实时)数据摄取

Kafka索引服务方式摄取

需求:

  • 实时摄取Kafka中 metrics topic的数据到 Druid中

操作步骤:

1、启动 Kafka 集群

2、在Kafka集群上创建一个名为metrics的topic

bin/kafka-topics.sh --create --zookeeper node1:2181,node2:2181,node3:2181, --partitions 1 --replication-factor 1 --topic metrics

3、定义摄取配置文件

  • 修改 druid测试数据源\kafka实时摄取数据中的 index-metrics-kafka.json 文件中的kafka服务器地址

4、打开postman提交索引任务

  • 将 index-metrics-kafka.json 文件中的内容拷贝到 postman 中
  • 发送post请求到http://node1:8090/druid/indexer/v1/supervisor

在Overlord中可以看到

4、在Kafka集群上开启一个控制台producer

bin/kafka-console-producer.sh --broker-list node1:9092,node2:2181,node3:2181 --topic metrics

5、在Kafka producer控制台中粘贴如下数据

{"time":"2019-07-23T17:57:58Z","url":"/foo/bar","user":"alice","latencyMs":32}
{"time":"2019-07-23T17:57:59Z","url":"/","user":"bob","latencyMs":11}
{"time":"2019-07-23T17:58:00Z","url": "/foo/bar","user":"bob","latencyMs":45}

6、在 Druid Console中执行以下SQL查询

SELECT *
from "metrics-kafka"
LIMIT 1

摄取配置文件结构说明

主体结构

摄取配置文件主要由以下几个部分组成:

  • type:文件上传方式(index、index_hadoop、kafka)
  • spec
    • dataSchema:数据解析模式
    • ioConfig:数据源
    • turningConfig:优化配置(分区规则、分区大小)
{// ① 文件上传方式// 1.1 index       - 上传本地文件// 1.2 index_hadoop - 上传HDFS文件// 1.3 kafka        - 拉取Kafka流数据"type": "index","spec": {// ② 数据解析模式"dataSchema": {...},// ③ 摄取数据源"ioConfig": {...},// ④ 摄取过程优化配置"tuningConfig": {...}}
}

数据解析模式

数据解析模式,主要为针对数据文件,定义了一系列规则:

  1. 获取时间戳属性
  2. 维度属性
  3. 度量属性
  4. 定义如何进行指标计算
  5. 配置粒度规则
// ② 数据摄取模式
"dataSchema": {// 2.1 数据源(表)"dataSource": "ad_event_local",// 2.2 解析器"parser": {// 2.2.1 解析字符串文本"type": "String","parseSpec": {// 2.2.1.1 字符串文本格式为JSON"format": "json",// 2.2.1.2 指定维度列名,维度与时间一致,导入时聚合"dimensionsSpec": {"dimensions": ["city","platform"]},// 2.2.1.3 指定时间戳的列,以及时间戳格式化方式"timestampSpec": {"format": "auto","column": "timestamp"}}},// 2.3 指标计算规则"metricsSpec": [{//name表示列名"name": "count","type": "count"},{// 2.3.1 聚合计算后指标的列名"name": "click",// 2.3.2 聚合函数:count、longSum、doubleSum、longMin、doubleMin、doubleMax"type": "longSum","fieldName": "click"}]// 2.4 粒度规则"granularitySpec": {"type": "uniform",// 2.4.1 按天来生成 segment (每天生成一个segment)"segmentGranularity": "day",// 2.4.2 查询的最小粒度(最小粒度为小时)"queryGranularity": "hour",// 2.4.3 加载原始数据的时间范围,批量数据导入需要设置/流式导入无需设置"intervals": ["2018-12-01/2018-12-03"]},}

数据源配置

数据源配置主要指定:

  • 要加载数据的类型
  • 从哪儿加载数据
"ioConfig": {"type": "index","inputSpec": {// 3.1 本地文件 local/ HDFS使用 hadoop"type": "local",// 3.2 路径"baseDir": "/root/data/",// 3.3 只过滤出来哪个文件"filter": "ad_event.json"}
}

优化配置

通常在优化配置中可以指定一些优化选项

"tuningConfig": {"type": "index",// 4.1 分区类型"partitionsSpec": {"type": "hashed",// 4.2 每个分区的目标行数(这里配置每个分区500W行)"targetPartitionSize": 5000000}
}

了解Druid WebUI生成 spec

Druid数据查询

下面以 「 广告点击数据 」为例,演示在Druid中使用不同方式来进行数据查询、分析。

1、JSON API方式

2、SQL方式(重点)

JSON API方式(了解)

JSON查询语法结构

Druid最早提供JSON API的方式查询数据,通过JSON格式来定义各种查询组件实现数据查询。

将JSON数据提交请求到: http://node3:8082/druid/v2?pretty

{"queryType":"search",// 1. 指定要查询的数据源"dataSource":"ad_event",// 2. 聚合器,描述如何进行聚合// 2.1 对哪个指标字段进行聚合// 2.2 进行哪种聚合// 2.3 指定聚合后的列名"aggregations":[{"type":"longSum",        "name":"click",            "fieldName":"click"       },{"type":"longSum","name":"pv","fieldName":"count"}],// 3. 指定查询的时间范围,前闭后开"intervals":["2018-06-02/2019-06-06"]
}

使用Postman来测试JSON API查询

  • 复制用于查询的JSON数据
{"queryType":"search","dataSource":"ad_event","aggregations":[{"type":"longSum",     "name":"click",            "fieldName":"click"       },{"type":"longSum","name":"pv","fieldName":"count"}],"intervals":["2018-06-02/2019-06-06"]
}
  • 发送请求到 http://node3:8082/druid/v2?pretty

SQL 方式(重点)

  • 使用Druid SQL查询,可以使用SQL查询来代替Druid原生基于JSON的查询方式
  • Druid SQL将SQL语句解析为原生JSON API方式,再执行查询

Druid SQL可视化界面

Druid 提供了一个图形界面SQL查询接口

查询语法

在Druid中,每一个数据源在Druid中都对应一张表,可以直接通过SELECT语句查询表中的数据

1、语法结构

Druid SQL支持的SELECT查询语法结构

[ EXPLAIN PLAN FOR ]
[ WITH tableName [ ( column1, column2, ... ) ] AS ( query ) ]
SELECT [ ALL | DISTINCT ] { * | exprs }
FROM table
[ WHERE expr ]
[ GROUP BY exprs ]
[ HAVING expr ]
[ ORDER BY expr [ ASC | DESC ], expr [ ASC | DESC ], ... ]
[ LIMIT limit ]
[ UNION ALL <another query> ]

1.1 EXPLAIN PLAN FOR

  • 在SELECT语句前面添加EXPLAIN PLAN FOR,可以查看到Druid SQL是如何解释为Druid JSON API查询的
  • SELECT语句并没有真正地执行

1.2 WITH tableName

  • 定义一个SQL片断,该SQL片断会被整个SQL语句所用到
WITH cr1 AS
(SELECT city, SUM(click) as click from ad_event GROUP BY 1)
select * from cr1 where city = 'beijing'

1.3 GROUP BY

  • GROUP BY 语句可以使用 1、2、…位置来替代
SELECT city, SUM(click) as click
from ad_event
GROUP BY 1

ORDER BY 也支持类似GROUP BY 的语法

1.4 UNION ALL

UNION ALL操作符表示将多个SELECT语句放在一起(并集),每个SELECT语句都会一个接一个单独执行(并不是并行执行),Druid当前并不支持 UNION(不支持去重)

2、Druid SQL不支持的功能

  • JOIN语句
  • DDL/DML语句

聚合函数

Druid SQL中的聚合函数可以使用以下语法:

AGG(expr) FILTER(WHERE whereExpr)

这样聚合函数只会聚合符合条件的行数据

SELECT city, sum("count") filter(where city != 'beijing') FROM "ad_event" GROUP BY city;

使用JDBC查询Druid中的数据

Druid提供了JDBC接口,JavaWeb项目可以直接使用 JDBC 连接Druid进行实时数据分析。

需求:

  • 获取 metrics-kafka 数据源中,不同用户的访问次数

实现步骤:

1、创建 druid_jdbc Maven模块

2、导入依赖

3、编写JDBC代码连接Druid获取数据

  • 加载Druid JDBC驱动
  • 获取Druid JDBC连接
  • 构建SQL语句
  • 构建Statement,执行SQL获取结果集
  • 关闭Druid连接

具体实现:

1、导入依赖

<dependency><groupId>org.apache.calcite.avatica</groupId><artifactId>avatica</artifactId><version>1.13.0</version>
</dependency>
<dependency><groupId>org.apache.calcite.avatica</groupId><artifactId>avatica-core</artifactId>
</dependency>

2、获取数据

/*** 使用JDBC操作Druid,获取实时分析结果*/
public class Main {public static void main(String[] args) throws Exception {// 1. 加载Druid JDBC驱动Class.forName("org.apache.calcite.avatica.remote.Driver");// 2. 获取Druid JDBC连接Connection connection = DriverManager.getConnection("jdbc:avatica:remote:url=http://node3:8888/druid/v2/sql/avatica/", new Properties());// 3. 构建SQL语句String sql = "SELECT user, sum(views) as view_count FROM \"metrics-kafka\" GROUP BY 1 ORDER BY 1";// 4. 构建Statement,执行SQL获取结果集Statement statement = connection.createStatement();ResultSet resultSet = statement.executeQuery(sql);// 5. 迭代ResultSetwhile(resultSet.next()) {String user = resultSet.getString("user");long view_count = resultSet.getLong("view_count");System.out.println(user + " -> " + view_count);}// 6. 关闭Druid连接resultSet.close();statement.close();connection.close();}
}

Druid实时OLAP分析

开发环境准备

启动Druid

1、node1节点(使用外部zk而不使用imply自带zk启动overlord和coordinator)

# 使用外部zk而不使用imply自带zk启动overlord和coordinator
/export/servers/imply-3.0.4/bin/supervise -c /export/servers/imply-3.0.4/conf/supervise/master-no-zk.conf

2、node2节点(启动historical和middlemanager)

/export/servers/imply-3.0.4/bin/supervise -c /export/servers/imply-3.0.4/conf/supervise/data.conf

3、node3节点(启动broker和router)

/export/servers/imply-3.0.4/bin/supervise -c /export/servers/imply-3.0.4/conf/supervise/query.conf

访问WebUI

组件名 URL
broker http://node3:8888
coordinator、overlord http://node1:8081/index.html
middleManager、historical http://node1:8090/console.html

点击流日志指标分析

操作步骤:

1、打开 postman

2、修改摄取Kafka实时数据 配置文件

  • 打开 Druid实时数据分析项目_配置文件\index_kafka_dws_click_log.json数据文件
  • 修改 Kafka 集群地址
  • 修改 topic 地址

3、复制JSON配置文件到 postman

4、发送请求到 http://node1:8090/druid/indexer/v1/supervisor

每日PV分析

SELECT SUM("count") as totalPVfrom"dws_click_log"where TIME_FORMAT("__time", 'yyyy-MM-dd') = '2010-09-05'

每日UV分析

SELECT COUNT(DISTINCT "uid") as totalPVfrom"dws_click_log"where TIME_FORMAT("__time", 'yyyy-MM-dd') = '2010-09-05'

每日IP分析

SELECT COUNT(DISTINCT "ip") as totalPVfrom"dws_click_log"where TIME_FORMAT("__time", 'yyyy-MM-dd') = '2010-09-05'

每日用户访问来源流量占比(百度、知乎、新浪、首页…)

SELECTreferDomain,  SUM("count") as total_countFROM"dws_click_log"WHERETIME_FORMAT("__time", 'yyyy-MM-dd') ='2010-09-05'GROUP BY 1

每日不同城市访问来源流量占比

SELECTprovince, city,SUM("count") as total_countFROM"dws_click_log"WHERETIME_FORMAT("__time", 'yyyy-MM-dd') ='2010-09-05'GROUP BY province,city

订单数指标分析

操作步骤:

1、打开 postman

2、修改摄取Kafka实时数据 配置文件

  • 打开 Druid实时数据分析项目_配置文件\index_kafka_dws_order.json数据文件
  • 修改 Kafka 集群地址
  • 修改 topic 地址

3、复制JSON配置文件到 postman

4、发送请求到 http://node1:8090/druid/indexer/v1/supervisor

日订单数分析

SELECTSUM("count") as total_countFROM"dws_order"WHERETIME_FORMAT("__time", 'yyyy-MM-dd') = '2019-01-06'

周订单数分析

SELECTSUM("count") as total_countFROM"dws_order"WHERE"__time" >= CURRENT_TIMESTAMP - INTERVAL '7' DAY

月订单数分析

SELECTSUM("count") as total_countFROM"dws_order"WHERE"__time" >= CURRENT_TIMESTAMP - INTERVAL '1' MONTH

今日各区域订单数(地图)

SELECTareaId,sum("count") as total_countFROM"dws_order"WHERETIME_FORMAT("__time", 'yyyy-MM-dd') = '2019-01-06' GROUP BY 1ORDER BY 2 DESC

周订单数趋势分析

分析一周内每日订单数

SELECTTIME_FORMAT("__time", 'yyyy-MM-dd') as "date",sum("count") as total_countFROM  "dws_order"WHERE"__time" >= CURRENT_TIMESTAMP - INTERVAL '7' DAYGROUP BY 1

今日区域订单的订单数Top8

  • 按照区域分组
  • 按照订单数降序排列
  • 取前8条
SELECTareaId,SUM("count") total_countFROM"dws_order"WHERETIME_FORMAT("__time", 'yyyy-MM-dd') = '2019-01-06' GROUP BY 1ORDER BY 2 DESCLIMIT 8

周销售环比分析

  • 获取上周每天总销售额
  • 获取本周每天中销售额
-- 上周SELECT'上周' as "week",TIME_FORMAT("__time", 'yyyy-MM-dd') as "date1",SUM("totalMoney") as total_moneyFROM"dws_order"WHERE"__time" BETWEEN (CURRENT_TIMESTAMP - INTERVAL '14' DAY) AND (CURRENT_TIMESTAMP - INTERVAL '7' DAY)GROUP BY 1,2UNION ALLSELECT'本周' as "week",TIME_FORMAT("__time", 'yyyy-MM-dd') as "date1",SUM("totalMoney") as total_moneyFROM"dws_order"WHERE"__time" >= CURRENT_TIMESTAMP - INTERVAL '7' DAYGROUP BY 1,2;

24小时销售额分析

SELECTTIME_FORMAT("__time", 'HH') as "hour",SUM("totalMoney") as "totalMoney"FROM"dws_order"WHERE"__time" >= CURRENT_TIMESTAMP - INTERVAL '1' DAYGROUP BY 1ORDER BY 2 DESC

今日top4地区销售排行

SELECTareaId,SUM("totalMoney") as "totalMoney"FROM"dws_order"WHERE"__time" >= CURRENT_TIMESTAMP - INTERVAL '1' DAYGROUP BY 1ORDER BY 2 DESCLIMIT 4

每日实际支付买家数

SELECT COUNT(DISTINCT "userId") as "totalCount"FROM "dws_order" WHERE TIME_FORMAT("__time", 'yyyy-MM-dd') = '2019-01-06'  and isPay=1

每日购物车支付转换率

SELECT SUM("count") as "totalCount",       --找到总的已支付的订单SUM(case when isFromCart=0 then 1 else 0 end) as "cartTotalCount"    --直接下单的订单数量FROM "dws_order" WHERE TIME_FORMAT("__time", 'yyyy-MM-dd') = '2019-01-06'  and isPay=1

商品消息数指标分析

操作步骤:

1、打开 postman

2、修改摄取Kafka实时数据 配置文件

  • 打开 Druid实时数据分析项目_配置文件\index_kafka_dws_goods.json数据文件
  • 修改 Kafka 集群地址
  • 修改 topic 地址

3、复制JSON配置文件到 postman

4、发送请求到 http://node1:8090/druid/indexer/v1/supervisor

每日商家商品数量

SELECTshopId,COUNT(DISTINCT "goodsId") as total_countFROM "dws_goods"WHERETIME_FORMAT("__time", 'yyyy-MM-dd') ='2019-02-23'GROUP BY shopId

每日商家商品品牌数量

SELECTshopId,COUNT(DISTINCT "brandId") as total_countFROM "dws_goods"WHERETIME_FORMAT("__time", 'yyyy-MM-dd') ='2019-02-23'GROUP BY shopId

每日首发上架商品数

SELECTshopId,COUNT(DISTINCT "goodsId") as total_countFROM "dws_goods"WHERETIME_FORMAT("__time", 'yyyy-MM-dd') ='2019-02-23' and isSale=1GROUP BY shopId

购物车消息数指标分析

操作步骤:

1、打开 postman

2、修改摄取Kafka实时数据 配置文件

  • 打开 Druid实时数据分析项目_配置文件\index_kafka_dws_cart.json数据文件
  • 修改 Kafka 集群地址
  • 修改 topic 地址

3、复制JSON配置文件到 postman

4、发送请求到 http://node1:8090/druid/indexer/v1/supervisor

每日加入购物车次数

SELECTSUM("count") as total_countFROM "dws_cart"WHERETIME_FORMAT("__time", 'yyyy-MM-dd') ='2019-12-16'

每日加入购物车买家数

SELECTCOUNT(DISTINCT "userId") as total_countFROM "dws_cart"WHERETIME_FORMAT("__time", 'yyyy-MM-dd') ='2019-12-16'

每日加入购物车商品数

SELECT SUM("totalGoods") as "totalCount"FROM "dws_cart" WHERE TIME_FORMAT("__time", 'yyyy-MM-dd') = '2019-12-16'

评论数指标分析

操作步骤:

1、打开 postman

2、修改摄取Kafka实时数据 配置文件

  • 打开 Druid实时数据分析项目_配置文件\index_kafka_dws_comments.json数据文件
  • 修改 Kafka 集群地址
  • 修改 topic 地址

3、复制JSON配置文件到 postman

4、发送请求到 http://node1:8090/druid/indexer/v1/supervisor

每日买家评价数

select userId,SUM("count") as totalCountfrom dws_commentsWHERETIME_FORMAT("__time", 'yyyy-MM-dd') = '2019-12-06' GROUP BY userId

每日买家评价卖家数

select userId,shopId,SUM("count") as totalCountfrom dws_commentsWHERETIME_FORMAT("__time", 'yyyy-MM-dd') = '2019-12-06' GROUP BY 1,2

每日买家好评率

select nuserId,SUM("count") as totalCount,SUM(case when starScore>'3' then "count" else 0 end) as goodCount from dws_commentsWHERETIME_FORMAT("__time", 'yyyy-MM-dd') = '2019-12-06' GROUP BY userId

数据可视化项目

操作步骤:

1、导入 itcast_dw_web 项目

2、修改 DashboardServiceImpl.java 中 Redis 服务器地址

3、修改 utils.DruidHelper 中Druid的 url 地址

4、修改druid连接字符串的表名:dws_od改成dws_order

5、启动 Jetty 服务器

6、打开浏览器访问 http://localhost:8080/itcast_dw_web

Druid架构与原理

Druid系统架构详解

Druid有5种节点:

  • Overlord
  • MiddleManager
  • Coordinator
  • Historical
  • Broker

以下是这几个节点的主要功能:

  • Overlord、MiddleManager

    • 负责处理索引任务
    • Overlord是MiddleManager的master节点
  • Coordinator、Historical
    • 负责管理分发Segment
    • Coordinator是Historical的master节点
  • Broker
    1. 负责接收Client查询请求
    2. 拆分子查询给MiddleManager和Historical节点
    3. 合并查询结果返回给Client

索引服务

  • 索引服务是数据摄入创建和销毁Segment的重要方式
  • Druid提供一组支持索引服务(Indexing Service)的组件,即Overlord和MiddleManager节点
  • 索引服务采用的是主从架构,Overlord为主节点,MiddleManager是从节点

索引服务架构图如下图所示:

索引服务由三部分组件组成:

  • Overlord组件

    • 分配任务给MiddleManager
  • MiddleManager组件
    • 用于管理Peon的
  • Peon(劳工)组件
    • 用于执行任务

部署:

  • MiddleManager和Overlord组件可以部署在相同节点也可以跨节点部署
  • Peon和MiddleManager是部署在同一个节点上的

索引服务架构和Yarn的架构很像:

  • Overlaod => ResourceManager,负责集群资源管理和任务分配
  • MiddleManager => NodeManager,负责接受任务和管理本节点的资源
  • Peon => Container,执行节点上具体的任务

Overlord节点

  • Overlord是索引服务的主节点,对外负责接受索引任务,对内负责将任务分解并下发给MiddleManager
  • Overlord有两种运行模式:
    • 本地模式(Local Mode):默认模式。本地模式下的Overlord不仅负责任务协调工作,还会负责启动一些peon来完成具体的任务。
    • 远程模式(Remote Mode):该模式下,Overlord和MiddleManager运行在不同的节点上,它仅负责任务的协调工作,不负责完成具体的任务。
  • Overlord提供了一个UI客户端,可以用于查看任务、运行任务和终止任务等
    • http://node1:8090/console.html
  • Overlord提供了RESETful的访问形式,所以客户端可以通过HTTP POST形式向请求节点提交任务
    • 提交任务:http://node1:8090/druid/indexer/v1/task
    • 杀死任务:http://node1:8090/druid/indexer/v1/task/{task_id}/shutdown

MiddleManager节点

  • MiddleManager是执行任务的工作节点
  • MiddleManager会将任务单独发给每个单独JVM运行的Peon
  • 每个Peon一次只能运行一个任务

Coordinator节点

  • Coordinator是Historical的mater节点,主要负责管理和分发Segment
  • 具体工作就是
    • 告知Historical加载或删除Segment
    • 管理Segment副本以及负载Segment在Historical上的均衡
  • Coordinator是定期运行的,通过Zookeeper获取当前集群状态,通过评估集群状态来进行均衡负载Segment
  • Coordinator连接数据库(MetaStore),获取Segment信息和规则(Rule),Coordinator根据数据库中表的数据来进行集群 segment 管理
  • Coordinator提供了一UI界面,用于显示集群信息和规则配置
    • http://node1:8081/index.html#/

Historical节点

  • Historical节点负责管理历史Segment
  • Historical节点通过Zookeeper监听指定的路径来发现是否有新的Segment需要加载
  • Historical节点收到有新的Segment时候,就会检测本地cache和磁盘,查看是否有该Segment信息。如果没有Historical节点会从Zookeeper中拉取该Segment相关的信息,然后进行下载

Broker节点

  • Broker节点负责转发Client查询请求的
  • Broker通过zookeeper能够知道哪个Segment在哪些节点上,将查询转发给相应节点
  • 所有节点返回数据后,Broker会将所有节点的数据进行合并,然后返回给Client

Druid数据存储

Druid提供对大数据集的实时摄入和高效复杂查询的性能,主要原因:基于Datasource与Segment的数据存储结构

数据存储

  • Druid中的数据存储在被称为DataSource中,DataSource类似RDMS中的table
  • 每个DataSource按照时间划分,每个时间范围称为一个chunk((比如按天分区,则一个chunk为一天))
  • 在chunk中数据由被分为一个或多个segment
    • segment是数据实际存储结构,Datasource、Chunk只是一个逻辑概念
  • 每个segment都是一个单独的文件,通常包含几百万行数据
  • segment是按照时间组织成的chunk,所以在按照时间查询数据时,效率非常高

数据分区

  • Druid处理的是事件数据,每条数据都会带有一个时间戳,可以使用时间进行分区
  • 上图指定了分区粒度为为天,那么每天的数据都会被单独存储和查询

Segment

  • Segment是数据存储、复制、均衡和计算的基本单元
  • Segment具备不可变性,一个Segment一旦创建完成后(MiddleManager节点发布后)就无法被修改
  • 只能通过生成一个新的Segment来代替旧版本的Segment

Segment内部存储结构

  • Druid采用列式存储,每列数据都是在独立的结构中存储
  • Segment中的数据类型主要分为三种
    • 时间戳
    • 维度列
    • 指标列

  • 时间戳列和指标列

    • Druid采用LZ4压缩每列的整数或浮点数
    • 收到查询请求后,会拉出所需的行数据(对于不需要的列不会拉出来),并且对其进行解压缩
  • 维度列
    • 维度列需要支持filter和group by
    • Druid使用了字典编码(Dictionary Encoding)和位图索引(Bitmap Index)来存储每个维度列
    • 每个维度列需要三个数据结构
      • 需要一个字典数据结构,将维度值映射成一个整数ID
      • 使用上面的字典编码,将该列所有维度值放在一个列表中
      • 对于列中不同的值,使用bitmap数据结构标识哪些行包含这些值。

Druid针对维度列之所以使用这三个数据结构,是因为:

  • 使用字典将字符串映射成整数ID,可以紧凑的表示维度数据
  • 使用Bitmap位图索引可以执行快速过滤操作
    • 找到符合条件的行号,以减少读取的数据量
    • Bitmap可以快速执行AND和OR操作

roll-up聚合

  • Druid通过一个roll-up的处理,将原始数据在注入的时候就进行汇总处理
  • roll-up可以压缩我们需要保存的数据量
  • Druid会把选定的相同维度的数据进行聚合操作,可减少存储的大小
  • Druid可以通过 queryGranularity 来控制注入数据的粒度。 最小的queryGranularity 是 millisecond(毫秒级)

Roll-up聚合前:

time app_key area value
2019-10-05 10:00:00 area_key1 Beijing 1
2019-10-05 10:30:00 area_key1 Beijing 1
2019-10-05 11:00:00 area_key1 Beijing 1
2019-10-05 11:00:00 area_key2 Shanghai 2

Roll-up聚合后:

time app_key area value
2019-10-05 area_key1 Beijing 3
2019-10-05 area_key2 Shanghai 2

位图索引

以下为一个DataSource(表)中存储的数据。

  • 第一列为时间,Appkey和area都是维度列,value为metric列
  • Druid会在导入阶段自动对数据进行Rollup,将维度相同组合的数据进行聚合处理

按天聚合后的数据如下

Druid通过建立位图索引,来实现快速进行数据查找。

索引如下所示:

  • 索引位图可以看作是HashMap<String, Bitmap>

    • key就是维度的取值
    • value就是该表中对应的行是否有该维度的值

以SQL查询为例:
1)boolean条件查询

select sum(value)
from AD_areauser
where time=’2017-10-11’ and Appkey in (‘appkey1’,’appkey2’) and area=’北京’

执行过程分析:

  1. 根据时间段定位到segment
  2. Appkey in (‘appkey1’, ‘appkey2’) and area=’北京’查到各自的bitmap
    • (appkey1(1000) or appkey2(0110)) and (北京(1100) ) = 1000 or 0110 = 1110
    • (1000 or 0110) and 1100 = 1110 and 1100 = 1100
    • 符合条件的列为第一行和第二行,这两行的 sum(value) 的和为26.

2)group by 查询

select area, sum(value)
from AD_areauser
where time=2017-10-11and Appkey in (‘appkey1’,’appkey2’)
group by area

该查询与上面的查询不同之处在于将符合条件的列

  • appkey1(1000) or appkey2(0110) = (1110)
  • 将第一行、第二行、第三行取出来
  • 在内存中做分组聚合。结果为:北京:26, 上海:13.

本次项目使用Druid来进行实时OLAP分析,通过Flink预处理Kafka的数据,再将预处理后的数据下沉到Kafka中。再基于Druid进行数据分析。

Superset

BI VS 报表工具

  • 报表工具是数据展示工具,而BI(商业智能)是数据分析工具。报表工具可以制作各类数据报表、图形报表的工具,甚至还可以制作电子发票联、流程单、收据等。

  • BI可以将数据进行模型构建,制作成Dashboard,相比于报表,侧重点在于分析,操作简单、数据处理量大。常常基于企业搭建的数据平台,连接数据仓库进行分析。

简介

Superset是一款开源的现代化企业级BI。它是目前开源的数据分析和可视化工具中比较好用的,功能简单但可以满足我们对数据的基本需求,支持多种数据源,图表类型多,易维护,易进行二次开发。

功能

  • 丰富的数据可视化集
  • 易于使用的界面,用于浏览和可视化数据
  • 创建和共享仪表板
  • 与主要身份验证提供程序(数据库,OpenID,LDAP,OAuth和REMOTE_USER通过Flask AppBuilder集成)集成的企业就绪身份验证
  • 可扩展的高粒度安全性/权限模型,允许有关谁可以访问单个要素和数据集的复杂规则
  • 一个简单的语义层,允许用户通过定义哪些字段应显示在哪些下拉列表中以及哪些聚合和功能度量可供用户使用来控制如何在UI中显示数据源
  • 通过SQLAlchemy与大多数说SQL的RDBMS集成
  • 与Druid.io的深度集成

支持的数据库

superset现在支持的所有数据库或分析引擎:

  • Amazon Athena
  • Amazon Redshift
  • Apache Drill
  • Apache Druid
  • Apache Hive
  • Apache Impala
  • Apache Kylin
  • Apache Pinot
  • Apache Spark SQL
  • BigQuery
  • ClickHouse
  • Google Sheets
  • Greenplum
  • IBM Db2
  • MySQL
  • Oracle
  • PostgreSQL
  • Presto
  • Snowflake
  • SQLite
  • SQL Server
  • Teradata
  • Vertica

界面

安装

安装python3

首先升级python版本,我们使用Anaconda来安装Python3版本的python。

1、首先去Anaconda官网下载安装脚本

资料\superset\Anaconda3-2019.07-Linux-x86_64.sh

2、上传Anaconda3-2019.07-Linux-x86_64.sh

使用 FileZilla 上传到node3 /export/softwares

3、运行Anaconda3-2019.07-Linux-x86_64.sh脚本

sh Anaconda3-2019.07-Linux-x86_64.sh

安装过程输入:回车、yes、

Anaconda安装目录设置为:/export/servers/anaconda

4、配置环境变量

vim /etc/profile
#Anaconda
export PATH=$PATH:/root/anaconda3/bin
source /etc/profile

5、验证是否安装python3成功

python3

提示出现python3.x版本即安装成功!!

退出使用quit();

注意:对于重新打开的终端连接会出现base字样,消除方法:

若在终端中输入conda deactivate,也可消除base字样,但是一次性的,再次打开终端依然存在base字样。在.bashrc文件(home目录下)添加命令:conda deactivate可以永久消除base字样。

至此python3已经安装成功。

安装superset

1、安装依赖

yum upgrade python-setuptools
yum install gcc gcc-c++ libffi-devel python-devel python-pip python-wheel openssl-devel libsasl2-devel openldap-devel

1、pip安装superset

cd /export/servers/anaconda3/
pip install superset

需要联网下载文件等待一段时间

2、创建管理员用户名和密码

fabmanager create-admin --app superset

记住以下信息,登录使用:

Username [admin]: admin
User first name [admin]: admin
User last name [user]: admin
Email [admin@fab.org]:
Password: 123456
Repeat for confirmation: 123456
Recognized Database Authentications.
Admin User admin created.

3、初始化superset

superset db upgrade

4、装载初始化数据

superset load_examples

5、创建默认角色和权限

superset init

6、启动superset

superset run -h node3 -p 8080 --with-threads --reload --debugger

7、登录superset

http://node3:8080/superset/welcome

用户名: admin

密码:123456

切换到中文

8、Superset 初体验

superset入门案例

需求:

  • 使用Superset展示不同性别的用户人数

  • 效果图

准备环境

yum install python-devel -y
yum install mysql-devel -y
yum install gcc -y
pip install mysqlclient

实现步骤:

1、导入MySQL数据源

导入资料中的 superset\数据源\superset_demo.sql

2、添加新的数据库

mysql的url地址

mysql://root:123456@node1/superset_demo?charset=utf8

3、点击 SQLLab > SQL Editor编写以下SQL语句

选择 数据库

选择表,查看表的列

参考SQL语句:

selectcase when gender = 0 then '男'when gender = 1 then '女'else '保密'end as gender,count(id) as total_cnt
from t_user
group by gender

4、保存查询

5、点击 saved queries

  • 运行查询,点击 Explore 浏览数据

6、配置图表类型为 Bar Chart 条形图

7、指定统计指标 sum(total_cnt)

8、指定序列为 gender(性别)

Superset功能介绍

  • 用户权限
  • Sources
  • Manage
  • Charts
  • Dashboards
  • SQL Lab

Superset实战 - MySQL订单分析案例

Superset Charts图表展示实战

1、根据日期统计,每日订单总额(趋势图)

select str_to_date(date1,'%Y-%m-%d') date1,sum(price) total_price
fromdm_sales
group by date1;

2、根据日期、渠道统计订单总额(Sunburst Chart)

selectdate1,channelname,sum(price) total_price
fromdm_sales
group by date1,channelname

3、根据日期、区域统计订单总额(数据透视表)

selectstr_to_date(date1,'%Y-%m-%d') date1,regionname,sum(amount) as total_amount,sum(price) as total_price
fromdm_sales
group bydate1,regionname

4、根据日期、区域、渠道、产品统计订单数、订单总额(层级环图)

selectdate1,regionname,channelname,productname,sum(price) as total_price
fromdm_sales
group bydate1,regionname,channelname,productname

Superset Dashboards看板展示实战

将之前设计好的图标整合到看板中

操作步骤:

1、点击 Dashboards > 添加看板

2、拖动之前开发好的 Charts 到看板中

Superset权限控制

Superset初始化权限之后,创建5个角色,分别为Admin,Alpha,Gamma,sql_lab以及Public。Admin,Alpha和Gamma角色,分配了很多的菜单/视图权限,如果手工去修改,改错的可能性很大,加之Superset并没有说明每一项权限的完整文档,所以不建议去修改这些角色的定义。灵活使用预置的角色,可以快速满足业务上安全控制需求。

角色权限介绍

  • Admin:拥有所有权限

  • Alpha:能访问所有数据源,增加或者更改数据源,但不能给更改其他用户权限。

  • Gamma:必须结合其他能访问数据源的角色才能访问数据。这个角色所能访问的切片和看板,也是基于能访问数据源所创建的切片和看板。

  • sql_lab:能访问SQL Lab菜单。

  • Public:默认没有任何权限

匿名访问

所有用户都能访问某一个看板,需要进行如下设置 :

1、更改config.py文件,设置如下部分,PUBLIC_ROLE_LIKE_GAMMA = True

vim /export/servers/anaconda3/lib/python3.7/site-packages/superset/config.py

2、需要运行superset init命令,这个命令会给“Public”角色设置与“Gamma”一样的权限

superset init

3、将匿名用户所需要访问的数据库和数据源分配给“Public”角色。例如,基于superset_demo数据库的grade_test创建了看板,如果匿名用户需要查看这个看板,那将如下权限分配给“Public”。

  • all database access on all_database_access
  • all datasource access on all_datasource_access

  • 删除一些菜单权限:

分享页面或者嵌入html

html页面:

<html>
<head>
<title>dashboard</title>
</head>
<body><div class="dashboard"><!-- <iframe src="http://node3:8080/superset/dashboard/7/"  style="height=100%; width=100%" ></iframe > --><iframe name="myframe" src="http://node3:8080/r/10" frameborder="0" scrolling="auto" width="100%" height="100%" onload="document.all['myframe'].style.height=myframe.document.body.scrollHeight" ></iframe></div></body>
</html>

角色介绍

实际业务中,不同的职能部门访问的数据不一样,例如财务部需要访问成本,应收,应付等数据,供应链需要访问库存数量,发货数据等,怎样简洁的设置,快速满足这种业务需求?

如前文所述,“Gamma”拥有大部分基础的权限,但是必须结合其他能访问数据源的角色才能访问数据。所以,可以给用户分配“Gamma”角色和针对部门分别创建的数据源角色来进行控制。

例如,针对财务用户,创建角色“Finance”,将成本,应收,应付的数据表权限赋予这个角色,财务用户就分配“Gamma”和“Finance”。

针对供应链用户,创建角色“SCM”,将库存和发货数据表权限赋予这个角色,供应链用户就配“Gamma”和“SCM”。

如果是公司的霸道总裁,需要看所有的看板,就可以给霸道总裁赋予“Gamma”和“Finance”,“SCM”角色。

我们创建2个角色,分别是main角色可以查看访问main的数据,

examples角色可以查看和访问 examples 数据源。

1、创建 main 角色

  • database access on [main] 拥有访问 main 数据库的权限
  • datasource access on [main] 拥有访问main 数据源的权限
  • can dashboard on Superset 拥有访问 main 数据源创建的 dashboard的权限

2、创建examples角色

  • database access on [examples] 拥有访问 examples数据库的权限
  • datasource access on [examples] 拥有访问examples数据源的权限
  • can dashboard on Superset 拥有访问 examples数据源创建的 dashboard的权限

3、创建用户

  • main_user: 关联gamma、sqllab与main角色;

  • examples_user: 关联gamma、sqllab与examples角色;

  • 用不同的用户登录查看每个用户具有的table,以及能查看到的dashboard!!

使用Supset进行业务开发

Superset对接MySQL展示离线指标数据

准备:

  • 导入 资料\superset\数据源\ads_dim_table.sql

1、添加之前离线阶段开发好的 itcast_ads_shop 数据库数据源

  • jdbc连接:mysql://root:123456@node1/itcast_ads_shop?charset=utf8

2、统计指定日期 按照大区获取订单总额

selectt2.orgname as regionname,allprice
fromads_trade_order t1left join itcast_org t2on t1.regionid = t2.orgid
where dt = '20190905' and regionid != '9999'

2、统计指定日期 按照大区、商品一级分类ID、订单总额

selectt4.orgname,t2.catname,allprice
fromads_trade_order t1left join itcast_goods_cats t2 on t1.firstcatid  = t2.catId left join itcast_goods_cats  t3on t1.secondcatid  = t3.catId left join itcast_org t4on t1.regionid  = t4.orgId left join itcast_org t5on t1.cityid  = t5.orgId
where dt = '20190905' and regionid != '9999' and firstcatid != '9999'

Superset对接Kylin展示离线指标数据

环境准备:

  • 添加 kylin 支持
 pip install kylinpy
  • 启动Kylin

    • 启动HDFS
    • 启动HBASE
    • 启动Hive metastore、hiveserver2

添加 kylin JDBC数据库

  • kylin的JDBC url 为
kylin://ADMIN:KYLIN@node1:7070/itcast_shop

1、统计大区、店铺分类、支付方式订单总额

select REGIONID ,SHOPID ,PAYTYPE ,SUM (GOODSPRICE ) total_money
FROM TMP_ORDER_GOODS_CAT_ORG
group by REGIONID, SHOPID ,PAYTYPE

2、统计大区、一级分类、支付方式订单笔数、订单总额

select REGIONID ,FIRSTCAT ,PAYTYPE ,count(DISTINCT ORDERID ) total_amount,SUM (GOODSPRICE ) total_money
FROM TMP_ORDER_GOODS_CAT_ORG
group by REGIONID, FIRSTCAT , PAYTYPE

Superset对接Druid展示订单实时指标数据

1、环境准备

安装依赖

pip install pydruid

Druid的JBDC URL为

  • druid://node3:8888/druid/v2/sql/

2、创建druid的database

3、今日top4地区销售排行

SELECT
areaId,
SUM("totalMoney") as totalMoney
FROMdws_od
WHERE
__time > CURRENT_TIMESTAMP - INTERVAL '1' DAY
GROUP BY 1
ORDER BY 2 DESC
LIMIT 4

4、Top8区域订单的订单数

SELECTareaId,SUM("count") as totalCnt
FROMdws_od
GROUP BY 1
ORDER BY 2 DESC
LIMIT 8

创建看板

examples角色可以查看和访问 examples 数据源。

1、创建 main 角色

  • database access on [main] 拥有访问 main 数据库的权限
  • datasource access on [main] 拥有访问main 数据源的权限
  • can dashboard on Superset 拥有访问 main 数据源创建的 dashboard的权限

[外链图片转存中…(img-DEdT4nAh-1651114444962)]

2、创建examples角色

  • database access on [examples] 拥有访问 examples数据库的权限
  • datasource access on [examples] 拥有访问examples数据源的权限
  • can dashboard on Superset 拥有访问 examples数据源创建的 dashboard的权限

[外链图片转存中…(img-JwN0grGI-1651114444963)]

3、创建用户

  • main_user: 关联gamma、sqllab与main角色;

[外链图片转存中…(img-TXpYPTOy-1651114444966)]

  • examples_user: 关联gamma、sqllab与examples角色;

[外链图片转存中…(img-M8LHS04P-1651114444967)]

  • 用不同的用户登录查看每个用户具有的table,以及能查看到的dashboard!!

使用Supset进行业务开发

Superset对接MySQL展示离线指标数据

准备:

  • 导入 资料\superset\数据源\ads_dim_table.sql

1、添加之前离线阶段开发好的 itcast_ads_shop 数据库数据源

  • jdbc连接:mysql://root:123456@node1/itcast_ads_shop?charset=utf8

2、统计指定日期 按照大区获取订单总额

selectt2.orgname as regionname,allprice
fromads_trade_order t1left join itcast_org t2on t1.regionid = t2.orgid
where dt = '20190905' and regionid != '9999'

[外链图片转存中…(img-NgFzGqCt-1651114444968)]

2、统计指定日期 按照大区、商品一级分类ID、订单总额

selectt4.orgname,t2.catname,allprice
fromads_trade_order t1left join itcast_goods_cats t2 on t1.firstcatid  = t2.catId left join itcast_goods_cats  t3on t1.secondcatid  = t3.catId left join itcast_org t4on t1.regionid  = t4.orgId left join itcast_org t5on t1.cityid  = t5.orgId
where dt = '20190905' and regionid != '9999' and firstcatid != '9999'

[外链图片转存中…(img-ZL3Crzbu-1651114444970)]

Superset对接Kylin展示离线指标数据

环境准备:

  • 添加 kylin 支持
 pip install kylinpy
  • 启动Kylin

    • 启动HDFS
    • 启动HBASE
    • 启动Hive metastore、hiveserver2

添加 kylin JDBC数据库

  • kylin的JDBC url 为
kylin://ADMIN:KYLIN@node1:7070/itcast_shop

1、统计大区、店铺分类、支付方式订单总额

select REGIONID ,SHOPID ,PAYTYPE ,SUM (GOODSPRICE ) total_money
FROM TMP_ORDER_GOODS_CAT_ORG
group by REGIONID, SHOPID ,PAYTYPE

[外链图片转存中…(img-u4rmg4VA-1651114444974)]

2、统计大区、一级分类、支付方式订单笔数、订单总额

select REGIONID ,FIRSTCAT ,PAYTYPE ,count(DISTINCT ORDERID ) total_amount,SUM (GOODSPRICE ) total_money
FROM TMP_ORDER_GOODS_CAT_ORG
group by REGIONID, FIRSTCAT , PAYTYPE

[外链图片转存中…(img-PuaMWg3u-1651114444975)]

Superset对接Druid展示订单实时指标数据

1、环境准备

安装依赖

pip install pydruid

Druid的JBDC URL为

  • druid://node3:8888/druid/v2/sql/

2、创建druid的database

[外链图片转存中…(img-R1ewQNLf-1651114444976)]

3、今日top4地区销售排行

[外链图片转存中…(img-ewVjLC0E-1651114444979)]

SELECT
areaId,
SUM("totalMoney") as totalMoney
FROMdws_od
WHERE
__time > CURRENT_TIMESTAMP - INTERVAL '1' DAY
GROUP BY 1
ORDER BY 2 DESC
LIMIT 4

4、Top8区域订单的订单数

[外链图片转存中…(img-mAthm1Wb-1651114444980)]

SELECTareaId,SUM("count") as totalCnt
FROMdws_od
GROUP BY 1
ORDER BY 2 DESC
LIMIT 8

创建看板

[外链图片转存中…(img-zRED3vgA-1651114444982)]

大数据智慧数字电商第五课 程序整合 可视化和BI分析相关推荐

  1. 大数据智慧数字电商第四课 数据拉取和etl处理

    大数据数仓项目第04天 课程目标 能够点击流日志实时拉宽处理 能够对订单数据以及订单明细数据进行实时etl处理 能够使用Flink异步IO拉取Redis维度数据 能够对商品数据以及购物车和评论数据实时 ...

  2. 大数据智慧数字电商第三课 数据解析和etl落地dwd

    实时数仓第3天讲义 课程目标 能够编写Flink程序解析Kafka中的ProtoBuf 能够实现维度数据全量装载.增量更新 能够使用Logparsing框架解析Nginx点击流日志 能够实现点击流消息 ...

  3. 大数据智慧数字电商第一课 实时数仓技术选型和架构设计

    实时数仓第1天讲义 学习目标 理解实时数仓项目的基本需求.整体架构 了解常用实施方案 能够编写Canal客户端采集binlog消息 理解google ProtoBuf序列化方式 理解Canal采集原理 ...

  4. 大数据下的电商风控体系——李学庆

    由51CTO举办的WOT"互联网+"时代大数据技术峰会上,来自京东商城安全管理部经理李学庆做了以<大数据下的电商风控体系>为主题的演讲.本文章是把本次分享的干货亮点整理 ...

  5. 除了啤酒与尿布 大数据又助电商玩口碑营销

    在快消行业,啤酒与尿布成了大数据应用的经典案例:全球零售业巨头沃尔玛通过对消费者购物行为进行分析时发现,男性顾客在购买婴儿尿片时,常常会顺便搭配几瓶啤酒来犒劳自己,于是沃尔玛将啤酒和尿布陈列在一起,大 ...

  6. 《Storm技术内幕与大数据实践》作者陈敏敏谈大数据技术在电商领域的应用

    在10月15~17日的QCon上海2015上,1号店资深架构师.<Storm技术内幕与大数据实践>一书作者陈敏敏将分享<1号店通用精准化平台架构以及大数据营销实践>.在大会开始 ...

  7. 大数据下的电商新打法

    在依靠大数据训练出人工智能后,电商获取流量的方式已经全然改变了. Part1: 从数字化到智能化 行业的发展总会依照一定的轨迹,从自动化到数字化,再从数字化到智能化.之前的电商做到了数字化,把产品信息 ...

  8. 大数据项目之电商分析平台(2)

    第三章  .程序框架解析 3.1.模块分析 3.1.1.commons模块 1. conf 包 代码清单 3-1 ConfigurationManager类 /** * 配置工具类 */ object ...

  9. 大数据项目实战——电商推荐系统设计

    摘要 1 项目体系架构设计 1.1系统架构设计 项目以推荐系统建设领域知名的经过修改过的中文亚马逊电商数据集作为依托,以某电商网站真实业务数据架构为基础,构建了符合实践项目的一体化的电商推荐系统,包含 ...

最新文章

  1. 1-1 机器学习和深度学习综述-paddle
  2. 基于加权投票的尖峰神经活动数据高效解码
  3. 修改jar中的class文件
  4. array_multisort - 如何保持键值,不重置键值,键名保持不变
  5. django升级问题
  6. android开发Proguard混淆与反射
  7. git add失效问题以及git status结果与github网页结果不一致(转载+自己整理)
  8. [IPhone] 仅在Debug编译的NSLog
  9. html5 canvas文字颜色,我可以通过HTML5 Canvas中的字符文本颜色来做吗?
  10. 60分钟入门PyTorch,官方教程手把手教你训练第一个深度学习模型
  11. iOS系统什么天气app可以访问锁屏?
  12. Apizza-响应json数据上设置浮动注释
  13. maven的下载与安装教程(超详细)
  14. PDA扫描 Geenk scan 的方法列表
  15. 全网最全数据分析师面试干货-业务逻辑篇
  16. 运动健身耳机什么好?四款运动蓝牙耳机之好评
  17. 什么是Tableau(BI工具)
  18. android5.1修改默认锁屏方式(去除锁屏)
  19. put短语(put短语归纳)
  20. (一)Ubuntu安装详细教程(从镜像制作到NVIDIA驱动安装全流程)——超详细的图文教程

热门文章

  1. ios安装python的步骤,iOS常见砸壳方法
  2. 解决微信ios端+sendReq: not found
  3. 腾讯、阿里校招面试真题-常见经典题
  4. macbook pro m1 在 EXCEL 中安装 Excel2Latex
  5. F5 Networks:应用交付的“4G”挑战
  6. 黄冈师范学院计算机科学与技术学院,黄冈师范学院 计算机科学与技术学院 刘小俊老师简介 联系方式 手机电话 邮箱...
  7. 马来西亚吉隆坡召开的2010OpenWebAsia大会
  8. C++20,说说 Module 那点事儿
  9. C语言 从键盘输入圆的半径,计算并输出圆的正内接六边形的面积,以及这个正六边形面积占圆面积之百分比
  10. 跳石板-2017网易校招