美文网首页
SpringBoot+RabbitMQ+快速入门

SpringBoot+RabbitMQ+快速入门

作者: pingwazi | 来源:发表于2020-09-03 16:16 被阅读0次

有问题请联系我QQ:273206491

前提条件

你需要先安装RabbitMQ服务,并保证能够使用。如果还没有到搭建请参考我的另外两篇文章。
Centos 7+RabbitMQ+镜像集群之基础环境搭建
Centos 7+RabbitMQ+镜像集群之集群环境搭建

如果需要在windows系统上安装Rabbitmq服务,网上有很多安装教程。

1、目录结构

image.png

2、导包

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.3.RELEASE</version>
    </parent>
    <groupId>com.pingwazi</groupId>
    <artifactId>SpringBootRabbitMq</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <!-- spring boot的核心依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--rabbitmq的客户端工具包-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.7.3</version>
        </dependency>
    </dependencies>
</project>

3、编写发送消息和接收消息的类

这里面每一步骤都做了相应的说明,使用@PostConstruct标记的方法可以在类准备好之后自动执行(这也刚好符合了消费者的使用场景)。

/**
 * @author pingwazi
 * @description
 */
@Component
public class RabbitMQService {
    private ConnectionFactory connectionFactory;
    private Connection connection;
    public RabbitMQService(
            @Value("${rabbitmq.serverhost:localhost}") String rabbitmqServerHost,
            @Value("${rabbitmq.username:guest}")String rabbitmqUserName,
            @Value("${rabbitmq.password:guest}") String rabbitmqPassword
    ) {
        try
        {
            this.connectionFactory=new ConnectionFactory();
            this.connectionFactory.setHost(rabbitmqServerHost);
            this.connectionFactory.setUsername(rabbitmqUserName);
            this.connectionFactory.setPassword(rabbitmqPassword);
            this.connection =connectionFactory.newConnection();
        }
        catch (Exception ex)
        {
           ex.printStackTrace();
        }

    }


    /**
     * 发送消息
     */
    public void sendMessage(String message) {
        try {
            //每次发送消息都要单独创建信道,因为信道并线程安全的
            Channel upChannel = connection.createChannel();
            //交换器类型:fanout、direct、topic
            //声明一个名字为msgExchange、类型为direct并且持久化的交换器,如果交换器已经存在就不再创建
            upChannel.exchangeDeclare("msgExchange", BuiltinExchangeType.DIRECT, false);
            //声明一个名字为msgQueue、持久化、不排他、不自动删除的队列,如果此队列已经存在就不再创建
            upChannel.queueDeclare("msgQueue", true, false, false, null);
            //将交换器与队列进行绑定通过message进行绑定
            upChannel.queueBind("msgQueue", "msgExchange", "message");
            //发送消息
            upChannel.basicPublish("msgExchange", "message", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("utf-8"));
            upChannel.close();//释放信道资源
        }catch (ShutdownSignalException ex) {
            //连接异常关闭了,这里要进行检查,并尝试重新建立连接
            ex.printStackTrace();
        } catch (IOException ex) {
            //发生io异常需要进行处理,对应channel可能关闭了
            ex.printStackTrace();
        } catch (TimeoutException e) {
            //资源释放出现问题
            e.printStackTrace();
        }
    }

