RocketMq 堆积查询

作者: 晴天哥_王志 | 来源:发表于2020-05-12 11:56 被阅读0次

系列

开篇

  • 这篇文章的主要目的是描述RocketMq的消息堆积查询逻辑,消费堆积查询是以consumeGroup维度进行查询,查询该consumeGroup下的所有topic的所有queue的写入进度和消费进行比对。

例子

  • 查看指定消费组下的所有topic数据堆积情况
  • sh mqadmin consumerProgress -n localhost:9876 -g consumer_group_test
#Topic                            #Broker Name                      #QID  #Broker Offset        #Consumer Offset      #Client IP           #Diff                 #LastTime
%RETRY%consumer_group_test        broker-a                          0     0                     0                     N/A                  0                     N/A
TopicTest                         broker-a                          0     1155                  1004                  N/A                  151                   2020-05-08 22:53:52
TopicTest                         broker-a                          1     1159                  1007                  N/A                  152                   2020-05-08 22:54:58
TopicTest                         broker-a                          2     1124                  982                   N/A                  142                   2020-05-08 22:53:54
TopicTest                         broker-a                          3     1141                  987                   N/A                  154                   2020-05-08 22:54:58
TopicTest                         broker-a                          4     1126                  985                   N/A                  141                   2020-05-08 22:53:17
TopicTest                         broker-a                          5     1119                  980                   N/A                  139                   2020-05-08 22:53:17
TopicTest                         broker-a                          6     1127                  983                   N/A                  144                   2020-05-08 22:53:52
TopicTest                         broker-a                          7     1131                  987                   N/A                  144                   2020-05-08 22:53:52

AdminBrokerProcessor

public class AdminBrokerProcessor implements NettyRequestProcessor {

    private RemotingCommand getConsumeStats(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final GetConsumeStatsRequestHeader requestHeader =
            (GetConsumeStatsRequestHeader) request.decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class);

        ConsumeStats consumeStats = new ConsumeStats();

        Set<String> topics = new HashSet<String>();
        // 1、先获取这个消费分组订阅的topic信息
        if (UtilAll.isBlank(requestHeader.getTopic())) {
            topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(requestHeader.getConsumerGroup());
        } else {
            topics.add(requestHeader.getTopic());
        }
        // 2、根据topic维度进行统计
        for (String topic : topics) {
            TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
            if (null == topicConfig) {
                log.warn("consumeStats, topic config not exist, {}", topic);
                continue;
            }

            {
                SubscriptionData findSubscriptionData =
                    this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic);

                if (null == findSubscriptionData
                    && this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getConsumerGroup()) > 0) {
                    log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", requestHeader.getConsumerGroup(), topic);
                    continue;
                }
            }
            // 3、根据topic下可读队列进行统计
            for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
                MessageQueue mq = new MessageQueue();
                mq.setTopic(topic);
                mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
                mq.setQueueId(i);

                OffsetWrapper offsetWrapper = new OffsetWrapper();

                long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
                if (brokerOffset < 0)
                    brokerOffset = 0;

                long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(
                    requestHeader.getConsumerGroup(),
                    topic,
                    i);
                if (consumerOffset < 0)
                    consumerOffset = 0;

                offsetWrapper.setBrokerOffset(brokerOffset);
                offsetWrapper.setConsumerOffset(consumerOffset);

                long timeOffset = consumerOffset - 1;
                if (timeOffset >= 0) {
                    long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset);
                    if (lastTimestamp > 0) {
                        offsetWrapper.setLastTimestamp(lastTimestamp);
                    }
                }
                // 4、按照MessageQueue维度进行统计
                consumeStats.getOffsetTable().put(mq, offsetWrapper);
            }

            double consumeTps = this.brokerController.getBrokerStatsManager().tpsGroupGetNums(requestHeader.getConsumerGroup(), topic);

            consumeTps += consumeStats.getConsumeTps();
            consumeStats.setConsumeTps(consumeTps);
        }

        byte[] body = consumeStats.encode();
        response.setBody(body);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
}
  • broker针对消息堆积处理按照Topic到MessageQueue的顺序进行统计。
  • 1、先获取这个消费分组订阅的topic信息。
  • 2、先按照topic维度进行统计,再根据topic下可读队列进行统计。
  • 3、按照MessageQueue维度进行统计进行返回。

相关文章

  • RocketMq 堆积查询

    系列 RocketMq 消息Tag过滤 RocketMq 广播模式 RocketMQ 同步调用的新特性 Rocke...

  • rocketMq的消息堆积

    概述 这篇文章的目的主要是为了讲清楚rocketMq消息堆积的概念,我印象中的rocketMq的消息堆积应该分为两...

  • rocketMq消息查询

    概述 最近有人问我知道rocketMq是怎么查询消息的,我发现我貌似回答不上来,所以抽空就把这块内容补充一下,...

  • RocketMq 消息查询

    系列 RocketMq 消息Tag过滤 RocketMq 广播模式 RocketMQ 同步调用的新特性 Rocke...

  • RocketMQ 添加监控和系统告警通知

    前言 最近由于RocketMQ在使用过程中,发现在某些时候消息堆积,并且还是长时间堆积不消费,这种情况下没能及时发...

  • 消息疯狂堆积!RocketMQ 出 Bug 了?

    推荐阅读: 我总结了72份面试题,累计3170页,斩获了30+互联网公司offer(含BATJM) 2020首战告...

  • 消息疯狂堆积!RocketMQ出Bug了?

    用过 MQ 的同学,可能会遇到过 消息****堆积的问题。而肥壕最近也踩上了这个坑,但是发现结果竟然是这么一个意料...

  • rocketMq - index介绍

    概述 这篇文章主要是想讲清楚rocketMq中index的存储和查询功能,着重分为存储和查询两个点进行讲解,这...

  • RocketMQ源码-Index索引介绍

    1 概述2 入口方法介绍3 索引结构介绍4 索引操作5 索引查询 1 概述 RocketMQ中Broker在收到生...

  • RocketMQ-Broker模块解析之Broker初始化以及启

    RocketMQ整体物理架构如下: Broker服务器主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这...

网友评论

    本文标题:RocketMq 堆积查询

    本文链接:https://www.haomeiwen.com/subject/hlpsnhtx.html