美文网首页Node.js专题我爱编程程序员
实现一个轻量级JS任务队列(自调度)

实现一个轻量级JS任务队列(自调度)

作者: blurooo | 来源:发表于2018-06-27 18:23 被阅读14次

项目介绍

轻量级JS任务队列,支持并行数控制,任务超时,任务失败重试,错误抓取,任务运行情况统计等。通过简单的一个任务队列,二三百行代码,不仅能实现可控的并行数,还能收集到每个任务的排队时间、执行时间、执行状况、重试次数等,进而推断出主机压力等情况,不亦乐乎?

注:目前仅为单机任务队列,适合单机性质的压力(例如socket通信,使用队列可以控制主机的网络IO,避免雪崩效应)或前端。但也有想法借助redis等数据库实现集群任务队列,看天意和契机了。

项目地址:https://github.com/blurooo/queue

软件架构

队列如何实现任务的及时消化?抓住两个重要时机:put任务和任务执行完成。假设队列的并行量被设置为n,初始化的时候正在执行中任务为m:

  1. put任务的时候,顺带检测一下执行中的任务数距离并发量上限还有多少,一次性加载n-m个任务放入执行中队列。
  2. 每个执行完成的任务,处理完事务之后,顺带,尝试加载一个任务来补充执行。

两个时机都抓住,这个任务队列就是“永动机”。

队列仅依赖于Promise,不管是使用原生ES6的Promise对象,抑或使用bluebird库,都可以正常运行(代码扔在同个文件里,方便拷走)。

安装教程

暂且作为一个工具类使用吧,不放到npm了,前端或nodeJs都可以直接打包带走。

使用说明

  1. 创建默认队列

    按照需要创建一个队列对象(队列对象之间互不影响, 所以可以多种场景使用多个队列对象,甚至可以通过多个队列组合从而应对一些复杂场景)

    let q = new Queue();
    
  2. put单个任务

    由于Promise的执行代码在创建的时刻就已经被执行(then和catch内的代码则通过回调执行),所以简单把Promise扔进队列是不可行的

    // 下面的打印序是 1 2 3
    new Promise(resolve => {
        console.log(1);
        resolve(3)
    }).then(res => {
        console.log(res);
    })
    console.log(2)
    

    通过将Promise扔进一个function可以达到延期执行的效果

    function p() {
        // 返回promise任务
        return new Promise(resolve => {
            console.log(1);
            resolve(3)
        }).then(res => {
            console.log(res);
        })
    }
    console.log(2);
    

    此时p必须等待调用才会执行内部的Promise代码,且p返回的是该Promise,值便可以继续传递。 每个放置到QueueTask的Promise任务都应该以这种方式封装

    function p() {
        return new Promise(resolve => {
            console.log(1);
            resolve(2)
        }).then(res => {
            console.log(res);
            return 3;
        })
    }
    // 由于队列很空闲, 可以立即调度本任务,
    // 所以很快就成功打印出了1, 之后的then则需要等待合适的时机回调,
    // 如果Promise及其上面的所有then都执行完了, 最终会传递到put.then
    q.put(p).then(task => {
        // 打印最终值3
        console.log(task.res)
    });
    

    当然,如果在put的时候,队列执行中的任务数已经到达最大并行量,则需要等待有任务执行完成时腾出空间,并且排在当前任务之前的任务已经都被调度完了才会得到执行。

  3. put一组任务

    QueueTask允许同时put多个任务,且put.then会在该组任务都被执行完毕时才被调用

    function getP(flag) {
        return function p() {
            return new Promise(resolve => {
                resolve(flag)
            });
        }
    }
    q.put([getP(1), getP(2), getP(3)]).then(tasks => {
        // 打印每个任务的返回值, 按放入顺序一一对应
        for (let i = 0; i < tasks.length; i++) {
            console.log(tasks[i].res);
        }
    })
    
  4. 配置

    目前支持的参数如下

    let q = new Queue({
       // 最大并行数,最小为1
       concurrency: 5,
       // 任务超时时间ms,0不超时
       timeout: 15000,
       // 任务失败重试次数,0不重试
       retry: 0,
       // 是否优先处理失败重试的任务,为true则失败的任务会被放置到队列头
       retryPrior: false,
       // 是否优先处理新任务,为true则新任务会被放置到队列头
       newPrior: false,
       // 任务失败的handler函数,如果设置了重试,同个任务失败多次会执行catch多次
       catch: (err) => {
           // 错误
       }
    });
    

    参数可以在运行期更改, 对后续生效

    q.setOptions({
       concurrency: 5,
       timeout: 15000,
       retry: 0,
       retryPrior: false,
       newPrior: false,
       catch: null
    });
    
  5. 任务运行状态

    使用QueueTask执行的Promise任务所有错误会被catch并包装,所以只存在put.then而不存在put.catch(除非put.then自身出错)。任务执行之后获取到的响应有一些有用的值可以用于服务统计

    taskRes = {
        // 执行是否遇到错误, 判断任务是否执行成功的判断依据是err === undefined, err为任何其它值都代表了运行失败。
        // 任务出错时, 如果不重试, 那么catch到的错误会直接放入err, 超时时err为'queue_timeout'
        // 如果重试, 且在最大重试次数之后依然错误的话, 会将最后一次的错误放入err
        // 如果重试, 且在重试期间成功的话, 被认为是成功的, 所以err为空
        err: undefined,
        // 执行Promise返回的结果
        res: Object,
        // 从任务放入队列到该任务最后一次被调度, 所经过的时间(ms)
        putUntilRunTime: 20,
        // 该任务最后一次运行的时间(ms)
        runTime: 1,
        // 该任务出错重试的次数
        retry: 2
    }
    
  6. 插队

    除了队列控制参数newPrior和retryPrior之外,也允许在put的时候指定当前任务是否优先处理

    put(tasks, prior)
    

    默认情况下,任务是放入队尾的,但如果指定了prior为true,则会被放置到队头,put任务组时会维持组任务原本的顺序,并整个放入队头。

  7. 更多

    提供了demo.js,可以评估运行状况是否符合预期。

