Greenplum merge insert 用法与性能 (insert on conflict)

本文涉及的产品
云原生数据库 PolarDB MySQL 版,Serverless 5000PCU 100GB
云原生数据库 PolarDB 分布式版,标准版 2核8GB
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介:

标签

PostgreSQL , Greenplum , merge insert , insert on conflict , 合并插入 , 有则更新 , 无则插入


背景

PostgreSQL insert on conflict语法非常强大,支持合并写入(当违反某唯一约束时,冲突则更新,不冲突则写入),同时支持流式计算。

流计算例子链接:

《PostgreSQL 流式统计 - insert on conflict 实现 流式 UV(distinct), min, max, avg, sum, count ...》

《HTAP数据库 PostgreSQL 场景与性能测试之 22 - (OLTP) merge insert|upsert|insert on conflict|合并写入》

《PostgreSQL upsert功能(insert on conflict do)的用法》

《PostgreSQL 10.0 preview 功能增强 - 支持分区表ON CONFLICT .. DO NOTHING》

PostgreSQL insert on conflict语法如下:

Command:     INSERT    
Description: create new rows in a table    
Syntax:    
[ WITH [ RECURSIVE ] with_query [, ...] ]    
INSERT INTO table_name [ AS alias ] [ ( column_name [, ...] ) ]    
    [ OVERRIDING { SYSTEM | USER} VALUE ]    
    { DEFAULT VALUES | VALUES ( { expression | DEFAULT } [, ...] ) [, ...] | query }    
    [ ON CONFLICT [ conflict_target ] conflict_action ]    
    [ RETURNING * | output_expression [ [ AS ] output_name ] [, ...] ]    
    
where conflict_target can be one of:    
    
    ( { index_column_name | ( index_expression ) } [ COLLATE collation ] [ opclass ] [, ...] ) [ WHERE index_predicate ]    
    ON CONSTRAINT constraint_name    
    
and conflict_action is one of:    
    
    DO NOTHING    
    DO UPDATE SET { column_name = { expression | DEFAULT } |    
                    ( column_name [, ...] ) = [ ROW ] ( { expression | DEFAULT } [, ...] ) |    
                    ( column_name [, ...] ) = ( sub-SELECT )    
                  } [, ...]    
              [ WHERE condition ]    

Greenplum的版本较低,还不支持insert on conflict的语法。

如果需要在Greenplum中实现类似的功能该如何操作?

DEMO

ID为PK,以它为合并列,举例。

1、目标表,也就是需要合并写入的目标:

create table t( id int primary key, c1 int , c2 int, c3 int, c4 int, c5 int, crt_time timestamp);    

2、中间表,也就是用户只管插入的表:

create table t_tmp(like t);    

写入一些中间记录。

insert into t_tmp values(1,1,2,3,null,null,now());    
insert into t_tmp values(1,1,2,4,null,null,now());    
insert into t_tmp values(1,1,2,3,null,7,now());    
insert into t_tmp values(1,1,null,3,5,6,now());    
postgres=# select * from t_tmp;    
 id | c1 | c2 | c3 | c4 | c5 |          crt_time              
----+----+----+----+----+----+----------------------------    
  1 |  1 |  2 |  3 |    |    | 2017-12-13 17:03:16.28482    
  1 |  1 |  2 |  4 |    |    | 2017-12-13 17:03:16.286302    
  1 |  1 |  2 |  3 |    |  7 | 2017-12-13 17:03:16.635121    
  1 |  1 |    |  3 |  5 |  6 | 2017-12-13 17:03:25.434191    
(4 rows)    

3、窗口合并,按唯一值约束,仅提取一条(可能存在窗口内合并的需求,例如按时间取最新,比如以最后一条为准,又或者以有值,且最新的为准)。

以有值切最新为准例子:

select distinct on (id)     
  id,    
  first_value(c1) over (partition by id order by (case when c1 is null then null else crt_time end) desc nulls last) as c1,    
  first_value(c2) over (partition by id order by (case when c2 is null then null else crt_time end) desc nulls last) as c2,    
  first_value(c3) over (partition by id order by (case when c3 is null then null else crt_time end) desc nulls last) as c3,    
  first_value(c4) over (partition by id order by (case when c4 is null then null else crt_time end) desc nulls last) as c4,    
  first_value(c5) over (partition by id order by (case when c5 is null then null else crt_time end) desc nulls last) as c5,    
  first_value(crt_time) over (partition by id order by crt_time desc) as crt_time    
  from t_tmp ;    
    
 id | c1 | c2 | c3 | c4 | c5 |          crt_time              
