标签

PostgreSQL , 标签 , 推荐系统 , 实时圈人 , 数组 , gin , gist , 索引 , rum , tsvector , tsquery , 万亿 , user , tag , 淘宝


背景

我们仅用了PostgreSQL的两个小特性,却解决了业务困扰已久的大问题。

推荐系统是广告营销平台的奶牛,其核心是精准、实时、高效。

这么多广告平台,到底谁家强?谁的核心牛逼?

1. 精准,指对用户的描述精准,通常需要基于大量的用户行为数据,经历深度学习后形成的用户画像,或称之为标签系统。 标签的准确性关系到推荐的精准度,比如你可能不会对一个正常的年轻人推荐老花眼镜(当然如果有其他购买意向的标签来指出他有购买老花眼镜的欲望除外)。

2. 实时,指标签的更新实时性,很多标签是具有非常强的时效性的,比如一次营销的目标人群,又或者用户最近浏览的一些商品可能是有潜在购买欲望的商品,都具备时效性。如果你的标签生成是隔天,或者个很多天的,那么可能已经错过了推荐时机。因此实时性在推荐系统中是非常重要的。

3. 高效,指基于标签圈人的动作的效率与并发能力,作为购买广告的金主,当然是期望他们拿到数据的速度越快越好。并且会有很多人向你的平台购买广告,这考验的是并发能力。

做到以上三点,这样的广告平台才具备一定的竞争力。

除此之外还需要关注的是平台的成本,包括硬件的成本,开发成本,维护成本等。

下面将以电商的推荐系统为例,介绍推荐系统的数据库设计与优化技巧。

以及如何让营销潇洒 (低成本,高并发,高效率) 的迈入毫秒时代。

电商推荐系统 部分需求介绍

比如一家店铺,如何找到它的目标消费群体?

要回答这个问题,首先我们需要收集一些数据,比如:

1. 这家店铺以及其他的同类店铺的浏览、购买群体。

我们在逛电商时,会产生一些行为的记录,比如在什么时间,逛了哪些店铺,看了哪些商品,最后在哪家店铺购买了什么商品。

然后,对于单个商店来说,有哪些用户逛过他们的商店,购买过哪些商品,可以抽取出一部分人群。

2. 得到这些用户群体后,筛选出有同类消费欲望、或者具备相同属性的群体。

对这部分人群的属性进行分析,可以获得一个更大范围的群体,从而可以对这部分群体进行营销。

以上是对电商推荐系统的两个简单的推理。

量级

电商的用户量级,放眼全球,可能会达到几十亿的级别。

店铺数量,放眼全球,可能会达到千万级别。

商品数量(细分种类,比如条形码),放眼全球,可能会达到亿级。

店铺标签数量,针对单个用户而言,逛了哪些店,多少次,看了哪些商品,多少次,买了哪些商品等。通常一个人,在一定的时间范围内,会产生上千的这样的标签。

用户标签,画像,10万级别,这个量级可以描述清楚人的属性。

根据以上的估算,user_tags可能达到万亿(user几十亿, 店铺\商品浏览相关的标签数千级别)的级别。

高效设计

数据库设计

我们首先整理一下关键因素

用户ID、浏览过的店铺 以及浏览次数、浏览过的商品 以及浏览次数、购买的商品 以及购买数量。(次数\数量 可以根据区间,设置为枚举类型,比如0表示100次以下,1表示100到500次,。。。)

这几个要素很容易从用户的行为数据中生成,从而当某家店铺需要做推广,或者针对某个产品做推广时,可以结合这些因素,产生一批目标人群。

比如1周内浏览护手霜相关商品超过10次的人群。

表结构设计

1. 店铺、商品编成ID

2. 浏览过多少次、购买了多少某个商品

由于每个用户在某个时间段内,都可能浏览或者购买多个店铺或商品。如果每个商店,每个商品都使用一条记录来存储,会产生很多很多的记录。浪费空间,并且影响查询效率。

PostgreSQL 支持数组类型,可以很好的完成这样的任务,减少存储,同时支持数组索引,提高查询效率。

3. 表结构如下

3.1 范围表,约几十或上百条记录

字段和描述

class int,    -- 维度,对应用户标签表的s1,s2,s3,s4
id int,       -- 偏移量(或者叫枚举值)
description   -- 描述(例如 1-10000,10001-100000,。。。。。)

3.2 用户标签表

