PostgreSQL Fine-Grained Table,Column,Row Level Audit

本文涉及的产品
云原生数据库 PolarDB MySQL 版,Serverless 5000PCU 100GB
简介:
通过配置用户级或数据库级的参数可以实现用户以及数据库级别的审计, 但是这样的粒度可能还是太粗糙了.
如果需要更细致的审计, 例如针对某些表的操作审计, 某些用户对某些表的审计, 或者仅仅当某个列的值发生变化时才被审计(记录到LOG或表里面, 本文的例子是将审计信息输出到LOG, 使用raise).
这样的需求可以通过触发器来实现.
接下来以PostgreSQL 9.2为例进行讲解.
# 基础的参数配置
log_destination = 'csvlog'
logging_collector = on
log_directory = 'pg_log'
log_filename = 'postgresql-%Y-%m-%d_%H%M%S.log'
log_file_mode = 0600
log_truncate_on_rotation = on
log_rotation_age = 1d
log_rotation_size = 10MB
log_connections = on
log_error_verbosity = verbose
log_timezone = 'PRC'
log_statement = 'none'
log_min_duration_statement = -1

# 创建测试表 : 
digoal=> create table user_account_kb(id int, info text, balance numeric, crt_time timestamp, mod_time timestamp);
CREATE TABLE

# 插入测试数据 : 
digoal=> insert into user_account_kb select generate_series(1,10),'test',trunc(100*random()),now(),null;
INSERT 0 10
digoal=> select * from user_account_kb ;
 id | info | balance |          crt_time          | mod_time 
----+------+---------+----------------------------+----------
  1 | test |      66 | 2013-03-20 10:08:15.969523 | 
  2 | test |      50 | 2013-03-20 10:08:15.969523 | 
  3 | test |      95 | 2013-03-20 10:08:15.969523 | 
  4 | test |      90 | 2013-03-20 10:08:15.969523 | 
  5 | test |      50 | 2013-03-20 10:08:15.969523 | 
  6 | test |      12 | 2013-03-20 10:08:15.969523 | 
  7 | test |      39 | 2013-03-20 10:08:15.969523 | 
  8 | test |      42 | 2013-03-20 10:08:15.969523 | 
  9 | test |       6 | 2013-03-20 10:08:15.969523 | 
 10 | test |      11 | 2013-03-20 10:08:15.969523 | 
(10 rows)

【审计场景1】
1. 审计某个表的insert, update, delete, truncate语句.
使用after for each statement触发器.
# 创建触发器函数
digoal=> create or replace function trace_statement() returns trigger as $$
declare
  v_user name;
  v_db name;
  v_query text;
begin
  select current_user, current_database(), current_query() into v_user, v_db, v_query;
  raise warning 'user:%, db:%, query:%', v_user, v_db, v_query;
  return null;
end;
$$ language plpgsql;

# 创建触发器
digoal=> create trigger tg1 after insert or update or delete or truncate on user_account_kb for each statement execute procedure trace_statement();
CREATE TRIGGER

# 测试插入
digoal=> insert into user_account_kb values(11,'test',100,now(),null);
WARNING:  user:digoal, db:digoal, query:insert into user_account_kb values(11,'test',100,now(),null);
INSERT 0 1
digoal=> select * from user_account_kb where id=11;
 id | info | balance |          crt_time          | mod_time 
----+------+---------+----------------------------+----------
 11 | test |     100 | 2013-03-20 10:18:02.495836 | 
(1 row)

# 测试更新
digoal=> update user_account_kb set info='new' where id=11;
WARNING:  user:digoal, db:digoal, query:update user_account_kb set info='new' where id=11;
UPDATE 1
digoal=> select * from user_account_kb where id=11;
 id | info | balance |          crt_time          | mod_time 
----+------+---------+----------------------------+----------
 11 | new  |     100 | 2013-03-20 10:18:02.495836 | 
(1 row)

# 测试删除
digoal=> delete from user_account_kb where id=11;
WARNING:  user:digoal, db:digoal, query:delete from user_account_kb where id=11;
DELETE 1

