美文网首页JavaJava 程序员
RocketMQ 消息投递解析——时序图、调用链、源码级解析

RocketMQ 消息投递解析——时序图、调用链、源码级解析

作者: 马小莫QAQ | 来源:发表于2022-04-08 22:32 被阅读0次

消息投递模型

在前几篇文章里我曾经也画过消息投递的模型图,这里再来简单复习一下:

  • 消息生产者集群从注册中心获取到路由信息(负载均衡),然后将消息发送给Broker集群
  • 注册中心是无状态集群,即每一台服务器都不影响其他的服务器。Broker会同时向所有的注册中心服务器里发送注册信息
  • 注册中心存储的是Topic、Queue、IP地址等信息,正常情况下每台机器存储的应该是相同的
  • Broker采用主从架构提供服务,主服务器负责写入操作,从服务器负责处理读请求

消息投递流程

消息发送的时序图如下图所示:

Producer首先要知道向哪个Broker发送消息,所以具体流程如下:

  1. Producer先从本地尝试获取路由信息
  2. 本地无缓存的路由信息时,从注册中心中获取路由信息,并缓存到本地
  3. 获取到的路由信息包含了Topic下的所有Queue,Producer就可以采取负载均衡策略把消息发送到某个队列里
  4. Producer发送消息到Broker成功之后,服务器就会返回消息发送成功对象SendResult

消息投递方法链

下面以时序图的形式展示了从获取路由表到消息投递过程的整体方法调用链:

上图涉及到的核心API如下:

// 发送消息
DefaultMQProducer#send(Message msg);
// 发送消息,增加超时时间
DefaultMQProducer#send(Message msg, long timeout);
// 发送消息,增加发送消息的模式(异步/同步)
DefaultMQProducer#sendDefaultImpl(Message msg, CommunicationMode mode, long timeout);
// 查询消息发送的路由信息
DefaultMQProducerImpl#tryToFindTopicPublishInfo(String topic);
// 根据topic的名称更新注册中心的路由信息
MQClientInstance#updateTopicRouteInfoFromNameServer(String topic);
// 根据topic的名称更新注册中心的路由信息,并获取路由信息
MQClientInstance#updateTopicRouteInfoFromNameServer(String topic, Boolean isDefault, MQDefaultProducer mqDefaultProducer);
// 根据负载均衡算法,选择一个队列进行消息发送
DefaultMQProducerImpl#selectOneMessageQueue(TopicPublishInfo topic, String lastBrokerName);
// 发送消息
DefaultMQProducerImpl#sendKernelImpl(Message msg, MessageQueue queue);

接下来我们进行源码级分析,可以对照上图学习:

SendResult

如果消息发送成功,会返回一个SendResult对象:

/**
* 发送消息结果
*/
public class SendResult {
    /**
    * 发送消息结果状态
    */
    private SendStatus sendStatus;
    /**
    * 消息的唯一key,由Client发送消息时生成
    */
    private String msgId;
    /**
    * 消息队列
    */
    private MessageQueue messageQueue;
    /**
    * 消息队列偏移量
    */
    private long queueOffset;
    /**
    * 事务ID
    */
    private String transactionId;
    /**
    * 下一条消息的偏移量
    */
    private String offsetMsgId;
    /**
    * 区域ID
    */
    private String regionId;
}

其中SendStatus是一个枚举值:

public enum SendStatus {
    SEND_OK,
    FLUSH_DISK_TIMEOUT,
    FLUSH_SLAVE_TIMEOUT,
    SLAVE_NOT_AVAILABLE,
}
  • SEND_OK:消息发送成功且存储同步成功
  • FLUSH_DISK_TIMEOUT:消息发送成功但存储失败
  • FLUSH_SLAVE_TIMEOUT:消息发送成功但slave节点超时
  • SLAVE_NOT_AVAILABLE:消息发送成功但slave节点不可用

消息投递源码解析

Producer发送消息

DefaultMQProducer发送消息类模型:

  • MQAdmin:MQ管理的基类
  • ClientConfig:客户端配置类
  • DefaultMQProducer:消息生产者

使用Producer发送消息,具体编码实现方式如下:

  1. 创建DefaultMQProducer,传入指定发送消息所在组
  2. 设置注册中心地址,Producer会从里面获取到Topic以及队列
  3. 发送消息

发送消息时必须指定Topic,消息标签,消息体

package com.wjw;

import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.common.RemotingHelper;

public class MQProducerA {
    public static void main(String[] args) throws Exception {
        // 创建消息生产者,指定组
        DefaultMQProducer producer = new DefaultMQProducer("group-A");
        // 设置注册中心地址
        producer.setNamesrvAddr("localhost");

        producer.start();

        for (int i = 0; i < 10; i++) {
            // 创建消息对象
            Message message = new Message("topic-A", "tagA", ("Hello MQ " + i)
                    .getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 设置消息延时级别
            message.setDelayTimeLevel(6);
            // 发送消息
            SendResult result = producer.send(message);
            System.out.println("发送消息结果:" + result);
        }
        producer.shutdown();
    }
}

DefaultMQProducer

发送消息的producer.send()方法调用的是DefaultMQPrducer里的send方法:

这里又调用了
defaultMQProducerImpl.send(msg):

public class DefaultMQProducer extends ClientConfig implements MQProducer {