uid int primary key,  -- 用户ID
s1 int[],  -- 浏览过的店铺以及次数范围(店铺ID哈希 + 范围表id)
s2 int[],  -- 浏览过的商品以及次数范围(商品ID哈希 + 范围表id)
s3 int[],  -- 购买的商品以及数量范围(商品ID哈希 + 范围表id)
s4 int[],   -- ....其他维度以此类推
时间区间1,   -- 比如按天, 每天统计一次,写入该表

3.3 次数阶梯化

浏览次数,购买个数都是连续值,为了更好的进行挖掘,建议将其阶梯化。

对应3.1的设计,例如1-10000一个阶级,10001-100000又一个阶级。

例子

轨迹 s1 对应的阶梯
1 -> 0
2 -> 1-10
3 -> 11-100
4 -> 101-500
5 -> 501-
...
9 -> 10000+

3.4 将(商品、店铺ID)与阶梯组合成一个新的值 - 方法1

使用text[]例如 店铺ID:OFFSET 表示。text[]数组效率可能不如整型数组INT[],空间也比INT[]要大一点。

如果业务方可以容忍,使用text[]开发工作量小点。

例子

userid|s1|s2|s3
1|{'1:2', '109:9'}|{'2:2', '88:19'}|{'2:1', '88:2'}

含义解释:

  • 用户ID:1,

  • 浏览了店铺1(次数阶梯=2)、店铺109(次数阶梯9),

  • 浏览了商品2(次数阶梯=2)、商品88(次数阶梯19),

  • 购买了商品2(次数阶梯=1)、商品88(次数阶梯2)。

3.5 将(商品、店铺ID)与阶梯组合成一个新的值 - 方法2

方法1用了text数组,方法2将使用int/int8数组,效率更高。

要使用一个int/int8新值表达两层含义(原始店铺、商品ID,以及阶梯),需要公式支持。

公式设计如下,(公式、常量)。

以流量店铺次数(s1)字段为例:

新值起始ID = new_start_val = 1                  -- 常量,用户可以自由指定,但是固定下来了就不要变它
对应维度步长(比如流量店铺的阶梯数) = step = 9   -- 常量,用户可以自由指定(每个维度的阶数可以不一样),但是固定下来了就不要变它
店铺ID = dp_id                                  -- 指原来的店铺ID
int/int8新值 = new_val                          -- 生成的,带有两层含义(店铺ID,阶数)的新值已知店铺ID求new_val(写入、查询过程):$new_val = new_start_val + (step+1)*(dp_id-1)已知new_val求店铺ID(翻译过程):$dp_id = 1 + (new_val-new_start_val)/(step+1)

例子(step=19阶, new_start_val=1)

浏览店铺ID=1,1阶
浏览店铺ID=192,15阶根据以上信息、常量、公式 生成new_val数组:  {1, 3821}根据以上数组、常量、公式 翻译出店铺ID:  {1, 192}

4. 分区

例如,建议每500万或1000万一个分区,查询时,可以并行查询,提高效率。

如果要快速圈得所有的用户,建议使用并行查询(plproxy,每个分区一个连接,并行查询)。

如果要快速的得到用户,流式返回,建议使用继承(如果是多节点,可以使用postgres_fdw+pg_pathman,或者postgres_fdw+继承),使用游标返回。

性能指标

几十亿用户,每个用户将时间区间的浏览过的店铺、商品、购买过的商品以及数量级聚合成标签化的数组,产生万亿级别的user_tags组合。

根据tags从几十亿的用户群体中圈选人群,能达到什么样的性能呢?

由于使用了索引,如果使用流式返回的话可以控制在10毫秒左右。

是不是顿时觉得分析型的业务进入了毫秒时代?

如果你对PostgreSQL接触不多,可能会感到很惊奇,接触多了就习惯了,PostgreSQL有很多功能会帮你解决很多问题,有时候甚至给你大开脑洞的。

实时设计

前面讲了如何高效的获得用户,接下来我们要看看如何实时的更新TAG了。

流处理

目的是实时的更新用户的TAG,比如一个用户,一天可能产生几万比浏览的跟踪记录,这些记录要合并到他的标签中。

如果活跃用户达到亿级别,那么一天产生的更新流水就达到了万亿级别,这个怎么能实时的在数据库中处理呢?估计很多用户会使用T+1的方式,放弃实时性。

但是实际上,并不是做不到的,比如我们可以使用PostgreSQL数据库的流处理功能来实现这种超高流水的更新。

你可能要疑问了,数据库能处理流吗?数据如何在数据库中完成实时的更新呢?

PostgreSQL社区的一个开源产品pipelinedb,(基于postgresql,与postgresql全兼容),就是用来干这个的,它会帮你实时的进行合并,(用户可以设置合并的时间间隔,或者累计的ROWS变更数)达到阈值后,进行持久化的动作,否则会先持续的在内存中进行更新。

