PostgreSQL dblink异步调用实践,跑并行多任务 - 例如开N个并行后台任务创建索引, 开N个后台任务跑若干SQL

本文涉及的产品
云原生数据库 PolarDB MySQL 版,Serverless 5000PCU 100GB
云原生数据库 PolarDB 分布式版,标准版 2核8GB
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: 标签PostgreSQL , 后台任务 , DBLINK 异步调用背景使用DBLINK异步接口,可以非常方便的实现跑后台任务,如果要让数据库执行若干条SQL,开N个并行执行,同样可以使用DBLINK封装成API进行调用。

标签

PostgreSQL , 后台任务 , DBLINK 异步调用


背景

使用DBLINK异步接口,可以非常方便的实现跑后台任务,如果要让数据库执行若干条SQL,开N个并行执行,同样可以使用DBLINK封装成API进行调用。

例如,结合我前面的一些文字,可以实现自动选择索引接口、指定并行度、指定表空间、给所有字段创建索引。

《自动选择正确索引访问接口(btree,hash,gin,gist,sp-gist,brin,bitmap...)的方法》

《PostgreSQL 快速给指定表每个字段创建索引》

《阿里云RDS PostgreSQL OSS 外部表实践 - (dblink异步调用封装并行) 从OSS并行导入数据》

《在PostgreSQL中跑后台长任务的方法 - 使用dblink异步接口》

并行后台任务接口实现

接口效果:

select run_sqls_parallel (  
  参数1:并行度,  
  参数2:要执行的SQLs(数组呈现)  
);  

实现

1、创建dblink插件

create extension if not exists dblink;    

2、创建一个建立连接函数,不报错

create or replace function conn(      
  name,   -- dblink名字      
  text    -- 连接串,URL      
) returns void as $$        
declare        
begin        
  perform dblink_connect($1, $2);       
  return;        
exception when others then        
  return;        
end;        
$$ language plpgsql strict;    

3、创建跑多任务的接口函数

create or replace function run_sqls_parallel(   
  parallels int,    -- 并行度  
  sqls text[],      -- 需要执行的SQLs  
  conn text    default format('hostaddr=%s port=%s user=%s dbname=%s application_name=', '127.0.0.1', current_setting('port'), current_user, current_database())       -- 连接串     
)  
returns setof record as $$  
declare  
  app_prefix_stat text := md5(random()::text);   -- 用来获取pg_stat_activity的实时内容 (由于pg_stat_activity的函数是stable的,无法在事务中获取到被其他会话变更的内容)  
  app_prefix text := md5(random()::text);        -- application, dblink name prefix   
  i int := 1;       -- 任务ID变量,1累加  
  app_conn_name text;  -- application_name, dblink conn name = app_prefix+i   
  sql text;       -- SQL 元素   
  current_conns int := 0;  -- 当前活跃的异步调用   
begin  
  -- 建立获取实时pg_stat_activity内容连接    
  perform conn(app_prefix_stat,  conn||app_prefix_stat);              
  
  foreach sql in array sqls  
  loop  
    -- 当前是否有空闲异步连接   
    select application_name into app_conn_name from   
      dblink(app_prefix_stat, format($_$ select application_name from pg_stat_activity where application_name ~ '^%s' and state='idle' limit 1 $_$, app_prefix))  
    as t(application_name text);   
    -- 有空闲异步连接  
    if found then  
      -- 消耗掉上一次异步连接的结果,否则会报错。  
      return query select a from dblink_get_result(app_conn_name, false) as t(a text);      
      return query select a from dblink_get_result(app_conn_name, false) as t(a text);    
  
      -- 发送异步DBLINK调用      
      perform dblink_send_query(app_conn_name, sql);   
      
    -- 无空闲异步连接  
    else  
      -- 当前已建立的异步连接数  
      select cn into current_conns from   
        dblink(app_prefix_stat, format($_$ select count(*) from pg_stat_activity where application_name ~ '^%s' $_$, app_prefix))  
      as t(cn int);      
      loop  
        -- 达到并行度  
        if current_conns >= parallels then   
          -- 是否有空闲异步连接  
          select application_name into app_conn_name from   
            dblink(app_prefix_stat, format($_$ select application_name from pg_stat_activity where application_name ~ '^%s' and state='idle' limit 1 $_$, app_prefix))  
          as t(application_name text);   
          -- 有  
          if found then  
            -- 消耗掉上一次异步连接的结果,否则会报错。  
            return query select a from dblink_get_result(app_conn_name, false) as t(a text);    
            return query select a from dblink_get_result(app_conn_name, false) as t(a text);    
  
            -- 发送异步DBLINK调用      
            perform dblink_send_query(app_conn_name, sql);   
              
            -- 退出循环  
            exit;  
          -- 没有,等  
          else  
            perform pg_sleep(1);  
            raise notice 'current running tasks: %, waiting idle conns.', current_conns;  
          end if;  
        -- 未达到并行度  
        else  
          -- 建立连接  
          perform conn(app_prefix||i,  conn||app_prefix||i);             -- 建立连接。  
  
          -- 发送异步DBLINK调用      
          perform dblink_send_query(app_prefix||i, sql);   
            
          -- 连接suffix序号 递增  
          i := i+1;  
            
          -- 退出循环  
          exit;  
        end if;  
      end loop;  
    end if;  
  end loop;  
  
  loop  
    -- 当前已建立的异步连接数  
    select cn into current_conns from   
      dblink(app_prefix_stat, format($_$ select count(*) from pg_stat_activity where application_name ~ '^%s' and state <> 'idle' $_$, app_prefix))  
    as t(cn int);      
    if current_conns=0 then   
      raise notice 'whole tasks done.';  
      for app_conn_name in 
        select application_name from   
          dblink(app_prefix_stat, format($_$ select application_name from pg_stat_activity where application_name ~ '^%s' $_$, app_prefix))  
        as t(application_name text)
      loop
        return query select a from dblink_get_result(app_conn_name, false) as t(a text);   
      end loop;
      return;  
    else  
      raise notice 'the last % tasks running.', current_conns;  
      perform pg_sleep(1);  
    end if;  
  end loop;  
  
