美文网首页
Kafka系列 - 生产者详解

Kafka系列 - 生产者详解

作者: 家硕先生 | 来源:发表于2020-09-07 16:59 被阅读0次

前言

一个消息队列,必然存在着生产者和消费者,而生产者(Producer)负责向Kafka服务节点(Broker)。

从一个示例开始

public class KafkaProducerDemo {
    private static final String brokerList = "localhost:9092";
    private static final String topic = "topic-demo";
    
    public static Properties initConfig() {
        Properties props = new Properties();
        props.put("bootstrap.server", brokerList);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("client.id", "id.demo");
        return props;
    }

    public static void main(String[] args) throw Exception {
        Properties props = initConfig();
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<topic, "Message Test">;
        producer.send(record);
    }
}

1. 生产者客户端

1.1 生产者创建及参数说明

创建Kafka生产者客户端KafkaProducer有3个参数必填项:
1)bootstrap.servers:指定生产者客户端连接Kafka集群的地址列表,多个以逗号隔开(如:127.0.0.1:9092,127.0.0.1:9093)。连接Kafka集群并不需要配置所有的broker地址,因为生产者能从broker获取到其他broker的信息,一般至少设置两个,一个broker宕机时也仍然能连接到Kafka集群。
2)key.serializer:将Key(可以用key来计算分区号,从而将消息归类到某个指定分区)序列化成字节数组。
3)value.serializer:将value序列化成字节数组。

1.2 消息发送

根据参数构建完生产者后,就是创建消息对象 ProducerRecord,属性如下:

public class ProducerRecord<K, V> {
    private final String topic;
    private final Integer partitoin;
    private final Headers headers; 
    private final K key;
    private final V value;
    private final Long timestamp; 
}
  • 其中 topicvalue 为必填项,其余属性可选填
  • key 用来指定消息的键,属于消息的附加信息,可以用来计算分区号,让消息可以发往特定的分区,除了 topic 外消息的二次归类,即同一个key的消息会被划分到同一个分区

发送消息的三种模式

发送消息的方法本身是异步的,同步只是在调用方法后对后续操作进行了阻塞

发送消息方法如下:

public class KafkaProducer<K, V> implements Producer<K, V> {
    // ...
    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        return this.send(record, (Callback)null);
    }

    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
        return this.doSend(interceptedRecord, callback);
    }
    // ...
}
  • 发后即忘
producer.send(record);
  • 同步
Future<RecordMetadata> future = producer.send(record);
// 调用 get方法阻塞等待响应,从而达到同步效果
RecordMetadata metadata = future.get(); 
  • 异步
    利用send()方法的Callback,在Kafka返回响应时调用该函数实现异步的操作,如:
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            log.error("send message exception:{}", exception);
        } else {
            log.info("send message success, topic:{} - partition:{} - offset:{}", metadata.topic(), metadata.partition(), metadata.offset());
        }
    }
});

通常KafkaProducer不会只负责发送单条消息,一般是发送多条消息。对于同一分区的不同消息,先发送的消息,回调也会先执行,即回调函数的执行是分区有序的。

关于close方法
发送完消息后,需要调用KafkaProducer.close()方法回收资源。
close()方法会阻塞等待之前所有的发送请求完成后再关闭KafkaProducer。
也提供了带有超时时间的close方法,在超过等待时间后会强行关闭KafkaProducer,一般不建议使用。

1.3 序列化

生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给Kafka,同样的,消费者需要用反序列化器(Deserializer)把从Kafka中收到的字节数组转换成相应的对象。

序列化器均实现接口:

public interface Serializer<T> extends Closeable {
   
    default void configure(Map<String, ?> configs, boolean isKey) {
    }

    byte[] serialize(String var1, T var2);

    default byte[] serialize(String topic, Headers headers, T data) {
        return this.serialize(topic, data);
    }

    default void close() {
    }
}

1)configure()方法在KafkaProducer创建时调用,用来配置当前类,如编码类型的确定。
2)serialize()方法用来执行序列化操作。
3)close()方法用来关闭当前的序列化器(一般是个空方法),该方法可能会被KafkaProducer调用多次,实现的话需要保证方法的幂等性。

默认实现的序列化器如:
org.apache.kafka.common.serialization.StringSerializer

1.4 分区器

消息经过send()方法发往broker的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)才会被送达broker。

如果消息ProducerRecord 指定了 partition 字段,就不需要分区器进行分区,因为分区已被 partition 指定。

分区器默认需要实现接口:

public interface Partitioner extends Configurable, Closeable {
    int partition(String var1, Object var2, byte[] var3, Object var4, byte[] var5, Cluster var6);

    void close();

    default void onNewBatch(String topic, Cluster cluster, int prevPartition) {
    }
}

默认实现:org.apache.kafka.clients.producer.internals.DefaultPartitioner

public class DefaultPartitioner implements Partitioner {
    // ...
    public void configure(Map<String, ?> configs) {
    }

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return this.partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());
    }

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {
        return keyBytes == null ? this.stickyPartitionCache.partition(topic, cluster) : Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    public void close() {
    }
}

