美文网首页
Springboot2.x 使用list做消息队列 & 消息

Springboot2.x 使用list做消息队列 & 消息

作者: 骑蚂蚁上高速_jun | 来源:发表于2020-08-23 23:08 被阅读0次

一、使用list数据类型实现Queue
springboot 使用 2.3.1.RELEASE 版本

  1. 引入redis和连接池包
<!-- redis -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency><!-- 配置redis连接池 -->
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>

<!-- json格式化 -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.73</version>
</dependency>
  1. redis的配置类
package cn.waimaolang.demo.configura;

import cn.waimaolang.demo.command.MyMessageListenCommand;
import cn.waimaolang.demo.service.MessageConsumerService;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.io.Serializable;

@Configuration
public class RedisConfigura {
    @Bean
    public RedisTemplate<String, Serializable> redisTemplate(
            LettuceConnectionFactory factory
    ) {
        RedisTemplate<String, Serializable> redisTemplate = new RedisTemplate<>();
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        redisTemplate.setConnectionFactory(factory);
        return redisTemplate;
    }
}
  1. 创建对列消息实体 类
package cn.waimaolang.demo.params;

import org.springframework.stereotype.Component;

@Component
public class QueueParams {

    private Long sendId;

    public Long getSendId() {
        return sendId;
    }

    public void setSendId(Long sendId) {
        this.sendId = sendId;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    private String content;

}

4.创建定时器任务类用作 redis消费端

package cn.waimaolang.demo.task;

import cn.waimaolang.demo.params.QueueParams;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.ListOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

@Component
public class CrontabTask {

    @Autowired
    RedisTemplate redisTemplate;
    /**
     * @title 消费redis queue 
     */
    @Scheduled(cron = "* * * * * *") // 每一秒钟执行一次
    public void consumeRedisQueue(){
        ListOperations redisList=redisTemplate.opsForList();
        while (true){
            String params = (String)redisList.rightPop("queue", 60, TimeUnit.SECONDS);
            if(this.isJSONValid(params)){
                JSONObject queue = JSONObject.parseObject(params);
                QueueParams queueParams = JSON.toJavaObject(queue,QueueParams.class);
                System.out.println(queueParams.getSendId());
                System.out.println(queueParams.getContent());
            }
        }
    }

    /**
     * 使用 fastjson判断是否为合法的json
     * @param test
     * @return bool
     */
    public static boolean isJSONValid(String test) {
        if(test == null || "".equals(test)){
            return false;
        }
        try {
            JSONObject.parseObject(test);
        } catch (JSONException ex) {
            try {
                JSONObject.parseArray(test);
            } catch (JSONException ex1) {
                return false;
            }
        }
        return true;
    }
}
  1. 创建 controller 生产消息
@RestController
@RequestMapping(value = "home")
public class Index
{
    @Autowired
    RedisTemplate redisTemplate;
    @Autowired
    QueueParams queueParams;

    @PostMapping("/product")
    public String product(){
        ListOperations redisList= redisTemplate.opsForList(); // 操作list


        queueParams.setSendId(102L);
        queueParams.setContent("这是消息内容");
        // 向对列 queue中发布消息
        // 向对列 queue中发布消息
        Long result= redisList.leftPush("queue",JSON.toJSONString(queueParams));
        return String.valueOf(result);
    }

    
}

相关文章

网友评论

      本文标题:Springboot2.x 使用list做消息队列 & 消息

      本文链接:https://www.haomeiwen.com/subject/mktgjktx.html