创建消息队列(Kafka)源表

简介:

Kafka源表的实现来源于自社区的kafka版本实现。

注意:本文档只适合独享模式下使用。

Kafka需要定义的DDL如下。

 
  
  1. create table kafka_stream(
  2. messageKey VARBINARY,
  3. `message` VARBINARY,
  4. topic varchar,
  5. `partition` int,
  6. `offset` bigint
  7. ) with (
  8. type ='kafka010',
  9. topic = 'xxx',
  10. `group.id` = 'xxx',
  11. bootstrap.servers = 'ip:端口,ip:端口,ip:端口'
  12. );

注意:以上表中的五个字段顺序务必保持一致。

WITH参数

通用配置

参数 注释说明 备注
type Kafka对应版本 推荐使用KAFKA010
topic 读取的单个topic topic名称

必选配置

(1)kafka08必选配置:

参数 注释说明 备注
group.id 消费组id
zookeeper.connect zk链接地址 zk连接id

(2)kafka09/kafka010/kafka011必选配置:

参数 注释说明 备注
group.id 消费组id
bootstrap.servers kafka集群地址 kafka集群地址

Kafka集群地址:

如果您的kafka是阿里云商业版,请参考kafka商业版准备配置文档。

如果您的kafka是阿里云公测版,请参考kafka公测版准备配置文档。

可选配置

 
   
  1. "consumer.id","socket.timeout.ms","fetch.message.max.bytes","num.consumer.fetchers","auto.commit.enable","auto.commit.interval.ms","queued.max.message.chunks", "rebalance.max.retries","fetch.min.bytes","fetch.wait.max.ms","rebalance.backoff.ms","refresh.leader.backoff.ms","auto.offset.reset","consumer.timeout.ms","exclude.internal.topics","partition.assignment.strategy","client.id","zookeeper.session.timeout.ms","zookeeper.connection.timeout.ms","zookeeper.sync.time.ms","offsets.storage","offsets.channel.backoff.ms","offsets.channel.socket.timeout.ms","offsets.commit.max.retries","dual.commit.enabled","partition.assignment.strategy","socket.receive.buffer.bytes","fetch.min.bytes"

注意:其它可选配置项参考kafka官方文档:
Kafka09
https://kafka.apache.org/0110/documentation.html#consumerconfigs
Kafka010
https://kafka.apache.org/090/documentation.html#newconsumerconfigs
Kafka011
https://kafka.apache.org/0102/documentation.html#newconsumerconfigs

kafka版本对应关系

Type Kafka 版本
Kafka08 0.8.22
Kafka09 0.9.0.1
Kafka010 0.10.2.1
Kafka011 0.11.0.2

Kafka消息解析

默认Kafka读到的消息:

 
   
  1. messageKey varbianry,
  2. message varbianry,
  3. topic varchar,
  4. partition int,
  5. offset bigint

这样一个五元组,如果您希望在source阶段把数据parser成特定的其它格式,可以按照下面实践进行。

参数 注释说明 备注
parserUdtf 自定义解析函数 用于解析从kafka读到的消息映射到ddl具体对应的类型

如何写一个parserUdtf参见自定义表值函数(UDTF)

自建kafka

与阿里云Kafka消息队列一样,DDL定义相同。

示例:

 
   
  1. create table kafka_stream(
  2. messageKey VARBINARY,
  3. `message` VARBINARY,
  4. topic varchar,
  5. `partition` int,
  6. `offset` bigint
  7. ) with (
  8. type ='kafka011',
  9. topic = 'kafka_01',
  10. `group.id` = 'CID_blink',
  11. bootstrap.servers = '192.168.0.251:9092'
  12. );

WITH参数

关于自建Kafka的with参数,请参考本文档Kafka创建时DDL的with参数说明。需要注意的是 bootstrap.servers参数需要填写自建的地址和端口号。

注意:无论是阿里云Kafka还是自建Kafka,目前实时计算均无Tps、Rps等指标信息。在作业上线之后,运维界面暂时不支持显示指标信息。

本文转自实时计算——创建消息队列(Kafka)源表

相关文章
|
1月前
|
消息中间件 存储 大数据
Apache Kafka: 强大消息队列系统的介绍与使用
Apache Kafka: 强大消息队列系统的介绍与使用
|
1月前
|
消息中间件 存储 缓存
Kafka【基础知识 01】消息队列介绍+Kafka架构及核心概念(图片来源于网络)
【2月更文挑战第20天】Kafka【基础知识 01】消息队列介绍+Kafka架构及核心概念(图片来源于网络)
81 2
|
3天前
|
消息中间件 存储 Java
深度探索:使用Apache Kafka构建高效Java消息队列处理系统
【4月更文挑战第17天】本文介绍了在Java环境下使用Apache Kafka进行消息队列处理的方法。Kafka是一个分布式流处理平台,采用发布/订阅模型,支持高效的消息生产和消费。文章详细讲解了Kafka的核心概念,包括主题、生产者和消费者,以及消息的存储和消费流程。此外,还展示了Java代码示例,说明如何创建生产者和消费者。最后,讨论了在高并发场景下的优化策略,如分区、消息压缩和批处理。通过理解和应用这些策略,可以构建高性能的消息系统。
|
16天前
|
消息中间件 存储 负载均衡
消息队列学习之kafka
【4月更文挑战第2天】消息队列学习之kafka,一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台。
13 2
|
2月前
|
消息中间件 存储 监控
美团面试:Kafka如何处理百万级消息队列?
美团面试:Kafka如何处理百万级消息队列?
130 1
|
3月前
|
消息中间件 监控 负载均衡
Kafka高级应用:如何配置处理MQ百万级消息队列?
在大数据时代,Apache Kafka作为一款高性能的分布式消息队列系统,广泛应用于处理大规模数据流。本文将深入探讨在Kafka环境中处理百万级消息队列的高级应用技巧。
172 0
|
3月前
|
消息中间件 存储 Kafka
云消息队列 Kafka 版生态谈第一期:无代码转储能力介绍
云消息队列 Kafka 版生态谈第一期:无代码转储能力介绍
|
2月前
|
消息中间件 安全 Kafka
2024年了,如何更好的搭建Kafka集群?
我们基于Kraft模式和Docker Compose同时采用最新版Kafka v3.6.1来搭建集群。
408 2
2024年了,如何更好的搭建Kafka集群?
|
3月前
|
消息中间件 存储 数据可视化
kafka高可用集群搭建
kafka高可用集群搭建
42 0
|
6月前
|
消息中间件 存储 Kubernetes
Helm方式部署 zookeeper+kafka 集群 ——2023.05
Helm方式部署 zookeeper+kafka 集群 ——2023.05
241 0

相关产品

  • 云消息队列 Kafka 版