Kafka 消息队列 Java版

简介: 消费者 apache kafka工具类,消费者Consumer类 public class Consumer { private ConsumerHandler handler; private ConsumerConfig config; private KafkaConsu...

消费者

apache kafka工具类,消费者Consumer类

public class Consumer {

private ConsumerHandler handler;

private ConsumerConfig config;

private KafkaConsumer<String, String> consumer;

private boolean startFlag = false;

/**
 * 创建消费者
 * 
 * @param handler
 *            消费者处理类
 * @param config
 *            消费者处理配置
 */
public Consumer(ConsumerHandler handler, ConsumerConfig config) {
    this.handler = handler;
    this.config = config;
    init();
}

/**
 * 初始化接收器
 */
private void init() {
    Properties props = new Properties();
    props.put("bootstrap.servers", config.getBootstrapServers());// 服务器ip:端口号,集群用逗号分隔
    props.put("group.id", config.getGroupID());
    /* 是否自动确认offset */
    props.put("enable.auto.commit", "true");
    /* 自动确认offset的时间间隔 */
    props.put("auto.commit.interval.ms", config.getAutoCommitInterVal());
    props.put("session.timeout.ms", config.getSessionTimeOut());
    /* key的序列化类 */
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    /* value的序列化类 */
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumer = new KafkaConsumer<>(props);
    if (config.isProcessBeforeData()) {
        /* 消费者订阅的topic, 可同时订阅多个 */
        consumer.subscribe(config.getTopicList(), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                for (TopicPartition partition : partitions) {
                    long offset = handler.getSeek(partition.topic(), partition.partition());
                    if (offset >= 0) {
                        if (consumer != null) {
                            consumer.seek(partition, offset + 1);
                        }
                    } else {
                        consumer.seekToBeginning(partitions);
                    }

                }
            }
        });
        start();
    } else {
        consumer.subscribe(config.getTopicList());
    }
}