默认分区器DefaultPartitioner的分区规则:

  • 如果 key 不为null,则对 key 进行哈希,最终根据得到的哈希值来计算分区号,拥有相同 key 的消息会被写入同一个分区(计算得到的分区号会是所有分区中的任意一个,与key为null是轮询可用分区有差别)。
  • 如果 key 为null,消息会已轮询的方式发往 topic 内的各个可用分区
1.5 拦截器

生产者拦截器可以用来在消息发送前做一些准备工作,如按指定规则过滤不符合要求的消息,对消息内容进行加工处理等;也可以用来在发送回调逻辑前做一些操作,比如统计消息发送的成功率。

public interface ProducerInterceptor<K, V> extends Configurable {
    ProducerRecord<K, V> onSend(ProducerRecord<K, V> var1);

    void onAcknowledgement(RecordMetadata var1, Exception var2);

    void close();
}

1)将消息序列化和计算分区之前会调用拦截器的onSend()方法对消息进行相应的定制化操作(一般不要对topic、key 和 partition进行修改)。
2)在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的 onAcknowledgement()方法,且优先于Callback。该方法运行在Producer的IO线程,应尽量简单,不然会影响消息发送效率。

2. 原理分析

2.1 生产者客户端架构
生产者客户端整体架构(图片来源https://blog.csdn.net/LINBE_blazers/article/details/104072886

整个生产者客户端由主线程和Sender线程协调运行。主线程中创建消息,然后通过拦截器、序列化器和分区器处理后缓存到消息累加器(RecordAccumulator),Sender线程负责从消息累加器中获取消息并发送到broker。

RecordAccumulator用来缓存消息便于Sender线程可以批量发送,减少网络传输的资源消耗,从而提高性能。如果生产者发送消息的速度超过Sender发送到服务器的速度,会导致生产者空间不足,将会阻塞一段时间后,抛出异常,与参数max.block.ms有关,默认60秒。而buffer.memory参数则可以设置缓存空间大小,默认为32MB。

RecordAccumulator中为每个分区维护了一个双端队列,队列中的内容是ProducerBatch,即Deque<ProduderBatch>,创建消息写入到尾部,发送消息从头部读取。ProducerBatch是消息发送的一个批次,里面包含了一个或多个ProducerRecord。

Sender从RecordAccumulator中获取到缓存的消息之后,会进一步将<分区,Dequeue<ProducerBatch>>
转换为<Node,List<ProruderBatch>>,Node表示的是kafka集群的broker节点。这里是一个概念的转变,对于网络连接来说,生产者客户端与具体broker节点建立的连接,也就是向具体的broker节点发送消息而不关心具体分区。而对于KafkaProducer来说,它只关心向哪个分区发送消息。所以这里做一个从应用逻辑层面到网络IO层面的转换。

请求在发送给Kafka之前还会保存到InFlightRequests中,形式为: Map<NodeId,Dequeue<Request>>
主要作用是缓存了已经发出去但是还未收到响应的请求。InFlightRequests通过配置参数max.flight.requests.per.connection可以限制每个链接最多缓存数量,默认值为5,即每个链接最多只能缓存5个未响应的请求,超过该参数之后就不能继续像这个连接发送请求。

相关文章

  • Kafka系列 - 生产者详解

    前言 一个消息队列,必然存在着生产者和消费者,而生产者(Producer)负责向Kafka服务节点(Broker)...

  • kafka Consumer — offset的控制

    前言 在N久之前,曾写过kafka 生产者使用详解,今天补上关于 offset 相关的内容。那么本文主要涉及: K...

  • kafka0.8

    1、Kafka分为:生产者(producer),消费者(consumer) 2、生产者提交消息,给Kafka集群,...

  • Kafka - 生产者初步学习

    Kafka - 生产者初步学习 一、kafka生产者组件 我们从创建一个 ProducerRecord 对象开始,...

  • kafka 生产者使用详解

    前言 看完本文你将学会以下知识: kafka 数据的生产大致流程 如何创建并使用 kafka生产者 kafka生产...

  • Kafka生产者:写消息到Kafka

    本章我们将会讨论Kafka生产者是如何发送消息到Kafka的。Kafka项目有一个生产者客户端,我们可以通过这个客...

  • [kafka系列]之producer端消息发送

    本小节我们来讨论Kafka生产者是如何发送消息到Kafka的, Kafka项目有一个生产者客户端,我们可以通过这个...

  • Kafka生产者:写消息到Kafka

    本章我们将会讨论Kafka生产者是如何发送消息到Kafka的。Kafka项目有一个生产者客户端,我们可以通过这个客...

  • Kafka生产者:写消息到Kafka

    本章我们将会讨论Kafka生产者是如何发送消息到Kafka的。Kafka项目有一个生产者客户端,我们可以通过这个客...

  • kafka使用

    框架: spring-kafka 1.2.2 生产者端代码 说明: bootstrapServers: kafka...

网友评论

      本文标题:Kafka系列 - 生产者详解

      本文链接:https://www.haomeiwen.com/subject/gmlcektx.html