阿里云RDS PG实践 - 流式标签 - 万亿级,实时任意标签圈人

本文涉及的产品
云原生数据库 PolarDB MySQL 版,Serverless 5000PCU 100GB
云原生数据库 PolarDB 分布式版,标准版 2核8GB
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介:

标签

PostgreSQL , 阅后即焚 , 流计算 , 标签


背景

varbitx是阿里云RDS PG提供的一个BIT操作插件,使用这个插件已经成功的帮助用户提供了万亿级的毫秒级实时圈人功能。

《阿里云RDS for PostgreSQL varbitx插件与实时画像应用场景介绍》

《基于 阿里云 RDS PostgreSQL 打造实时用户画像推荐系统(varbitx)》

结合阅后即焚的流式批量处理,schemaless UDF,可以实现高效的增、删标签,以及毫秒级别的按标签圈人。

《PostgreSQL 在铁老大订单系统中的schemaless设计和性能压测》

目标是百亿级用户体量,千万级标签。

架构图

pic

阿里云varbitx 添加标签的函数接口

1. set_bit_array

set_bit_array (  
  varbit,  
  int,   -- 目标BIT (0|1)  
  int,   -- 填充BIT (0|1)  
  int[]  -- 目标位置  
) returns varbit  
   
  将指定位置的BIT设置为0|1,(起始位=0),超出原始长度的部分填充0|1  
  例如 set_bit_array('111100001111', 0, 1, array[1,15]) 返回 1011000011111110  

正在添加这个函数,用于一次性处理添加和删除的BIT的设置

2. set_bit_array

set_bit_array (  
  varbit,  
  int,     -- 1目标BIT (0|1)  
  int[]    -- 1目标位置  
  int,     -- 2目标BIT (0|1)  
  int[],   -- 2目标位置  
  int      -- 填充BIT (0|1)  
) returns varbit  
  
  将指定位置的BIT设置为0|1,(起始位=0),超出原始长度的部分填充0|1  
  例如 set_bit_array('111100001111', 0, array[1,15], 1, array[0,4], 0) 返回 1011100011111110  

阿里云varbitx 从bitmap得到字典ID

1. bit_posite

bit_posite (    
  varbit,     
  int,      -- (0|1)    
  boolean     
) returns int[]      
    
  返回 0|1 的位置,(起始位=0), true时正向返回,false时反向返回        
  例如 bit_posite ('11110010011', 1, true) 返回 [0,1,2,3,6,9,10]      
       bit_posite ('11110010011', 1, false) 返回 [10,9,6,3,2,1,0]     

从字典ID得到USERID

select uid from dict where id = any ( bit_posite(x,x,x) );      

demo

1 字典表

将USERID转换为ARRAY下标,即字典表

首先需要用到无缝自增ID

《PostgreSQL 无缝自增ID的实现 - by advisory lock》

1、字典表如下

create table dict(id int8 primary key, uid int8 not null);    
  
create unique index idx_dict_uid on dict (uid);    

2 生成已有用户

create table t_uid (  
  uid int8 primary key  -- 已有用户的USER ID  
);  

插入一批 USERID (一亿)

insert into t_uid select id from generate_series(1,100000000) t(id) order by random() ;  

3 已有用户,一次性生成mapping下标

create sequence seq minvalue 0 start 0;    
  
  
insert into dict select nextval('seq'), uid from t_uid;    
  
  
select min(id),max(id),count(*) from dict;    
 min |   max    |   count     
-----+----------+-----------  
   0 | 99999999 | 100000000  
(1 row)  

4 无缝自增下标 函数

新增的用户,通过这个函数写入,确保无缝自增下标:

create or replace function f_uniq(i_uid int8) returns int as $$  
declare  
  newid int;    
  i int := 0;    
  res int;    
begin  
  loop   
    if i>0 then   
      perform pg_sleep(0.2*random());  
    else  
      i := i+1;  
    end if;  
  
    -- 获取已有的最大ID+1 (即将插入的ID)  
    select max(id)+1 into newid from dict;    
    if newid is not null then  
      -- 获取AD LOCK  
      if pg_try_advisory_xact_lock(newid) then  
        -- 插入  
        insert into dict (id,uid) values (newid,i_uid) ;    
        -- 返回此次获取到的UID  
        return newid;    
      else  
        -- 没有获取到AD LOCK则继续循环  
        continue;    
      end if;    
    else  
      -- 表示这是第一条记录,获取AD=0 的LOCK  
      if pg_try_advisory_xact_lock(0) then  
        insert into dict (id, uid) values (0, i_uid) ;    
        return 0;    
      else  
        continue;     
      end if;    
    end if;    
  end loop;    
    
  -- 如果因为瞬态导致PK冲突了,继续调用  
  exception when others then  
    select f_uniq(i_uid) into res;    
    return res;    
