标签

PostgreSQL , 流式处理 , 无流入数据超时异常


背景

流计算有个特点,数据流式写入,流式计算。

但是有一种情况,可能无法覆盖。例如电商中的 收货超时,退款处理超时 事件的流式监控。因为数据都不会再写进来了,所以也无法触发流式计算。

这些问题如何流式预警呢?

可以用超时时间+调度的方式,当然这里面有PostgreSQL的独门秘籍:

1、CTE,语法灵活。

2、partial index,不需要检索的数据不构建索引。

3、DML returning,可以返回DML语句的结果,结合CTE实现最小交互计算。

4、multi-index bitmap scan,多个索引合并扫描,在使用OR条件时,可以结合多个字段的索引进行合并扫描。

DEMO设计

1、被监控表的结构。里面记录了订单、退款等事件的超时处理时间,超时通知次数,下一次通知时间间隔,完结状态等。

create table tbl (  id int8,                                         --                               ..... 其他字段 (比如已完结状态)  state int,                       -- 完结状态(1 表示已完结)  deadts timestamp,                -- 超时时间        nts interval,                    -- 超时间隔,用于更新下一次通知时间 (比如一天通知一次)       notify_times int default 0,      -- 通知次数  deadts_next timestamp            -- 下一次通知时间
);

2、创建partial index,也就是说,对未完结工单才需要通知用户,这些数据是业务关心的,使用partial index可以简化索引大小。提高速度。

create index idx_tbl_1 on tbl (deadts,notify_times,state) where notify_times=0 and state<>1;  create index idx_tbl_2 on tbl (deadts_next,state) where deadts_next is not null and state<>1;

3、获取需要通知的数据,并且更新通知次数以及下一次的通知时间。

with tmp1 as (
update tbl set   deadts_next=now()+nts,   notify_times=notify_times+1
where ctid = any (array(  select ctid from tbl where  ( deadts < now() and notify_times=0 and state<>1) union allselect ctid from tbl where( deadts_next < now() and deadts_next is not null and state<>1)   limit 10000     -- 一次获取1万条超时数据
))
returning *
)
select * from tmp1;

4、执行计划完美

 CTE Scan on tmp1  (cost=18163.25..18163.45 rows=10 width=48)CTE tmp1->  Update on tbl tbl_2  (cost=18151.05..18163.25 rows=10 width=54)InitPlan 1 (returns $0)->  Limit  (cost=0.13..18151.03 rows=10000 width=6)->  Append  (cost=0.13..764699.60 rows=421301 width=6)->  Index Scan using idx_tbl_1 on tbl  (cost=0.13..169527.13 rows=369766 width=6)Index Cond: (deadts < now())->  Index Scan using idx_tbl_2 on tbl tbl_1  (cost=0.43..590959.46 rows=51535 width=6)Index Cond: (deadts_next < now())->  Tid Scan on tbl tbl_2  (cost=0.01..12.21 rows=10 width=54)TID Cond: (ctid = ANY ($0))
(12 rows)

5、调度

《PostgreSQL 定时任务方法2》

《PostgreSQL Oracle 兼容性之 - DBMS_JOBS - Daily Maintenance - Timing Tasks(pgagent)》

当然你如果使用阿里云,可以使用阿里云的调度平台,配置调度任务。

性能指标

1、写入1亿数据,假设有100万条同时超时需要处理,耗时如何?

-- 1亿条完结
insert into tbl select id, 1, now(), '5 min', 0, null from generate_series(1,100000000) t(id);  -- 100万条超时
insert into tbl select id, 0, now(), '5 min', 0, null from generate_series(1,1000000) t(id);

通知性能,比如每一批通知1万条:

(小批量获取,并更新超时时间,目的是让autovacuum介入,实时回收垃圾)

with tmp1 as (
update tbl set   deadts_next=now()+nts,   notify_times=notify_times+1
where ctid = any (array(  select ctid from tbl where  ( deadts < now() and notify_times=0 and state<>1)   union allselect ctid from tbl where( deadts_next < now() and deadts_next is not null and state<>1)   limit 10000     -- 一次获取1万条超时数据
))
returning *
)
select * from tmp1;  -- 计划  CTE Scan on tmp1  (cost=18163.25..18163.45 rows=10 width=48) (actual time=39.092..78.707 rows=10000 loops=1)Output: tmp1.id, tmp1.state, tmp1.deadts, tmp1.nts, tmp1.notify_times, tmp1.deadts_nextBuffers: shared hit=75094 read=49 dirtied=49CTE tmp1->  Update on public.tbl tbl_2  (cost=18151.05..18163.25 rows=10 width=54) (actual time=39.089..74.637 rows=10000 loops=1)Output: tbl_2.id, tbl_2.state, tbl_2.deadts, tbl_2.nts, tbl_2.notify_times, tbl_2.deadts_nextBuffers: shared hit=75094 read=49 dirtied=49InitPlan 1 (returns $0)->  Limit  (cost=0.13..18151.03 rows=10000 width=6) (actual time=31.265..36.899 rows=10000 loops=1)Output: tbl.ctidBuffers: shared hit=11395->  Append  (cost=0.13..764699.60 rows=421301 width=6) (actual time=31.264..35.354 rows=10000 loops=1)Buffers: shared hit=11395->  Index Scan using idx_tbl_1 on public.tbl  (cost=0.13..169527.13 rows=369766 width=6) (actual time=0.014..0.014 rows=0 loops=1)Output: tbl.ctidIndex Cond: (tbl.deadts < now())Buffers: shared hit=1->  Index Scan using idx_tbl_2 on public.tbl tbl_1  (cost=0.43..590959.46 rows=51535 width=6) (actual time=31.249..33.870 rows=10000 loops=1)Output: tbl_1.ctidIndex Cond: (tbl_1.deadts_next < now())Buffers: shared hit=11394->  Tid Scan on public.tbl tbl_2  (cost=0.01..12.21 rows=10 width=54) (actual time=39.017..43.529 rows=10000 loops=1)Output: tbl_2.id, tbl_2.state, tbl_2.deadts, tbl_2.nts, (tbl_2.notify_times + 1), (now() + tbl_2.nts), tbl_2.ctidTID Cond: (tbl_2.ctid = ANY ($0))Buffers: shared hit=21395Planning time: 0.301 msExecution time: 79.905 ms

