工作流程:
- 扇形交换机将消息路由给绑定到他身上的所有队列,给不理会绑定的路由键。如果N个队列绑定到
- 某个扇形交换机上,当有消息发送到该扇形交换机上时,交换机会将消息的拷贝分别发送给这所有的N个队列。扇形用来交换机处理消息的广播路由
具体实现:
首先我们需要一个交换机的配置类:
package com.chuxin.fight.demo.rabbitmq.fanout;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 描述:配置广播模式或者订阅模式队列
* <p>
* Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。
*
* @author muyuanpei
* @create 2018年11月21日15:30:52
**/
@Configuration
public class RabbitFanoutConfig {
final static String PENGLEI = "fanout.penglei.net";
final static String SOUYUNKU = "fanout.souyunku.com";
final static String YUANPEI = "fanout.yuanpei.com";
@Bean
public Queue queueYuanpei() {
return new Queue(RabbitFanoutConfig.YUANPEI);
}
@Bean
public Queue queuePenglei() {
return new Queue(RabbitFanoutConfig.PENGLEI);
}
@Bean
public Queue queueSouyunku() {
return new Queue(RabbitFanoutConfig.SOUYUNKU);
}
/**
* 任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有队列上。
*/
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchangeQueueYuanpei(Queue queueYuanpei, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queueYuanpei).to(fanoutExchange);
}
@Bean
Binding bindingExchangeQueuePenglei(Queue queuePenglei, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queuePenglei).to(fanoutExchange);
}
@Bean
Binding bindingExchangeQueueSouyunku(Queue queueSouyunku, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queueSouyunku).to(fanoutExchange);
}
}
然后编写对应的接受者:
package com.chuxin.fight.demo.rabbitmq.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 描述:接收者
*
* @author muyuanpei
* @create 2018年11月21日10:50:15
**/
@Component
@RabbitListener(queues = "fanout.penglei.net")
public class FanoutReceiver1 {
@RabbitHandler
public void process(String message) {
System.out.println("接收者 FanoutReceiver1," + message);
}
}
package com.chuxin.fight.demo.rabbitmq.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 描述:接收者
*
* @author muyuanpei
* @create 2018年11月21日10:50:15
**/
@Component
@RabbitListener(queues = "fanout.souyunku.com")
public class FanoutReceiver2 {
@RabbitHandler
public void process(String message) {
System.out.println("接收者 FanoutReceiver2," + message);
}
}
package com.chuxin.fight.demo.rabbitmq.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 描述:接收者
*
* @author muyuanpei
* @create 2018年11月21日10:50:15
**/
@Component
@RabbitListener(queues = "fanout.yuanpei.com")
public class FanoutReceiver3 {
@RabbitHandler
public void process(String message) {
System.out.println("接收者 FanoutReceiver3," + message);
}
}
最后编写测试类,完成我们最终的验证:
package com.chuxin.fight.demo.rabbitmq.fanout;
import com.chuxin.fight.demo.DemoApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* 描述: 广播模式或者订阅模式队列
*
* @author: muyuanpei
* @create: 2018年11月21日15:31:36
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = DemoApplication.class)
public class RabbitFanoutTest {
@Autowired
private AmqpTemplate rabbitTemplate;
@Test
public void sendPengleiTest() {
String context = "此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 FanoutReceiver3 可以收到";
String routeKey = "topic.penglei.net";
String exchange = "fanoutExchange";
System.out.println("sendPengleiTest : " + context);
context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;
this.rabbitTemplate.convertAndSend(exchange, routeKey, context);
}
@Test
public void sendSouyunkuTest() {
String context = "此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 FanoutReceiver3 可以收到";
String routeKey = "topic.souyunku.com";
String exchange = "fanoutExchange";
context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;
System.out.println("sendSouyunkuTest : " + context);
this.rabbitTemplate.convertAndSend(exchange, routeKey, context);
}
}
这样就完成了一个扇形交换机的配置与编码。由于自己的服务器过期了,不能展示相应的打印结果。。。
网友评论