PostgreSQL 秒杀4种方法 - 增加 批量流式加减库存 方法

本文涉及的产品
云数据库 RDS SQL Server,基础系列 2核4GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云原生数据库 PolarDB 分布式版,标准版 2核8GB
简介:

标签

PostgreSQL , 秒杀 , 批量扣减 , 流处理


背景

秒杀,即对同一个商品,消减库存。

带来的数据库问题是热点行,由于数据库最细粒度的锁通常是行锁,同一行同一时刻只能被一个事务更新,其他事务如果要更新同一行,会等待行级排它锁。

PostgreSQL中有几种方法来处理秒杀:

1、暴力,直接更新

2、skip locked,跳过被锁的行,直接返回,防止会话长时间等待。可以发起重试。

3、advisory lock,跳过被锁的行,直接返回,防止会话长时间等待。可以发起重试。

4、把更新转成写入,批量消费,可以在内核层面实现(批量消耗),也可以在业务层面实现。

看一下几种方法的性能。

create table t(    
  id int primary key,  -- 商品ID    
  cnt int              -- 库存    
);    
    
insert into t values (1,2000000000);    
AI 代码解读

都使用100个并发连接。

1、暴力更新

100并发

transaction type: ./test.sql    
scaling factor: 1    
query mode: prepared    
number of clients: 100    
number of threads: 100    
duration: 120 s    
number of transactions actually processed: 342042    
latency average = 35.083 ms    
latency stddev = 36.273 ms    
tps = 2849.507392 (including connections establishing)    
tps = 2849.837580 (excluding connections establishing)    
script statistics:    
 - statement latencies in milliseconds:    
        35.083  update t set cnt=cnt-1 where id=1;    
AI 代码解读

2并发

transaction type: ./test.sql  
scaling factor: 1  
query mode: prepared  
number of clients: 2  
number of threads: 2  
duration: 120 s  
number of transactions actually processed: 2819491  
latency average = 0.085 ms  
latency stddev = 0.009 ms  
tps = 23495.740654 (including connections establishing)  
tps = 23496.241610 (excluding connections establishing)  
script statistics:  
 - statement latencies in milliseconds:  
         0.085  update t set cnt=cnt-1 where id=1;  
AI 代码解读

2、skip locked row

skip locked是PG提供的一种语法,可以跳过被锁的行。

update t set cnt=cnt-1 where ctid = any (array(select ctid from t where id=1 for update skip locked)) returning *;    
    
    
transaction type: ./test.sql    
scaling factor: 1    
query mode: prepared    
number of clients: 100    
number of threads: 100    
duration: 120 s    
number of transactions actually processed: 6508322    
latency average = 1.844 ms    
latency stddev = 2.390 ms    
tps = 54226.911876 (including connections establishing)    
tps = 54233.143956 (excluding connections establishing)    
script statistics:    
 - statement latencies in milliseconds:    
         1.843  update t set cnt=cnt-1 where ctid = any (array(select ctid from t where id=1 for update skip locked)) returning *;    
AI 代码解读

3、advisory lock

advisory lock,更新时,锁住PK,而不是ROW本身,如果未获得锁,直接返回。与skip locked类似,但是更加高效,因为不需要SEARCH ROW。

transaction type: ./test.sql    
scaling factor: 1    
query mode: prepared    
number of clients: 100    
number of threads: 100    
duration: 120 s    
number of transactions actually processed: 31690080    
latency average = 0.379 ms    
latency stddev = 0.671 ms    
tps = 264047.289635 (including connections establishing)    
tps = 264083.172081 (excluding connections establishing)    
script statistics:    
 - statement latencies in milliseconds:    
         0.379  update t set cnt=cnt-1 where id=1 and pg_try_advisory_xact_lock(1);    
AI 代码解读

4、流式批量更新

流式批量处理,将更新转换为写入,避免热点行锁,然后批量合并到库存表。

但是需要注意,这个属于异步的方法,也就是说,可能导致库存负数。不过消费足够快的话,不会有太大问题。

