超时流式处理 - 没有消息流入的数据异常监控

本文涉及的产品
云原生数据库 PolarDB MySQL 版,Serverless 5000PCU 100GB
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介:

标签

PostgreSQL , 流式处理 , 无流入数据超时异常


背景

流计算有个特点,数据流式写入,流式计算。

但是有一种情况,可能无法覆盖。例如电商中的 收货超时,退款处理超时 事件的流式监控。因为数据都不会再写进来了,所以也无法触发流式计算。

这些问题如何流式预警呢?

可以用超时时间+调度的方式,当然这里面有PostgreSQL的独门秘籍:

1、CTE,语法灵活。

2、partial index,不需要检索的数据不构建索引。

3、DML returning,可以返回DML语句的结果,结合CTE实现最小交互计算。

4、multi-index bitmap scan,多个索引合并扫描,在使用OR条件时,可以结合多个字段的索引进行合并扫描。

DEMO设计

1、被监控表的结构。里面记录了订单、退款等事件的超时处理时间,超时通知次数,下一次通知时间间隔,完结状态等。

create table tbl (  
  id int8,                                         
  --                               ..... 其他字段 (比如已完结状态)  
  state int,                       -- 完结状态(1 表示已完结)  
  deadts timestamp,                -- 超时时间        
  nts interval,                    -- 超时间隔,用于更新下一次通知时间 (比如一天通知一次)       
  notify_times int default 0,      -- 通知次数  
  deadts_next timestamp            -- 下一次通知时间  
);  

2、创建partial index,也就是说,对未完结工单才需要通知用户,这些数据是业务关心的,使用partial index可以简化索引大小。提高速度。

create index idx_tbl_1 on tbl (deadts,notify_times,state) where notify_times=0 and state<>1;  
  
create index idx_tbl_2 on tbl (deadts_next,state) where deadts_next is not null and state<>1;  

3、获取需要通知的数据,并且更新通知次数以及下一次的通知时间。

with tmp1 as (  
update tbl set   
  deadts_next=now()+nts,   
  notify_times=notify_times+1   
where ctid = any (array(  
  select ctid from tbl where  
  ( deadts < now() and notify_times=0 and state<>1) 
  union all
  select ctid from tbl where
  ( deadts_next < now() and deadts_next is not null and state<>1)   
  limit 10000     -- 一次获取1万条超时数据    
))  
returning *  
)  
select * from tmp1;  

4、执行计划完美

 CTE Scan on tmp1  (cost=18163.25..18163.45 rows=10 width=48)
   CTE tmp1
     ->  Update on tbl tbl_2  (cost=18151.05..18163.25 rows=10 width=54)
           InitPlan 1 (returns $0)
             ->  Limit  (cost=0.13..18151.03 rows=10000 width=6)
                   ->  Append  (cost=0.13..764699.60 rows=421301 width=6)
                         ->  Index Scan using idx_tbl_1 on tbl  (cost=0.13..169527.13 rows=369766 width=6)
                               Index Cond: (deadts < now())
                         ->  Index Scan using idx_tbl_2 on tbl tbl_1  (cost=0.43..590959.46 rows=51535 width=6)
                               Index Cond: (deadts_next < now())
           ->  Tid Scan on tbl tbl_2  (cost=0.01..12.21 rows=10 width=54)
                 TID Cond: (ctid = ANY ($0))
(12 rows)

5、调度

《PostgreSQL 定时任务方法2》

《PostgreSQL Oracle 兼容性之 - DBMS_JOBS - Daily Maintenance - Timing Tasks(pgagent)》

当然你如果使用阿里云,可以使用阿里云的调度平台,配置调度任务。

性能指标

1、写入1亿数据,假设有100万条同时超时需要处理,耗时如何?

-- 1亿条完结  
insert into tbl select id, 1, now(), '5 min', 0, null from generate_series(1,100000000) t(id);  
  
-- 100万条超时  
insert into tbl select id, 0, now(), '5 min', 0, null from generate_series(1,1000000) t(id);  

通知性能,比如每一批通知1万条:

(小批量获取,并更新超时时间,目的是让autovacuum介入,实时回收垃圾)

