环境搭建
创建一个SpringBoot项目rabbitmq_springboot

可以选择导入依赖

也可以使用坐标导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置文件配置RabbitMQ

spring:
application:
name: rabbitmq-springboot
rabbitmq:
host: localhost
port: 5672
username: ems
password: 123
virtual-host: /ems
Hello World 模型
生产者
在测试类中注入RabbitMQTemplate,使用convertAndSend()
方法

@SpringBootTest
class RabbitmqSpringbootApplicationTests {
//注入rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
//hello world
@Test
public void testHello() {
//向hello队列中发送 hello world 消息
rabbitTemplate.convertAndSend("hello", "hello world");
}
}
此时点击运行的话并不会生成hello队列产生消息,必须要有对应的消费者,才可以生效。
消费者
使用@Componet注解将该类注册到Spring中,通过@RabbitListener监听队列,@RabbitHandler处理。

@Component
//默认 持久化 非独占 非自动删除 true false false
@RabbitListener(queuesToDeclare = @Queue("hello"))
public class HelloCustomer {
@RabbitHandler
public void receive(String message) {
System.out.println("message=" + message);
}
}
@RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用。
使用 @RabbitListener 注解标记方法,当监听到所指队列 中有消息时则会进行接收并处理
@RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型
@RabbitListener(queuesToDeclare = @Queue("hello"))
给queuesToDeclare赋值声明队列 @Queue
创建队列
测试
此时再次启动生产者


消费成功!
细节
如果我们声明队列是想要配置队列的durable持久化
,exclusive独占性
,autoDelete自动删除
怎么配置呢?
很简单依然通过上文中使用的@Queues
注解

通过查看源码我们可以得知默认队列属性是durable=true,exclusive=false,autoDelete=false

Work Queue 模型
生产者

@SpringBootTest
class RabbitmqSpringbootApplicationTests {
//注入rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
//hello world
@Test
public void testHello() {
//向hello队列中发送 hello world 消息
rabbitTemplate.convertAndSend("hello", "hello world");
}
//work
@Test
public void testWork() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("work", "work模型" + i);
}
}
}
消费者
类上标注@Componet注解,方法上使用@RabbitMQ注解监听work队列。相当于两个创建了两个消费者

@Component
public class WorkCustomer {
//一个消费者
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive1(String message1) {
System.out.println("message1=" + message1);
}
//一个消费者
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive2(String message2) {
System.out.println("message2=" + message2);
}
}
测试
点击运行testWork()


测试成功!
Fanout 模型
生产者

@SpringBootTest
class RabbitmqSpringbootApplicationTests {
//注入rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
//hello world
@Test
public void testHello() {
//向hello队列中发送 hello world 消息
rabbitTemplate.convertAndSend("hello", "hello world");
}
//work
@Test
public void testWork() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("work", "work模型" + i);
}
}
//fanout 广播
@Test
public void testFanout() {
rabbitTemplate.convertAndSend("logs", "", "Fanout的模型发送的消息");
}
}
消费者

@Compoent注册到Spring中,方法使用@RabbitListener
,使用其中的bindings
属性。使用@QueueBinding注解绑定value属性值为@Queue
创建临时队列,exchange属性值为@Exchange
从而绑定交换机。该注解的value值为交换机的名称logs,type为类型fanout
@Component
public class FanoutCustomer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,//创建临时队列
exchange =@Exchange(value = "logs",type = "fanout") //绑定的交换机
)
})
public void receive1(String message){
System.out.println("message1 = " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,//创建临时队列
exchange =@Exchange(value = "logs",type = "fanout") //绑定的交换机
)
})
public void receive2(String message){
System.out.println("message2 = " + message);
}
}
测试
点击运行


Direct 模型
生产者

@SpringBootTest
class RabbitmqSpringbootApplicationTests {
//注入rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
//hello world
@Test
public void testHello() {
//向hello队列中发送 hello world 消息
rabbitTemplate.convertAndSend("hello", "hello world");
}
//work
@Test
public void testWork() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("work", "work模型" + i);
}
}
//fanout 广播
@Test
public void testFanout() {
rabbitTemplate.convertAndSend("logs", "", "Fanout的模型发送的消息");
}
//route 路由模式
@Test
public void testRoute(){
rabbitTemplate.convertAndSend("directs","error","发送info的key的路由信息");
}
}
消费者
@Compoent注册到Spring中,方法使用@RabbitListener
,使用其中的bindings
属性。使用@QueueBinding注解绑定value属性值为@Queue
创建临时队列,exchange属性值为@Exchange
从而绑定交换机。该注解的value值为交换机的名称logs,type为类型fanout。key属性值为routingKey值

消费者1rotingKey值为info,error,warn。消费者2routingKey值为error
测试
生产者的routingKey为error,所以消费者1,消费者2都能消费该信息。


Topic 模型
生产者

@SpringBootTest
class RabbitmqSpringbootApplicationTests {
//注入rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
//hello world
@Test
public void testHello() {
//向hello队列中发送 hello world 消息
rabbitTemplate.convertAndSend("hello", "hello world");
}
//work
@Test
public void testWork() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("work", "work模型" + i);
}
}
//fanout 广播
@Test
public void testFanout() {
rabbitTemplate.convertAndSend("logs", "", "Fanout的模型发送的消息");
}
//route 路由模式
@Test
public void testRoute(){
rabbitTemplate.convertAndSend("directs","error","发送info的key的路由信息");
}
//topic 动态路由 订阅模式
@Test
public void testTopic(){
rabbitTemplate.convertAndSend("topics","product.save.add","produce.save.add 路由消息");
}
}
消费者

@Component
public class TopicCustomer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(type = "topic",name="topics"),
key={"user.save","user.*"}
)
})
public void receive1(String message){
System.out.println("message1 = " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(type = "topic",name="topics"),
key={"order.#","product.#","user.*"}
)
})
public void receive2(String message){
System.out.println("message2 = " + message);
}
}
测试
生产者的routingKey为product.save.add
,只有消费者2匹配。


网友评论