初始化
初始化,org.apache.kafka.clients.producer.KafkaProducer#newSender
// visible for testing
Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
    // Maximum number of unacknowledged requests allowed on one connection.
    // Risk: with requests A, B, C in flight on the same connection, if B fails while C
    // succeeds and B is then retried, the records arrive out of order.
    int maxInflightRequests = configureInflightRequests(producerConfig, transactionManager != null);
    // Timeout for a single produce request.
    int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
    // Builds the network channels used by the selector.
    ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig, time);
    ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
    Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
    // Use the supplied kafkaClient if present; otherwise build a new NetworkClient.
    KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
            // NIO selector owning all socket channels of this producer
            new Selector(producerConfig.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                    this.metrics, time, "producer", channelBuilder, logContext),
            // cluster metadata
            metadata,
            clientId,
            maxInflightRequests,
            producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
            producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
            // TCP send buffer size; default -1 lets the OS decide
            producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
            // TCP receive buffer size; default -1 lets the OS decide
            producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
            requestTimeoutMs,
            // DNS lookup policy
            ClientDnsLookup.forConfig(producerConfig.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG)),
            time,
            true,
            apiVersions,
            throttleTimeSensor,
            logContext);
    // Number of retries for failed sends.
    int retries = configureRetries(producerConfig, transactionManager != null, log);
    // acks setting read from the producer config.
    short acks = configureAcks(producerConfig, transactionManager != null, log);
    // Assemble the Sender object.
    return new Sender(logContext,
            client,
            metadata,
            // key buffer: the Sender drains batched records from this RecordAccumulator
            this.accumulator,
            // ordering is only guaranteed with at most one in-flight request per connection
            maxInflightRequests == 1,
            // maximum amount of data sent in one request
            producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
            acks,
            retries,
            metricsRegistry.senderMetrics,
            time,
            requestTimeoutMs,
            // back-off before a failed batch is retried
            producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
            this.transactionManager,
            apiVersions);
}
- 关于消息的顺序,
max.in.flight.requests.per.connection > 1时不保证消息的顺序 ,一个 connection 上同时只有一个请求在发送时,才会保证消息的顺序。
启动 run() 方法
/**
 * Entry point of the sender I/O thread, started when the KafkaProducer is created.
 * Loops on runOnce() until close() clears the running flag, then drains any
 * outstanding work (unless force-closed), aborts leftovers on a forced shutdown,
 * and finally closes the network client.
 */
@Override
public void run() {
    log.debug("Starting Kafka producer I/O thread.");

    // Main loop: `running` is true from startup, so this spins until close()
    // flips it to false. A failing iteration is logged and the loop continues.
    while (running) {
        runOnceLogged();
    }

    log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

    // Graceful (non-forced) shutdown: keep iterating while work remains —
    // undrained batches in the accumulator, requests still in flight, or
    // pending transactional requests.
    while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
        runOnceLogged();
    }

    // Graceful shutdown of an open transaction: start an abort if no commit/abort
    // is already completing, then iterate until the transaction is resolved.
    while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
        if (!transactionManager.isCompleting()) {
            log.info("Aborting incomplete transaction due to shutdown");
            transactionManager.beginAbort();
        }
        runOnceLogged();
    }

    // Forced shutdown: fail incomplete transactional requests and batches so that
    // threads blocked on their futures are woken up; accumulated data is dropped.
    if (forceClose) {
        if (transactionManager != null) {
            log.debug("Aborting incomplete transactional requests due to forced shutdown");
            transactionManager.close();
        }
        log.debug("Aborting incomplete batches due to forced shutdown");
        this.accumulator.abortIncompleteBatches();
    }

    // Finally shut down the network client.
    try {
        this.client.close();
    } catch (Exception e) {
        log.error("Failed to close network client", e);
    }

    log.debug("Shutdown of Kafka producer I/O thread has completed.");
}

// Runs a single sender iteration, logging (but never propagating) any exception.
private void runOnceLogged() {
    try {
        runOnce();
    } catch (Exception e) {
        log.error("Uncaught error in kafka producer I/O thread: ", e);
    }
}
sendProducerData() ,准备节点(Node)和数据(data)
/**
 * One round of the send loop: picks the nodes that have sendable data, drains
 * batches for them from the accumulator, expires timed-out batches, and turns
 * the drained batches into produce requests.
 *
 * @param now current timestamp in milliseconds
 * @return the poll timeout for the network client (0 when data was ready to send)
 */