----+----+----+----+----+----+----------------------------    
  1 |  1 |  2 |  3 |  5 |  6 | 2017-12-13 17:03:25.434191    
(1 row)    

存储中间结果:

create table t_tmp1 (like t) ;    
    
insert into t_tmp1     
select distinct on (id)     
  id,    
  first_value(c1) over (partition by id order by (case when c1 is null then null else crt_time end) desc nulls last) as c1,    
  first_value(c2) over (partition by id order by (case when c2 is null then null else crt_time end) desc nulls last) as c2,    
  first_value(c3) over (partition by id order by (case when c3 is null then null else crt_time end) desc nulls last) as c3,    
  first_value(c4) over (partition by id order by (case when c4 is null then null else crt_time end) desc nulls last) as c4,    
  first_value(c5) over (partition by id order by (case when c5 is null then null else crt_time end) desc nulls last) as c5,    
  first_value(crt_time) over (partition by id order by crt_time desc) as crt_time    
  from t_tmp ;    

4、合并写入:

将窗口提取的结果,合并写入目标表。

4.1、INNER JOIN,覆盖旧记录,同时补齐旧的字段(以NULL为判断条件。如果新的记录没有值,则取旧记录的值。)提取。

create table t_tmp2 (like t);    
    
insert into t_tmp2    
select     
t_tmp.id,     
coalesce(t_tmp.c1, t.c1),     
coalesce(t_tmp.c2, t.c2),     
coalesce(t_tmp.c3, t.c3),     
coalesce(t_tmp.c4, t.c4),     
coalesce(t_tmp.c5, t.c5),     
coalesce(t_tmp.crt_time, t.crt_time)     
from    
t_tmp1 as t_tmp    
inner join     
t    
using (id);    

4.2、DELETE USING,删除全量表的符合条件的记录。

delete from t using t_tmp2 where t.id=t_tmp2.id;    

4.3、INSERT

insert into t    
select t_tmp1.* from t_tmp1 left join t_tmp2 using (id) where t_tmp2.* is null    
union all    
select * from t_tmp2;    

Greenplum merge insert 性能

硬件:使用一台64线程机器,单机启动48个segment。

1、全量数据20亿。

create table t(id int, c1 int , c2 int, c3 int, c4 int, c5 int, crt_time timestamp) with (APPENDONLY=true, ORIENTATION=column);    
    
insert into t select id, null,null,null,null,10000, now() from generate_series(1,2000000000) t(id);    

2、增量数据1000万条,涉及500万个ID。

create table t_tmp(like t);    
    
insert into t_tmp select random()*1000000, random()*100,null,null,null,null, clock_timestamp() from generate_series(1,2000000) t(id);    
insert into t_tmp select random()*2000000, null,random()*100,null,null,null, clock_timestamp() from generate_series(1,2000000) t(id);    
insert into t_tmp select random()*3000000, null,null,random()*100,null,null, clock_timestamp() from generate_series(1,2000000) t(id);    
insert into t_tmp select random()*4000000, null,null,null,random()*100,null, clock_timestamp() from generate_series(1,2000000) t(id);    
insert into t_tmp select random()*5000000, null,null,null,null,random()*100, clock_timestamp() from generate_series(1,2000000) t(id);    
    
总耗时4.5秒。    

3、合并。

增量数据,窗口合并去重。

create table t_tmp1 (like t) ;    
    
