超时流式处理 - 没有消息流入的数据异常监控
标签
PostgreSQL , 流式处理 , 无流入数据超时异常
背景
流计算有个特点,数据流式写入,流式计算。
但是有一种情况,可能无法覆盖。例如电商中的 收货超时,退款处理超时 事件的流式监控。因为数据都不会再写进来了,所以也无法触发流式计算。
这些问题如何流式预警呢?
可以用超时时间+调度的方式,当然这里面有PostgreSQL的独门秘籍:
1、CTE,语法灵活。
2、partial index,不需要检索的数据不构建索引。
3、DML returning,可以返回DML语句的结果,结合CTE实现最小交互计算。
4、multi-index bitmap scan,多个索引合并扫描,在使用OR条件时,可以结合多个字段的索引进行合并扫描。
DEMO设计
1、被监控表的结构。里面记录了订单、退款等事件的超时处理时间,超时通知次数,下一次通知时间间隔,完结状态等。
create table tbl ( id int8, -- ..... 其他字段 (比如已完结状态) state int, -- 完结状态(1 表示已完结) deadts timestamp, -- 超时时间 nts interval, -- 超时间隔,用于更新下一次通知时间 (比如一天通知一次) notify_times int default 0, -- 通知次数 deadts_next timestamp -- 下一次通知时间
);
2、创建partial index,也就是说,对未完结工单才需要通知用户,这些数据是业务关心的,使用partial index可以简化索引大小。提高速度。
create index idx_tbl_1 on tbl (deadts,notify_times,state) where notify_times=0 and state<>1; create index idx_tbl_2 on tbl (deadts_next,state) where deadts_next is not null and state<>1;
3、获取需要通知的数据,并且更新通知次数以及下一次的通知时间。
with tmp1 as (
update tbl set deadts_next=now()+nts, notify_times=notify_times+1
where ctid = any (array( select ctid from tbl where ( deadts < now() and notify_times=0 and state<>1) union allselect ctid from tbl where( deadts_next < now() and deadts_next is not null and state<>1) limit 10000 -- 一次获取1万条超时数据
))
returning *
)
select * from tmp1;
4、执行计划完美
CTE Scan on tmp1 (cost=18163.25..18163.45 rows=10 width=48)CTE tmp1-> Update on tbl tbl_2 (cost=18151.05..18163.25 rows=10 width=54)InitPlan 1 (returns $0)-> Limit (cost=0.13..18151.03 rows=10000 width=6)-> Append (cost=0.13..764699.60 rows=421301 width=6)-> Index Scan using idx_tbl_1 on tbl (cost=0.13..169527.13 rows=369766 width=6)Index Cond: (deadts < now())-> Index Scan using idx_tbl_2 on tbl tbl_1 (cost=0.43..590959.46 rows=51535 width=6)Index Cond: (deadts_next < now())-> Tid Scan on tbl tbl_2 (cost=0.01..12.21 rows=10 width=54)TID Cond: (ctid = ANY ($0))
(12 rows)
5、调度
《PostgreSQL 定时任务方法2》
《PostgreSQL Oracle 兼容性之 - DBMS_JOBS - Daily Maintenance - Timing Tasks(pgagent)》
当然你如果使用阿里云,可以使用阿里云的调度平台,配置调度任务。
性能指标
1、写入1亿数据,假设有100万条同时超时需要处理,耗时如何?
-- 1亿条完结
insert into tbl select id, 1, now(), '5 min', 0, null from generate_series(1,100000000) t(id); -- 100万条超时
insert into tbl select id, 0, now(), '5 min', 0, null from generate_series(1,1000000) t(id);
通知性能,比如每一批通知1万条:
(小批量获取,并更新超时时间,目的是让autovacuum介入,实时回收垃圾)
with tmp1 as (
update tbl set deadts_next=now()+nts, notify_times=notify_times+1
where ctid = any (array( select ctid from tbl where ( deadts < now() and notify_times=0 and state<>1) union allselect ctid from tbl where( deadts_next < now() and deadts_next is not null and state<>1) limit 10000 -- 一次获取1万条超时数据
))
returning *
)
select * from tmp1; -- 计划 CTE Scan on tmp1 (cost=18163.25..18163.45 rows=10 width=48) (actual time=39.092..78.707 rows=10000 loops=1)Output: tmp1.id, tmp1.state, tmp1.deadts, tmp1.nts, tmp1.notify_times, tmp1.deadts_nextBuffers: shared hit=75094 read=49 dirtied=49CTE tmp1-> Update on public.tbl tbl_2 (cost=18151.05..18163.25 rows=10 width=54) (actual time=39.089..74.637 rows=10000 loops=1)Output: tbl_2.id, tbl_2.state, tbl_2.deadts, tbl_2.nts, tbl_2.notify_times, tbl_2.deadts_nextBuffers: shared hit=75094 read=49 dirtied=49InitPlan 1 (returns $0)-> Limit (cost=0.13..18151.03 rows=10000 width=6) (actual time=31.265..36.899 rows=10000 loops=1)Output: tbl.ctidBuffers: shared hit=11395-> Append (cost=0.13..764699.60 rows=421301 width=6) (actual time=31.264..35.354 rows=10000 loops=1)Buffers: shared hit=11395-> Index Scan using idx_tbl_1 on public.tbl (cost=0.13..169527.13 rows=369766 width=6) (actual time=0.014..0.014 rows=0 loops=1)Output: tbl.ctidIndex Cond: (tbl.deadts < now())Buffers: shared hit=1-> Index Scan using idx_tbl_2 on public.tbl tbl_1 (cost=0.43..590959.46 rows=51535 width=6) (actual time=31.249..33.870 rows=10000 loops=1)Output: tbl_1.ctidIndex Cond: (tbl_1.deadts_next < now())Buffers: shared hit=11394-> Tid Scan on public.tbl tbl_2 (cost=0.01..12.21 rows=10 width=54) (actual time=39.017..43.529 rows=10000 loops=1)Output: tbl_2.id, tbl_2.state, tbl_2.deadts, tbl_2.nts, (tbl_2.notify_times + 1), (now() + tbl_2.nts), tbl_2.ctidTID Cond: (tbl_2.ctid = ANY ($0))Buffers: shared hit=21395Planning time: 0.301 msExecution time: 79.905 ms
丝般柔滑
Time: 79.905 ms
小结
使用以上方法,可以完美的解决超时数据的监控问题。性能好。
超时流式处理 - 没有消息流入的数据异常监控相关推荐
- python 流式编程_python 使用yield进行数据的流式处理
demo:从文件中取包含字符"a"的5行数据做一次批处理!!! # coding: utf-8 import time def cat(f): for line in f: yie ...
- 实时流式处理平台功能介绍
作者:赵平 导读:在上一篇Wormhole系列文章中,我们介绍了Wormhole的设计思想,并给出了Stream.UMS.Flow.Namespace等相关概念的具体定义,从文章中我们得知,Wormh ...
- 深度解读!时序数据库HiTSDB:分布式流式聚合引擎
阿里妹导读:高性能时间序列数据库 (High-Performance Time Series Database , 简称 HiTSDB) 是一种高性能,低成本,稳定可靠的在线时序数据库服务, 提供高效 ...
- hadoop 批流处理的实现_从T+1到T+0,浅谈PetaBase的实时流式处理
随着互联网+的进一步发展,各行业对大数据技术的应用日趋成熟,企业的信息化范围正在高速扩展. 我们发现,越来越多的企业大数据分析已不再局限于传统的T+1场景,对数据的实时性分析和处理要求很高.例如网站流 ...
- 大数据架构中的流式架构和Kappa架构
关于大数据的架构有很多,比如说传统的大数据架构,当然,还有很多经典的大数据架构,比如说流式架构和Kappa架构.流式架构和Kappa架构在大数据中的应用还是很多的,在这篇文章中我们就给大家介绍一下关于 ...
- 大数据Hadoop之——新一代流式数据湖平台 Apache Hudi
文章目录 一.概述 二.Hudi 架构 三.Hudi的表格式 1)Copy on Write(写时复制) 2)Merge On Read(读时合并) 3)COW vs MOR 四.元数据表(Metad ...
- chatGPT流式回复是怎么实现的
chatGPT流式回复是怎么实现的 先说结论: chatGPT的流式回复用的就是HTTP请求方案中的server-send-event流式接口,也就是服务端向客户端推流数据. 那eventStream ...
- Data Artisans发布支持ACID事务的流式处理框架Streaming Ledger
data Artisans宣布推出Streaming Ledger,它扩展了Apache Flink,提供了跨表.键和事件流执行可序列化ACID事务的功能.这项正在申请专利的技术是Flink的专有附加 ...
- ASP.NET Core SignalR中的流式传输
什么是流式传输? 流式传输是这一种以稳定持续流的形式传输数据的技术. 流式传输的使用场景 有些场景中,服务器返回的数据量较大,等待时间较长,客户端不得不等待服务器返回所有数据后,再进行相应的操作.这时 ...
最新文章
- qt能使用logback_Spring boot使用logback实现日志配置
- Java异常机制及异常处理建议
- yum源简单介绍及本地yum源的搭建
- python 目录和文件操作
- jquery判断页面、图片是否加载完成
- 不停歇的 Java 即将发布 JDK 16,新特性速览!
- php重定向 htaccess文件的编写
- C语言实现pid算法(附完整源码)
- 无法读取iis redirection.config
- wine的sys文件具体位置
- docker4dotnet #2 容器化主机
- 《计算机网络》学习笔记 ·003【数据链路层】
- 上海,不算太郁闷的地方
- 强密码生成器(C++)
- 计算机软件存储位置,微信电脑版存储位置在什么地方?查看微信电脑版存储路径的方法...
- DataSource 详解
- cio时代_成功实现数字时代CIO的6条原则
- 树莓派安装第三方操作系统
- flink SQL与hbase整合
- php 朋友圈留言,php实例-PHP仿qq空间或朋友圈发布动态、评论动态、回复评论、删除动态或评论的功能(上)...
热门文章
- poj - 3786 Repeater
- 如何找到Kafka集群的吞吐量极限?\n
- Web 开发学习笔记(6) --- 前端开发之 HTML5
- 作为数据科学家,我都有哪些弱点
- 11.2.0.2 asmcmd lsdg show incorrect diskgroup number
- [Java开发之路](23)装箱与拆箱
- 一张图看Windows Store有多混乱 微软现在开始整治
- Windows 7键盘失灵导致无法输入登录密码问题解决方案
- How to Convert Array to ArrayList in Java?
- 铁路网上购票需要完善但值得鼓励