如何提升AnalyticDB实时写入性能

简介: 从AnalyticDB写入原理分析,可以从三个方面提升AnalyticDB的写入能力:降低网络传输开销、减少与硬件设备io操作和尽量少消耗cpu资源。针对这三个特性本文将介绍如何对写入sql进行改造以达到最优性能。

从AnalyticDB写入原理分析,可以从三个方面提升AnalyticDB的写入能力:降低网络传输开销、减少与硬件设备io操作和尽量少消耗cpu资源。针对这三个特性本文将介绍如何对写入sql进行改造以达到最优性能。

  • 采用批量写入(batch insert)模式,即每次在VALUES部分添加多行数据,一般建议每次批量写入数据量大约为16KB,以提高网络和磁盘吞吐。如下
INSERT INTO db_name.table_name (col1, col2, col3) VALUES ('xxx', 111, 'xxx'), ('xxx', 222, 'xxx'), ('xxx', 333, 'xxx');
  • 如果对一行的所有列都进行插入,则去除col_name并保证values顺序与表结构中的col_name顺序一致,以降低网络带宽耗用。如下
INSERT INTO db_name.table_name VALUES ('xxx', 111, 'xxx'), ('xxx', 222, 'xxx'), ('xxx', 333, 'xxx');
  • 保持主键相对有序。AnalyticDB的insert语句要求必须提供主键,且主键可以为复合主键。当确定复合主键时,根据业务含义调整复合主键中各个列的次序,从业务层面保证插入时主键是严格递增或近似递增的,也可以提升实时写入速度。
  • 增加ignore关键字。执行不带ignore关键字的insert sql,当主键冲突时,后续数据会覆盖之前插入的数据;带上ignore关键字,则主键冲突时,会保留之前插入的数据而自动忽略新数据。如果业务层没有数据覆盖的语义要求,则建议所有insert sql都加上ignore关键字,以减小覆盖数据带来的性能开销。
  • AnalyticDB需要对数据进行分区存储,当一次Batch insert中含有属于不同分区的多行数据时,将会耗费大量CPU资源进行分区号计算。因此建议在写入程序中提前计算好每行数据的分区号,并且将属于同一分区的多行数据组成一个批次,一次性插入。

实现聚合写入目前主要有两种途径:

  • 用户自行实现该聚合方法,对分区号的计算规则为:partition_num = CRC32(hash_partition_column_value) mod m,其中hash_partition_column_value是分区列的值,m是分区总数。如下代码
public class HashInsert extends AbstractJavaSamplerClient{
    private static Logger log = Logger.getLogger(HashInsert1M.class.getName());
    private static AtomicLong idGen = new AtomicLong();
    private int bufferSize =2000 
    private int batchSize = 20;
    private int partitionCnt = 100;

