美文网首页
浅析rocketmq顺序消息

浅析rocketmq顺序消息

作者: 秃秃少年小猪 | 来源:发表于2022-02-16 21:32 被阅读0次

背景

在一个支付场景中,需要先支开通会员,才可以短信收到开通会员对应的权益,假设开通会员的短信是在获取权益之前的,大概的流程图如下

image-20220216210723046

一但会员功能开通,为了降低系统的耦合性(高内聚,低耦合),系统可能会立马发出两个消息,一个是通知用户开通(A),第二个是通知权益到账(B),再由下游MSG系统来负责对接对接对应的渠道商。在这过程中,可能下游系统第二个比第一个消费的快,就导致权益到账先于会员开通提醒。那有没有什么好的办法来解决呢?

解决办法

问题查找

产生以上问题的原因大概率的是这两条消息不在同一个队列中,导致了并排消费,切B早于A

利用rocketmq的顺序消息

在rmq中,有一个顺序消息的概念,可以保证让发送的消息发送到一个消息队列中去(MessageQueue)

实现原理

image-20220216211622252

在rmq中的实现

生产者api

 SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException;

在我们发送一条消息的时候,我们可以指定一个 MessageQueueSelector (队列选择器),来指明消息需要发送到哪个队列中去

参数解释

参数名称 解释 备注
msg 消息内容
selector 队列路由规则
arg 规则参数 一般是一个不变的key

示例代码

  SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                Integer id = (Integer) arg;
                int index = id % mqs.size();
                return mqs.get(index);
            }
            }, orderId); //orderId 为定值

MessageQueueSelector代码解析

1.只需要实现 MessageQueueSelector 这个接口就好了,系统也提供了一个默认的实现

public interface MessageQueueSelector {
    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}

2.默认的实现

image-20220216212621579
实现名称 说明 备注
哈希 通过hash算法路由
机房 通过机房路由
随机 随机路由

几种实现

哈希
public class SelectMessageQueueByHash implements MessageQueueSelector {

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = arg.hashCode() % mqs.size();
        if (value < 0) {
            value = Math.abs(value);
        }
        return mqs.get(value);
    }
}
机房
public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
    private Set<String> consumeridcs;

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        return null;
    }

    public Set<String> getConsumeridcs() {
        return consumeridcs;
    }

    public void setConsumeridcs(Set<String> consumeridcs) {
        this.consumeridcs = consumeridcs;
    }
}
随机
public class SelectMessageQueueByRandom implements MessageQueueSelector {
    private Random random = new Random(System.currentTimeMillis());

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = random.nextInt(mqs.size());
        return mqs.get(value);
    }
}

总结

rmq通过对指定的key进行规则路由,然后选取一个指定队列,把需要发送的消息发送到同一个队列中去,根据队列的FIFO特性,做到顺序消费。

相关文章

  • 浅析rocketmq顺序消息

    背景 在一个支付场景中,需要先支开通会员,才可以短信收到开通会员对应的权益,假设开通会员的短信是在获取权益之前的,...

  • RocketMQ(2) 顺序消息、事务消息

    RocketMQ 顺序消息:消息有序是指可以按照消息发送顺序来消费。RocketMQ 可以严格的保证消息有序,但是...

  • Windows 安装 RocketMQ

    一、RocketMQ 介绍 1、消息顺序2、消息重复消费3、事务消息 二、RocketMQ 安装 Windows:...

  • RocketMQ(六)消息类型--顺序消息

    rocketmq支持顺序消息,而在rocketmq-spring-boot-starter中,分别提供了顺序同步,...

  • RocketMQ顺序消息

    顺序消息 之前我本地使用的client版本是3.6.2的,但是公司服务器上安得是3.2.6的版本。导致我测试顺序消...

  • RocketMQ顺序消息

    阿里help:https://help.aliyun.com/document_detail/49319.html...

  • RocketMQ 顺序消息

    背景: 业务使用 RocketMQ 的场景增多,但是有一些消息状态依赖的场景没有考虑顺序正确的使用RocketMQ...

  • RocketMQ顺序消息

    我们知道消息队列的特性导致其消息不是顺序进行消费的,RocketMQ没有提供所谓的顺序消息来供我们使用,但是有时候...

  • 详细讲解RocketMQ!包含多个知识点!通俗易懂!(顺序、延时

    一、顺序消息 顺序消息(FIFO 消息)是消息队列 RocketMQ 提供的一种严格按照顺序来发布和消费的消息。顺...

  • 关于RocketMQ顺序消息

    RocketMQ是一款 分布式、队列模型的消息中间件,由阿里巴巴团队研发,借鉴参考了JMS规范的MQ实现,更参考了...

网友评论

      本文标题:浅析rocketmq顺序消息

      本文链接:https://www.haomeiwen.com/subject/mmodlrtx.html