private long sendProducerData(long now) {
    Cluster cluster = metadata.fetch();
    // get the list of partitions with data ready to send
    // Ask the accumulator which nodes currently have batches ready to be sent.
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
    // if there are any partitions whose leaders are not known yet, force metadata update
    // When some topics have no known leader, flag a metadata refresh; the sender
    // will fetch fresh metadata from the brokers after this round.
    if (!result.unknownLeaderTopics.isEmpty()) {
        // The set of topics with unknown leader contains topics with leader election pending as well as
        // topics which may have expired. Add the topic again to metadata to ensure it is included
        // and request metadata update, since there are messages to send to the topic.
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic, now);
        log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
            result.unknownLeaderTopics);
        this.metadata.requestUpdate();
    }
    // remove any nodes we aren't ready to send to
    // Drop nodes whose network connection is not ready; remember the shortest
    // delay until one of them might become ready.
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
        }
    }
    // create produce requests
    // Drain sendable batches from the accumulator, grouped by destination node id.
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
    // Track these batches as in flight (currently being sent).
    addToInflightBatches(batches);
    if (guaranteeMessageOrder) {
        // Ordering mode: mute every drained partition so no further batches for it
        // are drained by other iterations until the response is handled,
        // where the partition is unmuted again.
        // Mute all the partitions drained
        for (List<ProducerBatch> batchList : batches.values()) {
            for (ProducerBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }
    // Reset the next-batch expiry deadline for the following iteration.
    accumulator.resetNextBatchExpiryTime();
    // Batches already handed to the network layer that have since expired.
    List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
    // Batches that expired while still sitting in the accumulator.
    List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
    expiredBatches.addAll(expiredInflightBatches);
    // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
    // for expired batches. see the documentation of @TransactionState.resetIdempotentProducerId to understand why
    // we need to reset the producer id here.
    // Fail each expired batch, which fires the user callbacks with a TimeoutException.
    if (!expiredBatches.isEmpty())
        log.trace("Expired {} batches in accumulator", expiredBatches.size());
    for (ProducerBatch expiredBatch : expiredBatches) {
        String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
            + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
        failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
        if (transactionManager != null && expiredBatch.inRetry()) {
            // This ensures that no new batches are drained until the current in flight batches are fully resolved.
            transactionManager.markSequenceUnresolved(expiredBatch);
        }
    }
    sensors.updateProduceRequestMetrics(batches);
    // If any node has data ready to send, poll with timeout 0 so the loop can run
    // again immediately and send more. Otherwise pick the smallest of: the next
    // ready-check delay, the not-ready back-off, and the time until the next batch
    // expires (floored at 0). Nodes that are only waiting (lingering, retry
    // back-off) are not counted as ready here.
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
    pollTimeout = Math.max(pollTimeout, 0);
    if (!result.readyNodes.isEmpty()) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        // Data is ready for some node: poll timeout 0, loop again right away.
        pollTimeout = 0;
    }
    // Hand the drained batches over as produce requests.
    sendProduceRequests(batches, now);
    return pollTimeout;
}
看图:
Sender-sendProducerData-时序图.png
sendProduceRequests 数据发送到 Request 里
/**
 * Hands the drained batches to the network layer: one produce request is created
 * per destination node.
 *
 * @param collated batches grouped by destination node id
 * @param now      current timestamp in milliseconds
 */
private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
    collated.forEach((nodeId, nodeBatches) ->
        sendProduceRequest(now, nodeId, acks, requestTimeoutMs, nodeBatches));
}
这里很简单,按节点(node)遍历,把每个节点对应的一组 ProducerBatch 交给 sendProduceRequest 去发送。
sendProduceRequest 发送的逻辑
== sendProduceRequest 开始 ==
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
四个参数:
- now , 当前时间点的时间戳
- destination, 节点id
- acks,响应模式(面试必问的,快执行到了)
- timeout , 超时时长
- batches 所有的数据
// Nothing to do for an empty batch list.
if (batches.isEmpty())
    return;
