flume 整合kafka

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介:


1,安装并成功能运行flume

2,安装并成功能运行kafka

3,安装并成功能运行zookeeper

4,开始整合flume收集的数据,写入kafka

a,修改flume的配置文加:

vim  flume_kafka.conf

agent1.sources = r1

agent1.sinks = k1

agent1.channels = c1

# Describe/configure the source

agent1.sources.r1.type = exec

agent1.sources.r1.command=tail -f /opt/logs/usercenter.log


# Use a channel which buffers events in memory

agent1.channels.c1.type = memory

agent1.channels.c1.capacity = 1000

agent1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

agent1.sources.r1.channels = c1

agent1.sinks.k1.channel = c1


# # Describe the sink  这部分就是输入到kafka的写法

##############################################

agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

agent1.sinks.k1.topic = test

agent1.sinks.k1.brokerList = hadoop1:9092,hadoop2:9092,hadoop3:9092

agent1.sinks.k1.requiredAcks = 1

agent1.sinks.k1.batchSize = 20

##############################################


b,下载第三方插件

下载flume-kafka-plus:https://github.com/beyondj2ee/flumeng-kafka-plugin


lib目录下的

package下的

 

都放到flumelib目录

如果,报错,请看这个文档

http://wenda.chinahadoop.cn/question/4079?notification_id=290954&rf=false&item_id=10382#!answer_10382


修改原有的flume-conf文件


在插件包里有一个flume-conf.properties,把这个文件放到flume的conf文件夹里

然后修改以下内容

1
2
3
4
5
6
7
producer.sources.s. type  exec
  producer.sources.s. command  tail  -f -n+1   /opt/logs/test .log
  producer.sources.s.channels = c
……
producer.sinks.r.custom.topic.name= test
……
consumer.sources.s.custom.topic.name= test

c:启动服务

 

启动zookeeper集群

zkServer.sh start

zkServer.sh start

zkServer.sh start

还需要创建一个新的地址

zookeeper/bin/zkCli.sh

create /kafka  test

启动kafka broker 集群

bin/kafka-server-start.sh config/server.properties

bin/kafka-server-start.sh config/server.properties

bin/kafka-server-start.sh config/server.properties


创建kafka topic

bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka --replication-factor 1 --partitions 1 --topic test


启动kafka consumer

bin/kafka-console-consumer.sh --zookeeper localhost:2181/kafka --topic test --from-beginning


启动flume

bin/flume-ng agent --conf conf --conf-file conf/flume_kafka.properties --name producer -Dflume.root.logger=INFO,console


测试

1
echo  "this is a test"  >>  /opt/logs/test .log

此时只要能在consumer里现“this is a test”就表示成功


错误总结:

http://472053211.blog.51cto.com/3692116/1655844






      本文转自crazy_charles 51CTO博客,原文链接:http://blog.51cto.com/douya/1860896,如需转载请自行联系原作者


相关文章
|
4月前
|
消息中间件 分布式计算 大数据
【大数据技术Hadoop+Spark】Flume、Kafka的简介及安装(图文解释 超详细)
【大数据技术Hadoop+Spark】Flume、Kafka的简介及安装(图文解释 超详细)
68 0
|
4月前
|
数据可视化 JavaScript 关系型数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化
44 0
|
4月前
|
SQL 消息中间件 关系型数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(四)实时计算需求及技术方案
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(四)实时计算需求及技术方案
74 0
|
4月前
|
SQL 消息中间件 分布式数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析
63 0
|
4月前
|
消息中间件 存储 数据采集
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源
53 0
|
4月前
|
存储 消息中间件 分布式数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(一)案例需求
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(一)案例需求
56 0
|
4月前
|
SQL 消息中间件 存储
案例:Flume消费Kafka数据保存Hive
案例:Flume消费Kafka数据保存Hive
66 0
|
4月前
|
存储 Java 关系型数据库
【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)
【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)
52 1
【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)
|
4月前
|
消息中间件 分布式计算 大数据
【大数据技术】Spark+Flume+Kafka实现商品实时交易数据统计分析实战(附源码)
【大数据技术】Spark+Flume+Kafka实现商品实时交易数据统计分析实战(附源码)
69 0
|
7月前
|
消息中间件 关系型数据库 MySQL
使用Flume实现MySQL与Kafka实时同步
使用Flume实现MySQL与Kafka实时同步