有两篇文章可以参考

《"物联网"流式处理应用 - 用PostgreSQL实时处理(万亿每天)》

《流计算风云再起 - PostgreSQL携PipelineDB力挺IoT》

当然如果用户没有实时的要求,T+1 就能满足需求的话,你大可不必使用pipelinedb.

为什么要在数据库中完成流式处理

我们知道,标签数据最后都要进到数据库后,才能施展数据库的圈人功能,完成圈人的查询,如果不在数据库中实现流计算,而是使用类似JSTROM的框架的话,实际上是使用JSTROM挡了一层,比如将1000亿次的更新转化成了1亿的更新。

但是使用外部的流处理会引入一些问题

1. 额外增加了JSTROM所需的计算资源,并行效率实际上还不如pipelinedb

2. 用户查数据的时效性不如直接放在数据库中的流计算

3. 增加了开发成本

压测

进入压测环节,我选择了一台32CORE,2块SSD卡,512GB的内存的机器进行压测。

存放3.2亿用户,每个用户4个数组字段,每个字段包括1000个元素,即4000*3.2亿 = 1.28万亿 user_tags。

用例1

10张表,每张表存储1000万用户,4个标签字段,使用tsvector存储标签。

使用rum索引。

postgres=# create tablespace tbs1 location '/u01/digoal/tbs1';
CREATE TABLESPACE  postgres=# create tablespace tbs2 location '/u02/digoal/tbs2';
CREATE TABLESPACE  do language plpgsql $$
declare  i int;  suffix text;  tbs text;
begin  for i in 0..10 loop  if i=0 then  suffix := '';  tbs := 'tbs1';  elsif i >=1 and i<=5 then  suffix := i::text;  tbs := 'tbs1';  else  suffix := i::text;  tbs := 'tbs2';  end if;  if i=0 then  execute 'create unlogged table test'||suffix||'(uid int primary key USING INDEX TABLESPACE '||tbs||', s1 tsvector, s2 tsvector, s3 tsvector, s4 tsvector) with (autovacuum_enabled=off, toast.autovacuum_enabled=off) tablespace '||tbs;  else  execute 'create unlogged table test'||suffix||'(uid int primary key USING INDEX TABLESPACE '||tbs||', s1 tsvector, s2 tsvector, s3 tsvector, s4 tsvector) inherits(test) with (autovacuum_enabled=off, toast.autovacuum_enabled=off) tablespace '||tbs;  end if;  execute 'create index idx_test'||suffix||'_s1 on test'||suffix||' using rum(s1 rum_tsvector_ops) tablespace '||tbs;  execute 'create index idx_test'||suffix||'_s2 on test'||suffix||' using rum(s2 rum_tsvector_ops) tablespace '||tbs;  execute 'create index idx_test'||suffix||'_s3 on test'||suffix||' using rum(s3 rum_tsvector_ops) tablespace '||tbs;   execute 'create index idx_test'||suffix||'_s4 on test'||suffix||' using rum(s4 rum_tsvector_ops) tablespace '||tbs;  end loop;
end;
$$;  select relname,reltablespace from pg_class  where relname ~ 'test' order by 2,1;

产生测试数据的脚本

vi test.sql  \set uid1 random(1,10000000)
\set uid2 random(10000001,20000000)
\set uid3 random(20000001,30000000)
\set uid4 random(30000001,40000000)
\set uid5 random(40000001,50000000)
\set uid6 random(50000001,60000000)
\set uid7 random(60000001,70000000)
\set uid8 random(70000001,80000000)
\set uid9 random(80000001,90000000)
\set uid10 random(90000001,100000000)
insert into test1 (uid,s1,s2,s3,s4) select :uid1+id, (select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)) from generate_series(1,1000) t(id) on conflict do nothing;
insert into test2 (uid,s1,s2,s3,s4) select :uid2+id, (select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)) from generate_series(1,1000) t(id) on conflict do nothing;
insert into test3 (uid,s1,s2,s3,s4) select :uid3+id, (select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)) from generate_series(1,1000) t(id) on conflict do nothing;
insert into test4 (uid,s1,s2,s3,s4) select :uid4+id, (select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)) from generate_series(1,1000) t(id) on conflict do nothing;
insert into test5 (uid,s1,s2,s3,s4) select :uid5+id, (select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)) from generate_series(1,1000) t(id) on conflict do nothing;
insert into test6 (uid,s1,s2,s3,s4) select :uid6+id, (select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)) from generate_series(1,1000) t(id) on conflict do nothing;
insert into test7 (uid,s1,s2,s3,s4) select :uid7+id, (select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)) from generate_series(1,1000) t(id) on conflict do nothing;
insert into test8 (uid,s1,s2,s3,s4) select :uid8+id, (select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)) from generate_series(1,1000) t(id) on conflict do nothing;
insert into test9 (uid,s1,s2,s3,s4) select :uid9+id, (select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)) from generate_series(1,1000) t(id) on conflict do nothing;
insert into test10 (uid,s1,s2,s3,s4) select :uid10+id, (select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)) from generate_series(1,1000) t(id) on conflict do nothing;  nohup pgbench -M prepared -n -r -P 1 -f ./test.sql -c 64 -j 64 -T 1000000 >/dev/null 2>&1 &