参与贡献

  1. Fork 本项目
  2. 新建 Feat_xxx 分支
  3. 提交代码
  4. 新建 Pull Request

相关文章

  • 实现一个轻量级JS任务队列(自调度)

    项目介绍 轻量级JS任务队列,支持并行数控制,任务超时,任务失败重试,错误抓取,任务运行情况统计等。通过简单的一个...

  • 关于宏任务微任务的题

    第一道题: 解析:js任务队列有两种,宏任务队列,微任务队列。js的事件循环调度的就是宏任务队列。一个宏任务执行完...

  • Golang实现简单爬虫框架(5)——项目重构与数据存储

    前言 在上一篇文章《Golang实现简单爬虫框架(4)——队列实现并发任务调度》中,我们使用用队列实现了任务调度,...

  • GCD解析

    死锁(Deadlock) 在串行队列中,当前队列的调度块内(包含嵌套)调用当前队列的同步任务会死锁异步调度块要等自...

  • GCD死锁的知识点

    队列:负责调度任务 串行队列(Serial Dispatch Queue):一个接一个的调度任务,要等待上一个执行...

  • 2018-07-09 python中的轻量级定时任务调度库:sc

    转载自: python中的轻量级定时任务调度库:schedule 提到定时任务调度的时候,相信很多人会想到芹菜ce...

  • 操作系统知识点

    进程调度全局队列调度:操作系统维护一个全局的任务等待队列,当系统中有一个空闲的CPU时,操作系统就会从全局任务队列...

  • 8.7 阻塞队列BlockingQueue

    简介 当调度器调度线程池执行任务时,生产者生产任务,消费者消费任务,那么这时就需要一个任务队列,生产者向队列里插入...

  • iOS调度队列

    GCD调度队列是执行任务的强大工具。调度队列允许您相对于调度者异步或者同步的执行任意代码块。您能够使用调度队列来执...

  • GCD

    队列的两种类型:并发() 串行(Serial Dispatch Queue) 串行队列:一次只调度一个任务,队列...

网友评论

    本文标题:实现一个轻量级JS任务队列(自调度)

    本文链接:https://www.haomeiwen.com/subject/jpsiyftx.html