丝般柔滑

Time: 79.905 ms

小结

使用以上方法,可以完美的解决超时数据的监控问题。性能好。

超时流式处理 - 没有消息流入的数据异常监控相关推荐

  1. python 流式编程_python 使用yield进行数据的流式处理

    demo:从文件中取包含字符"a"的5行数据做一次批处理!!! # coding: utf-8 import time def cat(f): for line in f: yie ...

  2. 实时流式处理平台功能介绍

    作者:赵平 导读:在上一篇Wormhole系列文章中,我们介绍了Wormhole的设计思想,并给出了Stream.UMS.Flow.Namespace等相关概念的具体定义,从文章中我们得知,Wormh ...

  3. 深度解读!时序数据库HiTSDB:分布式流式聚合引擎

    阿里妹导读:高性能时间序列数据库 (High-Performance Time Series Database , 简称 HiTSDB) 是一种高性能,低成本,稳定可靠的在线时序数据库服务, 提供高效 ...

  4. hadoop 批流处理的实现_从T+1到T+0,浅谈PetaBase的实时流式处理

    随着互联网+的进一步发展,各行业对大数据技术的应用日趋成熟,企业的信息化范围正在高速扩展. 我们发现,越来越多的企业大数据分析已不再局限于传统的T+1场景,对数据的实时性分析和处理要求很高.例如网站流 ...

  5. 大数据架构中的流式架构和Kappa架构

    关于大数据的架构有很多,比如说传统的大数据架构,当然,还有很多经典的大数据架构,比如说流式架构和Kappa架构.流式架构和Kappa架构在大数据中的应用还是很多的,在这篇文章中我们就给大家介绍一下关于 ...

  6. 大数据Hadoop之——新一代流式数据湖平台 Apache Hudi

    文章目录 一.概述 二.Hudi 架构 三.Hudi的表格式 1)Copy on Write(写时复制) 2)Merge On Read(读时合并) 3)COW vs MOR 四.元数据表(Metad ...

  7. chatGPT流式回复是怎么实现的

    chatGPT流式回复是怎么实现的 先说结论: chatGPT的流式回复用的就是HTTP请求方案中的server-send-event流式接口,也就是服务端向客户端推流数据. 那eventStream ...

  8. Data Artisans发布支持ACID事务的流式处理框架Streaming Ledger

    data Artisans宣布推出Streaming Ledger,它扩展了Apache Flink,提供了跨表.键和事件流执行可序列化ACID事务的功能.这项正在申请专利的技术是Flink的专有附加 ...

  9. ASP.NET Core SignalR中的流式传输

    什么是流式传输? 流式传输是这一种以稳定持续流的形式传输数据的技术. 流式传输的使用场景 有些场景中,服务器返回的数据量较大,等待时间较长,客户端不得不等待服务器返回所有数据后,再进行相应的操作.这时 ...

最新文章

  1. qt能使用logback_Spring boot使用logback实现日志配置
  2. Java异常机制及异常处理建议
  3. yum源简单介绍及本地yum源的搭建
  4. python 目录和文件操作
  5. jquery判断页面、图片是否加载完成
  6. 不停歇的 Java 即将发布 JDK 16,新特性速览!
  7. php重定向 htaccess文件的编写
  8. C语言实现pid算法(附完整源码)
  9. 无法读取iis redirection.config
  10. wine的sys文件具体位置
  11. docker4dotnet #2 容器化主机
  12. 《计算机网络》学习笔记 ·003【数据链路层】
  13. 上海,不算太郁闷的地方
  14. 强密码生成器(C++)
  15. 计算机软件存储位置,微信电脑版存储位置在什么地方?查看微信电脑版存储路径的方法...
  16. DataSource 详解
  17. cio时代_成功实现数字时代CIO的6条原则
  18. 树莓派安装第三方操作系统
  19. flink SQL与hbase整合
  20. php 朋友圈留言,php实例-PHP仿qq空间或朋友圈发布动态、评论动态、回复评论、删除动态或评论的功能(上)...

热门文章

  1. poj - 3786 Repeater
  2. 如何找到Kafka集群的吞吐量极限?\n
  3. Web 开发学习笔记(6) --- 前端开发之 HTML5
  4. 作为数据科学家,我都有哪些弱点
  5. 11.2.0.2 asmcmd lsdg show incorrect diskgroup number
  6. [Java开发之路](23)装箱与拆箱
  7. 一张图看Windows Store有多混乱 微软现在开始整治
  8. Windows 7键盘失灵导致无法输入登录密码问题解决方案
  9. How to Convert Array to ArrayList in Java?
  10. 铁路网上购票需要完善但值得鼓励