# 测试truncate
digoal=> begin;
BEGIN
digoal=> truncate user_account_kb ;
WARNING:  user:digoal, db:digoal, query:truncate user_account_kb ;
TRUNCATE TABLE
digoal=> rollback;
ROLLBACK

# 注意回滚的操作不会被记录. 即使log_statement = 'ddl', 所以rollback没有被记录下来.
# 这是个弊端. 需要注意. 希望未来的PostgreSQL版本加以改进. 现在的解决办法是修正触发器的触发点, 小结部分会提到.
# 以上操作的日志输出如下.
2013-03-20 10:18:02.496 CST,"digoal","digoal",4521,"[local]",51491867.11a9,9,"INSERT",2013-03-20 10:01:11 CST,1/229,3355,WARNING,01000,"user:digoal, db:digoal, query:insert into user_account_kb values(11,'test',100,now(),null);",,,,,,,,"exec_stmt_raise, pl_exec.c:2840","psql"
2013-03-20 10:19:42.980 CST,"digoal","digoal",4521,"[local]",51491867.11a9,10,"UPDATE",2013-03-20 10:01:11 CST,1/233,3356,WARNING,01000,"user:digoal, db:digoal, query:update user_account_kb set info='new' where id=11;",,,,,,,,"exec_stmt_raise, pl_exec.c:2840","psql"
2013-03-20 10:19:53.612 CST,"digoal","digoal",4521,"[local]",51491867.11a9,11,"DELETE",2013-03-20 10:01:11 CST,1/236,3357,WARNING,01000,"user:digoal, db:digoal, query:delete from user_account_kb where id=11;",,,,,,,,"exec_stmt_raise, pl_exec.c:2840","psql"
2013-03-20 10:20:18.361 CST,"digoal","digoal",4521,"[local]",51491867.11a9,12,"TRUNCATE TABLE",2013-03-20 10:01:11 CST,1/237,3358,WARNING,01000,"user:digoal, db:digoal, query:truncate user_account_kb ;",,,,,,,,"exec_stmt_raise, pl_exec.c:2840","psql"

【审计场景2】
2. 按用户审计某个表的insert, update, delete, truncate语句.
使用after for each statement  when (current_user='') 触发器.
# 删除前面用到的触发器
digoal=> drop trigger tg1 on user_account_kb;
DROP TRIGGER

# 创建触发器, 这次带上when条件
digoal=> create trigger tg1 after insert or update or delete or truncate on user_account_kb for each statement when (current_user='digoal') execute procedure trace_statement();
CREATE TRIGGER

# 测试digoal用户的操作
digoal=> update user_account_kb set info='new' where id=11;
WARNING:  user:digoal, db:digoal, query:update user_account_kb set info='new' where id=11;
UPDATE 0

# 测试其他用户的操作, 不被审计
digoal=> \c digoal postgres
You are now connected to database "digoal" as user "postgres".
digoal=# update digoal.user_account_kb set info='new' where id=11;
UPDATE 0

【审计场景3】
3. 按条件审计某个表的insert, update, delete语句.
使用after for each row  when (new.balance <> old.balance) 触发器.
# 删除前面用到的触发器
digoal=> drop trigger tg1 on user_account_kb;
DROP TRIGGER

# 新建触发器函数
digoal=> create or replace function trace_row() returns trigger as $$                                                            
declare
  v_user name;
  v_db name;
  v_query text;
begin
select current_user, current_database(), current_query() into v_user, v_db, v_query;
case TG_OP
  when 'UPDATE' then
    raise warning 'user:%, db:%, query:%, newdata:%, olddata:%', v_user, v_db, v_query, NEW, OLD;
  when 'INSERT' then
    raise warning 'user:%, db:%, query:%, newdata:%', v_user, v_db, v_query, NEW;
  when 'DELETE' then
    raise warning 'user:%, db:%, query:%, olddata:%', v_user, v_db, v_query, OLD;
  else
    null;
end case;
return null;
end;
$$ language plpgsql;
CREATE FUNCTION

# 新建触发器
digoal=> create trigger tg1 after insert or update or delete on user_account_kb for each row execute procedure trace_row();
CREATE TRIGGER

