以下概念性的语言均为本人理解,欢迎大佬指出错误,小白希望深入理解请到官网
搭建kafka集群
准备工作
- 准备三台服务器, 安装jdk1.8 ,其中每一台虚拟机的hosts文件中都需要配置如下的内容
192.168.72.141 node01
192.168.72.142 node02
192.168.72.143 node03
上传安装包并解压
修改kafka的核心配置文件
cd /export/servers/kafka/config/
vi server.properties
主要修改一下四个地方:
1) broker.id 需要保证每一台kafka都有一个独立的broker
2) listeners = PLAINTEXT://当前虚拟机ip地址:9092
3) log.dirs 数据存放的目录
4) zookeeper.connect zookeeper的连接地址信息
#broker.id 标识了kafka集群中一个唯一broker。
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
#listeners : 表示的监听的地址. 需要更改为当前虚拟机的ip地址, 保证其他主机都能连接
listeners = PLAINTEXT://当前虚拟机的ip地址:9092
# 存放生产者生产的数据 数据一般以topic的方式存放
log.dirs=/export/data/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# zk的信息
zookeeper.connect=node01:2181,node02:2181,node03:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
将配置好的kafka分发到其他二台主机
cd /export/servers
scp -r kafka/ node02:$PWD
scp -r kafka/ node03:$PWD
- 拷贝后, 需要修改每一台的broker.id
ip为141的服务器: broker.id=0
ip为142的服务器: broker.id=1
ip为143的服务器: broker.id=2
- 修改每一台的listeners的ip地址
- 在每一台的服务器执行创建数据文件的命令
mkdir -p /export/data/kafka
启动集群
cd /export/servers/kafka/bin
./kafka-server-start.sh /export/servers/kafka/config/server.properties 1>/dev/kafka.log 2>&1 &
注意:可以启动一台broker,单机版。也可以同时启动三台broker,组成一个kafka集群版
image.png
可以通过 jps 查看 kafka进程是否已经启动了
kafka的基本使用
kafka其本身就是一个消息队列的中间件, 主要是用来实现系统与系统之间信息传输, 一般有两大角色, 一个是生产者, 一个是消费者.
故kafka的基本使用, 就是来学习如何使用生产者发送数据, 已经如何进行消费数据, kafka提供了两种方式来进行实现, 一种是采用kafka自带的脚本来操作, 另一种是使用相关的语言的API来进行操作
使用脚本操作kafka
说明: 索引执行的脚本文件都存放在kafka的bin目录中, 需要先进入bin目录才可以执行
cd /export/servers/kafka/bin
-
- 创建一个topic:
- topic: 指的是话题, 主题的意思, 在消息发送的时候, 我们需要对消息进行分类, 生产者和消费者需要在同一个topic下, 才可以进行发送和接收
./kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 1 --partitions 1 --topic order
- 使用Kafka自带一个命令行客户端启动一个生产者,生产数据
./kafka-console-producer.sh --broker-list node01:9092 --topic order
- 使用Kafka自带一个命令行客户端启动一个消费者,消费数据
./kafka-console-consumer.sh --bootstrap-server node01:9092 --topic order
该消费语句,只能获取最新的数据,要想历史数据,需要添加选项--from-beginning
如:bin/kafka-console-consumer.sh --bootstrap-server node01:9092 --from-beginning --topic order
- 查看有哪些topic
./kafka-topics.sh --list --zookeeper node01:2181
image.png
- 查看某一个具体的Topic的详细信息
./kafka-topics.sh --describe --topic order --zookeeper node01:2181
- 删除topic
./kafka-topics.sh --delete --topic order --zookeeper node01:2181
注意:彻底删除一个topic,需要在server.properties中配置delete.topic.enable=true,否则只是标记删除
配置完成之后,需要重启kafka服务。









网友评论