insert into t_tmp1     
select distinct on (id)     
  id,    
  first_value(c1) over (partition by id order by ((case when c1 is null then null else crt_time end) is null), (case when c1 is null then null else crt_time end) desc) as c1,    
  first_value(c2) over (partition by id order by ((case when c2 is null then null else crt_time end) is null), (case when c2 is null then null else crt_time end) desc) as c2,    
  first_value(c3) over (partition by id order by ((case when c3 is null then null else crt_time end) is null), (case when c3 is null then null else crt_time end) desc) as c3,    
  first_value(c4) over (partition by id order by ((case when c4 is null then null else crt_time end) is null), (case when c4 is null then null else crt_time end) desc) as c4,    
  first_value(c5) over (partition by id order by ((case when c5 is null then null else crt_time end) is null), (case when c5 is null then null else crt_time end) desc) as c5,    
  first_value(crt_time) over (partition by id order by crt_time desc) as crt_time    
  from t_tmp ;    
    
INSERT 0 3628283    
Time: 5208.968 ms    

使用增量数据,提取并合并旧数据。

create table t_tmp2 (like t);    
    
insert into t_tmp2    
select     
t_tmp.id,     
coalesce(t_tmp.c1, t.c1),     
coalesce(t_tmp.c2, t.c2),     
coalesce(t_tmp.c3, t.c3),     
coalesce(t_tmp.c4, t.c4),     
coalesce(t_tmp.c5, t.c5),     
coalesce(t_tmp.crt_time, t.crt_time)     
from    
t_tmp1 as t_tmp    
inner join     
t    
using (id);    
    
INSERT 0 3628282    
Time: 9504.092 ms    

删除旧数据。

delete from t using t_tmp2 where t.id=t_tmp2.id;    
    
DELETE 3628282    
Time: 15356.920 ms    

插入新增、以及合并的增量数据。

insert into t    
select t_tmp1.* from t_tmp1 left join t_tmp2 using (id) where t_tmp2.* is null    
union all    
select * from t_tmp2;    
    
INSERT 0 3628283    
Time: 778.014 ms    

数据校验

-- 中间结果    
    
postgres=# select * from t_tmp where id=9;    
 id | c1 | c2 | c3 | c4 | c5 |          crt_time              
----+----+----+----+----+----+----------------------------    
  9 | 99 |    |    |    |    | 2017-12-13 23:18:14.65243    
  9 |  7 |    |    |    |    | 2017-12-13 23:18:14.817107    
  9 |    |  9 |    |    |    | 2017-12-13 23:18:15.292311    
  9 |    | 56 |    |    |    | 2017-12-13 23:18:15.449415    
(4 rows)    
    
-- 中间结果    
    
postgres=# select * from t_tmp where id=446;    
 id  | c1 | c2 | c3 | c4 | c5 |          crt_time              
-----+----+----+----+----+----+----------------------------    
 446 | 43 |    |    |    |    | 2017-12-13 23:18:14.291335    
 446 | 16 |    |    |    |    | 2017-12-13 23:18:14.715026    
 446 | 45 |    |    |    |    | 2017-12-13 23:18:15.048879    
 446 |    | 34 |    |    |    | 2017-12-13 23:18:15.646904    
 446 |    |  7 |    |    |    | 2017-12-13 23:18:15.81838    
 446 |    |    | 12 |    |    | 2017-12-13 23:18:16.220083    
 446 |    |    | 22 |    |    | 2017-12-13 23:18:16.26496    
 446 |    |    |    | 97 |    | 2017-12-13 23:18:17.464355    
 446 |    |    |    |    | 56 | 2017-12-13 23:18:18.427068    
(9 rows)    
    
-- 使用窗口合并后结果    
    
postgres=# select * from t_tmp1 limit 10;    
 id  | c1 | c2 | c3 | c4 | c5 |          crt_time              
-----+----+----+----+----+----+----------------------------    
   9 |  7 | 56 |    |    |    | 2017-12-13 23:18:15.449415  -- 验证    
  25 | 69 |    |  1 |    |    | 2017-12-13 23:18:16.161339    
 169 | 74 | 33 |  3 |    |    | 2017-12-13 23:18:16.71554    
 185 | 22 |    |    |    |    | 2017-12-13 23:18:14.93206    
 217 | 11 | 20 | 26 |    | 59 | 2017-12-13 23:18:17.911174    
 270 | 55 |    | 42 |    |    | 2017-12-13 23:18:16.494782    
 286 | 65 | 77 | 17 |    | 75 | 2017-12-13 23:18:17.895121    
 430 | 12 |    | 56 |    |    | 2017-12-13 23:18:16.744847    
 446 | 45 |  7 | 22 | 97 | 56 | 2017-12-13 23:18:18.427068  -- 验证    
 478 | 23 | 56 |    | 25 | 77 | 2017-12-13 23:18:18.293153    