end;  
$$ language plpgsql strict;  

例如,新增一个USERID。

select f_uniq(?);  

5 从ID或者USERID或从USERID获得ID

create or replace function get_id_from_uid (int8) returns int8 as $$  
  select id from dict where uid=$1;  
$$ language sql strict;  
  
create or replace function get_uid_from_id (int8) returns int8 as $$  
  select uid from dict where id=$1;  
$$ language sql strict;  

6 标签描述表

create table t_tags(tagid int primary key, desc text);    

7 流式标签表

1、UID范围映射表,落在什么区间的ID,对应什么后缀的表名

建议在写入时,自动写到不同的t_tag_log表,这样的话,不需要在消费时过滤。

create table t_mapping (  
  dict_id int8range,      -- 字典ID区间  
  suffix text unique             -- 表名后缀  
);  
  
alter table t_mapping add constraint ck_exclude_dict_id exclude using gist(dict_id with &&);  -- 防止交叉区间  
insert into t_mapping values (int8range(0,100000000), '0');  
insert into t_mapping values (int8range(100000000,200000000), '1');  
insert into t_mapping values (int8range(200000000,300000000), '2');  
insert into t_mapping values (int8range(300000000,400000000), '3');  

2、流式标签主表

create table t_tag_log (  
  dict_id int8,        -- 用户下标ID    
  action int2,         -- 删除0、新增1    
  tagid int,           -- 标签ID   
  crt_time timestamp   -- 时间    
);  
  
create index idx_t_tag_log on t_tag_log (crt_time);  

3、创建流式标签子表

按tagid分区,按t_mapping后缀对应关系分区。

do language plpgsql $$  
declare  
  x text;  
begin  
  for i in 0..63 loop  
    for x in select suffix from t_mapping loop  
      execute format('create table t_tag_log_%s_%s (like t_tag_log including all) inherits (t_tag_log)', i, x);  
    end loop;  
  end loop;  
end;  
$$;  
  
postgres=# \dt t_tag_log_*  
             List of relations  
 Schema |      Name      | Type  |  Owner     
--------+----------------+-------+----------  
 public | t_tag_log_0_0  | table | postgres  
 public | t_tag_log_0_1  | table | postgres  
 public | t_tag_log_0_2  | table | postgres  
 public | t_tag_log_0_3  | table | postgres  
 public | t_tag_log_10_0 | table | postgres  
 public | t_tag_log_10_1 | table | postgres  
 public | t_tag_log_10_2 | table | postgres  
 public | t_tag_log_10_3 | table | postgres  
 public | t_tag_log_11_0 | table | postgres  
 public | t_tag_log_11_1 | table | postgres  
.....................

4、使用schema lessUDF写入标签信息

create or replace function ins_tag(v_uid int8, v_action int2, v_tagid int, v_crt_time timestamp) returns void as $$  
declare  
  i int := mod(v_tagid, 64);  
  x text;   
begin  
  select suffix into x from t_mapping where dict_id @> $1;  
  execute format ('insert into t_tag_log_%s_%s (dict_id, action, tagid, crt_time) values (%s, %s, %s, %L)', i, x, get_id_from_uid(v_uid), v_action, v_tagid, v_crt_time);  
end;  
$$ language plpgsql strict;  

8 高速写入贴标签、删除标签

vi test.sql  
  
\set uid random(1,100000000)  
\set tagid random(1,100000)  
\set action random(0,1)  
select ins_tag (:uid::int8, :action::int2, :tagid, now()::timestamp);  
  
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120  

单表单条写入性能:

单表或多表 批量写入 性能可以超过100万行/s。

