1、作用(解耦/异步/削峰)
2、如何保证幂等性(消息重复消费)
方法一:
如果这条消息是拿来写数据库的,那在数据库表里面直接查一下主键,看有没有记录,如果没有,插入一条,如果有,update一下;
方法二:
如果这条消息是拿来写redis的,那不用管,反正redis是key-value数据库,一个消息不能插入两次,天然幂等;
方法三:
每条消息设置一个全局唯一ID,每消费一次,往数据库表里加一条记录;每次消费前,查一下数据库表里面有没有这个ID,有的话,说明消费过,直接return,没有的话,说明没消费过,直接消费。
3、如何保证可靠性(消息丢失)
Producer丢失消息:
Producer丢失消息是指在Producer把消息发出去了,但是Broker没收到,一般是传输过程中丢了,可能是网络问题。
这种情况,主要是增加通知机制,让Producer知道Broker有没有收到消息,如果没收到,那Producer重发;如果收到了,就不管。
对于Kafka来说,配置request.required.acks=-1,表示broker的所有副本收到消息并且把消息放到内存里后,向Producer发送ack,表示收到了。
消息中间件丢失消息:
消息中间件丢失消息是指消息中间件拿到了Producer的消息,还没消费,消息中间件自己就宕机了,然后消息就丢了。
这种情况,有两种处理方式,一种是把消息持久化到硬盘里面,这样就算消息中间件挂了,消息还在硬盘里面;还有一种是为消息中间件的消息队列提供多个副本,这样就算挂了一个,还有其它的启着,不影响实际运行。
对于kafka来说,采用第二种,通过为Producer配置replication.factor,为每个topic的partition提供多个factor(副本),就样丢了也没事。同时,通过配置min.insync.replicas,确保leader至少有一个follower还跟它有连接,确保leader挂了可以随机选举出新的leader。
Consumer丢失消息:
Consumer丢失消息是指Consumer从消息中间件把消息拿过来以后,还没来得及处理,自己就挂了,这个时候消息中间件以为消息已经被消费了,其实根本没消费。
这种情况,主要通过通知机制,让Consumer处理完消息以后,再通知消息中间件这条消息已经消费完了,消息中间件收到消息以后,把消息拿掉。
对于Kafka来说,offset是自动提交的,这时我们把offset的自动提交关了,改成手动提交,每次消费完成了消息以后,手动提交。只有当offset提交了以后,才表示这条消息已经消费过了。
4、如何保证顺序性(消息有序)
对于Kafka来说,一个partition里面的消息本身就是有序的,因此,为了保证消息顺序性,把需要有序的消息放在一个Partition里面就可以了。而为了让它们能够放在一个partition里面,需要将它们的一个公共的ID作为消息的KEY,这样KAFKA就会把它们放在一个Parition里面。
5、如何保证高可用性
所谓高可用,就是指部分宕机了不影响消息中间件的整体运行。
对于kafka来说,一个topic有多个Partition,topic的消息分散在每一个partition中。这样,我们在消息的时候,就可以从多个partition进行消费,提高了消费的速度。
同时,因为每一个partition都可以设置replica(副本),这样,对于每一个partition就可以有一个leader和多个follower,都存储了这个partition的消息。其中leader用于读写,follower用于备份,follower会从leader节点pull数据。如果leader挂了,会重新选出一个follower作为leader。
6、如何解决消息积压
消息积压有两种可能:
第一种出现在消费者的消费能力完全跟不上生产者的生产能力的情况下,也即生产太快,消费太慢。(出现这种情况,一般是消费者出故障了,不太可能是消费者本身消费能力不行,因为上线前消费者的消费能力肯定是测试过的,肯定没有大问题,只可能是生产环境里面消费者出问题了,比如内存溢出了或者崩了)
碰上此类问题,应急的解决思路是,如果消息中间件的parition还没满,赶紧临时申请一个10倍大小的新的partition,然后写一个consumer把partition中的消息全部消费,扔到新的10倍大小的partition里面。然后,把之前的消费者部署10套,分别去消费10倍paritition里面的消息,把这些积压的消费赶紧消费掉。这样就临时把这个问题解决了。
万一partition已经满了,里面的消息已经丢了,那不好意思,把parition里面的消息清了,等半夜没人用的时候重新导数据吧。
第二种情况是生产者突然发大量消息到消息中间件,消费者来不及处理,报错了。
这种情况是因为消息堆在paritition里太多,consumer来不及消费,超出了session.timeout的时间,然后就报错了,报错之后,consumer当前消费的offset就没法提交了。因为没提交offset,consumer又会重新从老的offset位置开始读数据,然后因为来不及消费,又超时,又挂了,无限循环。
解决办法是关了offset的自动提交,然后用spring-kafka自带的offset提交机制处理offset的提交问题。Spring kafka新建了一个阻塞队列,临时放需要消费的数据,相当于多了一个中间队列。SpringKafka把partition中的数据拿到阻塞队列,然后启一个线程去消费他,如果这个阻塞队列中的数据消费完了,就把offset提交了;如果阻塞队列满了,会直接暂停consumer从队列里拿数据。
7、自己设计消息中间件架构
消息中间件本质上,是一个消息的中转中心,最核心的功能就是消息的接收、存储和发送。
为了确保消息接收的高吞吐量,可以借鉴KAFKA的分布式设计,一个topic分多个partition,消息分布在多个partition里面,这样就算接收的消息再多,我们只要把topic的partition增加,且每一个partition分布在不同的服务器上,就能增加整体的吞吐量了。
为了确保消息存储的高可用,可以借鉴KAFKA的replica副本机制,每一个partition都可以配至少一个副本,少副本在不同的机器上,这样Leader挂了,也可以选举出新的Leader来接收存储消息。
为了确保消息存储在消息中间件不丢失,需要将消息定期持久化到硬盘里,而不是放在内存里面。放到硬盘里面,可以采用顺序存储的方式,不断地在partition后面顺序添加消息;同时,消息的存储可以分索引文件和数据文件,索引文件存储消息的物理地址和offset,便于快速定位消息的位置。
为了确保消息接收和消费不会丢失,可以借鉴kafka的通知机制,消息中间件的所有副本都收到消息以后,通知消息的producer,消息已经接收完毕了;消息的consumer消息完了以后,通知消息中间件,消息已经消费完了。一旦没有收到消息,就可以重发或者重新消费。
网友评论