1、创建FEED表,存储用户扣减库存的记录。

create table stat(    
  uid int,   -- 用户ID    
  id int,    -- 商品ID    
  cnt int,   -- 购买数量    
  crt_time timestamp default now()  -- 写入时间    
);    
AI 代码解读

2、创建用户扣减库存的函数,这里面使用一个判断,当库存(也就是说,默认不关心还没有合并到最终结果的那些消费记录。)

create or replace function consume(int, int, int) returns int as $$    
  insert into stat (uid, id, cnt) select $1 as uid, $2 as id, $3 as cnt from t where id=$2 and cnt+$3>=0 returning cnt;    
$$ language sql strict;     
AI 代码解读

3、调度,比如每100毫秒调度一次,异步合并

with tmp as (    
delete from stat where ctid = any ( array (    
  select ctid from stat limit 1000000    
)) returning *    
),    
t1 as (select id, sum(cnt) as cnt from tmp group by id)    
update t set cnt=t.cnt+t1.cnt from t1 where t.id=t1.id;    
AI 代码解读

4、调度(可以使用autovacuum自动调度),垃圾回收。

vacuum stat;    
AI 代码解读

5、压测

pgbench -M prepared -n -r -P 1 -f ./test.sql -c 100 -j 100 -T 120    
    
transaction type: ./test.sql    
scaling factor: 1    
query mode: prepared    
number of clients: 100    
number of threads: 100    
duration: 120 s    
number of transactions actually processed: 17155235    
latency average = 0.699 ms    
latency stddev = 0.546 ms    
tps = 142929.999871 (including connections establishing)    
tps = 142949.652076 (excluding connections establishing)    
script statistics:    
 - statement latencies in milliseconds:    
         0.702  select consume(1,1,1);    
AI 代码解读

如果我们需要按先后顺序合并,可以加个索引

create index idx_stat_2 on stat(crt_time);    
AI 代码解读

合并SQL如下:

with tmp as (    
delete from stat where ctid = any ( array (    
  select ctid from stat order by crt_time limit 1000000    
)) returning *    
),    
t1 as (select id, sum(cnt) as cnt from tmp group by id)    
update t set cnt=t.cnt+t1.cnt from t1 where t.id=t1.id;    
AI 代码解读

性能如下:

transaction type: ./test.sql    
scaling factor: 1    
query mode: prepared    
number of clients: 100    
number of threads: 100    
duration: 120 s    
number of transactions actually processed: 10394002    
latency average = 1.154 ms    
latency stddev = 0.951 ms    
tps = 86585.839187 (including connections establishing)    
tps = 86597.281593 (excluding connections establishing)    
script statistics:    
 - statement latencies in milliseconds:    
         1.155  select consume(1,1);    
AI 代码解读

消费速度与写入速度几乎一致。只有调度延迟。

如果通过内核层面来实现的话,可以避免库存负数这个问题,提高一定的性能,但是:为了不破坏原有的一致性和可靠性,同样不能避免批量提交前,会话占用数据库连接的问题。

所以是有利有弊的。

另一方面,如果我们在内部实现同一个ID最多分配给两个SERVER PROCESS执行,也能很好的解决这个问题。类似oracle的shared server mode,同时对id进行路由分配,至多给两个SHARED PROCESS,从而每个ID保证2万多的TPS。

类似的流计算案例

《超时流式处理 - 没有消息流入的数据异常监控》

《阿里云RDS PostgreSQL varbitx实践 - 流式标签 (阅后即焚流式批量计算) - 万亿级,任意标签圈人,毫秒响应》

《PostgreSQL 流式统计 - insert on conflict 实现 流式 UV(distinct), min, max, avg, sum, count ...》

《HTAP数据库 PostgreSQL 场景与性能测试之 32 - (OLTP) 高吞吐数据进出(堆存、行扫、无需索引) - 阅后即焚(JSON + 函数流式计算)》

