Iceberg 在基于 Flink 的流式数据入库场景中的应用

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 本文以流式数据入库的场景为基础,介绍引入 Iceberg 作为落地格式和嵌入 Flink sink 的收益,并分析了当前可实现的框架及要点。

本文以流式数据入库的场景为基础,介绍引入 Iceberg 作为落地格式和嵌入 Flink sink 的收益,并分析了当前可实现的框架及要点。

应用场景

流式数据入库,是大数据和数据湖的典型应用场景。上游的流式数据,如日志,或增量修改,通过数据总线,经过必要的处理后,汇聚并存储于数据湖,供下游的应用(如报表或者商业智能分析)使用。

640 1.jpg

上述的应用场景通常有如下的痛点,需要整个流程不断的优化:

  • 支持流式数据写入,并保证端到端的不重不丢(即 exactly-once);
  • 尽量减少中间环节,能支持更实时(甚至是 T+0)的读取或导出,给下游提供更实时更准确的基础数据;
  • 支持 ACID,避免脏读等错误发生;
  • 支持修改已落地的数据,虽然大数据和数据湖长于处理静态的或者缓慢变化的数据,即读多写少的场景,但方便的修改功能可以提升用户体验,避免用户因为极少的修改,手动更换整个数据文件,甚至是重新导出;
  • 支持修改表结构,如增加或者变更列;而且变更不要引起数据的重新组织。

引入 Iceberg 作为 Flink sink

为了解决上述痛点,我们引入了 Iceberg 作为数据落地的格式。Iceberg 支持 ACID 事务、修改和删除、独立于计算引擎、支持表结构和分区方式动态变更等特性,很好的满足我们的需求。

同时,为了支持流式数据的写入,我们引入 Flink 作为流式处理框架,并将 Iceberg 作为 Flink sink。

下文主要介绍 Flink Iceberg sink 的实现框架和要点。但在这之前,需要先介绍一些实现中用到的 Flink 基本概念。

Flink 基本概念

从 Flink 的角度如何理解"流"和"批"

640 2.png

Flink 使用 DataFrame API 来统一的处理流和批数据。

Stream, Transformation 和 Operator

一个 Flink 程序由 stream 和 transformation 组成:

  • Stream: Transformation 之间的中间结果数据;
  • Transformation:对(一个或多个)输入 stream 进行操作,输出(一个或多个)结果 stream。

当 Flink 程序执行时,其被映射成 Streaming Dataflow,由如下的部分组成:

  • Source (operator):接收外部输入给 Flink;
  • Transformation (operator):中间对 stream 做的任何操作;
  • Sink (operator):Flink 输出给外部。

下图为 Flink 官网的示例,展示了一个以 Kafka 作为输入 Source,经过中间两个 transformation,最终通过 sink 输出到 Flink 之外的过程。

640 3.jpg

State, Checkpoint and Snapshot

Flink 依靠 checkpoint 和基于 snapshot 的恢复机制,保证程序 state 的一致性,实现容错。

Checkpoint 是对分布式的数据流,以及所有 operator 的 state,打 snapshot 的过程。

■ State

一个 operator 的 state,即它包含的所有用于恢复当前状态的信息,可分为两类:

  • 系统 state:如 operator 中对数据的缓存。
  • 用户自定义 state:和用户逻辑相关,可以利用 Flink 提供的 managed state,如 ValueState、ListState,来存储。

State 的存储位置,可以分为:

  • Local:内存,或者本地磁盘
  • State backend:远端的持久化存储,如 HDFS。

如下图所示:

640 4.jpg

■ Checkpoint

Flink 做 checkpoint 的过程如下:

  1. Checkpoint coordinator 首先发送 barrier 给 source。
  2. Source 做 snapshot,完成后向 coordinator 确认。
  3. Source 向下游发送 barrier。
  4. 下游 operator 收到所有上游的 barrier 后,做 snapshot,完成后向 coordinator 确认。
  5. 继续往下游发送 barrier,直到 sink。
  6. Sink 通知 coordinator 自己完成 checkpoint。
  7. Coordinator 确认本周期 snapshot 做完。

如下图所示:

640 5.jpg

■ Barrier

Barrier 是 Flink 做分布式 snapshot 的重要概念。它作为一个系统标记,被插入到数据流中,随真实数据一起,按照数据流的方向,从上游向下游传递。

由于每个 barrier 唯一对应 checkpoint id,所以数据流中的 record 实际被 barrier 分组,如下图所示,barrier n 和 barrier n-1 之间的 record,属于 checkpoint n。

640 6.jpg

Barrier 的作用是在分布式的数据流中,将 operator 的多个输入流按照 checkpoint对齐(align),如下图所示:

640 7.jpg

Flink Iceberg sink

了解了上述 Flink 的基本概念,这些概念又是如何被应用和映射到 Flink Iceberg sink 当中的呢?

总体框架

640 8.jpg

如图,Flink Iceberg sink 有两个主要模块和两个辅助模块组成:

640 9.png

实现要点

■ Writer

  1. 在当前的实现中,Java 的 Map 作为每条记录,输入给 writer。内部逻辑先将其转化为作为中间格式的 Avro IndexedRecord,而后通过 Iceberg 里的 Parquet 相关 API,累积的写入 DataFile。
  2. 使用 Avro 作为中间格式是一个临时方案,为简化适配,并最大限度的利用现有逻辑。但长期来看,使用中间格式会影响处理效率,社区也在试图通过 ISSUE-870 来去掉 Avro,进而使用 Iceberg 内建的数据类型作为输入,同时也需要加入一个到 Flink 内建数据类型的转换器。
  3. 在做 checkpoint 的过程中,发送 writer 自己的 barrier 到下游的 committer 之前,关闭单个 Parquet 文件,构建 DataFile,并发送 DataFile 的信息给下游。

