美文网首页
阿里云mns迁移rabbitmq(附代码实现)

阿里云mns迁移rabbitmq(附代码实现)

作者: 一个小废材 | 来源:发表于2020-03-22 19:13 被阅读0次

公司之前的项目一直使用的是(阿里云的mns)作为基础队列。
mns简单易用,而且有阿里云保证可用性,基本不需要操太多心,也就一直没研究新的队列产品。
但是随着业务的增多,发现mns的成本问题越来越明显。我们有两套测试环境,每套测试环境需要建三十多个topic,还有将近百十个队列。按照mns的收费标准,基本测试环境每天就要出去两百多。

image.png

上图是阿里云的mns收费标准,小公司的测试经费伤不起啊。


image.png

而且测试环境也不需要多好的稳定性,本着测试环境节约成本,能用就行的原则,我们开始考虑更换其他的队列产品。
由于之前使用过一段时间的rabbitmq,还算是比较熟悉,所以考虑将队列产品升级rabbitmq。
rabbtimq是基于amqp协议的队列实现,功能相对mns复杂不少,这里将迁移中的几个注意点记录下来,供大家参考。

几个重要的概念

概念 解释 对应rabbitmq的实现
地域 阿里云所有的产品都会分地域,地域之间网络不通。有些产品还会在地域的基础上分可用区。mns产品只有地域的区分,地域之间不能互通队列或者主题 rabbitmq对应有虚拟机概念,每个虚拟机下的队列、交换机是独立的
队列 最核心的概念,消息的实际承载体。队列是一个先进先出的数据结构,最先进入队列的消息最早会被消费端收到 rabbitmq也有队列的概念
主题 mns的发布订阅模型的起点。消息生产者,可以把消息推给主题,主题会把消息分发给所有订阅者 rabbitmq有很多种方式可以实现发布订阅,实现mns的主题只需要direct模式的交换机就可以实现(但是mns的主题可以直接把消息推给一个url地址,或者通过短信发出,目前rabbitmq不支持这种业务)

地域

首先,mns的主题和队列都是基于阿里云的地域,每一个地域可以有一套主题和队列。
而rabbitmq是按照vhost区分,一个vhost可以有一套交换机、队列等。

基本队列

mns的队列概念比较简单,基本就是一个压入和弹出。
rabbitmq的队列不能单独存在,至少要有一个exchange。

image.png
如果要迁移一个mns的队列到rabbitmq,可以先选择一个rabbitmq的vhost,创建一个交换机。
然后创建一个和mns队列名称一样的队列,之后将交换机和队列绑定,绑定的时候使用队列名称作为routekey(rabbitmq如何向队列压入数据)。
当然,如果要迁移一批mns的队列,可以考虑通过一个交换机,然后创建一批队列,在绑定队列和交换机的时候,使用队列名称作为route key。

延时队列

有时候,我们希望队列的消息不能被消费端立刻消费,而是等一段时间再被消费。在mns中,设置延时队列很简单,只需要设置队列的队列延迟(DelaySeconds属性)就可以了,最大可以设置604800秒,基本可以满足大多数的使用场景。
但是在rabbtimq中设置消息延迟比较麻烦,因为rabbitmq没有直接支持延迟队列。我们可以使用rabbitmq的消息ttl和死信队列实现延迟队列。


image.png

操作步骤:

1·首先创建一个中转队列,中转队列名称可以随意设置,最好和我们要创建的延迟队列使用相似的队列名称再加上一些特殊标识,要保证中转队列不会被任何消费者消费。
2·创建中转队列的时候,设置队列的两个属性,一个是 x-message-ttl,这个参数决定消息进入当前队列,多久不被消费会被丢弃。另一个是 x-dead-letter-exchange,这个叫死信交换机,被丢弃的消息会重新进入这个交换机,可以设置为当前中转队列一样的交换机,还有一个是x-dead-letter-routing-key,这个是被丢弃的消息重新进入交换机时使用的route key。
3·下一步就是创建一个要被消费端消费的队列,同时将队列和刚才设置的 x-dead-letter-routing-key 进行绑定。
这样,被压入的消息,首先会进入中转队列,经过 x-message-ttl 毫秒后,因为没有消费端消费,会被丢弃到x-dead-letter-exchange,同时通过 x-dead-letter-routing-key 压入到真实被消费的队列。从而实现队列数据的延时消费。

