美文网首页
rocketmq_顺序消息

rocketmq_顺序消息

作者: kele2018 | 来源:发表于2022-05-26 18:49 被阅读0次
Q:在rocketmq语境下,如何定义【顺序】这个词?
消费效果应该保持我们业务定义上的一种顺序;比如:一个订单的支付状态:待支付、已支付、已退款。
Q:为了保证这种效果,生产端应该如何做?
  (1)保证消息的发送顺序;即先发待支付,再发已支付、最后再发已退款;
  (2)让一个订单的消息进入同一个队列;比如,待支付消息进入1号队列,已支付消息进入2号队列,已退款消息进入3号队列,那么虽然它们发送的时候有先后顺序,但是到了队列中都是第一条消息,那么假如有三个消费者,很有可能后面发送的先消费;
Q:为了保证这种效果,消费端应该如何做?
  (1)分布式锁:防止一个消费者组下的多个消费者同时消费一个队列;比如,1号队列现在负载给了消费者A,过一会儿之后又负载给了B;(因为大都用同一套负载均衡算法,只要消费者数量不变,这种情况就不会发生)。
  (2)本地排他锁:防止两个任务并行,后面的消息先消费;比如,1号队列现在负载给了消费者A,A拉取一批消息,提交到线程池;接着又拉下一批消息,有提交到线程池;因为线程池中有多个线程,所以有可能下一批消息和上一批消息同时消费,甚至早于后者。
Q:生产端如何让一个订单的消息进入同一个队列?
使用队列选择器,即MessageQueueSelector,可以自己定义,也可以用现成的;
// hashKey为订单号
SendResult sendResult = this.producer.send(rocketMsg, this.messageQueueSelector, hashKey, timeout);
Q:消费端如何实现分布式锁?

消费端在拉取消息前,需要先构造拉取请求,即PullRequest对象;rocketmq会在构造这个对象前,向broker请求锁定当前对象中的队列;

private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {
       ......
        List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
        for (MessageQueue mq : mqSet) {
            if (!this.processQueueTable.containsKey(mq)) {
                if (isOrder && !this.lock(mq)) {
                    log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                    continue;
                } // 锁队列:即向broker发送一条消息,告诉他我要锁哪个队列,broker在自己的记账本上记录一下
           }    
        } 
    }

Q:如何实现本地锁?

前面我们看到消费者拉取一批消息后会做成一个任务,提交到线程池;rocketmq会在任务中加锁,所以即使多个任务并行,因为大家拿的是同一把锁,所以同一时刻只会有一个任务在跑,其他任务等待;

public void run() {
            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue); // 每个队列都会有一把锁
            synchronized (objLock) {
                    for (boolean continueConsume = true; continueConsume; ) {                    
                        if (!msgs.isEmpty()) {                
                            try {
                                this.processQueue.getLockConsume().lock(); // 再次加锁,即使外层的对象锁没有锁住,这里的锁也可以保证有序
                                if (this.processQueue.isDropped()) {
                                    log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                        this.messageQueue);
                                    break;
                                }
                                status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                            } 
                            catch (Throwable e) {
                                log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                                    RemotingHelper.exceptionSimpleDesc(e),
                                    ConsumeMessageOrderlyService.this.consumerGroup,
                                    msgs,
                                    messageQueue);
                                hasException = true;
                            } 
                            finally {
                                this.processQueue.getLockConsume().unlock();
                            } 
                            
                            
                    } 
                } 
               
        }
}       

相关文章

  • rocketmq_顺序消息

    Q:在rocketmq语境下,如何定义【顺序】这个词? Q:为了保证这种效果,生产端应该如何做? Q:为了保证这种...

  • 顺序消息

    rocketmq的顺序消息需要满足2点: 1.Producer端保证发送消息有序,且发送到同一个队列。 2.con...

  • 顺序消息

    顺序消息 顺序消息是指消息消费的顺序和生产者发送消息的顺序一样的。 例如:一个订单产生了三条消息分别是订单创建、订...

  • rocketmq_消费消息出现异常时

    最近项目组的一个服务出现了消息丢失的问题,虽然当时通过手动重发的方式解决了,但是对于丢失的原因一直没有去深挖,这几...

  • RocketMQ 10.顺序消息在高并发下的使用

    前言 顺序消息:指的是消息的消费顺序和生产顺序相同 全局顺序:在某个topic下,所有的消息都要保证顺序局部顺序:...

  • RocketMQ的顺序消息

    顺序消息指的是生产者投递消息的顺序与消息消费的顺序一致。某个Topic上所有消息都是顺序的称为全局顺序消息,如果是...

  • RabbitMQ面试题:如何解决消息的顺序问题? --- 202

    RabbitMQ的消息顺序问题,需要分三个环节看待,发送消息的顺序、队列中消息的顺序、消费消息的顺序。 发送消息的...

  • 消息队列之RocketMQ-顺序消息

    1、什么是顺序消息 顺序消息(FIFO 消息)是 MQ 提供的一种严格按照顺序进行发布和消费的消息类型。顺序消息由...

  • 50.消息类型-顺序消息

    顺序消息 顺序消息是消息队列提供得一种严格按照顺序来发布和消费的消息类型。

  • 详细讲解RocketMQ!包含多个知识点!通俗易懂!(顺序、延时

    一、顺序消息 顺序消息(FIFO 消息)是消息队列 RocketMQ 提供的一种严格按照顺序来发布和消费的消息。顺...

网友评论

      本文标题:rocketmq_顺序消息

      本文链接:https://www.haomeiwen.com/subject/pccpprtx.html