美文网首页
Redis Stream xRead 与xReadGroup读取

Redis Stream xRead 与xReadGroup读取

作者: wds_94 | 来源:发表于2024-12-05 22:37 被阅读0次

现在有这样一个需求:
用户生成订单后,需要将订单数据放入缓存中,在订单数据里有一个参数为结束时间,每个订单的结束时间是不定的,没有先后顺序,可能先生成的订单 结束时间在后面,需要在结束时候那个时刻,对订单进行业务处理,写出缓存订单及取缓存订单的方法。

xReadGroup实现

刚开始使用stream group的方式实现,但发现在redis消费组的数据只能被消费一次,已经被read之后的数据不会被重复read。已消费未确认的数据可以被同一个stream的其他消费组读取。

xRead实现

xread 可以重复读取,每次读取都从队列的开头开始读,获取所有数据后过滤出你想要的数据进行处理 ,处理完成删除该数据。下次读取未被标记的数据还能被再次读取到,这样就可以简单方式完成该需求。
这里还有一个优化点是 可以根据结束时间生成streamId,读取数据时就查询指定节省时间内的数据,这样可以减少查询的数据量,具有更好的性能。这种方式后续等我更新完再补充。

两种代码对比如下:

使用group


    /**
     * 设置待平仓订单 已废弃
     * 使用redis stream+group方式
     */
    @Override
    public void setOrderCache(Order order) {
        redisTemplate.opsForStream().add(String.format(ORDER_STREAM_COIN_KEY, order.getGoodsName()), order.toMap());
        createOrderConsumerGroup(order.getGoodsName());
    }

    /**
     * 获取待处理订单 已废弃
     * 使用redis stream+group方式
     * 由于消费组的数据只会被消费一次,如果已消费但未到平仓时间,则不会被标记,下次查询也获取不到,
     * 故废弃该方法
     */
    @Override
    public List<Order> getOrderCache(String goodsName) {
        long currentTime = System.currentTimeMillis();  // 获取当前时间戳
        String streamKey = String.format(ORDER_STREAM_COIN_KEY, goodsName);
        String groupName = String.format(ORDER_STREAM_GROUP_COIN_KEY, goodsName);

        // 使用消费组读取数据,从未消费的位置开始
        List<MapRecord<String, Object, Object>> messages = redisTemplate.opsForStream()
                .read(Consumer.from(groupName, goodsName + "待处理订单Consumer"),
                      StreamReadOptions.empty().block(Duration.ofSeconds(0L)),
                      StreamOffset.create(streamKey, ReadOffset.latest()));
        if (CollectionUtils.isEmpty(messages)) {
            return Collections.emptyList();
        }
        log.info("消费组消息:{}", JacksonUtils.toJson(messages));
        List<Order> orderList = new ArrayList<>();
        for (MapRecord<String, Object, Object> record : messages) {
            log.info("record:{}", JacksonUtils.toJson(record.getValue()));
            Order order = new Order(record.getValue());
            if (order.getCloseTime() / 1000 <= currentTime / 1000) {
                orderList.add(order);
                // 处理完成后确认该消息
//                redisTemplate.opsForStream().acknowledge(streamKey, groupName, record.getId());
            }
        }
        return orderList;
    }

    public void createOrderConsumerGroup(String coin) {
        String groupExistsKey = String.format(ORDER_GROUP_EXISTS_KEY, goodsName);
        // 检查 Redis 中是否有消费组存在的标记
        Boolean groupExists = (Boolean) redisTemplate.opsForValue().get(groupExistsKey);
        if (Boolean.TRUE.equals(groupExists)) {
            // 如果消费组已存在,则跳过创建
            log.warn("消费组已存在:{}", coin);
            return;
        }
        try {
            String streamKey = String.format(ORDER_STREAM_COIN_KEY, goodsName);
            String groupName = String.format(ORDER_STREAM_GROUP_COIN_KEY, goodsName);
            // 创建消费组
            redisTemplate.opsForStream().createGroup(streamKey, ReadOffset.latest(), groupName);
            log.warn("消费组已存在:{}", groupName);

            // 设置消费组存在的标记,避免重复创建
            redisTemplate.opsForValue().set(groupExistsKey, true);

        } catch (Exception e) {
            redisTemplate.opsForValue().set(groupExistsKey, true);
        }
    }

使用stream可以重复读

    /**
     * 设置待平仓订单
     * @param order 订单数据
     */
    @Override
    public void setOrderCache(Order order) {
        String streamKey = String.format(ORDER_STREAM_COIN_KEY, order.getGoodsName());
        redisTemplate.opsForStream().add(streamKey, order.toMap());
    }

    /**
     * 获取待平仓订单
     * @param  商品名
     * @return 订单列表
     */
    @Override
    public List<Order> getOrderCache(String goodsName) {
        long currentTime = System.currentTimeMillis();  // 获取当前时间戳
        return getOrderCache(goodsName,currentTime);
    }
    /**
     * 获取待平仓订单
     * @param coin goods币种
     * @return 订单列表
     */
    @Override
    public List<Order> getOrderCache(String goodsName, long currentTime) {

        String streamKey = String.format(ORDER_STREAM_COIN_KEY, goodsName);
        List<MapRecord<String, Object, Object>> messages = redisTemplate.opsForStream()
                .read(StreamOffset.create(streamKey, ReadOffset.from("0")));

        if (CollectionUtils.isEmpty(messages)) {
            return Collections.emptyList();
        }

        log.info("消费组消息:{}", JacksonUtils.toJson(messages));
        List<Order> orderList = new ArrayList<>();
        for (MapRecord<String, Object, Object> record : messages) {
            Order order = new Order(record.getValue());
            if (order.getCloseTime() / 1000 <= currentTime / 1000) {
                log.info("record:{}", JacksonUtils.toJson(record.getValue()));
                orderList.add(order);
                redisTemplate.opsForStream().delete(streamKey, record.getId());
            }
        }
        return orderList;
    }

相关文章

  • Redis Stream 速记

    Redis[https://redis.io/] 5.0 引入 Stream[https://redis.io/t...

  • Redis高级数据结构Stream和HyperLogLog

    队列与Stream redis stream结构如上图所示 消息链表,每个消息都有一个唯一的 ID 和对应的内容。...

  • Redis 工具类

    Redis 工具类 实现的功能包括 redis 读取队列数据,存储和读取 hash 数据,Redis 的创建,Re...

  • 使用redis stream实现队列服务

    1. stream类型 Redis5.0引入了Stream类型。该Stream类型的出现,几乎满足了消息队列具备的...

  • Java NIO Channel

    Java NIO Channel(通道)与Java IO 的Stream(流)很相似但是有几点不同: 您可以读取和...

  • Redis Stream

    reids stream 介绍 由于简书关键字审核有问题,文章原文请跳转到博客访问

  • redis stream

    https://zhuanlan.zhihu.com/p/60501638[https://zhuanlan.zh...

  • 字符串相关函数总结2

    fgets(字符指针,字符大小,文件结构体指针) 从文件结构体指针stream中读取数据,每次读取一行。读取的数据...

  • redis和logstash配置

    filebeat存入redis,logstash从redis读取数据 filebeat配置 redis查看命令 l...

  • Java 踩坑笔记

    1、处理excel(读取[excel]时出现Unable to recognize OLE stream错误 )今...

网友评论

      本文标题:Redis Stream xRead 与xReadGroup读取

      本文链接:https://www.haomeiwen.com/subject/lefpsjtx.html