美文网首页
Kafka学习(一)

Kafka学习(一)

作者: 金融非耐斯 | 来源:发表于2023-01-22 12:07 被阅读0次
  • 下载安装

wget https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz
# 解压
tar -zxvf  kafka_2.13-3.3.1.tgz -C /opt/module/
mv  kafka_2.13-3.3.1/ kafka
# 修改配置文件
cd config/
vi server.properties #设置broker.id、log.dirs目录、zookeeper.connect
 # 添加到系统变量
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin

  • kafka启停脚本

#!/bin/bash

case $1 in 
"start")
    for i in hadoop1 hadoop2 hadoop3
    do
        echo "--- 启动 $i kafka ---"
        ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
        done
;;
"stop")
    for i in hadoop1 hadoop2 hadoop3
    do
        echo "--- 停止 $i kafka ---"
        ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "
        done
;;
esac
  • kafka主题使用

# 创建主题
bin/kafka-topics.sh --bootstrap-server hadoop1:9092 --topic one --create --partitions 1 --replication-factor 3
# 查看所有主题
bin/kafka-topics.sh --bootstrap-server hadoop1:9092 --list
# 查看主题详细描述
bin/kafka-topics.sh --bootstrap-server hadoop1:9092 --topic one --describe
  • kafka生产者使用

bin/kafka-console-producer.sh --bootstrap-server hadoop1:9092 --topic one 

生产者有main线程和sender线程,main中的分区器默认32M,DQuene默认16K
batch.size:16K,数据积累到16K后,sender才会发送或者
linger.ms:发果长时间不到batch.size,可以设置等待时间,默认0ms
应答机制
0:生产者不需要等待数据落盘应答
1:生产者要等Leader收到数据后应答
-1:生产者要等Leader和ISR队列中所有节点收齐数据后应答

异步发送

# 导入依赖
<dependencies>  
    <dependency>  
        <groupId>org.apache.kafka</groupId>  
        <artifactId>kafka-clients</artifactId>  
        <version>3.0.0</version>  
    </dependency>  
</dependencies>

KafkaProducer类

//callback换成.get()就是同步
public class KafkaProducer {
 
    public static void main(String[] args) {
        Properties properties = new Properties();
        //连接集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop1:9092,hadoop2:9092");
        //指定序列化类型
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); 
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
1       properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(properties);
        for (int i = 1; i <= 600; i++) {
            //参数1:topic名, 参数2:消息文本; ProducerRecord多个重载的构造方法
            kafkaProducer.send(new ProducerRecord<String, String>("one", "message"+i),new Callback(){
                 @Override
                 public void onCompletion(RecordMetadata metadata, Exception exception){
                       if(exception == null){
                           System.out.println("主题:"+ metadata.topic() + "  分区: "+metadata.partition());
                       }
                 }
            });
            System.out.println("message"+i);
        }
        kafkaProducer.close();
    }
}
  • kafka消费者使用

bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic one 
# --from-beginning 从开始(历史数据也接收)接收
  • kafka自定义分区

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.test.kafka.MyPartition");   //partition类名全路径

public class MyPartition implements Partitioner {
  private Random random = new Random();
  @Override
  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
       String msgValues = value.toString();
        int partition;
        if(msgValues.contains("hello")){
            partition = 0;
        }else{
            partition = 1; 
       }
  }

  @Override
  public void close() {}

  @Override
  public void configure(Map<String, ?> configs) {}
}
  • 提高生产者吞吐量

# KafkaProducer类中加入
//缓冲区大小
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
//批次大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
//linger.ms
properties.put(ProducerConfig.LINGER_MS_CONFIG,1);
//压缩,压缩可配置gzip,snappy,lz4,zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
  • ACK应答级别

1.acks=0,生产者发送过来的数据就不管,可靠性差,效率高
2.acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等
3.acks=-1,生产者发送过来数据Leader和ISR队列里所有Follwer应答,可靠性高,效率低。

//具体配置
properties.put(ProducerConfig.ACKS_CONFIG,"all");
//重试次数
properties.put(ProducerConfig.RETRIES_CONFIG,3);
  • kafka事务

注意:开启事务,必须开启幂等性,每个broker都有一个事务协调器,

//幂等性默认是开启的
enable.idempotence=true

生产者事务

# KafkaProducer类中加入
//0.指定事务id
properties.put(ProducerConfig.TRANSCATIONAL_ID_CONFIG,"transcational_id");
//1.初始化事务
kafkaProducer.initTransaction();
//2.开启事务
kafkaProducer.beginTransaction();
//3.发送数据
try{
    for (int i = 1; i <= 600; i++) {
            //参数1:topic名, 参数2:消息文本; ProducerRecord多个重载的构造方法
            kafkaProducer.send(new ProducerRecord<String, String>("one", "message"+i));
    }
    //4.提交事务
    kafkaProducer.commitTransaction();
}catch(Exception e){
    kafkaProducer.abortTransaction();
}finally{
    //关闭资源
    kafkaProducer.close();
}
  • 数据的有序与乱序

1.kafka在1.x版本之前保证数据单分区有序,条件是max.in.flight.requests.connection=1,无需考虑是否开启幂等性
2.1.x以后的版本,分为未开启幂等性max.in.flight.requests.connection=1,需要设置为1,开启幂等性,max.in.flight.requests.connection=需要设置小于等于5.
原因:因为1.x后,启用幂等后,kafka服务器会缓存producer发来的最近5个request的元数据,
故无论如何,都可以保证最近5个request的数据都是有序的。

相关文章

网友评论

      本文标题:Kafka学习(一)

      本文链接:https://www.haomeiwen.com/subject/lebthdtx.html