动态输出(ToB海量日志转换业务) - 阿里云HybridDB for PostgreSQL最佳实践

本文涉及的产品
云原生数据库 PolarDB MySQL 版,Serverless 5000PCU 100GB
云原生数据库 PolarDB 分布式版,标准版 2核8GB
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: 标签 PostgreSQL , UDF , 动态格式 , format , JOIN , OSS外部表 背景 有一些业务需要将数据归类动态的输出,比如一些公共日志服务,所有用户的日志都被统一的按格式记录到一起,但是每个最终用户关心的字段都不一样,甚至每个用户对数据转换的需求都不一样。

标签

PostgreSQL , UDF , 动态格式 , format , JOIN , OSS外部表


背景

有一些业务需要将数据归类动态的输出,比如一些公共日志服务,所有用户的日志都被统一的按格式记录到一起,但是每个最终用户关心的字段都不一样,甚至每个用户对数据转换的需求都不一样。

pic

比如这个业务:

《日增量万亿+级 实时分析、数据规整 - 阿里云HybridDB for PostgreSQL最佳实践》

一、需求

1、可以根据ToB的用户的定义,输出不同的格式。

2、每个ToB的用户,写入到一个文件或多个文件。

3、一个文件不能出现两个用户的内容。

其他需求见:

《日增量万亿+级 实时分析、数据规整 - 阿里云HybridDB for PostgreSQL最佳实践》

二、架构设计

1、采用OSS存储实时载入的海量公共日志。

2、采用HybridDB for PostgreSQLRDS PostgreSQL的OSS外部表接口,直接并行读取OSS的文件。

3、为了防止ToB ID的数据倾斜问题,引入一个时间字段进行截断,采用 "ToB ID+截断时间" 两个字段进行重分布。

为了实现自动生成截断格式,需要起一个任务,自动计算截断格式。并将格式写入一张表。

4、通过HDB PG的窗口函数,按 "ToB ID+截断时间" 强制重分布。

5、通过UDF,将公共日志的格式,按ToB ID对应的UDF转换为对应的格式。

为了实现动态的格式需求,采用UDF,并将ToB ID对应的UDF写入一张表。

6、将转换后的数据,写入OSS。自动按ToB ID切换,绝对保证每个ToB的用户,写入到一个文件或多个文件。一个文件不出现两个用户的内容。

以上功能是阿里云HybridDB for PostgreSQL或RDS PostgreSQL的独有功能。

三、DEMO与性能

这里介绍如何动态的转换数据,其他内容请详见案例:

《日增量万亿+级 实时分析、数据规整 - 阿里云HybridDB for PostgreSQL最佳实践》

1、创建公共日志OSS外部表 - 数据来源

根据domain字段的值,动态格式输出,每个domain写一个或多个文件。不同的domain绝对不能出现在同一个文件中。