transaction type: ./test.sql  
scaling factor: 1  
query mode: prepared  
number of clients: 32  
number of threads: 32  
duration: 120 s  
number of transactions actually processed: 19013298  
latency average = 0.202 ms  
latency stddev = 0.267 ms  
tps = 158442.952245 (including connections establishing)  
tps = 158449.386772 (excluding connections establishing)  
script statistics:  
 - statement latencies in milliseconds:  
         0.001  \set uid random(1,100000000)  
         0.000  \set tagid random(1,100000)  
         0.000  \set action random(0,1)  
         0.200  insert into t_tag_log (dict_id, action, tagid, crt_time) values (get_id_from_uid(:uid), :action, :tagid, now());  

9 标签反转表

对应到前面的拆分模式,有两种模式,一种模式,使用多表存储。另一种模式使用单表存储。

1、多表存储。

标签反转主表

create table tag_userbitmap (  
  tagid int primary key,             -- 标签ID  
  usrebitmap varbit                  -- 用户bitmap  
);  

标签反转子表

do language plpgsql $$  
declare  
  x text;  
begin  
  for x in select suffix from t_mapping loop  
    execute format('create table tag_userbitmap_%s (like tag_userbitmap including all) inherits (tag_userbitmap)', x);  
  end loop;  
end;  
$$;  

tag不需要分区,因为增量更新时,没有行锁冲突,而且表大小也足够。

postgres=# \dt tag_userbitmap*  
              List of relations  
 Schema |       Name       | Type  |  Owner     
--------+------------------+-------+----------  
 public | tag_userbitmap   | table | postgres  
 public | tag_userbitmap_0 | table | postgres  
 public | tag_userbitmap_1 | table | postgres  
 public | tag_userbitmap_2 | table | postgres  
 public | tag_userbitmap_3 | table | postgres  
(5 rows)  

2、单表存储,则加一列区分OFFSET。

create table tag_userbitmap_single (
  tagid int ,               -- 标签ID
  bitmap_offset int ,       -- bit号段offset 值
  usrebitmap varbit,        -- 当前号段的bitmap
  primary key (tagid, bitmap_offset)
);

10 阅后即焚-流式消费标签,合并到标签反转表

不考虑分区表时,这样来进行消费。

with tmp as (delete from t_tag_log     -- 查询哪个表  
  where ctid = any ( array (  
    select ctid from t_tag_log order by crt_time limit 100000       -- 批量处理10万条  
  )) returning *  
)   
select tagid, action, array_agg(dict_id) as dict_id from    
(  
  select row_number() over w1 as rn, *   
  from tmp   
  window w1 as (partition by dict_id, tagid order by crt_time desc)  -- 每个ID,每个标签,取最后一条  
) t where rn=1  
group by tagid, action                                               -- 聚合为下标数组  
;  

11 schemaless UDF, 阅后即焚-流式消费标签,合并到标签反转表

对应tag_userbimap表的设计(单表、或多表),schemaless UDF也有不同。

1、多表:在UDF中自动拼接表名,将BITMAP合并到对应子表。

create or replace function merge_tags(v_limit int, v_suffix1 int, v_suffix2 int) returns void as $$  
declare  
  v_tagid int;  
  v_action int2;  
  v_dict_id int[];  
  v_offset int;  
  v_fixed_dict_id int[];  
begin  
  select substring(dict_id::text,'(\d+),') into v_offset from t_mapping where suffix=v_suffix2::text;   
  
  for v_tagid, v_action, v_dict_id in   
    execute format   
('  
with tmp as (delete from t_tag_log_%s_%s     -- 查询哪个表  
  where ctid = any ( array (  
    select ctid from t_tag_log_%s_%s order by crt_time limit %s       -- 批量处理N条  
  )) returning *  
)   
select tagid, action, array_agg(dict_id) as dict_id from    
(  
  select row_number() over w1 as rn, *   
  from tmp   
  window w1 as (partition by dict_id, tagid order by crt_time desc)   -- 每个ID,每个标签,取最后一条  
) t where rn=1  
group by tagid, action                                               
',   
v_suffix1, v_suffix2, v_suffix1, v_suffix2, v_limit)    
  
loop  
  select array(select unnest(v_dict_id)-v_offset) into v_fixed_dict_id;    
  -- raise notice '% ,% ,% ,%', v_tagid, v_action, v_dict_id, v_fixed_dict_id;  
  execute format('insert into tag_userbitmap_%s (tagid, usrebitmap) values (%s, set_bit_array(''0'', %s, %s, %L))   
                  on conflict (tagid)   
                  do update set usrebitmap=set_bit_array(tag_userbitmap_%s.usrebitmap, %s, %s, %L)',   
                  v_suffix2, v_tagid, v_action, 0, v_fixed_dict_id, v_suffix2, v_action, 0, v_fixed_dict_id);  
