知識庫 / Spring / Spring Boot RSS 訂閱

在 Spring Batch 失敗時重試任務

Spring Boot
HongKong
6
10:43 AM · Dec 06 ,2025

1. 簡介

Spring Batch 提供強大的機制來重啓失敗的任務。這些機制允許任務從失敗點繼續處理。這種功能對於高效地處理大規模數據處理任務至關重要。

Spring Batch 內置的 <em >JobRepository</em> 存儲了任務執行狀態。這使得任務具有默認的重啓能力。因此,一個失敗的任務可以精確地從它中斷的地方繼續執行。這確保了沒有重複處理或數據丟失的發生。

在本教程中,我們將探討如何配置和有效地重啓一個失敗的 Spring Batch 任務。

2. Maven 依賴

讓我們首先導入 spring-boot-starter-batchspring-boot-starter-data-jpah2 依賴到我們的 <em>pom.xml</em> 中:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
    <version>3.3.2</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
    <version>3.3.2</version>
</dependency>
	  
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <version>2.2.224</version>
</dependency>

我們需要基於文件的 H2 數據庫,以便通過在應用程序運行之間持久化作業執行狀態,從而實現作業的可恢復運行.

3. 定義一個簡單的 Spring Batch 任務

本節將探討一個 Spring Batch 任務配置,它演示了一個簡單的批量處理工作流程。我們將定義一個任務,其中包含一個步驟:處理 CSV 文件。

在 Spring Boot 3 中,應避免使用 <em >@EnableBatchProcessing</em >>,因為它會禁用 Spring Boot 有用的自動配置(例如創建 Spring Batch 表的功能)。

3.1. 配置

讓我們創建 BatchConfig 類,該類設置一個名為 simpleJob 的作業:

@Configuration
public class BatchConfig {

    @Bean
    public Job simpleJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new JobBuilder("simpleJob", jobRepository)
          .start(step1(jobRepository, transactionManager))
          .build();
    }
}

simpleJob Bean 使用 JobBuilder 定義批處理作業。該作業包含一個步驟:step1,它讀取、處理和寫入 CSV 文件:

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new StepBuilder("step1", jobRepository)
      .<String, String>chunk(2, transactionManager)
      .reader(flatFileItemReader())
      .processor(itemProcessor())
      .writer(itemWriter())
      .build();
}

步驟 1 使用 StepBuilder 定義了基於分塊的步驟。該步驟以每次處理兩項數據為批次,從 FlatFileItemReader 讀取字符串,使用 ItemProcessor 進行轉換,並使用 ItemWriter 寫入結果 – 這一切都由 PlatformTransactionManager 管理的事務控制,以確保數據完整性。

JobRepository 存儲了步驟的執行狀態,包括 FlatFileItemReader 的位置,從而實現從上次未提交的塊中恢復執行,如果作業失敗。

3.2. 項目讀取器

讓我們定義 flatFileItemReader Bean 以提供輸入數據:

@Bean
@StepScope
public FlatFileItemReader<String> flatFileItemReader() {
    return new FlatFileItemReaderBuilder<String>()
      .name("itemReader")
      .resource(new ClassPathResource("data.csv"))
      .lineMapper(new PassThroughLineMapper())
      .saveState(true)
      .build();
}

代碼定義了一個名為 FlatFileItemReader<String> 的 Bean,@StepScope 確保每個步驟執行時創建一個新的實例,從而實現正確的狀態管理和可重啓功能。它從 classpath 路徑下的 data.csv 文件讀取字符串,並使用 PassThroughLineMapper 將每一行映射為字符串。

此外,它具有 saveState(true) 功能,以持久化其在 ExecutionContext 中的讀取位置。這允許讀取器在作業失敗後從上次未處理的行恢復,利用 JobRepository 進行狀態持久化。

為了使作業真正可重啓,ItemReader 必須在 Spring Batch 的執行上下文中持久化其狀態。 例如,像 FlatFileItemReader 這樣的讀取器會自動保存關鍵進度信息(例如行號或記錄計數)在塊之間。

