标签

PostgreSQL , 聚合函数 , 自定义 , AGGREGATE , 并行 , COMBINEFUNC


背景

PostgreSQL 9.6开始就支持并行计算了,意味着聚合、扫描、排序、JOIN等都开始支持并行计算。对于聚合操作来说,并行计算与非并行计算是有差异的。

例如avg聚合,对一张表进行计算时,一个任务中操作和多个并行任务操作,算法是不一样的。

PostgreSQL提供了一套标准的接口,可以支持聚合函数的并行操作。

自定义并行聚合的原理和例子

创建聚合函数的语法如下:

CREATE AGGREGATE name ( [ argmode ] [ argname ] arg_data_type [ , ... ] ) (  SFUNC = sfunc,  STYPE = state_data_type  [ , SSPACE = state_data_size ]  [ , FINALFUNC = ffunc ]  [ , FINALFUNC_EXTRA ]  [ , COMBINEFUNC = combinefunc ]  [ , SERIALFUNC = serialfunc ]  [ , DESERIALFUNC = deserialfunc ]  [ , INITCOND = initial_condition ]  [ , MSFUNC = msfunc ]  [ , MINVFUNC = minvfunc ]  [ , MSTYPE = mstate_data_type ]  [ , MSSPACE = mstate_data_size ]  [ , MFINALFUNC = mffunc ]  [ , MFINALFUNC_EXTRA ]  [ , MINITCOND = minitial_condition ]  [ , SORTOP = sort_operator ]  [ , PARALLEL = { SAFE | RESTRICTED | UNSAFE } ]
)

相比非并行,多了一个过程,那就是combinefunc的过程(也叫partial agg)。

非并行模式的聚合流程大致如下:

循环
sfunc( internal-state, next-data-values ) ---> next-internal-state  最后调用一次(可选)
ffunc( internal-state ) ---> aggregate-value

并行模式的聚合流程大致如下,如果没有写combinefunc,那么实际上聚合过程并没有实现并行而只是扫描并行:

下面这个例子,我们可以观察到一个COUNT操作的并行聚合。

postgres=# set max_parallel_workers=4;
SET
postgres=# set max_parallel_workers_per_gather =4;
SET
postgres=# set parallel_setup_cost =0;
SET
postgres=# set parallel_tuple_cost =0;
SET
postgres=# alter table test set (parallel_workers =4);
ALTER TABLE
postgres=# explain (analyze,verbose,timing,costs,buffers) select count(*) from test;  QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------  -- final并行,可有可无,看具体的聚合算法  Finalize Aggregate  (cost=15837.02..15837.03 rows=1 width=8) (actual time=57.296..57.296 rows=1 loops=1)  Output: count(*)  Buffers: shared hit=3060  ->  Gather  (cost=15837.00..15837.01 rows=4 width=8) (actual time=57.287..57.292 rows=5 loops=1)  Output: (PARTIAL count(*))  Workers Planned: 4  Workers Launched: 4  Buffers: shared hit=3060  -- 一下就是combinefunc完成的聚合并行(显示为PARTIAL agg)  ->  Partial Aggregate  (cost=15837.00..15837.01 rows=1 width=8) (actual time=52.333..52.333 rows=1 loops=5)  Output: PARTIAL count(*)  Buffers: shared hit=12712  Worker 0: actual time=50.917..50.918 rows=1 loops=1  Buffers: shared hit=2397  Worker 1: actual time=51.293..51.294 rows=1 loops=1  Buffers: shared hit=2423  Worker 2: actual time=51.062..51.063 rows=1 loops=1  Buffers: shared hit=2400  Worker 3: actual time=51.436..51.436 rows=1 loops=1  Buffers: shared hit=2432  ->  Parallel Seq Scan on public.test  (cost=0.00..15212.00 rows=250000 width=0) (actual time=0.010..30.499 rows=200000 loops=5)  Buffers: shared hit=12712  Worker 0: actual time=0.013..30.343 rows=190269 loops=1  Buffers: shared hit=2397  Worker 1: actual time=0.010..30.401 rows=192268 loops=1  Buffers: shared hit=2423  Worker 2: actual time=0.013..30.467 rows=190350 loops=1  Buffers: shared hit=2400  Worker 3: actual time=0.009..30.221 rows=192861 loops=1  Buffers: shared hit=2432  Planning time: 0.074 ms  Execution time: 60.169 ms
(31 rows)

了解了并行聚合的原理后,我们就可以写自定义聚合函数的并行计算了。

例子

例如我们要支持一个数组的聚合,并且在聚合过程中我们要实现对元素去重。

1、创建测试表

create table test(id int, col int[]);

2、生成测试数据

CREATE OR REPLACE FUNCTION public.gen_arr(integer, integer)  RETURNS integer[]  LANGUAGE sql  STRICT
AS $function$  select array(select ($1*random())::int from generate_series(1,$2));
$function$;  insert into test select random()*1000, gen_arr(500,10) from generate_series(1,10000);

3、创建聚合函数

例子1,没有combinefunc,只支持扫描并行。

数组去重函数