public void start() {

    startFlag = true;

    while (startFlag) {
        /* 读取数据,读取超时时间为XXms */
        ConsumerRecords<String, String> records = consumer.poll(config.getPollTime());

        if (records.count() > 0) {
            long offset = 0;
            int partition = 0;
            for (ConsumerRecord<String, String> record : records) {
                if (record != null) {
                    offset = record.offset();
                    partition = record.partition();
                    try {
                        handler.processObject(record.topic(), record.partition(), record.offset(), record.value());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        try {
            Thread.currentThread();
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    consumer.close();
}

public void stop() {
    startFlag = false;
}

}

消费者配置ConsumerConfig类

public class ConsumerConfig {

private String bootstrapServers;
private String groupID;
private int autoCommitInterVal =1000;
private int sessionTimeOut = 30000;
private List<String> topicList;
private boolean processBeforeData;
private long pollTime = 100;


public ConsumerConfig() {
    super();
}

/**
 * 创建消费者配置
 * @param bootstrapServers     服务器配合 格式为服务器ip:端口号,集群用逗号分隔 例如 192.168.1.1:9092,192.168.1.2:9092
 * @param groupID               groupID
 * @param autoCommitInterVal  自动提交时间单位毫秒, 默认1000
 * @param sessionTimeOut       超时时间单位毫秒 , 默认30000
 * @param topicList             topicList列表
 * @param processBeforeData   是否处理启动之前的数据,该开关需要配置consumerHandler的跨步存储使用
 * @param pollTime              每次获取数据等待时间单位毫秒,默认100毫秒
 */
public ConsumerConfig(String bootstrapServers, String groupID, int autoCommitInterVal, int sessionTimeOut
        ,List<String> topicList,boolean processBeforeData,long pollTime) {
    this.bootstrapServers = bootstrapServers;
    this.groupID = groupID;
    this.autoCommitInterVal = autoCommitInterVal;
    this.sessionTimeOut = sessionTimeOut;
    this.topicList = topicList;
    this.processBeforeData = processBeforeData;
    this.pollTime = pollTime;
}

public String getBootstrapServers() {
    return bootstrapServers;
}

public void setBootstrapServers(String bootstrapServers) {
    this.bootstrapServers = bootstrapServers;
}

public String getGroupID() {
    return groupID;
}

public void setGroupID(String groupID) {
    this.groupID = groupID;
}

public int getAutoCommitInterVal() {
    return autoCommitInterVal;
}

public void setAutoCommitInterVal(int autoCommitInterVal) {
    this.autoCommitInterVal = autoCommitInterVal;
}

public int getSessionTimeOut() {
    return sessionTimeOut;
}

public void setSessionTimeOut(int sessionTimeOut) {
    this.sessionTimeOut = sessionTimeOut;
}

public List<String> getTopicList() {
    return topicList;
}

public void setTopicList(List<String> topicList) {
    this.topicList = topicList;
}
public boolean isProcessBeforeData() {
    return processBeforeData;
}

public void setProcessBeforeData(boolean processBeforeData) {
    this.processBeforeData = processBeforeData;
}

public long getPollTime() {
    return pollTime;
}

public void setPollTime(long pollTime) {
    this.pollTime = pollTime;
}
}

消费者处理ConsumerHandler类

public interface ConsumerHandler {
/**
 * 处理收到的消息
 * @param topic             收到消息的topic名称
 * @param partition         收到消息的partition内容
 * @param offset            收到消息在队列中的编号
 * @param value             收到的消息
 */
void processObject(String topic,int partition,long offset,String value);

/**
 * 获取跨步
 * @param topic             接受消息的topic
 * @param partition         接受消息的partition
 * @return                  当前topic,partition下的seek
 */
long getSeek(String topic , int partition);
}

生产者

kafka生产者,工具Producer类

public class Producer {

private ProducerConfig  config ;
private org.apache.kafka.clients.producer.Producer<String,String> producer;

public Producer(ProducerConfig config){
    this.config = config;
    init();
}

private void init(){
    Properties props = new Properties();
    props.put("bootstrap.servers",config.getBootstrapServers());
    props.put("acks", "all");
    props.put("retries", config.getRetries());
    props.put("batch.size", config.getBatchSize());
    props.put("linger.ms", config.getLingerMs());
    props.put("buffer.memory", config.getBufferMemory());
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    producer = new KafkaProducer<>(props);
}
/**
 * 发送消息
 * @param topic             要发送的topic
 * @param msg
 */
public void sendMessage(String topic,String msg){
    try {
        producer.send(new ProducerRecord<String, String>(config.getTopic(), String.valueOf(new Date().getTime()), msg)).get();
    } catch (InterruptedException e) {
        // TODO 自动生成的 catch 块
        e.printStackTrace();
    } catch (ExecutionException e) {
        // TODO 自动生成的 catch 块
        e.printStackTrace();
    }
    producer.flush();
}

public void close(){
    producer.close();
}

}

kafka生产者配置ProducerConfig类

public class ProducerConfig {

private String bootstrapServers;
private String topic;
private int retries = 0;
private int batchSize = 16384;
private int lingerMs=1;
private int bufferMemory=33554432;

public ProducerConfig() {
    super();
}

/**
 * 创建生产者配置文件
 * @param bootstrapServers          服务器配合 格式为服务器ip:端口号,集群用逗号分隔 例如 192.168.1.1:9092,192.168.1.2:9092
 * @param retries
 * @param batchSize
 * @param lingerMs
 * @param bufferMemory
 */
public ProducerConfig(String bootstrapServers,int retries, int batchSize, int lingerMs, int bufferMemory) {
    this.bootstrapServers = bootstrapServers;
    this.retries = retries;
    this.batchSize = batchSize;
    this.lingerMs = lingerMs;
    this.bufferMemory = bufferMemory;
}

public String getBootstrapServers() {
    return bootstrapServers;
}

public void setBootstrapServers(String bootstrapServers) {
    this.bootstrapServers = bootstrapServers;
}

public String getTopic() {
    return topic;
}

public void setTopic(String topic) {
    this.topic = topic;
}

public int getRetries() {
    return retries;
}

public void setRetries(int retries) {
    this.retries = retries;
}

public int getBatchSize() {
    return batchSize;
}

public void setBatchSize(int batchSize) {
    this.batchSize = batchSize;
}

public int getLingerMs() {
    return lingerMs;
}

public void setLingerMs(int lingerMs) {
    this.lingerMs = lingerMs;
}

public int getBufferMemory() {
    return bufferMemory;
}

public void setBufferMemory(int bufferMemory) {
    this.bufferMemory = bufferMemory;
}
}

测试

消费者处理实现ConsumerHandlerImpl类

public class ConsumerHandlerImpl implements ConsumerHandler{
/**
 * 处理收到的消息
 * @param topic             收到消息的topic名称
 * @param partition         收到消息的partition内容
 * @param offset            收到消息在队列中的编号
 * @param value             收到的消息
 */
public void processObject(String topic,int partition,long offset,String value) {
    System.out.println(topic+"从kafka接收"+partition+"到"+offset+"的消息是:"+value);
}

/**
 * 获取跨步
 * @param topic             接受消息的topic
 * @param partition         接受消息的partition
 * @return                  当前topic,partition下的seek
 */
public long getSeek(String topic , int partition) {
    return 1;
    
}
}

main方法类

public class AppResourceTest{  
public static void main(String[] args){  
      BeanDefinitionRegistry reg=new DefaultListableBeanFactory();  
      PropertiesBeanDefinitionReader reader=new PropertiesBeanDefinitionReader(reg);  
      reader.loadBeanDefinitions(new ClassPathResource("resources/kafka-consumer.properties")); 
      reader.loadBeanDefinitions(new ClassPathResource("resources/kafka-producer.properties")); 
      BeanFactory factory=(BeanFactory)reg;  
      ConsumerConfig consumerConfig=(ConsumerConfig)factory.getBean("consumerConfig");  
      System.out.println(consumerConfig.getPollTime());  
      ProducerConfig producerConfig=(ProducerConfig)factory.getBean("producerConfig");  
      System.out.println(producerConfig.getBatchSize());  
      
      Producer producer = new Producer(producerConfig);
      producer.sendMessage(producerConfig.getTopic(),"s4335453453454");
      producer.close();
      System.out.println("consumer"); 
      Consumer consumer = new Consumer(new ConsumerHandlerImpl(),consumerConfig);
      try{
          Thread.currentThread();
          Thread.sleep(10000);
      }catch(Exception e){
          e.printStackTrace();
      }
      
}  
}  

运行结果

QQ_20190122175655

目录
相关文章
|
1月前
|
消息中间件 存储 大数据
Apache Kafka: 强大消息队列系统的介绍与使用
Apache Kafka: 强大消息队列系统的介绍与使用
|
1月前
|
消息中间件 存储 缓存
Kafka【基础知识 01】消息队列介绍+Kafka架构及核心概念(图片来源于网络)
【2月更文挑战第20天】Kafka【基础知识 01】消息队列介绍+Kafka架构及核心概念(图片来源于网络)
81 2
|
1天前
|
消息中间件 存储 安全
从零开始构建Java消息队列系统
【4月更文挑战第18天】构建一个简单的Java消息队列系统,包括`Message`类、遵循FIFO原则的`MessageQueue`(使用`LinkedList`实现)、`Producer`和`Consumer`类。在多线程环境下,`MessageQueue`的操作通过`synchronized`保证线程安全。测试代码中,生产者发送10条消息,消费者处理这些消息。实际应用中,可能需要考虑持久化、分布式队列和消息确认等高级特性,或者使用成熟的MQ系统如Kafka或RabbitMQ。
|
2天前
|
消息中间件 存储 Java
深度探索:使用Apache Kafka构建高效Java消息队列处理系统
【4月更文挑战第17天】本文介绍了在Java环境下使用Apache Kafka进行消息队列处理的方法。Kafka是一个分布式流处理平台,采用发布/订阅模型,支持高效的消息生产和消费。文章详细讲解了Kafka的核心概念,包括主题、生产者和消费者,以及消息的存储和消费流程。此外,还展示了Java代码示例,说明如何创建生产者和消费者。最后,讨论了在高并发场景下的优化策略,如分区、消息压缩和批处理。通过理解和应用这些策略,可以构建高性能的消息系统。
|
16天前
|
消息中间件 存储 负载均衡
消息队列学习之kafka
【4月更文挑战第2天】消息队列学习之kafka,一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台。
13 2
|
2月前
|
消息中间件 缓存 Java
Kafka Consumer java api 配置
Kafka Consumer java api 配置
|
2月前
|
消息中间件 存储 监控
美团面试:Kafka如何处理百万级消息队列?
美团面试:Kafka如何处理百万级消息队列?
130 1
|
1月前
|
消息中间件 存储 监控
RabbitMQ:分布式系统中的高效消息队列
RabbitMQ:分布式系统中的高效消息队列
|
1月前
|
消息中间件 Java
springboot整合消息队列——RabbitMQ
springboot整合消息队列——RabbitMQ
74 0
|
3月前
|
消息中间件 JSON Java
RabbitMQ消息队列
RabbitMQ消息队列
45 0

相关产品

  • 云消息队列 Kafka 版