Kafka Practice

Introduction: three small Kafka examples in Java: a consumer with manual offset commits (ManualCommitConsumer), a plain subscribing consumer (ProducerConsumer), and a producer (ProducerKafka).
package com.ocean.kafka;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ManualCommitConsumer {
    private Properties properties = new Properties();
    private KafkaConsumer<String, String> consumer;

    public ManualCommitConsumer() {

        properties.setProperty("bootstrap.servers", "master:9092");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("group.id", "java_group");
        // properties.setProperty("auto.offset.reset", "null");
        properties.setProperty("enable.auto.commit", "false");
        consumer = new KafkaConsumer<String, String>(properties);
    }

    public void subscribeTopics() {
        List<String> topics = new ArrayList<String>();
        topics.add("b");
        topics.add("from-java");
        consumer.subscribe(topics);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("partition:" + record.partition() + "offset:" + record.offset() + "key:"
                        + record.key() + "value:" + record.value());
            }
            // consumer.commitSync();
            // this call commits the consumed offsets; if omitted, the same records will be delivered again the next time the consumer starts

        }
    }

    public void getOffset() {
        // committed() returns null when no offset has been committed for the partition yet
        OffsetAndMetadata offsets = consumer.committed(new TopicPartition("b", 0));
        System.out.println("offsets:" + (offsets == null ? "none" : offsets.offset()));
    }
    // Consume from a specified partition, starting at a given offset.
    // There are two ways to tell a consumer what to read:
    // 1. consumer.subscribe(topics);
    // 2. consumer.assign(topicPartitions);
    public void consumerAssigned() {
        // List<String> topics = new ArrayList<String>();
        // topics.add("b");
        // consumer.subscribe(topics);
        // assign the partitions explicitly
        List<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
        topicPartitions.add(new TopicPartition("from-java", 0));
        consumer.assign(topicPartitions);
        // position the consumer at a specific offset within the assigned partition
        consumer.seek(new TopicPartition("from-java", 0), 21);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(
                        "partition:" + record.partition() + ", offset:" + record.offset() + ", value:" + record.value());
            }
        }

    }

    public void setCommitOffset() {

        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        offsets.put(new TopicPartition("from_java", 0), new OffsetAndMetadata(0));
        List<String> topics = new ArrayList<String>();
        
        topics.add("from_java");
        consumer.subscribe(topics);
        // 指定位置提交某个分区的offsets的值 这会在下一次拉取数据前生效
        consumer.commitSync(offsets);

        while (true) {

            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {

                if (record.partition() == 0) {
                    System.out.println("partition:" + record.partition() + "offset:" + record.offset() + "value:"
                            + record.value());
                }
            }
        }

    }

    public void exactlyOnceConsumer() {
        // 1. set the config parameter: disable auto commit
        properties.setProperty("enable.auto.commit", "false");
        // 2. subscribe to the topic or assign the partitions
        // consumer.subscribe(topics);
        // reset the offset; the value has to be loaded from MySQL
        // 3. fetch the stored offset from MySQL
        // 4.1 consumer.commitSync(offsets);
        //     commits the restored offsets back to the Kafka brokers
        // or, alternatively, use
        // 4.2 consumer.seek(new TopicPartition("from-java", 0), 0);
        //     to set the position to start consuming from

        // subscribe to the topic or assign the partitions
        // consumer.subscribe(topics);

        // 5. poll the data
        // records = consumer.poll(1000);

        // 6. iterate over the records and run the computation

        // 7. when the computation finishes, use consumer.committed(new TopicPartition("from-java", 1))
        //    to read the current consumed offset

        // 8. store the computed result and the offset in MySQL as one atomic operation (a transaction)

        // 9. go back to step 5 and loop: next poll, next computation
        //    (a runnable sketch of this pattern follows the class below)
    }
    
    
    public static void main(String[] args) {
        ManualCommitConsumer manualCommitConsumer = new ManualCommitConsumer();
        // manualCommitConsumer.subscribeTopics();
        // manualCommitConsumer.getOffset();
        manualCommitConsumer.consumerAssigned();
        // consumerAssigned() polls forever, so the call below is never reached
        manualCommitConsumer.setCommitOffset();

    }

}
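
The exactlyOnceConsumer() outline above never leaves pseudocode. The sketch below is one minimal way to wire it up end to end. It is an illustration under stated assumptions, not the original author's implementation: the MySQL table kafka_offsets(topic, part, off, result) with primary key (topic, part), the JDBC URL and credentials, and the record-counting "computation" are all hypothetical stand-ins.

package com.ocean.kafka;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ExactlyOnceSketch {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "master:9092");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("group.id", "java_group");
        props.setProperty("enable.auto.commit", "false"); // step 1: no auto commit

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        TopicPartition tp = new TopicPartition("from-java", 0);
        consumer.assign(Collections.singletonList(tp)); // step 2: assign the partition

        // hypothetical MySQL connection; table: kafka_offsets(topic, part, off, result),
        // primary key (topic, part)
        Connection db = DriverManager.getConnection("jdbc:mysql://master:3306/test", "user", "pass");
        db.setAutoCommit(false);

        // step 3: load the last stored offset from MySQL
        long start = 0;
        PreparedStatement q = db.prepareStatement(
                "SELECT off FROM kafka_offsets WHERE topic = ? AND part = ?");
        q.setString(1, "from-java");
        q.setInt(2, 0);
        ResultSet rs = q.executeQuery();
        if (rs.next()) {
            start = rs.getLong(1);
        }
        rs.close();
        q.close();

        consumer.seek(tp, start); // step 4.2: resume from the stored offset

        long count = 0; // the "computation": count records
        while (true) {
            // step 5: poll
            ConsumerRecords<String, String> records = consumer.poll(1000);
            long next = start;
            for (ConsumerRecord<String, String> record : records) {
                count++; // step 6: process each record
                next = record.offset() + 1;
            }
            if (next == start) {
                continue; // nothing new this round
            }
            // step 8: result and offset in one transaction, so they move together
            PreparedStatement u = db.prepareStatement(
                    "REPLACE INTO kafka_offsets (topic, part, off, result) VALUES (?, ?, ?, ?)");
            u.setString(1, "from-java");
            u.setInt(2, 0);
            u.setLong(3, next);
            u.setLong(4, count);
            u.executeUpdate();
            u.close();
            db.commit();
            start = next; // step 9: loop back to the next poll
        }
    }
}

Kafka alone still delivers at least once here; the exactly-once effect comes from the database transaction, which makes the stored result and the stored offset advance or roll back together.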

package com.ocean.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ProducerConsumer {

    private Properties properties = new Properties();
    private KafkaConsumer<String, String> consumer;

    public ProducerConsumer() {

        properties.put("bootstrap.servers", "master:9092");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        properties.setProperty("group.id", "java-group");
        consumer = new KafkaConsumer<String, String>(properties);

    }

    public void subscribeTopic() {
        List<String> topics = new ArrayList<String>();
        topics.add("home-work_pic");
        consumer.subscribe(topics);
        // loop, pulling data from Kafka
        while (true) {
            // fetch a batch of records from Kafka
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("接收信息:partition" + record.partition() + "offset:" + record.offset() + "key:"
                        + record.key() + "value:" + record.value());
            }
        }
    }

    public static void main(String[] args) {
        ProducerConsumer producerConsumer = new ProducerConsumer();
        producerConsumer.subscribeTopic();

    }
}


package com.ocean.kafka;

import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerKafka {

    private KafkaProducer<String, String> producer;
    private Properties properties;

    public ProducerKafka() {

        properties = new Properties();
        properties.put("bootstrap.servers", "master:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

//        properties.put("acks", "all");
//        properties.put("retries", 0);

        producer = new KafkaProducer<String, String>(properties);

    }
    public void assignPartitionSend(String key, String value) {

        // send to an explicit partition (partition 0 of "from-java")
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("from-java", 0, key, value);
        producer.send(record);

    }

    public void sendRecorder(String key, String value) {
        // no partition given: the partitioner picks one from the key
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("from-java", key, value);
        producer.send(record);
    }
    public void getTopicPartitions(String topic) {

        // list the partitions the cluster reports for the topic
        List<PartitionInfo> partitionInfos = producer.partitionsFor(topic);
        for (PartitionInfo partitionInfo : partitionInfos) {
            System.out.println(partitionInfo);
        }

    }
    
    public void getMetrics() {
        // print every client-side metric the producer exposes
        @SuppressWarnings("unchecked")
        Map<MetricName, Metric> metrics = (Map<MetricName, Metric>) producer.metrics();
        for (MetricName name : metrics.keySet()) {
            System.out.println(name.name() + ":" + metrics.get(name).value());
        }

    }
    
    public void sendRecorderWithCallback(String key, String value) {
        final Logger logger = LoggerFactory.getLogger(ProducerKafka.class);
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("from-java", key, value);
        Callback callback = new Callback() {
            // invoked once the send completes, successfully or not
            public void onCompletion(RecordMetadata metadata, Exception exception) {

                if (exception == null) {
                    logger.info("stored at partition:" + metadata.partition() + ", offset:" + metadata.offset() + ", ts:" + metadata.timestamp());
                } else {
                    logger.warn("broker-side exception");
                    exception.printStackTrace();
                }

            }
        };
        producer.send(record, callback);
    }
    public void close() {
        producer.flush(); // push out anything still buffered
        producer.close();
    }
    

    
    public static void main(String[] args) {

        ProducerKafka client = new ProducerKafka();
        for (int i = 0; i < 100; i++) {
            client.sendRecorderWithCallback("Ckey" + i, "Cvalue" + i);
        }
//        client.getMetrics();
        client.close();
    }
}
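
The commented-out acks and retries lines in the ProducerKafka constructor point at the delivery-guarantee settings. As a small sketch (the acks and retries values, and the reuse of the "from-java" topic, are assumptions rather than the original configuration), a send can also be made synchronous by blocking on the Future that send() returns, so a failure surfaces as an exception at the call site instead of inside a callback:

package com.ocean.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SyncSendSketch {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "master:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");  // wait for all in-sync replicas to acknowledge
        props.put("retries", 3);   // retry transient broker errors

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        ProducerRecord<String, String> record =
                new ProducerRecord<String, String>("from-java", "key", "value");

        // send() returns a Future<RecordMetadata>; get() blocks until the broker
        // acknowledges, turning a send failure into an exception here
        RecordMetadata metadata = producer.send(record).get();
        System.out.println("stored at partition:" + metadata.partition()
                + ", offset:" + metadata.offset());

        producer.close();
    }
}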
