《KAFKA官方文档》第三章:快速入门(二)

简介: 第八步:使用Kafka流(Kafka Streams)处理数据 Kafka流是一个针对存储于Kafka brokers上的数据进行实时流处理和分析的客户端类库。快速入门中的示例将展示如何使用这个类库实现一个数据流处理应用。

第八步:使用Kafka流(Kafka Streams)处理数据

Kafka流是一个针对存储于Kafka brokers上的数据进行实时流处理和分析的客户端类库。快速入门中的示例将展示如何使用这个类库实现一个数据流处理应用。下面是其中的WordCountDemo数单词示例代码片段(转换成Java8的lambda表达式更便于阅读)。

“` // 字符串和长整型的序列化器与反序列化器(serde) final Serde stringSerde = Serdes.String(); final Serde longSerde = Serdes.Long();

// 从streams-file-input主题构造一个KStream,消息值代表了文本的每一行(这里我们忽略消息key中存储的数据) KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, “streams-file-input”);

KTable<String, Long> wordCounts = textLines // 按空格拆分每个文本行为多个单独的词 .flatMapValues(value -> Arrays.asList(value.toLowerCase().split(“\W+”)))

// 将单词分组后作为消息的key
.groupBy((key, value) -> value)

// 统计每个单词出现的次数(即消息的key)
.count("Counts")

// 将运行结果作为变更记录流发送到输出主题 wordCounts.to(stringSerde, longSerde, “streams-wordcount-output”); “`

上面的代码实现了数单词算法(WordCount algorithm),即计算了输入文本中每一个单词出现的次数。不同于我们常见的计算给定数量文本的数单词算法,这个示例被设计来操作一个无限的、不确定数据量的数据流。跟有界的情况类似,这是一个有状态的算法,会跟踪和更新单词的数目。此算法必须假定输入的数据是没有边界的,这样因为不知道什么时候处理完所有的数据,所以每当处理了新输入的数据时,上述代码会随时输出当前的状态和处理结果。

我们先准备发送到Kafka主题的输入数据,这些数据将被Kafka流程序依次处理。

>echo -e “all streams lead to kafka\nhello kafka streams\njoin kafka summit” > file-input.txt

在Windows上执行:

>echo all streams lead to kafka> file-input.txt

>echo hello kafka streams>> file-input.txt

>echo|set /p=join kafka summit>> file-input.txt

接着,我们控制台上使用生产者脚本将这些数据发送到一个叫streams-file-input的主题,此脚本会从标准输入(STDIN)一行行的读取数据,然后把每一行内容作为一个单独的、key为null、值为字符串格式的Kafka消息,发送到这个主题(在实际应用中,只要应用程序一直运行,数据就可以一直持续的流向Kafka):

>bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic streams-file-input

>bin/kafka-console-producer.sh –broker-list localhost:9092 –topic streams-file-input < file-input.txt

此时我们可以运行数单词示例程序来处理输入数据:

>bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

示例程序将从streams-file-input主题读入数据,然后用数单词算法计算每一个消息数据,持续将当前计算结果写入到streams-wordcount-output主题。到目前为止,我们在任何命令行的标准输出上看不到这些结果,因为这些结果数据都被写入到Kafka输出主题streams-wordcount-output了。这个示例程序也将会一直运行,不像常见的流处理程序会在处理完以后自动结束。

现在我们可以通过读取Kafka输出主题来查看数单词示例程序的输出:

>bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic streams-wordcount-output –from-beginning –formatter kafka.tools.DefaultMessageFormatter –property print.key=true –property print.value=true –property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer –property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

可以看到在控制台上输出如下:

all 1

lead 1

to 1

hello 1

streams 2

join 1

kafka 3

summit 1

第一列是java.lang.String类型的Kafka消息key,第二列是java.lang.Long类型的消息值。注意到输出的实际上是一个持续更新的流,每一个数据记录(上面输出的每一行)代表一个单词(比如kafka)每次更新的计数。对于同一个key,会有多个记录,每一个后面的记录代表之前记录的更新结果。

下面的两幅图展示了这个场景下到底发生了什么。图上的第一列展示了计算单词出现次数的KTable<String, Long>的当前状态演化。第二列展示了状态更新导致的KTable变化记录,也是被发送到Kafka输出主题streams-wordcount-output中的数据。

“all stream lead to kafka”这行文本先被处理。随着每一个新单词作为一个表格中的新项(绿色背景高亮显示)加入到KTable,KTable表格逐渐增长,同时相应的变化记录被发送到下面的KStream中。

当第二行文本“hello kafka streams”被处理后,我们可以看到与此同时在KTable中已经存在的项立即被更新(即kafak和streams)。同样的变化记录也被发送到输出主题。

第三行处理也类似,我们暂且略去。这就解释了为何输出主题中的内容如上所,因为它包含了全部的变化记录。

如果我们跳出这个具体的示例来看,Kafka流所做的事情就是表和变更记录流之间的相互作用(这里表指的是KTable,变更记录流指的是下面的KStream):表中的每一个变化记录会发送到流中,当然如果我们从头至尾的消费一个完整的变更记录流,则可以重建这个表的全部内容。

现在我们可以写入更多消息到stream-file-input主题,然后观察这些消息被添加到streams-wordcount-output主题,这些消息反映出了更新的单词计数(可以在控制台使用上面提及的生产者脚本和消费者脚本来操作)。

最后我们可以在控制台使用Ctrl-C快捷键来结束消费者。

 

 

相关文章
|
7月前
|
消息中间件 存储 安全
kafka快速入门1
kafka快速入门1
92 0
|
7月前
|
消息中间件 JSON Java
kafka快速入门2
kafka快速入门2
65 0
|
8月前
|
消息中间件 缓存 大数据
Kafka学习---1、Kafka 概述、Kafka快速入门
Kafka学习---1、Kafka 概述、Kafka快速入门
Kafka学习---1、Kafka 概述、Kafka快速入门
|
9月前
|
消息中间件 存储 传感器
macOS 系统 安装 Kafka 快速入门
macOS 系统 安装 Kafka 快速入门
150 0
|
消息中间件 存储 Java
Kafka快速入门
Kafka快速入门
|
消息中间件 存储 缓存
Kafka快速入门(Kafka Broker)节点服役和退役、手动调整副本(下)
Kafka快速入门(Kafka Broker)节点服役和退役、手动调整副本
Kafka快速入门(Kafka Broker)节点服役和退役、手动调整副本(下)
|
消息中间件 存储 缓存
Kafka快速入门(Kafka Broker)节点服役和退役、手动调整副本(上)
Kafka快速入门(Kafka Broker)节点服役和退役、手动调整副本
Kafka快速入门(Kafka Broker)节点服役和退役、手动调整副本(上)
|
消息中间件 存储 缓存
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
|
消息中间件 Kafka
Kafka快速入门(命令行操作)
Kafka快速入门(命令行操作)
Kafka快速入门(命令行操作)
|
消息中间件 Kafka
Kafka快速入门(安装集群)
Kafka快速入门(安装集群)

热门文章

最新文章