Kafka深度解析(下)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 测试环境  该benchmark用到了六台机器,机器配置如下Intel Xeon 2.5 GHz processor with six coresSix 7200 RPM SATA drives32GB of RAM1Gb Ethernet这6台机器其中3台用来搭建Kafka broker集群,另外3台用来安装Zookeeper及生成测试数据。

测试环境

  该benchmark用到了六台机器,机器配置如下

Intel Xeon 2.5 GHz processor with six cores

Six 7200 RPM SATA drives

32GB of RAM

1Gb Ethernet

这6台机器其中3台用来搭建Kafka broker集群,另外3台用来安装Zookeeper及生成测试数据。6个drive都直接以非RAID方式挂载。实际上kafka对机器的需求与Hadoop的类似。

Producer吞吐率

  该项测试只测producer的吞吐率,也就是数据只被持久化,没有consumer读数据。

1个producer线程,无replication

在这一测试中,创建了一个包含6个partition且没有replication的topic。然后通过一个线程尽可能快的生成50 million条比较短(payload100字节长)的消息。测试结果是821,557 records/second78.3MB/second)。

之所以使用短消息,是因为对于消息系统来说这种使用场景更难。因为如果使用MB/second来表征吞吐率,那发送长消息无疑能使得测试结果更好。

整个测试中,都是用每秒钟delivery的消息的数量乘以payload的长度来计算MB/second的,没有把消息的元信息算在内,所以实际的网络使用量会比这个大。对于本测试来说,每次还需传输额外的22个字节,包括一个可选的key,消息长度描述,CRC等。另外,还包含一些请求相关的overhead,比如topic,partition,acknowledgement等。这就导致我们比较难判断是否已经达到网卡极限,但是把这些overhead都算在吞吐率里面应该更合理一些。因此,我们已经基本达到了网卡的极限。

初步观察此结果会认为它比人们所预期的要高很多,尤其当考虑到Kafka要把数据持久化到磁盘当中。实际上,如果使用随机访问数据系统,比如RDBMS,或者key-velue store,可预期的最高访问频率大概是5000到50000个请求每秒,这和一个好的RPC层所能接受的远程请求量差不多。而该测试中远超于此的原因有两个。

Kafka确保写磁盘的过程是线性磁盘I/O,测试中使用的6块廉价磁盘线性I/O的最大吞吐量是822MB/second,这已经远大于1Gb网卡所能带来的吞吐量了。许多消息系统把数据持久化到磁盘当成是一个开销很大的事情,这是因为他们对磁盘的操作都不是线性I/O。

在每一个阶段,Kafka都尽量使用批量处理。如果想了解批处理在I/O操作中的重要性,可以参考David Patterson的”Latency Lags Bandwidth

1个producer线程,3个异步replication

该项测试与上一测试基本一样,唯一的区别是每个partition有3个replica(所以网络传输的和写入磁盘的总的数据量增加了3倍)。每一个broker即要写作为leader的partition,也要读(从leader读数据)写(将数据写到磁盘)作为follower的partition。测试结果为786,980 records/second75.1MB/second)。

该项测试中replication是异步的,也就是说broker收到数据并写入本地磁盘后就acknowledge producer,而不必等所有replica都完成replication。也就是说,如果leader crash了,可能会丢掉一些最新的还未备份的数据。但这也会让message acknowledgement延迟更少,实时性更好。

这项测试说明,replication可以很快。整个集群的写能力可能会由于3倍的replication而只有原来的三分之一,但是对于每一个producer来说吞吐率依然足够好。

1个producer线程,3个同步replication

该项测试与上一测试的唯一区别是replication是同步的,每条消息只有在被in sync集合里的所有replica都复制过去后才会被置为committed(此时broker会向producer发送acknowledgement)。在这种模式下,Kafka可以保证即使leader crash了,也不会有数据丢失。测试结果为421,823 records/second40.2MB/second)。

