maven引入包
<!-- Spring Boot starter for AMQP messaging; the Java classes in this document use its RabbitTemplate / @RabbitListener APIs -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.1.9.RELEASE</version>
</dependency>
<!-- Jackson core; presumably required by the JSON message converter configured in RabbitMqConfig (verify against the effective dependency tree) -->
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.10.0</version>
</dependency>
<!-- Jackson databind; keep its version aligned with jackson-core above -->
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.10.0</version>
</dependency>
application.yml配置
# RabbitMQ connection settings; the keys must be nested under spring.rabbitmq
# so that the ${spring.rabbitmq.*} placeholders used by RabbitMqConfig resolve.
spring:
  rabbitmq:
    port: 5672
    host: 127.0.0.1
    virtual-host: /
    username: admin
    password: admin
    # NOTE(review): RabbitMqConfig does not read the two flags below; it calls
    # setPublisherConfirms(false) / setPublisherReturns(false) on its own
    # ConnectionFactory. Confirm which of the two is intended.
    publisher-confirms: true
    publisher-returns: true
MQ消费者基础抽象类
/**
 * Base class for MQ consumers. Declares the listener entry point and offers
 * helpers for acknowledging or rejecting a delivery by hand.
 *
 * @param <T> type of the converted payload passed to {@link #receiveMessage}
 * @author rqh
 * @version 1.0
 * @date 2019-08-30
 */
public abstract class BaseMqConsumer<T> {

    protected Logger logger = LoggerFactory.getLogger(BaseMqConsumer.class);

    /**
     * Receives one message.
     *
     * @param message the raw message
     * @param channel the channel the message was delivered on
     * @param model   the converted payload
     * @throws IOException declared so implementations may propagate I/O failures
     */
    public abstract void receiveMessage(Message message, Channel channel, T model) throws IOException;

    /**
     * Manually acknowledges a message.
     * Note: if the manual ack fails, the message goes back to the queue once the
     * consumer is closed, i.e. its state changes from Unacked to Ready.
     * NOTE(review): the listener container factory shown in RabbitMqConfig uses
     * AcknowledgeMode.AUTO; confirm this helper is only used with manual acks.
     *
     * @param message  the message to acknowledge
     * @param channel  the channel the message was delivered on
     * @param multiple false: acknowledge only this message; true: acknowledge in bulk
     *                 on this channel
     * @return true if the ack call completed, false if it threw
     */
    public boolean ackMessage(Message message, Channel channel, boolean multiple) {
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), multiple);
            return true;
        } catch (Exception e) {
            // Best-effort: report failure through the return value, but keep the
            // stack trace so the cause is not lost.
            logger.error("mq message ack fail: " + e.getMessage(), e);
            return false;
        }
    }

    /**
     * Manually rejects a message.
     * Use case: when business processing fails, the caller decides whether the
     * message should be rejected; it is put back on the queue only when
     * {@code requeue} is true.
     *
     * @param message the message to reject
     * @param channel the channel the message was delivered on
     * @param requeue whether the message should be put back on the queue
     * @return true if the reject call completed, false if it threw
     */
    public boolean rejectMessage(Message message, Channel channel, boolean requeue) {
        try {
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), requeue);
            return true;
        } catch (Exception e) {
            // Same best-effort contract as ackMessage; the exception is logged in full.
            logger.error("mq message reject fail: " + e.getMessage(), e);
            return false;
        }
    }
}
MQ生产者基础抽象类
/**
 * Base class for MQ producers. Holds the injected template and a logger for
 * subclasses, and declares the single send operation they must implement.
 *
 * @param <T> type of the message accepted by {@link #sendMessage}
 * @author rqh
 * @version 1.0
 * @date 2019-08-30
 */
@Component
public abstract class BaseMqProducer<T> {

    /** Logger available to subclasses. */
    protected Logger logger = LoggerFactory.getLogger(BaseMqProducer.class);

    /** AMQP template injected by Spring; subclasses use it to publish. */
    @Autowired
    public AmqpTemplate amqpTemplate;

