表格存储新手指南:Java SDK异步接口的使用

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,恶意文件检测 1000次 1年
日志服务 SLS,月写入数据量 50GB 1个月
简介: 本篇文章主要会介绍下表格存储的Java SDK提供的异步接口,如何使用以及应用场景。

本篇文章主要会介绍下表格存储的Java SDK提供的异步接口,如何使用以及应用场景。


为什么需要异步?

异步提供了一个non-blocking, event-driven的编程模型,能够将系统不同层级的模块进行层次化的解耦,能够利用多核并行执行任务,提高性能。

现如今,一个大型的系统,系统级调优的最关键一步,就是异步化。异步化最常改造的是远程RPC或者数据库访问部分,表格存储作为一个底层数据库产品,需要提供异步接口来适应这个潮流。


在表格存储内部,我们也有一些使用异步来优化系统的例子,就拿Java SDK来说,可以看下以下两篇文章:

1. 使用NIO来优化Java SDK的性能

2. 基于Java SDK异步接口,提供高并发、高吞吐率的数据导入接口



如何使用?

异步接口的使用和同步接口没有太大区别,使用同样的请求参数,唯一的不同在于返回结果的处理上。同步接口会同步的返回调用结果,而异步接口会返回Future类型的结果,或者直接通过Callback来通知结果。