《HTAP数据库 PostgreSQL 场景与性能测试之 31 - (OLTP) 高吞吐数据进出(堆存、行扫、无需索引) - 阅后即焚(读写大吞吐并测)》

《HTAP数据库 PostgreSQL 场景与性能测试之 27 - (OLTP) 物联网 - FEED日志, 流式处理 与 阅后即焚 (CTE)》

《PostgreSQL 异步消息实践 - Feed系统实时监测与响应(如 电商主动服务) - 分钟级到毫秒级的实现》

小结

处理能力如下

1、暴力,直接更新

2849/s (100并发)

2.35万/s (2并发)

2、skip locked,跳过被锁的行,直接返回,防止会话长时间等待。可以发起重试。

5.4万/s

3、advisory lock,跳过被锁的行,直接返回,防止会话长时间等待。可以发起重试。

26.4万/s

4、把更新转成写入,批量消费,可以在内核层面实现(批量消耗),也可以在业务层面实现。

14.3万/s (乱序消费)

8.6万/s (按顺序消费)

内核层面来解决热点,批量合并或者shared server process和根据ID分配(每个ID 2.35万/s的处理吞吐已经够用了,因为秒杀完后,库存为负时,就没有锁冲突问题了),是最靠谱的。

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
打赏
0
2
0
0
20691
分享
相关文章
PostgreSQL配置文件修改及启用方法
总的来说,修改和启用PostgreSQL的配置文件是一个直接而简单的过程。只需要找到配置文件,修改你想要改变的选项,然后重启服务器即可。但是,你需要注意的是,不正确的配置可能会导致服务器性能下降,甚至导致服务器无法启动。因此,在修改配置文件之前,你应该充分理解每个选项的含义和影响,如果可能的话,你应该在测试环境中先进行试验。
112 72
在CentOS 6上安装和使用PostgreSQL的方法
在CentOS 6上安装和使用PostgreSQL的方法
114 2
在Ubuntu 18.04上安装和使用PostgreSQL的方法
在Ubuntu 18.04上安装和使用PostgreSQL的方法
131 1
在Ubuntu 14.04上安装和使用PostgreSQL的方法
在Ubuntu 14.04上安装和使用PostgreSQL的方法
89 1
在CentOS 7上安装和使用PostgreSQL的方法
在CentOS 7上安装和使用PostgreSQL的方法
677 0
postgresql |数据库 |数据库的常用备份和恢复方法总结
postgresql |数据库 |数据库的常用备份和恢复方法总结
291 0
PostgreSQL 准确且快速的数据对比方法
作为一款强大而广受欢迎的开源关系型数据库管理系统,PostgreSQL 在数据库领域拥有显著的市场份额。其出色的可扩展性、稳定性使其成为众多企业和项目的首选数据库。而在很多场景下(开发 | 生产环境同步、备份恢复验证、数据迁移、数据合并等),不同环境中的数据库数据可能导致数据的不一致,因此,进行数据库之间的数据对比变得至关重要。
427 0
PostgreSQL安装、配置及简单使用方法
一、PostgreSQL简介 1、什么是PostgreSQL PostgreSQL数据库是目前功能最强大的开源数据库,支持丰富的数据类型(如JSON何JSONB类型,数组类型)和自定义类型。而且它提供了丰富的接口,可以很容易地扩展它的功能,如可以在GiST框架下实现自己的索引类型等,它还支持使用C语言写自定义函数、触发器,也支持使用流行的语言写自定义函数,比如其中的PL/Perl提供了使用Perl语言写自定义函数的功能,当然还有PL/Python、PL/Tcl,等等。 2、PostgreSQL数据库的优势 PostgreSQL数据库是目前功能最强大的开源数据库,它是最接近工业标准SQL
1872 0
PostgreSQL 12的可拔插存储引擎--表访问方法以及bloackholes案例
PostgreSQL 12的可拔插存储引擎--表访问方法以及bloackholes案例
213 0

相关产品

  • 云原生数据库 PolarDB
  • 云数据库 RDS PostgreSQL 版