Kafka同步复制与异步复制并没有本质的不同。leader会始终track follower replica从而监控它们是否还alive,只有所有in sync集合里的replica都acknowledge的消息才可能被consumer所消费。而对follower的等待影响了吞吐率。可以通过增大batch size来改善这种情况,但为了避免特定的优化而影响测试结果的可比性,本次测试并没有做这种调整。

3个producer,3个异步replication

该测试相当于把上文中的1个producer,复制到了3台不同的机器上(在1台机器上跑多个实例对吞吐率的增加不会有太大帮忙,因为网卡已经基本饱和了),这3个producer同时发送数据。整个集群的吞吐率为2,024,032 records/second193,0MB/second)。

Producer Throughput Vs. Stored Data

消息系统的一个潜在的危险是当数据能都存于内存时性能很好,但当数据量太大无法完全存于内存中时(然后很多消息系统都会删除已经被消费的数据,但当消费速度比生产速度慢时,仍会造成数据的堆积),数据会被转移到磁盘,从而使得吞吐率下降,这又反过来造成系统无法及时接收数据。这样就非常糟糕,而实际上很多情景下使用queue的目的就是解决数据消费速度和生产速度不一致的问题。

但Kafka不存在这一问题,因为Kafka始终以O(1)的时间复杂度将数据持久化到磁盘,所以其吞吐率不受磁盘上所存储的数据量的影响。为了验证这一特性,做了一个长时间的大数据量的测试,下图是吞吐率与数据量大小的关系图。

img_001f655d31888717d47478cb03cf0383.png

上图中有一些variance的存在,并可以明显看到,吞吐率并不受磁盘上所存数据量大小的影响。实际上从上图可以看到,当磁盘数据量达到1TB时,吞吐率和磁盘数据只有几百MB时没有明显区别。

这个variance是由Linux I/O管理造成的,它会把数据缓存起来再批量flush。上图的测试结果是在生产环境中对Kafka集群做了些tuning后得到的,这些tuning方法可参考这里

consumer吞吐率

  需要注意的是,replication factor并不会影响consumer的吞吐率测试,因为consumer只会从每个partition的leader读数据,而与replicaiton factor无关。同样,consumer吞吐率也与同步复制还是异步复制无关。

1个consumer

该测试从有6个partition,3个replication的topic消费50 million的消息。测试结果为940,521 records/second89.7MB/second)。

可以看到,Kafkar的consumer是非常高效的。它直接从broker的文件系统里读取文件块。Kafka使用sendfile API来直接通过操作系统直接传输,而不用把数据拷贝到用户空间。该项测试实际上从log的起始处开始读数据,所以它做了真实的I/O。在生产环境下,consumer可以直接读取producer刚刚写下的数据(它可能还在缓存中)。实际上,如果在生产环境下跑I/O stat,你可以看到基本上没有物理“读”。也就是说生产环境下consumer的吞吐率会比该项测试中的要高。

3个consumer

将上面的consumer复制到3台不同的机器上,并且并行运行它们(从同一个topic上消费数据)。测试结果为2,615,968 records/second249.5MB/second)。

正如所预期的那样,consumer的吞吐率几乎线性增涨。

Producer and Consumer

上面的测试只是把producer和consumer分开测试,而该项测试同时运行producer和consumer,这更接近使用场景。实际上目前的replication系统中follower就相当于consumer在工作。

该项测试,在具有6个partition和3个replica的topic上同时使用1个producer和1个consumer,并且使用异步复制。测试结果为795,064 records/second75.8MB/second)。

可以看到,该项测试结果与单独测试1个producer时的结果几乎一致。所以说consumer非常轻量级。

消息长度对吞吐率的影响

上面的所有测试都基于短消息(payload 100字节),而正如上文所说,短消息对Kafka来说是更难处理的使用方式,可以预期,随着消息长度的增大,records/second会减小,但MB/second会有所提高。下图是records/second与消息长度的关系图。

img_3816994955dfa080f70b13d801e5c4da.png

正如我们所预期的那样,随着消息长度的增加,每秒钟所能发送的消息的数量逐渐减小。但是如果看每秒钟发送的消息的总大小,它会随着消息长度的增加而增加,如下图所示。

