Spring Integration Inbound-Channel-Adapter to read large files line-by-line

17,096

Spring Integration 4.x has a nice new feature: a splitter can split an Iterator into messages:

Spring Integration Reference

Starting with version 4.1, the AbstractMessageSplitter supports the Iterator type for the value to split.

This allows a splitter to return an Iterator, so each line is sent as a separate message without reading the whole file into memory.

Here is a simple example of a Spring context that splits CSV files into one message per line:

<int-file:inbound-channel-adapter 
        directory="${inputFileDirectory:/tmp}"
        channel="inputFiles"/>

<int:channel id="inputFiles">
    <int:dispatcher task-executor="executor"/>
</int:channel>

<int:splitter 
    input-channel="inputFiles" 
    output-channel="output">
    <bean 
        class="FileSplitter" 
        p:commentPrefix="${commentPrefix:#}" />
</int:splitter>

<task:executor 
    id="executor" 
    pool-size="${poolSize:8}" 
    queue-capacity="${queueCapacity:0}" 
    rejection-policy="CALLER_RUNS" />

<int:channel id="output"/>

And this is the splitter implementation:

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.splitter.AbstractMessageSplitter;
import org.springframework.integration.transformer.MessageTransformationException;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

/**
 * Splits a message whose payload is a java.io.File into one message per line.
 * Returning an Iterator lets AbstractMessageSplitter pull lines on demand,
 * so the whole file is never held in memory.
 */
public class FileSplitter extends AbstractMessageSplitter {
    private static final Logger log = LoggerFactory.getLogger(FileSplitter.class);

    private String commentPrefix = "#";

    @Override
    public Object splitMessage(Message<?> message) {
        if(log.isDebugEnabled()) {
            log.debug(message.toString());
        }
        try {

            Object payload = message.getPayload();
            Assert.isInstanceOf(File.class, payload, "Expected java.io.File in the message payload"); 

            return new BufferedReaderFileIterator((File) payload);
        } 
        catch (IOException e) {
            String msg = "Unable to transform file: " + e.getMessage();
            log.error(msg);
            throw new MessageTransformationException(msg, e);
        }
    }

    public void setCommentPrefix(String commentPrefix) {
        this.commentPrefix = commentPrefix;
    }

    /**
     * Reads one line ahead: hasNext() reports whether a non-comment line is buffered,
     * next() returns it and reads the following one.
     */
    public class BufferedReaderFileIterator implements Iterator<String> {

        private File file;
        private BufferedReader bufferedReader;
        private String line;

        public BufferedReaderFileIterator(File file) throws IOException {
            this.file = file;
            this.bufferedReader = new BufferedReader(new FileReader(file));
            readNextLine();
        }

        @Override
        public boolean hasNext() {
            return line != null;
        }

        @Override
        public String next() {
            if (line == null) {
                // Iterator contract: signal exhaustion instead of returning null
                throw new NoSuchElementException("No more lines in " + file);
            }
            try {
                String res = this.line;
                readNextLine();
                return res;
            } 
            catch (IOException e) {
                log.error("Error reading file", e);
                throw new RuntimeException(e);
            }   
        }

        // Skips comment lines; closes (and deletes) the file once the end is reached
        void readNextLine() throws IOException {
            do {
                line = bufferedReader.readLine();
            }
            while(line != null && line.trim().startsWith(commentPrefix));

            if(log.isTraceEnabled()) {
                log.trace("Read next line: {}", line);
            }

            if(line == null) {
                close();
            }
        }

        // Remove the delete() call if the processed file must be kept
        void close() throws IOException {
            bufferedReader.close();
            if (!file.delete()) {
                log.warn("Could not delete {}", file);
            }
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

    }

}

Note that the splitMessage() handler method returns an Iterator rather than a collection, so lines are read only as the splitter consumes them.
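To see the iterator's behavior in isolation, here is a minimal plain-Java sketch of the same look-ahead pattern, decoupled from Spring and from java.io.File so it can run against any Reader. The class name CommentSkippingLineIterator is hypothetical, and a few details go beyond the answer's code: an empty prefix disables comment skipping, next() throws NoSuchElementException at the end of input, and I/O errors are wrapped in UncheckedIOException (Java 8+).

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;

/**
 * Hypothetical, Spring-free version of the splitter's iterator: it keeps one
 * line of look-ahead, skips comment lines and closes the reader at end of input.
 */
public class CommentSkippingLineIterator implements Iterator<String> {

    private final BufferedReader reader;
    private final String commentPrefix;
    private String nextLine; // look-ahead; null once the input is exhausted
    private boolean closed;

    public CommentSkippingLineIterator(Reader source, String commentPrefix) {
        this.reader = new BufferedReader(source);
        this.commentPrefix = Objects.requireNonNull(commentPrefix);
        advance();
    }

    @Override
    public boolean hasNext() {
        return nextLine != null;
    }

    @Override
    public String next() {
        if (nextLine == null) {
            throw new NoSuchElementException();
        }
        String current = nextLine;
        advance();
        return current;
    }

    public boolean isClosed() {
        return closed;
    }

    // Reads until a non-comment line is buffered or end of input is reached.
    private void advance() {
        try {
            do {
                nextLine = reader.readLine();
            } while (nextLine != null && !commentPrefix.isEmpty()
                    && nextLine.trim().startsWith(commentPrefix));
            if (nextLine == null) {
                reader.close(); // release the handle as soon as the input is exhausted
                closed = true;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String input = "# header comment\nalpha\n  # indented comment\nbeta\n";
        CommentSkippingLineIterator it = new CommentSkippingLineIterator(new StringReader(input), "#");
        List<String> lines = new ArrayList<>();
        while (it.hasNext()) {
            lines.add(it.next());
        }
        System.out.println(lines + " closed=" + it.isClosed()); // prints "[alpha, beta] closed=true"
    }
}
```

Closing inside advance() mirrors the answer's choice of closing in readNextLine(): the consumer never has to call close() explicitly, but a consumer that stops iterating early leaves the reader open.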

Author: Tony Falabella

Updated on June 23, 2022

Comments

  • Tony Falabella
    Tony Falabella almost 2 years

    I'm currently using Spring Integration 4.1.0 with Spring 4.1.2. I have a requirement to read a file line-by-line and use each line read as a message. Basically, I want to allow "replay" for one of our message sources, but the messages are not saved in individual files; they are all in a single file. I have no transaction requirements for this use case. My requirements are similar to this posting, except that the file resides on the same server the JVM is running on: spring integration - read a remote file line by line

    As I see it I have the following options:

    1. Use int-file:inbound-channel-adapter to read the file, then "split" it so that one message becomes multiple messages. Sample config file:

        <?xml version="1.0" encoding="UTF-8"?>
        <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task"
            xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
                http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
                http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
                http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
                http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
                http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
    
            <int-file:inbound-channel-adapter id="filereader" directory="/tmp" filename-pattern="myfile.txt" channel="channel1"/>
            <int-file:file-to-string-transformer input-channel="channel1" output-channel="channel2"/>
            <int:channel id="channel1"/>
            <int:splitter input-channel="channel2" output-channel="nullChannel"/>
            <int:channel id="channel2"/>
        </beans>
    

    The problem is that the file is very large: with the technique above, the entire file is first read into memory and then split, and the JVM runs out of heap space. What's really needed is: read a line, convert it to a message, send the message, release it from memory, repeat.

    2. Use int-file:tail-inbound-channel-adapter with end="false" (which indicates reading from the start of the file). Start and stop this adapter as needed for each file (changing the filename before each start). Sample config file:

      <?xml version="1.0" encoding="UTF-8"?>
      <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task"
          xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
              http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
              http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
              http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
              http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
              http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
              http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
      
          <int-file:tail-inbound-channel-adapter id="apache"
              channel="exchangeSpringQueueChannel"
              task-executor="exchangeFileReplayTaskExecutor"
              file="C:\p2-test.txt"
              delay="1"
              end="false"
              reopen="true"
              file-delay="10000" />
      
          <int:channel id="exchangeSpringQueueChannel" />
          <task:executor id="exchangeFileReplayTaskExecutor" pool-size="1" />
      </beans>
      
    3. Have Spring Integration call into Spring Batch and use an ItemReader to process the file. This certainly allows more fine-grained control over the whole process, but it's a fair amount of work to set up, what with the job repository and such (and I don't care about the job history, so I'd either tell the job not to log status or use the in-memory MapJobRepository).

    4. Create my own FileLineByLineInboundChannelAdapter by extending MessageProducerSupport. Much of the code can be borrowed from ApacheCommonsFileTailingMessageProducer (also see http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter). Below is a sample, but it needs some work to move the reading into its own thread so that I honor the stop() command while I read line-by-line.

        package com.xxx.exchgateway.common.util.springintegration;
    
        import java.io.BufferedReader;
        import java.io.File;
        import java.io.FileInputStream;
        import java.io.IOException;
        import java.io.InputStreamReader;
        import org.springframework.core.task.SimpleAsyncTaskExecutor;
        import org.springframework.core.task.TaskExecutor;
        import org.springframework.integration.core.MessageSource;
        import org.springframework.integration.endpoint.MessageProducerSupport;
        import org.springframework.integration.file.FileHeaders;
        import org.springframework.messaging.Message;
        import org.springframework.util.Assert;
    
        /**
         * A lot of the logic for this class came from {@code ApacheCommonsFileTailingMessageProducer}.
         * @see <a href="http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter">Spring forum thread</a>
         */
        public class FileLineByLineInboundChannelAdapter extends MessageProducerSupport implements MessageSource<String> {
            private volatile File file;
    
            /**
             * The file you wish to read line by line.
             * @param file The absolute path of the file.
             */
            public void setFile(File file) {
                // The original passed only the message, so the check never fired
                Assert.notNull(file, "'file' cannot be null");
                this.file = file;
            }
    
            protected File getFile() {
                if (this.file == null) {
                    throw new IllegalStateException("No 'file' has been provided");
                }
                return this.file;
            }
    
            @Override
            public String getComponentType() {
                return "file:line-by-line-inbound-channel-adapter";
            }
    
            private void readFile() {
                // try-with-resources closes the reader and the underlying stream, even if send() throws
                try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(getFile())))) {
                    String strLine;
    
                    // Read the file line by line; check isRunning() first so a stop() (e.g. via JMX)
                    // ends the loop without reading another line
                    while (isRunning() && (strLine = br.readLine()) != null) {
                        send(strLine);
                    }
                }
                catch (IOException e) {
                    // FileNotFoundException is an IOException, so one handler covers both
                    logger.error("Failed to read file " + getFile(), e);
                }
            }
    
            @Override
            protected void doStart() {
                super.doStart();
    
                // TODO this needs to be moved into its own thread since isRunning() will return "false" until this method has completed
                // and we want to honor the stop() command while we read line-by-line
                readFile();
            }
    
            protected void send(String line) {
                Message<?> message = this.getMessageBuilderFactory().withPayload(line).setHeader(FileHeaders.FILENAME, getFile().getAbsolutePath()).build();
                super.sendMessage(message);
            }
    
            @Override
            public Message<String> receive() {
                // TODO not used: messages are pushed via sendMessage(), not polled
                return null;
            }
        }
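The TODO in doStart() asks for the reading to happen on its own thread so that stop() is honored mid-file. Below is a plain-Java sketch of that idea, not an implementation of MessageProducerSupport: the class StoppableLineReader is hypothetical, an ExecutorService stands in for Spring's TaskExecutor, and a Consumer stands in for sendMessage(). The flag is checked before each readLine(), so a stop() takes effect between lines.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Hypothetical reader that runs its line loop on an executor and can be stopped between lines. */
public class StoppableLineReader {

    private final Path file;
    private final Consumer<String> sender;
    private final ExecutorService executor;
    private final AtomicBoolean running = new AtomicBoolean();

    public StoppableLineReader(Path file, Consumer<String> sender, ExecutorService executor) {
        this.file = file;
        this.sender = sender;
        this.executor = executor;
    }

    /** Returns immediately; the file is read on the executor's thread. */
    public Future<Long> start() {
        running.set(true);
        return executor.submit(this::readFile);
    }

    /** Takes effect before the next line is read. */
    public void stop() {
        running.set(false);
    }

    public boolean isRunning() {
        return running.get();
    }

    // Returns the number of lines handed to the sender.
    private Long readFile() {
        long sent = 0;
        try (BufferedReader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            String line;
            // Check the flag first so a stop() ends the loop without reading another line.
            while (running.get() && (line = reader.readLine()) != null) {
                sender.accept(line);
                sent++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            running.set(false); // finished, either at end of file or because of stop()
        }
        return sent;
    }

    public static void main(String[] args) throws Exception {
        Path tmp = Files.createTempFile("replay", ".txt");
        Files.write(tmp, Arrays.asList("one", "two", "three"), StandardCharsets.UTF_8);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        StoppableLineReader reader =
                new StoppableLineReader(tmp, line -> System.out.println("sending: " + line), executor);
        Future<Long> done = reader.start(); // the caller's thread is free here
        System.out.println("lines sent: " + done.get());
        executor.shutdown();
        Files.delete(tmp);
    }
}
```

In an actual adapter, doStart() would presumably submit the loop to a TaskExecutor the same way and doStop() would clear the flag; that wiring is an assumption, not something shown above.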
    

    My use case doesn't seem to be outside the realm of typical things people might want to do, so I'm surprised that I can't find an out-of-the-box solution for it. I've searched quite a bit, however, and looked at a lot of examples, and unfortunately I have yet to find something that suits my needs.

    I'm assuming that perhaps I've missed something obvious that the framework already offers (though perhaps this falls on the blurry line between Spring Integration and Spring Batch). Can someone let me know if I'm totally off base with my ideas or if there's a simple solution that I've missed, or offer alternative suggestions?
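The loop described under option 1 (read a line, convert it to a message, send it, release it, repeat) can be sketched with the JDK alone. In this hypothetical LineByLineSender, a java.util.function.Consumer stands in for sending a Spring Integration message; memory stays bounded only as long as the consumer does not retain the lines it receives.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.function.Consumer;

public class LineByLineSender {

    /**
     * Reads the file one line at a time and hands each line to {@code sender}.
     * This method references only the current line, so its memory use does not
     * grow with the file size. Returns the number of lines handed to the sender.
     */
    public static long forEachLine(Path file, Charset charset, Consumer<String> sender) throws IOException {
        long count = 0;
        // try-with-resources closes the file even if the sender throws
        try (BufferedReader reader = Files.newBufferedReader(file, charset)) {
            String line;
            while ((line = reader.readLine()) != null) {
                sender.accept(line); // stand-in for "convert to message and send"
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("replay", ".txt");
        Files.write(tmp, Arrays.asList("first", "second", "third"), StandardCharsets.UTF_8);
        long sent = forEachLine(tmp, StandardCharsets.UTF_8, line -> System.out.println("sending: " + line));
        System.out.println(sent + " lines sent");
        Files.delete(tmp);
    }
}
```

This is the same loop that readFile() performs in option 4; what remains hard there is running it off the caller's thread so that the endpoint's lifecycle still works.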

  • Tony Falabella
    Tony Falabella over 9 years
    Thanks for the reply, Smollet. This is really helpful. I tried your solution (and even the second, updated version), and it has a glitch: it reads the first line of the file and then keeps reading the same line. So I edited your original post to have slightly modified next() and readNextLine() methods. I assume (perhaps incorrectly) that most people will want to stop reading the file once all of its rows have been read. If that is indeed a requirement, note that my update also tweaked the hasNext() method to close the file after the last line has been read.
  • Sergey Shcherbakov
    Sergey Shcherbakov over 9 years
    Interesting point! I've spent the last couple of hours debugging a similar problem (the same file being read in a loop). The reader does need to be closed, but your change hasn't fixed my current problem. I suspect that the file:inbound-channel-adapter keeps sending a message with the same file over and over (unless prevent-duplicates="true" is set on the inbound-channel-adapter). It would also be nice to delete the file after it has been processed.
  • Sergey Shcherbakov
    Sergey Shcherbakov over 9 years
    I took the liberty of updating the code snippet with the changes that helped me finish file processing in a Spring XD environment. @TonyFalabella it should be compatible with your needs too (just comment out file.delete() if you don't need it).
  • Sergey Shcherbakov
    Sergey Shcherbakov over 9 years
    Here is an even better splitter implementation: stackoverflow.com/questions/27171978/…
  • Tony Falabella
    Tony Falabella over 9 years
    I've looked at Artem's reply, and I like to do as much as possible with SpEL rather than writing code, so I'll go with his suggestion. Thanks again, Smollet.
  • Sergey Shcherbakov
    Sergey Shcherbakov over 9 years
    Just be aware that that code doesn't close the stream automatically. It also doesn't filter comments (which can probably be done with a filter downstream), and it has no place to delete the file after it has been read. Plus, I prefer to use SpEL as little as possible because of its performance overhead.
  • Tony Falabella
    Tony Falabella over 9 years
    Yes, I noticed that too. Definitely a few benefits to what you wrote. Thanks for pointing that out.