标签由500万个唯一ID+20个唯一ID的组合过程,每个tsvector中存放1000个这样的组合。

用例2

10张表,每张表存储1000万用户,4个标签字段,使用text[]存储标签。

索引使用的是GIN索引,其他与用例1一致。

do language plpgsql $$
declare  i int;  suffix text;  tbs text;
begin  for i in 0..10 loop  if i=0 then  suffix := '';  tbs := 'tbs1';  elsif i >=1 and i<=5 then  suffix := i::text;  tbs := 'tbs1';  else  suffix := i::text;  tbs := 'tbs2';  end if;  if i=0 then  execute 'create unlogged table test'||suffix||'(uid int primary key USING INDEX TABLESPACE '||tbs||', s1 text[], s2 text[], s3 text[], s4 text[]) with (autovacuum_enabled=off, toast.autovacuum_enabled=off) tablespace '||tbs;  else  execute 'create unlogged table test'||suffix||'(uid int primary key USING INDEX TABLESPACE '||tbs||', s1 text[], s2 text[], s3 text[], s4 text[]) inherits(test) with (autovacuum_enabled=off, toast.autovacuum_enabled=off) tablespace '||tbs;  end if;  execute 'create index idx_test'||suffix||'_s1 on test'||suffix||' using gin(s1 ) tablespace '||tbs;  execute 'create index idx_test'||suffix||'_s2 on test'||suffix||' using gin(s2 ) tablespace '||tbs;  execute 'create index idx_test'||suffix||'_s3 on test'||suffix||' using gin(s3 ) tablespace '||tbs;   execute 'create index idx_test'||suffix||'_s4 on test'||suffix||' using gin(s4 ) tablespace '||tbs;  end loop;
end;
$$;  select relname,reltablespace from pg_class  where relname ~ 'test' order by 2,1;

用例3以及压测

64张分区表,每个分区500万记录,使用int数组存储标签,标签总量400万,每个用户4000个随机标签,确保圈人时可以圈到足够多的人群。

同样使用GIN索引圈人。

alter role postgres set gin_pending_list_limit='128MB';  do language plpgsql $$
declare  i int;  suffix text;  tbs text;
begin  for i in 0..64 loop  if i=0 then  suffix := '';  tbs := 'tbs1';  elsif i >=1 and i<=32 then  suffix := i::text;  tbs := 'tbs1';  else  suffix := i::text;  tbs := 'tbs2';  end if;  if i=0 then  execute 'create unlogged table test'||suffix||'(uid int primary key USING INDEX TABLESPACE '||tbs||', s1 int[], s2 int[], s3 int[], s4 int[]) with (autovacuum_enabled=off, toast.autovacuum_enabled=off) tablespace '||tbs;  else  execute 'create unlogged table test'||suffix||'(uid int primary key USING INDEX TABLESPACE '||tbs||', s1 int[], s2 int[], s3 int[], s4 int[]) inherits(test) with (autovacuum_enabled=off, toast.autovacuum_enabled=off) tablespace '||tbs;  end if;  execute 'create index idx_test'||suffix||'_s1 on test'||suffix||' using gin(s1 ) tablespace '||tbs;  execute 'create index idx_test'||suffix||'_s2 on test'||suffix||' using gin(s2 ) tablespace '||tbs;  execute 'create index idx_test'||suffix||'_s3 on test'||suffix||' using gin(s3 ) tablespace '||tbs;   execute 'create index idx_test'||suffix||'_s4 on test'||suffix||' using gin(s4 ) tablespace '||tbs;  end loop;
end;
$$;  select relname,reltablespace from pg_class  where relname ~ 'test' order by 2,1;

生成测试数据的脚本

