kafka简单安装部署

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 一、安装、配置  1.下载 kafka是由linkedin开源的,但是已经托管在了apache,所以需要从apache下载,http://kafka.apache.org/downloads.html。

一、安装、配置

 1.下载

kafka是由linkedin开源的,但是已经托管在了apache,所以需要从apache下载,http://kafka.apache.org/downloads.html。
安装推荐的版本安装就可以了,例如下面0.10.0.0是最新的release,也是推荐稳定的release。
建议下载scala 2.11版本(kafka是scala语言开发的)

Releases
0.10.0.0 is the latest release. The current stable version is 0.10.0.0. 
You can verify your download by following these procedures and using these KEYS. 

0.10.0.0
Release Notes 
Source download: kafka-0.10.0.0-src.tgz (asc, md5) 
Binary downloads: 
Scala 2.10  - kafka_2.10-0.10.0.0.tgz (asc, md5) 
Scala 2.11  - kafka_2.11-0.10.0.0.tgz (asc, md5) 
We build for multiple versions of Scala. This only matters if you are using Scala and you want a version built for the same Scala version you use. Otherwise any version should work (2.11 is recommended).

 

2. kafka目录结构

解压后:

drwxr-xr-x 3 root root  4096 Sep  3  2015 bin
drwxr-xr-x 2 root root  4096 Sep  3  2015 config
drwxr-xr-x 2 root root  4096 Sep  3  2015 libs
-rw-r--r-- 1 root root 11358 Sep  3  2015 LICENSE
-rw-r--r-- 1 root root   162 Sep  3  2015 NOTICE

 

bin: 可执行文件,例如启动关闭kafka,生产者、消费者客户端,zookeeper启动(kafka依赖zookeeper)等等
config: kafka的相关配置
libs: kafka的类库

3. 配置

kafka有很多配置选项,本次只说明重要的或者必备的一些配置。

[@zw_94_190 /opt/soft/kafka/config]# cat server.properties | grep -v "#" | grep -v "^$"
broker.id=0
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000

 

broker.id broker的唯一标识 0
log.dirs 看着名字是日志目录,其实是kafka的持久化文件目录(可以配置多个用逗号隔开) /tmp/kafka-logs
zookeeper.connect zookeeper集群地址(多个地址用逗号隔开) localhost:2181
host.name broker的hostname,类似于bind,一般绑定内网网卡ip Null
num.partitions 默认每个topic的分片数(如果没有指定的话) 1
auto.create.topics.enable 是否自动创建topic,例如producer publish不存在的topic就会创建出来,类似es自动创建索引(一般线上系统都false) true
default.replication.factor 默认副本数量 1

4. 完整步骤

cd /opt/soft
wget http://apache.fayea.com/kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz
tar -xvf kafka_2.10-0.8.2.2.tgz
ln -s kafka_2.10-0.8.2.2 kafka
cd kafka
mkdir -p /opt/soft/kafka/data/kafka-logs

 

5. 设置环境变量

export KAFKA_HOME=/opt/soft/kafka
export PATH=$PATH:$KAFKA_HOME/bin

 

二、单机单broker(单点)

1. 拓扑

2. 部署

(1). 修改配置文件 ${kafka_home}/config/server.properties

修改了log.dirs、zookeeper地址、num.partitions个数

[@zw_94_190 /opt/soft/kafka/config]# cat server.properties | grep -v "#" | grep -v "^$"
broker.id=0
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/soft/kafka/data/kafka-logs
num.partitions=2
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181

 