3.3. 物品處理器

讓我們聲明 itemProcessor Bean,用於轉換輸入數據:

@Bean
public RestartItemProcessor itemProcessor() {
    return new RestartItemProcessor();
}

static class RestartItemProcessor implements ItemProcessor<String, String> {
    private boolean failOnItem3 = true;

    public void setFailOnItem3(boolean failOnItem3) {
        this.failOnItem3 = failOnItem3;
    }

    @Override
    public String process(String item) throws Exception {
        System.out.println("Processing: " + item + " (failOnItem3=" + failOnItem3 + ")");
        if (failOnItem3 && item.equals("Item3")) {
            throw new RuntimeException("Simulated failure on Item3");
        }
        return "PROCESSED " + item;
    }
}

它通過在每個項目前添加 PROCESSED 標記並模擬在 Item3 上的失敗來處理每個項目。

3.4. Item Writer

現在,讓我們創建 itemWriter Bean,用於輸出處理後的數據:

@Bean
public ItemWriter<String> itemWriter() {
    return items -> {
      System.out.println("Writing items:");
      for (String item : items) {
          System.out.println("- " + item);
      }
    };
}

此命令會將處理後的項目打印到控制枱。現在,我們的應用程序已準備就緒。

4. 重新啓動失敗的 Spring Batch 任務

Spring Batch 任務默認設計為可重啓,從而能夠無縫地從故障點恢復執行。因此,無需進行任何額外的配置即可啓用此功能。

為了確保其有效運行,任務狀態必須保存在 JobRepository 中。 此外,JobRepository 必須由數據庫支持,以確保可靠地存儲和檢索任務的執行狀態。

以下子章節描述瞭如何模擬任務失敗並重新啓動它。

4.1. 模擬作業失敗

為了模擬作業失敗,ItemProcessor 被配置為在處理 Item3 時拋出 RuntimeException。當失敗發生在 Item3 時,JobRepository 會存儲該狀態,並將作業標記為 FAILED

運行應用程序時使用 mvn spring-boot:run 產生如下結果:

Starting new job execution...
Processing: Item1
Processing: Item2
Writing items:
- PROCESSED Item1
- PROCESSED Item2
Processing: Item3
[Exception: Simulated failure on Item3]
Job started with status: FAILED

這段輸出確認Item1Item2 已被處理並寫入,但 Item3 的失敗會停止任務,狀態會被保留以便後續重啓。

4.2. 重新啓動作業

要重新啓動失敗的作業,我們使用 CommandLineRunner 通過使用 JobExplorer 和固定 JobParameters 檢測失敗的作業實例。

@Bean
CommandLineRunner run(JobLauncher jobLauncher, Job job, JobExplorer jobExplorer,
    JobOperator jobOperator, BatchConfig.RestartItemProcessor itemProcessor) {
    return args -> {
      JobParameters jobParameters = new JobParametersBuilder()
        .addString("jobId", "test-job-" + System.currentTimeMillis())
        .toJobParameters();

      List<JobInstance> instances = jobExplorer.getJobInstances("simpleJob", 0, 1);
      if (!instances.isEmpty()) {
          JobInstance lastInstance = instances.get(0);
          List<JobExecution> executions = jobExplorer.getJobExecutions(lastInstance);
          if (!executions.isEmpty()) {
              JobExecution lastExecution = executions.get(0);
              if (lastExecution.getStatus() == BatchStatus.FAILED) {
                  itemProcessor.setFailOnItem3(false);

                  JobExecution restartedExecution = jobLauncher.run(job, jobParameters);

                  // final Long restartId = jobOperator.restart(lastExecution.getId());
                  // final JobExecution restartedExecution = jobExplorer.getJobExecution(restartedExecution);

                  // ...
              }
          }
      }

    };
}

