美文网首页
MQ随记(2)

MQ随记(2)

作者: 喧嚣城外 | 来源:发表于2019-02-14 16:08 被阅读0次
如何保证消息不会被重复消费(保证消息消费时的幂等性)
kafka
  • 按照数据进入kafka的顺序,kafka会给每条数据分配一个offset代表这个数据代号
  • 消费者会提交offset,告诉kafka已消费到多少offset条数据。
  • zk记录消费者当前消费到多少offset条消息。

遇到的坑:
消费者不是消费完就提交offset的,而是定时定期提交。
消费者如果在准备提交offset时,但是还未提交,消费者被重启了,那么此时消费过的数据offset还没有提交,kafka也就不知道已经消费了哪些条消息,一旦消费者启动,就会重复消费。

幂等性:通俗的讲,就是一个数据或者一个请求,重复几次,确保对应数据不回改变,不能出错。

保证幂等性结合业务来思考,以下是几个思路:

  • 比如消费到数据用来写库,先查询,如果有了就不插入,update一下。
  • 比如redis,没问题了,每次都set,天然幂等性。
  • 使用唯一键,重复插入报错。

如何确保消息可靠性传输(如何处理消息丢失问题)?
rabbitmq

1.写消息过程,消息都没到rabbitmq在网络传输过程中就丢失了,或者消息到了rabbitmq,但是内部出现错误没有保存下来。
2.rabbitmq接收到消息后先暂时存在内存中,结果消息还没有被消费,rabbitmq自己挂掉了,导致内存中的消息搞丢。
3.消费者消费到这个消息,还没有来得及处理,自己挂掉了,但是rabbitmq以为这个消息已经被消费掉了。

解决写消息丢失:

  • 把channel设置为confirm模式。
  • 发送一个消息。
  • 发送完消息就不管了。
  • rabbitmq如果接收了这条消息,就回调生产者本地的接口
  • 如果接收失败,回调生产者本地的失败接口
channel.confirm;
//发送消息
//回调接口
public void ack(String message){
}
public void nack(String massage){
    //重新发送
}

rabbitmq生产者这块如果要保证消息不丢失,一般是用connfirm机制,异步的模式,你发送消息后不回阻塞,直接发下一个,吞吐量高一些。

解决mq自身丢失问题:
开启消息持久化。

解决消费者丢失问题:
关闭autoAck机制,每次自己确定处理完再发送ack给rabbitmq。

kafka

消费端弄丢数据:
唯一可能导致消费者弄丢数据的情况,也就是说,你那个消费到了这个消息,然后消费者那边自动提交了offset,让kafka以为已经消费好了这个消息,但是在准备处理时,消费端挂了,此时消息丢失。
解决方法:kafka会自动提交offset,那么只要关闭自动提交offset,在处理完后自己手动提交offset,就可以保证数据不丢失,但是此时还是会遇到重复消费问题,自己保证幂等性即可。

kafka弄丢了数据:
kafka某个broker宕机,然后重新选举出来的partiton的leader时。如果此时其他follower刚好还有些数据没有同步,结果此时leader挂了,然后选举出某个follower成了leader,就造成数据丢失。
所以此时一般 要求设置一下 4个参数。

  • 给这个topic设置replication.factor参数:这个值必须大于1,要求每个partition必须有至少两个副本。
  • 在kafka服务端设置min.insync.replicas参数:这个值必须大于1,这个是要求一个leader至少感知一到一个follower还跟自己保持联系,没有掉队。才能确保leader挂了还有一个follower吧。
  • 在producer端设置acks=all:这个要求每条数据,必须写入所有replica之后才能认为写成功。
  • 在producer端设置retries=MAX(很大很大的值,无限重试的意思)
    这样配置后,至少可以在kafka broker端保证leader所在broker发生故障,进行leader切换时,数据不回丢失。

生产者会不会丢失数据:
如果按上述思路配置ack=all,一定不会丢失,因为leader接收到消息,所有follower都同步到了消息之后,才认为本次写入成功,如果没有满足这个条件,生产者会自动不断重试,重试无限次。


如何保证消息消费顺序正确
rabbitmq如何保证
给每个消费者开一个queue 图片02.png
kafka如何保证

写入一个partition是有顺序的,生产者在写时,可以指定一个key,比如说指定某个订单id作为key,这个订单相关数据一定会被分发到一个partition中去。
partition只能被一个消费者消费。
可确保消费者以顺序取出。
但是可能会出现问题:
消费者内部多线程,消费者内部可能造成顺序不一致。


图片02.png
如何解决消息队列的延时以及过期失效时间?消息队列满了以后怎么处理?有几百万消息持续积压了几个小时,怎么解决?
如果消息积压了几百万或者上千万数据,及时消费者恢复了,也需要大概1小时时间才可以恢复过来。

一般这个时候就需要紧急扩容了,具体操作及思路如下:

  • 先修复consumer的问题,确保恢复消费速度,然后将consumer都停掉
  • 新建一个topic,partition是原来的10倍,临时建好原先10倍或者20倍的queue数量
  • 然后写一个临时的分发数据的consumer程序,这个程序部署上去消息积压的数据,消息之后不做耗时处理,直接均匀轮询的写入临时建好的10倍数量的queue
  • 接着临时征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的数据。
  • 这种做法相当于是临时将queue资源和consumer资源扩大10倍,以正常的10倍速度消费。
  • 等快速消费完积压数据后,得恢复原先部署架构,重新用原先的consumer机器来消费消息。
第二个坑(设置消息过期时间)

等过了高峰期后,这个时候开始写程序,将丢失的那批数据,写一个临时程序,一点点查出来,然后重新灌入mq里,进行补偿。

第三个坑(mq快满了)

如果走的方式是消息积压在mq里,那么如果你很长时间都没有处理掉,此时导致mq都快写满了,临时写程序,消费一个丢弃一个,尽快消费掉所有消息,然后写程序进行补偿。


如何设计一个消息队列的架构
  • 首先这个mq得支持可伸缩性,需要时可快速扩容,就可以增加吞吐量和容量。参照kafka的设计理念,broker->topic->partition,每个partition放一个机器,就存一部分数据。如果资源不够,给topic增加partition,然后数据迁移,增加机器。
  • 其次考虑mq数据落地磁盘,才能保证数据不会丢失,顺序写,这样就没有磁盘随机读写的寻址开销,磁盘顺序读写性能是很高的,kafka思路。
  • 考虑mq可用性。参考kafka高可用保障机制。多副本->leader&follower->broker挂了重新选举leader即可对外服务。
  • 能不能支持数据0丢失,参考kafka数据零丢失方案。

相关文章

网友评论

      本文标题:MQ随记(2)

      本文链接:https://www.haomeiwen.com/subject/jdveeqtx.html