美文网首页
Redis 使用list队列高速消费数据

Redis 使用list队列高速消费数据

作者: _大叔_ | 来源:发表于2020-10-14 18:26 被阅读0次

测试目的

本次测试目的是 消费 Redis List类型 里的数据 以各种方式来快速消费,得到最佳消费方式。消费框架为 spring boot,消费工具库为 lettuce,结合redisredisTemplate 的 api 来载入和消费数据,消费数据量分别为 1.5w、2w、10w。消费数据会提前加载到 Redis list 中,消费api 为 redisredisTemplate.opsForList().rightPop(key, Duration.ofSeconds(3)),该api 被封装为 redisSdk.LRightPopBlock(key,second)。

使用 ForkJoinPool 的方式测试

ForkJoinPool 的优势在于,可以充分利用多cpu,多核cpu的优势,把一个任务拆分成多个“小任务”,把多个“小任务”放到多个处理器核心上并行执行;当多个“小任务”执行完成之后,再将这些执行结果合并起来即可。线程的数量是通过构造函数传入,如果没有向构造函数中传入希望的线程数量,那么当前计算机可用的CPU数量会被设置为线程数量作为默认值。

测试机配置

CPU 类型 基准速度 系统类型 内存 内核 逻辑CPU
Intel(R) Core(TM) i7-8565U CPU @ 1.80GHz 1.99 GHz 64位 8G 4 8

单线程测试

单线程不存在 ForkJoinPool 的方式,所以这里没用到。

    @Override
    public void run(String... args) throws Exception {
        long sumTime = 0;
        while(true){
            try{
                long startTime = System.currentTimeMillis();
                String str = redisSdk.LRightPopBlock("a",3);
                if(!StrUtil.isEmpty(str)) {
                    long endTime = System.currentTimeMillis();
                    sumTime += endTime - startTime;
                    log.info("处理数据 {}, 累计时间 {}",str,sumTime);
                }
            }catch(Exception e){
                log.error(e.getMessage());
            }
        }
    }
测试结果

处理 15000 条数据,所需耗时 444s

ForkJoinPool 多线程消费代码

    public void test2(){
        int fcore = 2;
        ForkJoinPool forkJoinPool = new ForkJoinPool(fcore);
        AtomicInteger count = new AtomicInteger(0);
        AtomicInteger breaks = new AtomicInteger(0);

        long startTime = System.currentTimeMillis();
        IntStream.range(0,fcore).forEach(j ->{
            forkJoinPool.execute(() -> {
                while (true) {
                    try {
                        String str = redisSdk.LRightPopBlock("a", 3);
                        if (!StrUtil.isEmpty(str)) {
                            log.info("累计处理 {} 条数据", count.incrementAndGet());
                        } else {
                            breaks.incrementAndGet();
                            break;
                        }
                    } catch (Exception e) {
                        log.error(e.getMessage());
                    }
                }
            });
        });

        while(true) {
            try {
                if(count.get() >= 15000) {
                    break;
                }
                Thread.sleep(500);
            }catch(Exception e){
                e.printStackTrace();
            }
        }
        long endTime = System.currentTimeMillis();
        log.info("总耗时 {}",endTime - startTime - 1000);
    }
测试结果
并行线程数 耗时 消费数据 备注
2 210s 15000
4 108s 15000
6 79s 15000
8 81s 15000
16 31.234s 15000
32 36.271s 15000

ThreadPoolExecutor 测试

