Apache Kafka源码分析 – Broker Server

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介:

1. Kafka.scala

在Kafka的main入口中startup KafkaServerStartable, 而KafkaServerStartable这是对KafkaServer的封装

   1: val kafkaServerStartble = new KafkaServerStartable(serverConfig)
   2: kafkaServerStartble.startup

 

   1: package kafka.server
   2: class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
   3:   private var server : KafkaServer = null
   4:  
   5:   private def init() {
   6:     server = new KafkaServer(serverConfig)
   7:   }
   8:  
   9:   def startup() {
  10:     try {
  11:       server.startup()
  12:     }
  13:     catch {...}
  14:   }
  15: }

2. KafkaServer

KafkaServer代表一个kafka broker, 这是kafka的核心. 
只需要看看里面startup了哪些modules, 就知道broker做了哪些工作, 后面一个个具体分析吧

   1: package kafka.server
   2: /**
   3:  * Represents the lifecycle of a single Kafka broker. Handles all functionality required
   4:  * to start up and shutdown a single Kafka node.
   5:  */
   6: class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging {
   7:   var socketServer: SocketServer = null
   8:   var requestHandlerPool: KafkaRequestHandlerPool = null
   9:   var logManager: LogManager = null
  10:   var kafkaHealthcheck: KafkaHealthcheck = null
  11:   var topicConfigManager: TopicConfigManager = null
  12:   var replicaManager: ReplicaManager = null
  13:   var apis: KafkaApis = null
  14:   var kafkaController: KafkaController = null
  15:   val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
  16:   var zkClient: ZkClient = null
  17:  
  18:   /**
  19:    * Start up API for bringing up a single instance of the Kafka server.
  20:    * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
  21:    */
  22:   def startup() {
  23:     /* start scheduler */
  24:     kafkaScheduler.startup()
  25:     
  26:     /* setup zookeeper */
  27:     zkClient = initZk()
  28:  
  29:     /* start log manager */
  30:     logManager = createLogManager(zkClient)
  31:     logManager.startup()
  32:  
  33:     socketServer = new SocketServer(config.brokerId,
  34:                                     config.hostName,
  35:                                     config.port,
  36:                                     config.numNetworkThreads,
  37:                                     config.queuedMaxRequests,
  38:                                     config.socketSendBufferBytes,
  39:                                     config.socketReceiveBufferBytes,
  40:                                     config.socketRequestMaxBytes)
  41:     socketServer.startup()
  42:  
  43:     replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
  44:     kafkaController = new KafkaController(config, zkClient)
  45:     
  46:     /* start processing requests */
  47:     apis = new KafkaApis(socketServer.requestChannel, replicaManager, zkClient, config.brokerId, config, kafkaController)
  48:     requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
  49:    
  50:     replicaManager.startup()
  51:  
  52:     kafkaController.startup()
  53:     
  54:     topicConfigManager = new TopicConfigManager(zkClient, logManager)
  55:     topicConfigManager.startup()
  56:     
  57:     /* tell everyone we are alive */
  58:     kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
  59:     kafkaHealthcheck.startup()
  60:   }

2.1 KafkaScheduler

KafkaSchduler用于在后台执行一些任务,用ScheduledThreadPoolExecutor实现

   1: package kafka.utils
   2:  
   3: /**
   4:  * A scheduler based on java.util.concurrent.ScheduledThreadPoolExecutor
   5:  * 
   6:  * It has a pool of kafka-scheduler- threads that do the actual work.
   7:  * 
   8:  * @param threads The number of threads in the thread pool
   9:  * @param threadNamePrefix The name to use for scheduler threads. This prefix will have a number appended to it.
  10:  * @param daemon If true the scheduler threads will be "daemon" threads and will not block jvm shutdown.
  11:  */
  12: @threadsafe
  13: class KafkaScheduler(val threads: Int, 
  14:                      val threadNamePrefix: String = "kafka-scheduler-", 
  15:                      daemon: Boolean = true) extends Scheduler with Logging {
  16:   @volatile private var executor: ScheduledThreadPoolExecutor = null   
  17:   override def startup() {
  18:     this synchronized {
  19:       executor = new ScheduledThreadPoolExecutor(threads) //创建ScheduledThreadPoolExecutor
  20:       executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
  21:       executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
  22:       executor.setThreadFactory(new ThreadFactory() {
  23:                                   def newThread(runnable: Runnable): Thread = 
  24:                                     Utils.newThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)
  25:                                 })
  26:     }
  27:   }
  28:  
  29: def schedule(name: String, fun: ()=>Unit, delay: Long, period: Long, unit: TimeUnit) = {
  30:   val runnable = new Runnable { //将fun封装成Runnable
  31:     def run() = {
  32:       try {
  33:         fun()
  34:       } catch {...} 
  35:       finally {...}
  36:     }
  37:   }
  38:   if(period >= 0) //在pool中进行delay schedule
  39:     executor.scheduleAtFixedRate(runnable, delay, period, unit)
  40:   else
  41:     executor.schedule(runnable, delay, unit)
  42: }

2.2 Zookeeper Client

由于Kafka是基于zookeeper进行配置管理的, 所以需要创建zkclient和zookeeper集群通信

2.3 logManager

The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning. 
Apache Kafka源码分析 – Log Management

 

2.4 ReplicaManager

在0.8中新加入的replica相关模块

Apache Kafka Replication Design – High level
kafka Detailed Replication Design V3
Apache Kafka源码分析 – ReplicaManager

 

2.5 Kafka Socket Server

首先broker server是socket server,所有和broker的交互都是通过往socket端口发送request来实现的

socketServer = new SocketServer(config.brokerId...)

KafkaApis
该类封装了所有request的处理逻辑

KafkaRequestHandler

 

2.6 offsetManager

offsetManager = createOffsetManager()
定期清除过期的offset数据,即compact操作,

scheduler.schedule(name = "offsets-cache-compactor",
                     fun = compact,
                     period = config.offsetsRetentionCheckIntervalMs,
                     unit = TimeUnit.MILLISECONDS)

以及consumer相关的一些offset操作,不细究了,因为我们不用highlevel consumer

 

2.7 KafkaController

kafkaController = new KafkaController(config, zkClient, brokerState)

Apache Kafka源码分析 – Controller

0.8后,为了处理replica,会用一个broker作为master,即controller,用于协调replica的一致性

2.8 TopicConfigManager

 
topicConfigManager = new TopicConfigManager(zkClient, logManager)

TopicConfigManager用于处理topic config的change,kafka除了全局的配置,还有一种叫Topic-level configuration

> bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic 
    --config max.message.bytes=128000

比如你可以这样设置,那么这些topic config如何生效的?

topic-level config默认是被存储在,

/brokers/topics/<topic_name>/config
但是topic很多的情况下,为了避免创建太多的watcher,

所以单独创建一个目录

/brokers/config_changes

来触发配置的变化
所以上面的命令除了,把配置写入topic/config,还有增加一个通知,告诉watcher哪个topic的config发生了变化

/brokers/config_changes/config_change_13321

并且这个通知有个suffix,用于区别是否已处理过

复制代码
/**
   * Process the given list of config changes
   */
  private def processConfigChanges(notifications: Seq[String]) {
    if (notifications.size > 0) {
      info("Processing config change notification(s)...")
      val now = time.milliseconds
      val logs = logManager.logsByTopicPartition.toBuffer
      val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2))
      for (notification <- notifications) {
        val changeId = changeNumber(notification)
        if (changeId > lastExecutedChange) {  //未处理过
          val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification
          val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, changeZnode)
          if(jsonOpt.isDefined) {
            val json = jsonOpt.get
            val topic = json.substring(1, json.length - 1) // hacky way to dequote,从通知中获取topic name
            if (logsByTopic.contains(topic)) {
              /* combine the default properties with the overrides in zk to create the new LogConfig */
              val props = new Properties(logManager.defaultConfig.toProps)
              props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic))
              val logConfig = LogConfig.fromProps(props)
              for (log <- logsByTopic(topic))
                log.config = logConfig    //真正的更新log配置
              info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props))
              purgeObsoleteNotifications(now, notifications) //删除过期的notification,10分钟
            }
          }
          lastExecutedChange = changeId
        }
      }
    }
  }