主题

mns里支持主题,我们可以在mns里创建主题。然后在主题里创建订阅,订阅可以选择将主题的消息压入队列,或者请求rest接口,或者是发送短信。
我们这里主要讲压入队列的情况。
在rabbitmq里可以通过给exchange绑定多个route key 来实现主题订阅。

操作步骤:

1·创建一个exchange,最好和之前普通队列的交换机区分开。
2·使用mns的主题作为route key,绑定当前exchange和订阅队列。

mns队列属性

属性名称 说明 rabbitmq如何实现
PollingWaitSeconds(消息接收长轮询等待时间) 当Queue中没有消息时,针对该Queue的ReceiveMessage请求最长的等待时间,单位为秒。 这个参数可以通过消费端控制,rabbitmq消费端消费时可以指定最大等待超时时间(mns也可以在消费端处理)
VisibilityTimeout(取出消息隐藏时长) 消息从该Queue中取出后从Active状态变成Inactive状态后的持续时间,单位为秒。 这是防止一个消息被多个消费端重复消费。rabbitmq没有超时时间概念,消息被取出后,只要当前网络连接没有断开,rabbtimq不会将消息重新分发给其他消费者
MessageRetentionPeriod(消息存活时间(秒)) 消息在该Queue中最长的存活时间,从发送到该队列开始经过此参数指定的时间后,不论消息是否被取出过都将被删除,单位为秒。 rabbitmq可以通过 x-message-ttl 来实现消息存活时间,当然如果希望消息超时就删除,这个队列不能设置x-dead-letter-exchange和x-dead-letter-routing-key
MaximumMessageSize(消息最大长度(Byte)) 发送到该Queue的消息体的最大长度,单位为Byte。 rabbitmq可以通过 x-max-length-bytes 来实现消息最大长度。同时rabbitmq还提供了x-max-length 参数,通过限制消息总数的方式限制消息大小

代码实现

//入口代码
public static void main(String[] args) throws IOException, TimeoutException {
        //实例化配置管理
        MnsConf conf = new MnsConf("*********", "*******", "http://*******.aliyuncs.com/");
        //实例化mns操作类
        MnsUtil mnsUtil = new MnsUtil(conf);
        //实例化rabbitmq的操作类
        RabbitmqConf rabbitConf = new RabbitmqConf("localhost", "5672", "admin", "admin", "test-vhost", "topic-exchange", "default-exchange");
        //实例化rabbitmq的操作类
        RabbitmqUtil rabbitmqUtil = new RabbitmqUtil(rabbitConf);

        //实例化核心操作类
        Mns2RabbitmqService service = new Mns2RabbitmqService();
        //注入mns操作类
        service.setMnsUtil(mnsUtil);
        //注入rabbitmq操作类
        service.setRabbitmqUtil(rabbitmqUtil);
        //设置默认主题交换机
        service.setDefaultExchange("default-exchange");
        //设置主题交换机
        service.setTopicExchange("topic-exchange");

         //复制队列
        service.copyQueue();
        //复制主题订阅到rabbitmq
        service.copyTopic();
       
        //删除队列
        //service.delQueue();
}
//mns 配置类
package com.conf;

public class MnsConf {
    public String  accountendpoint;
    public String  accesskeyid;
    public String  accesskeysecret;

    public MnsConf(String accesskeyid, String accesskeysecret, String accountendpoint) {
        this.accesskeyid = accesskeyid;
        this.accesskeysecret = accesskeysecret;
        this.accountendpoint = accountendpoint;
    }
}
//rabbitmq 配置类
package com.conf;

public class RabbitmqConf {
    public String host;
    public String port;
    public String user;
    public String password;
    public String vhost;
    public String topicExchange;
    public String defaultExchange;

    public RabbitmqConf(String host, String port, String user, String password, String vhost, String topicExchange, String defaultExchange) {
        this.host = host;
        this.port = port;
        this.user = user;
        this.password = password;
        this.vhost = vhost;
    }
}

//mns操作类
package com.services;

import com.aliyun.mns.model.QueueMeta;
import com.aliyun.mns.model.SubscriptionMeta;
import com.aliyun.mns.model.TopicMeta;
import com.utils.MnsUtil;
import com.utils.RabbitmqUtil;