    /**
     * Sends a message.
     *
     * @param message the message to send
     */
    public abstract void sendMessage(T message);
}
生产者确认消息发送成功的回调(非ACK确认机制)
/**
 * Callback invoked with the broker's publisher confirm for a sent message
 * (this is not the consumer ACK mechanism).
 *
 * @author rqh
 * @version 1.0
 * @date 2019-08-30
 */
@Component
public class MqConfirmCallbackListener implements RabbitTemplate.ConfirmCallback {

    private final Logger logger = LoggerFactory.getLogger(MqConfirmCallbackListener.class);

    /**
     * Handles a publisher confirm.
     *
     * @param correlationData correlation data supplied when the message was sent; may be null
     * @param ack             true for a positive confirm, false for a negative one
     * @param cause           optional failure description; may be null
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // TODO record message send log
        if (!ack) {
            // A negative confirm must not pass silently; parameterized logging
            // also copes with a null correlationData or cause.
            logger.error("mq message confirm nack ==> correlationData:{} cause:{}", correlationData, cause);
        }
    }
}
生产者发送消息到队列失败时的回调
/**
 * Callback invoked when a published message is returned by the broker, i.e. it
 * could not be delivered to a queue.
 *
 * @author rqh
 * @version 1.0
 * @date 2019-08-30
 */
@Component
public class MqReturnCallbackListener implements RabbitTemplate.ReturnCallback {

    private final Logger logger = LoggerFactory.getLogger(MqReturnCallbackListener.class);

    /**
     * Logs the returned message together with the broker's reply details.
     *
     * @param message    the returned message
     * @param replyCode  the reply code
     * @param replyText  the reply text
     * @param exchange   the exchange the message was published to
     * @param routingKey the routing key the message was published with
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // TODO record message send log
        byte[] body = message.getBody();
        // Decode with an explicit charset instead of the platform default, and
        // tolerate a missing body so that logging the failure cannot itself fail.
        String payload = body == null ? "null" : new String(body, java.nio.charset.StandardCharsets.UTF_8);
        logger.error("消息发送失败 ==> msg:{} replyCode:{} replyText:{} exchange:{} routingKey:{}",
                payload, replyCode, replyText, exchange, routingKey);
    }
}
RabbitMQ 配置类
/**
 * RabbitMQ infrastructure configuration: connection factory, message converter,
 * template, admin and listener container factory.
 *
 * @author rqh
 * @version 1.0
 * @date 2019-06-15
 */
@Configuration
public class RabbitMqConfig {

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.port}")
    private Integer port;

    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    /**
     * Builds the RabbitMQ connection factory from the injected connection settings.
     *
     * @return a caching connection factory with publisher confirms and returns disabled
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingFactory = new CachingConnectionFactory();
        cachingFactory.setHost(this.host);
        cachingFactory.setPort(this.port);
        cachingFactory.setVirtualHost(this.virtualHost);
        cachingFactory.setUsername(this.username);
        cachingFactory.setPassword(this.password);
        // Publisher confirms and publisher returns are both switched off here.
        // NOTE(review): the application.yml shown with this class sets
        // spring.rabbitmq.publisher-confirms / publisher-returns to true, but
        // those properties are not read here; confirm which is intended.
        cachingFactory.setPublisherConfirms(false);
        cachingFactory.setPublisherReturns(false);
        return cachingFactory;
    }

    /**
     * Message converter shared by the template and the listener container factory.
     *
     * @return a Jackson-based JSON message converter
     */
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * RabbitMQ template, declared with prototype scope.
     * Only the message converter is configured; the mandatory flag, the
     * confirm/return callbacks and channel transactions are not set here.
     *
     * @param connectionFactory the connection factory to publish through
     * @param messageConverter  the converter applied to outgoing messages
     * @return a newly created template
     */
    @Bean(name = "rabbitTemplate")
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
        RabbitTemplate rabbit = new RabbitTemplate(connectionFactory);
        rabbit.setMessageConverter(messageConverter);
        return rabbit;
    }