■ Committer

  1. 全局唯一的 Committer 在收到上游所有 writer 的 barrier 以后,将收到的 DataFile 的信息填入 manifest file,并使用 ListState 把 manifest file 作为用户自定义的 state,保存于 snapshot 中。
  2. 当 checkpoint 完成以后,通过 merge append 将 manifest file 提交给 Iceberg。Iceberg 内部通过后续的一系列操作完成 commit。最终让新加入的数据对其他的读任务可见。

试用 Flink Iceberg sink

社区上 https://github.com/apache/incubator-iceberg/pull/856 提供了可以试用的原型代码。下载该 patch 放入 master 分支,编译并构建即可。如下的程序展示了如何将该 sink 嵌入到 Flink 数据流中:

// Configurate catalog
org.apache.hadoop.conf.Configuration hadoopConf =
    new org.apache.hadoop.conf.Configuration();
hadoopConf.set(
    org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS.varname,
    META_STORE_URIS);
hadoopConf.set(
    org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
    META_STORE_WAREHOUSE);
 
Catalog icebergCatalog = new HiveCatalog(hadoopConf);
 
// Create Iceberg table
Schema schema = new Schema(
   ...
);
PartitionSpec partitionSpec = builderFor(schema)...
TableIdentifier tableIdentifier =
    TableIdentifier.of(DATABASE_NAME, TABLE_NAME);
// If needed, check the existence of table by loadTable() and drop it
// before creating it
icebergCatalog.createTable(tableIdentifier, schema, partitionSpec);
 
// Obtain an execution environment
StreamExecutionEnvironment env = 
    StreamExecutionEnvironment.getExecutionEnvironment();
 
// Enable checkpointing
env.enableCheckpointing(...);
  
// Add Source
DataStream<Map<String, Object>> dataStream =
    env.addSource(source, typeInformation);
 
// Configure Ieberg sink
Configuration conf = new Configuration();
conf.setString(
    org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
    META_STORE_URIS);
conf.setString(IcebergConnectorConstant.DATABASE, DATABASE_NAME);
conf.setString(IcebergConnectorConstant.TABLE, TABLE_NAME);
 
// Append Iceberg sink to data stream
IcebergSinkAppender<Map<String, Object>> appender =
   new IcebergSinkAppender<Map<String, Object>>(conf, "test")
       .withSerializer(MapAvroSerializer.getInstance())
       .withWriterParallelism(1);
appender.append(dataStream);
 
// Trigger the execution
env.execute("Sink Test");

后续规划

Flink Iceberg sink 有很多需要完善的地方,例如:上文中提到的去掉 Avro 作为中间格式;以及在各种失败的情况下是否仍能保证端到端的 exactly-once;按固定时长做 checkpoint,在高低峰时生成不同大小的 DataFile,是否对后续读不友好等。这些问题都在我们的后续规划中,也会全数贡献给社区。

参考资料:

[1] Iceberg 官网:
https://iceberg.apache.org/
[2] Flink 1.10文 档:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/
[3] Neflix 提供的 Flink Iceberg connector 原型:
https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg
[4] Flink Iceberg sink 设计文档:
https://docs.google.com/document/d/19M-sP6FlTVm7BV7MM4Om1n_MVo1xCy7GyDl_9ZAjVNQ/edit?usp=sharing
[5] Flink 容错机制(checkpoint) :
https://www.cnblogs.com/starzy/p/11439988.html

# 社区活动推荐 #

普惠全球开发者,这一次,格外与众不同!首个 Apache 顶级项目在线会议 Flink Forward 全球直播中文精华版来啦,聚焦 Alibaba、Google、AWS、Uber、Netflix、新浪微博等海内外一线厂商,经典 Flink 应用场景,最新功能、未来规划一览无余。点击下方链接可了解更多大会详情:https://developer.aliyun.com/live/2594?spm=a2c6h.14242504.J_6074706160.2.3fca361f4cYyQx

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
消息中间件 关系型数据库 Kafka
flink cdc 数据问题之数据丢失如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
关系型数据库 MySQL Java
flink cdc 同步问题之多表数据如何同步
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
监控 关系型数据库 MySQL
Flink CDC产品常见问题之flink-cdc任务抓取全量的mysql数据不生效如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
2月前
|
消息中间件 SQL JSON
Flink问题之source并行度不同导致任务没有数据落地如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
330 0
|
29天前
|
API 数据库 流计算
有大佬知道在使用flink cdc实现数据同步,如何实现如果服务停止了对数据源表的某个数据进行删除操作,重启服务之后目标表能进行对源表删除的数据进行删除吗?
【2月更文挑战第27天】有大佬知道在使用flink cdc实现数据同步,如何实现如果服务停止了对数据源表的某个数据进行删除操作,重启服务之后目标表能进行对源表删除的数据进行删除吗?
45 3
|
29天前
|
Oracle 关系型数据库 MySQL
Flink CDC产品常见问题之flink Oraclecdc 捕获19C数据时报错错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
消息中间件 关系型数据库 Kafka
Flink CDC产品常见问题之 Oraclecdc JdbcIncrementalSource 捕获不到数据如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
2月前
|
消息中间件 Kafka Apache
Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
【2月更文挑战第6天】Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
69 2
|
29天前
|
分布式计算 Hadoop Java
Flink CDC产品常见问题之tidb cdc 数据量大了就疯狂报空指针如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
29天前
|
资源调度 关系型数据库 测试技术
Flink CDC产品常见问题之没有报错但是一直监听不到数据如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。

相关产品

  • 实时计算 Flink版