美文网首页spring boot亚武学习
Rabbitmq打怪升级之路(十二)Headers-头交换机模式

Rabbitmq打怪升级之路(十二)Headers-头交换机模式

作者: 亚武de小文 | 来源:发表于2019-07-01 16:48 被阅读0次

简书:亚武de小文 【原创:转载请注明出处】

头交换机模式(Headers)

LengToo上学.png
RabbitMQ有以下几种工作模式 :
  • Work queues
  • Publish/Subscribe
  • Routing
  • Topic
  • Headers
  • RPC

Header
模型图
[亚武de小文]Headers模型图.png

消息header数据里有一个特殊的键x-match,它有两个值:

  • all: 默认值。一个传送消息的header里的键值对和交换机的header键值对全部匹配,才可以路由到对应交换机
  • any: 一个传送消息的header里的键值对和交换机的header键值对任意一个匹配,就可以路由到对应交换机

头交换机和主题交换机类似,区别在于:Topic路由值是基于路由键,Headers的路由值基于消息的header数据。 主题交换机路由键只有是字符串,而头交换机可以是整型和哈希值。

参考代码
生产者
  • Producer.java
    package com.yawu.xiaowen.header;
    
    import com.rabbitmq.client.*;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.Map;
    
    /**
     * Header交换机
     * 生产者
     *
     * @author yawu
     * @date 2019.07.01
     */
    public class Producer {
        private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
        private static final String EXCHANGE_NAME = "mq_header_exchange";
    
        public static void execute(Map<String, Object> headerMap) {
            try {
                // RabbitMQ建立连接的管理器
                ConnectionFactory factory = new ConnectionFactory();
                // 设置服务器地址
                factory.setHost("127.0.0.1");
                factory.setUsername("guest");
                factory.setPassword("guest");
    
                // 创建一个连接
                Connection connection = factory.newConnection();
                // 创建一个信道
                Channel channel = connection.createChannel();
    
                String message = "发送信息-headers交换机";
    
                //声明一个Header类型的交换机
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
    
                // 生成发送消息的属性
                AMQP.BasicProperties props = new AMQP.BasicProperties
                        .Builder()
                        .headers(headerMap)
                        .build();
    
                // 向交换机发送消息
                channel.basicPublish(EXCHANGE_NAME, "like.orange.color", null, message.getBytes("UTF-8"));
    
                LOGGER.info("消息发送成功:{}", message);
                channel.close();
                connection.close();
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    }
    
消费者
  • Consumer01.java
    package com.yawu.xiaowen.header;
    
    import com.rabbitmq.client.*;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.util.Map;
    
    /**
     * Header交换机
     * 消费者
     *
     * @author yawu
     * @date 2019.07.01
     */
    public class Consumer01 {
        private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
        private static final String EXCHANGE_NAME = "mq_header_exchange";
    
        public static void execute(Map<String, Object> myHeaderMap) {
            try {
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("127.0.0.1");
                Connection connection = factory.newConnection();
                Channel channel = connection.createChannel();
    
                //声明一个Headers类型的交换机
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
    
                // 声明一个临时队列
                String queue_name = channel.queueDeclare().getQueue();
    
                // 队列绑定时需要指定参数,注意虽然不需要路由键但仍旧不能写成null,需要写成空字符串""
                channel.queueBind(queue_name, EXCHANGE_NAME, "", myHeaderMap);
    
                LOGGER.info("【Consumer01:" + myHeaderMap + "】 等待消息...");
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        final String message = new String(body, "UTF-8");
                        LOGGER.info("【Consumer01:" + myHeaderMap + "】接收到的消息 '" + properties.getHeaders() + "':'" + message + "'");
                    }
                };
    
                // 队列一确认消息
                channel.basicConsume(queue_name, true, consumer);
    
            } catch (Exception e) {
                LOGGER.error("an exception was occurred , caused by :{}", e.getMessage());
            }
        }
    }
    
    
测试工具
  • HearTest.java

    package com.yawu.xiaowen.header;
    
    
    import org.junit.Test;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class HeaderTest {
    
        private ExecutorService executorService = Executors.newFixedThreadPool(10);
    
        @Test
        public void header() throws InterruptedException {
    
            // 消费者1:绑定 health=Nice,mentality=Good
            executorService.submit(() -> {
                Map<String, Object> headerMap = new HashMap<>();
                headerMap.put("health", "Nice");
                headerMap.put("mentality", "Good");
                headerMap.put("x-match", "all");
                Producer.execute(headerMap);
            });
    
            // 消费者2:绑定  health=Nice,mentality=Bad
            executorService.submit(() -> {
                Map<String, Object> headerMap = new HashMap<>();
                headerMap.put("health", "Nice");
                headerMap.put("mentality", "Bad");
                headerMap.put("x-match", "any");
                Producer.execute(headerMap);
            });
    
            // 消费者3:绑定  health=Terrible,mentality=Good
            executorService.submit(() -> {
                Map<String, Object> headerMap = new HashMap<>();
                headerMap.put("health", "Terrible");
                headerMap.put("mentality", "Good");
                headerMap.put("x-match", "all");
    //            headerMap.put("x-match","any");
                Producer.execute(headerMap);
            });
    
            Thread.sleep(2 * 1000);
            System.out.println("=============消息01===================");
            // 生产者1 : health=Nice,mentality=Good,x-match=all
            executorService.submit(() -> {
                Map<String, Object> headerMap = new HashMap<>();
                headerMap.put("health", "Nice");
                headerMap.put("mentality", "Good");
    //            headerMap.put("x-match","all");
                Consumer01.execute(headerMap);
            });
    
            Thread.sleep(5 * 100);
            System.out.println("=============消息02===================");
            // 生产者2 : health=Nice,x-match=any
            executorService.submit(() -> {
                Map<String, Object> headerMap = new HashMap<>();
                headerMap.put("health", "Nice");
    //            headerMap.put("x-match","any");
                Consumer01.execute(headerMap);
            });
    
            Thread.sleep(5 * 100);
            System.out.println("=============消息03===================");
            // 生产者1 : health=Terrible,mentality=Bad,x-match=all
            executorService.submit(() -> {
                Map<String, Object> headerMap = new HashMap<>();
                headerMap.put("health", "Terrible");
                headerMap.put("mentality", "Bad");
    //            headerMap.put("x-match","all");
                Consumer01.execute(headerMap);
            });
    
            // sleep 10s
            Thread.sleep(10 * 1000);
        }
    }
    
  • 运行HeaderTest测试工具,结果如图:


    Headers模式测试结果.png
  • 至此,Headers交换机模式学习完毕

相关文章

网友评论

    本文标题:Rabbitmq打怪升级之路(十二)Headers-头交换机模式

    本文链接:https://www.haomeiwen.com/subject/mglbcctx.html