# 测试插入
digoal=> insert into user_account_kb select * from user_account_kb limit 3;
WARNING:  user:digoal, db:digoal, query:insert into user_account_kb select * from user_account_kb limit 3;, newdata:(1,test,66,"2013-03-20 10:08:15.969523",)
WARNING:  user:digoal, db:digoal, query:insert into user_account_kb select * from user_account_kb limit 3;, newdata:(2,test,50,"2013-03-20 10:08:15.969523",)
WARNING:  user:digoal, db:digoal, query:insert into user_account_kb select * from user_account_kb limit 3;, newdata:(3,test,95,"2013-03-20 10:08:15.969523",)
INSERT 0 3

# 测试更新
digoal=> update user_account_kb set info='new' where id<3;
WARNING:  user:digoal, db:digoal, query:update user_account_kb set info='new' where id<3;, newdata:(1,new,66,"2013-03-20 10:08:15.969523",), olddata:(1,new,66,"2013-03-20 10:08:15.969523",)
WARNING:  user:digoal, db:digoal, query:update user_account_kb set info='new' where id<3;, newdata:(2,new,50,"2013-03-20 10:08:15.969523",), olddata:(2,new,50,"2013-03-20 10:08:15.969523",)
WARNING:  user:digoal, db:digoal, query:update user_account_kb set info='new' where id<3;, newdata:(1,new,66,"2013-03-20 10:08:15.969523",), olddata:(1,new,66,"2013-03-20 10:08:15.969523",)
WARNING:  user:digoal, db:digoal, query:update user_account_kb set info='new' where id<3;, newdata:(2,new,50,"2013-03-20 10:08:15.969523",), olddata:(2,new,50,"2013-03-20 10:08:15.969523",)
UPDATE 4

# 测试删除
digoal=> delete from user_account_kb where id<3;
WARNING:  user:digoal, db:digoal, query:delete from user_account_kb where id<3;, olddata:(1,new,66,"2013-03-20 10:08:15.969523",)
WARNING:  user:digoal, db:digoal, query:delete from user_account_kb where id<3;, olddata:(2,new,50,"2013-03-20 10:08:15.969523",)
WARNING:  user:digoal, db:digoal, query:delete from user_account_kb where id<3;, olddata:(1,new,66,"2013-03-20 10:08:15.969523",)
WARNING:  user:digoal, db:digoal, query:delete from user_account_kb where id<3;, olddata:(2,new,50,"2013-03-20 10:08:15.969523",)
DELETE 4


【审计场景4】
# 基于列的判断审计
# 删除前面用到的触发器
digoal=> drop trigger tg1 on user_account_kb;
DROP TRIGGER

# 创建触发器, 这里用到when条件, 只有当balance变化时才审计
digoal=> create trigger tg1 after update on user_account_kb for each row when (new.balance<>old.balance) execute procedure trace_row();
CREATE TRIGGER

# 测试
digoal=> update user_account_kb set info='new' where id=4;
UPDATE 1
digoal=> update user_account_kb set info='new',balance=balance where id=4;
UPDATE 1

# balance变化才审计
digoal=> update user_account_kb set info='new',balance=balance-1 where id=4;
WARNING:  user:digoal, db:digoal, query:update user_account_kb set info='new',balance=balance-1 where id=4;, newdata:(4,new,89,"2013-03-20 10:08:15.969523",), olddata:(4,new,90,"2013-03-20 10:08:15.969523",)
UPDATE 1

【小结】
1. 前面提到ROLLBACK等事务相关的SQL不会被审计到, 所以当SQL执行失败时, LOG已经记录了, 但是没有记录回滚的动作, 所以信息是不完整的, 除非从XLOG/CLOG中取出对应的XID是提交还是回滚. 
为了使记录在LOG中的语句一定是提交的, 那么需要调整一下触发器的创建方法, 使得回滚的事务中所有的SQL都不被审计.
如下,
触发器只有在提交时才会触发, 回滚不触发. (使用constraint来创建触发器)
digoal=> create constraint trigger tg1 after update on user_account_kb DEFERRABLE INITIALLY deferred for each row when (new.balance<>old.balance) execute procedure trace_row();
CREATE TRIGGER
digoal=> begin;
BEGIN
digoal=> update user_account_kb set balance=balance+1 where id=1;
UPDATE 0
digoal=> update user_account_kb set balance=balance+1 where id=4;
UPDATE 1
digoal=> end;
WARNING:  user:digoal, db:digoal, query:end;, newdata:(4,new,90,"2013-03-20 10:08:15.969523",), olddata:(4,new,89,"2013-03-20 10:08:15.969523",)
COMMIT
digoal=> begin;
BEGIN
digoal=> update user_account_kb set balance=balance+1 where id=4;
UPDATE 1
digoal=> rollback;
ROLLBACK