vi test1.sh  for ((i=1;i<=64;i++))
do
echo "\set uid random($((($i-1)*5000000+1)),$(($i*5000000)))" > test$i.sql  echo "insert into test$i (uid,s1,s2,s3,s4) select :uid, (select array_agg(trunc(random()*4000000)) from generate_series(1,1000)) s1,(select array_agg(trunc(random()*4000000)) from generate_series(1,1000)) s2,(select array_agg(trunc(random()*4000000)) from generate_series(1,1000)) s3, (select array_agg(trunc(random()*4000000)) from generate_series(1,1000)) s4 on conflict do nothing;" >> test$i.sql  done  . ./test1.sh

开始生成测试数据

vi test2.sh  for ((i=1;i<=64;i++))
do
nohup pgbench -M prepared -n -r -P 1 -f ./test$i.sql -c 1 -j 1 -T 1000000 >/dev/null 2>&1 &
done  . ./test2.sh

输出插完后将pengding list 合并

执行vacuum analyze或gin_clean_pending_list即可,参考

https://www.postgresql.org/docs/9.6/static/functions-admin.html#FUNCTIONS-ADMIN-INDEX

https://www.postgresql.org/docs/9.6/static/sql-vacuum.html

https://www.postgresql.org/docs/9.6/static/gin-implementation.html#GIN-FAST-UPDATE

圈人需求 - 性能测试

对用例3进行压测

1. 圈人,10毫秒以内完成。

比如查找s1包含3, s2包含4的人群

postgres=# begin;
BEGIN
Time: 0.030 ms
postgres=# declare a cursor for select uid from test where s1 @> array[1] and s2 @> array[4];
DECLARE CURSOR
Time: 6.679 ms
postgres=# fetch 100 in a;uid
-----------19246842118611240148504032185844649
(4 rows)
Time: 101.041 ms

这个人群太少,没有代表性,我们找一个人群多一点的

postgres=# begin;
BEGIN
postgres=# declare a cursor for select uid from test where s1 @> array[1] or s2 @> array[4];
DECLARE CURSOR
Time: 3.484 ms
postgres=# fetch 100 in a;uid
---------29119412373506.....29713335378228368041602067
(100 rows)
Time: 3.892 mspostgres=# fetch 100 in a;uid
---------38417013322714282941......119094645248611110635
(100 rows)
Time: 4.005 ms

2. 分页,前面已经提到了,使用游标。

3. 流式返回,前面的例子已经提到了。

4. 并行批量返回

并行批量返回,可以使用plproxy插件,为每个分区指定一个并行,从而实现并行的批量返回。效果能好到什么程度呢?

比如串行查询,所有的分片表是依次查询的,所以累加的时间比较长,例如,圈出15221个人,耗时113毫秒。

postgres=# explain (analyze,verbose,timing,costs,buffers) select uid from test where s1 @> array[1];QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------Append  (cost=0.00..233541.24 rows=206876 width=4) (actual time=0.081..108.037 rows=15221 loops=1)Buffers: shared hit=60641->  Seq Scan on public.test  (cost=0.00..0.00 rows=1 width=4) (actual time=0.001..0.001 rows=0 loops=1)Output: test.uidFilter: (test.s1 @> '{1}'::integer[])->  Bitmap Heap Scan on public.test1  (cost=33.71..2901.56 rows=3188 width=4) (actual time=0.078..0.381 rows=242 loops=1)Output: test1.uidRecheck Cond: (test1.s1 @> '{1}'::integer[])Heap Blocks: exact=238Buffers: shared hit=243->  Bitmap Index Scan on idx_test1_s1  (cost=0.00..32.91 rows=3188 width=0) (actual time=0.049..0.049 rows=242 loops=1)Index Cond: (test1.s1 @> '{1}'::integer[])Buffers: shared hit=5...中间省略62个表->  Bitmap Heap Scan on public.test64  (cost=34.00..2935.31 rows=3225 width=4) (actual time=0.068..0.327 rows=214 loops=1)Output: test64.uidRecheck Cond: (test64.s1 @> '{1}'::integer[])Heap Blocks: exact=211Buffers: shared hit=216->  Bitmap Index Scan on idx_test64_s1  (cost=0.00..33.19 rows=3225 width=0) (actual time=0.041..0.041 rows=214 loops=1)Index Cond: (test64.s1 @> '{1}'::integer[])Buffers: shared hit=5Planning time: 2.016 msExecution time: 109.400 ms
(519 rows)
Time: 113.216 ms

而并行查询的性能则相当于单个分区的耗时, 约0.几毫秒。