// Per-partition payload (MemoryRecords) that goes into the request.
Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
// Per-partition ProducerBatch, kept so the response callback can complete each batch.
final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
- 先判空,没有要发送的数据就直接结束掉
- 又弄了两个用来倒手的数据结构,后面用到的
// find the minimum magic version used when creating the record sets
// Batches may carry different magic (record-format) versions; find the smallest.
byte minUsedMagic = apiVersions.maxUsableProduceMagic();
for (ProducerBatch batch : batches) {
    if (batch.magic() < minUsedMagic)
        minUsedMagic = batch.magic();
}
- 遍历取出最小的版本号,
- 因为客户端的版本和服务端的版本不一定兼容的,都按最小版本号处理。
for (ProducerBatch batch : batches) {
    TopicPartition tp = batch.topicPartition;
    // raw buffered records of this batch
    MemoryRecords records = batch.records();
    // Down-convert to the minimum magic version so every record set in the
    // request uses a format compatible with the oldest batch.
    if (!records.hasMatchingMagic(minUsedMagic))
        records = batch.records().downConvert(minUsedMagic, 0, time).records();
    produceRecordsByPartition.put(tp, records);
    recordsByPartition.put(tp, batch);
}
这里还是做版本号的兼容。如果数据里有版本差比较大的,做个向下兼容。还是按最小版本号来。
// Build the produce request for the chosen magic version.
ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
    produceRecordsByPartition, transactionalId);
// Callback invoked once the response (or a failure) for this request arrives.
RequestCompletionHandler callback = response -> handleProduceResponse(response, recordsByPartition, time.milliseconds());
- 根据版本号,构建request
- 定义回调方法,send完事后回执行回调
String nodeId = Integer.toString(destination);
// acks != 0 means a broker response is expected for this request.
ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
    requestTimeoutMs, callback);
client.send(clientRequest, now);
取到 node 节点,构建一个 request,并最终发送请求
== sendProduceRequest 结束 ==
handleProduceResponse 发送完的回调方法
== handleProduceResponse 开始 ==
// Header of the original request; its correlationId identifies this request.
RequestHeader requestHeader = response.requestHeader();
int correlationId = requestHeader.correlationId();
- 取到 请求的header ,
- correlationId , 相当于当前请求的id
// The connection dropped before a response arrived: fail every batch of this
// request with NETWORK_EXCEPTION.
if (response.wasDisconnected()) {
    log.trace("Cancelled request with header {} due to node {} being disconnected",
        requestHeader, response.destination());
    for (ProducerBatch batch : batches.values())
        completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, now);
}
如果连接已断开(wasDisconnected),回调一个 NETWORK_EXCEPTION 的异常。
// Request version not supported by the broker: fail with UNSUPPORTED_VERSION.
else if (response.versionMismatch() != null) {
    log.warn("Cancelled request {} due to a version mismatch with node {}",
        response, response.destination(), response.versionMismatch());
    for (ProducerBatch batch : batches.values())
        completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.UNSUPPORTED_VERSION), correlationId, now);
}
如果是版本不匹配,回调一个 UNSUPPORTED_VERSION 的异常. 这里是与服务端(broker)API不兼容,没办法执行
else {
    log.trace("Received produce response from node {} with correlation id {}", response.destination(), correlationId);
    // if we have a response, parse it
    if (response.hasResponse()) {
        // A real broker response: complete each batch with its partition result.
        ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
        for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
            TopicPartition tp = entry.getKey();
            ProduceResponse.PartitionResponse partResp = entry.getValue();
            ProducerBatch batch = batches.get(tp);
            completeBatch(batch, partResp, correlationId, now);
        }
        this.sensors.recordLatency(response.destination(), response.requestLatencyMs());
    } else {
        // this is the acks = 0 case, just complete all requests
        // No response body is expected when acks == 0, but the callbacks must
        // still fire, so complete every batch with Errors.NONE (success).
        for (ProducerBatch batch : batches.values()) {
            completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NONE), correlationId, now);
        }
    }
}
- 有正常响应,就正常处理,回调结果集。
- 没有响应体时,说明 acks 配置的是 0(不等待服务端响应),但仍必须触发回调,于是回调一个 Errors.NONE(表示成功,没有异常)
== handleProduceResponse 结束 ==
completeBatch 完成最后的处理动作
== completeBatch 开始 ==
/**
 * Finishes handling of one batch based on its partition response: splits
 * oversized batches, retries retriable errors, fails the rest, and completes
 * successful batches.
 *
 * @param batch         the batch this response belongs to
 * @param response      per-partition response from the broker
 * @param correlationId id of the request that carried the batch
 * @param now           current timestamp in milliseconds
 */
