美文网首页
RabbitMQ & RabbitMQ集群

RabbitMQ & RabbitMQ集群

作者: a9104fed92a0 | 来源:发表于2018-09-17 22:04 被阅读0次

RabbitMQ

Erang开发
支持集群
支持内存节点和磁盘节点

RabbitMQ的模式

(1)单一模式
(2)普通模式(默认集群模式):只复制交换机和队列,不复制数据。
(3)镜像模式:复制交换机队列和数据

RabbitMQ 安装

配置yum源
yum install -y epel-release
安装RabbitMQ-Server
yum install -y rabbitmq-server
启动服务

/usr/lib/rabbitmq/bin/rabbitmq-plugins list //查看插件安装情况
/usr/lib/rabbitmq/bin/rabbitmq-plugins enable rabbitmq_management //启用rabbitmq_management服务
service rabbitmq-server start

打开后台管理
打开浏览器输入http://192.168.100.143:15672, 输入默认的Username:guest,输入默认的Password:guest ,登录后出现如图所示的界面。


后台管理界面

集群配置

1.环境

IP 主机名 用处
192.168.33.10 centos01 磁盘
192.168.33.11 centos02 内存
192.168.33.12 centos03 内存

配置本地域名,以centos01为例
vi /etc/hostname
修改为:
centos01.localdomain
vi /etc/hosts
添加hosts
192.168.33.10 centos01 192.168.33.11 centos02 192.168.33.12 centos03
centos02,centos03,做类似配置。
<font color='#A52A2A'>注意:一定要先配置域名和hosts,确保RabbitMQ Managerment中的Nodes 为 Rabbit@centos01</font>

2.安装软件

三台机器上安装RabbitMQ

yum install -y epel-release
yum install -y rabbitmq-server

3.复制erlang.cookie

# cat /var/lib/rabbitmq/.erlang.cookie 
XAHPZVPYUQDWWJIOHUPQ

复制XAHPZVPYUQDWWJIOHUPQ到centos02和centos03