注意以上方法只有after ... for each row才能被用到.

When the CONSTRAINT option is specified, this command creates a constraint trigger. This is the same as a regular trigger except that the timing of the trigger firing can be adjusted using SET CONSTRAINTS. 

Constraint triggers must be AFTER ROW triggers. They can be fired either at the end of the statement causing the triggering event, or at the end of the containing transaction; in the latter case they are said to be deferred. 

A pending deferred-trigger firing can also be forced to happen immediately by using SET CONSTRAINTS. 

Constraint triggers are expected to raise an exception when the constraints they implement are violated.

【参考】 6.  http://www.postgresql.org/docs/9.2/static/sql-createtrigger.html
相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
相关文章
|
关系型数据库 数据库 PostgreSQL
PostgreSQL 内存表可选项 - unlogged table
标签 PostgreSQL , 内存表 , unlogged table 背景 内存表,通常被用于不需要持久化,变更频繁,访问RT低的场景。 目前社区版本PostgreSQL没有内存表的功能,postgrespro提供了两个插件可以实现类似内存表的功能。
3090 0
|
SQL 存储 运维
PolarDB-X性能优化之多表连接时table group及广播表的使用
通过实验演示多表连接时,PolarDB-X使用table group和广播表优化sql执行
237 0
|
SQL 存储 运维
PolarDB-X性能优化之利用table group优化sql
tablegroup(表组)PolarDB-X的重要特性之一,是数据库水平分库分表性能优化的重要技术手段。
422 0
|
SQL 弹性计算 算法
PostgreSQL 普通表在线转换为分区表 - online exchange to partition table
标签 PostgreSQL , 分区表 , 在线转换 背景 非分区表,如何在线(不影响业务)转换为分区表? 方法1,pg_pathman分区插件 《PostgreSQL 9.5+ 高效分区表实现 - pg_pathman》 使用非堵塞式的迁移接口 partition_table_concurrently( relation REGCLASS,
2493 0
|
SQL 存储 算法
PolarDB-X 1.0-SQL 手册-DDL-CREATE TABLE
本文主要介绍使用DDL语句进行建表的语法、子句、参数和基本方式。
200 0
|
关系型数据库 MySQL 分布式数据库
PolarDB-X 1.0-SQL 手册-DDL-DROP TABLE
DROP [TEMPORARY] TABLE [IF EXISTS] tbl_name [, tbl_name] ... [RESTRICT | CASCADE]
110 0
|
SQL 关系型数据库 MySQL
PolarDB-X 1.0-SQL 手册-DDL-ALTER TABLE
语法 ALTER [ONLINE|OFFLINE] [IGNORE] TABLE tbl_name [alter_specification [, alter_specification] ...] [partition_options]
130 0
|
关系型数据库 MySQL
PolarDB-X 1.0-SQL 手册-DDL-TRUNCATE TABLE
TRUNCATE TABLE 用于清空表中的数据,需要有 DROP 权限。
102 0
|
索引
PolarDB-X 1.0-SQL 手册-DDL-RENAME TABLE
您可以使用RENAME TABLE语句对表名进行重命名,具体语法如下: RENAME TABLE tbl_name TO new_tbl_name
195 0
|
SQL 索引
PolarDB-X 1.0-SQL 手册-DAL-CHECK TABLE
对数据表进行检查,主要用于 DDL 建表失败的情形。 对于拆分表,检查底层物理分表是否有缺失的情况,底层的物理分表的列和索引是否是一致; 对于单库单表,检查表是否存在。
113 0