用PostgreSQL支持含有更新,删除,插入的实时流式计算

本文涉及的产品
云原生多模数据库 Lindorm,多引擎 多规格 0-4节点
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
云数据库 MongoDB,通用型 2核4GB
简介: 大多数的流式计算产品只支持APPEND ONLY的应用场景,也就是只有插入,没有更新和删除操作。如果要实现更新和删除的实时流式计算,在PostgreSQL中可以这样来实现。在此前你可以阅读我以前写的文章来了解PG是如何处理一天一万亿的实时流式计算的:https://yq.aliyun.com/ar.

大多数的流式计算产品只支持APPEND ONLY的应用场景,也就是只有插入,没有更新和删除操作。
如果要实现更新和删除的实时流式计算,在PostgreSQL中可以这样来实现。
在此前你可以阅读我以前写的文章来了解PG是如何处理一天一万亿的实时流式计算的:
https://yq.aliyun.com/articles/166

要支持更新和删除,思路是这样的,加一张前置表,这个前置表的某个字段用来记录字段的最终状态,即到达这个状态后,记录不会被更新或删除。
通过触发器来控制什么记录插入到流中同时从前置表删除,什么记录现暂存在前置表。
下面是例子
本文假设flag=2是最终状态,应用层自己来定义这个FLAG。

pipeline=# create table pret1(id serial primary key, info text, flag smallint);
CREATE TABLE

pipeline=# create stream s0 (like pret1);
CREATE STREAM

pipeline=# create continuous view v0 as select count(*) from s0;
CREATE CONTINUOUS VIEW

flag=2的记录旁路到流,其他记录放到前置表。

pipeline=# create or replace function tg0() returns trigger as 
$$

 declare
 begin
   if new.flag=2 then
     insert into s0 values (new.*);
     return null;
   end if;
     return new;
 end;
 
$$
 language plpgsql strict;
CREATE FUNCTION

pipeline=# create trigger tg0 before insert on pret1 for each row execute procedure tg0();
CREATE TRIGGER

更新后flag=2的记录旁路到流,并删除前置表的对应记录。

pipeline=# create or replace function tg1() returns trigger as 
$$

 declare
 begin
   if new.flag=2 then
     insert into s0 values (new.*); 
     delete from pret1 where id=new.id; 
     return null;
   end if;
     return new;
 end;
 
$$
 language plpgsql strict;
CREATE FUNCTION

pipeline=# create trigger tg1 before update on pret1 for each row execute procedure tg1();
CREATE TRIGGER

测试

pipeline=# insert into pret1(info,flag) values ('test',0);
INSERT 0 1
pipeline=# select * from v0;
 count 
-------
(0 rows)

pipeline=# insert into pret1(info,flag) values ('test',1);
INSERT 0 1
pipeline=# select * from v0;
 count 
-------
(0 rows)

pipeline=# select * from pret1;
 id | info | flag 
----+------+------
  1 | test |    0
  2 | test |    1
(2 rows)

pipeline=# update pret1 set flag=2;
UPDATE 0
pipeline=# select * from pret1;
 id | info | flag 
----+------+------
(0 rows)

pipeline=# select * from v0;
 count 
-------
     2
(1 row)

pipeline=# insert into pret1(info,flag) values ('test',1);
INSERT 0 1
pipeline=# delete from pret1 ;
DELETE 1
pipeline=# select * from v0;
 count 
-------
     2
(1 row)

pipeline=# insert into pret1(info,flag) values ('test',1);
INSERT 0 1
pipeline=# select * from v0;
 count 
-------
     2
(1 row)

pipeline=# update pret1 set flag =10;
UPDATE 1
pipeline=# select * from v0;
 count 
-------
     2
(1 row)

pipeline=# select * from pret1;
 id | info | flag 
----+------+------
  4 | test |   10
(1 row)

pipeline=# update pret1 set flag =2;
UPDATE 0
pipeline=# select * from pret1;
 id | info | flag 
----+------+------
(0 rows)

pipeline=# select * from v0;
 count 
-------
     3
(1 row)

详情请参考
http://docs.pipelinedb.com/introduction.html

如果你觉得这还不够爽,PostgreSQL还有kafka插件,可以类lambda的模式从kafka持续读数据,进行流式计算。
PostgreSQL就是个"老流氓",因为任何软件可能都和这只大象有一腿。

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
相关文章
|
SQL 关系型数据库 数据库
PostgreSQL 删除数据库
PostgreSQL 删除数据库
257 0
|
搜索推荐 关系型数据库 数据库
《阿里云RDS PostgreSQL实践课 2 实时用户画像数据库实践》电子版地址
阿里云RDS PostgreSQL实践课 2 实时用户画像数据库实践
112 0
《阿里云RDS PostgreSQL实践课 2 实时用户画像数据库实践》电子版地址
|
关系型数据库 数据库 PostgreSQL
PostgreSQL 删除数据库
PostgreSQL 删除数据库
137 0
|
存储 SQL Oracle
10 PostgreSQL 表级复制-物化视图篇, 支持异地,异构如 Oracle 到 pg 的物化视图|学习笔记
快速学习10 PostgreSQL 表级复制-物化视图篇,支持异地,异构如 Oracle 到 pg 的物化视图
426 0
10 PostgreSQL 表级复制-物化视图篇, 支持异地,异构如 Oracle 到 pg 的物化视图|学习笔记
|
存储 SQL Oracle
AnalyticDB PostgreSQL 7.0 支持存储过程(CREATE PROCEDURE)特性
AnalyticDB PostgreSQL 7.0 新增了存储过程功能的支持,让用户在使用ADB PG时能够更方便高效地开发业务,并能够更好地兼容Oracle等传统数仓的业务。
425 1
AnalyticDB PostgreSQL 7.0 支持存储过程(CREATE PROCEDURE)特性
|
SQL 存储 关系型数据库
RDS For SQL Server删除数据库报错
RDS For SQL Server删除数据库报错
|
SQL 并行计算 关系型数据库
Citus 分布式 PostgreSQL 集群 - SQL Reference(SQL支持和变通方案)
Citus 分布式 PostgreSQL 集群 - SQL Reference(SQL支持和变通方案)
147 0
|
存储 消息中间件 监控
分布式 PostgreSQL 集群(Citus)官方示例 - 实时仪表盘
分布式 PostgreSQL 集群(Citus)官方示例 - 实时仪表盘
192 0
分布式 PostgreSQL 集群(Citus)官方示例 - 实时仪表盘
|
存储 资源调度 Kubernetes
云原生 PostgreSQL - CrunchyData PGO 教程:创建、连接、删除 Postgres 集群
云原生 PostgreSQL - CrunchyData PGO 教程:创建、连接、删除 Postgres 集群
277 0
|
SQL 弹性计算 关系型数据库
PostgreSQL 12 preview - CTE 增强,支持用户语法层控制 materialized 优化
标签 PostgreSQL , CTE , materialized , not materialized , push down 背景 PostgreSQL with 语法,能跑非常复杂的SQL逻辑,包括递归,多语句物化计算等。 在12以前的版本中,WITH中的每一个CTE(common table express),都是直接进行物化的,也就是说外层的条件不会推到CTE(物化节点)里
837 0