end;  
$$ language plpgsql strict;  

试用

1、运行5条SQL,开2个并行任务

select * from run_sqls_parallel(  
 2, -- 并行度  
 -- 执行如下SQL数组  
 array['select pg_sleep(10)', 'select pg_sleep(10)', 'select pg_sleep(10)', 'select count(*) from pg_class where relname ~ ''t''', 'select pg_sleep(10)', 'select pg_sleep(10)']  
)
as t(a text);   
  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  the last 2 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  whole tasks done.  
 run_sqls_parallel   
-------------------  
   
(1 row)  
  
Time: 30070.275 ms (00:30.070)  

2、运行10个并行任务,跑6条SQL

postgres=# select * from run_sqls_parallel(  
 10, -- 并行度  
 -- 执行如下SQL数组  
 array['select pg_sleep(10)', 'select pg_sleep(10)', 'select pg_sleep(10)', 'select count(*) from pg_class where relname ~ ''t''', 'select pg_sleep(10)', 'select pg_sleep(10)']  
)
as t(a text);   

NOTICE:  the last 6 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  whole tasks done.  
 run_sqls_parallel   
-------------------  
   
(1 row)  
  
Time: 10050.064 ms (00:10.050)  

完全符合预期。

3、结合前面写的文档,我们如果要创建很多索引,可以使用同样的方法实现并行任务

create table t1(id int, c1 int,c2 int, c3 int, c4 int, c5 int,c6 int, c7 int, c8 int);  
create table t2(id int, c1 int,c2 int, c3 int, c4 int, c5 int,c6 int, c7 int, c8 int);  
create table t3(id int, c1 int,c2 int, c3 int, c4 int, c5 int,c6 int, c7 int, c8 int);  
do language plpgsql $$    
declare    
  tables name[] := array['t1','t2','t3'];     -- 表名  
  n name;   -- 表名  
  x name;   -- 字段名  
  i int;    -- LOOP值  
  sql text;        
  sqls text[];      
  tbs name := 'tbs1';    -- 索引表空间  
begin    
  set maintenance_work_mem='2GB';    
    
  foreach n in array tables loop    
    i := 1;      
    for x in select attname from pg_attribute where attrelid=n::regclass and attnum>=1 and not attisdropped   
    loop    
      -- 结合自动选择索引接口(btree,hash,gin,gist等)的功能,可以实现更完美的全字段创建索引  
      sql := format('create index IF NOT EXISTS idx_%s__%s on %s (%s) tablespace %s', n, i, n, x, tbs);      -- 封装创建索引的SQL    
      sqls := array_append(sqls, sql);   
      i:=i+1;    
    end loop;    
  end loop;    
  
  perform * from run_sqls_parallel(  
    10,   -- 并行度  
    sqls  -- 执行index SQL数组  
  ) as t(a text);   
  
  foreach n in array tables loop    
    execute format('analyze %s', n);     
  end loop;    
end;    
$$;    

完全符合预期。

小结

本文使用dblink异步调用的功能,增加了一个API函数,可以用于开启N个并行,跑若干条长SQL,例如用来创建索引非常给力。

接口效果:

select * from run_sqls_parallel (  
  参数1:并行度,   
  参数2:要后台并行执行的SQLs(数组呈现)     
)
as t(a text);

参考

《自动选择正确索引访问接口(btree,hash,gin,gist,sp-gist,brin,bitmap...)的方法》

《PostgreSQL 快速给指定表每个字段创建索引》

《阿里云RDS PostgreSQL OSS 外部表实践 - (dblink异步调用封装并行) 从OSS并行导入数据》

《在PostgreSQL中跑后台长任务的方法 - 使用dblink异步接口》