    public void batchSendMessage(List<String> messages) {
        try {
            //每次发送消息都要单独创建信道,因为信道并线程安全的
            Channel upChannel = connection.createChannel();
            //交换器类型:fanout、direct、topic
            //声明一个名字为msgExchange、类型为direct并且持久化的交换器,如果交换器已经存在就不再创建
            upChannel.exchangeDeclare("msgExchange", BuiltinExchangeType.DIRECT, false);
            //声明一个名字为msgQueue、持久化、不排他、不自动删除的队列,如果此队列已经存在就不再创建
            upChannel.queueDeclare("msgQueue", true, false, false, null);
            //将交换器与队列进行绑定通过message进行绑定
            upChannel.queueBind("msgQueue", "msgExchange", "message");
            //发送消息
            for(String message:messages)
            {
                upChannel.basicPublish("msgExchange", "message", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("utf-8"));
            }
            upChannel.close();//释放信道资源
        }catch (ShutdownSignalException ex) {
            //连接异常关闭了,这里要进行检查,并尝试重新建立连接
            ex.printStackTrace();
        } catch (IOException ex) {
            //发生io异常需要进行处理,对应channel可能关闭了
            ex.printStackTrace();
        } catch (TimeoutException e) {
            //资源释放出现问题
            e.printStackTrace();
        }
    }
    /**
     * 拉模式适合处理消息消费时间比较常的消息
     */
    //@PostConstruct
    private void receiveGetMessage()
    {
        try
        {
            Channel downChannel=connection.createChannel();
            //交换器类型:fanout、direct、topic
            //声明一个名字为msgExchange、类型为direct并且持久化的交换器,如果交换器已经存在就不再创建
            downChannel.exchangeDeclare("msgExchange",BuiltinExchangeType.DIRECT,false);
            //声明一个名字为msgQueue、持久化、不排他、不自动删除的队列,如果此队列已经存在就不再创建
            downChannel.queueDeclare("msgQueue", true, false, false, null);
            //将交换器与队列进行绑定通过message进行绑定
            downChannel.queueBind("msgQueue","msgExchange","message");
            //消息未确认消息的数量
            downChannel.basicQos(1);//在非自动确认的模式下,限制最多允许未确认的消息数量
            boolean isBreak=false;
            while (!isBreak)
            {
                //消费消息
                GetResponse msgData = downChannel.basicGet("", false);
                String msgBody=new String(msgData.getBody(), "utf-8");
                System.out.println(Thread.currentThread().getId()+"RabbitMQ拉模式消费者收到消息: " + msgBody);
                //回复确认消息
                downChannel.basicAck(msgData.getEnvelope().getDeliveryTag(),false);
                if(StringUtils.isEmpty(msgBody))
                    isBreak=true;
            }
            downChannel.close();
        }
        catch (ShutdownSignalException ex)
        {
            //连接异常关闭了,这里要进行检查,并尝试重新建立连接
            ex.printStackTrace();
        }
        catch (IOException ex)
        {
            //发生io异常需要进行处理,对应channel可能关闭了
            ex.printStackTrace();
        } catch (TimeoutException e) {
            //信道资源释放超时,可能对应的channel关闭了
            e.printStackTrace();
        }
    }
    /**
     *  接收消息
     */
    @PostConstruct
    private void receivePushMessage()
    {
        try
        {
            Channel downChannel=connection.createChannel();
            //交换器类型:fanout、direct、topic
            //声明一个名字为msgExchange、类型为direct并且持久化的交换器,如果交换器已经存在就不再创建
            downChannel.exchangeDeclare("msgExchange",BuiltinExchangeType.DIRECT,false);
            //声明一个名字为msgQueue、持久化、不排他、不自动删除的队列,如果此队列已经存在就不再创建
            downChannel.queueDeclare("msgQueue", true, false, false, null);
            //将交换器与队列进行绑定通过message进行绑定
            downChannel.queueBind("msgQueue","msgExchange","message");
            //消息未确认消息的数量
            downChannel.basicQos(10000);//在非自动确认的模式下,限制最多允许未确认的消息数量
            //消费消息
            downChannel.basicConsume("msgQueue",createConsumer(downChannel));
            System.out.println("RabbitMQ消费者正在运行中...");
            //不能释放信道资源!!!
            //因为这里的消费者是用的推模式,如果关闭了信道,后面在进行消息消费的时候会报错
            //downChannel.close();
        }
        catch (ShutdownSignalException ex)
        {
            //连接异常关闭了,这里要进行检查,并尝试重新建立连接
            ex.printStackTrace();
        }
        catch (IOException ex)
        {
            //发生io异常需要进行处理,对应channel可能关闭了
            ex.printStackTrace();
        }
    }

    /**
     * 释放资源
     */
    @PreDestroy
    private void releaseSource() {
        try {
            if (connection != null) connection.close();//连接关闭时,会自动将channel关闭掉
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 创建消费对象
     * @param channel
     * @return
     */
    private Consumer createConsumer(Channel channel)
    {
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                String message = new String(body);
                System.out.println(Thread.currentThread().getId()+"RabbitMQ推模式消费者收到消息: " + message);
                // 消息确认
                try {
                    channel.basicAck(envelope.getDeliveryTag(), false);//手动确认消息
                } catch (IOException e) {
                    //发生io异常需要进行处理,对应channel可能关闭了
                    e.printStackTrace();
                }
            }
        };
        return consumer;
    }
}

4、编写用于测试的Controller和启动类

/**
 * @author pingwazi
 * @description
 */
@RestController
@RequestMapping("/home")
public class HomeController {
    @Autowired
    private RabbitMQService rabbitMQService;
    @GetMapping("/index")
    public String index(String msg)
    {
        rabbitMQService.sendMessage(msg);
        return "已向Rabbitmq发送了消息:"+msg;
    }
}
/**
 * @author pingwazi
 * @description
 */
@SpringBootApplication
public class ApplicationRun {
    public static void main(String[] args) {
        SpringApplication.run(ApplicationRun.class,args);
    }
}

相关文章

网友评论

      本文标题:SpringBoot+RabbitMQ+快速入门

      本文链接:https://www.haomeiwen.com/subject/ahcosktx.html