Flink Batch SQL 1.10 实践

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 1.10可以说是第一个成熟的生产可用的Flink Batch SQL版本,它一扫之前Dataset的羸弱,从功能和性能上都有大幅改进,以下我从架构、外部系统集成、实践三个方面进行阐述。

作者:李劲松(之信)

Flink作为流批统一的计算框架,在1.10中完成了大量batch相关的增强与改进。1.10可以说是第一个成熟的生产可用的Flink Batch SQL版本,它一扫之前Dataset的羸弱,从功能和性能上都有大幅改进,以下我从架构、外部系统集成、实践三个方面进行阐述。

架构

Stack

图片 1.png

首先来看下stack,在新的Blink planner中,batch也是架设在Transformation上的,这就意味着我们和Dataset完全没有关系了:

  1. 我们可以尽可能的和streaming复用组件,复用代码,有同一套行为。
  2. 如果想要Table/SQL的toDataset或者fromDataset,那就完全没戏了。尽可能的在Table的层面来处理吧。
  3. 后续我们正在考虑在DataStream上构建BoundedStream,给DataStream带来批处理的功能。

网络模型

图片 2.png

Batch模式就是在中间结果落盘,这个模式和典型的Batch处理是一致的,比如MapReduce/Spark/Tez。

Flink以前的网络模型也分为Batch和Pipeline两种,但是Batch模式只是支持上下游隔断执行,也就是说资源用量可以不用同时满足上下游共同的并发。但是另外一个关键点是Failover没有对接好,1.9和1.10在这方面进行了改进,支持了单点的Failover。

建议在Batch时打开:

jobmanager.execution.failover-strategy = region

为了避免重启过于频繁导致JobMaster太忙了,可以把重启间隔提高:

restart-strategy.fixed-delay.delay = 30 s

Batch模式的好处有:

  • 容错好,可以单点恢复
  • 调度好,不管多少资源都可以运行
  • 性能差,中间数据需要落盘,强烈建议开启压缩
    taskmanager.network.blocking-shuffle.compression.enabled = true

Batch模式比较稳,适合传统Batch作业,大作业。

图片 3.png

Pipeline模式是Flink的传统模式,它完全和Streaming作业用的是同一套代码,其实社区里Impala和Presto也是类似的模式,纯走网络,需要处理反压,不落盘,它主要的优缺点是:

  • 容错差,只能全局重来
  • 调度差,你得保证有足够的资源
  • 性能好,Pipeline执行,完全复用Stream,复用流控反压等功能。

有条件可以考虑开启Pipeline模式。

调度模型

Flink on Yarn支持两种模式,Session模式和Per job模式,现在已经在调度层次高度统一了。

  1. Session模式没有最大进程限制,当有Job需要资源时,它就会去Yarn申请新资源,当Session有空闲资源时,它就会给Job复用,所以它的模型和PerJob是基本一样的。
  2. 唯一的不同只是:Session模式可以跨作业复用进程。

另外,如果想要更好的复用进程,可以考虑加大TaskManager的超时释放:
resourcemanager.taskmanager-timeout = 900000

资源模型

先说说并发:

  1. 对Source来说:目前Hive的table是根据InputSplit来定需要多少并发的,它之后能Chain起来的Operators自然都是和source相同的并发。
  2. 对下游网络传输过后的Operators(Tasks)来说:除了一定需要单并发的Task来说,其它Task全部统一并发,由table.exec.resource.default-parallelism统一控制。

我们在Blink内部实现了基于统计信息来推断并发的功能,但是其实以上的策略在大部分场景就够用了。

Manage内存

图片 4.png

目前一个TaskManager里面含有多个Slot,在Batch作业中,一个Slot里只能运行一个Task (关闭SlotShare)。

对内存来说,单个TM会把Manage内存切分成Slot粒度,如果1个TM中有n个Slot,也就是Task能拿到1/n的manage内存。

