美文网首页
kafka之重复消费数据

kafka之重复消费数据

作者: xpbob | 来源:发表于2018-10-07 19:37 被阅读943次

在进入主题之前,我们先思考一个问题。
问题

kafka消费者使用自动提交的模式,提交间隔为2s,消费者在获取数据的时候处理0.5s,从kafka拉取过来的数据只够处理1秒。那么消费者下次拉取过来的数据是否是已经消费完的数据?或者说由于数据已经消费,但是偏移量没有被提交,是否会造成下次获取的数据是从旧的偏移量开始拉取?


答案

不会是旧数据,kafka的消费者也有自己偏移量,这个偏移量是从kafka中读取的量,和kafka提交的偏移量不一样。假设变成自动提交偏移量,而且没有写提交的逻辑,同一个消费者,除了第一次或者rebalance会根据已提交的offset来获取数据,剩下的时候都是根据自己本地的偏移量来获取的。这个模式有点类似于用桶取水,用瓢来喝水。消费者就是桶的角色,poll就是瓢的角色。

重复消费的情况

我们把重复消费的情况分为2种,一种是想避免的,一种是故意如此的。

想避免的场景

  1. 消费者使用了自动提交模式,当还没有提交的时候,有新的消费者加入或者移除,发生了rebalance。再次消费的时候,消费者会根据提交的偏移量来,于是重复消费了数据。
  2. 使用异步提交,并且在callback里写了失败重试,但是没有注意顺序。例如提交5的时候,发送网络故障,由于是异步,程序继续运行,再次提交10的时候,提交成功,此时正好运行到5的重试,并且成功。当发生了rebalance,又会重复消费了数据。

注:这里不讨论那个消费者提交的offset的作用。

故意的场景

  1. 使用不同的组消费同一个topic。改个 group.id属性即可。
  2. 自己手动提交偏移量。
    这里的麻烦的地方就是需要理解开头的问题,并不是说你提交完就可以了。你得想个办法去读取那个偏移量再次消费。下面提供一个暴力的手段,关闭消费者,然后再次开启新的。
 public static void consumer(Properties properties,String info) {
        System.out.println(info);
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer(properties);
        kafkaConsumer.subscribe(Arrays.asList(new String[]{"hello"}));
        boolean flag = false;
        while (true) {
            ConsumerRecords<String, String> poll = kafkaConsumer.poll(100);
            if (!poll.isEmpty()) {
                for (ConsumerRecord<String, String> o : poll) {
                    System.out.println(o.value() + o.offset());
                    //假设场景为重复消费3,这里需要根据业务来提交便宜量
                    if (o.offset() == 3) {
                        //手动提交偏移量
                        Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<TopicPartition, OffsetAndMetadata>();
                        //提交的偏移量,这个偏移量就是下次消费的第一条数据
                        currentOffset.put(new TopicPartition(o.topic(), o.partition()), new OffsetAndMetadata(o.offset()+1, ""));
                        kafkaConsumer.commitSync(currentOffset);
                        flag = true;
                        break;
                    }
                }
            }
            if(flag){
                kafkaConsumer.close();
                break;
            }
        }
    }

这里也必须注意,kafka并不是数据库,他保存的数据有持久化的时间和大小的限制,可能你提交的偏移量的数据已经被kafka清理掉了。

相关文章

  • Kafka实际案例问题

    kafka consumer防止数据丢失 Kafka学习之怎么保证不丢,不重复消费数据 1 消费者pull数据时,...

  • kafka之重复消费数据

    在进入主题之前,我们先思考一个问题。问题 kafka消费者使用自动提交的模式,提交间隔为2s,消费者在获取数据的时...

  • Kafka重复消费数据

    从消息发送和消息消费两个方面去说。 「ACK」 0:producer不等待broker的ack,这一操作提供了一个...

  • MQ随记(2)

    如何保证消息不会被重复消费(保证消息消费时的幂等性) kafka 按照数据进入kafka的顺序,kafka会给每条...

  • python3读写kafka

    消费kafka数据,方式一 消费kafka数据,方式二 将消息写入kafka

  • kafka数据丢失与重复

    1、Kafka重复消费原因 底层根本原因:已经消费了数据,但是offset没提交。 原因1:强行kill线程,导致...

  • kafka重复消费

    问题背景 笔者基于java做了一个动态添加topic,并落地数据到Hbase的功能,其他同事在复用消费topic代...

  • Logstash重复消费Kafka的数据

    Logstash消费Kafka的数据,然后输出到Elasticsearch,某一天发现使用Kibana查询不到最近...

  • kafka数据如何被重复消费

    近段时间学习极客时间李玥老师的后端存储实战课时,看到一个很多意思的东西:用kafka存储点击流的数据,并重复处理。...

  • 优雅的使用Kafka Consumer

    如何消费数据 我们已经知道了如何发送数据到Kafka,既然有数据发送,那么肯定就有数据消费,消费者也是Kafka整...

网友评论

      本文标题:kafka之重复消费数据

      本文链接:https://www.haomeiwen.com/subject/moqraftx.html