1. 安装

1.1 Linux下安装

Kafka运行环境需要先安装好Java环境。

进入官网http://kafka.apache.org/downloads,选择相应的版本的Kafka链接并下载:

$ wget https://downloads.apache.org/kafka/2.8.0/kafka_2.13-2.8.0.tgz

解压安装包

$ tar zxf kafka_2.13-2.8.0.tgz -C /usr/local/
$ ln -s /usr/local/kafka_2.13-2.8.0/ /usr/local/kafka
$ cd /usr/local/kafka

启动ZooKeeper

$ ./bin/zookeeper-server-start.sh config/zookeeper.properties

# 后台运行
$ nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties >> zookeeper.log 2>&1 &

启动Kafka

$ ./bin/kafka-server-start.sh config/server.properties

# 后台运行
$ nohup ./bin/kafka-server-start.sh config/server.properties >> kafka.log 2>&1 &

1.2 MacOS下通过brew安装

在MacOS下,还可以通过brew来安装和运行Kafka,并且可以很方便地启动。

安装:

$ brew install zookeeper
$ brew install kafka

启动服务:

$ brew services start zookeeper
$ brew services start kafka

如果只是临时启动的话:

$ zkServer start
$ kafka-server-start /usr/local/etc/kafka/server.properties

这里需要注意的是,由于Kafka是依赖ZooKeeper来运作的,所以需要先启动ZooKeeper再启动Kafka,关闭的时候也注意要先关闭Kafka再关闭ZooKeeper。

而Kafka对应的一系列脚本工具,可以直接用命令的方式进行调用,如以下的一些管理常用命令:

kafka-topics
kafka-console-producer
kafka-console-consumer
...

2. 配置系统服务单元

这一步是可选的,配置了之后通过systemctl命令启动和停止,也可以直接执行脚本来启动停止。

2.1 Zookeeper

创建系统服务单元

$ cd /etc/systemd/system
$ vi zookeeper.service

贴上以下内容

[Unit]
Description=Apache Zookeeper server
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
ExecStop=/usr/local/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal
User=root
Group=root

[Install]
WantedBy=multi-user.target

操作命令

# 启动ZooKeeper
$ systemctl start zookeeper

# 查看ZooKeeper状态
$ systemctl status zookeeper

# 关闭ZooKeeper
$ systemctl stop zookeeper

2.2 Kafka

创建系统服务单元

$ cd /etc/systemd/system
$ vi kafka.service

贴上以下内容

[Unit]
Description=Apache Kafka Server
Documentation=http://kafka.apache.org/documentation.html
Requires=zookeeper.service

[Service]
Type=simple
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

操作命令

# 启动Kafka
$ systemctl start kafka

# 查看Kafka状态
$ systemctl status kafka

# 关闭Kafka
$ systemctl stop kafka

3. 目录结构

下面进入Kafka的目录,也就是/usr/local/kafka,看一下目录的结构。