    public SampleResult runTest(JavaSamplerContext arg0) {
        ..........
        ..........
        String sqls[] = new String[bufferSize];
        int partNo[] = new int [bufferSize];
        int sortedSqlIndex[] = new int [bufferSize];
        int end = 100;
        for(int i = 0; i < bufferSize; i++) {
            long id = idGen.getAndIncrement();
            boolean boolean_id = DataUtil.getBoolean_test(id);
            int byte_id = DataUtil.getByte_test(id);
            int short_id = DataUtil.getShort_test(id);
            long user_id = DataUtil.getInt_test(id);
            long seller_id = id;
            float float_id = DataUtil.getFloat_test(id);
            double double_id = DataUtil.getDouble_test(id);
            String follow_id = DataUtil.getString_test(id);
            String time_id = DataUtil.getTime_test(id);
            String date_id = DataUtil.getDate_test(id);
            String timestamp_id = DataUtil.getTimestamp_test(id);
            String interest_flag = DataUtil.getMutilValue(id);
               StringBuffer sb = new StringBuffer();
            sb.append("(").append(boolean_id).append(",").append(byte_id).append(",").append(short_id).append(",").append(user_id)
            .append(",").append(seller_id).append(",").append(float_id).append(",").append(double_id).append(",'").append(follow_id)
            .append("','").append(time_id).append("','").append(date_id).append("','").append(timestamp_id).append("','").append(interest_flag)
            .append("',");
            for(int j=0;j<end-1;j++){
                sb.append("'").append(follow_id).append("',");
            }
            sb.append("'").append(follow_id).append("')");
            sqls[i]  = sb.toString();

            partNo[i] = getHashPartition("" + user_id, partitionCnt);
            sortedSqlIndex[i] = i;
        }

        for(int i = 0; i < bufferSize - 1; i++) {
            for(int j = i + 1; j < bufferSize; j++) {
                if (partNo[sortedSqlIndex[i]] > partNo[sortedSqlIndex[j]]) {
                    int tmp = sortedSqlIndex[i];
                    sortedSqlIndex[i] = sortedSqlIndex[j];
                    sortedSqlIndex[j] = tmp;
                }
            }
        }

        batchSize =  Integer.valueOf(AdsUtil.getBatchNum());
        try {
            .........
            .........
            String dbName = AdsUtil.getDBName();
            String tableName = AdsUtil.getTableName();
            String sql = "insert into " + dbName + "." + tableName  + " values ";
            for(int i = 0; i < bufferSize  - batchSize; i+= batchSize) {
                StringBuffer sb = new StringBuffer(sql);
                for(int j = 0 ; j < batchSize; j++) {
                    if (j != 0)
                        sb.append(",");
                    sb.append(sqls[sortedSqlIndex[i + j]]);                
                }
                ..............
                ..............
            }
            res = true;
        } catch (Exception e) {
            ...........
            ...........
        } finally {
            ...........
            ...........
        }

        return ...;
    }

    public static int getHashPartition(String value, int totalHashPartitionNum) {
        long crc32 = (value == null ? getCRC32("-1") : getCRC32(value));
        return (int) (crc32 % totalHashPartitionNum);
    }

    private static long getCRC32(String value) {
        Checksum checksum = new CRC32();
        byte[] bytes = value.getBytes();
        checksum.update(bytes, 0, bytes.length);
        return checksum.getValue();
    }
}
  • 采用AnalyticDB搭配的同步工具”数据集成”进行实时数据同步。一般建议采用第二种方法。