(2). 启动
${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties &

 

&是用守护进程的形式启动。

${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties & 
[1] 31596
[@zw_94_190 /opt/soft/kafka/bin]# [2016-07-02 16:09:32,436] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,475] INFO Property broker.id is overridden to 0 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,476] INFO Property log.cleaner.enable is overridden to false (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,476] INFO Property log.dirs is overridden to /opt/soft/kafka/kafka-logs (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,476] INFO Property log.retention.check.interval.ms is overridden to 300000 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,476] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,476] INFO Property log.segment.bytes is overridden to 1073741824 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,476] INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,477] INFO Property num.network.threads is overridden to 3 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,477] INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,477] INFO Property num.recovery.threads.per.data.dir is overridden to 1 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,477] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,477] INFO Property socket.receive.buffer.bytes is overridden to 102400 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,477] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,478] INFO Property socket.send.buffer.bytes is overridden to 102400 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,478] INFO Property zookeeper.connect is overridden to 10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,478] INFO Property zookeeper.connection.timeout.ms is overridden to 6000 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,519] INFO [Kafka Server 0], starting (kafka.server.KafkaServer)
[2016-07-02 16:09:32,521] INFO [Kafka Server 0], Connecting to zookeeper on 10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181 (kafka.server.KafkaServer)
[2016-07-02 16:09:32,534] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2016-07-02 16:09:32,543] INFO Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:host.name=zw_94_190 (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:java.version=1.7.0_45 (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:java.home=/opt/soft/jdk1.7.0_45/jre (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:java.class.path=:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/opt/soft/kafka/bin/../core/build/dependant-libs-2.10.4*/*.jar:/opt/soft/kafka/bin/../examples/build/libs//kafka-examples*.jar:/opt/soft/kafka/bin/../contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar:/opt/soft/kafka/bin/../contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar:/opt/soft/kafka/bin/../clients/build/libs/kafka-clients*.jar:/opt/soft/kafka/bin/../libs/jopt-simple-3.2.jar:/opt/soft/kafka/bin/../libs/kafka_2.10-0.8.2.2.jar:/opt/soft/kafka/bin/../libs/kafka_2.10-0.8.2.2-javadoc.jar:/opt/soft/kafka/bin/../libs/kafka_2.10-0.8.2.2-scaladoc.jar:/opt/soft/kafka/bin/../libs/kafka_2.10-0.8.2.2-sources.jar:/opt/soft/kafka/bin/../libs/kafka_2.10-0.8.2.2-test.jar:/opt/soft/kafka/bin/../libs/kafka-clients-0.8.2.2.jar:/opt/soft/kafka/bin/../libs/log4j-1.2.16.jar:/opt/soft/kafka/bin/../libs/lz4-1.2.0.jar:/opt/soft/kafka/bin/../libs/metrics-core-2.2.0.jar:/opt/soft/kafka/bin/../libs/scala-library-2.10.4.jar:/opt/soft/kafka/bin/../libs/slf4j-api-1.7.6.jar:/opt/soft/kafka/bin/../libs/slf4j-log4j12-1.6.1.jar:/opt/soft/kafka/bin/../libs/snappy-java-1.1.1.7.jar:/opt/soft/kafka/bin/../libs/zkclient-0.3.jar:/opt/soft/kafka/bin/../libs/zookeeper-3.4.6.jar:/opt/soft/kafka/bin/../core/build/libs/kafka_2.10*.jar (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:os.version=2.6.32-279.el6.x86_64 (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:user.dir=/opt/soft/kafka_2.10-0.8.2.2/bin (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,544] INFO Initiating client connection, connectString=10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@2abd2c1e (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,565] INFO Opening socket connection to server 10.10.53.162/10.10.53.162:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2016-07-02 16:09:32,569] INFO Socket connection established to 10.10.53.162/10.10.53.162:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2016-07-02 16:09:32,581] INFO Session establishment complete on server 10.10.53.162/10.10.53.162:2181, sessionid = 0x25380d7e74c9b71, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2016-07-02 16:09:32,583] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2016-07-02 16:09:32,772] INFO Loading logs. (kafka.log.LogManager)
[2016-07-02 16:09:32,806] INFO Completed load of log test-0 with log end offset 0 (kafka.log.Log)
[2016-07-02 16:09:32,814] INFO Logs loading complete. (kafka.log.LogManager)
[2016-07-02 16:09:32,814] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2016-07-02 16:09:32,817] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2016-07-02 16:09:32,839] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2016-07-02 16:09:32,840] INFO [Socket Server on Broker 0], Started (kafka.network.SocketServer)
[2016-07-02 16:09:32,895] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2016-07-02 16:09:32,920] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2016-07-02 16:09:33,089] INFO Registered broker 0 at path /brokers/ids/0 with address zw_94_190:9092. (kafka.utils.ZkUtils$)
[2016-07-02 16:09:33,094] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2016-07-02 16:09:33,101] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2016-07-02 16:09:33,248] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2016-07-02 16:09:33,284] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)

 

(3) 创建topic
${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper 10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181 --replication-factor 1 --partitions 1 --topic test_topic
Created topic "test_topic".
[2016-07-02 16:13:54,131] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test_topic,0] (kafka.server.ReplicaFetcherManager)
[2016-07-02 16:13:54,136] INFO Completed load of log test_topic-0 with log end offset 0 (kafka.log.Log)
[2016-07-02 16:13:54,140] INFO Created log for partition [test_topic,0] in /opt/soft/kafka/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 1073741824, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, min.insync.replicas -> 1, cleanup.policy -> delete, unclean.leader.election.enable -> true, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000, segment.jitter.ms -> 0}. (kafka.log.LogManager)
[2016-07-02 16:13:54,141] WARN Partition [test_topic,0] on broker 0: No checkpointed highwatermark is found for partition [test_topic,0] (kafka.cluster.Partition

 

(4) 查看topic列表
${KAFKA_HOME}/bin/kafka-topics.sh --list --zookeeper 10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181

test_topic

 

(5) producer
${KAFKA_HOME}/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic

 

[2016-07-02 16:17:34,545] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
fff
[2016-07-02 16:17:41,705] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
fff
[2016-07-02 16:19:27,978] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
hello
world
(6) consumer
${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper 10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181 --topic test_topic --from-beginning
fff
fff
hello
world

后启动的consumer依然能消费到消息,证明消息可以持久化,有关内部的实现原理以后的文章可能会介绍。

三、单机多broker(伪分布式)

1. 拓扑

2. 部署

添加三个配置文件(主要区别在broker.id,port,log.dirs,剩下的和第二章没有任何区别)

(1) 配置

(a) server-9093.properties

broker.id=1
port=9093
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/soft/kafka/data/kafka-0-logs
num.partitions=2
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181

 

(b) server-9094.properties

broker.id=2
port=9094
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/soft/kafka/data/kafka-1-logs
num.partitions=2
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181

 

(c) server-9095.properties

broker.id=3
port=9095
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/soft/kafka/data/kafka-2-logs
num.partitions=2
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181

 

(2) 启动三个broker
${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server-9093.properties &
${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server-9094.properties &
${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server-9095.properties &
(3) 创建topic

 

${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper 10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181 --replication-factor 3 --partitions 1 --topic hello_topic

 

(4) 查看topic列表

 

${KAFKA_HOME}/bin/kafka-topics.sh --list --zookeeper 10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181

 

(5) producer

 

${KAFKA_HOME}/bin/kafka-console-producer.sh --broker-list localhost:9093,localhost:9094,localhost:9095 --topic hello_topic

 

(6) consumer

 

${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper 10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181 --topic hello_topic --from-beginning
 

 

四、多机多broker(分布式)

1. 拓扑

2. 部署

和第三章差不多,只不过用了多台机器,最好加上一个host.name

五、 一些总结

1.一套zookeper配置一套broker。
2.生产环境要使用多机多实例。
3.最好不要使用自动创建topic功能。
4.producer依赖的资源是broker-list。(对应的bin下的可执行文件)
5.consumer依赖的资源是zookeeper-list。(对应的bin下的可执行文件)
6.目前看broker之间不会通信,分布式的实现比较依赖于zookeeper。

相关文章
|
6月前
|
消息中间件 分布式计算 Hadoop
Kafka安装部署
Kafka安装部署
130 1
|
4月前
|
消息中间件 分布式计算 大数据
【大数据技术Hadoop+Spark】Flume、Kafka的简介及安装(图文解释 超详细)
【大数据技术Hadoop+Spark】Flume、Kafka的简介及安装(图文解释 超详细)
66 0
|
1月前
|
消息中间件 Java Kafka
Kafka【环境搭建 01】kafka_2.12-2.6.0 单机版安装+参数配置及说明+添加到service服务+开机启动配置+验证+chkconfig配置说明(一篇入门kafka)
【2月更文挑战第19天】Kafka【环境搭建 01】kafka_2.12-2.6.0 单机版安装+参数配置及说明+添加到service服务+开机启动配置+验证+chkconfig配置说明(一篇入门kafka)
46 1
|
4月前
|
消息中间件 存储 分布式计算
Hadoop学习笔记(HDP)-Part.19 安装Kafka
01 关于HDP 02 核心组件原理 03 资源规划 04 基础环境配置 05 Yum源配置 06 安装OracleJDK 07 安装MySQL 08 部署Ambari集群 09 安装OpenLDAP 10 创建集群 11 安装Kerberos 12 安装HDFS 13 安装Ranger 14 安装YARN+MR 15 安装HIVE 16 安装HBase 17 安装Spark2 18 安装Flink 19 安装Kafka 20 安装Flume
73 0
Hadoop学习笔记(HDP)-Part.19 安装Kafka
|
1月前
|
消息中间件 Kafka Linux
Kafka【付诸实践 03】Offset Explorer Kafka 的终极 UI 工具安装+简单上手+关键特性测试(一篇学会使用 Offset Explorer)
【2月更文挑战第21天】Kafka【付诸实践 03】Offset Explorer Kafka 的终极 UI 工具安装+简单上手+关键特性测试(一篇学会使用 Offset Explorer)
158 2
|
2月前
|
消息中间件 Java Kafka
docker安装kafka(wurstmeister)
docker安装kafka(wurstmeister)
107 0
|
3月前
|
消息中间件 Kafka Linux
linux如何安装KafKa
linux如何安装KafKa
957 0
|
3月前
|
消息中间件 Java Kafka
使用Java编写代码安装Kafka及启动Zookeeper和Kafka
如何使用Java编写代码安装Kafka及启动Zookeeper和Kafka?其中包括安装前准备、修改配置文件、创建日志目录和数据目录等步骤,请提供详细实现过程。
43 0
|
3月前
|
消息中间件 分布式计算 监控
腾讯技术官手撸笔记,全新演绎“Kafka部署实战”,已开源
我们知道,当下流行的MQ非常多,不过很多公司在技术选型上还是选择使用Kafka。与其他主流MQ进行对比,我们会发现Kafka最大的优点就是吞吐量高。实际上Kafka是高吞吐低延迟的高并发、高性能的消息中间件,配置良好的Kafka集群甚至可以做到每秒几十万、上百万的超高并发写入。
|
4月前
|
消息中间件 Kafka Docker
docker 安装kafka
docker 安装kafka
72 0

热门文章

最新文章