阿里云存储服务 + 关注
手机版

Table Store新一代数据实时消费通道:Tunnel Service介绍

  1. 云栖社区>
  2. 阿里云存储服务>
  3. 博客>
  4. 正文

Table Store新一代数据实时消费通道:Tunnel Service介绍

竹千代_ 发布时间:2019-01-10 18:08:44 浏览527 评论0

摘要: 通道服务Tunnel Service是基于Table Store数据接口之上的全增量一体化服务,它通过一组Tunnel Service API和SDK为用户提供了增量、全量和增量加全量三种类型的分布式数据实时消费通道。

概述

通道服务Tunnel Service是基于Table Store数据接口之上的全增量一体化服务,它通过一组Tunnel Service API和SDK为用户提供了增量、全量和增量加全量三种类型的分布式数据实时消费通道。通过为数据表建立Tunnel Service数据通道,用户可以简单地实现对表中历史存量和新增数据的消费处理。

Table Store NoSQL数据库非常适合元数据管理、时序数据监控、消息系统等服务应用,这些应用的一个常见设计模式就是利用增量数据流或者先全量后增量的数据流来触发一些附加的操作逻辑,这些附加操作包括:

  • 数据同步,将数据同步到缓存、搜索引擎或者数据仓库中
  • 事件驱动,触发函数计算、通知消费端消费或者调用一些API
  • 流式数据处理,对接流式或者流批一体计算引擎
  • 数据搬迁,数据备份到OSS、迁移到容量型的Table Store实例等

利用Tunnel Service,我们可以针对这些模式轻松构建高效、弹性的解决方案,如下图:

tunnel1

功能简述

Tunnel Service是Table Store在stream功能之上推出的更强大的数据通道功能,Tunnel Service提供了:

  • 全增量一体的数据通道

    Tunnel Service不仅提供增量数据消费能力,还提供了可并行的全量数据消费和全量加增量数据消费功能
  • 增量数据变化保序

    Tunnel Service会为用户数据划分一到多个可并行消费的逻辑分区channel,每个channel下的增量数据按写入时间顺序保序,不同channel的数据可以并行消费
  • 消费延迟监控

    Tunnel Service通过DescribeTunnel API提供了客户端消费数据RPO(_恢复点目标,recovery point objective_)信息,并在控制台提供了Tunnel数据消费监控
  • 数据消费能力水平扩展

    Tunnel Service提供了逻辑分区channel的自动负载均衡,通过增加消费端数量,可以水平扩展数据消费速度

快速入门

我们可以使用table store控制台快速体验tunnel service功能:

  • 在控制台选择测试数据表,在通道服务管理页面创建通道,例如创建增量类型tunnel

tunnel2

tunnel3

  • 新建的tunnel页面下,可以查看tunnel中的数据内容、消费延迟监控、逻辑分区channel下的消费数据行数统计

tunnel4

  • 在控制台数据管理页面随机写入或删除数据

tunnel5

  • 使用模拟消费按钮可以预览通道中的数据格式

tunnel6

  • 复制通道ID,使用任一语言tunnel SDK,开启新建tunnel的数据消费
//用户定义消费callback函数
func exampleConsumeFunction(ctx *client.ChannelContext, records []*client.Record) error {
    fmt.Println("user-defined information", ctx.CustomValue)
    for _, rec := range records {
        fmt.Println("tunnel record detail:", rec.String())
    }
    fmt.Println("a round of records consumption finished")
    return nil
}

func main() {
    //配置callback到SimpleProcessFactory,配置消费端TunnelWorkerConfig
    workConfig := &client.TunnelWorkerConfig{
        ProcessorFactory: &client.SimpleProcessFactory{
            CustomValue: "user custom interface{} value",
            ProcessFunc: exampleConsumeFunction,
        },
    }

    //使用TunnelDaemon持续消费指定tunnel
    tunnelClient := client.NewTunnelClient(endpoint, instance, accessKeyId, accessKeySecret)
    daemon := client.NewTunnelDaemon(tunnelClient, $通道ID, workConfig)
    log.Fatal(daemon.Run())
}
  • 在数据消费标准输出可以看到增量数据消费日志,在控制台或者使用describeTunnel接口也可以查看消费延迟,channel下的消费数据行数更新

tunnel7

SDK

【云栖快讯】云栖专辑 | 阿里开发者们的第20个感悟:好的工程师为人写代码,而不仅是为编译器  详情请点击

网友评论