概述
Kafka是一种高吞吐量的分布式发布/订阅消息系统,它的主要作用是在生产者与消费者之间,建立一个通信的桥梁,为不同的系统之间传递消息。
安装scala
1.上传scala安装包
- 将scala-2.11.4.tgz上传到/usr/local目录下
- 解压:tar -zxvf scala-2.11.4.tgz -C /usr/local
- 重命名:mv scala-2.11.4 scala
2.配置
- 配置环境变量
vi ~/.bashrc
export SCALA_HOME=/usr/local/scala
export PATH=$SCALA_HOME/bin:$PATH
source ~/.bashrc
3. 查看安装结果
scala -version
Scala code runner version 2.11.4 -- Copyright 2002-2013, LAMP/EPFL
按照上述步骤在其他两个节点上都安装好scala。
搭建kafka集群
1.上传kafka安装包
- 将kafka_2.9.2-0.8.1.tgz上传到/usr/local目录下
- 解压:tar -zxvf kafka_2.9.2-0.8.1.tgz -C /usr/local
- 重命名:mv kafka_2.9.2-0.8.1 kafka
2.配置
- 修改kafka配置文件
vi /usr/local/kafka/config/server.properties
broker.id:0 //集群中Broker的唯一id,另外两个节点为1和2
advertised.host.name=192.168.201.33 //在其他两节点也配置一下ip
zookeeper.connect=192.168.201.33:2181,192.168.201.34:2181,192.168.201.35:2181
3.拷贝slf4j的slf4j-nop-1.7.6.jar到kafka的libs目录 下
- 将slf4j-1.7.6.zip上传到服务器
- 解压:unzip slf4j-1.7.6.zip
- 拷贝:cp slf4j-nop-1.7.6.jar /usr/local/kafka/libs/
4.去掉权限
解决kafka Unrecognized VM option 'UseCompressedOops'问题
vi /usr/local/kafka/bin/kafka-run-class.sh
if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true"
fi
去掉-XX:+UseCompressedOops
5.启动
cd /usr/local/kafka/
nohup bin/kafka-server-start.sh config/server.properties &
[root@dlsc-201-33 kafka]# tail -f nohup.out
[2020-09-05 16:15:45,764] INFO Log directory '/tmp/kafka-logs' not found, creating it. (kafka.log.LogManager)
[2020-09-05 16:15:45,776] INFO Starting log cleanup with a period of 60000 ms. (kafka.log.LogManager)
[2020-09-05 16:15:45,786] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2020-09-05 16:15:45,826] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2020-09-05 16:15:45,836] INFO [Socket Server on Broker 0], Started (kafka.network.SocketServer)
[2020-09-05 16:15:45,897] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2020-09-05 16:15:45,947] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2020-09-05 16:15:46,076] INFO Registered broker 0 at path /brokers/ids/0 with address dlsc-201-33:9092. (kafka.utils.ZkUtils$)
[2020-09-05 16:15:46,122] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2020-09-05 16:15:46,125] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
6. 测试
bin/kafka-topics.sh --zookeeper 192.168.201.33:2181,192.168.201.34:2181,192.168.201.35:2181 --topic test --replication-factor 1 --partitions 1 --create
Created topic "test".
bin/kafka-console-producer.sh --broker-list 192.168.201.33:9092,192.168.201.34:9092,192.168.201.35:9092 --topic test
hello,world //输入
bin/kafka-console-consumer.sh --zookeeper 192.168.201.33:2181,192.168.201.34:2181,192.168.201.35:2181 --topic test --from-beginning //再打开一个终端
hello,world //输出
网友评论