with tmp1 as (  
update tbl set   
  deadts_next=now()+nts,   
  notify_times=notify_times+1   
where ctid = any (array(  
  select ctid from tbl where  
  ( deadts < now() and notify_times=0 and state<>1)   
  union all
  select ctid from tbl where
  ( deadts_next < now() and deadts_next is not null and state<>1)   
  limit 10000     -- 一次获取1万条超时数据    
))  
returning *  
)  
select * from tmp1;  
  
  
-- 计划  
  
 CTE Scan on tmp1  (cost=18163.25..18163.45 rows=10 width=48) (actual time=39.092..78.707 rows=10000 loops=1)
   Output: tmp1.id, tmp1.state, tmp1.deadts, tmp1.nts, tmp1.notify_times, tmp1.deadts_next
   Buffers: shared hit=75094 read=49 dirtied=49
   CTE tmp1
     ->  Update on public.tbl tbl_2  (cost=18151.05..18163.25 rows=10 width=54) (actual time=39.089..74.637 rows=10000 loops=1)
           Output: tbl_2.id, tbl_2.state, tbl_2.deadts, tbl_2.nts, tbl_2.notify_times, tbl_2.deadts_next
           Buffers: shared hit=75094 read=49 dirtied=49
           InitPlan 1 (returns $0)
             ->  Limit  (cost=0.13..18151.03 rows=10000 width=6) (actual time=31.265..36.899 rows=10000 loops=1)
                   Output: tbl.ctid
                   Buffers: shared hit=11395
                   ->  Append  (cost=0.13..764699.60 rows=421301 width=6) (actual time=31.264..35.354 rows=10000 loops=1)
                         Buffers: shared hit=11395
                         ->  Index Scan using idx_tbl_1 on public.tbl  (cost=0.13..169527.13 rows=369766 width=6) (actual time=0.014..0.014 rows=0 loops=1)
                               Output: tbl.ctid
                               Index Cond: (tbl.deadts < now())
                               Buffers: shared hit=1
                         ->  Index Scan using idx_tbl_2 on public.tbl tbl_1  (cost=0.43..590959.46 rows=51535 width=6) (actual time=31.249..33.870 rows=10000 loops=1)
                               Output: tbl_1.ctid
                               Index Cond: (tbl_1.deadts_next < now())
                               Buffers: shared hit=11394
           ->  Tid Scan on public.tbl tbl_2  (cost=0.01..12.21 rows=10 width=54) (actual time=39.017..43.529 rows=10000 loops=1)
                 Output: tbl_2.id, tbl_2.state, tbl_2.deadts, tbl_2.nts, (tbl_2.notify_times + 1), (now() + tbl_2.nts), tbl_2.ctid
                 TID Cond: (tbl_2.ctid = ANY ($0))
                 Buffers: shared hit=21395
 Planning time: 0.301 ms
 Execution time: 79.905 ms

丝般柔滑

Time: 79.905 ms  

小结

使用以上方法,可以完美的解决超时数据的监控问题。性能好。

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
相关文章
|
消息中间件 存储 缓存
RocketMQ 监控告警:生产环境如何快速通过监控预警发现堆积、收发失败等问题?
本文主要向大家介绍如何利用 RocketMQ 可观测体系中的指标监控,对生产环境中典型场景:消息堆积、消息收发失败等场景配置合理的监控预警,快速发现问题,定位问题。
957 0
RocketMQ 监控告警:生产环境如何快速通过监控预警发现堆积、收发失败等问题?
|
9月前
|
消息中间件 存储 Kafka
MQ 学习日志(六) 保证消息的可靠性传输
消息的可靠性传输 简述
72 0
|
数据处理
使用队列和事务实现采集数据实例流程
使用队列和事务实现采集数据实例流程
63 0
|
消息中间件 存储 算法
多类型业务消息专题-定时消息| 学习笔记
快速学习多类型业务消息专题-定时消息
138 0
多类型业务消息专题-定时消息| 学习笔记
|
消息中间件 负载均衡 算法
消息消费负载和重新分布机制|学习笔记
快速学习消息消费负载和重新分布机制
70 0
消息消费负载和重新分布机制|学习笔记
|
消息中间件 存储 设计模式
生产故障|Kafka消息发送延迟达到几十秒的罪魁祸首竟然是...
生产故障|Kafka消息发送延迟达到几十秒的罪魁祸首竟然是...
生产故障|Kafka消息发送延迟达到几十秒的罪魁祸首竟然是...
|
消息中间件 存储 NoSQL
延迟消息的五种实现方案
生产者把消息发送到消息队列中以后,并不期望被立即消费,而是等待指定时间后才可以被消费者消费,这类消息通常被称为延迟消息。延迟消息的应用场景其实是非常的广泛,比如以下的场景:
608 0
延迟消息的五种实现方案
Ele
|
消息中间件 存储 架构师
消息丢失案例分析
关于MQ的消息丢失数据,分析
Ele
282 0
|
消息中间件
消息积压&消息丢失解决方案
消息积压&消息丢失解决方案
153 0
心跳 —— 超时机制分析
在C/S模式中,有时我们会长时间保持一个连接,以避免频繁地建立连接,但同时,一般会有一个超时时间,在这个时间内没发起任何请求的连接会被断开,以减少负载,节约资源。并且该机制一般都是在服务端实现,因为client强制关闭或意外断开连接,server端在此刻是感知不到的,如果放到client端实现,在上述情况下,该超时机制就失效了。本来这问题很普通,不太值得一提,但最近在项目中看到了该机制的一种糟糕的实现,故在此深入分析一下。
761 0
心跳 —— 超时机制分析