Spring Batch Batch - exception handling and fault tolerance mechanisms

Exception handling and restart mechanism

<br/> 1. For Step of type chunk, spring batch provides us with a state to manage it

2. State management is achieved through the ItemStream interface

3.ItemStream interface:

(1) open(): invoked every step execution

(2) Update(): every chunk is called to execute

(3) Close(): All chunk s are called after execution

<br/> ![file](https://graph.baidu.com/resource/222879cfd846b11d8e3ee01583228330.png)          

<br/><br/> Construction examples

Prepare this cvs file, add an incorrect name to the 33rd data, and throw an exception terminating the program when reading this data.

  <br/><br/><br/> ItemReader test code

@Component("restartDemoReader")
public class RestartDemoReader implements ItemStreamReader<Customer> {
 
    // Record the number of rows currently read
    private Long curLine = 0L;
    // Restart State Initial Value
    private boolean restart = false;
 
    private FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
 
    // Persist information to database
    private ExecutionContext executionContext;
    RestartDemoReader
    public () {
        
        reader.setResource(new ClassPathResource("restartDemo.csv"));
 
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id", "firstName", "lastName", "birthdate"});
 
        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper((fieldSet -> {
            return Customer.builder().id(fieldSet.readLong("id"))
                    .firstName(fieldSet.readString("firstName"))
                    .lastName(fieldSet.readString("lastName"))
                    .birthdate(fieldSet.readString("birthdate"))
                    .build();
        }));
        lineMapper.afterPropertiesSet();
 
        reader.setLineMapper(lineMapper);
    }
 
    @Override
    public Customer read() throws Exception, UnexpectedInputException, ParseException,
            NonTransientResourceException {
 
        Customer customer = null;
 
        this.curLine++;
        //If restarted, continue with the number of rows read from the previous step
        if (restart) {
            reader.setLinesToSkip(this.curLine.intValue()-1);
            restart = false;
            System.out.println("Start reading from line: " + this.curLine);
        }
 
        reader.open(this.executionContext);
 
        customer = reader.read();
        //When a wrongName is matched, an exception is thrown, terminating the program
        if (customer != null) {
            if (customer.getFirstName().equals("wrongName"))
                throw new RuntimeException("Something wrong. Customer id: " + customer.getId());
        } else {
            curLine--;
        }
        return customer;
    }
 
    /**
     * Determine whether to restart job
     * @param executionContext
     * @throws ItemStreamException
     */
    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        this.executionContext = executionContext;
				// If the job is restarted, read the number of restarted rows from the database and re-execute from the number of restarted rows
        if (executionContext.containsKey("curLine")) {
            this.curLine = executionContext.getLong("curLine");
            this.restart = true;
        }
				// If you do not restart the job, initialize the number of rows, starting from the first line
				else {
            this.curLine = 0L;
            executionContext.put("curLine", this.curLine.intValue());
        }
 
    }
 
    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
		    // Print the current number of lines for each batch chunk executed
        System.out.println("update curLine: " + this.curLine);
        executionContext.put("curLine", this.curLine);
 
    }
 
    @Override
    public void close() throws ItemStreamException {
 
    }
}

<br/><br/> Job Configuration

Read 10 records as a batch

@Configuration
public class RestartDemoJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
 
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
 
    @Autowired
    @Qualifier("flatFileDemoWriter")
    private ItemWriter<? super Customer> flatFileDemoWriter;
 
    @Autowired
    @Qualifier("restartDemoReader")
    private ItemReader<Customer> restartDemoReader;
 
    @Bean
    public Job restartDemoJob(){
        return jobBuilderFactory.get("restartDemoJob")
                .start(restartDemoStep())
                .build();
 
    }
 
    @Bean
    public Step restartDemoStep() {
        return stepBuilderFactory.get("restartDemoStep")
                .<Customer,Customer>chunk(10)
                .reader(restartDemoReader)
                .writer(flatFileDemoWriter)
                .build();
    }
}

<br/><br/> When executed for the first time, the program throws an exception on line 33 with a curline value of 30.

<br/><br/> At this point, you can query the database batch_step_excution table and find that the curline values have been persisted into the database as key-value pairs (10 data batches above; therefore, when 33 data exceptions occur, the curline value is 30)

<br/><br/>

Next, update wrongName and execute the program again;

The program executes the open method to determine if there is a curline in the map in the database step, and if there is, it runs again, reading the curline and continuing from that batch.

<br/>

<br/>

<br/>

fault tolerance

<br/> Spring batch's fault tolerance mechanism is a mechanism combined with transaction mechanism, which mainly includes three operations:

  • restart

  • Restart is used for a job and is an operation to restart a job.By default, SpringBatch terminates a task when it has an exception, and when it restarts a task with the same parameters, SpringBatch executes the remaining tasks that are not executed

  • retry

  • Retry is a step of job. If an exception is found while processing a data item, retry the operation of the step of the data item.

  • skip

  • skip is a step of job. If an exception is found when processing a data item, the operation of the step of the data item is skipped.

<br/> <br/>

The restart sample code is as follows, when executed for the first time, there is no field in the context, an exception is thrown, the second execution, the field already exists, and the execution succeeds

<br/>

<br/> Examples of retry and skip are as follows. Change the configuration of the previous step with the following reference code: ``` @Bean public Step stepForTranscation(StepBuilderFactory stepBuilderFactory, @Qualifier("stepForTranscationReader")ListItemReader<String> reader, @Qualifier("stepForTranscationProcessor")ItemProcessor<String, String> processor, @Qualifier("stepForTranscationWriter")ItemWriter<String> writer) { return stepBuilderFactory.get("stepForTranscation") .<String, String> chunk(3) .reader(reader) .processor(processor) .writer(writer)

		// Fault Tolerant Methods
		.faultTolerant()
		// retry count
		.retryLimit(3)
		// Try again when this exception occurs			
		.retry(DataIntegrityViolationException.class)
		// Number of skips
		.skipLimit(1)
		// Skip when this exception occurs	
		.skip(DataIntegrityViolationException.class)
		// Skip error listeners
		.listener(mySkipListener)
		// Number of restarts
		.startLimit(3)
        .build();

}

<br/>


This sets the number of allowed retries to be 3, the maximum number of data allowed to be skipped is 1, and if job fails, the maximum number of run retries is 3.

<br/>
Configure SkipListener after skip to skip errors

public class MySkipListener implements SkipListener<String, String>{ //Listening to be performed when a read skip error occurs public void onSkipInRead(Throwable t){ }

// Listening to be performed when a write skip error occurs
public void onSkipInWrite(String item, Throwable t){
}

// Listening to be performed when skipping errors while processing data
public void onSkipInProcess (String item, Throwable t){
	System.out.println(item + "occur exception" + t);
}

}

<br/>


Rerun the program to get new results:

![file](https://graph.baidu.com/resource/22272cc98e99e987855e701583425345.png)

<br/>

This time, you can see that 11 out of 12 data entered the database, while the long 008008008008 data was not interrupted by the rollback because skip was set, so the fault tolerance mechanism allowed it not to enter the database.

<br/>
Look at the persistent data table for Spring batch:

![file](https://graph.baidu.com/resource/222619a873ec4cd414ce501583425355.png)

You can see that one data was skipped, but because we allowed it to skip, the entire job completed successfully, COMPLETED.



Reference resources:

https://blog.csdn.net/chihe9907/article/details/100601523

Tags: Programming Database Spring

Posted on Mon, 09 Mar 2020 10:14:58 -0700 by bigsid