美文网首页
kafka偏移量不更新、重复消费问题

kafka偏移量不更新、重复消费问题

作者: Divenier | 来源:发表于2023-04-25 18:27 被阅读0次

最近负责的短信项目业务量增长较多,使用kafka异步发送短信的过程遇到了一些问题,在业务和线上环境上具体的表现就是:

  • 外部请求一直得不到处理,kafka的消息出现大量积压。
  • 部分请求被反复处理,其对应的kafka消息被反复消费。

对于发送短信这种实时性要求较高的项目,五分钟以前的请求未得到处理的话,基本就已经晚了。不管短信内容是数分钟有效的验证码,还是运维的监控告警,五分钟的延迟都会带来很多问题。

因此,如果可以容忍消息丢失,这时候可以用以下方法止血:

  1. 更新consumer的消费组,这样旧的积压的消息就不会出现在新的消费组上,新的消费组在创建时就以kafka的LOG-END-OFFSET作为当前偏移量。
  2. 更新对应的topic,让生产者将新的消息发送到新的topic上,消费者也消费新的topic。

但是,消费者的消费积压一般都是因为消费者自身有问题,可以参考如下文章,本次线上问题和下面文章描述的情况也基本一致:(链接已删除)
下面记录一下项目的问题排查和复现、解决过程。

问题排查和复现

1、消费积压和重复消费出现

kafka消费积压情况:(这时候消费者已经不行了,一直在消费消息,但是位移无法提交,因此在重启生产环境的服务)


image.png

后来拿到消费者的服务日志,发现对很多消息存在反复消费的情况:


image.png

2、问题排查

2.1、消费者消费缓慢的原因

通过检查消费者日志,发现在消费一条消息时,消费者需要2次更新MongoDB数据库,而每次更新都会花费一秒钟以上。于是检查对应的数据库,发现单个集合数据量已经达到了170万条,且没有添加任何索引,这不慢才怪了。
于是将集合做了归档(rename),随即新建了带有索引的新集合,更新MongoDB的时间缩短到了0.01s,线上问题初步解决。

2.2、kafka offset无法更新的原因

问万能的chatGpt和阅读上面的文章,基本确认是消费者单次拉取消息过多,在默认时间内未完成处理,触发集群rebanlance导致的,如下文:

消费超时会发生什么?
Kafka Handle Error, Client Will Seek Soon: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

报错信息非常良心,简单解释下:
  集群以为消费线程挂了,触发了rebanlance(这一批已经给别的消费者线程消费了)。当前消费者线程业务逻辑执行完了再去同步游标报错了,没有提交成功,这就导致了两个消费者线程把同一批消息消息了两遍。
kafka消费者超时解决方案_kafka消费超时_7im0thyZhang的博客-CSDN博客

如果超过了 [max.poll.interval.ms] 所设置的时间,就会被消费组所在的 coordinator 剔除掉,从而导致重平衡,Kafka 重平衡过程中是不能消费的,会导致消费组处于类似 stop the world 的状态下,重平衡过程中也不能提交位移,这会导致消息重复消费从而使得消费组的消费速度下降,导致消息堆积。
kafka消息堆积原因解析kafka消费堆积鸭梨山大哎的博客-CSDN博客

2.3、问题复现

1、在消费者程序中加入线程休眠模拟处理时长(5s);
2、配置max-poll-records参数为500(默认值),使消费者可以单次拉取较多的消息;
3、通过本地工程短时间循环请求接口600次;
4、检查消费者日志和消费积压情况;

消费积压情况:消费者的kafka offset在更新几个之后,逐渐卡在3035,不再更新,如下图


image.png

随后,获取消费者服务日志,检查也发现了重复消费的情况。

解决重复消费问题的解决办法验证:

1、配置max-poll-records参数为10,使消费者可以单次最多仅可拉取10条消息
2、在不改变消费者消费能力(速度)的情况下,检查消费情况
3、发现消费积压逐渐减少,且减少的步长就是配置的max-poll-records的值(10)(如下图),说明只要 单次拉取的消息数 * 处理时间 < kafka的超时阈值[max.poll.interval.ms](默认五分钟) 即可保证消费结果可成功提交;

image.png

3、问题解决

  • 首先,要提升消费者消费能力,涉及到数据库的就添加索引和缓存,涉及到网络连接的考虑添加连接池等;

  • 其次,kafka的相关配置一定要根据自己的需要及时修改,避免一次拉取过多消息无法处理。反复触发rebanlance的话,位移就一点都不会更新了。也就是卡死了。

  • 最后,可以根据需求为kafka配置多线程消费,要注意为topic配置多个分区,注意上游消息生产者不要使用完全一样的key(否则消息无法发送到多个分区),然后再为消费者配置多线程的KafkaListenerContainerFactory,注意代码做好线程安全修改即可。

相关文章

  • Kafka消费者总结

    提交偏移量的方式,如何保证消息不丢失,不重复消费自动提交偏移量,会导致消息重复消费和消息丢失。重复消费是因为,当一...

  • Kafka - 偏移量提交

    Kafka - 偏移量提交 一、偏移量提交 消费者提交偏移量的主要是消费者往一个名为_consumer_offse...

  • Kafka-15.实现-分发

    消费者偏移量追踪 Kafka消费者跟踪它在每个分区消费的最大偏移量,并且能够提交偏移量,以便在重新启动的时候可以从...

  • Kafka实际案例问题

    kafka consumer防止数据丢失 Kafka学习之怎么保证不丢,不重复消费数据 1 消费者pull数据时,...

  • logstash 重复消费kafka问题

    前两天业务方突然找到我说当天索引ES查询很慢,原来毫秒级的查询现在竟然要20s,让我处理下。我看了下索引大小,原来...

  • 2、Kafka重复消费问题

    Kafka实际上有个offset的概念,就是每个消息写进去,都有一个offset,代表他的序号,然后consu...

  • 无镜--kafka之消费者(一)

    此篇开始进入kafka的另外一侧:消费者。kafka中的消费者比生产者要复杂的多,里面涉及到的消费组,偏移量等概念...

  • kafka重复消费

    问题背景 笔者基于java做了一个动态添加topic,并落地数据到Hbase的功能,其他同事在复用消费topic代...

  • 如何保证消息不被重复消费

    首先, RabbitMQ、RocketMQ、Kafka,都有可能会出现消息重复消费的问题,正常。因为这问题通常不是...

  • Kafka日志

    kafka是怎么通过偏移量找到对应的消息?首先消费者消费时会指定Topic和Partition,每个Partiti...

网友评论

      本文标题:kafka偏移量不更新、重复消费问题

      本文链接:https://www.haomeiwen.com/subject/sdlmjdtx.html