    // ...

    @Override
    public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg);
    }

    // ...

}

defaultMQProducerImpl

使用defaultMQProducerImpl的send方法发送消息,这里的调用多传了一个超时时间参数,当producer没有指定时,取默认值3000ms:

public class DefaultMQProducerImpl implements MQProducerInner {

    // ...

    public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 发送消息,指定消息发送的超时时间
        return send(msg, this.defaultMQProducer.getSendMsgTimeout());
    }

    public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 发送消息,指定消息发送类型:同步 or 异步,超时时间
        return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
    }

    // ...

}

sendDefaultImpl

上面调用的sendDefaultImpl方法需要做下面几件事:

  1. 获取消息路由信息,包含Topic下的队列和IP信息
  2. 选择要发送到的消息队列,这个过程会采用负载均衡策略选择一个队列进行消息存储
  3. 发送消息(sendKernelImpl)并返回结果

核心逻辑我已经标注在下面的代码片段里,非核心代码已省略

public class DefaultMQProducerImpl implements MQProducerInner {

    // ...

    /**
     * 发送消息
     *
     * @param msg                   消息
     * @param communicationMode     通信模式
     * @param sendCallback          发送回调
     * @param timeout               请求超时时间
     * @return
     * @throws MQClientException
     * @throws RemotingException
     * @throws MQBrokerException
     * @throws InterruptedException
     */
    private SendResult sendDefaultImpl(//
                                       Message msg, //
                                       final CommunicationMode communicationMode, //
                                       final SendCallback sendCallback, //
                                       final long timeout//
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 确保MQ服务正在运行
        this.makeSureStateOK();
        // 检查消息、Topic、消息体是否为空且满足系统要求
        Validators.checkMessage(msg, this.defaultMQProducer);

        // ...

        // 获取Broker的路由信息
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            // 根据消息是否是同步的,确定总的发送时间
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            // 循环调用发送消息方法,直到成功或超时
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                // 选择消息要发送到的队列,默认策略是轮流发送,当发送失败时,按顺序发送到下一个Broker的MessageQueue
                MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (tmpmq != null) {
                    mq = tmpmq;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        // 发送消息核心方法
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
                        endTimestamp = System.currentTimeMillis();
                        // 更新Broker的可用性信息,当发送时间超时时会有30s的不可用时长。只有开启了延迟容错机制才生效
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                // 同步没有发送成功 且 配置了存储异常重新发送时,重试
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }

                                // 返回发送结果
                                return sendResult;
                            default:
                                break;
                        }
                    } catch (RemotingException e) {
                        /** (省略)异常处理逻辑 **/
                        continue;
                    } catch (MQClientException e) {
                        /** (省略)异常处理逻辑 **/
                        continue;
                    } catch (MQBrokerException e) {
                        /** (省略)异常处理逻辑 **/
                    } catch (InterruptedException e) {
                        /** (省略)异常处理逻辑 **/
                    }
                } else {
                    break;
                }
            } // end of for

            // 返回发送结果
            if (sendResult != null) {
                return sendResult;
            }

            /** (省略)异常处理逻辑 **/
        }

        // 获取不到注册中心的地址信息则抛出异常
        List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
        if (null == nsList || nsList.isEmpty()) {
            throw new MQClientException(
                    "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NoNameServerException);
        }

        // Topic为空则抛出异常
        throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
                null).setResponseCode(ClientErrorCode.NotFoundTopicException);
    }

    // ...

}

sendKernelImpl

其实看函数名就能看出来,这是发送消息的核心方法。

  1. 根据brokername从本地缓存表brokerAddrTable中获取Broker服务器的IP地址,如果无法从本地获取到Broker的地址,则去请求注册中心获取;
  2. Broker会开启两个端口对外服务,如果开启VIP通道,则VIP端口号是原始端口号 - 2
  3. 构造RequestHeader请求头
  4. 根据同步策略发送消息,ONEWAY表示单向消息无需返回结果,发送失败会抛异常

核心逻辑我已经标注在下面的代码片段里,非核心代码已省略

public class DefaultMQProducerImpl implements MQProducerInner {

    // ...

