美文网首页
RabbitMQ实现代码-行情服务场景

RabbitMQ实现代码-行情服务场景

作者: 我是电饭煲 | 来源:发表于2020-04-01 15:59 被阅读0次

maven引入包

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.1.9.RELEASE</version>
        </dependency>

 <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.10.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.10.0</version>
        </dependency>

application.yml配置

spring:
  rabbitmq:
    port: 5672
    host: 127.0.0.1
    virtual-host: /
    username: admin
    password: admin
    publisher-confirms: true
    publisher-returns: true

MQ消费者基础抽象类

/**
 * MQ消费者基础抽象类
 *
 * @author rqh
 * @version 1.0
 * @date 2019-08-30
 */
public abstract class BaseMqConsumer<T> {

    protected Logger logger = LoggerFactory.getLogger(BaseMqConsumer.class);

    /**
     * 接收消息
     *
     * @param message
     * @param channel
     * @param model
     */
    public abstract void receiveMessage(Message message, Channel channel, T model) throws IOException;

    /**
     * 手动确认消息
     * 注意:当手动确认消息失败,那么在消费者关闭后消息会自动返回到队列中,即消息的状态从Unacked变更为Ready
     *
     * @param message  消息
     * @param channel  管道
     * @param multiple false:确认当前消息 true:批量确认当前管道内的消息
     * @return 是否成功
     */
    public boolean ackMessage(Message message, Channel channel, boolean multiple) {
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), multiple);
            return true;
        } catch (Exception e) {
            logger.error("mq message ack fail: " + e.getMessage());
        }
        return false;
    }

    /**
     * 手动取消确认消息
     * 使用场景:当业务处理出现异常,则根据具体的业务决定是否需要取消确认消息,消息一旦成功取消确认,消息将重新放回队列中
     *
     * @param message 消息
     * @param channel 管道
     * @param requeue 消息是否重新放回队列中
     * @return 是否成功
     */
    public boolean rejectMessage(Message message, Channel channel, boolean requeue) {
        try {
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), requeue);
            return true;
        } catch (Exception e) {
            logger.error("mq message reject fail: " + e.getMessage());
        }
        return false;
    }
}

MQ生产者基础抽象类

/**
 * MQ生产者基础抽象类
 *
 * @author rqh
 * @version 1.0
 *   @date 2019-08-30
 */
@Component
public abstract class BaseMqProducer<T> {

    @Autowired
    public AmqpTemplate amqpTemplate;

    protected Logger logger = LoggerFactory.getLogger(BaseMqProducer.class);

    /**
     * 发送消息
     *
     * @param message
     */
    public abstract void sendMessage(T message);
}

生产者确认消息发送成功的回调(非ACK确认机制)

/**
 * 生产者确认消息发送成功的回调(非ACK确认机制)
 *
 * @author rqh
 * @version 1.0
 * @date 2019-08-30
 */
@Component
public class MqConfirmCallbackListener implements RabbitTemplate.ConfirmCallback {

    private Logger logger = LoggerFactory.getLogger(MqConfirmCallbackListener.class);

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // TODO 记录消息发送日志
    }
}

生产者发送消息到队列失败时的回调

/**
 * 生产者发送消息到队列失败时的回调
 *
 * @author rqh
 * @version 1.0
 * @date 2019-08-30
 */
@Component
public class MqReturnCallbackListener implements RabbitTemplate.ReturnCallback {

    private Logger logger = LoggerFactory.getLogger(MqReturnCallbackListener.class);

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // TODO 记录消息发送日志
        logger.error("消息发送失败 ==> msg:" + new String(message.getBody()) + " replyCode:" + replyCode + " replyText:" + replyText + " exchange:" + exchange + " routingKey:" + routingKey);
    }
}

RabbitMQ 配置类

/**
 * RabbitMQ 配置类
 *
 * @author rqh
 * @version 1.0
 * @date 2019-06-15
 */
@Configuration
public class RabbitMqConfig {

    @Value("${spring.rabbitmq.port}")
    private Integer port;

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    /**
     * RabbitMQ的连接工厂
     *
     * @return
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setPort(this.port);
        connectionFactory.setHost(this.host);
        connectionFactory.setUsername(this.username);
        connectionFactory.setPassword(this.password);
        connectionFactory.setVirtualHost(this.virtualHost);
        // 是否确认回调(设置发送确认,相对于消费者)
        connectionFactory.setPublisherConfirms(false);
        // 是否返回回调
        connectionFactory.setPublisherReturns(false);
        return connectionFactory;
    }

    /**
     * RabbitMq的消息转换器
     *
     * @return
     */
    @Bean
    public MessageConverter jsonMessageConverter() {
        Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
        return messageConverter;
    }