import java.io.IOException;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class Mns2RabbitmqService {
    private MnsUtil mns = null;
    private RabbitmqUtil rabbitmq = null;
    private String defaultExchange = null;
    private String topicExchange = null;

    public void setMnsUtil(MnsUtil mns) {
        this.mns = mns;
    }

    public void setRabbitmqUtil(RabbitmqUtil rabbitmq) {
        this.rabbitmq = rabbitmq;
    }

    public void setDefaultExchange(String defaultExchange) {
        this.defaultExchange = defaultExchange;
    }

    public void setTopicExchange(String topicExchange) {
        this.topicExchange = topicExchange;
    }

    /**
     * 复制队列数据
     * @return
     */
    public boolean copyQueue() throws IOException {
        this.createExchange(this.defaultExchange, "direct");
        this.createExchange(this.topicExchange, "direct");

        List<QueueMeta> queueList = this.mns.getQueueList();
        for (QueueMeta oneQueue: queueList) {
            //队列名称
            System.out.println("queueName: "+ oneQueue.getQueueName());
            //消息延时(秒)
            System.out.println("delay second:"+ oneQueue.getDelaySeconds());
            //消息最大长度(Byte)
            System.out.println("max message size:"+ oneQueue.getMaxMessageSize());
            //消息接收长轮询等待时间(秒)
            System.out.println("polling wait second: "+ oneQueue.getPollingWaitSeconds());
            //消息存活时间(秒)
            System.out.println("MessageRetentionPeriod: "+ oneQueue.getMessageRetentionPeriod());

            if (oneQueue.getDelaySeconds() > 0) {
                //如果是延迟队列,则需要创建一个中转rabbitmq队列
                //创建一个中转队列
                this.rabbitmq.createQueue("dealysecond_dead_queue_"+ oneQueue.getQueueName(),
                                    oneQueue.getDelaySeconds(), oneQueue.getMaxMessageSize(),
                                    this.defaultExchange, "dealysecond_dead_routekey_"+ oneQueue.getQueueName());
                this.rabbitmq.bindQueue("dealysecond_dead_queue_"+ oneQueue.getQueueName(),
                                this.defaultExchange,  oneQueue.getQueueName());

                //创建中转队列到目标队列
                this.rabbitmq.createQueue(oneQueue.getQueueName(), 0, oneQueue.getMaxMessageSize(),
                            "", "");
                this.rabbitmq.bindQueue(oneQueue.getQueueName(),
                        this.defaultExchange,  "dealysecond_dead_routekey_"+ oneQueue.getQueueName());
            } else {
                //如果不是延迟队列,则直接创建
                this.rabbitmq.createQueue(oneQueue.getQueueName(), oneQueue.getMessageRetentionPeriod(), oneQueue.getMaxMessageSize(),
                                "", "");
                this.rabbitmq.bindQueue(oneQueue.getQueueName(), this.defaultExchange,  oneQueue.getQueueName());
            }
        }
        return true;
    }

    /**
     * 删除指定队列
     * @return
     * @throws IOException
     */
    public boolean delQueue() throws IOException {
        List<QueueMeta> queueList = this.mns.getQueueList();
        for (QueueMeta oneQueue: queueList) {
            this.rabbitmq.delQueue(oneQueue.getQueueName());
            this.rabbitmq.delQueue("dealysecond_dead_queue_"+oneQueue.getQueueName());
        }
        return true;
    }

    /**
     * 创建交换机
     * @param exchangeName
     * @param type
     * @return
     * @throws IOException
     */
    public boolean createExchange(String exchangeName, String type) throws IOException {
        this.rabbitmq.createExchange(exchangeName, type);
        return true;
    }

    /**
     * 复制mns主题到rabbitmq
     * @return
     * @throws IOException
     */
    public boolean copyTopic() throws IOException {
        //获取mns主题列表
        List<TopicMeta> listTopic = this.mns.getTopicList();
        if (null == listTopic) {
            return true;
        }
        List<SubscriptionMeta> listSub = null;
        for(TopicMeta oneTopic: listTopic) {
            //获取每个主题的订阅信息
            System.out.println("topic Name: "+ oneTopic.getTopicName());
            listSub = this.mns.getTopicSub(oneTopic.getTopicName());
            if (null != listSub) {
                String subName = null;
                String patt = "^(.*?)/";
                Pattern pattern = Pattern.compile(patt);
                Matcher match = null;
                for(SubscriptionMeta oneSub: listSub) {
                    //mns 主题订阅的end point
                    System.out.println("    sub end point: "+ oneSub.getEndpoint());
                    //替换 endpoint
                    match = pattern.matcher(oneSub.getEndpoint());
                    subName = match.replaceAll("");
                    //创建主题交换机到队列的绑定
                    this.rabbitmq.bindQueue(subName, this.topicExchange, oneTopic.getTopicName());
                }
            }
        }
        return true;
    }
}
//rabbitmq操作类
package com.utils;