img_5f63816732e110925f04d3e1075b7d86.png

从上图可以看出,当消息长度为10字节时,因为要频繁入队,花了太多时间获取锁,CPU成了瓶颈,并不能充分利用带宽。但从100字节开始,我们可以看到带宽的使用逐渐趋于饱和(虽然MB/second还是会随着消息长度的增加而增加,但增加的幅度也越来越小)。

端到端的Latency

上文中讨论了吞吐率,那消息传输的latency如何呢?也就是说消息从producer到consumer需要多少时间呢?该项测试创建1个producer和1个consumer并反复计时。结果是,2 ms (median), 3ms (99th percentile, 14ms (99.9th percentile)

(这里并没有说明topic有多少个partition,也没有说明有多少个replica,replication是同步还是异步。实际上这会极大影响producer发送的消息被commit的latency,而只有committed的消息才能被consumer所消费,所以它会最终影响端到端的latency)

重现该benchmark

如果读者想要在自己的机器上重现本次benchmark测试,可以参考本次测试的配置和所使用的命令

实际上Kafka Distribution提供了producer性能测试工具,可通过bin/kafka-producer-perf-test.sh脚本来启动。所使用的命令如下

Producer

Setup

bin/kafka-topics.sh --zookeeper esv4-hcl197.grid.linkedin.com:2181 --create --topictest-rep-one --partitions 6 --replication-factor 1

bin/kafka-topics.sh --zookeeper esv4-hcl197.grid.linkedin.com:2181 --create --topictest--partitions 6 --replication-factor 3

Single thread, no replication

bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformancetest7 50000000 100 -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196

Single-thread, async 3x replication

bin/kafktopics.sh --zookeeper esv4-hcl197.grid.linkedin.com:2181 --create --topictest--partitions 6 --replication-factor 3

bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformancetest6 50000000 100 -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196

Single-thread, sync 3x replication

bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformancetest50000000 100 -1 acks=-1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=64000

Three Producers, 3x async replication

bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformancetest50000000 100 -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196

Throughput Versus Stored Data

bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformancetest50000000000 100 -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196

Effect of message size

foriin10 100 1000 10000 100000;

do

echo""

echo$i

bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformancetest$((1000*1024*1024/$i))$i-1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=128000

done;

Consumer

Consumer throughput

bin/kafka-consumer-perf-test.sh --zookeeper esv4-hcl197.grid.linkedin.com:2181 --messages 50000000 --topictest--threads 1

3 Consumers

On three servers, run:

bin/kafka-consumer-perf-test.sh --zookeeper esv4-hcl197.grid.linkedin.com:2181 --messages 50000000 --topictest--threads 1

End-to-end Latency

bin/kafka-run-class.sh kafka.tools.TestEndToEndLatency esv4-hcl198.grid.linkedin.com:9092 esv4-hcl197.grid.linkedin.com:2181test5000

Producer and consumer

bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformancetest50000000 100 -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196

bin/kafka-consumer-perf-test.sh --zookeeper esv4-hcl197.grid.linkedin.com:2181 --messages 50000000 --topictest--threads 1

broker配置如下

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.

broker.id=0

############################# Socket Server Settings #############################

# The port the socket server listens on

port=9092

# Hostname the broker will bind to and advertise to producers and consumers.

# If not set, the server will bind to all interfaces and advertise the value returned from

# from java.net.InetAddress.getCanonicalHostName().

#host.name=localhost

# The number of threads handling network requests

num.network.threads=4

# The number of threads doing disk I/O

num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server

socket.send.buffer.bytes=1048576

# The receive buffer (SO_RCVBUF) used by the socket server

socket.receive.buffer.bytes=1048576

# The maximum size of a request that the socket server will accept (protection against OOM)

socket.request.max.bytes=104857600

############################# Log Basics #############################

# The directory under which to store log files

log.dirs=/grid/a/dfs-data/kafka-logs,/grid/b/dfs-data/kafka-logs,/grid/c/dfs-data/kafka-logs,/grid/d/dfs-data/kafka-logs,/grid/e/dfs-data/kafka-logs,/grid/f/dfs-data/kafka-logs

# The number of logical partitions per topic per server. More partitions allow greater parallelism

# for consumption, but also mean more files.

num.partitions=8

############################# Log Flush Policy #############################

# The following configurations control the flush of data to disk. This is the most

# important performance knob in kafka.

# There are a few important trade-offs here:

#    1. Durability: Unflushed data is at greater risk of loss in the event of a crash.

#    2. Latency: Data is not made available to consumers until it is flushed (which adds latency).

#    3. Throughput: The flush is generally the most expensive operation.

# The settings below allow one to configure the flush policy to flush data after a period of time or

# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# Per-topic overrides for log.flush.interval.ms

#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can

# be set to delete segments after a period of time, or after a given size has accumulated.

# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens

# from the end of the log.

# The minimum age of a log file to be eligible for deletion

log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining

# segments don't drop below log.retention.bytes.

#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.

log.segment.bytes=536870912

# The interval at which log segments are checked to see if they can be deleted according

# to the retention policies

log.cleanup.interval.mins=1

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).