我们在1.10做了重大的一个改进就是:Task中chain起来的各个operators按照比例来瓜分内存,所以现在配置的算子内存都是一个比例值,实际拿到的还要根据Slot的内存来瓜分。

这样做的一个重要好处是:

  1. 不管当前Slot有多少内存,作业能都run起来,这大大提高了开箱即用。
  2. 不管当前Slot有多少内存,Operators都会把内存瓜分干净,不会存在浪费的可能。

当然,为了运行的效率,我们一般建议单个Slot的manage内存应该大于500MB。

另一个事情,在1.10后,我们去除了OnHeap的manage内存,所以只有off-heap的manage内存。

外部系统集成

Hive

强烈推荐Hive Catalog + Hive,这也是目前批处理最成熟的架构。在1.10中,除了对以前功能的完善以外,其它做了几件事:

  1. 多版本支持,支持Hive 1.X 2.X 3.X
  2. 完善了分区的支持,包括分区读,动态/静态分区写,分区统计信息的支持。
  3. 集成Hive内置函数,可以通过以下方式来load:
    a)TableEnvironment.loadModule("hiveModule",new HiveModule("hiveVersion"))
  4. 优化了ORC的性能读,使用向量化的读取方式,但是目前只支持Hive 2+版本,且要求列没有复杂类型。有没有进行过优化差距在5倍量级。

兼容Streaming Connectors

得益于流批统一的架构,目前的流Connectors也能在batch上使用,比如HBase的Lookup和Sink、JDBC的Lookup和Sink、Elasticsearch的Sink,都可以在Batch无缝对接使用起来。

实践

SQL-CLI

在1.10中,SQL-CLI也做了大量的改动,比如把SQL-CLI做了stateful,里面也支持了DDL,还支持了大量的DDL命令,给SQL-CLI暴露了很多TableEnvironment的能力,这让用户可以方便得多。后续,我们也需要对接JDBC的客户端,让用户可以更好的对接外部工具。但是SQL-CLI仍然待继续改进,比如目前仍然只支持Session模式,不支持Per Job模式。

编程方式

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings
  .newInstance()
  .useBlinkPlanner()
  .inBatchMode()
  .build());
AI 代码解读

老的BatchTableEnv因为绑定了Dataset,而且区分Java和Scala,是不干净的设计方式,所以Blink planner只支持新的TableEnv。

TableEnv注册的source, sink, connector, functions,都是temporary的,重启之后即失效了。如果需要持久化的object,考虑使用HiveCatalog。

tEnv.registerCatalog(“hive”, hiveCatalog);
tEnv.useCatalog(“hive”);
AI 代码解读

可以通过tEnv.sqlQuery来执行DML,这样可以获得一个Table,我们也通过collect来获得小量的数据:

Table table = tEnv.sqlQuery(“SELECT COUNT(*) FROM MyTable”);
List<Row> results = TableUtils.collectToList(table);
System.out.println(results);
AI 代码解读

可以通过tEnv.sqlUpdate来执行DDL,但是目前并不支持创建hive的table,只能创建Flink类型的table:

tEnv.sqlUpdate(
   "CREATE TABLE myResult (" +
      "  cnt BIGINT"
      ") WITH (" +
      "  'connector.type'='jdbc'," 
         ……
      ")");
AI 代码解读

可以通过tEnv.sqlUpdate来执行insert语句,Insert到临时表或者Catalog表中,比如insert到上面创建的临时JDBC表中:

tEnv.sqlUpdate(“INSERT INTO myResult SELECT COUNT(*) FROM MyTable”);
tEnv.execute(“MyJob”);
AI 代码解读

当结果表是Hive表时,可以使用Overwrite语法,也可以使用静态Partition的语法,这需要打开Hive的方言:

tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
AI 代码解读

结语