postgres=# explain (analyze,verbose,timing,costs,buffers) select uid from test1 where s1 @> array[1];QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------Bitmap Heap Scan on public.test1  (cost=33.71..2901.56 rows=3188 width=4) (actual time=0.085..0.383 rows=242 loops=1)Output: uidRecheck Cond: (test1.s1 @> '{1}'::integer[])Heap Blocks: exact=238Buffers: shared hit=243->  Bitmap Index Scan on idx_test1_s1  (cost=0.00..32.91 rows=3188 width=0) (actual time=0.051..0.051 rows=242 loops=1)Index Cond: (test1.s1 @> '{1}'::integer[])Buffers: shared hit=5Planning time: 0.097 msExecution time: 0.423 ms
(10 rows)
Time: 1.011 ms

使用并行,可以大幅提升整体的性能。

参考文档

《使用Plproxy设计PostgreSQL分布式数据库》

《A Smart PostgreSQL extension plproxy 2.2 practices》

《PostgreSQL 最佳实践 - 水平分库(基于plproxy)》

而如果你需要的是流式返回,则没有必要使用并行。

sharding

当用户数达到几十亿时,我们可以按用户ID进行分片,使用多台主机。

当然了,如果你的主机空间足够大,CPU核心足够多,可以满足业务的需求的话,完全没有必要使用多台主机。

如果要使用多台主机,有哪些方法呢? 可以参考如下文章,也很简单,几步完成

你就把postgres_fdw节点当成MySQL的TDDL或者DRDS就好了,支持跨节点JOIN,条件,排序,聚合 的下推等,用起来和TDDL DRDS一样的爽。

postgres_fdw是无状态的,仅仅存储结构(分发规则),所以postgres_fdw节点本身也可以非常方便的横向扩展。

《PostgreSQL 9.6 单元化,sharding (based on postgres_fdw) - 内核层支持前传》

《PostgreSQL 9.6 sharding + 单元化 (based on postgres_fdw) 最佳实践 - 通用水平分库场景设计与实践》

《PostgreSQL 9.6 sharding based on FDW & pg_pathman》

《PostgreSQL 9.5+ 高效分区表实现 - pg_pathman》

基于位置圈人的需求与性能

这个需求直接落入PostgreSQL的怀抱,其实就是基于位置的KNN查询,PostgreSQL可以通过GiST索引来支撑这个需求。

在数据分片后,PostgreSQL通过归并排序,依旧可以快速的得到结果。

例如,

postgres=# explain (analyze,verbose,timing,costs,buffers) select * from t order by id limit 10;QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------Limit  (cost=0.72..1.13 rows=10 width=4) (actual time=0.158..0.165 rows=10 loops=1)Output: t.idBuffers: shared hit=3 read=4->  Merge Append  (cost=0.72..819.74 rows=20001 width=4) (actual time=0.157..0.162 rows=10 loops=1)Sort Key: t.idBuffers: shared hit=3 read=4->  Index Only Scan using idx on public.t  (cost=0.12..2.14 rows=1 width=4) (actual time=0.003..0.003 rows=0 loops=1)Output: t.idHeap Fetches: 0Buffers: shared hit=1->  Index Only Scan using idx1 on public.t1  (cost=0.29..225.28 rows=10000 width=4) (actual time=0.107..0.107 rows=6 loops=1)Output: t1.idHeap Fetches: 6Buffers: shared hit=1 read=2->  Index Only Scan using idx2 on public.t2  (cost=0.29..225.28 rows=10000 width=4) (actual time=0.043..0.044 rows=5 loops=1)Output: t2.idHeap Fetches: 5Buffers: shared hit=1 read=2Planning time: 0.181 msExecution time: 0.219 ms
(20 rows)postgres=# explain (analyze,verbose,timing,costs,buffers) select * from t order by id ;QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------Merge Append  (cost=0.72..819.74 rows=20001 width=4) (actual time=0.043..10.324 rows=20000 loops=1)Sort Key: t.idBuffers: shared hit=149->  Index Only Scan using idx on public.t  (cost=0.12..2.14 rows=1 width=4) (actual time=0.004..0.004 rows=0 loops=1)Output: t.idHeap Fetches: 0Buffers: shared hit=1->  Index Only Scan using idx1 on public.t1  (cost=0.29..225.28 rows=10000 width=4) (actual time=0.021..3.266 rows=10000 loops=1)Output: t1.idHeap Fetches: 10000Buffers: shared hit=74->  Index Only Scan using idx2 on public.t2  (cost=0.29..225.28 rows=10000 width=4) (actual time=0.017..3.309 rows=10000 loops=1)Output: t2.idHeap Fetches: 10000Buffers: shared hit=74Planning time: 0.175 msExecution time: 11.791 ms
(17 rows)

《PostgreSQL 百亿地理位置数据 近邻查询性能》