Future的使用

    private static void listTableWithFuture(OTSClientAsync client) {
        // 通过Future同步的等待结果返回。
        try {
            OTSFuture<ListTableResult> future = client.listTable();
            ListTableResult result = future.get(); // 同步的等待
            System.out.println("\nList table by listTableWithFuture:");
            for (String tableName : result.getTableNames()) {
                System.out.println(tableName);
            }
        } catch (OTSException e) {
            e.printStackTrace();
        } catch (ClientException e) {
            e.printStackTrace();
        }

        // 通过Future,间歇性的等待结果返回。
        try {
            OTSFuture<ListTableResult> future = client.listTable();

            while (!future.isDone()) {
                System.out.println("Waiting for result of list table.");
                Thread.sleep(10); // 每隔10ms检查结果是否返回
            }

            ListTableResult result = future.get();
            System.out.println("\nList table by listTableWithFuture:");
            for (String tableName : result.getTableNames()) {
                System.out.println(tableName);
            }
        } catch (OTSException e) {
            e.printStackTrace();
        } catch (ClientException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
AI 代码解读

Callback的使用

    private static void listTableWithCallback(OTSClientAsync asyncClient) {
        final AtomicBoolean isDone = new AtomicBoolean(false);
        OTSCallback<ListTableRequest, ListTableResult> callback = new OTSCallback<ListTableRequest, ListTableResult>() {
            @Override
            public void onCompleted(OTSContext<ListTableRequest, ListTableResult> otsContext) {
                isDone.set(true);
                System.out.println("\nList table by listTableWithCallback:");
                for (String tableName : otsContext.getOTSResult().getTableNames()) {
                    System.out.println(tableName);
                }
            }

            @Override
            public void onFailed(OTSContext<ListTableRequest, ListTableResult> otsContext, OTSException ex) {
                isDone.set(true);
                ex.printStackTrace();
            }

            @Override
            public void onFailed(OTSContext<ListTableRequest, ListTableResult> otsContext, ClientException ex) {
                isDone.set(true);
                ex.printStackTrace();
            }
        };

        asyncClient.listTable(callback); // 将callback扔给SDK,SDK在完成请求接到响应后,会自动调用callback

        // 等待callback被调用,一般的业务处理逻辑下,不需要这一步等待。
        while (!isDone.get()) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
AI 代码解读

案例一:如何突破BatchWriteRow的行数限制,一次性导入N行数据

    private static void batchWriteRow(OTSClientAsync asyncClient, String tableName) {
        // BatchWriteRow的行数限制是100行,使用异步接口,实现一次批量导入1000行。
        List<OTSFuture<BatchWriteRowResult>> futures = new ArrayList<OTSFuture<BatchWriteRowResult>>();
        int count = 10;
        // 一次性发出10个请求,每个请求写100行数据
        for (int i = 0; i < count; i++) {
            BatchWriteRowRequest request = new BatchWriteRowRequest();
            for (int j = 0; j < 100; j++) {
                RowPutChange rowChange = new RowPutChange(tableName);
                RowPrimaryKey primaryKey = new RowPrimaryKey();
                primaryKey.addPrimaryKeyColumn(COLUMN_GID_NAME, PrimaryKeyValue.fromLong(i * 100 + j));
                primaryKey.addPrimaryKeyColumn(COLUMN_UID_NAME, PrimaryKeyValue.fromLong(j));
                rowChange.setPrimaryKey(primaryKey);
                rowChange.addAttributeColumn(COLUMN_NAME_NAME, ColumnValue.fromString("name" + j));
                rowChange.addAttributeColumn(COLUMN_AGE_NAME, ColumnValue.fromLong(j));

                request.addRowChange(rowChange);
            }
            OTSFuture<BatchWriteRowResult> result = asyncClient.batchWriteRow(request);
            futures.add(result);
        }

        // 等待结果返回
        List<BatchWriteRowResult> results = new ArrayList<BatchWriteRowResult>();
        for (OTSFuture<BatchWriteRowResult> future : futures) {
            try {
                BatchWriteRowResult result = future.get(); // 同步等待结果返回
                results.add(result);
            } catch (OTSException e) {
                e.printStackTrace();
            } catch (ClientException e) {
                e.printStackTrace();
            }
        }

        // 统计返回结果
        int totalSucceedRows = 0;
        int totalFailedRows = 0;
        for (BatchWriteRowResult result : results) {
            totalSucceedRows += result.getSucceedRowsOfPut().size();
            totalFailedRows += result.getFailedRowsOfPut().size();
        }

        System.out.println("Total succeed rows: " + totalSucceedRows);
        System.out.println("Total failed rows: " + totalFailedRows);
    }
AI 代码解读


案例二:如何实现batch getRange

    private static void batchGetRange(OTSClientAsync asyncClient, String tableName) {
        // 一次性查询多个范围的数据,设置10个任务,每个任务查询100条数据。
        // 每个范围查询的时候设置limit为10,100条数据需要10次请求才能全部查完。
        int count = 10;
        OTSFuture<GetRangeResult>[] futures = new OTSFuture[count];
        for (int i = 0; i < count; i++) {
            futures[i] = sendGetRangeRequest(asyncClient, tableName, i * 100, i * 100 + 100);
        }

        // 检查是否所有范围查询均已做完,若未做完,则继续发送查询请求
        List<Row> allRows = new ArrayList<Row>();
        while (true) {
            boolean completed = true;
            for (int i = 0; i < futures.length; i++) {
                OTSFuture<GetRangeResult> future = futures[i];
                if (future == null) {
                    continue;
                }

                if (future.isDone()) {
                    GetRangeResult result = future.get();
                    allRows.addAll(result.getRows());

                    if (result.getNextStartPrimaryKey() != null) {
                        // 该范围还未查询完毕,需要从nextStart开始继续往下读。
                        long nextStart = result.getNextStartPrimaryKey().getPrimaryKey().get(COLUMN_GID_NAME).asLong();
                        long rangeEnd = i * 100 + 100;
                        futures[i] = sendGetRangeRequest(asyncClient, tableName, nextStart, rangeEnd);
                        completed = false;
                    } else {
                        futures[i] = null; // 若某个范围查询完毕,则将对应future设置为null
                    }
                } else {
                    completed = false;
                }
            }

            if (completed) {
                break;
            } else {
                try {
                    Thread.sleep(10); // 避免busy wait,每次循环完毕后等待一小段时间
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        // 所有数据全部读出
        System.out.println("Total rows scanned: " + allRows.size());
    }
AI 代码解读

示例代码可从这里下载。

相关实践学习
消息队列+Serverless+Tablestore:实现高弹性的电商订单系统
基于消息队列以及函数计算,快速部署一个高弹性的商品订单系统,能够应对抢购场景下的高并发情况。
阿里云表格存储使用教程
表格存储(Table Store)是构建在阿里云飞天分布式系统之上的分布式NoSQL数据存储服务,根据99.99%的高可用以及11个9的数据可靠性的标准设计。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问。 产品详情:https://www.aliyun.com/product/ots
目录
打赏
0
0
0
5
7092
分享
相关文章
JAVA接入DeepSeek大模型接口开发---阿里云的百炼模型
随着大模型的越来越盛行,现在很多企业开始接入大模型的接口,今天我从java开发角度来写一个demo的示例,用于接入DeepSeek大模型,国内的大模型有很多的接入渠道,今天主要介绍下阿里云的百炼模型,因为这个模型是免费的,只要注册一个账户,就会免费送百万的token进行学习,今天就从一个简单的可以执行的示例开始进行介绍,希望可以分享给各位正在学习的同学们。
208 3
JAVA接入DeepSeek大模型接口开发---阿里云的百炼模型
|
7天前
|
Java Lambda 表达式:以 Foo 接口为例深入解析
本文深入解析了 Java 8 中 Lambda 表达式的用法及其背后的函数式接口原理,以 `Foo` 接口为例,展示了如何通过简洁的 Lambda 表达式替代传统匿名类实现。文章从 Lambda 基本语法、函数式接口定义到实际应用层层递进,并探讨默认方法与静态方法的扩展性,最后总结常见误区与关键点,助你高效优化代码!
28 0
|
7天前
|
java中一个接口A,以及一个实现它的类B,一个A类型的引用对象作为一个方法的参数,这个参数的类型可以是B的类型吗?
本文探讨了面向对象编程中接口与实现类的关系,以及里氏替换原则(LSP)的应用。通过示例代码展示了如何利用多态性将实现类的对象传递给接口类型的参数,满足LSP的要求。LSP确保子类能无缝替换父类或接口,不改变程序行为。接口定义了行为规范,实现类遵循此规范,从而保证了多态性和代码的可维护性。总结来说,接口与实现类的关系天然符合LSP,体现了多态性的核心思想。
19 0
java语言后台管理若依框架-登录提示404-接口异常-系统接口404异常如何处理-登录验证码不显示prod-api/captchaImage 404 (Not Found) 如何处理-解决方案优雅草卓伊凡
java语言后台管理若依框架-登录提示404-接口异常-系统接口404异常如何处理-登录验证码不显示prod-api/captchaImage 404 (Not Found) 如何处理-解决方案优雅草卓伊凡
363 5
|
3月前
|
java异步判断线程池所有任务是否执行完
通过上述步骤,您可以在Java中实现异步判断线程池所有任务是否执行完毕。这种方法使用了 `CompletionService`来监控任务的完成情况,并通过一个独立线程异步检查所有任务的执行状态。这种设计不仅简洁高效,还能确保在大量任务处理时程序的稳定性和可维护性。希望本文能为您的开发工作提供实用的指导和帮助。
160 17
|
4月前
|
java如何请求接口然后终止某个线程
通过本文的介绍,您应该能够理解如何在Java中请求接口并根据返回结果终止某个线程。合理使用标志位或 `interrupt`方法可以确保线程的安全终止,而处理好网络请求中的各种异常情况,可以提高程序的稳定性和可靠性。
80 6
利用Java获取京东SKU接口指南
本文介绍如何使用Java通过京东API获取商品SKU信息。首先,需注册京东开放平台账号并创建应用以获取AppKey和AppSecret。接着,查阅API文档了解调用方法。明确商品ID后,构建请求参数并通过HTTP客户端发送请求。最后,解析返回的JSON数据提取SKU信息。注意遵守API调用频率限制及数据保护法规。此方法适用于电商平台及其他数据获取场景。
|
5月前
|
Java中内置的函数式接口
Java中内置的函数式接口
65 2
|
2月前
|
【Java并发】【线程池】带你从0-1入门线程池
欢迎来到我的技术博客!我是一名热爱编程的开发者,梦想是编写高端CRUD应用。2025年我正在沉淀中,博客更新速度加快,期待与你一起成长。 线程池是一种复用线程资源的机制,通过预先创建一定数量的线程并管理其生命周期,避免频繁创建/销毁线程带来的性能开销。它解决了线程创建成本高、资源耗尽风险、响应速度慢和任务执行缺乏管理等问题。
194 60
【Java并发】【线程池】带你从0-1入门线程池
|
9天前
|
【源码】【Java并发】从InheritableThreadLocal和TTL源码的角度来看父子线程传递
本文涉及InheritableThreadLocal和TTL,从源码的角度,分别分析它们是怎么实现父子线程传递的。建议先了解ThreadLocal。
47 4
【源码】【Java并发】从InheritableThreadLocal和TTL源码的角度来看父子线程传递

云存储

+关注