美文网首页
ElasticJob分片机制

ElasticJob分片机制

作者: 圣村的希望 | 来源:发表于2020-10-06 18:20 被阅读0次

    ElasticJob是一个弹性的分布式任务调度框架,这里的分布式就是采用分片的来进行任务调度和业务执行的解耦,分片信息就是中间进行解耦的。ElasticJob任务调度框架调度触发执行的是分片,然后业务可以在框架触发对应分片信息的时候,增加自己业务的处理。分片这个思想挺不错的,把任务调度框架和实际业务解耦的相当好。

ShardingListenerManager:这个是分片监听的开始
@Override
    public void start() {
        addDataListener(new ShardingTotalCountChangedJobListener());
        addDataListener(new ListenServersChangedJobListener());
    }

     ShardingListenerManager分片管理监听器在ElasticJob启动的就是开启了监听,这里是开启了2个监听器ShardingTotalCountChangedJobListener(分片节点总数变化监听器)和ListenServersChangedJobListener(服务器改变监听器)。

ShardingTotalCountChangedJobListener:分片总数变化监听器
class ShardingTotalCountChangedJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            if (configNode.isConfigPath(path) && 0 != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
                int newShardingTotalCount = YamlEngine.unmarshal(data, JobConfigurationPOJO.class).toJobConfiguration().getShardingTotalCount();
                if (newShardingTotalCount != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
                    shardingService.setReshardingFlag();
                    JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, newShardingTotalCount);
                }
            }
        }
    }

    获取zookeeper下发的分片个数变化事件的通知,判断新分片数和原分片数是否相等,不相等的话设置需要重新分片的标记,创建/leader/sharding/neccessary持久节点。

ListenServersChangedJobListener:服务器状态变更监听器
class ListenServersChangedJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            if (!JobRegistry.getInstance().isShutdown(jobName) && (isInstanceChange(eventType, path) || isServerChange(path))) {
                shardingService.setReshardingFlag();
            }
        }
        
        private boolean isInstanceChange(final Type eventType, final String path) {
            return instanceNode.isInstancePath(path) && Type.NODE_CHANGED != eventType;
        }
        
        private boolean isServerChange(final String path) {
            return serverNode.isServerPath(path);
        }
    }

    instance和servers节点下有子节点变化会被监听到,这个时候也会去设置下需要重新分片的标记/leader/sharding/neccessary节点。

    上面是触发生成了需要重新分片的标记,具体分片的执行时在任务执行的过程中。在作业任务执行的时候需要获取分片信息,这个时候会完成重新分片的执行。

    public final void execute() {
        //...
        //获取当前作业服务器的分片上下文信息
        ShardingContexts shardingContexts = jobFacade.getShardingContexts();

        //...
    }

    @Override
    public ShardingContexts getShardingContexts() {
        boolean isFailover = configService.load(true).isFailover();
        if (isFailover) {
            //TODO 获取故障转移到当前节点的分片信息
            List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();
            if (!failoverShardingItems.isEmpty()) {
                //TODO 获取故障转移分片信息
                return executionContextService.getJobShardingContext(failoverShardingItems);
            }
        }
        shardingService.shardingIfNecessary();
        List<Integer> shardingItems = shardingService.getLocalShardingItems();
        if (isFailover) {
            //TODO 删除本节点被故障转移的分片信息
            shardingItems.removeAll(failoverService.getLocalTakeOffItems());
        }
        shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
        return executionContextService.getJobShardingContext(shardingItems);
    }
  • 获取当前节点服务器的分片上下文信息,如果开启了故障转移机制,会优先获取故障转移到当前作业服务器的分片任务。
   shardingService.shardingIfNecessary();

    public void shardingIfNecessary() {
        List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();//TODO 获取job存活运行实例
        //不必要分片或作业运行实例为空,则不进行分片操作
        if (!isNeedSharding() || availableJobInstances.isEmpty()) {
            return;
        }
        //等待选举完成,非leader的话,等待完成分片
        if (!leaderService.isLeaderUntilBlock()) {
            blockUntilShardingCompleted();
            return;
        }
        waitingOtherShardingItemCompleted();
        JobConfiguration jobConfig = configService.load(false);
        int shardingTotalCount = jobConfig.getShardingTotalCount();
        log.debug("Job '{}' sharding begin.", jobName);
        jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");//临时processing节点,表示当前正在进行分片
        resetShardingInfo(shardingTotalCount);
        JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(jobConfig.getJobShardingStrategyType());
        jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
        log.debug("Job '{}' sharding complete.", jobName);
    }
  • 如果需要就执行重新分片,这里是去判断/leader/sharding/neccessary节点是否存在,如果存在就需要执行重新分片操作。重新分片操作是只能在主服务器才能执行的,因为这个分片信息是需要所有作业集群统一。所以这里需要等待集群节点leader选举完成,然后从节点不会进行分片操作,从节点是阻塞等待主节点分片完成才退出。
