有问题请联系我QQ:273206491
前提条件
你需要先安装RabbitMQ服务,并保证能够使用。如果还没有到搭建请参考我的另外两篇文章。
Centos 7+RabbitMQ+镜像集群之基础环境搭建
Centos 7+RabbitMQ+镜像集群之集群环境搭建
如果需要在windows系统上安装Rabbitmq服务,网上有很多安装教程。
1、目录结构
image.png
2、导包
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.3.RELEASE</version>
</parent>
<groupId>com.pingwazi</groupId>
<artifactId>SpringBootRabbitMq</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- spring boot的核心依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--rabbitmq的客户端工具包-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
</dependencies>
</project>
3、编写发送消息和接收消息的类
这里面每一步骤都做了相应的说明,使用@PostConstruct标记的方法可以在类准备好之后自动执行(这也刚好符合了消费者的使用场景)。
/**
* @author pingwazi
* @description
*/
@Component
public class RabbitMQService {
private ConnectionFactory connectionFactory;
private Connection connection;
public RabbitMQService(
@Value("${rabbitmq.serverhost:localhost}") String rabbitmqServerHost,
@Value("${rabbitmq.username:guest}")String rabbitmqUserName,
@Value("${rabbitmq.password:guest}") String rabbitmqPassword
) {
try
{
this.connectionFactory=new ConnectionFactory();
this.connectionFactory.setHost(rabbitmqServerHost);
this.connectionFactory.setUsername(rabbitmqUserName);
this.connectionFactory.setPassword(rabbitmqPassword);
this.connection =connectionFactory.newConnection();
}
catch (Exception ex)
{
ex.printStackTrace();
}
}
/**
* 发送消息
*/
public void sendMessage(String message) {
try {
//每次发送消息都要单独创建信道,因为信道并线程安全的
Channel upChannel = connection.createChannel();
//交换器类型:fanout、direct、topic
//声明一个名字为msgExchange、类型为direct并且持久化的交换器,如果交换器已经存在就不再创建
upChannel.exchangeDeclare("msgExchange", BuiltinExchangeType.DIRECT, false);
//声明一个名字为msgQueue、持久化、不排他、不自动删除的队列,如果此队列已经存在就不再创建
upChannel.queueDeclare("msgQueue", true, false, false, null);
//将交换器与队列进行绑定通过message进行绑定
upChannel.queueBind("msgQueue", "msgExchange", "message");
//发送消息
upChannel.basicPublish("msgExchange", "message", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("utf-8"));
upChannel.close();//释放信道资源
}catch (ShutdownSignalException ex) {
//连接异常关闭了,这里要进行检查,并尝试重新建立连接
ex.printStackTrace();
} catch (IOException ex) {
//发生io异常需要进行处理,对应channel可能关闭了
ex.printStackTrace();
} catch (TimeoutException e) {
//资源释放出现问题
e.printStackTrace();
}
}
public void batchSendMessage(List<String> messages) {
try {
//每次发送消息都要单独创建信道,因为信道并线程安全的
Channel upChannel = connection.createChannel();
//交换器类型:fanout、direct、topic
//声明一个名字为msgExchange、类型为direct并且持久化的交换器,如果交换器已经存在就不再创建
upChannel.exchangeDeclare("msgExchange", BuiltinExchangeType.DIRECT, false);
//声明一个名字为msgQueue、持久化、不排他、不自动删除的队列,如果此队列已经存在就不再创建
upChannel.queueDeclare("msgQueue", true, false, false, null);
//将交换器与队列进行绑定通过message进行绑定
upChannel.queueBind("msgQueue", "msgExchange", "message");
//发送消息
for(String message:messages)
{
upChannel.basicPublish("msgExchange", "message", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("utf-8"));
}
upChannel.close();//释放信道资源
}catch (ShutdownSignalException ex) {
//连接异常关闭了,这里要进行检查,并尝试重新建立连接
ex.printStackTrace();
} catch (IOException ex) {
//发生io异常需要进行处理,对应channel可能关闭了
ex.printStackTrace();
} catch (TimeoutException e) {
//资源释放出现问题
e.printStackTrace();
}
}
/**
* 拉模式适合处理消息消费时间比较常的消息
*/
//@PostConstruct
private void receiveGetMessage()
{
try
{
Channel downChannel=connection.createChannel();
//交换器类型:fanout、direct、topic
//声明一个名字为msgExchange、类型为direct并且持久化的交换器,如果交换器已经存在就不再创建
downChannel.exchangeDeclare("msgExchange",BuiltinExchangeType.DIRECT,false);
//声明一个名字为msgQueue、持久化、不排他、不自动删除的队列,如果此队列已经存在就不再创建
downChannel.queueDeclare("msgQueue", true, false, false, null);
//将交换器与队列进行绑定通过message进行绑定
downChannel.queueBind("msgQueue","msgExchange","message");
//消息未确认消息的数量
downChannel.basicQos(1);//在非自动确认的模式下,限制最多允许未确认的消息数量
boolean isBreak=false;
while (!isBreak)
{
//消费消息
GetResponse msgData = downChannel.basicGet("", false);
String msgBody=new String(msgData.getBody(), "utf-8");
System.out.println(Thread.currentThread().getId()+"RabbitMQ拉模式消费者收到消息: " + msgBody);
//回复确认消息
downChannel.basicAck(msgData.getEnvelope().getDeliveryTag(),false);
if(StringUtils.isEmpty(msgBody))
isBreak=true;
}
downChannel.close();
}
catch (ShutdownSignalException ex)
{
//连接异常关闭了,这里要进行检查,并尝试重新建立连接
ex.printStackTrace();
}
catch (IOException ex)
{
//发生io异常需要进行处理,对应channel可能关闭了
ex.printStackTrace();
} catch (TimeoutException e) {
//信道资源释放超时,可能对应的channel关闭了
e.printStackTrace();
}
}
/**
* 接收消息
*/
@PostConstruct
private void receivePushMessage()
{
try
{
Channel downChannel=connection.createChannel();
//交换器类型:fanout、direct、topic
//声明一个名字为msgExchange、类型为direct并且持久化的交换器,如果交换器已经存在就不再创建
downChannel.exchangeDeclare("msgExchange",BuiltinExchangeType.DIRECT,false);
//声明一个名字为msgQueue、持久化、不排他、不自动删除的队列,如果此队列已经存在就不再创建
downChannel.queueDeclare("msgQueue", true, false, false, null);
//将交换器与队列进行绑定通过message进行绑定
downChannel.queueBind("msgQueue","msgExchange","message");
//消息未确认消息的数量
downChannel.basicQos(10000);//在非自动确认的模式下,限制最多允许未确认的消息数量
//消费消息
downChannel.basicConsume("msgQueue",createConsumer(downChannel));
System.out.println("RabbitMQ消费者正在运行中...");
//不能释放信道资源!!!
//因为这里的消费者是用的推模式,如果关闭了信道,后面在进行消息消费的时候会报错
//downChannel.close();
}
catch (ShutdownSignalException ex)
{
//连接异常关闭了,这里要进行检查,并尝试重新建立连接
ex.printStackTrace();
}
catch (IOException ex)
{
//发生io异常需要进行处理,对应channel可能关闭了
ex.printStackTrace();
}
}
/**
* 释放资源
*/
@PreDestroy
private void releaseSource() {
try {
if (connection != null) connection.close();//连接关闭时,会自动将channel关闭掉
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 创建消费对象
* @param channel
* @return
*/
private Consumer createConsumer(Channel channel)
{
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
String message = new String(body);
System.out.println(Thread.currentThread().getId()+"RabbitMQ推模式消费者收到消息: " + message);
// 消息确认
try {
channel.basicAck(envelope.getDeliveryTag(), false);//手动确认消息
} catch (IOException e) {
//发生io异常需要进行处理,对应channel可能关闭了
e.printStackTrace();
}
}
};
return consumer;
}
}
4、编写用于测试的Controller和启动类
/**
* @author pingwazi
* @description
*/
@RestController
@RequestMapping("/home")
public class HomeController {
@Autowired
private RabbitMQService rabbitMQService;
@GetMapping("/index")
public String index(String msg)
{
rabbitMQService.sendMessage(msg);
return "已向Rabbitmq发送了消息:"+msg;
}
}
/**
* @author pingwazi
* @description
*/
@SpringBootApplication
public class ApplicationRun {
public static void main(String[] args) {
SpringApplication.run(ApplicationRun.class,args);
}
}











网友评论