Streamworks,基于扩展FlinkSQL实现流计算的源表导入、维表关联与结果表导出

本文涉及的产品
云原生数据库 PolarDB MySQL 版,Serverless 5000PCU 100GB
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: Streamworks,袋鼠云基于SQL的流计算开发平台,其通过扩展FlinkSQL,实现FlinkSQL与界面化配置映射结合的方式,完成Kafka源数据的读入,并支持流数据与Mysql/Oracle/MongDB等数据源进行维表关联,将最终结果数据导出至Hbase/ES/Greenplum/Oracle/OceanBase等目标数据库,进行一站式的流数据开发。

Streamworks,袋鼠云基于SQL的流计算开发平台,其通过扩展FlinkSQL,实现FlinkSQL与界面化配置映射结合的方式,完成Kafka源数据的读入,并支持流数据与Mysql/Oracle/MongDB等数据源进行维表关联,将最终结果数据导出至Hbase/ES/Greenplum/Oracle/OceanBase等目标数据库,进行一站式的流数据开发。
s1

为什么扩展Flink-SQL?

Flink 本身的SQL语法并不提供对接输入源和输出目的的SQL语法,数据开发在使用过程中需要根据其提供的API接口编写Source和 Sink,不仅需要了解FLink 各类Operator的API,还需要对各个组件的相关调用方式有了解(比如Kafka,Redis,Mongo、Hbase等),异常繁琐。并且在需要关联到外部数据源的时候Flink也没有提供SQL相关的实现方式,若数据开发直接基于原生的Flink SQL进行实时的数据分析,需要较大的额外工作量。

袋鼠云的SteamWorks则聚焦于数据开发人员使用Flink SQL时专注于业务逻辑,只需要关心做什么,而不需要关心怎么做。研发团队对FlinkSQL进行了扩展,用户只需通过可视化配置,完成源表到导入、维表的关联、结果表的导出。

扩展了哪些Flink相关SQL

1.创建源表语句

CREATE TABLE tableName(

colName colType,
...
function(colNameX) AS aliasName,
WATERMARK FOR colName AS withOffset( colName , delayTime )

)WITH(

type ='kafka09',
bootstrapServers ='ip:port,ip:port...',
zookeeperQuorum ='ip:port,ip:port/zkparent',
offsetReset ='latest',
topic ='topicName',
parallelism ='parllNum'

);
2.创建输出表语句

CREATE TABLE tableName(

    colName colType,

    ...

    colNameX colType

)WITH(

    type ='mysql',

    url ='jdbcUrl',

    userName ='userName',

    password ='pwd',

    tableName ='tableName',

    parallelism ='parllNum'

  );

3.创建自定义函数;

Create (scala|table) FUNCTION name WITH com.xx.xx
4.维表关联语句;

CREATE TABLE tableName(

    colName cloType,

    ...

    PRIMARY KEY(keyInfo),

    PERIOD FOR SYSTEM_TIME

)WITH(

    type='mysql',

    url='jdbcUrl',

    userName='dbUserName',

    password='dbPwd',

    tableName='tableName',

    cache ='LRU',

    cacheSize ='10000',

    cacheTTLMs ='60000',

     parallelism ='1',

     partitionedJoin='false'

 );

 # 各个模块是如何翻译到Flink,进行数据处理

1.如何将创建源表的SQL语句转换为Flink的operator

Flink中表的都会映射到Table这个类。然后调用注册方法将Table注册到environment,StreamTableEnvironment.registerTable(tableName, table)。

当前我们只支持kafka数据源。Flink本身有读取kafka 的实现类(FlinkKafkaConsumer09),所以只需要根据指定参数实例化出该对象,并调用注册方法注册即可。

另外需要注意在Flink SQL经常会需要用到Rowtime、Proctime, 所以我们在注册表结构的时候额外添加Rowtime、Proctime。当需要用到rowtime的时候需要额外指定DataStream.watermarks(assignTimestampsAndWatermarks),自定义watermark主要做两个事情1:如何从Row中获取时间字段。 2:设定最大延迟时间。

2.如何将创建的输出表sql语句转换为Flink的operator

Flink输出Operator的基类是OutputFormat, 我们这里继承的是RichOutputFormat, 该抽象类继承OutputFormat,额外实现了获取运行环境的方法getRuntimeContext(), 方便于我们之后自定义metric等操作。

我们以输出到Mysql插件Mysql-Sink为例。

分两部分

(1)将create table 解析出表名称,字段信息,mysql连接信息。

    该部分使用正则表达式的方式将create table 语句转换为内部的一个实现类。该类存储了表名称,字段信息,插件类型,插件连接信息。

(2)继承RichOutputFormat将数据写到对应的外部数据源。

    主要是实现writeRecord方法,在mysql插件中其实就是调用jdbc 实现插入或者更新方法。

3.如何将自定义函数语句转换为Flink的operator;

   Flink对udf提供两种类型的实现方式:

继承ScalarFunction
继承TableFunction
需要做的将用户提供的jar添加到URLClassLoader, 并加载指定的class (实现上述接口的类路径),然后调用TableEnvironment.registerFunction(funcName, udfFunc);即完成UDF的注册。之后即可使用改定义的UDF;

4.维表功能是如何实现的?

流计算中一个常见的需求就是为数据流补齐字段。因为数据采集端采集到的数据往往比较有限,在做数据分析之前,就要先将所需的维度信息补全,但是当前Flink并未提供join外部数据源的SQL功能。

