美文网首页
RocketMQ 基础

RocketMQ 基础

作者: Tinyspot | 来源:发表于2024-01-18 09:38 被阅读0次

1. RocketMQ

1.1 设计理念

消息中间件的难题:如何保证消息一定能被消息消费者消费,并且保证只消费一次
RocketMQ 的设计者给出的解决办法是不解决这个难题,而是退而求其次,只保证消息至少被消费一次,但不承诺消息不会被消费者多次消费,其消费的幂等由消费者实现

1.2 启动顺序

  • 首先启动 NameServer,再启动 Broker 向 NameServer 注册(NameServer 对 Broker 进行心跳检测)
  • NameServer是RocketMQ的轻量级注册中心,负责存储和管理Broker集群以及Topic路由信息
  • Broker是RocketMQ的消息处理节点,负责接收生产者发送的消息并存储消息,以及响应消费者的消息读取请求

2. 路由中心NameServer

2.1 路由注册

  • (1) Broker发送心跳包
  • (2) NameServer处理心跳包

3. Topic

  • Group 标识一类消息的生产者、消费者;一个分组对应一个主题
  • RocketMQ 基于订阅发布机制,一个Topic拥有多个消息队列,一个Broker为每一主题默认创建4个读队列4个写队列

4. 消息发送

  • 同步发送:发送消息后,同步响应
  • 异步发送:发送消息后,异步响应
  • 单向发送:无返回值
public void demo() throws MQClientException {
    DefaultMQProducer producer = new DefaultMQProducer("producer_group");
    producer.start();

    Message msg = new Message(topic, tag, byte[]);

    // 单向发送
    producer.sendOneway(msg);

    // 同步发送
    SendResult result = producer.send(msg);

    // 异步回调
    producer.send(msg, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            String msgId = sendResult.getMsgId;
        }
        @Override
        public void onException(Throwable e) {
            String msgId = sendResult.getMsgId;
        }
    });
}

5. 消息消费

  • 两种消费模式
    • 拉模式 (Pull Consumer)
    • 推模式 (Push Consumer)

5.1 拉模式

  • 消费者(Consumer) 主动从 Broker (消息服务器) 中拉取消息,它会周期性地向 Broker 发送请求,询问是否有新的消息可以消费
  • 消费者通过 DefaultMQPullConsumer 实现拉取逻辑,它会定期或根据需要调用 pull() 方法从 Broker 获取消息队列中的消息

5.2 推模式 (Push Consumer)

  • 在 RocketMQ 中,Push 模式实际上是基于 Pull 模式的一种封装优化,它并不是传统意义上的服务端主动推送消息给客户端。
  • 在推模式下,虽然消费者看起来像是被动接收消息,但实际上内部仍然是通过定期拉取的方式来实现的,只是对用户隐藏了这一细节
  • DefaultMQPushConsumer 作为推模式的消费者,在内部使用定时任务或者监听器机制自动执行拉取操作,对使用者而言更像是消息被推送过来
public void consumer() throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
    consumer.setNamesrvAddr("localhost:9876");
    consumer.subscribe("my_topic", "tag_test");
    // CONSUME_FROM_LAST_OFFSET(默认值):从最新的消息开始消费,即忽略之前的所有消息
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    // 批量消费的消息数量上限
    consumer.setConsumeMessageBatchMaxSize(1);

    // 消息事件监听
    consumer.registerMessageListener((MessageListenerConcurrently) (messages, context) -> {
        try {
            for (MessageExt message : messages) {
                logger.info("consumer_message, topic={}, tags={}, msgId={}, properties={}, body={}",
                        message.getTopic(), message.getTags(), message.getMsgId(), message.getProperties(), new String(message.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            // 消息重试
            logger.error("consumer_message_exception_retry, topic=my_topic, tags=tag_test, exception={}", e.getMessage());
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    });

    consumer.start();
    logger.info("consumer_message_success");
}

相关文章

  • RocketMQ-基础使用(一)

    零、本文纲要 一、RocketMQ基础 MQ特点 RocketMQ安装 测试RocketMQ 二、RocketMQ...

  • Apache RocketMQ 的基础特性介绍

    Apache RocketMQ 的基础特性介绍 Apache RocketMQ 系列: Apache Rocket...

  • Apache RocketMQ 基础概念及架构解析

    Apache RocketMQ 基础概念及架构解析 Apache RocketMQ 系列: Apache Rock...

  • RocketMQ 调研问题记录

    实践&基础知识 首先贴出我参考的实践类的文章 springboot的RocketMq实例 RocketMQ 入门 ...

  • RocketMQ 基础

    RcoketMQ 是一个队列模型的消息中间件,具有高性能、高可靠、可伸缩、高实时、分布式特点。 RocketMQ特...

  • Apache Kafka 基础介绍

    Apache Kafka 基础介绍 介绍完RocketMQ,就不得不介绍一下kafka,RocketMQ就是照着k...

  • RocketMq基础认知

    RocketMQ是一款分布式、队列模型的消息中间件,单机支持1万以上的持久化队列,前提是足够的内存、硬盘空间。 消...

  • RocketMQ基础架构

    基于官方文档进行翻译 概览 Apache RocketMQ是一个分布式消息和流平台,它的特性包括低延迟,高性能,高...

  • RocketMQ基础入门

    一、MQ介绍 1.1 为什么要用MQ 消息队列是一种先进先出的数据结构 其应用场景主要包含以下3个方面 1.1.1...

  • RocketMQ基础原理

    1.MQ的作用、优缺点及对比 MQ的作用主要有以下三个方面: 异步作用:异步能提高系统的响应速度、吞吐量。 解耦作...

网友评论

      本文标题:RocketMQ 基础

      本文链接:https://www.haomeiwen.com/subject/aairodtx.html