The Sender in KafkaProducer

Author: _孙行者_ | Published 2020-09-17 14:19

Initialization

Initialization happens in org.apache.kafka.clients.producer.KafkaProducer#newSender:

// visible for testing
    Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
        // Maximum number of in-flight requests per connection.
        // Risk: with concurrent requests A, B, C on one connection, if B fails while C succeeds,
        // B's retry lands after C and ordering is broken.
        int maxInflightRequests = configureInflightRequests(producerConfig, transactionManager != null);

        // Timeout for a send request
        int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
        // Build the channel builder
        ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig, time);
        ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
        Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
        // If no kafkaClient was passed in, create a new NetworkClient
        KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
                // Create a Selector
                new Selector(producerConfig.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                        this.metrics, time, "producer", channelBuilder, logContext),
                // Metadata
                metadata,
                clientId,
                maxInflightRequests,
                producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
                // TCP send buffer size; the default of -1 lets the OS decide
                producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                // TCP receive buffer size; the default of -1 lets the OS decide
                producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                requestTimeoutMs,
                // DNS lookup policy
                ClientDnsLookup.forConfig(producerConfig.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG)),
                time,
                true,
                apiVersions,
                throttleTimeSensor,
                logContext);
        // Number of retries
        int retries = configureRetries(producerConfig, transactionManager != null, log);
        // acks configuration
        short acks = configureAcks(producerConfig, transactionManager != null, log);
        // Create the Sender
        return new Sender(logContext,
                client,
                metadata,
                // The key buffer: the Sender reads its data from the RecordAccumulator
                this.accumulator,
                // Whether message ordering is guaranteed; only true when maxInflightRequests == 1
                maxInflightRequests == 1,
                // Maximum amount of data in a single request
                producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                acks,
                retries,
                metricsRegistry.senderMetrics,
                time,
                requestTimeoutMs,
                // How long to wait after a failure before the next retry
                producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
                this.transactionManager,
                apiVersions);
    }
  • On message ordering: when max.in.flight.requests.per.connection > 1, ordering is not guaranteed. Ordering holds only when a connection carries at most one request at a time (a config sketch follows).
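
Below is a minimal sketch of a producer configured for strict ordering. The broker address, serializers, and class name are assumptions for illustration, not part of the source above:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class OrderedProducer {
    public static KafkaProducer<String, String> create() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // At most one in-flight request per connection: a failed batch cannot be
        // overtaken by a later batch, so retries preserve ordering.
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        return new KafkaProducer<>(props);
    }
}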

Startup: the run() method

    /**
     * When the KafkaProducer starts, it starts the sender thread. Sender implements
     * Runnable, so starting the thread executes run().
     */
    @Override
    public void run() {
        log.debug("Starting Kafka producer I/O thread.");

        // main loop, runs until close is called
        // running is initialized to true at startup, so this loops until close() sets running = false
        while (running) {
            try {
                runOnce();
            } catch (Exception e) {
                // An exception does not break the loop; a persistent error will be logged over and over
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

        // okay we stopped accepting requests but there may still be
        // requests in the transaction manager, accumulator or waiting for acknowledgment,
        // wait until these are completed.
        // A non-forced close: keep looping until the remaining records have been sent
        while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
            try {
                runOnce();
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

        // Abort the transaction if any commit or abort didn't go through the transaction manager's queue
        // A non-forced close: abort any incomplete transaction, then keep looping until it resolves
        while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
            if (!transactionManager.isCompleting()) {
                log.info("Aborting incomplete transaction due to shutdown");
                transactionManager.beginAbort();
            }
            try {
                runOnce();
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }
        // Forced close: shut the transaction manager down and abandon the data in the accumulator
        if (forceClose) {
            // We need to fail all the incomplete transactional requests and batches and wake up the threads waiting on
            // the futures.
            if (transactionManager != null) {
                log.debug("Aborting incomplete transactional requests due to forced shutdown");
                transactionManager.close();
            }
            log.debug("Aborting incomplete batches due to forced shutdown");
            // Abort in-progress batches and clear whatever has already been written
            this.accumulator.abortIncompleteBatches();
        }
        try {
            // Close the network client
            this.client.close();
        } catch (Exception e) {
            log.error("Failed to close network client", e);
        }

        log.debug("Shutdown of Kafka producer I/O thread has completed.");
    }
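
For context: the KafkaProducer constructor starts this loop on a single daemon I/O thread, and each runOnce() call first queues produce requests, then does the actual socket I/O. A simplified sketch paraphrased from the client source (transaction handling omitted):

// In the KafkaProducer constructor (simplified): the Sender runs on one daemon thread.
this.ioThread = new KafkaThread("kafka-producer-network-thread | " + clientId, this.sender, true);
this.ioThread.start();

// The shape of Sender.runOnce(): sendProducerData(...) only queues requests onto the
// NetworkClient; client.poll(...) is where the real network reads and writes happen.
void runOnce() {
    long currentTimeMs = time.milliseconds();
    long pollTimeout = sendProducerData(currentTimeMs);
    client.poll(pollTimeout, currentTimeMs);
}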

sendProducerData(): preparing the nodes and the data

    private long sendProducerData(long now) {
        Cluster cluster = metadata.fetch();
        // get the list of partitions with data ready to send
        // Ask the accumulator which nodes have data ready to send
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // if there are any partitions whose leaders are not known yet, force metadata update
        // If any partition's leader is unknown, flag a forced metadata refresh; the sender
        // will fetch fresh metadata from the server after this pass
        if (!result.unknownLeaderTopics.isEmpty()) {
            // The set of topics with unknown leader contains topics with leader election pending as well as
            // topics which may have expired. Add the topic again to metadata to ensure it is included
            // and request metadata update, since there are messages to send to the topic.
            // Queue the topics whose metadata needs to be fetched
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic, now);

            log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
                    result.unknownLeaderTopics);
            this.metadata.requestUpdate();
        }

        // remove any nodes we aren't ready to send to
        // Walk the ready nodes and drop any whose network connection is not ready yet
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
            }
        }

        // create produce requests
        // Drain the sendable data out of the accumulator
        Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
        // Mark these batches as in flight
        addToInflightBatches(batches);
        if (guaranteeMessageOrder) {
            // To guarantee ordering, mute every drained partition so no other thread can
            // touch it before the data actually goes out; otherwise ordering could break.
            // Mute all the partitions drained
            // They are unmuted again when the response is handled
            for (List<ProducerBatch> batchList : batches.values()) {
                for (ProducerBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }
        // Reset the next batch expiry time, in preparation for the next loop iteration
        accumulator.resetNextBatchExpiryTime();
        // In-flight batches that have timed out before completing
        List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
        // Expired batches still sitting in the accumulator
        List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
        expiredBatches.addAll(expiredInflightBatches);

        // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
        // for expired batches. see the documentation of @TransactionState.resetIdempotentProducerId to understand why
        // we need to reset the producer id here.
        // Expired batches trigger the failure callback
        if (!expiredBatches.isEmpty())
            log.trace("Expired {} batches in accumulator", expiredBatches.size());
        for (ProducerBatch expiredBatch : expiredBatches) {
            String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
                    + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
            failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
            if (transactionManager != null && expiredBatch.inRetry()) {
                // This ensures that no new batches are drained until the current in flight batches are fully resolved.
                transactionManager.markSequenceUnresolved(expiredBatch);
            }
        }
        sensors.updateProduceRequestMetrics(batches);

        // If there is data to send, pollTimeout = 0 so we can loop again immediately and send more.
        // Otherwise pollTimeout is a value slightly smaller than the batch timeout, a delay used
        // to re-check the validity of the (meta)data. Nodes still connecting, or backing off after
        // a failure, are not counted among the ready nodes.
        // Compute the minimum delay before the next metadata pull
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
        pollTimeout = Math.max(pollTimeout, 0);
        if (!result.readyNodes.isEmpty()) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            // There are nodes to send to, so pollTimeout = 0: start the next loop immediately
            pollTimeout = 0;
        }
        // Turn the batches into produce requests and send them
        sendProduceRequests(batches, now);
        return pollTimeout;
    }

Figure: Sender#sendProducerData sequence diagram (Sender-sendProducerData-时序图.png)

sendProduceRequests: turning the batches into requests

private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
        for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
            sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
    }

This part is simple: iterate over the batches collated per destination node and call sendProduceRequest for each node.

sendProduceRequest: the send logic

== sendProduceRequest begins ==

private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {

Five parameters:

  • now: the current timestamp
  • destination: the id of the destination node
  • acks: the acknowledgment mode (the classic interview question; we get to it shortly)
  • timeout: the request timeout
  • batches: the data to send
// Nothing to send?
if (batches.isEmpty())
  return;

// Maps each TopicPartition to the records to send
Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
// Maps each TopicPartition to its originating ProducerBatch
final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
  • First, bail out early if there is nothing to send.
  • Two staging maps are set up; both are used below.
// find the minimum magic version used when creating the record sets
// Batches may carry different magic (format) versions; find the smallest one.
byte minUsedMagic = apiVersions.maxUsableProduceMagic();
for (ProducerBatch batch : batches) {
    if (batch.magic() < minUsedMagic)
        minUsedMagic = batch.magic();
}
  • Iterate to find the smallest magic version.
  • The client's version and the broker's version are not necessarily compatible, so everything is handled at the smallest version.
for (ProducerBatch batch : batches) {
    TopicPartition tp = batch.topicPartition;
    // Pull the buffered records out of the batch
    MemoryRecords records = batch.records();

    // Down-convert to the smallest magic version for backward compatibility
    if (!records.hasMatchingMagic(minUsedMagic))
        records = batch.records().downConvert(minUsedMagic, 0, time).records();
    produceRecordsByPartition.put(tp, records);
    recordsByPartition.put(tp, batch);
}

Again this is version compatibility: if the batches span widely different format versions, down-convert so that everything matches the smallest magic version.
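
For reference, the "magic" byte is the record batch format version. A short sketch of the constants involved (from org.apache.kafka.common.record.RecordBatch; the wrapper class is just for illustration):

import org.apache.kafka.common.record.RecordBatch;

public class MagicVersions {
    public static void main(String[] args) {
        // V0: pre-0.10 format; V1: adds timestamps (0.10); V2: adds headers plus the
        // fields needed for idempotence and transactions (0.11+). downConvert rewrites
        // a batch into one of these older formats when the broker cannot read a newer one.
        System.out.println(RecordBatch.MAGIC_VALUE_V0); // 0
        System.out.println(RecordBatch.MAGIC_VALUE_V1); // 1
        System.out.println(RecordBatch.MAGIC_VALUE_V2); // 2
    }
}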

// Build the requestBuilder for the chosen magic version
ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
        produceRecordsByPartition, transactionalId);
RequestCompletionHandler callback = response -> handleProduceResponse(response, recordsByPartition, time.milliseconds());
  • Build the request for the chosen version.
  • Define the callback that will run once the send completes.
String nodeId = Integer.toString(destination);
ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
        requestTimeoutMs, callback);
client.send(clientRequest, now);

Resolve the node id, build a ClientRequest, and finally hand it to the client to send.

== sendProduceRequest ends ==

handleProduceResponse: the callback after the send completes

== handleProduceResponse begins ==

RequestHeader requestHeader = response.requestHeader();
int correlationId = requestHeader.correlationId();
  • Grab the request header.
  • correlationId is effectively the id of this request.
if (response.wasDisconnected()) {
    log.trace("Cancelled request with header {} due to node {} being disconnected",
        requestHeader, response.destination());
    for (ProducerBatch batch : batches.values())
        completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, now);
}

If the connection was lost, complete every batch with a NETWORK_EXCEPTION error.

else if (response.versionMismatch() != null) {
    log.warn("Cancelled request {} due to a version mismatch with node {}",
            response, response.destination(), response.versionMismatch());
    for (ProducerBatch batch : batches.values())
        completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.UNSUPPORTED_VERSION), correlationId, now);
}

On a version mismatch, complete every batch with UNSUPPORTED_VERSION: the client API is incompatible with the broker, so the request cannot be executed.

else {
    log.trace("Received produce response from node {} with correlation id {}", response.destination(), correlationId);
    // if we have a response, parse it
    if (response.hasResponse()) {
        // There is a response body; complete the batches with its per-partition results
        ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
        for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
            TopicPartition tp = entry.getKey();
            ProduceResponse.PartitionResponse partResp = entry.getValue();
            ProducerBatch batch = batches.get(tp);
            completeBatch(batch, partResp, correlationId, now);
        }
        this.sensors.recordLatency(response.destination(), response.requestLatencyMs());
    } else {
        // No response body is expected here, but the callbacks must still fire
        // this is the acks = 0 case, just complete all requests
        for (ProducerBatch batch : batches.values()) {
            completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NONE), correlationId, now);
        }
    }
}
  • A normal response is handled normally: each batch completes with its own result.
  • With acks = 0 there is no response at all, but the callbacks must still fire, so each batch completes with Errors.NONE.

== handleProduceResponse ends ==
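
The acks setting explains the acks = 0 branch above. A small sketch of what each value means (the wrapper class is illustrative; the values map to ProducerConfig.ACKS_CONFIG):

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

public class AcksModes {
    public static void main(String[] args) {
        Properties props = new Properties();
        // "0"  : fire-and-forget. The broker sends no produce response, which is why
        //        handleProduceResponse completes these batches locally with Errors.NONE.
        // "1"  : the partition leader acknowledges once the record is in its own log.
        // "all": acknowledged only after all in-sync replicas have the record ("-1" is a synonym).
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        System.out.println(props);
    }
}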

completeBatch: the final processing steps

== completeBatch begins ==

private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                               long now) {
  • batch: the data that was sent
  • response: the response for this partition
  • correlationId: the id of this request
  • now: the current timestamp
Errors error = response.error;

First extract the error; the possible cases are handled below.

// The batch is too large && it contains more than one record && it is not yet done && (its magic version is at least V2 || it is compressed)
if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 && !batch.isDone() &&
        (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {
    // If the batch is too large, we split the batch and send the split batches again. We do not decrement
    // the retry attempts in this case.
    log.warn(
        "Got error produce response in correlation id {} on topic-partition {}, splitting and retrying ({} attempts left). Error: {}",
        correlationId,
        batch.topicPartition,
        this.retries - batch.attempts(),
        error);
    if (transactionManager != null)
        transactionManager.removeInFlightBatch(batch);
    this.accumulator.splitAndReenqueue(batch);
    maybeRemoveAndDeallocateBatch(batch);
    this.sensors.recordBatchSplit();
}

Having ruled out version issues, the batch really is too large.

  • Split the batch and re-enqueue the pieces, to be sent on a later pass (no retry attempt is consumed). A sketch of the relevant size settings follows this list.
  • Remove the original batch from flight and deallocate its memory where possible.
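
A sketch of the size settings that interact with this splitting path (defaults quoted from memory; the exact split size is an implementation detail of splitAndReenqueue):

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

public class SizeSettings {
    public static void main(String[] args) {
        Properties props = new Properties();
        // batch.size: the size the accumulator fills a batch up to (default 16 KB);
        // an oversized batch is split back down toward this size and re-enqueued.
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
        // max.request.size: upper bound on a single produce request (default 1 MB).
        // MESSAGE_TOO_LARGE typically means the broker's message.max.bytes is smaller
        // than what the producer sent.
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024);
        System.out.println(props);
    }
}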
// Some error other than NONE
else if (error != Errors.NONE) {
    // If the batch can still be retried, re-enqueue it for another attempt
    if (canRetry(batch, response, now)) {
        log.warn(
            "Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
            correlationId,
            batch.topicPartition,
            this.retries - batch.attempts() - 1,
            error);
        reenqueueBatch(batch, now);
    } else if (error == Errors.DUPLICATE_SEQUENCE_NUMBER) {
        // Duplicate sequence number
        // If we have received a duplicate sequence error, it means that the sequence number has advanced beyond
        // the sequence of the current batch, and we haven't retained batch metadata on the broker to return
        // the correct offset and timestamp.
        //
        // The only thing we can do is to return success to the user and not return a valid offset and timestamp.
        completeBatch(batch, response);
    } else {
        final RuntimeException exception;
        if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
            exception = new TopicAuthorizationException(Collections.singleton(batch.topicPartition.topic()));
        else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED)
            exception = new ClusterAuthorizationException("The producer is not authorized to do idempotent sends");
        else
            exception = error.exception();
        // tell the user the result of their request. We only adjust sequence numbers if the batch didn't exhaust
        // its retries -- if it did, we don't know whether the sequence number was accepted or not, and
        // thus it is not safe to reassign the sequence.
        failBatch(batch, response, exception, batch.attempts() < this.retries);
    }
    if (error.exception() instanceof InvalidMetadataException) {
        if (error.exception() instanceof UnknownTopicOrPartitionException) {
            log.warn("Received unknown topic or partition error in produce request on partition {}. The " +
                    "topic-partition may not exist or the user may not have Describe access to it",
                batch.topicPartition);
        } else {
            log.warn("Received invalid metadata error in produce request on partition {} due to {}. Going " +
                    "to request metadata update now", batch.topicPartition, error.exception().toString());
        }
        metadata.requestUpdate();
    }
}
  • If a retry is possible, retry first without worrying about the error; deal with it only once retries are exhausted.
  • DUPLICATE_SEQUENCE_NUMBER: per the comment, the broker's sequence number has already advanced past this batch's sequence, and the broker no longer retains the batch metadata needed to return the correct offset and timestamp. The only option is to report success to the user, without a valid offset and timestamp.
  • Every other error fails the batch with an appropriate exception (authorization failures get dedicated exception types).
  • On any InvalidMetadataException, request a metadata update. A sketch of the retry-related settings follows this list.
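
A sketch of the retry-related settings this error handling relies on (the wrapper class is illustrative):

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

public class RetrySettings {
    public static void main(String[] args) {
        Properties props = new Properties();
        // retries: the attempt budget that canRetry() checks against.
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        // retry.backoff.ms: how long a re-enqueued batch waits before being drained again.
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100L);
        // enable.idempotence: turns on producer ids and per-partition sequence numbers,
        // the machinery behind errors like DUPLICATE_SEQUENCE_NUMBER.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        System.out.println(props);
    }
}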
else {
    completeBatch(batch, response);
}

Otherwise this is the normal success path: complete the batch.

// Unmute the completed partition.
if (guaranteeMessageOrder)
    this.accumulator.unmutePartition(batch.topicPartition);

To guarantee ordering, partitions were muted before sending; now that processing is finished they can be unmuted.

== completeBatch ends ==

Wrap-up

That more or less covers the Sender; metrics and transactions are skipped. Let's summarize the flow and the key points.

  • The Sender is a single thread running an endless loop, with no fixed wait between iterations.
  • The RecordAccumulator is the central buffer: everything the Sender sends comes out of the RecordAccumulator.
  • As the walkthrough shows, plenty of validation happens before anything is actually sent. Errors raised along the way ultimately surface in the caller's send callback, as sketched below.
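
A minimal sketch of how those failures reach user code (the topic name is hypothetical; the producer comes from the ordering sketch earlier):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SendWithCallback {
    public static void main(String[] args) {
        KafkaProducer<String, String> producer = OrderedProducer.create(); // from the sketch above
        producer.send(new ProducerRecord<>("my-topic", "key", "value"), (metadata, exception) -> {
            if (exception != null) {
                // e.g. a TimeoutException from an expired batch, or a TopicAuthorizationException
                exception.printStackTrace();
            } else {
                System.out.printf("acked %s-%d@%d%n",
                        metadata.topic(), metadata.partition(), metadata.offset());
            }
        });
        producer.close();
    }
}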

If this article helped you, please give it a like; your feedback tells me it was worth writing.