复制代码
这个failover也没问题,反正配置设置多次也是无害的,每次启动都会把所有没过期的notification处理一遍

并且broker重启后是会从zk中, loading完整的配置的,所以也ok的,这个主要用于实时更新topic的配置

 

2.8 KafkaHealthcheck

kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)

这个很简单,就像注释的,告诉所有人我还活着。。。

实现就是在,

 /brokers/[0...N] --> advertisedHost:advertisedPort

register一个ephemeral znode,当SessionExpired时,再去register,典型zk应用
所以只需要watch这个路径就是知道broker是否还活着

2.9 ContolledShutdown

对于0.8之前,broker的startup和shutdown都很简单,把上面这些组件初始化,或stop就可以了

但是0.8后,增加replica,所以broker不能自己直接shutdown,需要先通知controller,controller做完处理后,比如partition leader的迁移,或replica offline,然后才能shutdown

private def controlledShutdown()

挺长的,逻辑就是找到controller,发送ControlledShutdownRequest,然后等待返回,如果失败,就是unclean shutdown


本文章摘自博客园,原文发布日期: 2014-02-14

目录
相关文章
|
2月前
|
消息中间件 存储 大数据
Apache Kafka: 强大消息队列系统的介绍与使用
Apache Kafka: 强大消息队列系统的介绍与使用
|
3天前
|
消息中间件 Cloud Native Kafka
一文搞懂 Kafka consumer 与 broker 交互机制与原理
AutoMQ致力于打造下一代云原生Kafka系统,解决Kafka痛点。本文深入解析Kafka Consumer与Broker的交互机制,涉及消费者角色、核心组件及常用接口。消费者以group形式工作,包括leader和follower。交互流程涵盖FindCoordinator、JoinGroup、SyncGroup、拉取消息和退出过程。文章还探讨了broker的consumer group状态管理和rebalance原理。AutoMQ团队分享Kafka技术,感兴趣的话可以关注他们。
30 2
一文搞懂 Kafka consumer 与 broker 交互机制与原理
|
22天前
|
消息中间件 存储 Java
深度探索:使用Apache Kafka构建高效Java消息队列处理系统
【4月更文挑战第17天】本文介绍了在Java环境下使用Apache Kafka进行消息队列处理的方法。Kafka是一个分布式流处理平台,采用发布/订阅模型,支持高效的消息生产和消费。文章详细讲解了Kafka的核心概念,包括主题、生产者和消费者,以及消息的存储和消费流程。此外,还展示了Java代码示例,说明如何创建生产者和消费者。最后,讨论了在高并发场景下的优化策略,如分区、消息压缩和批处理。通过理解和应用这些策略,可以构建高性能的消息系统。
|
24天前
|
消息中间件 负载均衡 监控
【Kafka】Kafka 创建Topic后如何将分区放置到不同的 Broker 中?
【4月更文挑战第13天】【Kafka】Kafka 创建Topic后如何将分区放置到不同的 Broker 中?
|
27天前
|
消息中间件 Kafka 网络安全
Kafka. Broker not available
Kafka. Broker not available
19 0
|
2月前
|
消息中间件 监控 Java
✈️【Kafka技术专题】「核心原理篇」深入实战探索Kafka的Broker的原理及可靠性机制分析
✈️【Kafka技术专题】「核心原理篇」深入实战探索Kafka的Broker的原理及可靠性机制分析
42 0
|
4月前
|
消息中间件 存储 缓存
Kafka - 3.x 图解Broker总体工作流程
Kafka - 3.x 图解Broker总体工作流程
77 0
|
4月前
|
消息中间件 Java Kafka
Apache Kafka-初体验Kafka(04)-Java客户端操作Kafka
Apache Kafka-初体验Kafka(04)-Java客户端操作Kafka
32 0
|
2月前
|
消息中间件 API Apache
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
本文整理自阿里云开源大数据平台徐榜江 (雪尽),关于阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会。
1618 2
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
|
2月前
|
SQL Java API
官宣|Apache Flink 1.19 发布公告
Apache Flink PMC(项目管理委员)很高兴地宣布发布 Apache Flink 1.19.0。
1623 2
官宣|Apache Flink 1.19 发布公告

热门文章

最新文章

推荐镜像

更多