一、使用list数据类型实现Queue
springboot 使用 2.3.1.RELEASE 版本
- 引入redis和连接池包
<!-- Redis: Spring Boot starter for Spring Data Redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency><!-- configures the Redis connection pool -->
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<!-- JSON formatting (fastjson) -->
<!-- NOTE(review): this pins fastjson 1.2.73; later 1.2.x releases carry security fixes for
     deserialization of untrusted input. Confirm against the fastjson advisories and consider upgrading. -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.73</version>
</dependency>
- redis的配置类
package cn.waimaolang.demo.configura;
import cn.waimaolang.demo.command.MyMessageListenCommand;
import cn.waimaolang.demo.service.MessageConsumerService;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import java.io.Serializable;
/**
 * Redis configuration class that exposes the {@link RedisTemplate} bean.
 */
@Configuration
public class RedisConfigura {

    /**
     * Creates a {@link RedisTemplate} bound to the given Lettuce connection factory.
     *
     * <p>Keys are serialized with {@link StringRedisSerializer} and values with
     * {@link GenericJackson2JsonRedisSerializer}. No hash key/value serializers are set here.
     *
     * @param factory the Lettuce connection factory the template uses
     * @return the configured template
     */
    @Bean
    public RedisTemplate<String, Serializable> redisTemplate(LettuceConnectionFactory factory) {
        StringRedisSerializer keySerializer = new StringRedisSerializer();
        GenericJackson2JsonRedisSerializer valueSerializer = new GenericJackson2JsonRedisSerializer();

        RedisTemplate<String, Serializable> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        template.setKeySerializer(keySerializer);
        template.setValueSerializer(valueSerializer);
        return template;
    }
}
- 创建队列消息实体类
package cn.waimaolang.demo.params;
import org.springframework.stereotype.Component;
/**
 * Queue message entity holding a sender id and the message content.
 *
 * <p>Plain mutable bean with getters and setters; it is also registered as a Spring component.
 */
@Component
public class QueueParams {

    /** Id carried with the message. */
    private Long sendId;

    /** Text content of the message. */
    private String content;

    /**
     * @return the sender id, or {@code null} if it has not been set
     */
    public Long getSendId() {
        return this.sendId;
    }

    /**
     * @param sendId the sender id to store
     */
    public void setSendId(Long sendId) {
        this.sendId = sendId;
    }

    /**
     * @return the message content, or {@code null} if it has not been set
     */
    public String getContent() {
        return this.content;
    }

    /**
     * @param content the message content to store
     */
    public void setContent(String content) {
        this.content = content;
    }
}
- 创建定时器任务类，用作 redis 消费端
package cn.waimaolang.demo.task;
import cn.waimaolang.demo.params.QueueParams;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.ListOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
 * Scheduled task that acts as the consumer side of the Redis list used as a queue.
 */
@Component
public class CrontabTask {

    /** Key of the Redis list that is consumed. */
    private static final String QUEUE_KEY = "queue";

    // Raw type kept as in the original field so bean injection is resolved exactly as before.
    @Autowired
    RedisTemplate redisTemplate;

    /**
     * Consumes the Redis queue.
     *
     * <p>On every run, pops elements from the right end of the list {@code "queue"} until the
     * pop returns {@code null}, then returns. The method must return so that the scheduler
     * thread is released and the cron expression can fire again; an endless loop with a
     * blocking pop would keep that thread occupied forever.
     *
     * <p>Elements that are not Strings, or that do not contain a JSON object, are reported on
     * {@code System.err} and skipped so one bad message cannot abort the run.
     */
    @Scheduled(cron = "* * * * * *") // runs once every second
    public void consumeRedisQueue() {
        ListOperations redisList = redisTemplate.opsForList();
        Object popped;
        // Non-blocking pop: ends the run as soon as the list is drained.
        while ((popped = redisList.rightPop(QUEUE_KEY)) != null) {
            if (popped instanceof String) {
                handleMessage((String) popped);
            } else {
                System.err.println("Skipping queue element of unexpected type: "
                        + popped.getClass().getName());
            }
        }
    }

    /**
     * Parses one queue payload into {@link QueueParams} and prints its fields.
     *
     * @param payload the raw text popped from the queue
     */
    private void handleMessage(String payload) {
        QueueParams queueParams;
        try {
            // Parse once; parseObject rejects JSON arrays and scalars with a JSONException.
            JSONObject queue = JSONObject.parseObject(payload);
            if (queue == null) {
                // Empty text or the JSON literal null: there is nothing to consume.
                return;
            }
            queueParams = JSON.toJavaObject(queue, QueueParams.class);
        } catch (JSONException ex) {
            System.err.println("Skipping queue message that is not a JSON object: " + ex.getMessage());
            return;
        }
        if (queueParams == null) {
            return;
        }
        System.out.println(queueParams.getSendId());
        System.out.println(queueParams.getContent());
    }

    /**
     * Uses fastjson to check whether a string is valid JSON.
     *
     * <p>The text is first parsed as a JSON object; if that throws a {@link JSONException},
     * it is parsed as a JSON array instead.
     *
     * @param test the text to check; may be {@code null}
     * @return {@code false} when {@code test} is {@code null} or empty, or when both parse
     *         attempts throw a {@link JSONException}; {@code true} otherwise
     */
    public static boolean isJSONValid(String test) {
        if (test == null || test.isEmpty()) {
            return false;
        }
        try {
            JSONObject.parseObject(test);
        } catch (JSONException ex) {
            try {
                JSONObject.parseArray(test);
            } catch (JSONException ex1) {
                return false;
            }
        }
        return true;
    }
}
- 创建 controller 生产消息
/**
 * Controller that produces messages by pushing them onto the Redis list {@code "queue"}.
 */
@RestController
@RequestMapping(value = "home")
public class Index {

    // Raw type kept as in the original field so bean injection is resolved exactly as before.
    @Autowired
    RedisTemplate redisTemplate;

    /**
     * Publishes one message to the queue.
     *
     * <p>A new {@link QueueParams} is created for every request. Mutating one shared,
     * injected instance would let concurrent requests overwrite each other's fields
     * before the message is serialized.
     *
     * @return the string form of the {@code Long} returned by {@code leftPush}
     *         (per the Spring Data Redis documentation, the list length after the push)
     */
    @PostMapping("/product")
    public String product() {
        ListOperations redisList = redisTemplate.opsForList(); // list operations

        QueueParams queueParams = new QueueParams();
        queueParams.setSendId(102L);
        queueParams.setContent("这是消息内容");

        // Publish the message to the left end of the list "queue".
        Long result = redisList.leftPush("queue", JSON.toJSONString(queueParams));
        return String.valueOf(result);
    }
}
网友评论