现在有这样一个需求:
用户生成订单后,需要将订单数据放入缓存中,在订单数据里有一个参数为结束时间,每个订单的结束时间是不定的,没有先后顺序,可能先生成的订单 结束时间在后面,需要在结束时候那个时刻,对订单进行业务处理,写出缓存订单及取缓存订单的方法。
xReadGroup实现
刚开始使用stream group的方式实现,但发现在redis消费组的数据只能被消费一次,已经被read之后的数据不会被重复read。已消费未确认的数据可以被同一个stream的其他消费组读取。
xRead实现
xread 可以重复读取,每次读取都从队列的开头开始读,获取所有数据后过滤出你想要的数据进行处理 ,处理完成删除该数据。下次读取未被标记的数据还能被再次读取到,这样就可以简单方式完成该需求。
这里还有一个优化点是 可以根据结束时间生成streamId,读取数据时就查询指定节省时间内的数据,这样可以减少查询的数据量,具有更好的性能。这种方式后续等我更新完再补充。
两种代码对比如下:
使用group
/**
* 设置待平仓订单 已废弃
* 使用redis stream+group方式
*/
@Override
public void setOrderCache(Order order) {
redisTemplate.opsForStream().add(String.format(ORDER_STREAM_COIN_KEY, order.getGoodsName()), order.toMap());
createOrderConsumerGroup(order.getGoodsName());
}
/**
* 获取待处理订单 已废弃
* 使用redis stream+group方式
* 由于消费组的数据只会被消费一次,如果已消费但未到平仓时间,则不会被标记,下次查询也获取不到,
* 故废弃该方法
*/
@Override
public List<Order> getOrderCache(String goodsName) {
long currentTime = System.currentTimeMillis(); // 获取当前时间戳
String streamKey = String.format(ORDER_STREAM_COIN_KEY, goodsName);
String groupName = String.format(ORDER_STREAM_GROUP_COIN_KEY, goodsName);
// 使用消费组读取数据,从未消费的位置开始
List<MapRecord<String, Object, Object>> messages = redisTemplate.opsForStream()
.read(Consumer.from(groupName, goodsName + "待处理订单Consumer"),
StreamReadOptions.empty().block(Duration.ofSeconds(0L)),
StreamOffset.create(streamKey, ReadOffset.latest()));
if (CollectionUtils.isEmpty(messages)) {
return Collections.emptyList();
}
log.info("消费组消息:{}", JacksonUtils.toJson(messages));
List<Order> orderList = new ArrayList<>();
for (MapRecord<String, Object, Object> record : messages) {
log.info("record:{}", JacksonUtils.toJson(record.getValue()));
Order order = new Order(record.getValue());
if (order.getCloseTime() / 1000 <= currentTime / 1000) {
orderList.add(order);
// 处理完成后确认该消息
// redisTemplate.opsForStream().acknowledge(streamKey, groupName, record.getId());
}
}
return orderList;
}
public void createOrderConsumerGroup(String coin) {
String groupExistsKey = String.format(ORDER_GROUP_EXISTS_KEY, goodsName);
// 检查 Redis 中是否有消费组存在的标记
Boolean groupExists = (Boolean) redisTemplate.opsForValue().get(groupExistsKey);
if (Boolean.TRUE.equals(groupExists)) {
// 如果消费组已存在,则跳过创建
log.warn("消费组已存在:{}", coin);
return;
}
try {
String streamKey = String.format(ORDER_STREAM_COIN_KEY, goodsName);
String groupName = String.format(ORDER_STREAM_GROUP_COIN_KEY, goodsName);
// 创建消费组
redisTemplate.opsForStream().createGroup(streamKey, ReadOffset.latest(), groupName);
log.warn("消费组已存在:{}", groupName);
// 设置消费组存在的标记,避免重复创建
redisTemplate.opsForValue().set(groupExistsKey, true);
} catch (Exception e) {
redisTemplate.opsForValue().set(groupExistsKey, true);
}
}
使用stream可以重复读
/**
* 设置待平仓订单
* @param order 订单数据
*/
@Override
public void setOrderCache(Order order) {
String streamKey = String.format(ORDER_STREAM_COIN_KEY, order.getGoodsName());
redisTemplate.opsForStream().add(streamKey, order.toMap());
}
/**
* 获取待平仓订单
* @param 商品名
* @return 订单列表
*/
@Override
public List<Order> getOrderCache(String goodsName) {
long currentTime = System.currentTimeMillis(); // 获取当前时间戳
return getOrderCache(goodsName,currentTime);
}
/**
* 获取待平仓订单
* @param coin goods币种
* @return 订单列表
*/
@Override
public List<Order> getOrderCache(String goodsName, long currentTime) {
String streamKey = String.format(ORDER_STREAM_COIN_KEY, goodsName);
List<MapRecord<String, Object, Object>> messages = redisTemplate.opsForStream()
.read(StreamOffset.create(streamKey, ReadOffset.from("0")));
if (CollectionUtils.isEmpty(messages)) {
return Collections.emptyList();
}
log.info("消费组消息:{}", JacksonUtils.toJson(messages));
List<Order> orderList = new ArrayList<>();
for (MapRecord<String, Object, Object> record : messages) {
Order order = new Order(record.getValue());
if (order.getCloseTime() / 1000 <= currentTime / 1000) {
log.info("record:{}", JacksonUtils.toJson(record.getValue()));
orderList.add(order);
redisTemplate.opsForStream().delete(streamKey, record.getId());
}
}
return orderList;
}









网友评论