美文网首页
springboot 集成Rabbitmq

springboot 集成Rabbitmq

作者: Younger_Coding | 来源:发表于2019-11-06 15:51 被阅读0次

一、在Linux下搭建Rabbitmq

1、需要先安装erlang环境

下载:wget http://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm

安装:rpm -ivh erlang-19.0.4-1.el7.centos.x86_64.rpm

2、安装RabbitMQ

下载:wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.10/rabbitmq-server-3.6.10-1.el7.noarch.rpm

安装:rpm -ivh rabbitmq-server-3.6.10-1.el7.noarch.rpm

如果安装时提示缺少一个socat....的东西

使用yum install socat安装即可

3、启动Rabbit服务

启动命令:systemctl start rabbitmq-server 或者 service rabbitmq-server start

停止命令:service rabbitmq-server stop 或 rabbitmqctl stop

重启rabbitmq: rabbitmq-server restart
查看服务状态:service rabbitmq-server status

4、开启远程访问

首先找到配置文件,输入命令:service rabbitmq-server status,找到日志文件

image

打开日志文件

image

提示日志文件没有创建,然后创建并写入:

[

{rabbit, [{tcp_listeners, [5672]}, {loopback_users, ["admin"]}]}

].
image

停止然后重启服务。

5.不能开启远程web访问,看是否安装了rabbitmq的插件

打开安装目录 $cd /usr/lib/rabbitmq/bin/

运行以下命令 安装插件 $ sudo rabbitmq-plugins enable rabbitmq_management


二、springboot集成Rabbitmq

1、pom文件

<dependency>

  <groupId>org.springframework.boot</groupId>

  <artifactId>spring-boot-starter-amqp</artifactId>

  <version>2.2.0.RELEASE</version>

</dependency>

2、Rabbitmq配置文件

#Rabbit MQ
spring.rabbitmq.virtual-host=/ 
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username = guest
spring.rabbitmq.password = guest
# 开启发送确认
spring.rabbitmq.publisher-confirms: true
# 开启发送失败退回
spring.rabbitmq.publisher-returns: true
#消息确认模式手动确认
spring.rabbitmq.listener.simple.acknowledge-mode: manual

3、配置文件

@EnableRabbit
@Configuration
public class RabbitConfig {
//已json形式存储数据
  @Bean
  public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
      return new Jackson2JsonMessageConverter();
  }
}

4、发送消息 RabbitTemplate

4.1 消息生产者

 private int sendPushLargeMsg(QueryStat qs) {
        PushLargeMsgModel msg = new PushLargeMsgModel();
        msg.setUserType(qs.getUserType());
        msg.setMiId(qs.getId());
       msg.setUsers(userMapper.getUserConcatByType(qs));
        int count = userMapper.countUserConcatByType(qs);
        return count;
    }

    private void sendPushLargeMsg(PushLargeMsgModel msg) {
        rabbitTemplate.convertAndSend(MQConstants.MESSAGE_EXCHANGE, MQConstants.PUSH_MSG_KEY, msg);
    }

4.2静态类

public class MQConstants {
    //消息交换机,它指定消息按什么规则,路由到哪个队列
    public final static String MESSAGE_EXCHANGE = "message";
   //消息队列载体,每个消息都会被投入到一个或多个队列
    public final static String PUSH_MSG_QUEUE = "push_msg";
   //消息路由key
    public final static String PUSH_MSG_KEY = "push_msg";
}

5、消费者

@Component
@RabbitListener(bindings = @QueueBinding(
        exchange = @Exchange(value = MQConstants.MESSAGE_EXCHANGE),
        value = @Queue(value = MQConstants.PUSH_MSG_QUEUE, durable = "true"),
        key = MQConstants.PUSH_MSG_KEY
))
public class PushLargeMsgReceiver extends BaseReceiver {

    @Autowired
    private MessageInfoService messageInfoService;
    @Autowired
    private MessageReceivingService messageReceivingService;
    @Autowired
    private AppPushApi appPushApi;


    @RabbitHandler
    public void process(PushLargeMsgModel msg, Channel channel, @Header(name = "amqp_deliveryTag") long deliveryTag) throws IOException {
        //amqp_deliveryTag 每次接收消息+1,deliveryTag 可以用来回传告诉 rabbitmq 这个消息处理成功 清除此消息(basicAck方法)
        safeAck(channel, deliveryTag);
        pushLargeMsg(msg);
    }

    private void pushLargeMsg(PushLargeMsgModel msg) {
        MessageInfo mi = messageInfoService.getById(msg.getMiId());
        List<MessageReceiving> list = new ArrayList<>();
        List<String> ios = new ArrayList<>();
        List<String> android = new ArrayList<>();
        msg.getUsers().forEach(p -> {
            String[] us = p.split("$");
            MessageReceiving mr = new MessageReceiving();
            mr.setMessageId(msg.getMiId());
            mr.setRecipient(Integer.parseInt(us[0]));
            mr.setUserType(Integer.parseInt(us[1]));
            list.add(mr);
            if (us.length > 3) {
                if ("ad".equals(us[2])) {
                    android.add(us[3]);
                } else if ("iOs".equals(us[2])) {
                    ios.add(us[3]);
                }
            }
        });
        messageReceivingService.saveBatch(list);
        if (android.size() != 0) {
            try {
                PushMessage pm = new PushMessage();
                pm.setModel(1);
                pm.setTitle(mi.getTitle());
                pm.setContent(mi.getContent());
                pm.setMsgId(mi.getId());
                pm.setSevCId(String.join(",", android));
                String r = appPushApi.listPush(pm);
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }
        }
        if (ios.size() != 0) {
            try {
                PushMessage pm = new PushMessage();
                pm.setModel(5);
                pm.setMsgId(mi.getId());
                pm.setTitle(mi.getTitle());
                pm.setContent(mi.getContent());
                pm.setSevCId(String.join(",", ios));
                String r = appPushApi.listPush(pm);
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }
        }

    }
//手动应答模式
private  void safeAck(Channel channel, long deliveryTag) throws IOException {
        try {
  //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            channel.basicReject(deliveryTag, false);
        }
    }
}

相关文章

网友评论

      本文标题:springboot 集成Rabbitmq

      本文链接:https://www.haomeiwen.com/subject/assvbctx.html