决策器
一些业务比较复杂的批处理操作中,可能会存在如下的需求:如在 微博抽奖 中,进行批量处理挑选中间人的时候,需要根据 活跃度,发帖量,粉丝数 的不同,进行不同筛选操作。这时就需要一个 决策器 ,决策器的作用:根据不同的条件,返回不同的状态码(自定义状态码),根据状态码的不同,选择不同的步骤进行批量处理操作。
代码示例
自定义一个决策器
自定义一个决策器,当输入值为 奇数 的时候,返回 “odd”,当输入值为 偶数 的时候,返回 “even”
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
|
public class MyDecider implements JobExecutionDecider {
private int count;
@Override public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) { count++; if (count % 2 == 0) { return new FlowExecutionStatus("even"); }else { return new FlowExecutionStatus("odd"); } } }
|
使用自定义的决策器,进行步骤选择
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
|
@Configuration @EnableBatchProcessing public class DeciderDemo { private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; @Autowired public DeciderDemo(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) { this.jobBuilderFactory = jobBuilderFactory; this.stepBuilderFactory = stepBuilderFactory; }
@Bean public Job deciderDemoJob(){ return jobBuilderFactory.get("deciderDemoJob") .start(deciderDemoStep1()) .next(myDecider()) .from(myDecider()).on("even").to(deciderDemoStep2()) .from(myDecider()).on("odd").to(deciderDemoStep3()) .from(deciderDemoStep3()).on("*").to(myDecider()) .end().build(); }
@Bean public JobExecutionDecider myDecider(){ return new MyDecider(); } @Bean public Step deciderDemoStep3() { return stepBuilderFactory.get("deciderDemoStep3") .tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { System.out.println("odd"); return RepeatStatus.FINISHED; } }).build(); } @Bean public Step deciderDemoStep2() { return stepBuilderFactory.get("deciderDemoStep2") .tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { System.out.println("even"); return RepeatStatus.FINISHED; } }).build(); } @Bean public Step deciderDemoStep1() { return stepBuilderFactory.get("deciderDemoStep1") .tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { System.out.println("deciderDemoStep1"); return RepeatStatus.FINISHED; } }).build(); } }
|
JobParamters
JobParameters
,就是Job运行时的参数。它在bath中有两个作用:一是标示不同的 JobInstance
实例,二是作为 job 中用到的信息,以参数的形式传给job。
通常每个 Job 都会有不通的启动方式,或者启动参数等信息,所以一般来说,在一个 Job 中,会公用一个 JobParamters
。
一般来说,不会在 Job 中使用 JobParamters
,大部分情况下,是在 Job 的执行步骤中使用,即在 Step 中使用 JobParamters
。在不同的 Step 中获取需要的 JobParamters
,更加便于 Step 的执行(比如需要判断 JobParamerts
的内容是否是正在执行的 Step 所需要的,如果需要就获取执行,如果不需要就略过)。
StepExecutionListener
在 Step 中获取 JobParameters
,通常使用 StepExecutionListener
监听器。这个监听器的作用是:在某个 Step 中传入监听器,这个监听器就可以获取到这个 Step 的所有上下文信息。在监听器的是实现方法中,进行上下文信息的获取、JobParameters
的获取、Step 执行前后的上下文处理等操作。
StepExecutionListener 需要实现 2 个方法:
- beforeStep(StepExecution stepExecution):在 Step 执行前获取 Step 的上下文信息
- afterStep(StepExecution stepExecution):在 Step 执行后获取 Step 的上下文信息
代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
public class MyStepListener implements StepExecutionListener {
private Map<String, JobParameter> parameterMap;
@Override public void beforeStep(SteoExecution stepExecution) { System.out.println(" 执行之前,Step 名称:" + stepExecution.getStepName()); parameterMap = stepExecution.getJobParameters().getParameters(); }
@Override public void afterStep(StepExecution stepExecution) { System.out.println(" 执行之后,Step 名称:" + stepExecution.getStepName()); } }
|
在 Job 运行期间获取 JobParameters
在 Job 运行期间获取 JobParameters,有两种方法:
- Map<String, JobParameter> 声明为公开静态变量,在 Listener 中使用
- 声明 Job 的类实现 StepExecutionListener 接口,直接在本类中使用私有变量调用
代码示例(以第二种为例)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
|
@Configuration public class ParametersDemo implements StepExecutionListener {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private Map<String, JobParameter> parameterMap;
@Autowired public ParametersDemo(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) { this.jobBuilderFactory = jobBuilderFactory; this.stepBuilderFactory = stepBuilderFactory; }
@Bean public Job parameterJob(){ return jobBuilderFactory.get("parameterJob") .start(parameterStep()) .build(); }
@Bean public Step parameterStep(){ return stepBuilderFactory.get("parameterStep") .listener(this) .tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { System.out.println("【parameterStep】接收到的参数:" + parameterMap.get("info")); return RepeatStatus.FINISHED; } }).build(); }
@Override public void beforeStep(StepExecution stepExecution) { System.out.println("在 Step 执行之前传入参数"); parameterMap = stepExecution.getJobParameters().getParameters(); }
@Override public ExitStatus afterStep(StepExecution stepExecution) { System.out.println("在 Step 执行之后处理结果"); return null; } }
|