三台服务器上启动
service rabbitmq-server start
如果过程如果报错:
{{shutdown,{failed_to_start_child,net_sup,{shutdown,{failed_to_start_child,auth,{"Cookie file /var/lib/rabbitmq/.erl
则修改:

chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie 
chmod 600 /var/lib/rabbitmq/.erlang.cookie

4.将centos02、centos03作为内存节点加入centos01节点集群中

rabbitmqctl stop_app    //停掉rabbit应用
rabbitmqctl join_cluster --ram rabbit@centos01 //加入到磁盘节点
rabbitmqctl start_app  //启动rabbit应用

如果执行:rabbitmqctl join_cluster --ram rabbit@centos01 失败。要确保rabbitmq management的nodes中显示的名字为:rabbit@centos01。如果不是,
则要修改 /etc/hostname 为:centos01.localdomain,并重启

5.rabbitmq management

打开http://localhost:15672/#/,nodes中显示为三个,则代表成功

集群节点
从图中可以看出,centos01 为磁盘节点(Disk),centos02,centos03 为内存节点(RAM)

6.启动trace功能

rabbitmq-plugins enable rabbitmq_tracing
rabbitmq trace_on

7.负载均衡

Rabbitmq集群配置好之后,需要使用负载均衡技术来将所有的机器进行串联起来,可以使用HaProxy来进行负载均衡。

RabbitMQ基本概念

Broker:可以理解为消息队列服务器实体,负责接收消息生产者的消息,然后将消息发送至消息接收者或者其他的Broker。
Exchange:消息交换机,是消息第一个到达的地方,消息通过它指定的路由规则,分发到不同的消息队列中去。
Queue:消息队列,消息通过发送和路由之后最终到达的地方。
Binding:绑定,作用是把Exchange和Queue按照路由规则绑定起来。
Routing Key:路由关键字,Exchange根据这个关键字进行消息投递。
Virtual host:虚拟主机,他是对Broker的虚拟划分,将消费者、生产这和他们依赖的AMQP相关结构进行隔离,一般是为了安全考虑。
Connection:连接,代表生产者、消费者、Broker之间惊喜通信的物理网络。
Channel:消息通道,用于连接生产者和消费者的逻辑结构。
Producer:消息生产者
Consumer:消息消费者

RabbitMQ Sprint Boot集成

1.项目配置

pom.xml依赖

                <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

application.properties rabbitmq 配置

spring.application.name=spirng-boot-rabbitmq-sender
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

.direct

队列配置

@Configuration
public class MQConfig {
    @Bean
    public Queue queue(){
        return new Queue("direct.queue");
    }
}

消息体Bean

public class User implements Serializable{
    private long id;
    private String name;

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

生产者

@Component
public class MQProducer {
    @Autowired
    AmqpTemplate amqpTemplate;
    public void direct(User msg){
        amqpTemplate.convertAndSend("direct.queue",msg);
    }
}

消费者
@Component
public class MQReceiver {
@RabbitListener(queues = "direct.queue")
public void direct(User msg){
System.out.println(msg.getName());
}
}

2.topic

队列配置

@Component
@Configuration
public class MQTopicProducer {
    @Autowired
    AmqpTemplate amqpTemplate;

    @Bean(name = "queueA")
    public Queue queueA(){
        return new Queue("topic.queue.a");
    }

    @Bean(name = "queueB")
    public Queue queueB(){
        return new Queue("topic.queue.b");
    }

    @Bean(name = "topicExchange")
    public TopicExchange exchange(){
        return new TopicExchange("topic.exchange");
    }

    @Bean
    Binding bindingExchangeQueueA(@Qualifier("queueA") Queue queue, TopicExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("topic.#");
    }

    @Bean
    Binding bindingExchangeQueueB(@Qualifier("queueB") Queue queue, TopicExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("topic.queue.b");
    }


  
}

生产者

  public void topic(String routingKey,User user){
        amqpTemplate.convertAndSend("topic.exchange",routingKey,user);
    }

消费者

@Component
public class MQTopicConsumer {
    @RabbitListener(queues = "topic.queue.a")
    public void queueA(User user){
        System.out.println("queueA:"+user.getName());
    }

    @RabbitListener(queues = "topic.queue.b")
    public void queueB(User user){
        System.out.println("queueB:"+user.getName());
    }
}

3.fanout

队列配置

@Configuration
public class MQFanoutProducer {
    @Bean(name = "fanoutQueueA")
    Queue queueA(){
        return new Queue("fanout.queue.a");
    }
    @Bean(name = "fanoutQueueB")
    Queue queueB(){
        return new Queue("fanout.queue.b");
    }
    @Bean(name = "fanoutQueueC")
    Queue queueC(){
        return new Queue("fanout.queue.c");
    }

    @Bean
    FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanout.exchange");
    }

    @Bean
    Binding bindingQueueA(@Qualifier("fanoutQueueA") Queue queue,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

    @Bean
    Binding bindingQueueB(@Qualifier("fanoutQueueB") Queue queue,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
    @Bean
    Binding bindingQueueC(@Qualifier("fanoutQueueC") Queue queue,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
}

生产者

@Autowired
    AmqpTemplate amqpTemplate;
    public void fanout(User user){
        amqpTemplate.convertAndSend("fanout.exchange","",user);
    }

消费者

@Component
public class MQFanoutConsumer {
    @RabbitListener(queues = "fanout.queue.a")
    public void queueA(User user){
        System.out.println("QueueA:"+user.getName());
    }
    @RabbitListener(queues = "fanout.queue.b")
    public void queueB(User user){
        System.out.println("QueueB:"+user.getName());
    }

    @RabbitListener(queues = "fanout.queue.c")
    public void queueC(User user){
        System.out.println("QueueC:"+user.getName());
    }

}

参考地址:http://blog.51cto.com/11134648/2155934

相关文章

  • RabbitMQ 集群高可用部署详细介绍

    为什么搭建rabbitmq集群?rabbitmq集群有那些模式?如何搭建Rabbitmq集群?rabbitmq镜像...

  • rabbitMq安装、集群搭建

    Rabbitmq集群高可用测试RabbitMQ3.6.3集群搭建+HAProxy1.6做负载均衡RabbitMQ高...

  • RabbitMQ相关文章

    RabbitMQ相关文章 运维 RabbitMQ常用命令 RabbitMQ的安装及集群搭建方法 RabbitMQ单...

  • 【rabbitMQ】消息队列rabbitMQ面试题

    简述rabbitMQ的架构设计 简述rabbitMQ的普通集群模式 注意: rabbitMQ普通集群模式不能解决高...

  • RabbitMQ + Keepalived + HAProxy

    最终结构图: RabbitMQ镜像集群简介 RabbitMQ镜像集群是通过在RabbitMQ服务器配置相应的规则,...

  • 2018-04-28

    最终结构图: RabbitMQ镜像集群简介 RabbitMQ镜像集群是通过在RabbitMQ服务器配置相应的规则,...

  • RabbitMQ 集群

    RabbitMQ 集群 RabbitMQ 有 3 种模式,其中 2 种是集群模式。 单一模式:即单机情况不做集群,...

  • 使用Docker部署RabbitMQ集群

    使用Docker部署RabbitMQ集群 概述 本文重点介绍的Docker的使用,以及如何部署RabbitMQ集群...

  • (3)RabbitMQ集群跨越

    RabbitMq跨越集群的界限 RabbitMq分布式搭建的方案有:集群、Federation、Shovel。该三...

  • Rabbitmq集群之镜像队列

    镜像集群模式是在RabbitMQ Cluster默认集群的基础上添加策略搭建完成的 RabbitMQ默认集群模式,...

网友评论

      本文标题:RabbitMQ & RabbitMQ集群

      本文链接:https://www.haomeiwen.com/subject/xviciftx.html