    /**
     * RabbitMq的模版
     *
     * @return
     */
    @Bean(name = "rabbitTemplate")
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        // 设置消息发送失败后自动返回到队列,需要配合使用ConnectionFactory的publisher-returns=true
//        template.setMandatory(false);
        // 设置发送消息时所用的消息转换器
        template.setMessageConverter(messageConverter);
        // 设置确认回调
//        template.setConfirmCallback(confirmCallback);
//        // 设置返回回调
//        template.setReturnCallback(returnCallback);
//        // 设置通道的事务
//        template.setChannelTransacted(false);
        return template;
    }

    /**
     * RabbitMQ的管理对象(该类封装了对 RabbitMQ 的管理操作)
     *
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        return rabbitAdmin;
    }


    /**
     * RabbitMq的监听容器工厂
     *
     * @return
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // 设置自动启动
        factory.setAutoStartup(true);
        // 设置最小的消费者数量
        factory.setConcurrentConsumers(1);
        // 设置最大的消费者数量
        factory.setMaxConcurrentConsumers(15);
        // 设置接收消息时所用的消息转换器
        factory.setMessageConverter(messageConverter);
        // 将由于监听器抛出异常而拒绝的消息重新放回队列
        factory.setDefaultRequeueRejected(false);
        // 为每个消费者指定最大的 unacked messages 数目(autoAck),设值过大或过小都会影响MQ的整体吞吐量
        factory.setPrefetchCount(1);
        // 设置ACK机制为手动确认消息
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return factory;
    }
}

mq业务配置类-制定exchange、queue

/**
 * mq业务配置类
 * @author yuanfeng.z
 * @date 2019/10/11 16:55
 */
@Configuration
public class RabbitMqBusinessConfig {

    /**
     * 行情数据交换机
     */
    public static final String MD_EXCHANGE = "zdd.md";

    /**
     * ticker行情数据队列的名称
     */
    public static final String MD_TICKER_QUEUE = "zdd.md.ticker";

    /**
     * 路由键:ticker
     */
    public static final String TICKER_ROUTINGKEY = "zdd.md.ticker";

    /**
     * depth行情数据队列的名称
     */
    public static final String MD_DEPTH_QUEUE = "zdd.md.depth";

    /**
     * 路由键:depth
     */
    public static final String DEPTH_ROUTINGKEY = "zdd.md.depth";

//    /**
//     * ticker行情数据队列(需要消息持久化)
//     *
//     * @return
//     */
//    @Bean("mdTickerQueue")
//    public Queue mdTickerQueue() {
//        return new Queue(MD_TICKER_QUEUE, false, false, false);
//    }
//
    /**
     * topic行情数据交换机
     *
     * @return
     */
    @Bean(name = "mdTopicExchange")
    public TopicExchange exchangeMd() {
        return new TopicExchange(MD_EXCHANGE, false, false, null);
    }
//
//    /**
//     * 绑定ticker行情数据队列与交换机
//     *
//     * @param queue
//     * @param exchange
//     * @return
//     */
//    @Bean(name = "tickerBindingExchange")
//    public Binding tickerBindingExchange(@Qualifier("mdTickerQueue") Queue queue, @Qualifier("mdTopicExchange") FanoutExchange exchange) {
//        return BindingBuilder.bind(queue).to(exchange);
//    }

//    /**
//     * depth行情数据队列(需要消息持久化)
//     *
//     * @return
//     */
//    @Bean("mdDepthQueue")
//    public Queue mdDepthQueue() {
//        return new Queue(MD_DEPTH_QUEUE, false, false, false);
//    }

//    /**
//     * depth行情数据队列(需要消息持久化)
//     *
//     * @return
//     */
//    @Bean("mdDepthQueue")
//    public Queue mdDepthQueue() {
//        return new AnonymousQueue();
//    }
//
//    /**
//     * 绑定depth行情数据队列与交换机
//     *
//     * @param queue
//     * @param exchange
//     * @return
//     */
//    @Bean(name = "depthBindingExchange")
//    public Binding depthBindingExchange(@Qualifier("mdDepthQueue") Queue queue, @Qualifier("mdTopicExchange") TopicExchange exchange) {
//        return BindingBuilder.bind(queue).to(exchange).with("ftx.BTC/USDT.depth");
//    }
}

行情生产者实现类

/**
 * 行情mq生产者实现类
 * @author yuanfeng.z
 * @date 2019/10/11 16:21
 */
@Component
public class MdMqProducer extends BaseMqProducer<Message> {

    @Override
    public void sendMessage(Message message) {
        amqpTemplate.convertAndSend(RabbitMqBusinessConfig.MD_EXCHANGE, message);
    }
}

行情消费者实现类

/**
 * depth行情消费者实现类
 *
 * @author yuanfeng.z
 * @date 2019/10/11 16:21
 */
@Component
public class DepthMqConsumer extends BaseMqConsumer<Depth> {

    @Override
    @RabbitListener(
            bindings = @QueueBinding(
                    exchange = @Exchange(value = "zdd.md", type = ExchangeTypes.TOPIC,
                            durable = "false", autoDelete = "false"),
                    value = @Queue(durable = "false",
                            autoDelete = "true"),
                    key = "coinex_stock.ZRXETH.depth"
            ))
    public void receiveMessage(Message message, Channel channel, Depth model) throws IOException {
        try {
            logger.info("==> received message: " + model.getExchangeName());
        } catch (RejectedExecutionException e) {
            logger.warn("==> thread pool reject execute");
        } catch (Exception e) {
            // 抛出其他异常
            logger.error(e.getMessage());
        }
    }
}

相关文章

网友评论

      本文标题:RabbitMQ实现代码-行情服务场景

      本文链接:https://www.haomeiwen.com/subject/jeccmctx.html