目前Flink batch SQL仍然在高速发展中,但是1.10已经是一个可用的版本了,它在功能上、性能上都有很大的提升,后续还有很多有意思的features,等待着大家一起去挖掘。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
打赏
0
0
0
0
36030
分享
相关文章
阿里妈妈基于 Flink+Paimon 的 Lakehouse 应用实践
本文总结了阿里妈妈数据技术专家陈亮在Flink Forward Asia 2024大会上的分享,围绕广告业务背景、架构设计及湖仓方案演进展开。内容涵盖广告生态运作、实时数仓挑战与优化,以及基于Paimon的湖仓方案优势。通过分层设计与技术优化,实现业务交付周期缩短30%以上,资源开销降低40%,并大幅提升系统稳定性和运营效率。文章还介绍了阿里云实时计算Flink版的免费试用活动,助力企业探索实时计算与湖仓一体化解决方案。
356 3
阿里妈妈基于 Flink+Paimon 的 Lakehouse 应用实践
Flink CDC + Hologres高性能数据同步优化实践
本文整理自阿里云高级技术专家胡一博老师在Flink Forward Asia 2024数据集成(二)专场的分享,主要内容包括:1. Hologres介绍:实时数据仓库,支持毫秒级写入和高QPS查询;2. 写入优化:通过改进缓冲队列、连接池和COPY模式提高吞吐量和降低延迟;3. 消费优化:优化离线场景和分区表的消费逻辑,提升性能和资源利用率;4. 未来展望:进一步简化用户操作,支持更多DDL操作及全增量消费。Hologres 3.0全新升级为一体化实时湖仓平台,提供多项新功能并降低使用成本。
222 1
Flink CDC + Hologres高性能数据同步优化实践
基于 Flink 进行增量批计算的探索与实践
基于 Flink 进行增量批计算的探索与实践
基于 Flink 进行增量批计算的探索与实践
万字长文带你深入广告场景Paimon+Flink全链路探索与实践
本文将结合实时、离线数据研发痛点和当下Paimon的特性,以实例呈现低门槛、低成本、分钟级延迟的流批一体化方案,点击文章阅读详细内容~
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
本文整理自鹰角网络大数据开发工程师朱正军在Flink Forward Asia 2024上的分享,主要涵盖四个方面:鹰角数据平台架构、数据湖选型、湖仓一体建设及未来展望。文章详细介绍了鹰角如何构建基于Paimon的数据湖,解决了Hudi入湖的痛点,并通过Trino引擎和Ranger权限管理实现高效的数据查询与管控。此外,还探讨了湖仓一体平台的落地效果及未来技术发展方向,包括Trino与Paimon的集成增强、StarRocks的应用以及Paimon全面替换Hive的计划。
198 1
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是为应对传统数据处理框架中流批分离的问题而诞生的,它融合了SQL的简洁性和Flink的强大流批处理能力,降低了大数据处理门槛。其核心工作原理包括生成逻辑执行计划、查询优化和构建算子树,确保高效执行。Flink SQL 支持过滤、投影、聚合、连接和窗口等常用算子,实现了流批一体处理,极大提高了开发效率和代码复用性。通过统一的API和语法,Flink SQL 能够灵活应对实时和离线数据分析场景,为企业提供强大的数据处理能力。
314 26
Flink CDC 在阿里云实时计算Flink版的云上实践
本文整理自阿里云高级开发工程师阮航在Flink Forward Asia 2024的分享,重点介绍了Flink CDC与实时计算Flink的集成、CDC YAML的核心功能及应用场景。主要内容包括:Flink CDC的发展及其在流批数据处理中的作用;CDC YAML支持的同步链路、Transform和Route功能、丰富的监控指标;典型应用场景如整库同步、Binlog原始数据同步、分库分表同步等;并通过两个Demo展示了MySQL整库同步到Paimon和Binlog同步到Kafka的过程。最后,介绍了未来规划,如脏数据处理、数据限流及扩展数据源支持。
253 0
Flink CDC 在阿里云实时计算Flink版的云上实践

相关产品

  • 实时计算 Flink版