美文网首页
SpringBoot整合RabbitMQ

SpringBoot整合RabbitMQ

作者: 西界__ | 来源:发表于2020-12-17 09:30 被阅读0次

环境搭建

创建一个SpringBoot项目rabbitmq_springboot

image-20200910192043363

可以选择导入依赖

image-20200910192128413

也可以使用坐标导入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置文件配置RabbitMQ

image-20200910193155625
spring:
  application:
    name: rabbitmq-springboot
  rabbitmq:
    host: localhost
    port: 5672
    username: ems
    password: 123
    virtual-host: /ems

Hello World 模型

生产者

在测试类中注入RabbitMQTemplate,使用convertAndSend()方法

image-20200910192727359
@SpringBootTest
class RabbitmqSpringbootApplicationTests {

    //注入rabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //hello world
    @Test
    public void testHello() {
        //向hello队列中发送 hello world 消息
        rabbitTemplate.convertAndSend("hello", "hello world");
    }
}

此时点击运行的话并不会生成hello队列产生消息,必须要有对应的消费者,才可以生效。

消费者

使用@Componet注解将该类注册到Spring中,通过@RabbitListener监听队列,@RabbitHandler处理。

image-20200910193942917
@Component
//默认 持久化 非独占 非自动删除 true false false
@RabbitListener(queuesToDeclare = @Queue("hello"))
public class HelloCustomer {

    @RabbitHandler
    public void receive(String message) {
        System.out.println("message=" + message);
    }
}

@RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用。

使用 @RabbitListener 注解标记方法,当监听到所指队列 中有消息时则会进行接收并处理

@RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型

@RabbitListener(queuesToDeclare = @Queue("hello"))给queuesToDeclare赋值声明队列 @Queue创建队列

测试

此时再次启动生产者

image-20200910194854253 image-20200910194906975

消费成功!

细节

如果我们声明队列是想要配置队列的durable持久化,exclusive独占性,autoDelete自动删除怎么配置呢?

很简单依然通过上文中使用的@Queues注解

image-20200910195304227

通过查看源码我们可以得知默认队列属性是durable=true,exclusive=false,autoDelete=false

image-20200910195627320

Work Queue 模型

生产者

image-20200910195849195
@SpringBootTest
class RabbitmqSpringbootApplicationTests {

    //注入rabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //hello world
    @Test
    public void testHello() {
        //向hello队列中发送 hello world 消息
        rabbitTemplate.convertAndSend("hello", "hello world");
    }

    //work
    @Test
    public void testWork() {
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("work", "work模型" + i);
        }
    }

}

消费者

类上标注@Componet注解,方法上使用@RabbitMQ注解监听work队列。相当于两个创建了两个消费者

image-20200910200029746
@Component
public class WorkCustomer {

    //一个消费者
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive1(String message1) {
        System.out.println("message1=" + message1);
    }

    //一个消费者
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive2(String message2) {
        System.out.println("message2=" + message2);
    }

}

测试

点击运行testWork()

image-20200910200447797 image-20200910200515227

测试成功!

Fanout 模型

生产者

image-20200910201012594
@SpringBootTest
class RabbitmqSpringbootApplicationTests {

    //注入rabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //hello world
    @Test
    public void testHello() {
        //向hello队列中发送 hello world 消息
        rabbitTemplate.convertAndSend("hello", "hello world");
    }

    //work
    @Test
    public void testWork() {
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("work", "work模型" + i);
        }
    }

    //fanout 广播
    @Test
    public void testFanout() {
        rabbitTemplate.convertAndSend("logs", "", "Fanout的模型发送的消息");
    }
    
}

消费者

image-20200910201130361

@Compoent注册到Spring中,方法使用@RabbitListener,使用其中的bindings属性。使用@QueueBinding注解绑定value属性值为@Queue创建临时队列,exchange属性值为@Exchange从而绑定交换机。该注解的value值为交换机的名称logs,type为类型fanout

@Component
public class FanoutCustomer {

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,//创建临时队列
                    exchange =@Exchange(value = "logs",type = "fanout")  //绑定的交换机
            )
    })
    public void receive1(String  message){
        System.out.println("message1 = " + message);
    }


    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,//创建临时队列
                    exchange =@Exchange(value = "logs",type = "fanout")  //绑定的交换机
            )
    })
    public void receive2(String  message){
        System.out.println("message2 = " + message);
    }
}

测试

点击运行

image-20200910201716972 image-20200910201747502

Direct 模型

生产者

image-20200910201943941
@SpringBootTest
class RabbitmqSpringbootApplicationTests {

    //注入rabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //hello world
    @Test
    public void testHello() {
        //向hello队列中发送 hello world 消息
        rabbitTemplate.convertAndSend("hello", "hello world");
    }
    //work
    @Test
    public void testWork() {
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("work", "work模型" + i);
        }
    }
    //fanout 广播
    @Test
    public void testFanout() {
        rabbitTemplate.convertAndSend("logs", "", "Fanout的模型发送的消息");
    }

    //route 路由模式
    @Test
    public void testRoute(){
        rabbitTemplate.convertAndSend("directs","error","发送info的key的路由信息");
    }

}

消费者

@Compoent注册到Spring中,方法使用@RabbitListener,使用其中的bindings属性。使用@QueueBinding注解绑定value属性值为@Queue创建临时队列,exchange属性值为@Exchange从而绑定交换机。该注解的value值为交换机的名称logs,type为类型fanout。key属性值为routingKey值

image-20200910202439808

消费者1rotingKey值为info,error,warn。消费者2routingKey值为error

测试

生产者的routingKey为error,所以消费者1,消费者2都能消费该信息。

image-20200910202748884 image-20200910202815706

Topic 模型

生产者

image-20200910203901798
@SpringBootTest
class RabbitmqSpringbootApplicationTests {

    //注入rabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //hello world
    @Test
    public void testHello() {
        //向hello队列中发送 hello world 消息
        rabbitTemplate.convertAndSend("hello", "hello world");
    }
    //work
    @Test
    public void testWork() {
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("work", "work模型" + i);
        }
    }
    //fanout 广播
    @Test
    public void testFanout() {
        rabbitTemplate.convertAndSend("logs", "", "Fanout的模型发送的消息");
    }

    //route 路由模式
    @Test
    public void testRoute(){
        rabbitTemplate.convertAndSend("directs","error","发送info的key的路由信息");
    }

    //topic 动态路由  订阅模式
    @Test
    public void testTopic(){
        rabbitTemplate.convertAndSend("topics","product.save.add","produce.save.add 路由消息");
    }

}

消费者

image-20200910204002611
@Component
public class TopicCustomer {

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(type = "topic",name="topics"),
                    key={"user.save","user.*"}
            )
    })
    public void receive1(String message){
        System.out.println("message1 = " + message);
    }



    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(type = "topic",name="topics"),
                    key={"order.#","product.#","user.*"}
            )
    })
    public void receive2(String message){
        System.out.println("message2 = " + message);
    }
}

测试

生产者的routingKey为product.save.add,只有消费者2匹配。

image-20200910204149401 image-20200910204219076

相关文章

网友评论

      本文标题:SpringBoot整合RabbitMQ

      本文链接:https://www.haomeiwen.com/subject/pvbagktx.html