private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                           long now) {
- batch , 请求的数据
- response, 请求的响应,
- correlationId , 当前请求的ID
- now, 当前时间戳
Errors error = response.error;
直接先提取Errors,Errors有以下几种。
// Batch too large && contains more than one record && not yet completed &&
// (magic version is at least v2 || batch is compressed): splitting can help.
if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 && !batch.isDone() &&
        (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {
    // If the batch is too large, we split the batch and send the split batches again. We do not decrement
    // the retry attempts in this case.
    log.warn(
        "Got error produce response in correlation id {} on topic-partition {}, splitting and retrying ({} attempts left). Error: {}",
        correlationId,
        batch.topicPartition,
        this.retries - batch.attempts(),
        error);
    if (transactionManager != null)
        transactionManager.removeInFlightBatch(batch);
    this.accumulator.splitAndReenqueue(batch);
    maybeRemoveAndDeallocateBatch(batch);
    this.sensors.recordBatchSplit();
}
排除版本原因,确实是数据量过大了 。
- 把数据拆了(split),再放回队列里,等待下次发送
- 可能的话,释放一些内存空间
// Any other error (not Errors.NONE).
else if (error != Errors.NONE) {
    // Retry first if the batch is still retriable.
    if (canRetry(batch, response, now)) {
        log.warn(
            "Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
            correlationId,
            batch.topicPartition,
            this.retries - batch.attempts() - 1,
            error);
        reenqueueBatch(batch, now);
    } else if (error == Errors.DUPLICATE_SEQUENCE_NUMBER) {
        // If we have received a duplicate sequence error, it means that the sequence number has advanced beyond
        // the sequence of the current batch, and we haven't retained batch metadata on the broker to return
        // the correct offset and timestamp.
        //
        // The only thing we can do is to return success to the user and not return a valid offset and timestamp.
        completeBatch(batch, response);
    } else {
        // Not retriable: map the error to a concrete exception and fail the batch.
        final RuntimeException exception;
        if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
            exception = new TopicAuthorizationException(Collections.singleton(batch.topicPartition.topic()));
        else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED)
            exception = new ClusterAuthorizationException("The producer is not authorized to do idempotent sends");
        else
            exception = error.exception();
        // tell the user the result of their request. We only adjust sequence numbers if the batch didn't exhaust
        // its retries -- if it did, we don't know whether the sequence number was accepted or not, and
        // thus it is not safe to reassign the sequence.
        failBatch(batch, response, exception, batch.attempts() < this.retries);
    }
    // Stale metadata caused the error: request a metadata refresh.
    if (error.exception() instanceof InvalidMetadataException) {
        if (error.exception() instanceof UnknownTopicOrPartitionException) {
            log.warn("Received unknown topic or partition error in produce request on partition {}. The " +
                    "topic-partition may not exist or the user may not have Describe access to it",
                batch.topicPartition);
        } else {
            log.warn("Received invalid metadata error in produce request on partition {} due to {}. Going " +
                    "to request metadata update now", batch.topicPartition, error.exception().toString());
        }
        metadata.requestUpdate();
    }
}
- 先不管异常,直接重试,重试还不行再说
- 序列号重复异常,按注释,说明服务端的序列号已经推进到当前 batch 的序列号之后(即这批数据服务端已经接收过),且 broker 上没有保留该 batch 的元数据,无法返回正确的 offset 和 timestamp。只能当成功来处理,但不返回有效的offset
- 其他必要的异常都撸一把。
- metadata异常,去更新 metadata。
// Success path: complete the batch normally.
else {
    completeBatch(batch, response);
}
再就是正常的完成处理
// Unmute the completed partition.
// Ordering mode: the partition was muted when its batch was drained; release it
// now that the batch is fully handled so new batches for it can be sent.
if (guaranteeMessageOrder)
    this.accumulator.unmutePartition(batch.topicPartition);
为了保证顺序,发送前锁定了一些partition , 现在处理完可以释放了。
== completeBatch 结束 ==
梳理
Sender 差不多完了,监控指标和事务的略过。总结流程和重点吧。
- Sender 是单线程死循环去执行。且循环之间没有等待时长。
- RecordAccumulator 是重点缓存数据的地方,Sender的数据来源全部在RecordAccumulator
- 其实梳理可以看到,发送前做足了数据校验。
如果文章有帮助到您,请点个赞,您的反馈会让我感到文章是有价值的















网友评论