Spring Batch is, in short, a lightweight batch-processing framework and, as the name suggests, a Spring sub-project. In enterprise development we often face scenarios that involve processing large volumes of data: importing all rows of one table into another table with a similar structure, reading one or more files in bulk and writing their contents to a database, or batch-updating one table from another. Spring Batch helps us build batch applications for such scenarios quickly.
Spring Batch provides the reusable functions that are essential when processing large volumes of records, including logging/tracing, transaction management, job processing statistics, job restart, skip, and resource management. For high-volume, high-performance batch jobs it also offers advanced features such as partitioning and remoting, which greatly simplify batch development and free developers from complex configuration management, letting us focus on the core business logic. In short, Spring Batch lets us implement batch jobs that are simple, complex, or very large in data volume.
The architecture of Spring Batch is shown below:
This article focuses on how to quickly integrate Spring Batch into a Spring Boot application and build a scheduled batch-job demo, so it will not dwell on Spring Batch theory; detailed reference documentation is available on the official site and elsewhere online.
The official site address is as follows:
This article's example operates on a database. When a batch job works against a database, Spring Batch requires that the tables holding the job metadata be created in that database first. Of the tables below, the ones whose names start with batch are used by Spring Batch to store the metadata produced by each job run, while the student table serves as the data source for this demo:
The ERD below (taken from the official documentation) shows all six tables and their relationships:
Accordingly, we need to run the following official metadata schema SQL script against the database:
-- do not edit this file
-- BATCH job instance table: holds all information related to a JobInstance
-- the job ID is assigned from BATCH_JOB_SEQ
-- JOB_NAME is the job name, matching the Spring configuration
-- JOB_KEY is an MD5 hash of the job parameters; because of this column, a job that completed successfully once will throw JobInstanceAlreadyCompleteException when run again with the same parameters
CREATE TABLE BATCH_JOB_INSTANCE (
JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_NAME VARCHAR(100) NOT NULL,
JOB_KEY VARCHAR(32) NOT NULL,
constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ENGINE=InnoDB;
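The JOB_KEY comment above deserves a concrete illustration. Spring Batch derives the key by hashing the identifying job parameters (its DefaultJobKeyGenerator concatenates them and MD5-hashes the result); the sketch below is a simplified, hypothetical version of that idea, not the framework's exact algorithm:

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

public class JobKeyDemo {
    // Build a deterministic string from sorted parameters, then MD5-hash it.
    static String jobKey(Map<String, String> params) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : new TreeMap<>(params).entrySet()) {
            sb.append(e.getKey()).append('=').append(e.getValue()).append(';');
        }
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(sb.toString().getBytes(StandardCharsets.UTF_8));
            // 32 hex characters, which fits the JOB_KEY VARCHAR(32) column
            return String.format("%032x", new BigInteger(1, digest));
        } catch (NoSuchAlgorithmException ex) {
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        String k1 = jobKey(Map.of("date", "2019-02-24"));
        String k2 = jobKey(Map.of("date", "2019-02-24"));
        String k3 = jobKey(Map.of("date", "2019-02-25"));
        System.out.println(k1.equals(k2)); // same parameters -> same key
        System.out.println(k1.equals(k3)); // new parameters -> new key
    }
}
```

Because identical parameters hash to the same key, and (JOB_NAME, JOB_KEY) is unique, relaunching a completed job with unchanged parameters maps to the same JobInstance and fails with JobInstanceAlreadyCompleteException.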
-- the BATCH_JOB_EXECUTION table holds all information related to a JobExecution object
CREATE TABLE BATCH_JOB_EXECUTION (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_INSTANCE_ID BIGINT NOT NULL,
CREATE_TIME DATETIME NOT NULL,
START_TIME DATETIME DEFAULT NULL ,
END_TIME DATETIME DEFAULT NULL ,
STATUS VARCHAR(10) ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED DATETIME,
JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,
constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ENGINE=InnoDB;
-- this table holds all information related to the JobParameters object
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS (
JOB_EXECUTION_ID BIGINT NOT NULL ,
TYPE_CD VARCHAR(6) NOT NULL ,
KEY_NAME VARCHAR(100) NOT NULL ,
STRING_VAL VARCHAR(250) ,
DATE_VAL DATETIME DEFAULT NULL ,
LONG_VAL BIGINT ,
DOUBLE_VAL DOUBLE PRECISION ,
IDENTIFYING CHAR(1) NOT NULL ,
constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;
-- this table holds all information related to a StepExecution object
CREATE TABLE BATCH_STEP_EXECUTION (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT NOT NULL,
STEP_NAME VARCHAR(100) NOT NULL,
JOB_EXECUTION_ID BIGINT NOT NULL,
START_TIME DATETIME NOT NULL ,
END_TIME DATETIME DEFAULT NULL ,
STATUS VARCHAR(10) ,
COMMIT_COUNT BIGINT ,
READ_COUNT BIGINT ,
FILTER_COUNT BIGINT ,
WRITE_COUNT BIGINT ,
READ_SKIP_COUNT BIGINT ,
WRITE_SKIP_COUNT BIGINT ,
PROCESS_SKIP_COUNT BIGINT ,
ROLLBACK_COUNT BIGINT ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED DATETIME,
constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;
-- the BATCH_STEP_EXECUTION_CONTEXT table holds the ExecutionContext information related to a Step
CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ENGINE=InnoDB;
-- this table holds the ExecutionContext information related to a Job
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_STEP_EXECUTION_SEQ (
ID BIGINT NOT NULL,
UNIQUE_KEY CHAR(1) NOT NULL,
constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
INSERT INTO BATCH_STEP_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_STEP_EXECUTION_SEQ);
CREATE TABLE BATCH_JOB_EXECUTION_SEQ (
ID BIGINT NOT NULL,
UNIQUE_KEY CHAR(1) NOT NULL,
constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
INSERT INTO BATCH_JOB_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_EXECUTION_SEQ);
CREATE TABLE BATCH_JOB_SEQ (
ID BIGINT NOT NULL,
UNIQUE_KEY CHAR(1) NOT NULL,
constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
INSERT INTO BATCH_JOB_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_SEQ);
The DDL for the student table is as follows:
CREATE TABLE `student` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(20) NOT NULL,
`age` int(11) NOT NULL,
`sex` varchar(20) NOT NULL,
`address` varchar(100) NOT NULL,
`cid` int(11) NOT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8;
The student table also contains some simple sample data:
With the database ready, we can now create the Spring Boot project:
Fill in the project name, package name, and other details:
Tick the dependencies highlighted in the red boxes below:
Click Finish to complete the project creation:
The project's final dependencies are as follows:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
The Spring Boot configuration file is shown below (I prefer the .yml format):
spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/springbatch?serverTimezone=Asia/Shanghai&characterEncoding=UTF-8&autoReconnect=true
    hikari:
      password: password
      username: root
  jpa:
    open-in-view: true
    show-sql: true
    hibernate:
      ddl-auto: update
    database: mysql
  # prevent jobs from running automatically at application startup
  batch:
    job:
      enabled: false
In this section we start writing the actual code. The final project structure looks like this:
First comes the entity class for the student table, through which we operate on the table's data. The code is as follows:
package org.zero.example.springbatchdemo.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.persistence.*;
/**
* Entity class for the student table
*
* @author 01
* @date 2019-02-24
**/
@Data
@Entity
@Table(name = "student")
@NoArgsConstructor
@AllArgsConstructor
public class Student {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Integer id;
private String name;
private Integer age;
private String sex;
private String address;
private Integer cid;
}
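Lombok's @Data, @NoArgsConstructor, and @AllArgsConstructor generate the boilerplate at compile time. As a rough, hand-written illustration of what the annotations expand to (a simplified sketch with only three fields; the real generated code also includes setters, equals, hashCode, and canEqual):

```java
// Hand-written equivalent of (part of) what Lombok generates for Student.
public class StudentPojo {
    private Integer id;
    private String name;
    private Integer age;

    // what @AllArgsConstructor provides
    public StudentPojo(Integer id, String name, Integer age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    // what @Data provides (getters; setters omitted here)
    public Integer getId() { return id; }
    public String getName() { return name; }
    public Integer getAge() { return age; }

    @Override
    public String toString() {
        // @Data produces this "ClassName(field=value, ...)" style, which is
        // what the processor/writer log statements later in the demo print.
        return "StudentPojo(id=" + id + ", name=" + name + ", age=" + age + ")";
    }

    public static void main(String[] args) {
        System.out.println(new StudentPojo(1, "Tom", 20));
    }
}
```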
Since both the batch job and the scheduled task need multiple threads, we first configure a Spring thread pool:
package org.zero.example.springbatchdemo.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
* Configures the task thread pool executor
*
* @author 01
* @date 2019-02-24
**/
@Configuration
public class ExecutorConfiguration {
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(50);
threadPoolTaskExecutor.setMaxPoolSize(200);
threadPoolTaskExecutor.setQueueCapacity(1000);
threadPoolTaskExecutor.setThreadNamePrefix("Data-Job");
return threadPoolTaskExecutor;
}
}
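Under the hood, ThreadPoolTaskExecutor delegates to java.util.concurrent.ThreadPoolExecutor, and the three numbers interact in a fixed order: new tasks first fill the core threads, then the queue, and only then are extra threads (up to maxPoolSize) created. A plain-JDK sketch with deliberately tiny numbers (2/4/2 instead of the 50/200/1000 configured above) makes that order visible:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolDemo {
    // Submits 6 blocking tasks and returns {threadCount, queuedTasks}.
    static int[] fillPool() {
        // core=2, max=4, queue capacity=2
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2));
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 6; i++) {
            pool.execute(() -> {
                try { release.await(); } catch (InterruptedException ignored) {}
            });
        }
        // tasks 1-2 took core threads, 3-4 queued, 5-6 forced extra threads
        int[] result = {pool.getPoolSize(), pool.getQueue().size()};
        release.countDown();
        pool.shutdown();
        return result;
    }

    public static void main(String[] args) {
        int[] r = fillPool();
        System.out.println("threads=" + r[0] + ", queued=" + r[1]);
    }
}
```

With 6 blocking tasks: 2 occupy core threads, the next 2 wait in the queue, and the last 2 force non-core threads, so the pool reports 4 threads and 2 queued tasks. A 7th task would be rejected.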
Next we implement a job listener. The batch job invokes the listener's methods before and after execution, so we can add logging or other logic around each run as the business requires. The code is as follows:
package org.zero.example.springbatchdemo.task.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
/**
* A simple job listener
*
* @author 01
* @date 2019-02-24
**/
@Slf4j
@Component
public class JobListener implements JobExecutionListener {
private final ThreadPoolTaskExecutor threadPoolTaskExecutor;
private long startTime;
@Autowired
public JobListener(ThreadPoolTaskExecutor threadPoolTaskExecutor) {
this.threadPoolTaskExecutor = threadPoolTaskExecutor;
}
/**
* Invoked before the job starts
*/
@Override
public void beforeJob(JobExecution jobExecution) {
startTime = System.currentTimeMillis();
log.info("job before " + jobExecution.getJobParameters());
}
/**
* Invoked after the job finishes
*/
@Override
public void afterJob(JobExecution jobExecution) {
log.info("JOB STATUS : {}", jobExecution.getStatus());
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
log.info("JOB FINISHED");
threadPoolTaskExecutor.destroy();
} else if (jobExecution.getStatus() == BatchStatus.FAILED) {
log.info("JOB FAILED");
}
log.info("Job Cost Time : {}/ms", (System.currentTimeMillis() - startTime));
}
}
Now for the core part: configuring a basic Job, which is where the actual batch processing happens. A Job usually consists of one or more Steps (essentially a small workflow), and each Step usually has three parts: an ItemReader that reads the data, an ItemProcessor that processes it, and an ItemWriter that writes it. The code is as follows:
package org.zero.example.springbatchdemo.task.job;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.orm.JpaNativeQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.zero.example.springbatchdemo.model.Student;
import org.zero.example.springbatchdemo.task.listener.JobListener;
import javax.persistence.EntityManagerFactory;
/**
* Configures a basic Job
*
* @author 01
* @date 2019-02-24
**/
@Slf4j
@Component
public class DataBatchJob {
/**
* Job builder factory, used to build Jobs
*/
private final JobBuilderFactory jobBuilderFactory;
/**
* Step builder factory, used to build Steps
*/
private final StepBuilderFactory stepBuilderFactory;
/**
* Entity manager factory, used to access the table data
*/
private final EntityManagerFactory emf;
/**
* Our custom simple job listener
*/
private final JobListener jobListener;
@Autowired
public DataBatchJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory,
EntityManagerFactory emf, JobListener jobListener) {
this.jobBuilderFactory = jobBuilderFactory;
this.stepBuilderFactory = stepBuilderFactory;
this.emf = emf;
this.jobListener = jobListener;
}
/**
* A basic Job usually consists of one or more Steps
*/
public Job dataHandleJob() {
return jobBuilderFactory.get("dataHandleJob").
incrementer(new RunIdIncrementer()).
// start() sets the first Step the job executes
start(handleDataStep()).
// next() can be used to chain further Steps, e.g.:
// next(xxxStep()).
// next(xxxStep()).
// ...
// register our custom JobListener
listener(jobListener).
build();
}
/**
* A simple, basic Step has three main parts:
* ItemReader : reads the data
* ItemProcessor : processes the data
* ItemWriter : writes the data
*/
private Step handleDataStep() {
return stepBuilderFactory.get("getData").
// <input type, output type>; chunk() is loosely like a SQL commit interval: here 100 items are processed (processor) before one write (writer)
<Student, Student>chunk(100).
// on an exception, retry the item up to 3 times; items that still fail are skipped (up to 100 skips), after which the job stops and is marked as failed
faultTolerant().retryLimit(3).retry(Exception.class).skipLimit(100).skip(Exception.class).
// set the ItemReader
reader(getDataReader()).
// set the ItemProcessor
processor(getDataProcessor()).
// set the ItemWriter
writer(getDataWriter()).
build();
}
/**
* Reads the data
*
* @return ItemReader Object
*/
private ItemReader<? extends Student> getDataReader() {
// data can be read here via JPA, JDBC, JMS, etc.
JpaPagingItemReader<Student> reader = new JpaPagingItemReader<>();
try {
// here we read the data via JPA
JpaNativeQueryProvider<Student> queryProvider = new JpaNativeQueryProvider<>();
// a simple native SQL query
queryProvider.setSqlQuery("SELECT * FROM student");
// set the entity class
queryProvider.setEntityClass(Student.class);
queryProvider.afterPropertiesSet();
reader.setEntityManagerFactory(emf);
// number of records to read per page
reader.setPageSize(3);
// set the query provider
reader.setQueryProvider(queryProvider);
reader.afterPropertiesSet();
// all ItemReader and ItemWriter implementations store their current state in the ExecutionContext before it is committed;
// call setSaveState(false) if you don't want that behavior
reader.setSaveState(true);
} catch (Exception e) {
e.printStackTrace();
}
return reader;
}
/**
* Processes the data
*
* @return ItemProcessor Object
*/
private ItemProcessor<Student, Student> getDataProcessor() {
return student -> {
// simulate processing the data; here we just log the item
log.info("processor data : " + student.toString());
return student;
};
}
/**
* Writes the data
*
* @return ItemWriter Object
*/
private ItemWriter<Student> getDataWriter() {
return list -> {
for (Student student : list) {
// simulate writing the data; to keep the demo simple we don't actually write to the database
log.info("write data : " + student);
}
};
}
}
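The chunk(100) call above is the heart of the Step: items are read and processed one at a time, collected into a chunk, and the whole chunk is written (and committed) in one go. The following plain-Java sketch mimics that loop with hypothetical stand-ins (Iterator, Function, and Consumer instead of the real ItemReader/ItemProcessor/ItemWriter interfaces):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class ChunkDemo {
    // Drives reader -> processor -> writer in chunks of the given size,
    // mirroring what a chunk-oriented Step does per transaction.
    static <I, O> int runChunks(Iterator<I> reader, Function<I, O> processor,
                                Consumer<List<O>> writer, int chunkSize) {
        int writes = 0;
        List<O> chunk = new ArrayList<>();
        while (reader.hasNext()) {
            chunk.add(processor.apply(reader.next()));
            if (chunk.size() == chunkSize) {
                writer.accept(chunk);   // one "commit" per full chunk
                chunk = new ArrayList<>();
                writes++;
            }
        }
        if (!chunk.isEmpty()) {         // final partial chunk
            writer.accept(chunk);
            writes++;
        }
        return writes;
    }

    public static void main(String[] args) {
        List<Integer> ids = List.of(1, 2, 3, 4, 5, 6, 7);
        int commits = runChunks(ids.iterator(),
                id -> "student-" + id,
                batch -> System.out.println("write " + batch.size() + " items"),
                3);
        System.out.println("commits=" + commits);
    }
}
```

With 7 items and a chunk size of 3, the writer is invoked three times: twice with 3 items and once with the final partial chunk of 1, which mirrors how Spring Batch commits once per chunk.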
With the Job configured, it can be executed. There are two common ways to run a Job. One is to register the Job bean in the Spring container: by default, Spring Batch runs every configured Job once the application starts, and if there are several Jobs you can select which to run via configuration. In my experience, however, most business scenarios call for running Jobs on a schedule, so this demo uses a scheduled task instead. To launch a Job programmatically, we call the run method of JobLauncher. The code is as follows:
package org.zero.example.springbatchdemo.task;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.*;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.zero.example.springbatchdemo.task.job.DataBatchJob;
/**
* A simple scheduled task
*
* @author 01
* @date 2019-02-24
**/
@Slf4j
@Component
public class TimeTask {
private final JobLauncher jobLauncher;
private final DataBatchJob dataBatchJob;
@Autowired
public TimeTask(JobLauncher jobLauncher, DataBatchJob dataBatchJob) {
this.jobLauncher = jobLauncher;
this.dataBatchJob = dataBatchJob;
}
// scheduled task: runs every ten seconds
@Scheduled(cron = "0/10 * * * * ?")
public void runBatch() throws JobParametersInvalidException, JobExecutionAlreadyRunningException,
JobRestartException, JobInstanceAlreadyCompleteException {
log.info("Scheduled task triggered...");
// Running a job requires at least one parameter, which ends up in the batch_job_execution_params table.
// Without it the job will not run, and the parameters must be unique: if the same parameters already
// exist in the table, an exception is thrown. That is why we use the current timestamp as a parameter.
JobParameters jobParameters = new JobParametersBuilder()
.addLong("timestamp", System.currentTimeMillis())
.toJobParameters();
// get the job and launch it
Job job = dataBatchJob.dataHandleJob();
JobExecution execution = jobLauncher.run(job, jobParameters);
log.info("Scheduled task finished. Exit Status : {}", execution.getStatus());
}
}
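The cron expression 0/10 * * * * ? fires at seconds 0, 10, 20, 30, 40, and 50 of every minute. For intuition, the closest plain-JDK analogue of @Scheduled is a ScheduledExecutorService running at a fixed rate. This is a simplified sketch, using a 10 ms period instead of a cron-aligned 10 s so it finishes quickly:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduleDemo {
    // Runs a no-op "job launch" n times at the given fixed rate and
    // returns how many runs completed (waiting at most 5 seconds).
    static int runTimes(int n, long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(n);
        scheduler.scheduleAtFixedRate(() -> {
            // a real task would build fresh JobParameters here, e.g. a
            // timestamp, and call jobLauncher.run(job, jobParameters)
            latch.countDown();
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException ignored) {
            Thread.currentThread().interrupt();
        }
        scheduler.shutdownNow();
        return n - (int) latch.getCount();
    }

    public static void main(String[] args) {
        System.out.println("runs=" + runTimes(3, 10));
    }
}
```

Unlike this fixed-rate sketch, a real cron trigger aligns runs to the wall clock, which is why Spring's @Scheduled(cron = ...) is the better fit for "every ten seconds, on the tens" semantics.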
Finally, we need to add two annotations to the Spring Boot application class to enable batch processing and task scheduling; without them, neither the batch job nor the scheduled task will run. The code is as follows:
package org.zero.example.springbatchdemo;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
* :@EnableBatchProcessing enables the batch-job configuration
* :@EnableScheduling enables the scheduled-task configuration
*
* @author 01
* @date 2019-02-24
*/
@EnableScheduling
@EnableBatchProcessing
@SpringBootApplication
public class SpringBatchDemoApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBatchDemoApplication.class, args);
}
}
Start the project and wait ten seconds; console output like the following shows that our batch job ran correctly: