Spring for Apache Kafka实战

  1. 云栖社区>
  2. 博客>
  3. 正文

Spring for Apache Kafka实战

谢一鸣 2018-07-26 16:00:28 浏览2072 评论0

摘要: 背景介绍 Kafka是一个分布式的、可分区的、可复制的消息系统,在现在的互联网公司,应用广泛,在我们公司在主要运用在定时推送业务,批量数据处理,日志上传等方面,我发现网上大部分博客,在使用上还只是对Apache 官方提供的client,进行运用开发,在这里推荐使用 Spring for Apache Kafka(简称spring-kafka) ,更新维护稳定,方法众多,并且强大,现已加入Spring豪华大礼包。

背景介绍

Kafka是一个分布式的、可分区的、可复制的消息系统,在现在的互联网公司,应用广泛,在我们公司在主要运用在定时推送业务,批量数据处理,日志上传等方面,我发现网上大部分博客,在使用上还只是对Apache 官方提供的client,进行运用开发,在这里推荐使用 Spring for Apache Kafka(简称spring-kafka) ,更新维护稳定,方法众多,并且强大,现已加入Spring豪华大礼包。

大坑预警

spring-kafka实际上也是对apache的kafka-client进行了包装和开发,所以使用的时候一定注意,你引入的spring-kafka里封装的kafka-client的版本要和服务器上的kafka服务器版本要对应,不然就会产生问题,比如消费失败。官网上在首页就贴出了SpringKafka和kafka-client版本(它的版本号要和kafka服务器的版本保持一致)的对应关系Spring for Apache Kafka

avatar

生产者的使用

spring-kafka对于生产者提供了两个模板类,分别是

  • KafkaTemplate :包装了一个生产者,并提供方便的方法将数据发送到kafka的topics。
  • ReplyingKafkaTemplate :KafkaTemplate的子类,增加了一个请求/应答功能,在发送数据后会返回一个future ,里面封装了消费者的应答信息。
    这里主要介绍KafkaTemplate。对于使用,官方提供了这样一个案例

To use the template, configure a producer factory and provide it in the template’s constructor:

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // See https://kafka.apache.org/documentation/#producerconfigs for more properties
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    return new KafkaTemplate<Integer, String>(producerFactory());
}

—— Spring for Apache Kafka

官方的文档只是给出了,如何配置和通过工厂构造出一个模板。而我们需要的是一个完整的能在项目中使用的案例。在这里我给出一个与Spring boot集成,完整的生产者案例。

首先构造一个模板类

@Configuration
public class TestTemplateConfig{

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // kafka.metadata.broker.list=10.16.0.214:9092,10.16.0.215:9092,10.16.0.216:9092
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.ACKS_CONFIG,"-1");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,5048576);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    /** 获取工厂 */
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /** 注册实例 */
    @Bean(name="testTemplate")
    public KafkaTemplate<String, String> testTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

运用@Configuration注解定义配置类,在内部通过spring-kafka提供的KafkaProducerFactory,来构造出KafkaTemplate,并在方法上通过@Bean注解将其注册,这样在初始化Spring容器时,就会将KafkaTemplate注入到容器中进行实例化。

发送消息

    @Resource
    private KafkaTemplate testTemplate;

   //同步发送
   public void syncSend(){
    testTemplate.send("topic",result.toString()).get(10, TimeUnit.SECONDS);


   }
    
  //异步发送
   public void asyncSend() {

      ListenableFuture<SendResult<Integer, String>> future = testTemplate.send("topic",result.toString());

      future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {

                    @Override
                    public void onSuccess(SendResult<Integer, String> result) {
                        System.out.println("success");
                    }

                    @Override
                    public void onFailure(Throwable ex) {
                        System.out.println("failure");

                    }

                });
}
   
        

消费者

对于消费者的使用,spring-kafka提供了,两种方式

Messages can be received by configuring a MessageListenerContainer and providing a Message Listener, or by using the @KafkaListener annotation.。 —— Spring for Apache Kafka

也就是通过配置MessageListenerContainer,可以提供消息侦听器或使用@KafkaListener注释来接收消息。
而这个官方的消息侦听器,其实就是接口,官方提供了8个接口来满足不同的应用场景,这里我们使用最常用的MessageListener接口

public class KafkaConsumerSerivceImpl implements MessageListener<String, String> {
    @Override
    public void onMessage(ConsumerRecord<String, String> data) {
        //根据不同的主题进行消费
        if("主题1".equals(data.topic())){
            
        }else if("主题2".equals(data.topic())){
           
        }
    }
}

另一种通过@KafkaListener注解极为方便

@Component
public class KafkaConsumer {

    @KafkaListener(topics = {"testTopic"})
    public void receive(String message){
        System.out.println("消费消息:" + message);
    }
}

无论是哪种方法,都需要创建一个MessageListenerContainer,对于这个容器的构造官方给的案例比较详细。

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
    }
}

构造过程
avatar

Notice that to set container properties, you must use the getContainerProperties() method on the factory. It is used as a template for the actual properties injected into the container. —— Spring for Apache Kafka

说明,要设置监听容器的属性,比如轮询时间,只能是通过getContainerProperties().set()方法,来设置。

关键配置

生产者配置

bootstrap.servers:brokers集群地址
acks:

  • -1 :producer在所有follower副本确认接收到数据后才算一次发送完成
  • 0 :producer不等待来自broker同步完成的确认继续发送下一条
  • 1 :producer在leader已成功收到的数据并得到确认后发送下一条

retries:发送失败重试次数
batch.size:当多个记录被发送到同一个分区时,生产者会尝试将记录合并到更少的请求中
linger.ms:批处理延迟时间上限
buffer.memory:批处理缓冲区
max.request.size:发送数据最大大小

消费者配置

concurrency:消费监听器容器并发数
session.timeout.ms:检测消费者故障的超时
auto.offset.reset:

  • earliest :当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
  • latest :当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
  • none :topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

enable.auto.commit:

  • true : 自动提交位移
  • false :关闭自动提交位移, 在消息被完整处理之后再手动提交位移
    auto.commit.interval.ms:若enable.auto.commit=true,这里设置自动提交周期

备注:

对数据可靠性较高的场景建议 offset 手动提交并将acks 设置为 "all" 即所有副本都同步到数据时send方法才返回, 以此来完全判断数据是否发送成功, 理论上来讲数据不会丢失。

【云栖快讯】一站式开发者服务,海量学习资源免费学  详情请点击

网友评论