相关实践学习
数据库实验室挑战任务-初级任务
本场景介绍如何开通属于你的免费云数据库,在RDS-MySQL中完成对学生成绩的详情查询,执行指定类型SQL。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
目录
相关文章
|
关系型数据库 数据库 索引
AnalyticDB for PostgreSQL 黑科技解析 - 列存储 Meta Scan 性能加速
本文介绍阿里云 AnalyticDB for PostgreSQL(原HybridDB for PostgreSQL) 产品,即 MPP 数据仓库服务,其列存储 meta scan机制,及其对 分析场景的性能提升。
2398 0
|
5月前
|
存储 人工智能 关系型数据库
5倍性能提升,阿里云AnalyticDB PostgreSQL版新一代实时智能引擎重磅发布
2023 云栖大会上,AnalyticDB for PostgreSQL新一代实时智能引擎重磅发布,全自研计算和行列混存引擎较比开源Greenplum有5倍以上性能提升。AnalyticDB for PostgreSQL与通义大模型家族深度集成,推出一站式AIGC解决方案。阿里云新发布的行业模型及“百炼”平台,采用AnalyticDB for PostgreSQL作为内置向量检索引擎,性能较开源增强了2~5倍。大会上来自厦门国际银行、三七互娱等知名企业代表和瑶池数据库团队产品及技术资深专家们结合真实场景实践,深入分享了最新的技术进展和解析。
5倍性能提升,阿里云AnalyticDB PostgreSQL版新一代实时智能引擎重磅发布
|
8月前
|
SQL 关系型数据库 MySQL
10倍性能提升!一文读懂AnalyticDB MySQL秒级漏斗分析函数
营销域中的洞察分析/智能圈人/经营报表等场景是OLAP分析型数据库的重要应用场景,云原生数据仓库AnalyticDB MySQL在淘宝、饿了么、菜鸟、优酷、盒马等业务的营销场景有比较长时间的积累和沉淀,我们将通过一系列文章来介绍AnalyticDB MySQL在营销域数据产品中的落地与应用,本文主要介绍“漏斗分析”的实现与应用。
|
11月前
|
SQL Cloud Native 安全
10倍性能提升,一文读懂AnalyticDB秒级漏斗分析函数
AnalyticDB MySQL秒级漏斗分析函数助力企业简单快速研判用户增长
10561 2
|
SQL 移动开发 BI
【SQL开发实战技巧】系列(二十三):数仓报表场景☞ 如何对数据排列组合去重以及通过如何找到包含最大值和最小值的记录这个问题再次用执行计划给你证明分析函数性能不一定高
怎样对数据组合重新排列并去重的问题、通过如何找到包含最大值和最小值的记录这个问题再次用执行计划给你证明分析函数性能不一定高【SQL开发实战技巧】这一系列博主当作复习旧知识来进行写作,毕竟SQL开发在数据分析场景非常重要且基础,面试也会经常问SQL开发和调优经验,相信当我写完这一系列文章,也能再有所收获,未来面对SQL面试也能游刃有余~。本篇文章主要介绍的两个方面,第一个方面曾经有好几个网友和同事问我,第二个问题真的是很多同行的通病,认为分析函数是万金油,一股脑用。
【SQL开发实战技巧】系列(二十三):数仓报表场景☞ 如何对数据排列组合去重以及通过如何找到包含最大值和最小值的记录这个问题再次用执行计划给你证明分析函数性能不一定高
|
关系型数据库 MySQL OLAP
《阿里云 AnalyticDB MySQL版性能白皮书(1)》电子版地址
阿里云 AnalyticDB MySQL版性能白皮书(1)
97 0
《阿里云 AnalyticDB MySQL版性能白皮书(1)》电子版地址
|
存储 SQL Cloud Native
技术解析:阿里云 AnalyticDB 如何实现全球性能第一
北京时间 2020 年 5 月 4 日,TPC 官网正式公布,阿里云自研云原生数据仓库 AnalyticDB 通过严苛的 TPC-DS 全流程测试,性能较前世界纪录提升 29%,单位成本仅为其 1/3,再次成为全球性能领先的数据仓库。本文将对 AnalyticDB 进行全面解析,详细阐述其技术架构及存储和查询技术,并对 AnalyticDB 的下一步发展做出展望。
2758 0
技术解析:阿里云 AnalyticDB 如何实现全球性能第一
|
存储 分布式计算 Cloud Native
干货 | 云原生数仓如何破解大规模集群的关联查询性能问题?
近年来,数据库系统服务的数据量呈指数级增长,同时也面临处理的业务需求愈发复杂、实时性要求越来越高等挑战。单机数据库系统已经逐渐不能满足现代的数据库服务要求,因此分布式数据库/数据仓库得到了越来越广泛地运用。
285 0
|
存储 消息中间件 Cloud Native
云原生数仓如何破解大规模集群的关联查询性能问题?
AnalyticDB for PostgreSQL(以下简称ADB PG)是一款PB级的MPP架构云原生数据仓库。本文从ADB PG架构设计的角度出发,探讨Runtime Filter在ADB PG中的实现方案,并介绍了基于Bloom Filter的ADB PG Dynamic Join Filter功能技术细节。
云原生数仓如何破解大规模集群的关联查询性能问题?
|
存储 SQL Cloud Native

热门文章

最新文章

相关产品

  • 云原生数据仓库AnalyticDB MySQL版