顺序消费

作者: 念䋛 | 来源:发表于2021-06-17 19:36 被阅读0次

生产者
根据自定义的规则,将某一类的消息发送到同一个queue中,可以hash与队列的个数取余

public static void main(String[] args) throws UnsupportedEncodingException {
    try {
        //默认情况下,broker为每一个topic创建4个queue,生产者把要顺序生产的消息一次的发送到同一个队列
        DefaultMQProducer producer = new DefaultMQProducer ("please_rename_unique_group_name");
        producer.setNamesrvAddr ("192.168.44.145:9876");
        producer.start ();
        for (int i = 0; i < 10; i++) {
            int orderId = i;
            for (int j = 0; j <= 5; j++) {
                Message msg =
                        new Message ("OrderTopicTest", "order_" + orderId, "KEY" + orderId,
                                ("order_" + orderId + " step " + j).getBytes (RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send (msg, new MessageQueueSelector () {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = ( Integer ) arg;
                        int index = id % mqs.size ();
                        //返回就是消息发往到queue的id
                        return mqs.get (index);
                    }
                  //orderId就是select的arg变量
                }, orderId);
                System.out.printf ("%s%n", sendResult);
            }
        }

        producer.shutdown ();
    } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
        e.printStackTrace ();
    }
}

消费者
消费者要注意使用的监听器MessageListenerOrderly,处理同一个queue的消息使用的是一个线程,单线程获取一个queue的消息保证了消息的顺序消费

public static void main(String[] args) throws MQClientException {
    //顺序消费不是全局顺序,只是分区顺序。要全局顺序只能分配一个queue。
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("please_rename_unique_group_name_3");
    consumer.setNamesrvAddr ("192.168.44.145:9876");
    consumer.setConsumeFromWhere (ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    consumer.subscribe ("OrderTopicTest", "*");
    //顺序消费,就需要保证消费端用同一个线程处理一个queue的消息
    consumer.registerMessageListener (new MessageListenerOrderly () {
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            context.setAutoCommit (true);
            for (MessageExt msg : msgs) {
                System.out.println ("收到消息内容 " + new String (msg.getBody ())+"消息队列id-"+msg.getQueueId ());
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });
    consumer.start ();
    System.out.printf ("Consumer Started.%n");
}

消费者收到的消息, order_X代表同一类的消息,step X 是该类消息的步骤,
可以看出来,同一类的消息的步骤是按照顺序的,但是不同类之间可能会相互的穿插,证明了顺序消费是局部消费,不是全局消费.如果要实现全局消费的话,topic创建的时候只创建一个queue(默认为4个)
收到消息内容 order_3 step 0消息队列id-3
收到消息内容 order_2 step 0消息队列id-2
收到消息内容 order_0 step 0消息队列id-0
收到消息内容 order_1 step 0消息队列id-1
收到消息内容 order_1 step 1消息队列id-1
收到消息内容 order_3 step 1消息队列id-3
收到消息内容 order_2 step 1消息队列id-2
收到消息内容 order_0 step 1消息队列id-0
收到消息内容 order_3 step 2消息队列id-3
收到消息内容 order_1 step 2消息队列id-1
收到消息内容 order_0 step 2消息队列id-0
收到消息内容 order_2 step 2消息队列id-2
收到消息内容 order_1 step 3消息队列id-1
收到消息内容 order_3 step 3消息队列id-3
收到消息内容 order_2 step 3消息队列id-2
收到消息内容 order_0 step 3消息队列id-0
收到消息内容 order_3 step 4消息队列id-3
收到消息内容 order_1 step 4消息队列id-1

相关文章

  • 顺序消费

    生产者根据自定义的规则,将某一类的消息发送到同一个queue中,可以hash与队列的个数取余 消费者消费者要注意使...

  • RocketMQ 顺序消费

    1、前言 对于所有的 MQ 来说,必问的一道面试题就是 RocketMQ 顺序消息怎样做?原理是什么? 首先我们要...

  • RocketMQ 10.顺序消息在高并发下的使用

    前言 顺序消息:指的是消息的消费顺序和生产顺序相同 全局顺序:在某个topic下,所有的消息都要保证顺序局部顺序:...

  • mq中如何保证消费者顺序消费

    通常mq可以保证先到队列的消息按照顺序分发给消费者消费来保证顺序,但是一个队列有多个消费者消费的时候,那将失去这个...

  • (十三)消费者的消费方式---顺序消费的流程

    消息消费的方式有俩种,一种是并发消费,一种是顺序消费。前面对于并发消费已经有一定的了解,现在了解一下顺序消费的实现...

  • 如何保证消息的顺序性?

    一、背景 有的时候消费者消费消息是顺序消费的。比如生成一个订单,先扣库存,然后扣款。如果顺序错了,扣了款库存没了,...

  • rocket源码 顺序消息和事务消息

    顺序消息的实现 顺序消息进行消费时,若是第一次消费失败,可以返回SUSPEND_CURRENT_QUEUE_A_M...

  • RocketMQ顺序消息消费

    RocketMQ顺序消息消费 1. 应用场景 消息队列中消息之间有先后的依赖关系,后一条消息的处理依赖于前一条消息...

  • Kafka消费顺序保证

    在实现事件流流经Kafka时遇到了这个问题,即如何满足消息按produce顺序去consume。 概念&问题 首先...

  • RocketMQ-顺序消费

    顺序消费 消息有序是指一类消息消费时,能按照发生的顺序来消费。例如:一个订单产生三个消息:订单创建,订单付款,订单...

网友评论

    本文标题:顺序消费

    本文链接:https://www.haomeiwen.com/subject/qtthyltx.html