Kafka与Logstash的数据采集对接 —— 看图说话,从运行机制到部署

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介:

基于Logstash跑通Kafka还是需要注意很多东西,最重要的就是理解Kafka的原理。

Logstash工作原理

由于Kafka采用解耦的设计思想,并非原始的发布订阅,生产者负责产生消息,直接推送给消费者。而是在中间加入持久化层——broker,生产者把数据存放在broker中,消费者从broker中取数据。这样就带来了几个好处:

  • 1 生产者的负载与消费者的负载解耦
  • 2 消费者按照自己的能力fetch数据
  • 3 消费者可以自定义消费的数量

另外,由于broker采用了主题topic-->分区的思想,使得某个分区内部的顺序可以保证有序性,但是分区间的数据不保证有序性。这样,消费者可以以分区为单位,自定义读取的位置——offset。

Kafka采用zookeeper作为管理,记录了producer到broker的信息,以及consumer与broker中partition的对应关系。因此,生产者可以直接把数据传递给broker,broker通过zookeeper进行leader-->followers的选举管理;消费者通过zookeeper保存读取的位置offset以及读取的topic的partition分区信息。

449064-20160804212941856-303257932.png

由于上面的架构设计,使得生产者与broker相连;消费者与zookeeper相连。有了这样的对应关系,就容易部署logstash-->kafka-->logstash的方案了。

接下来,按照下面的步骤就可以实现logstash与kafka的对接了。

449064-20160804212951497-1844100718.png

启动kafka

启动zookeeper:

$zookeeper/bin/zkServer.sh start

启动kafka:

$kafka/bin/kafka-server-start.sh $kafka/config/server.properties &

创建主题

创建主题:

$kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic hello --replication-factor 1 --partitions 1

查看主题:

$kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe

测试环境

执行生产者脚本:

$kafka/bin/kafka-console-producer.sh --broker-list 10.0.67.101:9092 --topic hello

执行消费者脚本,查看是否写入:

$kafka/bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --from-beginning --topic hello

输入测试

input{
    stdin{}
}
output{
    kafka{
        topic_id => "hello"
        bootstrap_servers => "192.168.0.4:9092" # kafka的地址
        batch_size => 5
    }
    stdout{
        codec => rubydebug
    }
}

读取测试

logstash配置文件:

input{
    kafka {
        codec => "plain"
        group_id => "logstash1"
        auto_offset_reset => "smallest"
        reset_beginning => true
        topic_id => "hello"
        #white_list => ["hello"]
        #black_list => nil
        zk_connect => "192.168.0.5:2181" # zookeeper的地址
   }

}
output{
    stdout{
        codec => rubydebug
    }
}
本文转自博客园xingoo的博客,原文链接:Kafka与Logstash的数据采集对接 —— 看图说话,从运行机制到部署,如需转载请自行联系原博主。
相关文章
|
7天前
|
消息中间件 存储 算法
深入了解Kafka的数据持久化机制
深入了解Kafka的数据持久化机制
24 0
|
6月前
|
消息中间件 存储 Kubernetes
Helm方式部署 zookeeper+kafka 集群 ——2023.05
Helm方式部署 zookeeper+kafka 集群 ——2023.05
237 0
|
7月前
|
消息中间件 缓存 Kafka
连Producer端的主线程模块运行原理都不清楚,就敢说自己精通Kafka?
连Producer端的主线程模块运行原理都不清楚,就敢说自己精通Kafka?
33 0
|
6月前
|
消息中间件 存储 Kafka
Flink集群使用kafka_appender收集flink产生的日志,但是现在实时运行的任务超过了
Flink集群使用kafka_appender收集flink产生的日志,但是现在实时运行的任务超过了
141 1
|
7月前
|
消息中间件 设计模式 Java
聊聊 Kafka: Consumer 源码解析之 Rebalance 机制
聊聊 Kafka: Consumer 源码解析之 Rebalance 机制
199 0
|
3月前
|
消息中间件 存储 缓存
KafKa存储机制
KafKa存储机制
38 0
|
3月前
|
消息中间件 分布式计算 监控
腾讯技术官手撸笔记,全新演绎“Kafka部署实战”,已开源
我们知道,当下流行的MQ非常多,不过很多公司在技术选型上还是选择使用Kafka。与其他主流MQ进行对比,我们会发现Kafka最大的优点就是吞吐量高。实际上Kafka是高吞吐低延迟的高并发、高性能的消息中间件,配置良好的Kafka集群甚至可以做到每秒几十万、上百万的超高并发写入。
|
4月前
|
消息中间件 算法 Kafka
docker-compose部署kafka
docker-compose部署kafka
|
4月前
|
消息中间件 存储 中间件
Greenplum GPKafka【部署 01】使用GPKafka实现Kafka数据导入Greenplum数据库完整流程分享(扩展安装文件网盘分享)
Greenplum GPKafka【部署 01】使用GPKafka实现Kafka数据导入Greenplum数据库完整流程分享(扩展安装文件网盘分享)
37 0
|
3月前
|
消息中间件 存储 Java
Kafka 部署教程
Kafka 部署教程

热门文章

最新文章