美文网首页
SpringBoot整合RabbitMQ延迟队列的实现

SpringBoot整合RabbitMQ延迟队列的实现

作者: 潇豪 | 来源:发表于2019-08-05 19:36 被阅读0次

利用 Spring Boot 快速创建项目脚手架

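添加依赖:在构建文件(如 pom.xml)中引入 spring-boot-starter-amqp(提供下文用到的 org.springframework.amqp 相关类),测试部分还需要 spring-boot-starter-test。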

配置文件application.properties

spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/test
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672

在配置类里创建交换机、队列,并通过路由键完成绑定

package com.example.rabbitmqdemo.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * Declares the exchanges, queues and bindings used by the delay-queue demo.
 *
 * <p>Two direct exchanges are declared: one in front of the queue that is
 * actually consumed, and one in front of the delay queue. The delay queue is
 * declared with dead-letter arguments that point back at the first exchange.
 */
@Configuration
public class RabbitmqConfig {

    /** Exchange in front of the queue that is actually consumed. */
    private static final String WORK_EXCHANGE = "demoExchange";

    /** Exchange in front of the delay (dead-letter source) queue. */
    private static final String DELAY_EXCHANGE = "demoTtlExchange";

    /** Queue that is actually consumed. */
    private static final String WORK_QUEUE = "demoQueue";

    /** Delay queue whose dead-lettered messages are forwarded onwards. */
    private static final String DELAY_QUEUE = "demoTtlQueue";

    /** Routing key binding {@link #WORK_QUEUE} to {@link #WORK_EXCHANGE}. */
    private static final String WORK_ROUTING_KEY = "demoRoutes";

    /** Routing key binding {@link #DELAY_QUEUE} to {@link #DELAY_EXCHANGE}. */
    private static final String DELAY_ROUTING_KEY = "demoTtlRoutes";

    /**
     * Declares the durable, non-auto-delete direct exchange for the consumed queue.
     *
     * @return the {@code demoExchange} exchange
     */
    @Bean
    public DirectExchange demoExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new DirectExchange(WORK_EXCHANGE, durable, autoDelete);
    }

    /**
     * Declares the durable, non-auto-delete direct exchange for the delay queue.
     *
     * @return the {@code demoTtlExchange} exchange
     */
    @Bean
    public DirectExchange demoTtlExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new DirectExchange(DELAY_EXCHANGE, durable, autoDelete);
    }

    /**
     * Declares the durable queue that is actually consumed.
     *
     * @return the {@code demoQueue} queue
     */
    @Bean
    public Queue demoQueue() {
        final boolean durable = true;
        return new Queue(WORK_QUEUE, durable);
    }

    /**
     * Declares the delay queue. Its arguments name {@code demoExchange} as the
     * dead-letter exchange and {@code demoRoutes} as the dead-letter routing key.
     * No TTL is configured on the queue itself here.
     *
     * @return the {@code demoTtlQueue} queue
     */
    @Bean
    public Queue demoTtlQueue() {
        Map<String, Object> deadLetterArgs = new HashMap<>();
        deadLetterArgs.put("x-dead-letter-exchange", WORK_EXCHANGE);
        deadLetterArgs.put("x-dead-letter-routing-key", WORK_ROUTING_KEY);

        final boolean durable = true;
        final boolean exclusive = false;
        final boolean autoDelete = false;
        return new Queue(DELAY_QUEUE, durable, exclusive, autoDelete, deadLetterArgs);
    }

    /**
     * Binds the consumed queue to its exchange under routing key {@code demoRoutes}.
     *
     * @return the binding between {@code demoQueue} and {@code demoExchange}
     */
    @Bean
    public Binding demoBinding() {
        return new Binding(
                WORK_QUEUE,
                Binding.DestinationType.QUEUE,
                WORK_EXCHANGE,
                WORK_ROUTING_KEY,
                null);
    }

    /**
     * Binds the delay queue to its exchange under routing key {@code demoTtlRoutes}.
     *
     * @return the binding between {@code demoTtlQueue} and {@code demoTtlExchange}
     */
    @Bean
    public Binding demoTtlBinding() {
        return new Binding(
                DELAY_QUEUE,
                Binding.DestinationType.QUEUE,
                DELAY_EXCHANGE,
                DELAY_ROUTING_KEY,
                null);
    }
}

创建生产者

package com.example.rabbitmqdemo.service;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SendMessage {

    @Autowired
    private RabbitTemplate rabbitTemplate;



    public void sendMessage(String message){

        rabbitTemplate.convertAndSend("demoTtlExchange", "demoTtlRoutes", message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
            //设置10s过期,过期转发到指定路由
                message.getMessageProperties().setExpiration("10000");

                return message;
            }
        });




    }

创建消费者

/**
 * Consumer for {@code demoQueue}: prints every {@code String} payload it receives.
 *
 * <p>NOTE(review): this class declares no {@code @Bean} methods;
 * {@code @Configuration} is apparently used only to register it as a bean.
 */
@Configuration
@RabbitListener(queues = "demoQueue")
public class ReceiverMessage {

    /**
     * Handles a {@code String} payload by writing it to standard output.
     *
     * <p>Declared {@code public} so the handler is an ordinary listener entry
     * point and invoking it does not depend on access to a private member.
     *
     * @param message the received payload
     */
    @RabbitHandler
    public void receiverMessage(String message) {
        System.out.println("message = " + message);
    }

}

创建测试类,发送消息

package com.example.rabbitmqdemo;

import com.example.rabbitmqdemo.service.SendMessage;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * Integration test that publishes a series of delayed messages through
 * {@link SendMessage} and then keeps the application alive long enough for
 * them to be delivered.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqdemoApplicationTests {

    /** Number of messages to publish. */
    private static final int MESSAGE_COUNT = 20;

    /** Pause before each send, in milliseconds. */
    private static final long SEND_INTERVAL_MILLIS = 500L;

    /**
     * Time to wait after the last send, in milliseconds.
     * NOTE(review): chosen to exceed the 10 s expiration the producer puts on
     * each message, with some margin; adjust if that expiration changes.
     */
    private static final long DELIVERY_WAIT_MILLIS = 12000L;

    @Autowired
    private SendMessage sendMessage;

    /**
     * Sends {@code MESSAGE_COUNT} messages, one every {@code SEND_INTERVAL_MILLIS},
     * then waits {@code DELIVERY_WAIT_MILLIS} before returning.
     *
     * @throws InterruptedException if the test thread is interrupted while
     *         sleeping; propagated so the run is reported as failed instead of
     *         the interruption being swallowed and the loop carrying on
     */
    @Test
    public void contextLoads() throws InterruptedException {
        for (int i = 1; i <= MESSAGE_COUNT; i++) {
            Thread.sleep(SEND_INTERVAL_MILLIS);
            sendMessage.sendMessage("测试消息" + i);
        }

        // Sending takes about 10 s in total, which is no longer than the
        // per-message expiration, so returning here would end the test before
        // any delayed message could reach the consumer.
        Thread.sleep(DELIVERY_WAIT_MILLIS);
    }

}

控制台

消息不会立即被消费:每条消息在延迟队列中停留约 10 秒,过期后经死信交换机 demoExchange 转发到 demoQueue,此时消费者才在控制台打印 "message = 测试消息N"。

相关文章

网友评论

      本文标题:SpringBoot整合RabbitMQ延迟队列的实现

      本文链接:https://www.haomeiwen.com/subject/cenmdctx.html