    /**
     * RabbitMQ admin object, which wraps broker management operations.
     *
     * @param connectionFactory the connection factory to operate through
     * @return the admin instance
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    /**
     * Listener container factory used for message listeners.
     *
     * @param connectionFactory the connection factory consumers connect through
     * @param messageConverter  the converter applied to incoming messages
     * @return the configured container factory
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
        SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
        containerFactory.setConnectionFactory(connectionFactory);
        containerFactory.setMessageConverter(messageConverter);
        // Containers start automatically.
        containerFactory.setAutoStartup(true);
        // Consumer count per container: starts at 1 and is capped at 15.
        containerFactory.setConcurrentConsumers(1);
        containerFactory.setMaxConcurrentConsumers(15);
        // Upper bound of unacknowledged messages per consumer; a value that is too
        // large or too small both hurt overall throughput.
        containerFactory.setPrefetchCount(1);
        // Messages rejected because the listener threw are NOT put back on the queue.
        containerFactory.setDefaultRequeueRejected(false);
        // Acknowledge mode is AUTO, not MANUAL.
        containerFactory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return containerFactory;
    }
}
MQ业务配置类 - 指定exchange、queue
/**
 * Business-level MQ configuration: exchange, queue and routing-key names for
 * market data, plus the topic exchange bean.
 * Only the exchange is declared here; this class registers no Queue or Binding beans.
 *
 * @author yuanfeng.z
 * @date 2019/10/11 16:55
 */
@Configuration
public class RabbitMqBusinessConfig {

    /** Market-data exchange name. */
    public static final String MD_EXCHANGE = "zdd.md";

    /** Name of the ticker market-data queue. */
    public static final String MD_TICKER_QUEUE = "zdd.md.ticker";

    /** Routing key: ticker. */
    public static final String TICKER_ROUTINGKEY = "zdd.md.ticker";

    /** Name of the depth market-data queue. */
    public static final String MD_DEPTH_QUEUE = "zdd.md.depth";

    /** Routing key: depth. */
    public static final String DEPTH_ROUTINGKEY = "zdd.md.depth";

    /**
     * Topic exchange for market data.
     *
     * @return a non-durable, non-auto-delete topic exchange named {@link #MD_EXCHANGE}
     */
    @Bean(name = "mdTopicExchange")
    public TopicExchange exchangeMd() {
        final boolean durable = false;
        final boolean autoDelete = false;
        return new TopicExchange(MD_EXCHANGE, durable, autoDelete, null);
    }
}
行情生产者实现类
/**
 * Market-data MQ producer.
 *
 * @author yuanfeng.z
 * @date 2019/10/11 16:21
 */
@Component
public class MdMqProducer extends BaseMqProducer<Message> {

    /**
     * Publishes a message to the market-data exchange with an empty routing key.
     * The two-argument {@code convertAndSend(String, Object)} treats its first
     * argument as a routing key, so the exchange has to be passed through the
     * three-argument form.
     * NOTE(review): with an empty routing key a topic exchange is unlikely to
     * match the specific binding keys used by consumers; prefer
     * {@link #sendMessage(String, Message)} with the intended key.
     *
     * @param message the message to publish
     */
    @Override
    public void sendMessage(Message message) {
        sendMessage("", message);
    }

    /**
     * Publishes a message to the market-data exchange with the given routing key.
     *
     * @param routingKey the routing key to publish with
     * @param message    the message to publish
     */
    public void sendMessage(String routingKey, Message message) {
        amqpTemplate.convertAndSend(RabbitMqBusinessConfig.MD_EXCHANGE, routingKey, message);
    }
}
行情消费者实现类
/**
* depth行情消费者实现类
*
* @author yuanfeng.z
* @date 2019/10/11 16:21
*/
@Component
public class DepthMqConsumer extends BaseMqConsumer<Depth> {
@Override
@RabbitListener(
bindings = @QueueBinding(
exchange = @Exchange(value = "zdd.md", type = ExchangeTypes.TOPIC,
durable = "false", autoDelete = "false"),
value = @Queue(durable = "false",
autoDelete = "true"),
key = "coinex_stock.ZRXETH.depth"
))
public void receiveMessage(Message message, Channel channel, Depth model) throws IOException {
try {
logger.info("==> received message: " + model.getExchangeName());
} catch (RejectedExecutionException e) {
logger.warn("==> thread pool reject execute");
} catch (Exception e) {
// 抛出其他异常
logger.error(e.getMessage());
}
}
}
网友评论