(10 rows)    
    
-- 合并到全量表后结果    
    
postgres=# select * from t where id=9;    
 id | c1 | c2 | c3 | c4 |  c5   |          crt_time              
----+----+----+----+----+-------+----------------------------    
  9 |  7 | 56 |    |    | 10000 | 2017-12-13 23:18:15.449415    
(1 row)    
    
postgres=# select * from t where id=446;    
 id  | c1 | c2 | c3 | c4 | c5 |          crt_time              
-----+----+----+----+----+----+----------------------------    
 446 | 45 |  7 | 22 | 97 | 56 | 2017-12-13 23:18:18.427068    
(1 row)    

4、合并总耗时:

35秒

5、耗时分布

增量数据1000万条,涉及500万个ID。

4.5秒

增量数据,窗口合并去重。

5.2秒

使用增量数据,提取并合并旧数据。

9.5秒

删除旧数据。

15秒

插入新增、以及合并的增量数据。

0.7秒

Greenplum merge insert 限制

比较复杂、而且不支持新值使用NULL值(要支持的话,得修改一下覆盖逻辑)。

参考

《Greenplum 排序nulls first|last的 SQL写法实现》

《PostgreSQL 流式统计 - insert on conflict 实现 流式 UV(distinct), min, max, avg, sum, count ...》

《HTAP数据库 PostgreSQL 场景与性能测试之 22 - (OLTP) merge insert|upsert|insert on conflict|合并写入》

《PostgreSQL upsert功能(insert on conflict do)的用法》

《PostgreSQL 10.0 preview 功能增强 - 支持分区表ON CONFLICT .. DO NOTHING》

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
相关文章
|
4月前
Hologres的INSERT ON CONFLICT语法暂时不支持多个values的直接插入
Hologres的INSERT ON CONFLICT语法暂时不支持多个values的直接插入
25 2
|
9月前
openGauss向量化Merge Join--inner join
openGauss向量化Merge Join--inner join
55 0
openGauss向量化Merge Join--inner join
|
SQL 关系型数据库 MySQL
MySQL学习笔记汇总(三)——子查询、limit、表(insert,update,delete)
子查询、limit、表(insert,update,delete)的相关学习。
MySQL学习笔记汇总(三)——子查询、limit、表(insert,update,delete)
【1089】Insert or Merge (25 分)
先模拟直接插入排序的每一趟的过程,并进行判断每一趟的序列和给出的第二个序列是否相同,若不相同则一定是归并排序,直接对第一个序列进行归并排序一趟并输出。 (1)归并排序可以使用非递归版本(更少);
74 0
|
SQL 数据库
|
消息中间件 SQL 关系型数据库
【DB吐槽大会】第69期 - PG 不支持update | delete limit语法
大家好,这里是DB吐槽大会,第69期 - PG 不支持update | delete limit语法
|
SQL 弹性计算 关系型数据库
PostgreSQL 大宽表,全列索引,高并发合并写入(insert into on conflict, upsert, merge insert) - 实时adhoc query
标签 PostgreSQL , 全列索引 , 大宽表 , 写测试 , insert on conflict , upsert , merge insert , adhoc query 背景 OLAP系统中,adhoc query非常场景(任意维度查询分析)。 adhoc query,通常来说,可以加GIN倒排,或者每一列都加一个索引来实现。 《PostgreSQL 设计优化case
8416 0
|
弹性计算 关系型数据库 测试技术
PostgreSQL 分区表如何支持多列唯一约束 - 枚举、hash哈希 分区, 多列唯一, insert into on conflict, update, upsert, merge insert
标签 PostgreSQL , 分区表 , native partition , 唯一 , 非分区键唯一 , 组合唯一 , insert into on conflict , upsert , merge insert 背景 PG 11开始支持HASH分区,10的分区如果要支持hash分区,可以通过枚举绕道实现。 《PostgreSQL 9.x, 10, 11 hash分区表 用法举例
2900 0