美文网首页
使用javaConfig方式来控制spring batch执行流

使用javaConfig方式来控制spring batch执行流

作者: 七玄之主 | 来源:发表于2018-10-31 16:25 被阅读0次

1、任务顺序执行
按照先step1再step2的顺序执行。以下是按照step为单位直接按序注册到jobBuilder中。

@Configuration
public class TestConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    public Tasklet1 tasklet1;

    @Autowired
    public Tasklet1 tasklet2;

    private static final String JOB1_NAME = "job1";
    private static final String STEP1_NAME = "step1";
    private static final String STEP2_NAME = "step2";

    @Bean
    public Job job1() {
        return jobBuilderFactory.get(JOB1_NAME).incrementer(new RunIdIncrementer()).start(step1())
                        .next(step2()).end().build();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get(STEP1_NAME).tasklet(tasklet1).build();
    }

    @Bean
    public Step step2() {
        return stepBuilderFactory.get(STEP2_NAME).tasklet(tasklet2).build();
    }
}

也可以先把step整合到一个flow中,统一注册到jobBuilder中。


@Bean
public Job job1() {
    Flow flow = new FlowBuilder<Flow>(FLOW1_NAME)
                    .from(step1())
                    .next(step2())
                    .build();
    return jobBuilderFactory.get(JOB1_NAME).incrementer(new RunIdIncrementer()).start(flow)
                    .end().build();
}

2、任务条件执行
通过在条件执行的tasklet中设置该step执行状态来决定下一步需要执行的step是什么。

@Component
@StepScope
public class Tasklet1 implements Tasklet {

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)
                    throws Exception {
        // 不同的业务分支设置step执行状态
        if (check()) {
            contribution.setExitStatus(ExitStatus.COMPLETED);
        } else {
            contribution.setExitStatus(ExitStatus.FAILED);
        }

        return RepeatStatus.FINISHED;
    }
}

job中通过on方法来设定条件执行逻辑。

@Bean
public Job job1() {
    return jobBuilderFactory.get(JOB1_NAME).incrementer(new RunIdIncrementer()).start(step1())
                    .on(ExitStatus.COMPLETED.getExitCode()).to(step2()).from(step1())
                    .on(ExitStatus.FAILED.getExitCode()).to(step3()).end().build();
}

3、任务并行执行
并行执行是通过多线程方式实现的。并行执行可分为不同step的并行及针对同一step处理数据量过大的分区执行。

不同step的实现

@Bean
public Job job1() {
    Flow flow1 = new FlowBuilder<Flow>(FLOW1_NAME)
                    .start(new FlowBuilder<Flow>(STEP1_NAME).from(step1()).end())
                    .split(new SimpleAsyncTaskExecutor())
                    .add(new FlowBuilder<Flow>(STEP2_NAME).from(step2()).end()).build();

    return jobBuilderFactory.get(JOB1_NAME).incrementer(new RunIdIncrementer()).start(flow1)
                    .end().build();
}

对于上面使用到的SimpleAsyncTaskExecutor执行器来讲,首先该执行器不重用任何线程,或者说它每次调用都启动一个新线程。但是,它还是支持对并发总数设限,当超过线程并发总数限制时,阻塞新的调用,直到有位置被释放。

step分区的实现
首先需要为开启的各线程定义分区逻辑,通过此类可以向多线程执行的同一step传递不用的参数,已明确分区范围。

@Component
public class TestPartitioner implements Partitioner {

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {

        Map<String, ExecutionContext> map = new HashMap<String, ExecutionContext>();

        for (int i = 1; i <= gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            context.putInt("param1", i);

            map.put("partition" + i, context);
        }
        return map;
    }
}

tasklet中接收到分区逻辑中设定的参数,并将参数传入对应的业务逻辑方法中实现分区处理。

@Component
public class Tasklet1 implements Tasklet {

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        int param1 = (int)chunkContext.getStepContext().getStepExecution().getExecutionContext().get("param1");

        // testService.handle(param1);
        return RepeatStatus.FINISHED;
    }
}

jobConfig中如下

@Configuration
public class TestConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    public Tasklet1 tasklet1;

    @Autowired
    private TestPartitioner testPartitioner;

    private static final String JOB1_NAME = "job1";
    private static final String MASTER_STEP_NAME = "mainStep";
    private static final String SLAVE_STEP_NAME = "slaveStep";

    @Bean
    public Job job1() {
        return jobBuilderFactory.get(JOB1_NAME).incrementer(new RunIdIncrementer())
                        .start(masterStep()).build();
    }

    @Bean
    public Step masterStep() {
        return stepBuilderFactory.get(MASTER_STEP_NAME)
                        .partitioner(slaveStep().getName(), testPartitioner)
                        .partitionHandler(handler()).build();
        ;
    }

    @Bean
    public Step slaveStep() {
        return stepBuilderFactory.get(SLAVE_STEP_NAME).tasklet(tasklet1).build();
    }

    @Bean
    public PartitionHandler handler() {
        TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
        // 根据实际需要设置开启step句柄数量(分区数)
        handler.setGridSize(10);
        handler.setTaskExecutor(new SimpleAsyncTaskExecutor());
        handler.setStep(slaveStep());
        try {
            handler.afterPropertiesSet();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return handler;
    }
}

相关文章

网友评论

      本文标题:使用javaConfig方式来控制spring batch执行流

      本文链接:https://www.haomeiwen.com/subject/oweatqtx.html