美文网首页
最简单实现高并发插入数万条数据(可同步可异步)

最简单实现高并发插入数万条数据(可同步可异步)

作者: jeyyband | 来源:发表于2019-03-28 09:53 被阅读0次

获取数据| 整理数据

        
        public static int count = 0;
        
        String uniqueTimeId = IDUtils.getInstance().getUniqueTimeId();

        String sqlof1 = "select vc_scode from ( "+sqlExpr+" )";

        String sqlof2 = SqlExprConstant.pool_sql.replace(SqlExprConstant.POOL_TEMPLATE, poolId);
        
        String joiner = " minus ";
        
        //查询筛选结果
        List<Map<String, Object>> reduce1 = riskruleService.executeSql(sqlof1+joiner+sqlof2);
        

        //插入数据库  121_证券筛选执行结果
        List<String> keys1 = reduce1.stream().filter(map -> !Objects.isNull(map.get("VC_SCODE")))
                                      .map(i ->(String)i.get("VC_SCODE"))
                                      .distinct().collect(Collectors.toList());
                                      
        
        //并发插入数据
        CurrencyAdd(keys1.size(), 100, keys1, uniqueTimeId, "1");       

高并发插入上万条数据

    /**
     * 高并发插入上万条数据
     * @param totalCount
     * @param threadTotal
     * @throws InterruptedException
     * @author Bruce
     */
    public void  CurrencyAdd(int totalCount ,int threadTotal,List<String> keys,String uniqueTimeId,String cCompType) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        // 限制同时执行的线程数
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(totalCount);

        for (int i = 0; i < totalCount; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add(cCompType);
                    //根据下标获取取值插入
                    log.info("current count no is --->{}",count);
                    CompSecuFilterRes res = new CompSecuFilterRes();
                    String code = "";
                    
                    code = keys.get(count-1);
                    
                    res.setVcScode(code );
                    res.setVcFilterSeqno(uniqueTimeId);
                    res.setCCompType(cCompType);
                    res.setDMdftime(new Date());
                    riskruleService.addCompSecuFilterRes(res);
                    semaphore.release();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                countDownLatch.countDown();

            });
        }

        //异步不等待执行过程就把这行注掉
        //countDownLatch.await();
        executorService.shutdown();
        log.info("count{}", count);
    }
    
    private synchronized static void add(String type ) {
        
        count++;
       
    }

日志信息

...

2019-03-27 at 18:38:37 CST INFO  com.xx.xxx.controller.xxxxController 442 lambda$4 - current count no is --->97
2019-03-27 at 18:38:37 CST DEBUG org.apache.ibatis.logging.jdbc.BaseJdbcLogger 159 debug - ==>  Preparing: insert into T_COMP_SECU_FILTER_RES (VC_FILTER_SEQNO, C_REC_TYPE, VC_SCODE, C_COMP_TYPE, D_MDFTIME) values (?, ?, ?, ?, ?) 
2019-03-27 at 18:38:37 CST DEBUG org.apache.ibatis.logging.jdbc.BaseJdbcLogger 159 debug - ==> Parameters: 1553683109754193(String), null, 011801593YH(String), 2(String), 2019-03-27(Date)
2019-03-27 at 18:38:37 CST DEBUG org.apache.ibatis.logging.jdbc.BaseJdbcLogger 159 debug - <==    Updates: 1
2019-03-27 at 18:38:37 CST INFO  com.xx.xxx.controller.xxxxController 442 lambda$4 - current count no is --->98
2019-03-27 at 18:38:37 CST DEBUG org.apache.ibatis.logging.jdbc.BaseJdbcLogger 159 debug - ==>  Preparing: insert into T_COMP_SECU_FILTER_RES (VC_FILTER_SEQNO, C_REC_TYPE, VC_SCODE, C_COMP_TYPE, D_MDFTIME) values (?, ?, ?, ?, ?) 
2019-03-27 at 18:38:37 CST DEBUG org.apache.ibatis.logging.jdbc.BaseJdbcLogger 159 debug - ==> Parameters: 1553683109754193(String), null, 041758030YH(String), 1(String), 2019-03-27(Date)
2019-03-27 at 18:38:37 CST DEBUG org.apache.ibatis.logging.jdbc.BaseJdbcLogger 159 debug - <==    Updates: 1
2019-03-27 at 18:38:37 CST INFO  com.xx.xxx.controller.xxxxController 442 lambda$4 - current count no is --->99

...

转载请注明来源。
我的博客:http://blog.northpark.cn
本文同步地址:http://blog.northpark.cn/2019/03/28/%E6%9C%80%E7%AE%80%E5%8D%95%E5%AE%9E%E7%8E%B0%E9%AB%98%E5%B9%B6%E5%8F%91%E6%8F%92%E5%85%A5%E6%95%B0%E4%B8%87%E6%9D%A1%E6%95%B0%E6%8D%AE(%E5%8F%AF%E5%90%8C%E6%AD%A5%E5%8F%AF%E5%BC%82%E6%AD%A5)/

相关文章

  • 最简单实现高并发插入数万条数据(可同步可异步)

    获取数据| 整理数据 高并发插入上万条数据 日志信息 转载请注明来源。我的博客:http://blog.north...

  • MySQL 高可用集群解决方案--pxc

    一、pxc简介 优点总结服务高可用数据同步复制(并发复制),几乎无延迟多个可同时读写节点,可实现写扩展新节点可以自...

  • MyBatis批量插入数据实现(MySQL)

    一、SQL层面实现数据插入 先复习一下单条/批量插入数据的sql语句怎么写: 单条插入数据的写法: 批量插入一种可...

  • Java中的同步与异步

    进程同步用来实现程序并发执行时候的可再现性。 一.进程同步及异步的概念 1.进程同步:就是在发出一个功能调用时,在...

  • 同步与异步

    进程同步用来实现程序并发执行时候的可再现性。 进程同步及异步的概念 1.进程同步:就是在发出一个功能调用时,在没有...

  • 2017-10-19-同步和异步

    同步和异步 1,同步 2,异步 并发和并行都是异步任务实现的俩种方式 3,并发 4,并行 总结:并行是相对于多核C...

  • 2018-05-24-多线程学习

    java多线程并发的编程学习 1,概念的学习 同步异步:同步需要等待异步不需要,同步如对共享数据操作避免脏数据...

  • 发布-订阅方式实现异步并发

    发布-订阅方式实现异步并发 通过发布/订阅者模式实现异步并发 异步意味着我们不会像同步那样顺序或串行地获取到程序每...

  • GCD

    GCD的底层实现 基本理解 重点 并发:同步的话,任务按顺序执行。异步:真正的并发。 同步和异步:能不能开辟一个线...

  • Message Queue 消息队列

    消息队列概述 消息队列是系统中重要的中间件,主要解决应用耦合,异步消息,高并发等问题。实现高性能,高可用,可伸缩和...

网友评论

      本文标题:最简单实现高并发插入数万条数据(可同步可异步)

      本文链接:https://www.haomeiwen.com/subject/nepfbqtx.html