美文网首页
RabbitMQ消费端的限流策略

RabbitMQ消费端的限流策略

作者: 若兮缘 | 来源:发表于2019-03-20 08:19 被阅读0次
为什么需要消费端的限流?
  • 假设一个场景,我们Rabbitmq服务器有上万条未处理的消息,我们随便打开一个消费者客户端,会出现这种情况:巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据!此时很有可能导致服务器崩溃,严重的可能导致线上的故障。
  • 除了这种场景,还有一些其他的场景,比如说单个生产者一分钟生产出了几百条数据,但是单个消费者一分钟可能只能处理60条数据,这个时候生产端和消费端肯定是不平衡的。通常生产端是没办法做限制的。所以消费端肯定需要做一些限流措施,否则如果超出最大负载,可能导致消费端性能下降,服务器卡顿甚至崩溃等一系列严重后果。
消费端限流机制

RabbitMQ提供了一种qos (服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息 (通过基于consume或者channel设置Qos的值) 未被确认前,不进行消费新的消息。
需要注意:
1.不能设置自动签收功能(autoAck = false)
2.如果消息没被确认,就不会到达消费端,目的就是给消费端减压

限流相关API
限流设置 - BasicQos()

void BasicQos(uint prefetchSize, ushort prefetchCount, bool global);
prefetchSize: 单条消息的大小限制,消费端通常设置为0,表示不做限制
prefetchCount: 一次最多能处理多少条消息,通常设置为1
global: 是否将上面设置应用于channel,false代表consumer级别

注意事项

prefetchSizeglobal这两项,rabbitmq没有实现,暂且不研究
prefetchCountautoAck=false 的情况下生效,即在自动应答的情况下这个值是不生效的

手工ACK - basicAck()

void basicAck(Integer deliveryTag,boolean multiple)
手工ACK,调用这个方法就会主动回送给Broker一个应答,表示这条消息我处理完了,你可以给我下一条了。参数multiple表示是否批量签收,由于我们是一次处理一条消息,所以设置为false

限流演示
生产端

生产端就是正常的逻辑

public class Producer {

    public static void main(String[] args) throws Exception {
        
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        String exchange = "test_qos_exchange";
        String routingKey = "qos.save";
        
        String msg = "Hello RabbitMQ QOS Message";
        //发送消息
        for(int i =0; i<5; i ++){
            channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
        }
    }
}
自定义消费者

在这里可以进行消息的手工ACK

public class MyConsumer extends DefaultConsumer {

    //接收channel
    private Channel channel ;
    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        System.err.println("envelope: " + envelope);
        //System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
        //手工ACK,参数multiple表示不批量签收
        //channel.basicAck(envelope.getDeliveryTag(), false);   
    }
}
消费端

关闭autoACK,进行限流设置

public class Consumer {

    public static void main(String[] args) throws Exception {
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.43.157");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //2 获取Connection
        Connection connection = connectionFactory.newConnection();
        //3 通过Connection创建一个新的Channel
        Channel channel = connection.createChannel();
        
        String exchangeName = "test_qos_exchange";
        String queueName = "test_qos_queue";
        String routingKey = "qos.#";
        //4 声明交换机和队列,然后进行绑定设置路由Key
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //进行参数设置:单条消息的大小限制,一次最多能处理多少条消息,是否将上面设置应用于channel
        channel.basicQos(0, 1, false);
        
        //限流: autoAck设置为 false
        channel.basicConsume(queueName, false, new MyConsumer(channel));
    }
}
运行说明

我们先注释掉手工ACK方法,然后启动消费端和生产端,此时消费端只打印了一条消息

-----------consume message----------
consumerTag: amq.ctag-vtsQsdK17o1Z3BWeGvZKRA
envelope: Envelope(deliveryTag=1, redeliver=false, exchange=test_qos_exchange, routingKey=qos.save)
body: Hello RabbitMQ QOS Message

这是因为我们设置了手工签收,并且设置了一次只处理一条消息,当我们没有回送ack应答时,Broker端就认为消费端还没有处理完这条消息,基于这种限流机制就不会给消费端发送新的消息了,所以消费端只打印了一条消息。
通过管控台也可以看到队列总共收到了5条消息,有一条消息没有ack。

将手工签收代码取消注释,再次运行消费端,此时就会打印5条消息的内容。

相关文章

  • RabbitMQ消费端的限流策略

    假设一个场景,由于我们的消费端突然全部不可用了,导致rabbitMQ服务器上有上万条未处理的消息,这时候如果没做任...

  • RabbitMQ消费端的限流策略

    为什么需要消费端的限流? 假设一个场景,我们Rabbitmq服务器有上万条未处理的消息,我们随便打开一个消费者客户...

  • RabbitMQ消费端的限流策略

    为什么需要消费端的限流? 假设一个场景,我们Rabbitmq服务器有上万条未处理的消息,我们随便打开一个消费者客户...

  • RabbitMQ 消费端的限流策略

    假设一个场景,由于我们的消费端突然全部不可用了,导致 rabbitMQ 服务器上有上万条未处理的消息,这时候如果没...

  • RabbitMQ 消费端限流

    RabbitMQ 提供了一种 qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数据的消息(通过基于...

  • RabbitMQ高级特性(2)

    一、消费端限流 1.什么是消费端的限流 假设一个场景,首先,Rabbitmq服务器有上万条未处理的消息,我们随便打...

  • RabbitMQ高级特性消费端限流策略实现

    应用范围为服务访问量突然剧增,原因可能有多种外部的调用或内部的一些问题导致消息积压,对服务的访问超过服务所能处理的...

  • RabbitMQ高级特性消费端限流策略实现

    应用范围为服务访问量突然剧增,原因可能有多种外部的调用或内部的一些问题导致消息积压,对服务的访问超过服务所能处理的...

  • RabbitMQ高级特性

    0. 前言 本文内容分为如下三部分RabbitMQ高级特性 消息可靠性投递Consumer ACK消费端限流TTL...

  • 消息队列之RabbitMQ-高级应用

    1、消费限流 假设一个场景:Rabbitmq 服务器积压了有上万条未处理的消息,我们随便打开一个消费者客户端,会出...

网友评论

      本文标题:RabbitMQ消费端的限流策略

      本文链接:https://www.haomeiwen.com/subject/yhewmqtx.html