代碼通過 JobExplorer 檢查作業倉庫中是否存在任何 simpleJob 實例。

它檢測到執行失敗,狀態為 FAILED。它使用 JobLauncher.run()JobOperator.restart() 來恢復該作業。該作業從其上一次持久化狀態開始恢復,從而確保先前處理過的項目不會被重新處理。

為了在重啓期間忽略 Item3 的失敗,我們在啓動重啓後的作業之前設置 itemProcessor.setFailOnItem3(false),允許 RestartItemProcessor 在不拋出異常的情況下處理 Item3

現在,我們重新運行該應用程序。

讓我們檢查輸出:

Restarting failed job execution with ID: [execution_id]
Processing: Item3
Processing: Item4
Writing items:
- PROCESSED Item3
- PROCESSED Item4
Processing: Item5
Writing items:
- PROCESSED Item5
Restarted job status: COMPLETED

Spring Batch 任務在 Item3 處失敗。它成功從失敗點重新啓動。然後,該任務分塊處理 Item3Item5,寫入結果,並以 COMPLETED 狀態完成。

4.3. 驗證作業重啓功能

為了驗證 Spring Batch 的作業重啓功能,而不是使用 CommandLineRunner,我們添加一個單元測試。

@Test
public void givenItems_whenFailed_thenRestartFromFailure() throws Exception {
    // Given
    createTestFile("Item1\nItem2\nItem3\nItem4");

    JobParameters jobParameters = new JobParametersBuilder()
      .addLong("time", System.currentTimeMillis())
      .toJobParameters();

    // When
    JobExecution firstExecution = jobLauncherTestUtils.launchJob(jobParameters);
    assertEquals(BatchStatus.FAILED, firstExecution.getStatus());

    Long executionId = firstExecution.getId();

    itemProcessor.setFailOnItem3(false);

    // Then
    JobExecution restartedExecution = jobLauncherTestUtils.launchJob(jobParameters);

    assertEquals(BatchStatus.COMPLETED, restartedExecution.getStatus());

    assertEquals(
      firstExecution.getJobInstance().getInstanceId(),
      restartedExecution.getJobInstance().getInstanceId()
    );
}

測試方法首先執行一個故意在處理 Item3 時失敗的任務(斷言預期 FAILED 狀態)。然後,它修改了處理器行為,不再在該項上失敗。最後,它使用相同的參數重新啓動該任務,以確認它能夠從失敗點成功完成。

該測試驗證了三個關鍵方面。首先,它確認初始失敗按預期進行,測試了故障模擬。然後,它確保重新啓動的任務從失敗點繼續處理,驗證了重啓邏輯。最後,它檢查兩個執行實例屬於同一個任務實例,驗證了實例跟蹤。

4.4. 阻止作業重啓

我們可以使用 preventRestart() 方法來阻止作業重啓。

@Bean
public Job simpleJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new JobBuilder("simpleJob", jobRepository)
      .start(step1(jobRepository, transactionManager))
      .preventRestart()
      .build();
}

JobBuilder 的配置中添加 .preventRestart(),會將任務配置為始終從頭開始執行(例如:Item1),而不是在重新啓動時從失敗點繼續執行(例如:Item3)。 這會覆蓋 Spring Batch 默認行為,即為重啓目的而將任務狀態保存在 JobRepository 中。

5. 結論

Spring Batch 的默認重啓功能能夠實現對作業失敗的健壯恢復,確保失敗的作業能夠從故障點恢復,而無需重新處理已完成的項或丟失數據。

在本文中,我們創建了一個簡單的作業,演示了該重啓功能。我們配置了一個作業以分塊處理項,模擬在第 Item3 > 處發生故障。重啓後,作業從 Item3 > 處恢復,通過了 Item5 >,並以 COMPLETED 狀態成功完成。

我們還探討了如何使用 preventRestart()JobBuilder 上覆蓋此行為。這會強制作業從頭開始運行,就像 Item1 處一樣,而不是從故障點恢復。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.