《PostgreSQL AB表切换最佳实践 - 提高切换成功率,杜绝雪崩 - 珍藏级》

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
相关文章
|
4月前
|
存储 SQL Cloud Native
深入了解云原生数据库CockroachDB的概念与实践
作为一种全球领先的分布式SQL数据库,CockroachDB以其高可用性、强一致性和灵活性等特点备受关注。本文将深入探讨CockroachDB的概念、设计思想以及实践应用,并结合实例演示其在云原生环境下的优越表现。
|
4月前
|
Cloud Native 关系型数据库 大数据
CockroachDB:云原生数据库的新概念与实践
本文将介绍CockroachDB,一种先进的云原生数据库,它具备分布式、强一致性和高可用性等特点。我们将探讨CockroachDB的基本原理、架构设计以及在实际应用中的种种优势和挑战。
|
5月前
|
关系型数据库 MySQL 分布式数据库
PolarDB MySQL版并行查询技术探索与实践
PolarDB MySQL版并行查询技术探索与实践 PolarDB MySQL版在企业级查询加速特性上进行了深度技术探索,其中并行查询作为其重要组成部分,已经在线稳定运行多年,持续演进。本文将详细介绍并行查询的背景、挑战、方案、特性以及实践。
109 2
|
9月前
|
SQL 存储 druid
PolarDB-X 针对跑批场景的思考和实践
金融行业和运营商系统,业务除了在线联机查询外,同时有离线跑批处理,跑批场景比较注重吞吐量,同时基于数据库场景有一定的使用惯性,比如直连MySQL分库分表的存储节点做本地化跑批、以及基于Oracle/DB2等数据库做ETL的数据清洗跑批等。
|
5月前
|
关系型数据库 MySQL 分布式数据库
PolarDB auto_inc场景下的性能优化实践
PolarDB auto_inc场景下的性能优化实践 在数据库的使用场景中,并发插入数据或并发导入数据场景是最常见的。针对这一场景,PolarDB MySQL版进行了深度性能优化,以提高插入性能。本文将详细介绍PolarDB在auto_inc场景下的性能优化相关内容。
63 2
|
5月前
|
存储 关系型数据库 MySQL
存储成本最高降至原来的5%,PolarDB分布式冷数据归档的业务实践
国内某家兼具投资理财、文化旅游、票务为一体的大型综合型集团公司,2015年成立至今,由于业务高速发展,业务数据增长非常快,数据库系统屡次不堪重负。该公司数据库运维总监介绍,他们目前业务压力比较大的是票务和订单系统,他们的平台每天新增几千万的订单数据,订单的数据来自于各个终端,近几年每个月以300G的数据规模在高速增长,由于数据不断增加,数据库系统迄今为止迭代过了3次。
|
7月前
|
SQL 缓存 关系型数据库
PolarDB-X 混沌测试实践:如何衡量数据库索引选择能力
随着PolarDB分布式版的不断演进,功能不断完善,新的特性不断增多,整体架构扩大的同时带来了测试链路长,出现问题前难发现,出现问题后难排查等等问题。原有的测试框架已经难以支撑实际场景的复杂模拟测试。因此,我们实现了一个基于业务场景面向优化器索引选择的混沌查询实验室,本文之后简称为CEST(complex environment simulation test)。
|
8月前
|
存储 SQL 关系型数据库
AnalyticDB PostgreSQL构建一站式实时数仓实践
本文介绍通过 AnalyticDB PostgreSQL 版基于实时物化视图,构建流批一体的一站式实时数仓解决方案,实现一套系统、一份数据、一次写入,即可在数仓内完成实时数据源头导入到实时分析全流程。
1889 5
AnalyticDB PostgreSQL构建一站式实时数仓实践
|
9月前
|
分布式数据库 调度 数据库
直播预告 | PolarDB-X 备份恢复原理与实践
备份恢复是生产级数据库必不可少的功能,而PolarDB-X 作为一款分布式数据库,备份数据的全局一致也是最基本的要求。本期分享将介绍PolarDB-X 开源版备份恢复功能的背景与原理,以及如何使用 PolarDB-X Operator 实现备份调度。
直播预告 | PolarDB-X 备份恢复原理与实践
|
9月前
|
关系型数据库 测试技术 分布式数据库
PolarDB | PostgreSQL 高并发队列处理业务的数据库性能优化实践
在电商业务中可能涉及这样的场景, 由于有上下游关系的存在, 1、用户下单后, 上下游厂商会在自己系统中生成一笔订单记录并反馈给对方, 2、在收到反馈订单后, 本地会先缓存反馈的订单记录队列, 3、然后后台再从缓存取出订单并进行处理. 如果是高并发的处理, 因为大家都按一个顺序获取, 容易产生热点, 可能遇到取出队列遇到锁冲突瓶颈、IO扫描浪费、CPU计算浪费的瓶颈. 以及在清除已处理订单后, 索引版本未及时清理导致的回表版本判断带来的IO浪费和CPU运算浪费瓶颈等. 本文将给出“队列处理业务的数据库性能优化”优化方法和demo演示. 性能提升10到20倍.
606 4

相关产品

  • 云原生数据库 PolarDB