import com.conf.RabbitmqConf;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
 * rabbitmq  操作类
 */
public class RabbitmqUtil {
    private RabbitmqConf conf;
    private Connection connection;
    private Channel channel;

    public RabbitmqUtil(RabbitmqConf conf) throws IOException, TimeoutException {
        this.conf = conf;

        //实例连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置地址
        connectionFactory.setHost(this.conf.host);
        //设置端口
        connectionFactory.setPort(Integer.parseInt(this.conf.port));
        //设置用户名
        connectionFactory.setUsername(this.conf.user);
        //设置密码
        connectionFactory.setPassword(this.conf.password);
        //设置虚拟机
        connectionFactory.setVirtualHost(this.conf.vhost);

        //获取连接(跟jdbc很像)
        this.connection = connectionFactory.newConnection();
        this.channel = connection.createChannel();

    }

    /**
     * 创建队列
     * @param queueName
     * @param delaySecond
     * @param maxMessageSize
     * @param deadexchangeName
     * @param deadRoutekey
     * @return
     * @throws IOException
     */
    public boolean createQueue(String queueName, long delaySecond,
                               long maxMessageSize, String deadexchangeName,
                               String deadRoutekey) throws IOException {
        Map<String, Object> arguments = new HashMap<String, Object>();
        arguments.put("x-message-ttl", delaySecond * 1000);
        arguments.put("x-max-length-bytes", maxMessageSize);
        if (deadexchangeName.length() > 0) {
            arguments.put("x-dead-letter-exchange", deadexchangeName);
            arguments.put("x-dead-letter-routing-key", deadRoutekey);
        }
        this.channel.queueDeclare(queueName, true, false, false, arguments);
        return true;
    }

    public boolean bindQueue(String queue, String exchangeName, String routekey) throws IOException {
        this.channel.queueBind(queue, exchangeName, routekey);
        return true;
    }
    public boolean delQueue(String queueName) throws IOException {
        com.rabbitmq.client.AMQP.Queue.DeleteOk ok = this.channel.queueDelete(queueName);
        return true;
    }

    public boolean createExchange(String exchangeName, String type) throws IOException {
        this.channel.exchangeDeclare(exchangeName, type);
        return true;
    }
}
//aliyun  account 操作类
package com.utils;

import com.aliyun.mns.client.CloudAccount;
import com.conf.MnsConf;

public class AliyunAccount {
    public static CloudAccount getCloudAccount(MnsConf conf) {
        CloudAccount account = new CloudAccount(conf.accesskeyid, conf.accesskeysecret, conf.accountendpoint);
        return account;
    }
}

//mns 复制到rabbitmq类
package com.services;

import com.aliyun.mns.model.QueueMeta;
import com.aliyun.mns.model.SubscriptionMeta;
import com.aliyun.mns.model.TopicMeta;
import com.utils.MnsUtil;
import com.utils.RabbitmqUtil;