12Core的机器,每次请求约0.8毫秒返回,TPS约8万。

小结

回到圈人系统的三个核心问题,

精准,本文未涉及,属于数据挖掘系统的事情,我们可以在下一篇文章介绍(使用PostgreSQL, Greenplum 的 MADlib机器学习库)。

实时,实时的更新标签,本文已给出了在数据库中进行流式处理的解决方案,相比外部流处理的方案,节约资源,减少开发成本,提高开发效率,提高时效性。

高效,相比传统的方案,使用PostgreSQL以及数组的GIN索引功能,实现了本文在万亿USER_TAGS的情况下的毫秒级别的圈人功能。

其他相关文章

《"物联网"流式处理应用 - 用PostgreSQL实时处理(万亿每天)》

《为了部落 - 如何通过PostgreSQL基因配对,产生优良下一代》

《流计算风云再起 - PostgreSQL携PipelineDB力挺IoT》

《分析加速引擎黑科技 - LLVM、列存、多核并行、算子复用 大联姻 - 一起来开启PostgreSQL的百宝箱》

《金融风控、公安刑侦、社会关系、人脉分析等需求分析与数据库实现 - PostgreSQL图数据库场景应用》

《实时数据交换平台 - BottledWater-pg with confluent》

《PostgreSQL 在视频、图片去重,图像搜索业务中的应用》

《基于 阿里云 RDS PostgreSQL 打造实时用户画像推荐系统》

《PostgreSQL 与 12306 抢火车票的思考》

《门禁广告销售系统需求剖析 与 PostgreSQL数据库实现》

《聊一聊双十一背后的技术 - 物流、动态路径规划》

《聊一聊双十一背后的技术 - 分词和搜索》

《聊一聊双十一背后的技术 - 不一样的秒杀技术, 裸秒》

《聊一聊双十一背后的技术 - 毫秒分词算啥, 试试正则和相似度》

《PostgreSQL 9.6 引领开源数据库攻克多核并行计算难题》

《PostgreSQL 前世今生》

《如何建立GIS测试环境 - 将openstreetmap的样本数据导入PostgreSQL PostGIS库》

《PostgreSQL 9.6 单元化,sharding (based on postgres_fdw) - 内核层支持前传》

《PostgreSQL 9.6 sharding + 单元化 (based on postgres_fdw) 最佳实践 - 通用水平分库场景设计与实践》

《PostgreSQL 9.6 sharding based on FDW & pg_pathman》

《PostgreSQL 9.5+ 高效分区表实现 - pg_pathman》

《PostgreSQL 数据库安全指南》

《PostgreSQL 9.6 黑科技 bloom 算法索引,一个索引支撑任意列组合查询》

《PostgreSQL 使用递归SQL 找出数据库对象之间的依赖关系》

《用PostgreSQL描绘人生的高潮、尿点、低谷 - 窗口/帧 or 斜率/导数/曲率/微积分?》

《用PostgreSQL找回618秒逝去的青春 - 递归收敛优化》

《PostGIS 在 O2O应用中的优势》

《PostgreSQL 百亿地理位置数据 近邻查询性能》

《使用Plproxy设计PostgreSQL分布式数据库》

《A Smart PostgreSQL extension plproxy 2.2 practices》

《PostgreSQL 最佳实践 - 水平分库(基于plproxy)》