end loop;  
end;  
$$ language plpgsql strict;  

2、单表:

create or replace function merge_tags(v_limit int, v_suffix1 int, v_suffix2 int) returns void as $$  
declare  
  v_tagid int;  
  v_action int2;  
  v_dict_id int[];  
  v_offset int;  
  v_fixed_dict_id int[];  
begin  
  select substring(dict_id::text,'(\d+),') into v_offset from t_mapping where suffix=v_suffix2::text;   
  
  for v_tagid, v_action, v_dict_id in   
    execute format   
('  
with tmp as (delete from t_tag_log_%s_%s     -- 查询哪个表  
  where ctid = any ( array (  
    select ctid from t_tag_log_%s_%s order by crt_time limit %s       -- 批量处理N条  
  )) returning *  
)   
select tagid, action, array_agg(dict_id) as dict_id from    
(  
  select row_number() over w1 as rn, *   
  from tmp   
  window w1 as (partition by dict_id, tagid order by crt_time desc)   -- 每个ID,每个标签,取最后一条  
) t where rn=1  
group by tagid, action                                               
',   
v_suffix1, v_suffix2, v_suffix1, v_suffix2, v_limit)    
  
loop  
  select array(select unnest(v_dict_id)-v_offset) into v_fixed_dict_id;    
  -- raise notice '% ,% ,% ,%', v_tagid, v_action, v_dict_id, v_fixed_dict_id;  
  execute format('insert into tag_userbitmap (tagid, bitmap_offset, usrebitmap) values (%s, %s, set_bit_array(''0'', %s, %s, %L))   
                  on conflict (tagid, bitmap_offset)   
                  do update set usrebitmap=set_bit_array(tag_userbitmap.usrebitmap, %s, %s, %L)',   
                  v_suffix2, v_suffix1, v_action, 0, v_fixed_dict_id, v_action, 0, v_fixed_dict_id);  
end loop;  
end;  
$$ language plpgsql strict;  

3、单表和多表的函数接口是一样的,都是调用merge_tags函数,增量更新TAG,不同组合,允许并行调用。

以下QUERY可以并行调用:

select merge_tags(100000, 0, 0);  
select merge_tags(100000, 0, 1);  
select merge_tags(100000, 0, 2);  
select merge_tags(100000, 0, 3);  
.....  
select merge_tags(100000, 63, 0);  
select merge_tags(100000, 63, 1);  
select merge_tags(100000, 63, 2);  
select merge_tags(100000, 63, 3);  

12 查询包含哪些TAG的USEID

这个SQL可以略微修改,包括coalesce,补齐varbit长度到固定长度等。

full outer JOIN 得到最终的varbit。

根据bitmap求dict_id, 再根据dict_id求user_id。

多表:

select bit_posite(t1.usrebitmap||t2.usrebitmap||t3.usrebitmap||t4.usrebitmap, '1', true) 
from tag_userbitmap_0 t1 , ....
where tx.tagid in (....);

或单表:

create aggregate bit_agg (varbit) (sfunc = bitcat, stype=varbit) ;
select bit_posite(bit_and(tag), '1', true) from (
  select bit_agg(usrebitmap) from tag_userbitmap where tagid in (...) group by tagid
) t

详见用法

《阿里云RDS for PostgreSQL varbitx插件与实时画像应用场景介绍》

其他

1、维护字典表(删除僵尸用户)

2、同时收缩VARBITX

查询某个用户有哪些标签

这种点查,建议使用以下结构,数组作为标签。与本文相反。

create table t_user_tags(   
  uid int8 primary key,   -- 用户ID  
  tagid int[]             -- 标签ID  
);  

相似案例

《PostgreSQL手机行业经营分析、决策系统设计 - 实时圈选、透视、估算》

《阿里云RDS for PostgreSQL varbitx插件与实时画像应用场景介绍》

《基于 阿里云 RDS PostgreSQL 打造实时用户画像推荐系统(varbitx)》

《HTAP数据库 PostgreSQL 场景与性能测试之 32 - (OLTP) 高吞吐数据进出(堆存、行扫、无需索引) - 阅后即焚(JSON + 函数流式计算)》

