1. 簡介
Spring Batch 提供強大的機制來重啓失敗的任務。這些機制允許任務從失敗點繼續處理。這種功能對於高效地處理大規模數據處理任務至關重要。
Spring Batch 內置的 <em >JobRepository</em> 存儲了任務執行狀態。這使得任務具有默認的重啓能力。因此,一個失敗的任務可以精確地從它中斷的地方繼續執行。這確保了沒有重複處理或數據丟失的發生。
在本教程中,我們將探討如何配置和有效地重啓一個失敗的 Spring Batch 任務。
2. Maven 依賴
讓我們首先導入 spring-boot-starter-batch、spring-boot-starter-data-jpa 和 h2 依賴到我們的 <em>pom.xml</em> 中:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>2.2.224</version>
</dependency>我們需要基於文件的 H2 數據庫,以便通過在應用程序運行之間持久化作業執行狀態,從而實現作業的可恢復運行.
3. 定義一個簡單的 Spring Batch 任務
本節將探討一個 Spring Batch 任務配置,它演示了一個簡單的批量處理工作流程。我們將定義一個任務,其中包含一個步驟:處理 CSV 文件。
在 Spring Boot 3 中,應避免使用 <em >@EnableBatchProcessing</em >>,因為它會禁用 Spring Boot 有用的自動配置(例如創建 Spring Batch 表的功能)。
3.1. 配置
讓我們創建 BatchConfig 類,該類設置一個名為 simpleJob 的作業:
@Configuration
public class BatchConfig {
@Bean
public Job simpleJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new JobBuilder("simpleJob", jobRepository)
.start(step1(jobRepository, transactionManager))
.build();
}
}simpleJob Bean 使用 JobBuilder 定義批處理作業。該作業包含一個步驟:step1,它讀取、處理和寫入 CSV 文件:
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(2, transactionManager)
.reader(flatFileItemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}步驟 1 使用 StepBuilder 定義了基於分塊的步驟。該步驟以每次處理兩項數據為批次,從 FlatFileItemReader 讀取字符串,使用 ItemProcessor 進行轉換,並使用 ItemWriter 寫入結果 – 這一切都由 PlatformTransactionManager 管理的事務控制,以確保數據完整性。
JobRepository 存儲了步驟的執行狀態,包括 FlatFileItemReader 的位置,從而實現從上次未提交的塊中恢復執行,如果作業失敗。
3.2. 項目讀取器
讓我們定義 flatFileItemReader Bean 以提供輸入數據:
@Bean
@StepScope
public FlatFileItemReader<String> flatFileItemReader() {
return new FlatFileItemReaderBuilder<String>()
.name("itemReader")
.resource(new ClassPathResource("data.csv"))
.lineMapper(new PassThroughLineMapper())
.saveState(true)
.build();
}代碼定義了一個名為 FlatFileItemReader<String> 的 Bean,@StepScope 確保每個步驟執行時創建一個新的實例,從而實現正確的狀態管理和可重啓功能。它從 classpath 路徑下的 data.csv 文件讀取字符串,並使用 PassThroughLineMapper 將每一行映射為字符串。
此外,它具有 saveState(true) 功能,以持久化其在 ExecutionContext 中的讀取位置。這允許讀取器在作業失敗後從上次未處理的行恢復,利用 JobRepository 進行狀態持久化。
為了使作業真正可重啓,ItemReader 必須在 Spring Batch 的執行上下文中持久化其狀態。 例如,像 FlatFileItemReader 這樣的讀取器會自動保存關鍵進度信息(例如行號或記錄計數)在塊之間。
3.3. 物品處理器
讓我們聲明 itemProcessor Bean,用於轉換輸入數據:
@Bean
public RestartItemProcessor itemProcessor() {
return new RestartItemProcessor();
}
static class RestartItemProcessor implements ItemProcessor<String, String> {
private boolean failOnItem3 = true;
public void setFailOnItem3(boolean failOnItem3) {
this.failOnItem3 = failOnItem3;
}
@Override
public String process(String item) throws Exception {
System.out.println("Processing: " + item + " (failOnItem3=" + failOnItem3 + ")");
if (failOnItem3 && item.equals("Item3")) {
throw new RuntimeException("Simulated failure on Item3");
}
return "PROCESSED " + item;
}
}它通過在每個項目前添加 PROCESSED 標記並模擬在 Item3 上的失敗來處理每個項目。
3.4. Item Writer
現在,讓我們創建 itemWriter Bean,用於輸出處理後的數據:
@Bean
public ItemWriter<String> itemWriter() {
return items -> {
System.out.println("Writing items:");
for (String item : items) {
System.out.println("- " + item);
}
};
}此命令會將處理後的項目打印到控制枱。現在,我們的應用程序已準備就緒。
4. 重新啓動失敗的 Spring Batch 任務
Spring Batch 任務默認設計為可重啓,從而能夠無縫地從故障點恢復執行。因此,無需進行任何額外的配置即可啓用此功能。
為了確保其有效運行,任務狀態必須保存在 JobRepository 中。 此外,JobRepository 必須由數據庫支持,以確保可靠地存儲和檢索任務的執行狀態。
以下子章節描述瞭如何模擬任務失敗並重新啓動它。
4.1. 模擬作業失敗
為了模擬作業失敗,ItemProcessor 被配置為在處理 Item3 時拋出 RuntimeException。當失敗發生在 Item3 時,JobRepository 會存儲該狀態,並將作業標記為 FAILED。
運行應用程序時使用 mvn spring-boot:run 產生如下結果:
Starting new job execution...
Processing: Item1
Processing: Item2
Writing items:
- PROCESSED Item1
- PROCESSED Item2
Processing: Item3
[Exception: Simulated failure on Item3]
Job started with status: FAILED這段輸出確認Item1 和 Item2 已被處理並寫入,但 Item3 的失敗會停止任務,狀態會被保留以便後續重啓。
4.2. 重新啓動作業
要重新啓動失敗的作業,我們使用 CommandLineRunner 通過使用 JobExplorer 和固定 JobParameters 檢測失敗的作業實例。
@Bean
CommandLineRunner run(JobLauncher jobLauncher, Job job, JobExplorer jobExplorer,
JobOperator jobOperator, BatchConfig.RestartItemProcessor itemProcessor) {
return args -> {
JobParameters jobParameters = new JobParametersBuilder()
.addString("jobId", "test-job-" + System.currentTimeMillis())
.toJobParameters();
List<JobInstance> instances = jobExplorer.getJobInstances("simpleJob", 0, 1);
if (!instances.isEmpty()) {
JobInstance lastInstance = instances.get(0);
List<JobExecution> executions = jobExplorer.getJobExecutions(lastInstance);
if (!executions.isEmpty()) {
JobExecution lastExecution = executions.get(0);
if (lastExecution.getStatus() == BatchStatus.FAILED) {
itemProcessor.setFailOnItem3(false);
JobExecution restartedExecution = jobLauncher.run(job, jobParameters);
// final Long restartId = jobOperator.restart(lastExecution.getId());
// final JobExecution restartedExecution = jobExplorer.getJobExecution(restartedExecution);
// ...
}
}
}
};
}代碼通過 JobExplorer 檢查作業倉庫中是否存在任何 simpleJob 實例。
它檢測到執行失敗,狀態為 FAILED。它使用 JobLauncher.run() 或 JobOperator.restart() 來恢復該作業。該作業從其上一次持久化狀態開始恢復,從而確保先前處理過的項目不會被重新處理。
為了在重啓期間忽略 Item3 的失敗,我們在啓動重啓後的作業之前設置 itemProcessor.setFailOnItem3(false),允許 RestartItemProcessor 在不拋出異常的情況下處理 Item3。
現在,我們重新運行該應用程序。
讓我們檢查輸出:
Restarting failed job execution with ID: [execution_id]
Processing: Item3
Processing: Item4
Writing items:
- PROCESSED Item3
- PROCESSED Item4
Processing: Item5
Writing items:
- PROCESSED Item5
Restarted job status: COMPLETEDSpring Batch 任務在 Item3 處失敗。它成功從失敗點重新啓動。然後,該任務分塊處理 Item3 到 Item5,寫入結果,並以 COMPLETED 狀態完成。
4.3. 驗證作業重啓功能
為了驗證 Spring Batch 的作業重啓功能,而不是使用 CommandLineRunner,我們添加一個單元測試。
@Test
public void givenItems_whenFailed_thenRestartFromFailure() throws Exception {
// Given
createTestFile("Item1\nItem2\nItem3\nItem4");
JobParameters jobParameters = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.toJobParameters();
// When
JobExecution firstExecution = jobLauncherTestUtils.launchJob(jobParameters);
assertEquals(BatchStatus.FAILED, firstExecution.getStatus());
Long executionId = firstExecution.getId();
itemProcessor.setFailOnItem3(false);
// Then
JobExecution restartedExecution = jobLauncherTestUtils.launchJob(jobParameters);
assertEquals(BatchStatus.COMPLETED, restartedExecution.getStatus());
assertEquals(
firstExecution.getJobInstance().getInstanceId(),
restartedExecution.getJobInstance().getInstanceId()
);
}測試方法首先執行一個故意在處理 Item3 時失敗的任務(斷言預期 FAILED 狀態)。然後,它修改了處理器行為,不再在該項上失敗。最後,它使用相同的參數重新啓動該任務,以確認它能夠從失敗點成功完成。
該測試驗證了三個關鍵方面。首先,它確認初始失敗按預期進行,測試了故障模擬。然後,它確保重新啓動的任務從失敗點繼續處理,驗證了重啓邏輯。最後,它檢查兩個執行實例屬於同一個任務實例,驗證了實例跟蹤。
4.4. 阻止作業重啓
我們可以使用 preventRestart() 方法來阻止作業重啓。
@Bean
public Job simpleJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new JobBuilder("simpleJob", jobRepository)
.start(step1(jobRepository, transactionManager))
.preventRestart()
.build();
}在 JobBuilder 的配置中添加 .preventRestart(),會將任務配置為始終從頭開始執行(例如:Item1),而不是在重新啓動時從失敗點繼續執行(例如:Item3)。 這會覆蓋 Spring Batch 默認行為,即為重啓目的而將任務狀態保存在 JobRepository 中。
5. 結論
Spring Batch 的默認重啓功能能夠實現對作業失敗的健壯恢復,確保失敗的作業能夠從故障點恢復,而無需重新處理已完成的項或丟失數據。
在本文中,我們創建了一個簡單的作業,演示了該重啓功能。我們配置了一個作業以分塊處理項,模擬在第 Item3 > 處發生故障。重啓後,作業從 Item3 > 處恢復,通過了 Item5 >,並以 COMPLETED 狀態成功完成。
我們還探討了如何使用 preventRestart() 在 JobBuilder 上覆蓋此行為。這會強制作業從頭開始運行,就像 Item1 處一樣,而不是從故障點恢復。