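The tricks and principles that let HybridDB PostgreSQL Sort, Group, DISTINCT aggregation and JOIN shrug off data skew - multi-stage aggregation...
Tags
PostgreSQL , Greenplum , JOIN , group by , distinct , aggregation , non-distribution key , data skew , multi-stage aggregation
Background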
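In a distributed system, data is stored spread across nodes, for example with random or hash distribution.
Greenplum supports two data distribution modes:
1. Hash (on one or more specified columns)
2. Random (no columns need to be specified)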
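A minimal Python sketch of the two modes follows. It is an illustration, not Greenplum's implementation: `NUM_SEGMENTS`, `hash_segment` and the crc32 hash are hypothetical stand-ins (Greenplum uses its own hash function and segment mapping). It shows why the challenges listed next arise: hash distribution keeps equal keys on one segment, but rows that share a value of some other column can end up scattered.

```python
import random
import zlib

NUM_SEGMENTS = 4  # hypothetical cluster size, for illustration only


def hash_segment(key) -> int:
    """Pick a segment from a key (crc32 is a stand-in, not Greenplum's hash)."""
    return zlib.crc32(repr(key).encode()) % NUM_SEGMENTS


def distribute_by_hash(rows, key_index):
    """Hash distribution: rows with equal keys always land on the same segment."""
    segments = [[] for _ in range(NUM_SEGMENTS)]
    for row in rows:
        segments[hash_segment(row[key_index])].append(row)
    return segments


def distribute_randomly(rows, seed=0):
    """Random distribution: no key is needed, rows are spread arbitrarily."""
    rng = random.Random(seed)
    segments = [[] for _ in range(NUM_SEGMENTS)]
    for row in rows:
        segments[rng.randrange(NUM_SEGMENTS)].append(row)
    return segments


if __name__ == "__main__":
    rows = [(c1, c1 % 3) for c1 in range(12)]  # (c1, c2), distributed by c1
    by_c1 = distribute_by_hash(rows, key_index=0)
    # Segments holding rows with c2 == 0: typically more than one,
    # which is why GROUP BY c2 cannot be finished locally.
    print({n for n, seg in enumerate(by_c1) if any(r[1] == 0 for r in seg)})
```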
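Once data is distributed, JOIN, sorting, GROUP BY and DISTINCT face some challenges:
1. JOINs on columns that are not the distribution key
2. Sorting: how to guarantee that the output is globally ordered
3. GROUP BY on non-distribution-key columns
4. DISTINCT on non-distribution-key columns
Databases with less complete feature sets may not support these operations at all.
Greenplum has been sold commercially for many years and is very feature-complete, so what is its secret?
(HybridDB for PostgreSQL is built on an improved version of the open-source GPDB and already includes this capability.)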
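JOIN, sorting, GROUP BY and DISTINCT on non-distribution keys
1. GROUP BY on a non-distribution key
Example:
Table tbl_ao_col uses c1 as its distribution key, but the GROUP BY uses column c398. How does Greenplum handle this? The execution plan below explains it.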
postgres=# explain analyze select c398,count(*),sum(c399),avg(c399),min(c399),max(c399) from tbl_ao_col group by c398;
 QUERY PLAN
--------------------------------------------------------------------------------
 Gather Motion 48:1 (slice2; segments: 48) (cost=123364.18..123582.28 rows=9693 width=96)   // returns the result
   Rows out: 10001 rows at destination with 120 ms to end, start offset by 1.921 ms.
   -> HashAggregate (cost=123364.18..123582.28 rows=202 width=96)   // aggregates again after redistribution
         Group By: tbl_ao_col.c398
         Rows out: Avg 208.4 rows x 48 workers. Max 223 rows (seg17) with 0.001 ms to first row, 54 ms to end, start offset by 35 ms.
         -> Redistribute Motion 48:48 (slice1; segments: 48) (cost=122928.00..123121.86 rows=202 width=96)   // after the first aggregation only a few thousand rows per segment remain, so even a skewed redistribution matters little
               Hash Key: tbl_ao_col.c398
               Rows out: Avg 8762.2 rows x 48 workers at destination. Max 9422 rows (seg46) with 31 ms to end, start offset by 63 ms.
               -> HashAggregate (cost=122928.00..122928.00 rows=202 width=96)   // this step aggregates locally on each segment
                     Group By: tbl_ao_col.c398
                     Rows out: Avg 8762.2 rows x 48 workers. Max 8835 rows (seg2) with 0.004 ms to first row, 8.004 ms to end, start offset by 82 ms.
                     -> Append-only Columnar Scan on tbl_ao_col (cost=0.00..107928.00 rows=20834 width=16)
                           Rows out: 0 rows (seg0) with 28 ms to end, start offset by 64 ms.
 Slice statistics:
   (slice0) Executor memory: 377K bytes.
   (slice1) Executor memory: 1272K bytes avg x 48 workers, 1272K bytes max (seg0).
   (slice2) Executor memory: 414K bytes avg x 48 workers, 414K bytes max (seg0).
 Statement statistics:
   Memory used: 128000K bytes
 Settings: optimizer=off
 Optimizer status: legacy query optimizer
 Total runtime: 122.173 ms
(22 rows)
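Reading the plan:
For GROUP BY on a non-distribution key, each segment first runs a local GROUP BY, the partial results are then redistributed on the GROUP BY column, each segment runs GROUP BY again on what it receives, and finally the results are sent to the master node and returned to the user.
Greenplum uses the ratio of distinct values in the GROUP BY column to decide whether to redistribute the raw data directly or to aggregate locally first and then redistribute (which reduces the amount of data moved).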
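The steps in this plan can be simulated in plain Python. This is a sketch under stated assumptions rather than Greenplum code: each segment is a list of `(c398, c399)` tuples, `NUM_SEGMENTS` and the crc32-based `hash_segment` are hypothetical, and AVG travels as a `(sum, count)` pair because averages of partial groups cannot simply be averaged again.

```python
import zlib
from collections import defaultdict

NUM_SEGMENTS = 4  # hypothetical number of simulated segments


def hash_segment(key) -> int:
    """Stand-in hash (crc32), not Greenplum's real hash function."""
    return zlib.crc32(repr(key).encode()) % NUM_SEGMENTS


def local_partial_agg(rows):
    """Stage 1: per-segment HashAggregate producing a partial state per group."""
    states = {}
    for key, val in rows:
        cnt, total, lo, hi = states.get(key, (0, 0, val, val))
        states[key] = (cnt + 1, total + val, min(lo, val), max(hi, val))
    return states


def merge_states(a, b):
    """Combine two partial states (count, sum, min, max) of the same group."""
    return (a[0] + b[0], a[1] + b[1], min(a[2], b[2]), max(a[3], b[3]))


def multi_stage_group_by(segments):
    # Stage 1: aggregate locally on every segment.
    partials = [local_partial_agg(seg) for seg in segments]
    # Stage 2: Redistribute Motion of the partial states on the GROUP BY key.
    inbox = [defaultdict(list) for _ in range(NUM_SEGMENTS)]
    for part in partials:
        for key, state in part.items():
            inbox[hash_segment(key)][key].append(state)
    # Stage 3: each segment merges the states it received for its own keys.
    result = {}
    for seg_inbox in inbox:
        for key, states in seg_inbox.items():
            merged = states[0]
            for s in states[1:]:
                merged = merge_states(merged, s)
            cnt, total, lo, hi = merged
            # Turn the state into count(*), sum, avg, min, max.
            result[key] = (cnt, total, total / cnt, lo, hi)
    # Stage 4: Gather Motion, i.e. collect every segment's groups on the master.
    return result


if __name__ == "__main__":
    segments = [[(1, 10), (2, 5)], [(1, 20), (3, 7)], [(2, 15)], []]
    print(multi_stage_group_by(segments))
```

For that toy input, group 1 comes out as `(2, 30, 15.0, 10, 20)`: its two rows live on different segments, yet only one small state per segment had to cross the network.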
2. DISTINCT on a non-distribution key
Example:
tbl is randomly distributed.
postgres=# explain analyze select count(distinct c2) from tbl;
 QUERY PLAN
--------------------------------------------------------------------------------
 Aggregate (cost=1549462.55..1549462.56 rows=1 width=8)
   Rows out: 1 rows with 0.002 ms to first row, 0.645 ms to end, start offset by 1.681 ms.
   -> Gather Motion 48:1 (slice2; segments: 48) (cost=1548947.03..1549450.04 rows=1001 width=4)
         Rows out: 1001 rows at destination with 498 ms to end, start offset by 1.684 ms.
         -> HashAggregate (cost=1548947.03..1548959.55 rows=21 width=4)
               Group By: tbl.c2
               Rows out: Avg 20.9 rows x 48 workers. Max 31 rows (seg17) with 0.002 ms to first row, 152 ms to end, start offset by 39 ms.
               -> Redistribute Motion 48:48 (slice1; segments: 48) (cost=1548912.00..1548932.02 rows=21 width=4)
                     Hash Key: tbl.c2
                     Rows out: Avg 1001.0 rows x 48 workers at destination. Max 1488 rows (seg17) with 309 ms to end, start offset by 39 ms.
                     -> HashAggregate (cost=1548912.00..1548912.00 rows=21 width=4)
                           Group By: tbl.c2
                           Rows out: Avg 1001.0 rows x 48 workers. Max 1001 rows (seg0) with 0.006 ms to first row, 271 ms to end, start offset by 42 ms.
                           -> Append-only Columnar Scan on tbl (cost=0.00..1048912.00 rows=2083334 width=4)
                                 Rows out: 0 rows (seg0) with 25 ms to end, start offset by 42 ms.
 Slice statistics:
   (slice0) Executor memory: 327K bytes.
   (slice1) Executor memory: 764K bytes avg x 48 workers, 764K bytes max (seg0).
   (slice2) Executor memory: 292K bytes avg x 48 workers, 292K bytes max (seg0).
 Statement statistics:
   Memory used: 128000K bytes
 Settings: enable_bitmapscan=off; enable_seqscan=off; optimizer=off
 Optimizer status: legacy query optimizer
 Total runtime: 502.576 ms
(24 rows)
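Reading the plan:
For DISTINCT on a non-distribution key, each segment first performs a local hash aggregation, the data is then redistributed on the DISTINCT column, each segment hash-aggregates again, and the deduplicated values are sent to the master node, whose top-level Aggregate computes the count before the result is returned to the user.
As with GROUP BY, Greenplum uses the ratio of distinct values in the column to decide whether to redistribute the raw data directly or to aggregate locally first and then redistribute (which reduces the amount of data moved).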
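The same pattern for `count(distinct c2)`, as a hedged Python sketch with 4 simulated segments and a hypothetical crc32-based `hash_segment`. Each step is labeled with the plan node it mirrors; the final count corresponds to the top-level Aggregate on the master.

```python
import zlib

NUM_SEGMENTS = 4  # hypothetical number of simulated segments


def hash_segment(key) -> int:
    """Stand-in hash (crc32), not Greenplum's real hash function."""
    return zlib.crc32(repr(key).encode()) % NUM_SEGMENTS


def count_distinct(segments):
    """segments: lists of c2 values; returns count(distinct c2)."""
    # Local HashAggregate: remove duplicates within each segment.
    local_sets = [set(seg) for seg in segments]
    # Redistribute Motion on c2: each value gets exactly one owner segment.
    owned = [set() for _ in range(NUM_SEGMENTS)]
    for values in local_sets:
        for v in values:
            # Second HashAggregate: the owner deduplicates what it receives.
            owned[hash_segment(v)].add(v)
    # Gather Motion + Aggregate: the master counts the values it receives.
    return sum(len(s) for s in owned)


if __name__ == "__main__":
    print(count_distinct([[1, 2, 2, 3], [3, 4], [1, 5, 5], []]))
```

Because every value has a single owner after redistribution, the per-segment sets never overlap and their sizes can simply be added.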
3. DISTINCT on a non-distribution key + GROUP BY on a non-distribution key
tbl is randomly distributed.
postgres=# explain analyze select count(distinct c2) from tbl group by c3;
 QUERY PLAN
--------------------------------------------------------------------------------
 Gather Motion 48:1 (slice2; segments: 48) (cost=1805483.56..1805484.83 rows=101 width=12)
   Rows out: 101 rows at destination with 990 ms to end, start offset by 519 ms.
   -> HashAggregate (cost=1805483.56..1805484.83 rows=3 width=12)
         Group By: partial_aggregation.c3
         Rows out: Avg 2.5 rows x 41 workers. Max 4 rows (seg9) with 0.005 ms to first row, 0.284 ms to end, start offset by 577 ms.
         -> HashAggregate (cost=1802703.29..1803967.05 rows=2107 width=8)
               Group By: tbl.c3, tbl.c2
               Rows out: Avg 2465.9 rows x 41 workers. Max 4004 rows (seg9) with 0.001 ms to first row, 260 ms to end, start offset by 577 ms.
               -> Redistribute Motion 48:48 (slice1; segments: 48) (cost=1798912.00..1800934.02 rows=2107 width=8)
                     Hash Key: tbl.c3
                     Rows out: Avg 118362.0 rows x 41 workers at destination. Max 192192 rows (seg9) with 663 ms to end, start offset by 577 ms.
                     -> HashAggregate (cost=1798912.00..1798912.00 rows=2107 width=8)
                           Group By: tbl.c3, tbl.c2
                           Rows out: Avg 101100.9 rows x 48 workers. Max 101101 rows (seg0) with 0.005 ms to first row, 747 ms to end, start offset by 562 ms.
                           -> Append-only Columnar Scan on tbl (cost=0.00..1048912.00 rows=2083334 width=8)
                                 Rows out: 0 rows (seg0) with 40 ms to end, start offset by 560 ms.
 Slice statistics:
   (slice0) Executor memory: 327K bytes.
   (slice1) Executor memory: 1117K bytes avg x 48 workers, 1117K bytes max (seg0).
   (slice2) Executor memory: 435K bytes avg x 48 workers, 452K bytes max (seg0).
 Statement statistics:
   Memory used: 128000K bytes
 Settings: enable_bitmapscan=off; enable_seqscan=off; optimizer=off
 Optimizer status: legacy query optimizer
 Total runtime: 1511.120 ms
(25 rows)
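Both the DISTINCT column and the GROUP BY column are non-distribution keys here. Greenplum's distributed plan handles this elegantly and avoids the network cost of redistributing the raw data: each segment first deduplicates (c3, c2) pairs locally, only that much smaller partial result is redistributed on c3, each segment deduplicates (c3, c2) again, and a final HashAggregate counts the distinct c2 values per c3.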
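A Python sketch of this plan, under the same kind of assumptions (4 simulated segments, a hypothetical crc32-based `hash_segment`, rows as `(c3, c2)` tuples). The important detail is that redistribution hashes on c3 alone while deduplication works on the pair `(c3, c2)`, so all distinct c2 values of one group meet on a single segment.

```python
import zlib
from collections import Counter

NUM_SEGMENTS = 4  # hypothetical number of simulated segments


def hash_segment(key) -> int:
    """Stand-in hash (crc32), not Greenplum's real hash function."""
    return zlib.crc32(repr(key).encode()) % NUM_SEGMENTS


def count_distinct_group_by(segments):
    """segments: lists of (c3, c2) rows; returns {c3: count(distinct c2)}."""
    # Local HashAggregate on (c3, c2): drop duplicate pairs per segment.
    local_pairs = [set(seg) for seg in segments]
    # Redistribute Motion on c3 only, so all pairs of one group meet.
    inbox = [set() for _ in range(NUM_SEGMENTS)]
    for pairs in local_pairs:
        for c3, c2 in pairs:
            # Second HashAggregate on (c3, c2): the set deduplicates again.
            inbox[hash_segment(c3)].add((c3, c2))
    # Final HashAggregate on c3: count surviving pairs per group.
    result = {}
    for pairs in inbox:
        # Each c3 lives on exactly one segment, so update() never overwrites.
        result.update(Counter(c3 for c3, _ in pairs))
    return result


if __name__ == "__main__":
    segs = [[(1, "a"), (1, "a"), (2, "b")], [(1, "b"), (2, "b")], [(1, "a"), (3, "c")], []]
    print(count_distinct_group_by(segs))
```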
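4. JOIN on a non-distribution key
When two tables are joined on columns that are not the distribution key, Greenplum automatically redistributes the data (or broadcasts the smaller table).
Note: be careful when the join column is skewed.
In this example the join column contains 10 million duplicate IDs. After redistribution on the join key they all land on a single segment: in the plan below, seg42 receives all 10,000,000 rows of b, its Sort spills to a workfile, and the query takes about 7.2 s.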
postgres=# explain analyze select a.c1,count(*) from a join b on (a.id=b.id) group by a.c1;
 QUERY PLAN
--------------------------------------------------------------------------------
 Gather Motion 48:1 (slice3; segments: 48) (cost=0.00..2730.45 rows=1 width=12)
   Rows out: 1 rows at destination with 7190 ms to end, start offset by 2.357 ms.
   -> GroupAggregate (cost=0.00..2730.45 rows=1 width=12)
         Group By: a.c1
         Rows out: 1 rows (seg22) with 0.001 ms to first row, 0.320 ms to end, start offset by 54 ms.
         -> Sort (cost=0.00..2730.44 rows=1 width=12)
               Sort Key: a.c1
               Rows out: 1 rows (seg22) with 0.001 ms to end, start offset by 54 ms.
               Executor memory: 33K bytes avg, 33K bytes max (seg0).
               Work_mem used: 33K bytes avg, 33K bytes max (seg0). Workfile: (0 spilling, 0 reused)
               -> Redistribute Motion 48:48 (slice2; segments: 48) (cost=0.00..2730.44 rows=1 width=12)
                     Hash Key: a.c1
                     Rows out: 1 rows at destination (seg22) with 7138 ms to end, start offset by 54 ms.
                     -> Result (cost=0.00..2730.44 rows=1 width=12)
                           Rows out: 1 rows (seg42) with 0.003 ms to end, start offset by 77 ms.
                           -> GroupAggregate (cost=0.00..2730.44 rows=1 width=12)
                                 Group By: a.c1
                                 Rows out: 1 rows (seg42) with 0.002 ms to first row, 1054 ms to end, start offset by 77 ms.
                                 -> Sort (cost=0.00..2730.44 rows=1 width=4)
                                       Sort Key: a.c1
                                       Rows out: 10000000 rows (seg42) with 0.003 ms to end, start offset by 77 ms.
                                       Executor memory: 1400K bytes avg, 65676K bytes max (seg42).
                                       Work_mem used: 1400K bytes avg, 65676K bytes max (seg42). Workfile: (1 spilling, 0 reused)
                                       Work_mem wanted: 481337K bytes avg, 481337K bytes max (seg42) to lessen workfile I/O affecting 1 workers.
                                       -> Hash Join (cost=0.00..2730.44 rows=1 width=4)
                                             Hash Cond: b.id = a.id
                                             Rows out: 10000000 rows (seg42) with 0.014 ms to first row, 4989 ms to end, start offset by 77 ms.
                                             Executor memory: 6511K bytes avg, 6513K bytes max (seg18).
                                             Work_mem used: 6511K bytes avg, 6513K bytes max (seg18). Workfile: (0 spilling, 0 reused)
                                             -> Redistribute Motion 48:48 (slice1; segments: 48) (cost=0.00..471.72 rows=208130 width=4)
                                                   Hash Key: b.id
                                                   Rows out: 10000000 rows at destination (seg42) with 0.004 ms to end, start offset by 77 ms.
                                                   -> Table Scan on b (cost=0.00..436.27 rows=208130 width=4)
                                                         Rows out: Avg 208333.3 rows x 48 workers. Max 208430 rows (seg17) with 4.815 ms to first row, 824 ms to end, start offset by 92 ms.
                                             -> Hash (cost=436.27..436.27 rows=208475 width=8)
                                                   Rows in: (No row requested) 0 rows (seg0) with 0 ms to end.
                                                   -> Table Scan on a (cost=0.00..436.27 rows=208475 width=8)
                                                         Rows out: Avg 208333.3 rows x 48 workers. Max 208401 rows (seg18) with 34 ms to first row, 46 ms to end, start offset by 63 ms.
 Slice statistics:
   (slice0) Executor memory: 330K bytes.
   (slice1) Executor memory: 1129K bytes avg x 48 workers, 1129K bytes max (seg0).
   (slice2) * Executor memory: 2139K bytes avg x 48 workers, 66504K bytes max (seg42). Work_mem: 65676K bytes max, 481337K bytes wanted.
   (slice3) Executor memory: 372K bytes avg x 48 workers, 388K bytes max (seg22). Work_mem: 33K bytes max.
 Statement statistics:
   Memory used: 128000K bytes
   Memory wanted: 1444908K bytes
 Settings: enable_bitmapscan=on; enable_seqscan=on; optimizer=on
 Optimizer status: PQO version 1.602
 Total runtime: 7193.902 ms
(49 rows)
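Why the skewed JOIN piles up on one segment can be shown with a few lines of Python. The 48 segments match the plans here, but the crc32-based `hash_segment` is a hypothetical stand-in for Greenplum's hash. No choice of hash function can help: every copy of the same key must map to the same segment, or the join would miss matches.

```python
import zlib
from collections import Counter

NUM_SEGMENTS = 48  # same segment count as the plans in this article


def hash_segment(key) -> int:
    """Stand-in hash (crc32), not Greenplum's real hash function."""
    return zlib.crc32(repr(key).encode()) % NUM_SEGMENTS


def redistribution_load(join_keys):
    """Rows each segment receives when a Redistribute Motion hashes on the join key."""
    load = Counter(hash_segment(k) for k in join_keys)
    return [load.get(seg, 0) for seg in range(NUM_SEGMENTS)]


if __name__ == "__main__":
    skewed = [42] * 100_000         # one ID repeated, as in the skewed example
    uniform = list(range(100_000))  # all IDs distinct
    print("busiest segment, skewed: ", max(redistribution_load(skewed)))
    print("busiest segment, uniform:", max(redistribution_load(uniform)))
```

With the repeated key, one segment receives every row while the other 47 sit idle, which is exactly the seg42 pattern in the plan.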
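JOIN on two non-distribution keys without skew
For comparison, here is the same query when the join column is not skewed. Both a and b are redistributed on id, each segment joins roughly 208,000 rows, and the query finishes in about 1.5 s.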
postgres=# explain analyze select a.c1,count(*) from a join b on (a.id=b.id) group by a.c1;
 QUERY PLAN
--------------------------------------------------------------------------------
 Gather Motion 48:1 (slice4; segments: 48) (cost=0.00..990.85 rows=101 width=12)
   Rows out: 101 rows at destination with 752 ms to first row, 753 ms to end, start offset by 732 ms.
   -> GroupAggregate (cost=0.00..990.85 rows=3 width=12)
         Group By: a.c1
         Rows out: Avg 2.5 rows x 41 workers. Max 4 rows (seg9) with 746 ms to end, start offset by 738 ms.
         -> Sort (cost=0.00..990.85 rows=3 width=12)
               Sort Key: a.c1
               Rows out: Avg 118.2 rows x 41 workers. Max 192 rows (seg9) with 746 ms to end, start offset by 738 ms.
               Executor memory: 58K bytes avg, 58K bytes max (seg0).
               Work_mem used: 58K bytes avg, 58K bytes max (seg0). Workfile: (0 spilling, 0 reused)
               -> Redistribute Motion 48:48 (slice3; segments: 48) (cost=0.00..990.85 rows=3 width=12)
                     Hash Key: a.c1
                     Rows out: Avg 118.2 rows x 41 workers at destination. Max 192 rows (seg9) with 594 ms to first row, 746 ms to end, start offset by 738 ms.
                     -> Result (cost=0.00..990.85 rows=3 width=12)
                           Rows out: Avg 101.0 rows x 48 workers. Max 101 rows (seg0) with 675 ms to first row, 676 ms to end, start offset by 740 ms.
                           -> HashAggregate (cost=0.00..990.85 rows=3 width=12)
                                 Group By: a.c1
                                 Rows out: Avg 101.0 rows x 48 workers. Max 101 rows (seg0) with 675 ms to first row, 676 ms to end, start offset by 740 ms.
                                 Executor memory: 4185K bytes avg, 4185K bytes max (seg0).
                                 -> Hash Join (cost=0.00..964.88 rows=208191 width=4)
                                       Hash Cond: a.id = b.id
                                       Rows out: Avg 208333.3 rows x 48 workers. Max 208401 rows (seg18) with 282 ms to first row, 661 ms to end, start offset by 767 ms.
                                       Executor memory: 4883K bytes avg, 4885K bytes max (seg18).
                                       Work_mem used: 4883K bytes avg, 4885K bytes max (seg18). Workfile: (0 spilling, 0 reused)
                                       (seg18) Hash chain length 1.3 avg, 4 max, using 159471 of 262151 buckets.
                                       -> Redistribute Motion 48:48 (slice1; segments: 48) (cost=0.00..444.59 rows=208378 width=8)
                                             Hash Key: a.id
                                             Rows out: Avg 208333.3 rows x 48 workers at destination. Max 208401 rows (seg18) with 0.112 ms to first row, 104 ms to end, start offset by 1048 ms.
                                             -> Table Scan on a (cost=0.00..436.27 rows=208378 width=8)
                                                   Rows out: Avg 208333.3 rows x 48 workers. Max 208422 rows (seg31) with 0.117 ms to first row, 64 ms to end, start offset by 749 ms.
                                       -> Hash (cost=440.42..440.42 rows=208191 width=4)
                                             Rows in: Avg 208333.3 rows x 48 workers. Max 208401 rows (seg18) with 250 ms to end, start offset by 798 ms.
                                             -> Redistribute Motion 48:48 (slice2; segments: 48) (cost=0.00..440.42 rows=208191 width=4)
                                                   Hash Key: b.id
                                                   Rows out: Avg 208333.3 rows x 48 workers at destination. Max 208401 rows (seg18) with 0.219 ms to first row, 132 ms to end, start offset by 798 ms.
                                                   -> Table Scan on b (cost=0.00..436.27 rows=208191 width=4)
                                                         Rows out: Avg 208333.3 rows x 48 workers. Max 208388 rows (seg3) with 0.146 ms to first row, 77 ms to end, start offset by 760 ms.
 Slice statistics:
   (slice0) Executor memory: 313K bytes.
   (slice1) Executor memory: 1096K bytes avg x 48 workers, 1096K bytes max (seg0).
   (slice2) Executor memory: 1096K bytes avg x 48 workers, 1096K bytes max (seg0).
   (slice3) Executor memory: 25518K bytes avg x 48 workers, 25518K bytes max (seg0). Work_mem: 4885K bytes max.
   (slice4) Executor memory: 374K bytes avg x 48 workers, 382K bytes max (seg0). Work_mem: 58K bytes max.
 Statement statistics:
   Memory used: 128000K bytes
 Settings: enable_bitmapscan=on; enable_seqscan=on; optimizer=on
 Optimizer status: PQO version 1.602
 Total runtime: 1486.335 ms
(48 rows)
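The non-skewed plan can be sketched in Python as "redistribute both sides, then hash-join locally". This is an illustration with hypothetical names (`redistribute`, `local_hash_join`, `redistributed_join`), 4 simulated segments and a crc32 stand-in hash; rows of `a` are `(id, c1)` and rows of `b` are `(id,)`. As in the plan, `b` is the hashed (inner) side and `a` probes it.

```python
import zlib
from collections import defaultdict

NUM_SEGMENTS = 4  # hypothetical number of simulated segments


def hash_segment(key) -> int:
    """Stand-in hash (crc32), not Greenplum's real hash function."""
    return zlib.crc32(repr(key).encode()) % NUM_SEGMENTS


def redistribute(segments, key_index):
    """Redistribute Motion: send every row to the segment that owns its key."""
    out = [[] for _ in range(NUM_SEGMENTS)]
    for seg in segments:
        for row in seg:
            out[hash_segment(row[key_index])].append(row)
    return out


def local_hash_join(outer, inner, outer_key, inner_key):
    """Hash Join on one segment: build a hash table on inner, probe with outer."""
    table = defaultdict(list)
    for row in inner:
        table[row[inner_key]].append(row)
    return [o + i for o in outer for i in table.get(o[outer_key], [])]


def redistributed_join(a_segments, b_segments):
    """a JOIN b ON a.id = b.id, with both tables redistributed on id."""
    a_by_id = redistribute(a_segments, key_index=0)
    b_by_id = redistribute(b_segments, key_index=0)
    result = []
    for seg in range(NUM_SEGMENTS):  # real segments do this in parallel
        result.extend(local_hash_join(a_by_id[seg], b_by_id[seg], 0, 0))
    return result


if __name__ == "__main__":
    a = [[(1, "x"), (2, "y")], [(3, "z")], [], []]
    b = [[(2,), (3,)], [(1,), (2,)], [], [(4,)]]
    print(sorted(redistributed_join(a, b)))
```

Matching ids always meet on the same segment after redistribution, so no segment ever needs to look at another segment's data during the join.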
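Sorting on a non-distribution key
1. Merge sort
To guarantee globally ordered output while keeping sorting efficient, Greenplum uses a merge sort: each segment first sorts its local data (all segments in parallel), then the master requests data from the segments and merge-sorts the incoming streams.
This is what keeps distributed sorting efficient.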
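A minimal sketch of this two-step sort in Python: each simulated segment sorts locally, then a k-way merge (here `heapq.merge` from the standard library) plays the master's role. The function names are hypothetical; the point is that the master only compares the heads of already-sorted streams instead of re-sorting everything.

```python
import heapq


def segment_local_sort(segments):
    """Each segment sorts its own rows (done in parallel on a real cluster)."""
    return [sorted(seg) for seg in segments]


def master_merge(sorted_streams):
    """The master pulls from every sorted stream and merges them (k-way merge)."""
    return list(heapq.merge(*sorted_streams))


if __name__ == "__main__":
    segments = [[5, 1, 9], [3, 3, 8], [], [7, 2]]
    print(master_merge(segment_local_sort(segments)))
```

With k segments and n rows in total, the merge costs O(n log k) comparisons on the master, while the O(n log n) sorting work is spread across the segments.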
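How GROUP BY and DISTINCT on non-distribution keys work
For grouped aggregation on a non-distribution key, Greenplum uses the following multi-stage aggregation:
Stage 1: aggregate locally on each segment. (Based on the ratio of distinct values in the grouping column, Greenplum decides whether to redistribute the raw data directly or to aggregate locally first and then redistribute, which reduces the amount of data moved.)
Stage 2: redistribute the intermediate results on the grouping columns.
Stage 3: aggregate again locally on each segment.
Stage 4: return the results to the master; if necessary, the master calls the aggregate's final function (by then only a small number of rows and little computation remain).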
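The role of the final function can be sketched with a small aggregate abstraction. The field names `sfunc`, `combinefunc` and `finalfunc` are borrowed loosely from PostgreSQL's CREATE AGGREGATE options; `TwoStageAggregate` itself is hypothetical and handles a single group, so the redistribution step collapses to collecting every partial state in one place. AVG is the classic case: segments exchange `(sum, count)` states and the division happens once, at the end.

```python
from dataclasses import dataclass
from functools import reduce
from typing import Any, Callable, Iterable, List


@dataclass
class TwoStageAggregate:
    """Hypothetical aggregate definition, loosely modeled on PostgreSQL's
    CREATE AGGREGATE (sfunc / combinefunc / finalfunc)."""
    init: Any                # initial (empty) state, must be immutable here
    sfunc: Callable          # (state, value) -> state, runs on each segment
    combinefunc: Callable    # (state, state) -> state, merges partial states
    finalfunc: Callable      # state -> result, runs once at the end

    def run(self, segments: List[Iterable]):
        # Stage 1: each segment folds its own rows into a partial state.
        partials = [reduce(self.sfunc, seg, self.init) for seg in segments]
        # Stages 2-3: partial states are shipped and combined.
        combined = reduce(self.combinefunc, partials, self.init)
        # Stage 4: the final function runs once, on a tiny amount of data.
        return self.finalfunc(combined)


avg = TwoStageAggregate(
    init=(0, 0),  # (sum, count)
    sfunc=lambda st, v: (st[0] + v, st[1] + 1),
    combinefunc=lambda a, b: (a[0] + b[0], a[1] + b[1]),
    finalfunc=lambda st: st[0] / st[1] if st[1] else None,  # AVG of no rows is NULL
)

if __name__ == "__main__":
    print(avg.run([[1, 2, 3], [4], [], [5, 5]]))
```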
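How JOIN on non-distribution keys works
1. For a table whose join column is not its distribution key, Greenplum, depending on the table size, either redistributes the table on the join column (large table) or broadcasts it (small table).
2. Once redistribution is complete, the segments perform the local JOIN in parallel.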
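A hedged Python sketch of the broadcast path and the size-based choice. `BROADCAST_ROW_LIMIT` and `choose_motion` are hypothetical simplifications; Greenplum's planners make this decision from cost estimates rather than a fixed row threshold. The broadcast join leaves the large table where it is and gives every segment a full copy of the small one.

```python
from collections import defaultdict

BROADCAST_ROW_LIMIT = 1000  # hypothetical threshold; real planners are cost-based


def choose_motion(inner_rows: int) -> str:
    """Pick a motion for the table being moved, based only on its size."""
    return "broadcast" if inner_rows <= BROADCAST_ROW_LIMIT else "redistribute"


def broadcast_join(outer_segments, small_table, outer_key, inner_key):
    """Broadcast Motion: every segment gets a full copy of the small table,
    so the large table never moves and each segment joins locally."""
    # Built once here for brevity; on a real cluster each segment builds its own copy.
    table = defaultdict(list)
    for row in small_table:
        table[row[inner_key]].append(row)
    result = []
    for seg in outer_segments:  # each segment probes only its local rows
        for o in seg:
            result.extend(o + i for i in table.get(o[outer_key], []))
    return result


if __name__ == "__main__":
    big = [[(1, "a"), (2, "b")], [(1, "c")], []]
    small = [(1, "X")]
    print(choose_motion(len(small)), broadcast_join(big, small, 0, 0))
```

Broadcasting costs roughly (number of segments) x (small table size) in network traffic, which is why it only pays off when one side is small.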