ThreadPoolExecutor 是一般线程池,ThreadPoolExecutor 我这里使用的是定长方式,和 Executors.newFixedThreadPool() 是一样的。代码如下:

    public void test5(){
        int fxcore = 4;

        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(fxcore, fxcore, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

        AtomicInteger count = new AtomicInteger(0);
        AtomicInteger breaks = new AtomicInteger(0);

        long startTime = System.currentTimeMillis();
        IntStream.range(0,fxcore).forEach(j ->{
            threadPoolExecutor.execute(() -> {
                while (true) {
                    try {
                        String str = redisSdk.LRightPopBlock("a", 3);
                        if (!StrUtil.isEmpty(str)) {
                            log.info("累计处理 {} 条数据", count.incrementAndGet());
                        } else {
                            breaks.incrementAndGet();
                            break;
                        }
                    } catch (Exception e) {
                        log.error(e.getMessage());
                    }
                }
            });
        });
        while(true) {
            try {
                if(count.get() >= 15000) {
                    break;
                }
                Thread.sleep(500);
            }catch(Exception e){
                e.printStackTrace();
            }
        }
        long endTime = System.currentTimeMillis();
        log.info("总耗时 {}",endTime - startTime - 1000);
    }
测试结果
并行线程数 耗时 消费数据 备注
4 111.124s 15000
8 55.325s 15000
16 29.04s 15000
32 14.57s 15000
总结

ThreadPoolExecutor 开的线程越多,处理速度越快

ForkJoinPool + ThreadPoolExecutor 测试

ForkJoinPool + ThreadPoolExecutor 的测试方案是我自己想的一套,如果说 ForkJoinPool 可以充分让多核 cpu 处理任务,那让每个cpu在建立自己的多线程处理会不会更快呢?测试代码如下:

    public void test2(){
        int fcore = 4;
        int fxcore = 16;

        ForkJoinPool forkJoinPool = new ForkJoinPool(fcore);
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(fxcore, fxcore, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

        AtomicInteger count = new AtomicInteger(0);
        AtomicInteger breaks = new AtomicInteger(0);

        long startTime = System.currentTimeMillis();
        IntStream.range(0,fcore).forEach(j ->{
            forkJoinPool.execute(() -> {
                for(int i=0;i< fxcore/fcore ;i++) {
                    threadPoolExecutor.execute(() -> {
                        while (true) {
                            try {
                                String str = redisSdk.LRightPopBlock("a", 3);
                                if (!StrUtil.isEmpty(str)) {
                                    log.info("累计处理 {} 条数据", count.incrementAndGet());
                                } else {
                                    breaks.incrementAndGet();
                                    break;
                                }
                            } catch (Exception e) {
                                log.error(e.getMessage());
                            }
                        }
                    });
                }
            });
        });
        while(true) {
            try {
                if(count.get() >= 15000) {
                    break;
                }
                Thread.sleep(500);
            }catch(Exception e){
                e.printStackTrace();
            }
        }
        long endTime = System.currentTimeMillis();
        log.info("总耗时 {}",endTime - startTime - 1000);
    }
测试结果
ForkJoinPool线程数 ThreadPool线程数 消费数据 耗时 备注
4 16 15000 28.55s
6 16 15000 37.55s 并行越多反而越慢
4 32 15000 15.53s
6 32 15000 16.15s 并行越多反而越慢,但线程多反而快
总结

看来并没有什么用,还是用 ThreadPool 速度最快。

Pipelined

查了很多方式,查到redis Pipelined也可以 使用rpop,代码如下:

    public void test7(int i){
        long startTime = System.currentTimeMillis();
        List<String> list = redisSdk.getTemplate().executePipelined(new RedisCallback() {
            @Override
            public String doInRedis(RedisConnection connection) throws DataAccessException {
                for(int i=0;i<20000;i++) {
                    connection.rPop("a".getBytes());
                }
                return null;
            }
        });
        long endTime = System.currentTimeMillis();
        log.info("共取到 {} 耗时 {}",list.size(),endTime-startTime);
    }
测试结果

这种方式不像上面单条消费,这里可以自己存到list,然后在写消费程序。注意这里如果redis list数据没有那么多,可能取到的是 null 值,该null值为对象,不为字符串。这里的测试每次都会打开一个新的连接

数据量 时间
1W 980ms
2W 1790ms
3W 2880ms
多线程测试数据并发
    private List<String> list1 = new ArrayList<>();

    private List<String> list2 = new ArrayList<>();

    private List<String> list3 = new ArrayList<>();


    public static void main(String[] args) {
        SpringApplication.run(RealTimeLibraryApplication.class, args);
    }


    @Override
    public void run(String... args) throws Exception {
        long[] a = LongStream.range(0, 10000).toArray();
        String[] aArr = Stream.of(Arrays.toString(a)).collect(Collectors.joining("", "[", "]")).split(",");
        redisSdk.LLeftSet("a", aArr);
        System.out.println("初始化完毕");
        
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 3, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        for(int i=0;i<3;i++) {
            int finalI = i;
            threadPoolExecutor.execute(() -> test7(finalI));
        }
        while(true) {
            if(list1.size() > 0 && list2.size() > 0 && list3.size() >0){
                Thread.sleep(3000);
                break;
            }
            Thread.sleep(1000);
        }
        // disjoint true 就是没有交集 false 就是有交集
        log.info("list1 与 list2 交集存在为 {}", Collections.disjoint(list1,list2)?"否":"是");
        log.info("list1 与 list2 交集存在为 {}",Collections.disjoint(list1,list3)?"否":"是");
        log.info("list1 与 list2 交集存在为 {}",Collections.disjoint(list2,list3)?"否":"是");
    }

    public void test7(int i){
        long startTime = System.currentTimeMillis();
        List<String> list = redisSdk.getTemplate().executePipelined(new RedisCallback() {
            @Override
            public String doInRedis(RedisConnection connection) throws DataAccessException {
                for(int i=0;i<20000;i++) {
                    connection.rPop("a".getBytes());
                }
                return null;
            }
        });
        long endTime = System.currentTimeMillis();
        log.info("共取到 {} 耗时 {}",list.size(),endTime-startTime);
        while(list.contains(null)){
            list.remove(null);
        }
        list.forEach(System.out::println);
        if(i ==0){
            list1.addAll(list);
        }
        if(i ==1){
            list2.addAll(list);
        }
        if(i ==2){
            list3.addAll(list);
        }
    }

测试结果发现数据差不多是均匀的分布在每个list中(每个list在3000+多),且数据没有重复,所以上集群是没有问题的。

业务数据模拟测试代码
    @Override
    public void run(String... args) throws Exception {
        List<String> list1 = new ArrayList<>();
        for(int i=0;i<10000;i++){
            list1.add("{\"sysId\":41040020001,\"mpntId\":1000070001,\"attrId\":1,\"dataItemId\":1000001,\"timestamp\":1603789510000,\"value\":1,\"key\":\"41040020001:1000070001:1:1000001\"}");
        }
        List<String> list2 = new ArrayList<>();
        for(int i=0;i<10000;i++){
            list2.add("{\"sysId\":41040020001,\"mpntId\":1000070001,\"attrId\":1,\"dataItemId\":1000002,\"timestamp\":1603789510000,\"value\":1,\"key\":\"41040020001:1000070001:1:1000002\"}");
        }
        List<String> list3 = new ArrayList<>();
        for(int i=0;i<10000;i++){
            list3.add("{\"sysId\":41040020001,\"mpntId\":1000070001,\"attrId\":1,\"dataItemId\":1000012,\"timestamp\":1603789510000,\"value\":1,\"key\":\"41040020001:1000070001:1:1000012\"}");
        }
        List<String> list4 = new ArrayList<>();
        for(int i=0;i<10000;i++){
            list4.add("{\"sysId\":41040020001,\"mpntId\":1000070001,\"attrId\":1,\"dataItemId\":1000013,\"timestamp\":1603789510000,\"value\":1,\"key\":\"41040020001:1000070001:1:1000013\"}");
        }
        List<String> list5 = new ArrayList<>();
        for(int i=0;i<10000;i++){
            list5.add("{\"sysId\":41040020001,\"mpntId\":1000070001,\"attrId\":1,\"dataItemId\":1000014,\"timestamp\":1603789510000,\"value\":1,\"key\":\"41040020001:1000070001:1:1000014\"}");
        }

        redisSdk.sAddAll("power:station:keys", Arrays.asList("41040020001"));


        redisSdk.lLeftAddAll("41040020001",list1);
        redisSdk.lLeftAddAll("41040020001",list2);
        redisSdk.lLeftAddAll("41040020001",list3);
        redisSdk.lLeftAddAll("41040020001",list4);
        redisSdk.lLeftAddAll("41040020001",list5);



        System.out.println("初始化完毕");

        mainService.start();

    }
测试结果打印

在测试代码里面针对不同的 dataItemId 各创建1W条数据。

获取数量 获取时间 整个流程所耗时间
3W 996 ms 1030 ms
3W 1084 1303 ms
3W 1006 ms 1032 ms
3W 982 ms 1027 ms

最终结论

以上的 Pipelined 测试为表现最好的一个,能批量处理数据,减少网络IO,在实际业务应用中测试的表现也优于其他方式,故针对实时库的消费方式定为此方式。

lettuce jedis 性能比较

使用的都是spring redis提供的开发API,在数据包大小差不多一样的情况下,没用到链接池。

lettuce(ms) jedis(ms)
606 231
490 202
436 183
382 173
381 167
373 158
... 150
... 149
... 148
... ...

jedis 稳定在 148-149ms
lettuce 稳定在 373ms

补:这里引发的一个问题就是,我到底使用多少个线程会比较好呢?如果线程执行的是计算型任务可以核数 * 2,因为数值计算快,给太多线程会频繁切换线程。如果是io型(任务型,业务型),线程可以给几百上千都没问题,但是太多的线程会占用内存,所以根据内存分配。把处理器想象成一个队列(不恰当想象),那么多线程只是等待被执行而已。切记小心,防止oom。

相关文章

  • Redis 使用list队列高速消费数据

    测试目的 本次测试目的是 消费 Redis List类型 里的数据 以各种方式来快速消费,得到最佳消费方式。消费框...

  • redis之list类型模拟队列

    使用redis 的list类型实现异步队列。 队列消费者代码: 队列生产者代码

  • redis 异步和延时消息队列

    redis 异步消息队列Redis 的 list(列表) 数据结构常用来作为异步消息队列使用,使用rpush/l...

  • 基于redis的延时队列的实现

    1、异步消息队列 Redis的list(列表)数据结构常用来作异步消息队列使用,用rpush和lpush操作入队列...

  • 2020-06-12

    Redis 没有 ack 机制,当消费失败的情况下队列如何处理? Redis List的特点就是只有消费,没有查看...

  • Redis中的Stream数据类型作为消息队列的尝试

    Redis的List数据类型作为消息队列,已经比较合适了,但存在一些不足,比如只能独立消费,订阅发布又无法支持数据...

  • redis队列

    问: 很多程序员喜欢用 Redis 的 LIST 数据结构实现任务队列,那么问题来了:比如一个消费者从队列拿到一个...

  • laravel队列背景知识

    redis队列 redis做消息队列的特性例如 FIFO(先进先出)很容易实现,只需要一个list对象从头取数据,...

  • redis异步队列

    redis来做消息队列是用list数据类型。 rpush->在list的right面插入数据,lpop->在lis...

  • redis学习笔记(十一) 异步队列

    1. 引子 redis中有list数据类型,通过list可以实现生产者和消费者。 2. 生产者 使用rpush命令...

网友评论

      本文标题:Redis 使用list队列高速消费数据

      本文链接:https://www.haomeiwen.com/subject/dlompktx.html