create external table demo_source (    
UnixTime text,  -- 这个字段为unixtime,需要使用to_timestamp进行格式输出    
domain  text,    
c1 text,    
c2 text,    
c3 text,    
c4 text,    
c5 text,    
c6 int,    
c7 text,    
c8 text,    
c9 text    
)     
location('oss://xxxxx     
        dir=xxxx/xxx/xxx/xxx id=xxx    
        key=xxx bucket=xxx') FORMAT 'csv' (QUOTE '''' ESCAPE ''''   DELIMITER ',')    
LOG ERRORS SEGMENT REJECT LIMIT 10000    
;    

2、写入一批测试数据,根据对应的CSV格式,将文件写入OSS对应的dir/bucket中。

3、创建公共日志OSS外部表 - 动态格式输出

create WRITABLE external table demo_output (    
rn int8,   -- 输出到OSS时,忽略该字段  
domain_and_key  text,   -- 输出到OSS时,忽略该字段  
Data  text    
)     
location('oss://xxxxx    
        dir=xxxx/xxx/xxx/xxx id=xxx id=xxx    
        key=xxx bucket=xxx distributed_column=domain_and_key') FORMAT 'csv'     
DISTRIBUTED BY (domain_and_key)    
;    

将按domain_and_key字段值的不同,自动将Data字段的数据写入对应的OSS文件。实现数据的重规整。(domain_and_key字段不写入OSS文件。)

这是阿里云HybridDB for PG的定制功能。

4、创建domain的时间格式化表。

create table domain_tsfmt(    
  domain text,    
  tsfmt text    
);    

根据domain的count(*),即数据量的多少,决定使用的时间截断格式,(yyyy, yyyymm, yyyymmdd, yyyymmddhh24, yyyymmddhh24miss)

越大的domain,需要越长(yyyymmhh24miss)的截断。

业务方可以来维护这个表,例如一天生成一次。

对于很小的domain,不需要写入这张表,可以采用统一格式,例如0000。

insert into domain_tsfmt values ('domain_a', 'yyyymmhh24');    
insert into domain_tsfmt values ('domain_b', 'yyyymmhh24mi');    

5、创建UDF元信息表,存储每个ToB ID对应的UDF名字

create table domain_udf(domain text, udf name);      

6、创建UDF,需要定制格式的ToB ID,创建对应的UDF

PostgreSQL 例子

建议用format。

create or replace function f1(demo_source) returns text as $$      
  select format('domain: %L , c2: %L , c4: %L', $1.domain, $1.c2, $1.c4);      
$$ language sql strict;      
      
create or replace function f2(demo_source) returns text as $$      
declare      
  res text := format('%L , %L, %L', upper($1.c2), $1.c4, $1.c3);      
begin      
  return res;      
end;      
$$ language plpgsql strict;      

HybridDB for PostgreSQL 例子

create or replace function f1(demo_source) returns text as $$      
  select 'domain: '||$1.domain||' , c2: '||$1.c2||' , c4: '||$1.c4;      
$$ language sql strict;      
      
create or replace function f2(demo_source) returns text as $$      
  select upper($1.c2)||' , '||$1.c4||' , '||$1.c3;      
$$ language sql strict;     
    
create or replace function f3(demo_source) returns text as $$      
  select upper($1.c2)||' , '||$1.c4||' , '||$1.c9;      
$$ language sql strict;     

7、创建动态UDF,根据输入,动态调用对应的UDF

create or replace function ff(domain_source, name) returns text as $$      
declare      
  sql text := 'select '||quote_ident($2)||'('||quote_literal($1)||')';      
  res text;      
begin      
  execute sql into res;      
  return res;      
end;      
$$ language plpgsql strict;      
    
-- 由于hdb版本太老,不支持format,不支持record和text互转,不支持quote_literal(record)。    
-- 调整如下    
    
create or replace function ff(domain_source, name) returns text as $$      
declare      
  sql text := 'select '||quote_ident($2)||'($abc_abc_abc$'||textin(record_out($1))||'$abc_abc_abc$)';      
  res text;      
begin      
  execute sql into res;      
  return res;      
end;      
$$ language plpgsql strict;      

8、写入UDF映射,例如1-100的domain,使用F1进行转换,0的ID使用F2进行转换。

insert into domain_udf select 'domain_'||generate_series(1,100), 'f1';      
insert into domain_udf values ('domain_0', 'f2');      

不在这里的DOMAIN,采用默认UDF转换格式,例如f3。

9、根据 "domain + 时间截断" 重分发,根据UDF动态转换查询如下:

select domain_and_key, data from   
(  
select row_number() over (partition by domain||key order by domain,key) as RN, domain||key as domain_and_key, data from    
-- partition by的窗口与第二个字段domain||key(目标表的分布键)保持一致,确保只需要重分发一次  
(    
  select     
    t1.domain,   
    (case when t2.* is null then '0000' else to_char(to_timestamp(t1.UnixTime::text::float8), t2.tsfmt) end) as key,     
    ff(t1, (case when t3.* is null then 'f3' else t3.udf end)) as data  
  from domain_source t1     
    left join domain_tsfmt t2 using (domain)     
    left join domain_udf t3 using (domain)    
) t    
) t  
;    

阿里云HDB FOR PG将自动根据OSS目标外部表中定义的distributed_column参数,当VALUE发生变化时,自动写新文件,从而实现不同的DOMAIN VALUE,写到不同的文件中。

10、将规整后的数据输出到OSS

insert into domain_output    
select domain_and_key, data from   
(  
select row_number() over (partition by domain||key order by domain,key) as RN, domain||key as domain_and_key, data from    
-- partition by的窗口与第二个字段domain||key(目标表的分布键)保持一致,确保只需要重分发一次  
(    
  select     
    t1.domain,   
    (case when t2.* is null then '0000' else to_char(to_timestamp(t1.UnixTime::text::float8), t2.tsfmt) end) as key,     
    ff(t1, (case when t3.* is null then 'f3' else t3.udf end)) as data  
  from domain_source t1     
    left join domain_tsfmt t2 using (domain)     
    left join domain_udf t3 using (domain)    
) t    
) t  
;    
  
执行计划  
只需要重分布一次  
  
                                                               QUERY PLAN                                                                  
-----------------------------------------------------------------------------------------------------------------------------------------  
 Insert (slice0; segments: 3)  (rows=333334 width=64)  
   ->  Subquery Scan t  (cost=375072.68..395072.68 rows=333334 width=64)  
         ->  Window  (cost=375072.68..385072.68 rows=333334 width=96)  
               Partition By: domain_and_key  
               Order By: domain_and_key      
	       -- 有按目标列排序,所以OSS切换文件没有问题  
	       -- 注意同一个domain的不同时间的数据,可能会切成多个文件  
	       -- 如果想让同一个domain的数据,切到一个文件中,那么请使用domain, key作为窗口分组,同时OSS外表的实现上需要修改一下,忽略两个列。  
               ->  Sort  (cost=375072.68..377572.68 rows=333334 width=96)  
                     Sort Key: domain_and_key  
                     ->  Redistribute Motion 3:3  (slice3; segments: 3)  (cost=12.84..86770.34 rows=333334 width=96)  
		     -- 重分布一次。采用窗口函数,强制了重分布  
                           Hash Key: domain_and_key  
                           ->  Subquery Scan t  (cost=12.84..66770.34 rows=333334 width=96)  
                                 ->  Hash Left Join  (cost=12.84..54270.34 rows=333334 width=237)  
                                       Hash Cond: t1.domain = t2.domain  
                                       ->  Hash Left Join  (cost=11.75..24261.75 rows=333334 width=192)  
                                             Hash Cond: t1.domain = t3.domain  
                                             ->  External Scan on domain_source t1  (cost=0.00..11000.00 rows=333334 width=96)  
                                             ->  Hash  (cost=8.00..8.00 rows=100 width=105)  
                                                   ->  Broadcast Motion 3:3  (slice1; segments: 3)  (cost=0.00..8.00 rows=100 width=105)  
                                                         ->  Seq Scan on domain_udf t3  (cost=0.00..4.00 rows=34 width=105)  
                                       ->  Hash  (cost=1.05..1.05 rows=1 width=61)  
                                             ->  Broadcast Motion 3:3  (slice2; segments: 3)  (cost=0.00..1.05 rows=1 width=61)  
                                                   ->  Seq Scan on domain_tsfmt t2  (cost=0.00..1.01 rows=1 width=61)  
(21 rows)  

简化DEMO (不依赖OSS,仅仅演示)

1、创建公共日志表

create table t1 (tid int, c1 text, c2 text, c3 int, c4 timestamp, c5 numeric);      

2、写入一批测试数据

insert into t1 select random()*100, md5(random()::text), 'test', random()*10000, clock_timestamp(), random() from generate_series(1,1000000);      

3、创建目标表

create table t1_output (rn int8, fmt text, data text);    

4、创建UDF元信息表,存储每个ToB ID对应的UDF名字

create table t2(tid int, udf name);      

5、创建UDF,需要定制格式的ToB ID,创建对应的UDF

create or replace function f1(t1) returns text as $$      
  select 'tid: '||$1.tid||' , c2: '||$1.c2||' , c4: '||$1.c4;      
$$ language sql strict;      
      
create or replace function f2(t1) returns text as $$       
  select $1.tid||' , '||upper($1.c2)||' , '||$1.c4||' , '||$1.c3;      
$$ language sql strict;      
    
create or replace function f3(t1) returns text as $$       
  select $1.tid||' , '||upper($1.c2)||' , '||$1.c3||' , '||$1.c3;      
$$ language sql strict;      

默认采用f3()函数。

6、创建动态UDF,根据输入,动态调用对应的UDF

create or replace function ff(t1, name) returns text as $$      
declare      
  sql text := 'select '||quote_ident($2)||'($abc_abc_abc$'||textin(record_out($1))||'$abc_abc_abc$)';      
  res text;      
begin      
  execute sql into res;      
  return res;      
end;      
$$ language plpgsql strict;      

7、写入UDF映射,例如1-10的ID,使用F1进行转换,0的ID使用F2进行转换。

insert into t2 select generate_series(1,10), 'f1';      
insert into t2 values (0, 'f2');      

8、创建格式表

create table t1_fmt (tid int, fmt text);    
    
insert into t1_fmt values (1, 'yyyymm');    
insert into t1_fmt values (2, 'yyyy');    

默认采样'0000'的格式。

9、动态转换查询如下:

select tid_key, data from  
(  
select row_number() over (partition by tid||key order by key) as RN, tid||key as tid_key, data from    
(    
  select     
    t1.tid,     
    (case when t2.* is null then '0000' else to_char(t1.c4, t2.fmt) end) as key,   
    ff(t1, (case when t3.* is null then 'f3' else t3.udf end)) as data  
  from t1     
    left join t1_fmt t2 using (tid)     
    left join t2 t3 using (tid)    
) t    
) t  
;    

10、将规整后的数据输出到目标表

insert into t1_output    
select tid_key, data from  
(  
select row_number() over (partition by tid||key order by key) as RN, tid||key as tid_key, data from    
(    
  select     
    t1.tid,     
    (case when t2.* is null then '0000' else to_char(t1.c4, t2.fmt) end) as key,   
    ff(t1, (case when t3.* is null then 'f3' else t3.udf end)) as data  
  from t1     
    left join t1_fmt t2 using (tid)     
    left join t2 t3 using (tid)    
) t    
) t    
;    
    
INSERT 0 1000000    
  
  
postgres=# select * from t1_output  limit 100;    
   fmt   |                        data                           
---------+-----------------------------------------------------  
 870000  | 87 , TEST , 7108 , 7108  
 870000  | 87 , TEST , 787 , 787  
 870000  | 87 , TEST , 6748 , 6748  
 870000  | 87 , TEST , 6385 , 6385  
 870000  | 87 , TEST , 5278 , 5278  
 870000  | 87 , TEST , 8132 , 8132  
 870000  | 87 , TEST , 7513 , 7513  
 870000  | 87 , TEST , 2025 , 2025  
 870000  | 87 , TEST , 7322 , 7322  
 870000  | 87 , TEST , 2019 , 2019  
 870000  | 87 , TEST , 6416 , 6416  
 870000  | 87 , TEST , 9959 , 9959  
 870000  | 87 , TEST , 7876 , 7876  
 870000  | 87 , TEST , 5022 , 5022  
.....  
  
写OSS外部表时,根据fmt列的VALUE进行识别,当遇到不同的VALUE就切换文件名,不同FMT VALUE的数据写入不同的文件。  

四、技术点

这里只谈本文涉及的技术点。

1、UDF

PostgreSQL支持多种UDF语言(例如C,plpgsql, sql, plpython, pljava, plv8, ......),用户通过UDF定义需要转换的格式。

2、动态调用

用户通过动态调用,可以动态的调用对应的UDF,在一个请求中生成不同的格式。

五、云端产品

阿里云 RDS PostgreSQL

阿里云 HybridDB for PostgreSQL

阿里云 OSS

六、类似场景、案例

《日增量万亿+级 实时分析、数据规整 - 阿里云HybridDB for PostgreSQL最佳实践》

七、小结

一些公共日志服务,所有用户的日志都被统一的按格式记录到一起,但是每个最终用户关心的字段都不一样,甚至每个用户对数据转换的需求都不一样。

PostgreSQL支持多种UDF语言(例如C,plpgsql, sql, plpython, pljava, plv8, ......),用户通过UDF定义需要转换的格式。

用户通过动态调用,可以动态的调用对应的UDF,在一个请求中生成不同的格式。

八、参考

《日增量万亿+级 实时分析、数据规整 - 阿里云HybridDB for PostgreSQL最佳实践》

相关实践学习
借助OSS搭建在线教育视频课程分享网站
本教程介绍如何基于云服务器ECS和对象存储OSS,搭建一个在线教育视频课程分享网站。
相关文章
|
1月前
|
API
阿里云微服务引擎及 API 网关 2024 年 2 月产品动态
阿里云微服务引擎及 API 网关 2024 年 2 月产品动态
|
2月前
|
存储 关系型数据库 MySQL
深入理解MySQL索引:从原理到最佳实践
深入理解MySQL索引:从原理到最佳实践
168 0
|
2月前
|
人工智能 监控 Cloud Native
阿里云参编业内首个代码大模型标准丨云原生 2024 年 1 月产品技术动态
阿里云参编业内首个代码大模型标准丨云原生 2024 年 1 月产品技术动态
|
2天前
|
API
阿里云微服务引擎及 API 网关 2024 年 3 月产品动态
阿里云微服务引擎及 API 网关 2024 年 3 月产品动态。
|
17天前
|
消息中间件 NoSQL Kafka
云原生最佳实践系列 5:基于函数计算 FC 实现阿里云 Kafka 消息内容控制 MongoDB DML 操作
该方案描述了一个大数据ETL流程,其中阿里云Kafka消息根据内容触发函数计算(FC)函数,执行针对MongoDB的增、删、改操作。
|
1月前
|
弹性计算 安全 容灾
【产品动态】阿里云弹性计算产品月刊-2024年2月
阿里云产品官网价格全线下调,ECS最多下降36%;容器服务 Kubernetes 版 v1.28开放升级;安全访问与管理ECS资源的最佳实践;超高性能第八代AMD实例低至68.66元/月、老用户限时续费优惠,续费享1年5折……更多前沿云产品动态,尽在弹性计算产品月刊。
|
1月前
|
SQL 监控 测试技术
阿里云可观测 2024 年 2 月产品动态
阿里云可观测 2024 年 2 月产品动态
|
1月前
|
运维 Cloud Native 应用服务中间件
阿里云微服务引擎 MSE 及 API 网关 2024 年 02 月产品动态
阿里云微服务引擎 MSE 面向业界主流开源微服务项目, 提供注册配置中心和分布式协调(原生支持 Nacos/ZooKeeper/Eureka )、云原生网关(原生支持Higress/Nginx/Envoy,遵循Ingress标准)、微服务治理(原生支持 Spring Cloud/Dubbo/Sentinel,遵循 OpenSergo 服务治理规范)能力。API 网关 (API Gateway),提供 APl 托管服务,覆盖设计、开发、测试、发布、售卖、运维监测、安全管控、下线等 API 生命周期阶段。帮助您快速构建以 API 为核心的系统架构.满足新技术引入、系统集成、业务中台等诸多场景需要。
|
1月前
|
自然语言处理 算法 关系型数据库
阿里云PAI大模型RAG对话系统最佳实践
本文为大模型RAG对话系统最佳实践,旨在指引AI开发人员如何有效地结合LLM大语言模型的推理能力和外部知识库检索增强技术,从而显著提升对话系统的性能,使其能更加灵活地返回用户查询的内容。适用于问答、摘要生成和其他依赖外部知识的自然语言处理任务。通过该实践,您可以掌握构建一个大模型RAG对话系统的完整开发链路。
|
1月前
|
Cloud Native Dubbo Java
阿里云微服务引擎 MSE 2024 年 01 月产品动态
阿里云微服务引擎 MSE 面向业界主流开源微服务项目, 提供注册配置中心和分布式协调(原生支持 Nacos/ZooKeeper/Eureka )、云原生网关(原生支持Higress/Nginx/Envoy,遵循Ingress标准)、微服务治理(原生支持 Spring Cloud/Dubbo/Sentinel,遵循 OpenSergo 服务治理规范)能力。

相关产品

  • 云原生数据库 PolarDB