postgres=# create or replace function uniq(int[]) returns int[] as $$  select array( select unnest($1) group by 1);
$$ language sql strict parallel safe;
CREATE FUNCTION

数组合并与去重函数

postgres=# create or replace function array_uniq_cat(anyarray,anyarray) returns anyarray as $$  select uniq(array_cat($1,$2));
$$ language sql strict parallel safe;
CREATE FUNCTION

聚合函数(不带COMBINEFUNC)

create aggregate arragg (anyarray) (sfunc = array_uniq_cat, stype=anyarray, PARALLEL=safe);

并行查询例子:

postgres=# set max_parallel_workers=4;
SET
postgres=# set max_parallel_workers_per_gather =4;
SET
postgres=# set parallel_setup_cost =0;
SET
postgres=# set parallel_tuple_cost =0;
SET
postgres=# alter table test set (parallel_workers =4);
ALTER TABLE
postgres=# explain (analyze,verbose,timing,costs,buffers) select id, arragg(col) from test group by id ;

很明显没有设置COMBINEFUNC时,未使用并行聚合。

postgres=# explain (analyze,verbose,timing,costs,buffers) select id, arragg(col) from test group by id ;  QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------  HashAggregate  (cost=4139.74..4141.74 rows=200 width=36) (actual time=602.957..603.195 rows=1001 loops=1)  Output: id, arragg(col)  Group Key: test.id  Buffers: shared hit=6  ->  Gather  (cost=0.00..163.37 rows=15748 width=36) (actual time=0.328..43.734 rows=10000 loops=1)  Output: id, col  Workers Planned: 4  Workers Launched: 4  Buffers: shared hit=6  -- 只有并行扫描,没有并行聚合。  ->  Parallel Seq Scan on public.test  (cost=0.00..163.37 rows=3937 width=36) (actual time=0.017..0.891 rows=2000 loops=5)  Output: id, col  Buffers: shared hit=124  Worker 0: actual time=0.019..0.177 rows=648 loops=1  Buffers: shared hit=8  Worker 1: actual time=0.022..0.180 rows=648 loops=1  Buffers: shared hit=8  Worker 2: actual time=0.017..3.772 rows=7570 loops=1  Buffers: shared hit=94  Worker 3: actual time=0.015..0.189 rows=648 loops=1  Buffers: shared hit=8  Planning time: 0.084 ms  Execution time: 603.450 ms
(22 rows)

例子2,有combinefunc,支持并行聚合。

drop aggregate arragg(anyarray);  create aggregate arragg (anyarray) (sfunc = array_uniq_cat, stype=anyarray, COMBINEFUNC = array_uniq_cat, PARALLEL=safe);

使用了并行聚合。

postgres=# explain (analyze,verbose,timing,costs,buffers) select id, arragg(col) from test group by id ;  QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------  Finalize HashAggregate  (cost=1361.46..1363.46 rows=200 width=36) (actual time=285.489..285.732 rows=1001 loops=1)  Output: id, arragg(col)  Group Key: test.id  Buffers: shared hit=36  ->  Gather  (cost=1157.46..1159.46 rows=800 width=36) (actual time=63.654..74.163 rows=4297 loops=1)  Output: id, (PARTIAL arragg(col))  Workers Planned: 4  Workers Launched: 4  Buffers: shared hit=36  -- 并行聚合  ->  Partial HashAggregate  (cost=1157.46..1159.46 rows=200 width=36) (actual time=57.367..57.727 rows=859 loops=5)  Output: id, PARTIAL arragg(col)  Group Key: test.id  Buffers: shared hit=886  Worker 0: actual time=54.788..54.997 rows=857 loops=1  Buffers: shared hit=213  Worker 1: actual time=56.881..57.255 rows=861 loops=1  Buffers: shared hit=213  Worker 2: actual time=55.415..55.813 rows=856 loops=1  Buffers: shared hit=212  Worker 3: actual time=56.453..56.854 rows=838 loops=1  Buffers: shared hit=212  ->  Parallel Seq Scan on public.test  (cost=0.00..163.37 rows=3937 width=36) (actual time=0.011..0.736 rows=2000 loops=5)  Output: id, col  Buffers: shared hit=124  Worker 0: actual time=0.009..0.730 rows=1981 loops=1  Buffers: shared hit=25  Worker 1: actual time=0.012..0.773 rows=2025 loops=1  Buffers: shared hit=25  Worker 2: actual time=0.015..0.741 rows=1944 loops=1  Buffers: shared hit=24  Worker 3: actual time=0.012..0.751 rows=1944 loops=1  Buffers: shared hit=24  Planning time: 0.073 ms  Execution time: 285.949 ms
(34 rows)

实际上并行聚合与分布式数据库聚合阶段原理是一样的,分布式数据库自定义聚合可以参考末尾的文章。

参考

https://www.postgresql.org/docs/10/static/sql-createaggregate.html

https://www.postgresql.org/docs/10/static/xaggr.html#XAGGR-PARTIAL-AGGREGATES

《PostgreSQL aggregate function customize》

《Greenplum 最佳实践 - 估值插件hll的使用(以及hll分式聚合函数优化)》

《Postgres-XC customized aggregate introduction》