|-- bin                           // Kafka和ZooKeeper的脚本工具
|   |-- kafka-console-consumer.sh
|   |-- kafka-console-producer.sh
|   |-- kafka-server-start.sh
|   |-- kafka-server-stop.sh
|   |-- kafka-topics.sh
|   |-- windows                   // windows下的bat脚本
|   |-- zookeeper-server-start.sh
|   |-- zookeeper-server-stop.sh
|   `-- ...
|-- config                        // Kafka和ZooKeeper的配置文件
|   |-- kraft                     // Kafka2.8开始移除ZooKeeper依赖的新启动配置,本文暂不介绍
|   |-- server.properties
|   |-- zookeeper.properties
|   `-- ...
|-- libs                          // 一些依赖的jar包
|-- LICENSE
|-- licenses
|-- logs                          // 日志
|-- NOTICE
`-- site-docs                     // 文档

4. 脚本工具

Kafka提供了很多脚本工具,可以用来进行主题创建和查看、生产者、消费者等操作。

以下脚本执行需要先进入Kafka目录进行操作,脚本工具都在bin目录下。

kafka-server-start.sh

Kafka启动脚本。

# 启动Kafka
$ ./bin/kafka-server-start.sh config/server.properties

kafka-server-stop.sh

Kafka关闭脚本。通过发送SIGTERM信号给Kafka进程实现优雅关闭。

# 关闭Kafka
$ ./bin/kafka-server-stop.sh

kafka-topics.sh

与主题相关的脚本,用于查看主题、创建主题。

# 查看已创建的主题
$ ./bin/kafka-topics.sh --zookeeper localhost:2181 --list

# 创建主题
$ ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 1 --partitions 1

# 增加主题的分区数
$ ./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic test --partitions 3

# 删除主题
$ ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete -topic test

# 查看主题分区
$ ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe -topic test

kafka-console-producer.sh

生产者脚本。

# 通过生产者发送消息,在终端输入然后回车发送消息
$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

kafka-console-consumer.sh

消费者脚本。

# 通过消费者接收消息
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

# 通过消费者接收消息,从头开始
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

# 使用消费组
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group consumer-group1

kafka-configs.sh

配置管理脚本。

kafka-consumer-groups.sh

消费组管理脚本。

# 列出当前集群所有消费组
$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# 展示消费组的详细信息
$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group groupname
$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group groupname --state # 状态
$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group groupname --members # 消费者成员信息
$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group groupname --members --verbose # 消费者分配情况

# 删除消费组
$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group groupname

# 将消费组所有分区的消费位移置0
$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --all-topics --reset-offsets --to-earliest --execute

# 将消费组某个分区的消费位移置为分区末尾
$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --topic test --reset-offsets --to-latest --execute

kafka-delete-records.sh

删除消息脚本。

以下是一个示例delete.json,该json指明删除主题topic-monitor下,分区0中偏移量为10、分区1中偏移量为11、分区2中偏移量为12的消息:

{
  "partitions": [
    {
      "topic": "topic-monitor",
      "partition": 0,
      "offset": 10
    },
    {
      "topic": "topic-monitor",
      "partition": 1,
      "offset": 11
    },
    {
      "topic": "topic-monitor",
      "partition": 2,
      "offset": 12
    }
  ],
  "version": 1
}
# 根据json文件规则删除消息
$ ./bin/kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file delete.json

kafka-configs.sh

配置管理脚本。

kafka-preferred-replica-election.sh

分区leader副本选举脚本。

# 分区leader副本选举
$ ./bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181

kafka-reassign-partitions.sh

分区脚本。

# 分区重分配
$ ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --generate --topic-to-move-json-file reassign.json

# 修改副本因子
$ ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --execute --reassignment-json-file add.json

性能测试工具

kafka-producer-perf-test.sh用于生产者性能测试。

kafka-consumer-perf-test.sh用于消费者性能测试。

5. 配置

5.1 server.properties

Kafka服务端配置文件,启动Kafka时用到。

# 集群中broker的唯一标识,默认值为0,各个broker不同,设置为0开始的枚举值
broker.id=0

# 端口,默认为9092
port=9092

# broker连接的ZooKeeper集群的地址和端口,多个节点用逗号分隔
zookeeper.connect=localhost:2181

# 监听客户端连接的地址列表
# protocol 协议,支持的协议有PLAINTEXT、SSL、SASL_SSL等
# host     主机名,不指定表示默认网卡,0.0.0.0表示所有网卡
# port     端口,默认值为null
listeners=protocol1://host1:port1,protocol2://host2:port2,protocol3://host3:port3
listeners=PLAINTEXT://:9092

# 日志文件目录
# log.dirs存放多个目录,以逗号分隔,优先级更高
# log.dir存放单个目录
log.dirs=/tmp/kafka-logs

# 单个消息的最大值
message.max.bytes=1000000

# 创建新主题默认的分区数
num.partitions=1

# 数据可以被保留多久,默认为一周
log.retention.hours=168

# 数据可以被保留多久
log.retention.minutes = 100
log.retention.ms = 100

# 根据保留消息字节数判断消息是否过期,默认为1GB
log.retention.bytes=1073741824

# 日志片段大小上限,默认为1GB,达到上限时会打开新的日志片段
log.segment.bytes=1073741824

# 多长时间后日志片段会被关闭
log.segment.ms

参考