实现该功能需要注意的几个问题

(1)维表的数据是不断变化的

     在实现的时候需要支持定时更新内存中的缓存的外部数据源,比如使用LRU等策略。

(2)IO吞吐问题

     如果每接收到一条数据就串行到外部数据源去获取对应的关联记录的话,网络延迟将会是系统最大的瓶颈。这里我们选择阿里贡献给flink社区的算子RichAsyncFunction。该算子使用异步的方式从外部数据源获取数据,大大减少了花费在网络请求上的时间。

(3)如何将SQL 中包含的维表解析到Flink operator   

      为了从SQL中解析出指定的维表和过滤条件,使用正则明显不是一个合适的办法,需要匹配各种可能性,将是一个无穷无尽的过程。查看Flink本身对SQL的解析,它使用了calcite做为sql解析的工作。将sql解析出一个语法树,通过迭代的方式,搜索到对应的维表,然后将维表和非维表结构分开。

s2

通过上述步骤便可以通过Flink SQL完成常用的从kafka源表读取数据,join部数据源,并写入到指定的外部目标结构中,且袋鼠云开源了相关实现:https://github.com/DTStack/flinkStreamSQL

Demo:Kafka流数据关联Oracle维表,写入PostgreSQL 。

读取Kafka源数据:

CREATE TABLE MyTable(

    name varchar,

    channel varchar,

    pv INT,

    xctime bigint,

    CHARACTER_LENGTH(channel) AS timeLeng

 )WITH(

    type ='kafka09',

    kafka.bootstrap.servers ='172.16.8.198:9092',

    kafka.zookeeper.quorum ='172.16.8.198:2181/kafka',

    kafka.auto.offset.reset ='latest',

    kafka.topic ='nbTest1,nbTest2,nbTest3',

    --kafka.topic ='mqTest.*',

    --patterntopic='true'

    parallelism ='1',

    sourcedatatype ='json' #可不设置

 );

维表关联Oracle语句

create table sideTable(

    channel varchar,

    info varchar,

    PRIMARY KEY(channel),

    PERIOD FOR SYSTEM_TIME

 )WITH(

    type='oracle',

    url='jdbc:oracle:thin:@xx.xx.xx.xx:1521:orcl',

    userName='xx',

    password='xx',

    tableName='sidetest',

    cache ='LRU',

    cacheSize ='10000',

    cacheTTLMs ='60000',

    cacheMode='unordered',

    asyncCapacity='1000',

    asyncTimeout='10000'

    parallelism ='1',

    partitionedJoin='false',

    schema = 'MQTEST'

 );

--创建PostgreSQL输出表语句

Greenplum、LibrA数据库写入方式与PostgreSQL类似

CREATE TABLE MyResult(

    channel VARCHAR,

    info VARCHAR

 )WITH(

    type ='postgresql',

    url ='jdbc:postgresql://localhost:9001/test?sslmode=disable',

    userName ='dtstack',

    password ='abc123',

    tableName ='pv2',

    parallelism ='1'

 );

--基于FlinkSQL进行Kafka流数据读入关联Oracle维表,并写入PostgreSQL的实现

insert

into

    MyResult

    select

        a.channel,

        b.info

    from

        MyTable a

    join

        sideTable b

            on a.channel=b.channel

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
分布式计算 DataWorks 关系型数据库
DataWorks支持将ODPS表拆分并回流到MySQL的多个库和表中
【2月更文挑战第14天】DataWorks支持将ODPS表拆分并回流到MySQL的多个库和表中
59 8
|
4月前
|
SQL 分布式计算 Hadoop
创建hive表并关联数据
创建hive表并关联数据
35 0
|
4月前
|
SQL 关系型数据库 MySQL
使用CTAS 把mysql 表同步数据 到hologres ,Flink有什么参数可以使hologres 的字段都小写吗?
使用CTAS 把mysql 表同步数据 到hologres ,Flink有什么参数可以使hologres 的字段都小写吗?
281 0
|
SQL Oracle 关系型数据库
sqoop的导入导出以及where条件过滤数据导出
sqoop的导入导出以及where条件过滤数据导出
|
4月前
|
SQL 关系型数据库 MySQL
Hologres ctas怎么映射呢?
Hologres ctas怎么映射呢?
21 0
|
4月前
|
SQL 流计算
Hologres源表定义的时候,可以映射为flink的timestmap么,那这个怎么映射呢?
Hologres源表定义的时候,可以映射为flink的timestmap么,那这个怎么映射呢?
106 0
|
4月前
|
SQL 存储 监控
Flink CDC用flinksql方式采集多张表,是每张表启动一个java进程吗?
Flink CDC用flinksql方式采集多张表,是每张表启动一个java进程吗?
56 0
|
11月前
|
数据采集 运维 Ubuntu
使用kettle进行数据的多表关联
使用kettle进行数据的多表关联
|
分布式计算 DataWorks API
基于pyodps对MaxComputer表的数据探查
本次脚本设计是针对大批量表,并且没有明确业务支持下的数据探查,会根据不同的类型进行判定,根据结果值进行分析得出结论,并给出一定的建议,同时该脚本仅仅支持普通表,一级分区表和二级分区表;一级分区表的分区字段必须是ds或者pt。
862 0
|
SQL 存储 HIVE
Hive内部表与外部表的区别及使用场景
Hive内部表与外部表的区别及使用场景