恭迎万亿级营销(圈人)潇洒的迈入毫秒时代 - 万亿user_tags级实时推荐系统数据库设计...相关推荐

  1. ChatGPT火爆出圈 人工智能赋能银行迈入新时代

    人智能(AI)技术并非新鲜事物,但ChatGPT和"文心一言"作为人工智能通用大模型横空出世,掀起了人工智能应用的新一轮浪潮.此前,人工智能运用的都是专用人工智能模型,往往只能在具 ...

  2. 互联网日报 | 5月12日 星期三 | 全国人口共14.1178亿人;长安汽车迈入2000万辆时代;Soul递交纳斯达克招股书...

    今日看点 ✦ 中国电信完成上交所主板IPO辅导工作 ✦ 探探创始人退出公司具体管理事务,陌陌CEO王力将兼任探探CEO ✦ 360集团战略投资哪吒汽车,周鸿祎担任其产品经理 ✦ 长安系中国品牌汽车第2 ...

  3. es对几十亿数据能达到秒级响应吗_万亿数据下的多维实时分析系统,如何做到亚秒级响应...

    导语 当业务发展到一定规模,实时数据仓库是一个必要的基础服务.从数据驱动方面考虑,多维实时数据分析系统的重要性也不言而喻.但是当数据量巨大的情况下,拿腾讯看点来说,一天上报的数据量达到万亿级的规模,要 ...

  4. 微信删除朋友圈多久会从服务器消失,6.6亿人消失在朋友圈,微信的社交属性正在降低?你多久没发了?...

    作者:李云飞 | 来源:原 创 大家好,我是李云飞,一位久经商场的创业老司机,关注我,每天分享一点财富干货,让你的人生少走弯路. 不久前,张小龙在演讲时就公布了一组数据,说中国平均每天有10.9亿人打 ...

  5. 视频号云逛万达,商圈数字化丙晟打了个新样板

    "抢到了,抢到了" "我抢到了520红包,感谢思聪老公" "我抢到了IMAX观影礼包.哈哈哈" 在万达广场"520直播宠粉节&qu ...

  6. 被裁的38岁互联网人:10年赔偿18万、半年花光、他打算送快递

    "在IT行业被淘汰,基本就是不思进取,没有提高自己." 如今吴东明白过来,已经后悔莫及. 被一家互联网大公司辞退1年后,2019年10月20日,吴东下定决心结束北漂. 这个想法在他 ...

  7. “魔幻”的直播带货:一夜血亏100万,依然有无数人入局!

    Python实战社群 Java实战社群 长按识别下方二维码,按需求添加 扫码关注添加客服 进Python社群▲ 扫码关注添加客服 进Java社群▲ 作者丨电商君 来源丨电商报(ID:kandiansh ...

  8. 大数据市场规模5年将达8000亿 交易平台忙圈地

     大数据市场规模5年将达8000亿 交易平台忙圈地 8月19日国务院常务会议通过<关于促进大数据发展的行动纲要>,这意味着我国大数据发展迎来顶层设计,将有助于培育经济发展引擎. 在商业 ...

  9. 杨超越是如何实现从全网嘲到全网夸的?营销圈来告诉你!

    随着宿舍东西越积越多,引来的小动物也纷纷来这里投奔.可不是,最近几天不仅蟑螂来投奔了,就连老鼠也不放过我们宿舍这一块小地了.没办法,我们都不知道怎么抓老鼠,只好求助一下度娘了.打了两个字"老 ...

最新文章

  1. 国庆七天学Linux技能,休闲假期不负时光
  2. saltStack运维工具的部署及master迁移实现的过程详解
  3. 看译飞的由浅入深漫谈margin属性
  4. 【转】SAP Fiori Design Guidelines基础篇
  5. IntelliJ IDEA for Mac 如何自定义快捷键_设置快捷键
  6. java学习(42):巩固练习
  7. 软件工程网络15个人阅读作业1(201521123062 杨钧宇)
  8. Python中的X[:,0]和X[:,1]
  9. [面试] C/C++语法(三)—— 字符串
  10. Entity Framework 学习初级篇7--基本操作:增加、更新、删除、事务(转)
  11. uni-app 条形码(一维码)/二维码生成实现
  12. 地图标识符号大全_资源小结:旅游类地图汇总(8.17版)
  13. 订单生产计划表范本_生产计划表_用Excel 如何制作生产排单的生产计划
  14. 23种设计模式之模板方法模式
  15. 缺失.NET Framework组件的解决方法
  16. php生产环境性能瓶颈分析,使用XHProf分析PHP性能瓶颈(一)
  17. 基于AT89C51单片机的超声波传感器测距【程序详细代码及注释】
  18. linux打开pkg文件怎么打开,linux中怎么打开tar.rar.gz文件
  19. 面试官问:“你还有什么事想问的?” 该怎么回答?
  20. Linux 命令【无标题命令】

热门文章

  1. 2021年高考成绩查询贵州一本线,2021年贵州高考一本分数线预测,今年贵州一本分数线预估多少分...
  2. 客户端手册_增值税发票管理系统“2.0”版——客户端环境配置问题
  3. 标题栏透明android,android改变标题栏的透明度
  4. springboot不会运行gc_SpringBoot 深度调优,让你的项目飞起来!
  5. 四川高中计算机考试操作题,四川省学业水平考试VB程序设计操作题演示
  6. Linux下的JAVA命令参数_以java命令为例解析linux命令行调用语句
  7. CSS三大特性的利用注意事项
  8. nvidia旧版驱动_NVIDIA显卡驱动曝出5个高危级别漏洞 请尽快升级最新版本
  9. python怎么处理数据集的缺失值_python 对数据集的缺失值补全方法 sklearn.preprocessing.Imputer...
  10. 微型计算机原理及应用程序题,微型计算机原理及应用试题及答案