当前位置:主页 > 查看内容

配置 Spring Batch 批处理失败重试

发布时间:2021-09-08 00:00| 位朋友查看

简介:1. 引言 默认情况下,Spring批处理作业在执行过程中出现任何错误都会失败。然而有些时候,为了提高应用程序的弹性,我们就需要处理这类间歇性的故障。在这篇短文中,我们就来一起探讨 如何在Spring批处理框架中配置重试逻辑。 如果对spring batch不了解,可……

 1. 引言

默认情况下,Spring批处理作业在执行过程中出现任何错误都会失败。然而有些时候,为了提高应用程序的弹性,我们就需要处理这类间歇性的故障。在这篇短文中,我们就来一起探讨 如何在Spring批处理框架中配置重试逻辑。

如果对spring batch不了解,可以参考以前的一篇文章:

开车!Spring Batch 入门级示例教程!

2. 简单举例

假设有一个批处理作业,它读取一个CSV文件作为输入:

  1. username, userid, transaction_date, transaction_amount 
  2. sammy, 1234, 31/10/2015, 10000 
  3. john, 9999, 3/12/2015, 12321 

然后,它通过访问REST端点来处理每条记录,获取用户的 age 和 postCode 属性:

  1. public class RetryItemProcessor implements ItemProcessor<TransactionTransaction> { 
  2.      
  3.     @Override 
  4.     public Transaction process(Transaction transaction) throws IOException { 
  5.         log.info("RetryItemProcessor, attempting to process: {}"transaction); 
  6.         HttpResponse response = fetchMoreUserDetails(transaction.getUserId()); 
  7.         //parse user's age and postCode from response and update transaction 
  8.         ... 
  9.         return transaction
  10.     } 
  11.     ... 

最后,它生成并输出一个合并的XML:

  1. <transactionRecord> 
  2.     <transactionRecord> 
  3.         <amount>10000.0</amount> 
  4.         <transactionDate>2015-10-31 00:00:00</transactionDate> 
  5.         <userId>1234</userId> 
  6.         <username>sammy</username> 
  7.         <age>10</age> 
  8.         <postCode>430222</postCode> 
  9.     </transactionRecord> 
  10.     ... 
  11. </transactionRecord> 

3. ItemProcessor 中添加重试

现在假设,如果到REST端点的连接由于某些网络速度慢而超时,该怎么办?如果发生这种情况,则我们的批处理工作将失败。

在这种情况下,我们希望失败的 item 处理重试几次。因此,接下来我将批处理作业配置为:在出现故障时执行最多三次重试:

  1. @Bean 
  2. public Step retryStep( 
  3.   ItemProcessor<TransactionTransaction> processor, 
  4.   ItemWriter<Transaction> writer) throws ParseException { 
  5.     return stepBuilderFactory 
  6.       .get("retryStep"
  7.       .<TransactionTransaction>chunk(10) 
  8.       .reader(itemReader(inputCsv)) 
  9.       .processor(processor) 
  10.       .writer(writer) 
  11.       .faultTolerant() 
  12.       .retryLimit(3) 
  13.       .retry(ConnectTimeoutException.class) 
  14.       .retry(DeadlockLoserDataAccessException.class) 
  15.       .build(); 

这里调用了 faultTolerant() 来启用重试功能。另外,我们使用 retry 和 retryLimit 分别定义符合重试条件的异常和 item 的最大重试次数。

4. 测试重试次数

假设我们有一个测试场景,其中返回 age 和 postCode 的REST端点关闭了一段时间。在这个测试场景中,我们只对前两个 API 调用获取一个 ConnectTimeoutException ,而第三个调用将成功:

  1. @Test 
  2. public void whenEndpointFailsTwicePasses3rdTime_thenSuccess() throws Exception { 
  3.     FileSystemResource expectedResult = new FileSystemResource(EXPECTED_OUTPUT); 
  4.     FileSystemResource actualResult = new FileSystemResource(TEST_OUTPUT); 
  5.  
  6.     when(httpResponse.getEntity()) 
  7.       .thenReturn(new StringEntity("{ \"age\":10, \"postCode\":\"430222\" }")); 
  8.   
  9.     //fails for first two calls and passes third time onwards 
  10.     when(httpClient.execute(any())) 
  11.       .thenThrow(new ConnectTimeoutException("Timeout count 1")) 
  12.       .thenThrow(new ConnectTimeoutException("Timeout count 2")) 
  13.       .thenReturn(httpResponse); 
  14.  
  15.     JobExecution jobExecution = jobLauncherTestUtils 
  16.       .launchJob(defaultJobParameters()); 
  17.     JobInstance actualJobInstance = jobExecution.getJobInstance(); 
  18.     ExitStatus actualJobExitStatus = jobExecution.getExitStatus(); 
  19.  
  20.     assertThat(actualJobInstance.getJobName(), is("retryBatchJob")); 
  21.     assertThat(actualJobExitStatus.getExitCode(), is("COMPLETED")); 
  22.     AssertFile.assertFileEquals(expectedResult, actualResult); 

在这里,我们的工作成功地完成了。另外,从日志中可以明显看出 第一条记录 id=1234 失败了两次,最后在第三次重试时成功了:

  1. 19:06:57.742 [main] INFO  o.s.batch.core.job.SimpleStepHandler - Executing step: [retryStep] 
  2. 19:06:57.758 [main] INFO  o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234 
  3. 19:06:57.758 [main] INFO  o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234 
  4. 19:06:57.758 [main] INFO  o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234 
  5. 19:06:57.758 [main] INFO  o.b.batch.service.RetryItemProcessor - Attempting to process user with id=9999 
  6. 19:06:57.773 [main] INFO  o.s.batch.core.step.AbstractStep - Step: [retryStep] executed in 31ms 

同样,看下另一个测试用例,当所有重试次数都用完时会发生什么:

  1. @Test 
  2. public void whenEndpointAlwaysFail_thenJobFails() throws Exception { 
  3.     when(httpClient.execute(any())) 
  4.       .thenThrow(new ConnectTimeoutException("Endpoint is down")); 
  5.  
  6.     JobExecution jobExecution = jobLauncherTestUtils 
  7.       .launchJob(defaultJobParameters()); 
  8.     JobInstance actualJobInstance = jobExecution.getJobInstance(); 
  9.     ExitStatus actualJobExitStatus = jobExecution.getExitStatus(); 
  10.  
  11.     assertThat(actualJobInstance.getJobName(), is("retryBatchJob")); 
  12.     assertThat(actualJobExitStatus.getExitCode(), is("FAILED")); 
  13.     assertThat(actualJobExitStatus.getExitDescription(), 
  14.       containsString("org.apache.http.conn.ConnectTimeoutException")); 

在这个测试用例中,在作业因 ConnectTimeoutException 而失败之前,会尝试对第一条记录重试三次。

5. 使用XML配置重试

最后,让我们看一下与上述配置等价的XML:

  1. <batch:job id="retryBatchJob"
  2.     <batch:step id="retryStep"
  3.         <batch:tasklet> 
  4.             <batch:chunk reader="itemReader" writer="itemWriter" 
  5.               processor="retryItemProcessor" commit-interval="10" 
  6.               retry-limit="3"
  7.                 <batch:retryable-exception-classes> 
  8.                     <batch:include class="org.apache.http.conn.ConnectTimeoutException"/> 
  9.                     <batch:include class="org.springframework.dao.DeadlockLoserDataAccessException"/> 
  10.                 </batch:retryable-exception-classes> 
  11.             </batch:chunk> 
  12.         </batch:tasklet> 
  13.     </batch:step> 
  14. </batch:job> 

6. 简单总结

在本文中,我们学习了如何在Spring批处理中配置重试逻辑,其中包括使用Java和XML配置。以及使用单元测试来观察重试在实践中是如何工作的。

本文转载自微信公众号「锅外的大佬」,可以通过以下二维码关注。转载本文请联系锅外的大佬公众号。


本文转载自网络,原文链接:https://mp.weixin.qq.com/s/Mk0v-U33XZJPfrOZszMFww
本站部分内容转载于网络,版权归原作者所有,转载之目的在于传播更多优秀技术内容,如有侵权请联系QQ/微信:153890879删除,谢谢!

推荐图文


随机推荐