//等待当前任务所有分片执行完成才去执行分片操作,这里是防止分片任务被重复执行,一个幂等操作
waitingOtherShardingItemCompleted();
  • 等待当前任务所有分片任务执行完成才去执行分片操作,这里是防止分片任务被其它作业服务器重复执行,一个幂等操作的处理。
jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
  • 创建一个临时节点/leader/sharding/processing,表示正在执行重新分片操作。
JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(jobConfig.getJobShardingStrategyType());
  • 获取重新分片策略,这里是可以在配置作业任务的时候进行配置jobShardingStrategyType参数。
jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));

jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount))
  • 首先是分片策略(默认是平均分配策略)根据当前的作业服务器实例和分片个数,来计算出每个作业实例获取到的对应的分片信息。然后是在一个zookeeper事务中去创建对应的节点信息。
@RequiredArgsConstructor
    class PersistShardingInfoTransactionExecutionCallback implements TransactionExecutionCallback {
        
        private final Map<JobInstance, List<Integer>> shardingResults;
        
        @Override
        public List<CuratorOp> createCuratorOperators(final TransactionOp transactionOp) throws Exception {
            List<CuratorOp> result = new LinkedList<>();
            for (Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {
                for (int shardingItem : entry.getValue()) {
                    result.add(transactionOp.create().forPath(jobNodePath.getFullPath(ShardingNode.getInstanceNode(shardingItem)), entry.getKey().getJobInstanceId().getBytes()));
                }
            }
            result.add(transactionOp.delete().forPath(jobNodePath.getFullPath(ShardingNode.NECESSARY)));
            result.add(transactionOp.delete().forPath(jobNodePath.getFullPath(ShardingNode.PROCESSING)));
            return result;
        }
    }
  • 在这个zookeeper事务中,根据每个作业服务器实例获取到的分片信息,创建一个路径为/sharding/{item}/instance,value为instanceId的节点信息,待所有分片信息对应点的zookeeper节点创建完成之后,会删除掉/leader/sharding/neccessary和/leader/sharding/processing子节点信息。

相关文章

  • ElasticJob分片机制

    ElasticJob是一个弹性的分布式任务调度框架,这里的分布式就是采用分片的来进行任务调度和业务执行的解耦,分片...

  • ElasticJob幂等机制

    ElasticJob的幂等机制,是指作业分片执行的幂等,他需要做到以下两点: 同一个分片在当前作业实例上不会被重复...

  • ElasticJob 源码解析之主节点选举分片实现

    在elasticJob中,最重要的一个功能就是作业分片,作业分片是怎样实现的,由谁来负责分片?哈哈,肯定不是我来负...

  • elastic-job 源码解读之从源码看zookeeper节点

    ​ elasticjob是使用zk做分布式协调,包括分片参数配置,再分片,选主节点,作业状态等一系列...

  • elastic job源码分析 - 分片监听管理器

    elastic job启动时,分片监听管理器io.elasticjob.lite.internal.shardin...

  • elasticjob-业务分片

    方式1:机器分片和业务自定义分片对应 方式2:hashcode取余

  • ElasticJob故障转移机制

    在ElasticJob中,会把一个任务分成多个分片,然后再把分片分配给集群中不同的节点实例进行作业任务的执行。但是...

  • MongoDB/分片

    分片机制:分片的机制:一开始插入数据时,数据是只插入到其中一块分片上的,插入完毕后,mongodb内部开始在各片之...

  • 进一步提高Elasticsearch的检索效率

    Elasticsearch的路由机制与其分片机制有着直接的关系。Elasticsearch的路由机制即是通过...

  • Kafak原理

    Apache kafka原理 1 分片与副本机制 : 此处的分片指的是对topic中数据进行分片和建立副本, 一个...

网友评论

      本文标题:ElasticJob分片机制

      本文链接:https://www.haomeiwen.com/subject/wkxzuktx.html