PostgreSQL 10 自定义并行计算聚合函数的原理与实践相关推荐

  1. SparkSQL自定义AVG强类型聚合函数与弱类型聚合函数汇总

    AVG是求平均值,所以输出类型是Double类型 1)创建弱类型聚合函数类extends UserDefinedAggregateFunction class MyAgeFunction extend ...

  2. Flink SQL自定义聚合函数

    <2021年最新版大数据面试题全面开启更新> 本篇幅介绍Flink Table/SQL中如何自定义一个聚合函数,介绍其基本用法.撤回定义以及与源码结合分析每个方法的调用位置. 基本使用 F ...

  3. mysql 聚合函数求积_Oracle聚合求和和聚合求积(顺便解决BOM展开的问题)

    我们在日常的工作中,经常遇到了针对某一列的值,进行求和,求平均值,在一些特殊的业务场景下,我们需要对某一列进行求积操作,那我们该如何实现呢,下面先介绍,我 们对字符串的求和操作. 针对字符串的求和操作 ...

  4. MySQL分组查询跟聚合函数

    MySQL分组查询跟聚合函数 一.分组查询的语句 GROUP BY { <列名> | <表达式> | <位置> } [ASC | DESC] 这个语句中间{ < ...

  5. mysql函数数组参数_MYSQL数组聚合函数,如PostgreSQL array_agg

    我在MYSQL上有两个表,我想知道MYSQL上是否有任何聚合函数作为array_agg()FROM postgreSQL. 表1属性仅有8条记录 表2捕获该属性的记录,因此对于同一属性有时可以是1或n ...

  6. pythontransform详解_Python自定义聚合函数merge与transform区别详解

    1.自定义聚合函数,结合agg使用 2. 同时使用多个聚合函数 3. 指定某一列使用某些聚合函数 4.merge与transform使用 import pandas as pd import nump ...

  7. 《卸甲笔记》-PostgreSQL和Oracle的SQL差异分析之三:rownum和聚合函数

    PostgreSQL是世界上功能最强大的开源数据库,在国内得到了越来越多机构和开发者的青睐和应用.随着PostgreSQL的应用越来越广泛,Oracle向PostgreSQL数据库的数据迁移需求也越来 ...

  8. Hive函数(内置函数(字符串函数,数学函数,日期函数,集合函数,条件函数,聚合函数,表生成函数)和自定义函数(自定义函数创建流程,临时函数,永久函数)))(四)

    Hive函数(内置函数和自定义函数) 一.内置函数 1.字符串函数 (1)ascii (2)base64 (3)concat (4)concat_ws (5)format_number (6)subs ...

  9. 【大数据】Presto开发自定义聚合函数

    Presto 在交互式查询任务中担当着重要的职责.随着越来越多的人开始使用 SQL 在 Presto 上分析数据,我们发现需要将一些业务逻辑开发成类似 Hive 中的 UDF,提高 SQL 使用人员的 ...

最新文章

  1. 简约之美Jodd-http--应用一箩筐
  2. 关于华为海思Hi35XX系列开发的思考与总结
  3. 自定义hive url parse函数
  4. 2020牛客暑期多校训练营(第一场)
  5. libhiredis.so.0.13: cannot open shared object file: No such file or directory in Unknown on line
  6. laravel-admin form中的switch控件 不改变状态提交后值为0
  7. Java NIO学习篇之通道FileChannel详解
  8. 软件测试必备工具安装包
  9. 2021年qs世界大学计算机科学排名,2015年QS世界大学计算机专业排名
  10. 亿达中国运用云服务,亿达中国打造智能园区
  11. 计算机c盘要满了电脑会卡吗,C盘满了 电脑卡顿了,怎么清理空间
  12. 【docker系列】docker深入浅出之安装Nginx+PHP+MySQL
  13. 国家统计局可以获取到全国5级行政区域数据
  14. DFT信道估计步骤及实例
  15. 计算机桌面壁纸在哪个文件夹,Win10桌面背景在哪个文件夹 Win10桌面背景所在文件夹介绍...
  16. mysql不等于多个数怎么写_mysql不等于符号怎么写
  17. #详解# 激活函数中的 饱和、软饱和、硬饱和
  18. 【UEFI实战】UEFI中使用汇编代码
  19. 网易AI携手新东方布局“AI+教育”
  20. Smartbi成功入选《2021中国企业数智化转型升级发展研究报告》

热门文章

  1. QT每日一练day4:ubuntu中使用QT
  2. win8计算机里没有用户,win8.1系统没有选择切换账户菜单怎么办|win8.1系统切换用户的方法...
  3. python培训来袭_从入门到精通!2020年Python最佳学习路线重磅来袭!
  4. flutter图片预览_flutter - 图片预览放大滑动(photo_view)
  5. java304是什么错误_304 Not Modified 的意思 (Web的Cache问题)
  6. 人工智能的数学基础(一):绪论
  7. oracle上浮下浮分析函数_Oracle分析函数简析
  8. 通过连接池无法连接mysql_连接池无法链接数据库
  9. hue集成mysql报错_hue集成hive访问报database is locked
  10. Node.js 模块化开发