美文网首页
Fanout exchange(扇形交换机)

Fanout exchange(扇形交换机)

作者: 初心myp | 来源:发表于2019-07-31 14:18 被阅读0次

工作流程:

  • 扇形交换机将消息路由给绑定到他身上的所有队列,给不理会绑定的路由键。如果N个队列绑定到
  • 某个扇形交换机上,当有消息发送到该扇形交换机上时,交换机会将消息的拷贝分别发送给这所有的N个队列。扇形用来交换机处理消息的广播路由

具体实现:

首先我们需要一个交换机的配置类:

package com.chuxin.fight.demo.rabbitmq.fanout;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 描述:配置广播模式或者订阅模式队列
 * <p>
 * Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。
 *
 * @author muyuanpei
 * @create 2018年11月21日15:30:52
 **/
@Configuration
public class RabbitFanoutConfig {

    final static String PENGLEI = "fanout.penglei.net";

    final static String SOUYUNKU = "fanout.souyunku.com";

    final static String YUANPEI = "fanout.yuanpei.com";

    @Bean
    public Queue queueYuanpei() {
        return new Queue(RabbitFanoutConfig.YUANPEI);
    }

    @Bean
    public Queue queuePenglei() {
        return new Queue(RabbitFanoutConfig.PENGLEI);
    }

    @Bean
    public Queue queueSouyunku() {
        return new Queue(RabbitFanoutConfig.SOUYUNKU);
    }

    /**
     * 任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有队列上。
     */
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    @Bean
    Binding bindingExchangeQueueYuanpei(Queue queueYuanpei, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queueYuanpei).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeQueuePenglei(Queue queuePenglei, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queuePenglei).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeQueueSouyunku(Queue queueSouyunku, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queueSouyunku).to(fanoutExchange);
    }

}

然后编写对应的接受者:

package com.chuxin.fight.demo.rabbitmq.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 描述:接收者
 *
 * @author muyuanpei
 * @create 2018年11月21日10:50:15
 **/
@Component
@RabbitListener(queues = "fanout.penglei.net")
public class FanoutReceiver1 {

    @RabbitHandler
    public void process(String message) {
        System.out.println("接收者 FanoutReceiver1," + message);
    }
}
package com.chuxin.fight.demo.rabbitmq.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 描述:接收者
 *
 * @author muyuanpei
 * @create 2018年11月21日10:50:15
 **/
@Component
@RabbitListener(queues = "fanout.souyunku.com")
public class FanoutReceiver2 {

    @RabbitHandler
    public void process(String message) {
        System.out.println("接收者 FanoutReceiver2," + message);
    }

}

package com.chuxin.fight.demo.rabbitmq.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 描述:接收者
 *
 * @author muyuanpei
 * @create 2018年11月21日10:50:15
 **/
@Component
@RabbitListener(queues = "fanout.yuanpei.com")
public class FanoutReceiver3 {

    @RabbitHandler
    public void process(String message) {
        System.out.println("接收者 FanoutReceiver3," + message);
    }

}

最后编写测试类,完成我们最终的验证:

package com.chuxin.fight.demo.rabbitmq.fanout;

import com.chuxin.fight.demo.DemoApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;


/**
 * 描述: 广播模式或者订阅模式队列
 *
 * @author: muyuanpei
 * @create: 2018年11月21日15:31:36
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = DemoApplication.class)
public class RabbitFanoutTest {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    @Test
    public void sendPengleiTest() {

        String context = "此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 FanoutReceiver3 可以收到";

        String routeKey = "topic.penglei.net";

        String exchange = "fanoutExchange";

        System.out.println("sendPengleiTest : " + context);

        context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;

        this.rabbitTemplate.convertAndSend(exchange, routeKey, context);
    }

    @Test
    public void sendSouyunkuTest() {

        String context = "此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 FanoutReceiver3 可以收到";

        String routeKey = "topic.souyunku.com";

        String exchange = "fanoutExchange";

        context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;

        System.out.println("sendSouyunkuTest : " + context);

        this.rabbitTemplate.convertAndSend(exchange, routeKey, context);
    }
}

这样就完成了一个扇形交换机的配置与编码。由于自己的服务器过期了,不能展示相应的打印结果。。。

相关文章

网友评论

      本文标题:Fanout exchange(扇形交换机)

      本文链接:https://www.haomeiwen.com/subject/vitzrctx.html