import java.io.IOException;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class Mns2RabbitmqService {
    private MnsUtil mns = null;
    private RabbitmqUtil rabbitmq = null;
    private String defaultExchange = null;
    private String topicExchange = null;

    public void setMnsUtil(MnsUtil mns) {
        this.mns = mns;
    }

    public void setRabbitmqUtil(RabbitmqUtil rabbitmq) {
        this.rabbitmq = rabbitmq;
    }

    public void setDefaultExchange(String defaultExchange) {
        this.defaultExchange = defaultExchange;
    }

    public void setTopicExchange(String topicExchange) {
        this.topicExchange = topicExchange;
    }

    /**
     * 复制队列数据
     * @return
     */
    public boolean copyQueue() throws IOException {
        this.createExchange(this.defaultExchange, "direct");
        this.createExchange(this.topicExchange, "direct");

        List<QueueMeta> queueList = this.mns.getQueueList();
        for (QueueMeta oneQueue: queueList) {
            //队列名称
            System.out.println("queueName: "+ oneQueue.getQueueName());
            //消息延时(秒)
            System.out.println("delay second:"+ oneQueue.getDelaySeconds());
            //消息最大长度(Byte)
            System.out.println("max message size:"+ oneQueue.getMaxMessageSize());
            //消息接收长轮询等待时间(秒)
            System.out.println("polling wait second: "+ oneQueue.getPollingWaitSeconds());
            //消息存活时间(秒)
            System.out.println("MessageRetentionPeriod: "+ oneQueue.getMessageRetentionPeriod());

            if (oneQueue.getDelaySeconds() > 0) {
                //如果是延迟队列,则需要创建一个中转rabbitmq队列
                //创建一个中转队列
                this.rabbitmq.createQueue("dealysecond_dead_queue_"+ oneQueue.getQueueName(),
                                    oneQueue.getDelaySeconds(), oneQueue.getMaxMessageSize(),
                                    this.defaultExchange, "dealysecond_dead_routekey_"+ oneQueue.getQueueName());
                this.rabbitmq.bindQueue("dealysecond_dead_queue_"+ oneQueue.getQueueName(),
                                this.defaultExchange,  oneQueue.getQueueName());

                //创建中转队列到目标队列
                this.rabbitmq.createQueue(oneQueue.getQueueName(), 0, oneQueue.getMaxMessageSize(),
                            "", "");
                this.rabbitmq.bindQueue(oneQueue.getQueueName(),
                        this.defaultExchange,  "dealysecond_dead_routekey_"+ oneQueue.getQueueName());
            } else {
                //如果不是延迟队列,则直接创建
                this.rabbitmq.createQueue(oneQueue.getQueueName(), 0, oneQueue.getMaxMessageSize(),
                                "", "");
                this.rabbitmq.bindQueue(oneQueue.getQueueName(), this.defaultExchange,  oneQueue.getQueueName());
            }
        }
        return true;
    }

    /**
     * 删除指定队列
     * @return
     * @throws IOException
     */
    public boolean delQueue() throws IOException {
        List<QueueMeta> queueList = this.mns.getQueueList();
        for (QueueMeta oneQueue: queueList) {
            this.rabbitmq.delQueue(oneQueue.getQueueName());
            this.rabbitmq.delQueue("dealysecond_dead_queue_"+oneQueue.getQueueName());
        }
        return true;
    }

    /**
     * 创建交换机
     * @param exchangeName
     * @param type
     * @return
     * @throws IOException
     */
    public boolean createExchange(String exchangeName, String type) throws IOException {
        this.rabbitmq.createExchange(exchangeName, type);
        return true;
    }

    /**
     * 复制mns主题到rabbitmq
     * @return
     * @throws IOException
     */
    public boolean copyTopic() throws IOException {
        //获取mns主题列表
        List<TopicMeta> listTopic = this.mns.getTopicList();
        if (null == listTopic) {
            return true;
        }
        List<SubscriptionMeta> listSub = null;
        for(TopicMeta oneTopic: listTopic) {
            //获取每个主题的订阅信息
            System.out.println("topic Name: "+ oneTopic.getTopicName());
            listSub = this.mns.getTopicSub(oneTopic.getTopicName());
            if (null != listSub) {
                String subName = null;
                String patt = "^(.*?)/";
                Pattern pattern = Pattern.compile(patt);
                Matcher match = null;
                for(SubscriptionMeta oneSub: listSub) {
                    //mns 主题订阅的end point
                    System.out.println("    sub end point: "+ oneSub.getEndpoint());
                    //替换 endpoint
                    match = pattern.matcher(oneSub.getEndpoint());
                    subName = match.replaceAll("");
                    //创建主题交换机到队列的绑定
                    this.rabbitmq.bindQueue(subName, this.topicExchange, oneTopic.getTopicName());
                }
            }
        }
        return true;
    }
}

相关文章

网友评论

      本文标题:阿里云mns迁移rabbitmq(附代码实现)

      本文链接:https://www.haomeiwen.com/subject/qdivyhtx.html