您好,登錄后才能下訂單哦!
一.Job的創建及其應用
1.Job flow的介紹:
(1)狀態機:例完成step1,是否繼續完成step2,step3,我們就需要通過Job flow來控制
(2)進行演示:使用next()方法來達到順序執行step1,step2...的目的,再使用on(),to(),from()方法達到與next()方法同樣的目的,再展示fail()方法和stopAndRestart()方法;
例1:創建JobFlowDemoOne,以及三個Step使用next()讓其順序執行
JobFlowDemOne:
package com.dhcc.batch.batchDemo.config;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class JobFlowDemOne {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job JobFlowDemo1() {
return jobBuilderFactory.get("JobFlowDemo1").start(step1()).next(step2()).next(step3()).build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("step1-->Hello Spring Batch....");
return RepeatStatus.FINISHED;
}
}).build();
}
@Bean
public Step step2() {
return stepBuilderFactory.get("step2").tasklet((contribution, context) -> {
System.out.println("step 2-->Hello Spring Batch..");
return RepeatStatus.FINISHED;
}).build();
}
@Bean
public Step step3() {
return stepBuilderFactory.get("step3").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("step3-->Hello Spring Batch....");
return RepeatStatus.FINISHED;
}
}).build();
}
}
在step2()中,我們使用了java8的新特性,使用了Lambda表達式
package com.dhcc.batch.batchDemo;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@EnableBatchProcessing
public class BatchDemoApplication {
public static void main(String[] args) {
SpringApplication.run(BatchDemoApplication.class, args);
}
}
開始執行,執行結果為:
觀察控制臺執行結果,我們可以發現Step1,step2,step3,按照next()的順序順序執行完畢,此時我們在進入數據庫中進行觀察,我們主要觀察JobExecution表以及StepExecution表,查看其表數據:
JobExecution:
StepExecution:
在查看一個JobInstance,觀察Job實例:
例2:我們使用on(),to(),from()方法順序執行我們的Step,達到與例1相同的效果
原代碼不變,我們將JobFlowDemo1略作修改(注:我在這里為例方便演示新建了一個數據庫):
@Bean
public Job JobFlowDemo2() {
return jobBuilderFactory.get("JobFlowDemo2")
// .start(step1())
// .next(step2())
// .next(step3())
.start(step1()).on("COMPLETED").to(step2())
.from(step2()).on("COMPLETED").to(step3())
.from(step3()).end()
.build();
觀察控制臺:
代碼執行成功;
fail()方法和stopAndRestart()方法后面用到的時候我們在做詳細了解
二.Flow的創建及使用
1.flow的介紹:
(1)flow是一個Step的集合,他規定了Step與Step之間的轉換關系;
(2)創建Flow可以達到復用的效果,讓其在不同的Job之間進行復用;
(3)使用FlowBuilder去創建一個Flow,他和Job類似,使用start(),next()以及end()來運行flow;
例:
創建JobFlowDemoTwoApplication
package com.dhcc.batch.batchDemo.jobFlowDemoTwo;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@EnableBatchProcessing
public class JobFlowDemoTwoApplication {
public static void main(String[] args) {
SpringApplication.run(JobFlowDemoTwoApplication.class, args);
}
}
創建step,flow以及Job——》JobFlowDemoTwoConfiguration:
package com.dhcc.batch.batchDemo.jobFlowDemoTwo;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class JobFlowDemoTwoConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Step jobFlowDemoTwo1() {
return stepBuilderFactory.get("jobFlowDemoTwo1").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("Hello-->jobFlowDemoTwo1");
return RepeatStatus.FINISHED;
}
}).build();
}
@Bean
public Step jobFlowDemoTwo2() {
return stepBuilderFactory.get("jobFlowDemoTwo2").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("Hello-->jobFlowDemoTwo2");
return RepeatStatus.FINISHED;
}
}).build();
}
@Bean
public Step jobFlowDemoTwo3() {
return stepBuilderFactory.get("jobFlowDemoTwo3").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("Hello-->jobFlowDemoTwo3");
return RepeatStatus.FINISHED;
}
}).build();
}
// 創建Flow,它是一個Step集合
@Bean
public Flow jobFlowDemoFlow() {
return new FlowBuilder<Flow>("jobFlowDemoFlow")
.start(jobFlowDemoTwo1())
.next(jobFlowDemoTwo2())
.build();
}
//創建一個Job執行Flow以及Step
@Bean
public Job jobFlowDemoTwoJob() {
return jobBuilderFactory.get("jobFlowDemoTwoJob")
.start(jobFlowDemoFlow())
.next(jobFlowDemoTwo3())
.end()
.build();
}
}
開始運行,我們觀察控制臺:
運行結果我們可以看見Job先運行了Flow中的兩個Step,然后再運行jobFlowDemoStep3(),與我們設定的順序一樣
在數據庫中,我們以stepExection表為例觀察:
看見了3個Step順序執行完成;
三.Spilt多線程并發任務定義及使用
1.Spilt異步執行Flow
2.舉例說明:
(1)首先定義兩個Flow,在每個Flow中定義一些Step,每一個Step將自身的名字以及當前運行的線程打印出來;
(2)創建一個Job使用Spilt異步的啟動兩個Flow;
(3)運行Job,查看結果;
例:
JobSpiltDemoApplication:
package com.dhcc.batch.batchDemo.jobSpiltDemo;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@EnableBatchProcessing
public class JobSpiltDemoApplication {
public static void main(String[] args) {
SpringApplication.run(JobSpiltDemoApplication.class, args);
}
}
JobSpiltDemoConfiguration:
package com.dhcc.batch.batchDemo.jobSpiltDemo;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
@Configuration
public class JobSpiltDemoConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
/**
* 創建job運行Flow,我們利用split(new
* SimpleAsyncTaskExecutor()).add()讓flow異步執行,add()中可以添加多個Flow
*
* @return
*/
@Bean
public Job SpiltJob() {
return jobBuilderFactory.get("SpiltJob").start(jobSpiltFlow1()).split(new SimpleAsyncTaskExecutor())
.add(jobSpiltFlow2()).end().build();
}
// 創建Flow1
@Bean
public Flow jobSpiltFlow1() {
return new FlowBuilder<Flow>("jobSpiltFlow1")
.start(stepBuilderFactory.get("jobSpiltStep1").tasklet(tasklet()).build())
.next(stepBuilderFactory.get("jobSpiltStep2").tasklet(tasklet()).build()).build();
}
// 創建Flow1
@Bean
public Flow jobSpiltFlow2() {
return new FlowBuilder<Flow>("jobSpiltFlow2")
.start(stepBuilderFactory.get("jobSpiltStep3").tasklet(tasklet()).build())
.next(stepBuilderFactory.get("jobSpiltStep4").tasklet(tasklet()).build()).build();
}
private Tasklet tasklet() {
return new PrintTasklet();
}
// step執行的任務類(可以寫為外部類,此處為了方便,我們寫為內部類)
private class PrintTasklet implements Tasklet {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("has been execute on stepName:" + chunkContext.getStepContext().getStepName()
+ ",has been execute on thread:" + Thread.currentThread().getName());
return RepeatStatus.FINISHED;
}
}
}
運行結果:
觀察控制臺,我們可以看出我們達到了讓Step異步執行的目的
觀察數據庫表:
四.決策器的定義及應用
1.Decision:為我們提供下一步執行哪一個Step提供條件決策
2.JobExecutionDecider:接口,提供決策條件
例:
package com.dhcc.batch.batchDemo.flowDecisionDemo;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@EnableBatchProcessing
public class FlowDecisionDemoApplication {
public static void main(String[] args) {
SpringApplication.run(FlowDecisionDemoApplication.class, args);
}
}
FlowDecisionDemoConfiguration:
package com.dhcc.batch.batchDemo.flowDecisionDemo;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FlowDecisionDemoConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
//編寫Job
@Bean
public Job FlowDecisionJob() {
return jobBuilderFactory.get("FlowDecisionJob")
.start(firstStep()).next(myDecider())
.from(myDecider()).on("EVEN").to(evenStep())
.from(myDecider()).on("ODD").to(oddStep())
.from(oddStep()).on("*").to(myDecider())
.end()
.build();
}
@Bean
public Step firstStep() {
return stepBuilderFactory.get("firstStep").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("Hello firstStep..");
return RepeatStatus.FINISHED;
}
}).build();
}
@Bean
public Step evenStep() {
return stepBuilderFactory.get("evenStep").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("Hello evenStep..");
return RepeatStatus.FINISHED;
}
}).build();
}
@Bean
public Step oddStep() {
return stepBuilderFactory.get("oddStep").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("Hello oddStep..");
return RepeatStatus.FINISHED;
}
}).build();
}
//編寫決策器
@Bean
public JobExecutionDecider myDecider() {
return new myDecider();
}
}
myDecider:
package com.dhcc.batch.batchDemo.flowDecisionDemo;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
public class myDecider implements JobExecutionDecider {
private int count=0;
@Override
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
count++;
if(count%2==0) {
return new FlowExecutionStatus("EVEN");
}else {
return new FlowExecutionStatus("ODD");
}
}
}
運行結果:
我們觀察控制臺,分析結果:首先job中先運行firstStep(),然后進入到myDecider中Count++,此時count=1,返回”ODD”,job中執行oddStep(),然后無論什么狀態再次進入myDecider中,此時count=2,故返回”EVEN”,下一步執行evenStep();
五.Job的嵌套定義及應用
1.job可以嵌套使用,嵌套的Job我們將其稱為子job,被嵌套的Job我們將其稱為父job;
2.一個父Job可以有多個子Job;
3.子job不能單獨運行,需要其父Job去啟動;
例:
NestedJobApplication:
package com.dhcc.batch.batchDemo.nestedJob;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@EnableBatchProcessing
public class NestedJobApplication {
public static void main(String[] args) {
SpringApplication.run(NestedJobApplication.class, args);
}
}
接下來建立兩個子Job,每個子job中有兩個Step,一個父job
讓父job控制運行子job
ChildJobOneConfiguration:
package com.dhcc.batch.batchDemo.nestedJob;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ChildJobOneConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job childJob1() {
return jobBuilderFactory.get("childJob1")
.start(childJob1Step1())
.next(childJob1Step2())
.build();
}
@Bean
public Step childJob1Step1() {
return stepBuilderFactory.get("childJob1Step1").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("Hello---->childJob1Step1");
return RepeatStatus.FINISHED;
}
}).build();
}
@Bean
public Step childJob1Step2() {
return stepBuilderFactory.get("childJob1Step2").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("Hello---->childJob1Step2");
return RepeatStatus.FINISHED;
}
}).build();
}
}
ChildJobTwoConfiguration:
package com.dhcc.batch.batchDemo.nestedJob;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ChildJobTwoConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job childJob2() {
return jobBuilderFactory.get("childJob2")
.start(childJob2Step1())
.next(childJob2Step2())
.build();
}
@Bean
public Step childJob2Step1() {
return stepBuilderFactory.get("childJob2Step1").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("Hello---->childJob2Step1");
return RepeatStatus.FINISHED;
}
}).build();
}
@Bean
public Step childJob2Step2() {
return stepBuilderFactory.get("childJob2Step2").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("Hello---->childJob2Step2");
return RepeatStatus.FINISHED;
}
}).build();
}
}
ParentJobConfiguration:
package com.dhcc.batch.batchDemo.nestedJob;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.builder.JobStepBuilder;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
@Configuration
public class ParentJobConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private Job childJob1;
@Autowired
private Job childJob2;
@Autowired
private JobLauncher jobLauncher;
@Bean
public Job parentJob(JobRepository repository,PlatformTransactionManager transactionManager) {
return jobBuilderFactory.get("parentJob")
.start(parentJobStep())
.next(childJob1(repository, transactionManager))
.next(childJob2(repository, transactionManager))
.build();
}
private Step childJob1(JobRepository repository,PlatformTransactionManager transactionManager) {
return new JobStepBuilder(new StepBuilder("childJob1"))
.job(childJob1)
.launcher(jobLauncher)
.repository(repository)
.transactionManager(transactionManager)
.build();
}
private Step childJob2(JobRepository repository,PlatformTransactionManager transactionManager) {
return new JobStepBuilder(new StepBuilder("childJob2"))
.job(childJob2)
.launcher(jobLauncher)
.repository(repository)
.transactionManager(transactionManager)
.build();
}
@Bean
public Step parentJobStep() {
return stepBuilderFactory.get("parentJobStep")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("Helllo------>parentJobStep..");
return RepeatStatus.FINISHED;
}
}).build();
}
}
在配置文件中添加
spring.datasource.url=jdbc:mysql://localhost:3306/springbatch?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8&useSSL=false
spring.datasource.username=root
spring.datasource.password=qitao1996
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.schema=classpath:/org/springframework/batch/core/schema-mysql.sql
spring.batch.initialize-schema=always
spring.batch.job.names=parentJob
運行結果:
觀察控制臺,成功運行,我們達到了job嵌套的效果
六.監聽器的定義及應用
1.Listener:控制Job執行的一種方式
2.可以通過接口或者注解實現監聽器
3.在spring-batch中提供各個級別的監聽器接口,從job級別到item級別都有
(1)JobExecutionListener(before..,after..);
(2)StepExecutionListener(before..,after..);
(3)ChunkListener(before..,after..);
(4)ItemReaderListener;ItemWriterListener;ItemProcessListener(before..,after..,error..);
例:
ListenerJobApplication :
package com.dhcc.batch.batchDemo.listenerJob;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@EnableBatchProcessing
public class ListenerJobApplication {
public static void main(String[] args) {
SpringApplication.run(ListenerJobApplication.class, args);
}
}
創建監聽器(通過實現接口實現)
MyJobListener:
package com.dhcc.batch.batchDemo.listenerJob;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
/**
* 實現接口構建監聽器
* @author Administrator
*
*/
public class MyJobListener implements JobExecutionListener{
@Override
public void beforeJob(JobExecution jobExecution) {
System.out.println(jobExecution.getJobInstance().getJobName()+"before running......");
}
@Override
public void afterJob(JobExecution jobExecution) {
System.out.println(jobExecution.getJobInstance().getJobName()+"before running......");
}
}
創建監聽器(通過注解實現)
MyChunkListener:
package com.dhcc.batch.batchDemo.listenerJob;
import org.springframework.batch.core.annotation.AfterChunk;
import org.springframework.batch.core.annotation.BeforeChunk;
import org.springframework.batch.core.scope.context.ChunkContext;
/**
* 使用注解構建監聽器
* @author Administrator
*
*/
public class MyChunkListener {
@BeforeChunk
public void beforeChunk(ChunkContext context) {
System.out.println(context.getStepContext().getStepName()+"chunk before running.....");
}
@AfterChunk
public void afterChunk(ChunkContext context) {
System.out.println(context.getStepContext().getStepName()+"chunk after running.....");
}
}
MyListenerJobConfiguration:
package com.dhcc.batch.batchDemo.listenerJob;
import java.util.Arrays;
import java.util.List;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MyListenerJobConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
//監聽Job執行
@Bean
public Job myListenerJob() {
return jobBuilderFactory.get("myListenerJob")
.start(myListenerStep())
.listener(new MyJobListener())
.build();
}
private Step myListenerStep() {
return stepBuilderFactory.get("myListenerStep")
.<String,String>chunk(2)
.faultTolerant()
.listener(new MyChunkListener())
.reader(reader())
.writer(writer())
.build();
}
private ItemReader<? extends String> reader() {
return new ListItemReader<>(Arrays.asList("maozedong","zhude","pendehuai","zhouenlai","liushaoqi"));
}
private ItemWriter<? super String> writer() {
return new ItemWriter<String>() {
@Override
public void write(List<? extends String> items) throws Exception {
for(String item:items) {
System.out.println("Writing item: "+item);
}
}
};
}
}
運行結果:
觀察控制臺,我們成功的完成了監聽任務..
七.Job參數初探
1.JobParameters作用:在Job運行過程中,可以用來傳遞信息
2.通過”key---->value”鍵值對的形式傳入,在代碼中我們通過get(“key”)來獲取value值
例:
JobParametersDemoApplication:
package com.dhcc.batch.batchDemo.jobParametersDemo;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@EnableBatchProcessing
public class JobParametersDemoApplication {
public static void main(String[] args) {
SpringApplication.run(JobParametersDemoApplication.class, args);
}
}
創建Job,Step:
JobParametersConfiguretion:
package com.dhcc.batch.batchDemo.jobParametersDemo;
import java.util.Map;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class JobParametersConfiguretion implements StepExecutionListener{
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
private Map<String, JobParameter> parames;
@Bean
public Job MyParametersJobThree() {
return jobBuilderFactory.get("MyParametersJobThree")
.start(MyParametersJobStep3())
.build();
}
private Step MyParametersJobStep3() {
return stepBuilderFactory.get("MyParametersJobStep3")
.listener(this)
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("parame is: "+parames.get("info"));
return RepeatStatus.FINISHED;
}
}).build();
}
@Override
public void beforeStep(StepExecution stepExecution) {
System.out.println(stepExecution.getStepName()+"運行之前...........");
parames = stepExecution.getJobParameters().getParameters();
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
System.out.println(stepExecution.getStepName()+"運行之完畢...........");
return null;
}
}
運行結果:
觀察控制臺,此處我們沒有進行傳參;
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。