    /**
     * 发送消息核心方法,返回发送结果
     *
     * @param msg               消息
     * @param mq                消息队列
     * @param communicationMode 通信模式
     * @param sendCallback      发送回调
     * @param topicPublishInfo  Topic信息
     * @param timeout           超时时间
     * @return
     * @throws MQClientException
     * @throws RemotingException
     * @throws MQBrokerException
     * @throws InterruptedException
     */
    private SendResult sendKernelImpl(final Message msg, //
                                      final MessageQueue mq, //
                                      final CommunicationMode communicationMode, //
                                      final SendCallback sendCallback, //
                                      final TopicPublishInfo topicPublishInfo, //
                                      final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 根据broker name查询Broker的地址
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        // 本地缓存为空则从注册中心查询Broker的地址
        if (null == brokerAddr) {
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }

        SendMessageContext context = null;
        if (brokerAddr != null) {
            // 是否使用VIP channel
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

            byte[] prevBody = msg.getBody();
            try {

                // ......省略部分逻辑

                // 构造请求头
                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                // 设置producer的group
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                // 设置topic名称
                requestHeader.setTopic(msg.getTopic());
                // 设置默认topic
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                // 设置topic queue的数量
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                // 设置queue的id
                requestHeader.setQueueId(mq.getQueueId());
                // 设置系统标记
                requestHeader.setSysFlag(sysFlag);
                // 设置消息创建时间
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());
                // 设置消息属性
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                // 设置被消费过的次数
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                // 如果topic是"%RETRY%"表示消息重发
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }

                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {
                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    }
                }

                SendResult sendResult = null;
                // 根据消息发送的不同模式发送消息
                switch (communicationMode) {
                    case ASYNC:
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//
                                brokerAddr, // 1
                                mq.getBrokerName(), // 2
                                msg, // 3
                                requestHeader, // 4
                                timeout, // 5
                                communicationMode, // 6
                                sendCallback, // 7
                                topicPublishInfo, // 8
                                this.mQClientFactory, // 9
                                this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10
                                context, //
                                this);
                        break;
                    case ONEWAY:
                    case SYNC:
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//
                                brokerAddr, // 1
                                mq.getBrokerName(), // 2
                                msg, // 3
                                requestHeader, // 4
                                timeout, // 5
                                communicationMode, // 6
                                context,//
                                this);
                        break;
                    default:
                        assert false;
                        break;
                }

                if (this.hasSendMessageHook()) {
                    context.setSendResult(sendResult);
                    this.executeSendMessageHookAfter(context);
                }

                return sendResult;
            } catch (RemotingException e) {
                /** 异常处理逻辑 **/
            } catch (MQBrokerException e) {
                /** 异常处理逻辑 **/
            } catch (InterruptedException e) {
                /** 异常处理逻辑 **/
            } finally {
                msg.setBody(prevBody);
            }
        }
        // broker不存在,抛异常
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }

    // ...

}

该方法调用了sendMessage执行真正的发送逻辑:

sendMessage

  1. 构建消息发送的请求对象sendMessageRequestHeader
  2. 使用RemotingCommand创建请求指令并设置参数
  3. 发起远程调用请求,实现消息发送
  • 消息发送模式为ONEWAY时,消息只会单向发送一次
  • 消息发送模式为ASYNC时,如果消息发送失败,会根据重试次数重发消息
  • 消息发送模式为SYNC时,直接发送消息,不重试
/**
     * 发送消息,返回发送结果
     *
     * @param addr                      Broker地址
     * @param brokerName
     * @param msg                       消息
     * @param requestHeader             请求头
     * @param timeoutMillis             超时时间
     * @param communicationMode         通信模式
     * @param sendCallback              发送回调
     * @param topicPublishInfo          Topic信息
     * @param instance                  Client实例
     * @param retryTimesWhenSendFailed  最大重试次数
     * @param context                   发送消息context
     * @param producer
     * @return
     * @throws RemotingException
     * @throws MQBrokerException
     * @throws InterruptedException
     */
    public SendResult sendMessage(//
                         final String addr, // 1
                         final String brokerName, // 2
                         final Message msg, // 3
                         final SendMessageRequestHeader requestHeader, // 4
                         final long timeoutMillis, // 5
                         final CommunicationMode communicationMode, // 6
                         final SendCallback sendCallback, // 7
                         final TopicPublishInfo topicPublishInfo, // 8
                         final MQClientInstance instance, // 9
                         final int retryTimesWhenSendFailed, // 10
                         final SendMessageContext context, // 11
                         final DefaultMQProducerImpl producer // 12
    ) throws RemotingException, MQBrokerException, InterruptedException {
        // 创建请求,如果将sendSmartMsg设为true,可以将请求keey压缩,加快序列化
        RemotingCommand request = null;
        if (sendSmartMsg) {
            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
        } else {
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
        }

        request.setBody(msg.getBody());

        switch (communicationMode) {
            case ONEWAY:
                // 基于Netty快速通信框架,发送消息给broker
                this.remotingClient.invokeOneway(addr, request, timeoutMillis);
                return null;
            case ASYNC:
                final AtomicInteger times = new AtomicInteger();
                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, context, producer);
                return null;
            case SYNC:
                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request);
            default:
                assert false;
                break;
        }

        return null;
    }

这里调用了remotingClient客户端远程调用Broker服务发送消息

作者:小王曾是少年
链接:https://juejin.cn/post/7084048029271441444
来源:稀土掘金

相关文章

网友评论

    本文标题:RocketMQ 消息投递解析——时序图、调用链、源码级解析

    本文链接:https://www.haomeiwen.com/subject/zkdusrtx.html