美文网首页
基于Redission使用Redis的Stream

基于Redission使用Redis的Stream

作者: 小狼在IT | 来源:发表于2019-01-16 16:40 被阅读0次
        redisson = Redisson.create(config);

        RStream<String, String> stream = redisson.getStream("test3");
        //初始化,不知为啥,但不这样做create不到group
        stream.add("0","0");
        //创建一个group,一个group需要在stream数据添加前创建,否则这个group只能读它创建以后写入stream的数据
        stream.createGroup("testGroup31");
        //往stream添加消息
        for(Integer i=0;i<30;i++){
            stream.add(i.toString(), i.toString());
        }

        //消费消息
        for(Integer i=0;i<6;i++){
            Integer finalI = i;
            Thread t = new Thread( ()->{
                try {
                    Thread.sleep(1000);
                }catch (Exception e){

                }

                Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup31", "consumer"+ finalI.toString(),1);
                if(s!=null && s.size()>0){
                    for (Map.Entry<StreamMessageId, Map<String, String>> entry : s.entrySet()) {
                        Map<String, String> m2 = entry.getValue();
                        for(Map.Entry<String,String> entry1:m2.entrySet()){
                            System.out.println(Thread.currentThread().getName()+" : Key = " + entry1.getKey() + ", Value = " + entry1.getValue());
                        }
                        //消费了消息,要应答一下
                        stream.ack("testGroup31",entry.getKey());
                        //如果消费了消息想删除,可以删除掉
                        //stream.remove(entry.getKey());
                    }
                }

            });
            t.start();
        }
image.png image.png

这里,有个group名字叫testGroup31,里面有消费者6个,pending代表目前有6个数据被读取了,但没有ack。last_delivered-id代表这个group目前读到哪条消息。

相关文章

网友评论

      本文标题:基于Redission使用Redis的Stream

      本文链接:https://www.haomeiwen.com/subject/lbimdqtx.html