美文网首页
xxl_job的调度线程池

xxl_job的调度线程池

作者: 尘埃里的玄 | 来源:发表于2021-12-16 14:52 被阅读0次

5.4.3 调度中心HA(集群)

基于数据库的集群方案,数据库选用Mysql;集群分布式并发环境中进行定时任务调度时,会在各个节点会上报任务,存到数据库中,执行时会从数据库中取出触发器来执行,如果触发器的名称和执行时间相同,则只有一个节点去执行此任务。

5.4.4 调度线程池

调度采用线程池方式实现,避免单线程因阻塞而引起任务调度延迟。

网址:https://www.xuxueli.com/xxl-job

源码:

package com.xxl.job.admin.core.thread;

import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
import com.xxl.job.admin.core.trigger.TriggerTypeEnum;
import com.xxl.job.admin.core.trigger.XxlJobTrigger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * job trigger thread pool helper
 *
 * @author xuxueli 2018-07-03 21:08:07
 */
public class JobTriggerPoolHelper {
    private static Logger logger = LoggerFactory.getLogger(JobTriggerPoolHelper.class);


    // ---------------------- trigger pool ----------------------

    // fast/slow thread pool
    private ThreadPoolExecutor fastTriggerPool = null;
    private ThreadPoolExecutor slowTriggerPool = null;

    public void start(){
        fastTriggerPool = new ThreadPoolExecutor(
                10,
                XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
                    }
                });

        slowTriggerPool = new ThreadPoolExecutor(
                10,
                XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
                    }
                });
    }


    public void stop() {
        //triggerPool.shutdown();
        fastTriggerPool.shutdownNow();
        slowTriggerPool.shutdownNow();
        logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success.");
    }


    // job timeout count
    private volatile long minTim = System.currentTimeMillis()/60000;     // ms > min
    private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>();


    /**
     * add trigger
     */
    public void addTrigger(final int jobId,
                           final TriggerTypeEnum triggerType,
                           final int failRetryCount,
                           final String executorShardingParam,
                           final String executorParam,
                           final String addressList) {

        // choose thread pool
        ThreadPoolExecutor triggerPool_ = fastTriggerPool;
        AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
        if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
            triggerPool_ = slowTriggerPool;
        }

        // trigger
        triggerPool_.execute(new Runnable() {
            @Override
            public void run() {

                long start = System.currentTimeMillis();

                try {
                    // do trigger
                    XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                } finally {

                    // check timeout-count-map
                    long minTim_now = System.currentTimeMillis()/60000;
                    if (minTim != minTim_now) {
                        minTim = minTim_now;
                        jobTimeoutCountMap.clear();
                    }

                    // incr timeout-count-map
                    long cost = System.currentTimeMillis()-start;
                    if (cost > 500) {       // ob-timeout threshold 500ms
                        AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                        if (timeoutCount != null) {
                            timeoutCount.incrementAndGet();
                        }
                    }

                }

            }
        });
    }



    // ---------------------- helper ----------------------

    private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper();

    public static void toStart() {
        helper.start();
    }
    public static void toStop() {
        helper.stop();
    }

    /**
     * @param jobId
     * @param triggerType
     * @param failRetryCount
     *          >=0: use this param
     *          <0: use param from job info config
     * @param executorShardingParam
     * @param executorParam
     *          null: use job param
     *          not null: cover job param
     */
    public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {
        helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
    }

}

image.png

通过源码发现作者定义了2个线程池,一个快触发任务的线程池,一个慢触发任务的线程池。


image.png

当在一分钟内一个任务超时的次数超过10次就从快线程池切换到慢线程池

相关文章

  • xxl_job的调度线程池

    5.4.3 调度中心HA(集群) 基于数据库的集群方案,数据库选用Mysql;集群分布式并发环境中进行定时任务调度...

  • Java线程池的使用

    线程类型: 固定线程 cached线程 定时线程 固定线程池使用 cache线程池使用 定时调度线程池使用

  • 2019-03-25——Java并发 Executor框架 Th

    ThreadPoolExecutor是Java线程池的核心类,以内部线程池的形式对外提供管理任务执行,线程调度,线...

  • 漫画解读: Java 线程池的工作机制

    线程池 结构图 线程池是一套围绕着核心线程、非核心线程、等待队列的任务调度框架。 默认情况,线程池主要结构如下: ...

  • KafkaScheduler 调度分析

    kafkaScheduler调度模块 KafkaScheduler作为broker进程的调度模块,提供对线程池的封...

  • java中4种常用线程池

    java中4种常用线程池 一、线程池 线程池:说白了,就是一种线程使用模式。线程过多会带来调度开销,进而影响整体性...

  • 【转载】线程池-2

    线程池基本调度功能。 线程池自动扩容缩容。 队列缓存线程。 关闭线程池。 这些功能,最后也留下了三个待实现的 fe...

  • SpringBoot使用线程池

    1. 线程池 线程池是一种线程使用模式。线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,...

  • 线程池

    线程池管理器:用于创建并管理线程池工作线程:线程池中的线程任务接口:每个任务必须实现的接口,用于工作线程调度其运行...

  • 看ThreadPoolExecutor源码前的骚操作

    Flag ThreadPoolExecutor(线程池),大家使用线程的时候,都用过它对线程进行创建及其调度管理,...

网友评论

      本文标题:xxl_job的调度线程池

      本文链接:https://www.haomeiwen.com/subject/wexvfrtx.html