《HTAP数据库 PostgreSQL 场景与性能测试之 31 - (OLTP) 高吞吐数据进出(堆存、行扫、无需索引) - 阅后即焚(读写大吞吐并测)》

《HTAP数据库 PostgreSQL 场景与性能测试之 27 - (OLTP) 物联网 - FEED日志, 流式处理 与 阅后即焚 (CTE)》

《在PostgreSQL中实现update | delete limit - CTID扫描实践 (高效阅后即焚)》

《PostgreSQL 异步消息实践 - Feed系统实时监测与响应(如 电商主动服务) - 分钟级到毫秒级的实现》

相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
27天前
|
SQL 关系型数据库 MySQL
阿里云MySQL数据库价格、购买、创建账号密码和连接数据库教程
阿里云数据库使用指南:购买MySQL、SQL Server等RDS实例,选择配置和地区,完成支付。创建数据库和账号,设置权限。通过DMS登录数据库,使用账号密码访问。同地域VPC内的ECS需将IP加入白名单以实现内网连接。参考链接提供详细步骤。
366 3
|
21天前
|
SQL 存储 API
阿里云实时计算Flink的产品化思考与实践【下】
本文整理自阿里云高级产品专家黄鹏程和阿里云技术专家陈婧敏在 FFA 2023 平台建设专场中的分享。
110279 10
阿里云实时计算Flink的产品化思考与实践【下】
|
21天前
|
存储 关系型数据库 数据库
超1/3中国500强企业都在用的「汇联易」,为什么选用阿里云RDS?
迎峰而上:汇联易依托阿里云RDS通用云盘,加速业务智能化升级
超1/3中国500强企业都在用的「汇联易」,为什么选用阿里云RDS?
|
27天前
|
弹性计算 关系型数据库 MySQL
阿里云MySQL云数据库优惠价格、购买和使用教程分享!
阿里云数据库使用流程包括购买和管理。首先,选购支持MySQL、SQL Server、PostgreSQL等的RDS实例,如选择2核2GB的MySQL,设定地域和可用区。购买后,等待实例创建。接着,创建数据库和账号,设置DB名称、字符集及账号权限。最后,通过DMS登录数据库,填写账号和密码。若ECS在同一地域和VPC内,可内网连接,记得将ECS IP加入白名单。
419 2
|
27天前
|
SQL 关系型数据库 MySQL
阿里云mysql数据库价格购买和使用教程
阿里云数据库使用指南:购买MySQL、SQL Server等RDS实例,通过选择配置、地域和可用区完成购买。创建数据库和账号,分配权限。使用DMS登录数据库,进行管理操作。确保ECS与RDS在同一地域的VPC内,配置白名单实现内网连接。详细步骤见官方文档。
627 1
|
28天前
|
弹性计算 网络协议 关系型数据库
网络技术基础阿里云实验——企业级云上网络构建实践
实验地址:<https://developer.aliyun.com/adc/scenario/65e54c7876324bbe9e1fb18665719179> 本文档指导在阿里云上构建跨地域的网络环境,涉及杭州和北京两个地域。任务包括创建VPC、交换机、ECS实例,配置VPC对等连接,以及设置安全组和网络ACL规则以实现特定服务间的互访。例如,允许北京的研发服务器ECS-DEV访问杭州的文件服务器ECS-FS的SSH服务,ECS-FS访问ECS-WEB01的SSH服务,ECS-WEB01访问ECS-DB01的MySQL服务,并确保ECS-WEB03对外提供HTTP服务。
|
30天前
|
关系型数据库 MySQL 数据库
使用阿里云的数据传输服务DTS(Data Transmission Service)进行MySQL 5.6到MySQL 8.0的迁移
【2月更文挑战第29天】使用阿里云的数据传输服务DTS(Data Transmission Service)进行MySQL 5.6到MySQL 8.0的迁移
216 2
|
1月前
|
SQL 关系型数据库 MySQL
购买阿里云RDS实例
购买阿里云RDS实例
165 2
|
1月前
|
存储 DataWorks 关系型数据库
购买和初始化阿里云RDS
购买和初始化阿里云RDS
26 3
|
10天前
|
关系型数据库 MySQL 数据库
mysql卸载、下载、安装(window版本)
mysql卸载、下载、安装(window版本)

相关产品

  • 云数据库 RDS MySQL 版