创建数据总线(DataHub)源表

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介:

什么是数据总线(DataHub)

DataHub作为一个流式数据总线,为阿里云数加平台提供了大数据的入口服务,共同构建一站式的数据处理平台。实时计算 Flink通常使用DataHub作为流式数据存储头和输出目的端。同时,上游众多流式数据,包括DTS、IOT等均选择DataHub作为大数据平台的数据入口。DataHub本身是流数据存储,实时计算只能将其作为流式数据输入。示例如下:

 
 
  1. create table datahub_stream(
  2. name varchar,
  3. age BIGINT,
  4. birthday BIGINT
  5. ) with (
  6. type='datahub',
  7. endPoint='http://dXXXXXXXX.com',
  8. project='blink_datahub_test',
  9. topic='test_topic_1',
  10. accessId='0i70XXXXXXXXs',
  11. accessKey='yF60EwXXXXXXXXXnvQPJ2zhCfHU',
  12. startTime='2017-07-21 00:00:00'
  13. );

属性字段

Flink SQL支持获取DataHub的属性字段。能够记录每条信息写入DataHub的系统时间。

如图所示:12421

字段名 注释说明
timestamp 每条记录入datahub的systemtime

示例

通过 HEADER 关键字获取属性字段。

例如,属性字段并不存在于DATAHUB的字段声明里。想获取每条记录入datahub的systemtime,可以将timestamp作为字段名,在后面加上HEADER就可以取出想要的属性值。

测试数据

name(VARCHAT) MsgID(VARCHAT)
ens_altar_flow ems0a

测试案例

 
 
  1. CREATE TABLE datahub_log (
  2. `timestamp` varchar HEADER,
  3. result varchar
  4. MsgID varchar
  5. )
  6. WITH
  7. (
  8. type ='datahub'
  9. );
  10. CREATE TABLE RDS_out (
  11. `timestamp` varchar,
  12. MsgID varchar,
  13. Version varchar
  14. )
  15. WITH
  16. (
  17. type ='RDS'
  18. );
  19. INSERT INTO RDS_out
  20. SELECT
  21. `timestamp`,
  22. result,
  23. MsgID
  24. FROM
  25. datahub_log;

测试结果

TIME(VARCHAT) MsgID(VARCHAT) Version(VARCHAT)
1522652455625 ems0a 0.0.1

WITH参数

目前只支持tuple模式的topic

参数 注释说明 备注
endPoint 消费端点信息 DATAHUB的Endpoint地址
accessId 读取的accessId
accessKey 读取的密钥
project 读取的项目
topic project下的具体的topic
startTime 启动位点的时间 格式为”yyyy-MM-dd hh:mm:ss”
maxRetryTimes 读取最大尝试次数 可选,默认为20。
retryIntervalMs 重试间隔 可选,默认为1000。
batchReadSize 单次读取条数 可选,默认为10,可设置的最大值为1000。
lengthCheck 单行字段条数检查策略 可选,默认为SKIP,其它可选值为EXCEPTION、PAD。SKIP:字段数目不符合时跳过 。EXCEPTION:字段数目不符合时抛出异常。PAD:按顺序填充,不存在的置为null。
columnErrorDebug 是否打开调试开关,如果打开,会把解析异常的log打印出来 可选,默认为false。

类型映射

DataHub和实时计算字段类型对应关系如下,建议使用该对应关系时进行DDL声明:

DataHub字段类型 实时计算字段类型
BIGINT BIGINT
STRING VARCHAR
DOUBLE DOUBLE
TIMESTAMP BIGINT
BOOLEAN BOOLEAN
DECIMAL DECIMAL

注意:DataHub的TIMESTAMP是精确到微妙级别的,在Unix时间戳里是16位的。而实时计算定义的TIMESTAMP是精确到毫秒级别的,在Unix时间戳里是13位的所以建议大家使用BIGINT来映射。如果一定是要用TIMESTAMP建议使用计算列来做转换。


本文转自实时计算—— 创建数据总线(DataHub)源表
相关实践学习
实时数据及离线数据上云方案
本实验通过使用CANAL、DataHub、DataWorks、MaxCompute服务,实现数据上云,解决了数据孤岛问题,同时把数据迁移到云计算平台,对后续数据的计算和应用提供了第一步开山之路。
相关文章
|
Java Apache Maven
阿里云数据总线(DataHub)使用Flume插件导入数据示例
Flume NG是Cloudera提供的一个分布式、可靠、可用的系统,它能够将不同数据源的海量日志数据进行高效收集、聚合、移动,最后存储到一个中心化数据存储系统中。本文主要演示如何使用Flume-DataHub插件导入数据到阿里云数据总线(DataHub)。
2692 0
阿里云数据总线(DataHub)使用Flume插件导入数据示例
|
5月前
|
SQL 关系型数据库 数据管理
Datahub实践——Sqllineage解析Sql实现端到端数据血缘
Datahub实践——Sqllineage解析Sql实现端到端数据血缘
701 1
|
7月前
|
数据采集 JSON 关系型数据库
将 MySQL 数据抽取并写入 DataHub,您可以按照以下步骤进行
将 MySQL 数据抽取并写入 DataHub,您可以按照以下步骤进行
297 2
|
3月前
|
存储 监控 Apache
Flink整库同步 如何把数据丢入到 datahub 中
Flink整库同步 如何把数据丢入到 datahub 中
|
5月前
|
数据采集 大数据 数据挖掘
企业级数据治理工作怎么开展?Datahub这样做
企业级数据治理工作怎么开展?Datahub这样做
|
7月前
|
数据采集 JSON 关系型数据库
将 MySQL 数据抽取并写入 DataHub
将 MySQL 数据抽取并写入 DataHub
175 3
|
9月前
|
Java API Maven
Fink在处理DataHub数据源时无法正确识别RecordData类的字段
Fink在处理DataHub数据源时无法正确识别RecordData类的字段
77 1
|
JSON 物联网 数据格式
物联网平台数据流转到datahub时报错
记录一次物联网平台数据流转到datahub时的报错
389 0
物联网平台数据流转到datahub时报错