Spring Batch - Looping a reader/processor/writer step

Don't instantiate your steps, readers, processors and writers as Spring beans. There is no need to. Only your job instance has to be a Spring bean.

So just remove the @Bean and @StepScope annotations from the methods that create your step, reader, writer and processor, and instantiate them where needed.

There is only one catch: you have to call afterPropertiesSet() manually. E.g.:

// @Bean -> delete
// @StepScope -> delete
// Without @StepScope, Spring no longer resolves @Value("#{jobExecutionContext[fileName]}"),
// so fileName is passed in as an ordinary argument.
public FlatFileItemWriter writer(String fileName) throws Exception {
    FlatFileItemWriter writer = new FlatFileItemWriter();
    writer.setResource(new FileSystemResource(new File(TARGET_LOCATION + fileName + TARGET_FILE_EXTENSION)));
    writer.setLineAggregator(new DelimitedLineAggregator() {{
        setDelimiter(TARGET_DELIMITER);
        setFieldExtractor(new PassThroughFieldExtractor());
    }});

    // ------- ADD!!
    writer.afterPropertiesSet();

    return writer;
}

This way, your step, reader and writer instances are "step scoped" automatically, since you instantiate them explicitly for every step.
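
Why the manual call is needed: for a Spring-managed bean, the container invokes afterPropertiesSet() (the InitializingBean callback) once the properties are set. An object created with new never passes through the container, so nobody makes that call for you. Below is a minimal plain-Java sketch of the idea. Initializable, FileTarget and create are hypothetical stand-ins for illustration, not Spring or Spring Batch types.

```java
public class ManualInitSketch {

    /** Hypothetical stand-in for Spring's InitializingBean callback. */
    interface Initializable {
        void afterPropertiesSet();
    }

    /** Hypothetical component that validates its configuration in the callback. */
    static class FileTarget implements Initializable {
        private String path;
        private boolean initialized;

        void setPath(String path) {
            this.path = path;
        }

        boolean isInitialized() {
            return initialized;
        }

        @Override
        public void afterPropertiesSet() {
            // Fail fast if the configuration is incomplete.
            if (path == null || path.isEmpty()) {
                throw new IllegalStateException("path must be set");
            }
            initialized = true;
        }
    }

    /** Factory method in the style of the answer: configure, then initialize by hand. */
    static FileTarget create(String path) {
        FileTarget target = new FileTarget();
        target.setPath(path);
        target.afterPropertiesSet(); // no container is involved, so we call it ourselves
        return target;
    }

    public static void main(String[] args) {
        FileTarget target = create("converted_file1.txt");
        System.out.println("initialized: " + target.isInitialized());
    }
}
```

Calling the callback inside the factory method means every caller receives an object that has already been validated, which is the same guarantee the container would otherwise provide.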

Let me know if my answer is not clear enough; I will then add a more detailed example.

EDIT

A simple example:

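import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.transform.LineAggregator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

@Configuration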
public class MyJobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;


    private final List<String> filenames = Arrays.asList("file1.txt", "file2.txt");

    @Bean
    public Job myJob() throws Exception {

        // a plain loop instead of a stream, because createStep declares a checked exception
        List<Step> steps = new ArrayList<>();
        for (String filename : filenames) {
            steps.add(createStep(filename));
        }

        return jobBuilderFactory.get("subsetJob")
            .start(createParallelFlow(steps))
            .end()
            .build();
    }


    // helper method to create a step
    private Step createStep(String filename) throws Exception {
        // depending on your item types you may need explicit type arguments on chunk(),
        // e.g. .<In, Out>chunk(100_000)
        return stepBuilderFactory.get("convertStepFor" + filename) // !!! Stepname has to be unique
            .chunk(100_000)
            .reader(createFileReader(new FileSystemResource(new File(filename)), new YourInputLineMapper()))
            .processor(new YourConversionProcessor())
            .writer(createFileWriter(new FileSystemResource(new File("converted_" + filename)), new YourOutputLineAggregator()))
            .build();
    }


    // helper method to create a split flow out of a List of steps
    private static Flow createParallelFlow(List<Step> steps) {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setConcurrencyLimit(steps.size());

        List<Flow> flows = steps.stream() // we have to convert the steps to a flows
            .map(step -> //
                    new FlowBuilder<Flow>("flow_" + step.getName()) //
                    .start(step) //
                    .build()) //
            .collect(Collectors.toList());

        return new FlowBuilder<SimpleFlow>("parallelStepsFlow").split(taskExecutor) //
            .add(flows.toArray(new Flow[flows.size()])) //
            .build();
    }


    // helper methods to create filereader and filewriters
    public static <T> ItemReader<T> createFileReader(Resource source, LineMapper<T> lineMapper) throws Exception {
        FlatFileItemReader<T> reader = new FlatFileItemReader<>();

        reader.setEncoding("UTF-8");
        reader.setResource(source);
        reader.setLineMapper(lineMapper);
        reader.afterPropertiesSet();

        return reader;
    }

    public static <T> ItemWriter<T> createFileWriter(Resource target, LineAggregator<T> aggregator) throws Exception {
        FlatFileItemWriter<T> writer = new FlatFileItemWriter<>();

        writer.setEncoding("UTF-8");
        writer.setResource(target);
        writer.setLineAggregator(aggregator);

        writer.afterPropertiesSet();
        return writer;
    }
}
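
The step-to-flow conversion in createParallelFlow is not optional: the split takes Flow arguments, and a List<Step> cannot simply be copied into a Flow[]. A comment below reports that steps.toArray(new Flow[steps.size()]) threw an ArrayStoreException. The plain-Java sketch below reproduces that behaviour and then shows the explicit wrapping. Step, Flow, NamedStep and StepFlow here are hypothetical stand-ins defined in the sketch, NOT the Spring Batch types.

```java
import java.util.Arrays;
import java.util.List;

public class StepToFlowSketch {

    // Hypothetical stand-ins for the two unrelated types.
    interface Step { String getName(); }
    interface Flow { String getName(); }

    static final class NamedStep implements Step {
        private final String name;
        NamedStep(String name) { this.name = name; }
        @Override public String getName() { return name; }
    }

    /** Wraps exactly one step, mirroring "flow_" + step.getName() from the answer. */
    static final class StepFlow implements Flow {
        private final Step step;
        StepFlow(Step step) { this.step = step; }
        @Override public String getName() { return "flow_" + step.getName(); }
    }

    /**
     * toArray(T[]) compiles for any array type, but storing a Step into a Flow[]
     * fails the runtime array-store check.
     */
    static boolean naiveCopyFails(List<Step> steps) {
        try {
            steps.toArray(new Flow[steps.size()]);
            return false;
        } catch (ArrayStoreException e) {
            return true;
        }
    }

    /** The working variant: wrap every step in its own flow first. */
    static Flow[] wrapAll(List<Step> steps) {
        Flow[] flows = new Flow[steps.size()];
        for (int i = 0; i < steps.size(); i++) {
            flows[i] = new StepFlow(steps.get(i));
        }
        return flows;
    }

    public static void main(String[] args) {
        List<Step> steps = Arrays.asList(new NamedStep("file1.txt"), new NamedStep("file2.txt"));
        System.out.println("naive copy fails: " + naiveCopyFails(steps));
        for (Flow flow : wrapAll(steps)) {
            System.out.println(flow.getName());
        }
    }
}
```

The compiler accepts the naive copy because toArray is generic in the array type only; the mismatch is detected when the first element is stored.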
Author by

Sander_M

Updated on July 18, 2022

Comments

  • Sander_M
    Sander_M almost 2 years

    ANSWER

    Based on the code in the accepted answer, the following adjustment worked for me:

    // helper method to create a split flow out of a List of steps
    private static Flow createParallelFlow(List<Step> steps) {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setConcurrencyLimit(steps.size());         
    
        Flow[] flows = new Flow[steps.size()];
        for (int i = 0; i < steps.size(); i++) {
            flows[i] = new FlowBuilder<SimpleFlow>(steps.get(i).getName()).start(steps.get(i)).build();
        }           
    
        return new FlowBuilder<SimpleFlow>("parallelStepsFlow")
            .split(taskExecutor)                
            .add(flows)
            .build();
    }
    

    EDIT

    I have updated the question to a version that loops correctly, but as the application will scale, being able to process in parallel is important, and I still don't know how to do that dynamically at runtime with a Java config...

    Refined question: How do I create a reader-processor-writer dynamically at runtime for say 5 different situations (5 queries means a loop of 5 as it is configured now)?

    My LoopDecider looks like this:

    public class LoopDecider implements JobExecutionDecider {
    
        private static final Logger LOG = LoggerFactory.getLogger(LoopDecider.class);
        private static final String COMPLETED = "COMPLETED";
        private static final String CONTINUE = "CONTINUE";
        private static final String ALL = "queries";
        private static final String COUNT = "count";
    
        private int currentQuery;
        private int limit;
    
        @SuppressWarnings("unchecked")
        @Override
        public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
            List<String> allQueries = (List<String>) jobExecution.getExecutionContext().get(ALL);
            this.limit = allQueries.size();
            jobExecution.getExecutionContext().put(COUNT, currentQuery);
            if (++currentQuery >= limit) {
                return new FlowExecutionStatus(COMPLETED);
            } else {
                LOG.info("Looping for query: " + allQueries.get(currentQuery - 1));
                return new FlowExecutionStatus(CONTINUE);
            }       
        }
    
    }
    

    Based on a list of queries (HQL queries) I want a reader - processor - writer for each query. My current configuration looks like this:

    Job

    @Bean
    public Job subsetJob() throws Exception {
        LoopDecider loopDecider = new LoopDecider();        
        FlowBuilder<Flow> flowBuilder = new FlowBuilder<>(FLOW_NAME);
        Flow flow = flowBuilder
                .start(createHQL())
                .next(extractData())
                .next(loopDecider)
                .on("CONTINUE")
                .to(extractData())
                .from(loopDecider)
                .on("COMPLETED")                
                .end()
                .build();       
    
        return jobBuilderFactory.get("subsetJob")               
                .start(flow)                
                .end()
                .build();
    }
    

    Step

    public Step extractData(){
        return stepBuilderFactory.get("extractData")
                .chunk(100_000)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }
    

    Reader

    public HibernateCursorItemReader reader(){      
        CustomHibernateCursorItemReader reader = new CustomHibernateCursorItemReader();
        reader.setSessionFactory(HibernateUtil.getSessionFactory());        
        reader.setUseStatelessSession(false);
        return reader;
    }
    

    Processor

    public DynamicRecordProcessor processor(){
        return new DynamicRecordProcessor();
    }
    

    Writer

    public FlatFileItemWriter writer(){
        CustomFlatFileItemWriter writer = new CustomFlatFileItemWriter();               
        writer.setLineAggregator(new DelimitedLineAggregator(){{
            setDelimiter(TARGET_DELIMITER);
            setFieldExtractor(new PassThroughFieldExtractor());
            }}
        );
        return writer;
    }
    

    Currently the process works fine for a single query. However, I actually have a list of queries.

    My initial idea is to loop the step and pass the step the list of queries and for each query read - process - write. This would also be ideal for parallel chunking.

    However, when I add the list of queries as parameter to the extractData step and for each query I create a step, a list of steps is returned, instead of the expected single step. The job starts complaining it expects a single step instead of a list of steps.

    Another idea was to create a custom MultiHibernateCursorItemReader along the same lines as the MultiResourceItemReader; however, I am really looking for a more out-of-the-box solution.

    @Bean
    public List<Step> extractData(@Value("#{jobExecutionContext[HQL]}") List<String> queries){
        List<Step> steps = new ArrayList<Step>();
        for (String query : queries) {
            steps.add(stepBuilderFactory.get("extractData")
                .chunk(100_000)
                .reader(reader(query))
                .processor(processor())
                .writer(writer(query))
                .build());
        }
        return steps;
    }
    

    Question
    How do I loop the step and integrate that in the job?

  • Sander_M
    Sander_M about 8 years
    Thank you for the detailed answer @Hansjoerg. A definite improvement for sure, so that's great. As I am moving away from XML configuration to Java config, these answers are really helpful. I will implement these improvements ASAP and I will accept the answer if I can get it to work as intended. However, I do not yet see how this will enable the reader-processor-writer to go into a loop. I have also checked out the possibility of a deciderLoopJobConfiguration-like solution (stackoverflow.com/questions/24307020/…). Perhaps I should try that.
  • Hansjoerg Wingeier
    Hansjoerg Wingeier about 8 years
    We have completely stopped using XML configuration. In my opinion, there is no need to use it anymore. The Java configuration provides so much flexibility. You can build your jobs completely dynamically at runtime; you can use lambdas to implement simple readers, writers and processors; and you can use all other available Java features and features of the IDE.
  • Hansjoerg Wingeier
    Hansjoerg Wingeier about 8 years
    concerning using the loop example: one drawback is that you cannot parallelize your steps.
  • Sander_M
    Sander_M about 8 years
    All right, I finally got my hands dirty. I implemented all your tips and I also redid the jobExecutionContext part with @BeforeStep methods that load the jobExecutionContext in the reader-processor-writer. The code already looks a lot cleaner, thanks again! Somehow, no errors occurred when running the job without calling afterPropertiesSet on the writer/reader (but hey, it works!). I am really curious to see a small example of a job created dynamically at runtime, in this case the creation of the reader-processor-writer. Perhaps you would be so kind as to share such an example?
  • Sander_M
    Sander_M about 8 years
    I got it to work, thank you very much. The only line that caused me some issues was: Flow[] flows = steps.toArray(new Flow[steps.size()]); It threw an ArrayStoreException. I have added the code that I used to make that part work at the top of my question.
  • Hansjoerg Wingeier
    Hansjoerg Wingeier about 8 years
    Yes, I made a mistake. I corrected my answer a couple of hours ago. I did forget that I had to wrap the steps into flows.
  • Broshet
    Broshet over 3 years
    Hello, is there any possibility to use this solution with a dynamic list of files? In my case, it's the previous step of my job which generates all the files, so in my flow I have to check for the new files when it starts, because they don't exist at the initial start of my job.
  • Hansjoerg Wingeier
    Hansjoerg Wingeier over 3 years
    I would suggest splitting your job into two separate jobs. The first creates the list of files and stores the names in a file. Then you can create a second job that uses the list as input. You can't do that in the same job, since the whole job structure is defined when you instantiate your job. After that it is not possible to change the structure of the job.
  • Broshet
    Broshet over 3 years
    Thanks, If I am not mistaken, the instantiation of the two jobs will take place initially before the execution of the first job, right? So the list of files produced by job 1 will not be known? (In case the two jobs are executed sequentially in the same Spring Boot application)
  • Hansjoerg Wingeier
    Hansjoerg Wingeier over 3 years
    No, you cannot do that. You have to finish the first job, and after that you can instantiate the second job. In order to instantiate the second job, you need the results of the first one. So you need to orchestrate them, or have two Spring Boot apps that are launched in sequence.
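
The constraint described in the last comments (a job's structure is fixed once the job is instantiated, so the list of files must exist before the second job is built) can be illustrated without Spring Batch. Below is a minimal plain-Java sketch of such a hand-off through an index file. writeIndex, planStepNames, roundTrip and the file names are hypothetical and only stand in for "job 1 finishes" and "job 2 is built"; the "convertStepFor" prefix is borrowed from the answer's example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class TwoPhaseSketch {

    /** Phase 1 (stand-in for the first job): record the names of the generated files. */
    static void writeIndex(Path index, List<String> fileNames) {
        try {
            Files.write(index, fileNames, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Phase 2 (stand-in for building the second job): the structure is derived
     * from the index, so the index must be complete before this runs.
     */
    static List<String> planStepNames(Path index) {
        try {
            return Files.readAllLines(index, StandardCharsets.UTF_8).stream()
                    .filter(line -> !line.trim().isEmpty())
                    .map(name -> "convertStepFor" + name) // one unique step name per file
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Runs both phases strictly in sequence using a temporary index file. */
    static List<String> roundTrip(List<String> fileNames) {
        try {
            Path index = Files.createTempFile("generated-files", ".txt");
            try {
                writeIndex(index, fileNames);
                return planStepNames(index);
            } finally {
                Files.deleteIfExists(index);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(Arrays.asList("file1.txt", "file2.txt")));
    }
}
```

In a real setup the second phase would build its steps from these names in the way the answer's createStep does; the sketch only shows the ordering requirement.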