一、在Linux下搭建Rabbitmq
1、需要先安装erlang环境
下载:wget http://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm
安装:rpm -ivh erlang-19.0.4-1.el7.centos.x86_64.rpm
2、安装RabbitMQ
下载:wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.10/rabbitmq-server-3.6.10-1.el7.noarch.rpm
安装:rpm -ivh rabbitmq-server-3.6.10-1.el7.noarch.rpm
如果安装时提示缺少一个socat....的东西
使用yum install socat安装即可
3、启动Rabbit服务
启动命令:systemctl start rabbitmq-server 或者 service rabbitmq-server start
停止命令:service rabbitmq-server stop 或 rabbitmqctl stop
重启rabbitmq: rabbitmq-server restart
查看服务状态:service rabbitmq-server status
4、开启远程访问
首先找到配置文件,输入命令:service rabbitmq-server status,找到日志文件
image
打开日志文件
image
提示日志文件没有创建,然后创建并写入:
[
{rabbit, [{tcp_listeners, [5672]}, {loopback_users, ["admin"]}]}
].
image
停止然后重启服务。
5.不能开启远程web访问,看是否安装了rabbitmq的插件
打开安装目录 $cd /usr/lib/rabbitmq/bin/
运行以下命令 安装插件 $ sudo rabbitmq-plugins enable rabbitmq_management
二、springboot集成Rabbitmq
1、pom文件
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.2.0.RELEASE</version>
</dependency>
2、Rabbitmq配置文件
#Rabbit MQ
spring.rabbitmq.virtual-host=/
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username = guest
spring.rabbitmq.password = guest
# 开启发送确认
spring.rabbitmq.publisher-confirms: true
# 开启发送失败退回
spring.rabbitmq.publisher-returns: true
#消息确认模式手动确认
spring.rabbitmq.listener.simple.acknowledge-mode: manual
3、配置文件
@EnableRabbit
@Configuration
public class RabbitConfig {
//已json形式存储数据
@Bean
public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
4、发送消息 RabbitTemplate
4.1 消息生产者
private int sendPushLargeMsg(QueryStat qs) {
PushLargeMsgModel msg = new PushLargeMsgModel();
msg.setUserType(qs.getUserType());
msg.setMiId(qs.getId());
msg.setUsers(userMapper.getUserConcatByType(qs));
int count = userMapper.countUserConcatByType(qs);
return count;
}
private void sendPushLargeMsg(PushLargeMsgModel msg) {
rabbitTemplate.convertAndSend(MQConstants.MESSAGE_EXCHANGE, MQConstants.PUSH_MSG_KEY, msg);
}
4.2静态类
public class MQConstants {
//消息交换机,它指定消息按什么规则,路由到哪个队列
public final static String MESSAGE_EXCHANGE = "message";
//消息队列载体,每个消息都会被投入到一个或多个队列
public final static String PUSH_MSG_QUEUE = "push_msg";
//消息路由key
public final static String PUSH_MSG_KEY = "push_msg";
}
5、消费者
@Component
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = MQConstants.MESSAGE_EXCHANGE),
value = @Queue(value = MQConstants.PUSH_MSG_QUEUE, durable = "true"),
key = MQConstants.PUSH_MSG_KEY
))
public class PushLargeMsgReceiver extends BaseReceiver {
@Autowired
private MessageInfoService messageInfoService;
@Autowired
private MessageReceivingService messageReceivingService;
@Autowired
private AppPushApi appPushApi;
@RabbitHandler
public void process(PushLargeMsgModel msg, Channel channel, @Header(name = "amqp_deliveryTag") long deliveryTag) throws IOException {
//amqp_deliveryTag 每次接收消息+1,deliveryTag 可以用来回传告诉 rabbitmq 这个消息处理成功 清除此消息(basicAck方法)
safeAck(channel, deliveryTag);
pushLargeMsg(msg);
}
private void pushLargeMsg(PushLargeMsgModel msg) {
MessageInfo mi = messageInfoService.getById(msg.getMiId());
List<MessageReceiving> list = new ArrayList<>();
List<String> ios = new ArrayList<>();
List<String> android = new ArrayList<>();
msg.getUsers().forEach(p -> {
String[] us = p.split("$");
MessageReceiving mr = new MessageReceiving();
mr.setMessageId(msg.getMiId());
mr.setRecipient(Integer.parseInt(us[0]));
mr.setUserType(Integer.parseInt(us[1]));
list.add(mr);
if (us.length > 3) {
if ("ad".equals(us[2])) {
android.add(us[3]);
} else if ("iOs".equals(us[2])) {
ios.add(us[3]);
}
}
});
messageReceivingService.saveBatch(list);
if (android.size() != 0) {
try {
PushMessage pm = new PushMessage();
pm.setModel(1);
pm.setTitle(mi.getTitle());
pm.setContent(mi.getContent());
pm.setMsgId(mi.getId());
pm.setSevCId(String.join(",", android));
String r = appPushApi.listPush(pm);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
if (ios.size() != 0) {
try {
PushMessage pm = new PushMessage();
pm.setModel(5);
pm.setMsgId(mi.getId());
pm.setTitle(mi.getTitle());
pm.setContent(mi.getContent());
pm.setSevCId(String.join(",", ios));
String r = appPushApi.listPush(pm);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}
//手动应答模式
private void safeAck(Channel channel, long deliveryTag) throws IOException {
try {
//告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
channel.basicReject(deliveryTag, false);
}
}
}













网友评论