# This is a comma separated host:port pairs, each corresponding to a zk

# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".

# You can also append an optional chroot string to the urls to specify the

# root directory for all kafka znodes.

zookeeper.connect=esv4-hcl197.grid.linkedin.com:2181

# Timeout in ms for connecting to zookeeper

zookeeper.connection.timeout.ms=1000000

# metrics reporter properties

kafka.metrics.polling.interval.secs=5

kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter

kafka.csv.metrics.dir=/tmp/kafka_metrics

# Disable csv reporting by default.

kafka.csv.metrics.reporter.enabled=false

replica.lag.max.messages=10000000

转载请务必将下面这段话置于文章开头处(保留超链接)。

本文转发自技术世界原文链接http://www.jasongj.com/2015/01/02/Kafka深度解析

相关文章
|
8天前
|
消息中间件 存储 算法
深度解析Kafka中的消息奥秘
深度解析Kafka中的消息奥秘
31 0
|
3月前
|
消息中间件 缓存 Kafka
探究Kafka原理-5.Kafka设计原理和生产者原理解析(下)
探究Kafka原理-5.Kafka设计原理和生产者原理解析
38 0
|
3月前
|
消息中间件 存储 负载均衡
探究Kafka原理-5.Kafka设计原理和生产者原理解析(上)
探究Kafka原理-5.Kafka设计原理和生产者原理解析
63 0
|
3月前
|
消息中间件 分布式计算 Java
探究Kafka原理-3.生产者消费者API原理解析(上)
探究Kafka原理-3.生产者消费者API原理解析
34 0
|
3月前
|
消息中间件 供应链 Java
Kafka的发布-订阅功能: Java实现与应用场景解析
Kafka的发布-订阅功能: Java实现与应用场景解析
64 0
|
4月前
|
消息中间件 缓存 Kafka
kafka源码解析——第一篇:producer
kafka源码解析——第一篇:producer
42 0
|
4月前
|
消息中间件 运维 Kafka
深度解析 Kafka 消息保证机制
Kafka作为分布式流处理平台的重要组成部分,其消息保证机制是保障数据可靠性、一致性和顺序性的核心。在本文中,将深入探讨Kafka的消息保证机制,并通过丰富的示例代码展示其在实际应用中的强大功能。
|
2月前
|
消息中间件 安全 Kafka
2024年了,如何更好的搭建Kafka集群?
我们基于Kraft模式和Docker Compose同时采用最新版Kafka v3.6.1来搭建集群。
408 2
2024年了,如何更好的搭建Kafka集群?
|
3月前
|
消息中间件 存储 数据可视化
kafka高可用集群搭建
kafka高可用集群搭建
42 0
|
6月前
|
消息中间件 存储 Kubernetes
Helm方式部署 zookeeper+kafka 集群 ——2023.05
Helm方式部署 zookeeper+kafka 集群 ——2023.05
239 0

热门文章

最新文章

推荐镜像

更多