Spring Integration Inbound-Channel-Adapter to read large files line-by-line
Spring Integration 4.x has a nice new feature of using Iterator's as messages:
Starting with version 4.1, the AbstractMessageSplitter supports the Iterator type for the value to split.
This allows to send Iterator as messages not reading the whole file into the memory.
Here is a simple example of a Spring Context splitting CSV files into one message per line:
<int-file:inbound-channel-adapter
directory="${inputFileDirectory:/tmp}"
channel="inputFiles"/>
<int:channel id="inputFiles">
<int:dispatcher task-executor="executor"/>
</int:channel>
<int:splitter
input-channel="inputFiles"
output-channel="output">
<bean
class="FileSplitter"
p:commentPrefix="${commentPrefix:#}" />
</int:splitter>
<task:executor
id="executor"
pool-size="${poolSize:8}"
queue-capacity="${aueueCapacity:0}"
rejection-policy="CALLER_RUNS" />
<int:channel id="output"/>
And this is the splitter implementation:
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Iterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.splitter.AbstractMessageSplitter;
import org.springframework.integration.transformer.MessageTransformationException;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
public class FileSplitter extends AbstractMessageSplitter {
private static final Logger log = LoggerFactory.getLogger(FileSplitter.class);
private String commentPrefix = "#";
public Object splitMessage(Message<?> message) {
if(log.isDebugEnabled()) {
log.debug(message.toString());
}
try {
Object payload = message.getPayload();
Assert.isInstanceOf(File.class, payload, "Expected java.io.File in the message payload");
return new BufferedReaderFileIterator((File) payload);
}
catch (IOException e) {
String msg = "Unable to transform file: " + e.getMessage();
log.error(msg);
throw new MessageTransformationException(msg, e);
}
}
public void setCommentPrefix(String commentPrefix) {
this.commentPrefix = commentPrefix;
}
public class BufferedReaderFileIterator implements Iterator<String> {
private File file;
private BufferedReader bufferedReader;
private String line;
public BufferedReaderFileIterator(File file) throws IOException {
this.file = file;
this.bufferedReader = new BufferedReader(new FileReader(file));
readNextLine();
}
@Override
public boolean hasNext() {
return line != null;
}
@Override
public String next() {
try {
String res = this.line;
readNextLine();
return res;
}
catch (IOException e) {
log.error("Error reading file", e);
throw new RuntimeException(e);
}
}
void readNextLine() throws IOException {
do {
line = bufferedReader.readLine();
}
while(line != null && line.trim().startsWith(commentPrefix));
if(log.isTraceEnabled()) {
log.trace("Read next line: {}", line);
}
if(line == null) {
close();
}
}
void close() throws IOException {
bufferedReader.close();
file.delete();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
}
Please note the Iterator object being returned from the splitMessage() handler method.
Tony Falabella
Updated on June 23, 2022Comments
-
Tony Falabella almost 2 years
I'm currently using Spring Integration 4.1.0 with Spring 4.1.2. I have a requirement to be able to read a file line-by-line and use each line read as a message. Basically I want to allow "replay" for one of our message sources but messages are not saved in individual files but rather in a single file. I have no transaction requirements for this use-case. My requirements are similar to this posting except on a file residing on the same server as the one that the JVM is running on: spring integration - read a remote file line by line
As I see it I have the following options:
1. Use
int-file:inbound-channel-adapter
to read the file then "split" that file so that 1 message now becomes multiple messages. Sample config file:<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd"> <int-file:inbound-channel-adapter id="filereader" directory="/tmp" filename-pattern="myfile.txt" channel="channel1"/> <int-file:file-to-string-transformer input-channel="channel1" output-channel="channel2"/> <int:channel id="channel1"/> <int:splitter input-channel="channel2" output-channel="nullChannel"/> <int:channel id="channel2"/> </beans>
The problem is that the file is very large and when using the above technique the entire file is first read into memory and is then split and the JVM runs out of heap space. Really the steps required are: read a line and convert line to message, send message, remove message from memory, repeat.
-
Use
int-file:tail-inbound-channel-adapter
withend="false"
(which basically indicates to read from the start of the file). Start and stop this adapter as needed for each file (changing the filename before each start). Sample config file:<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd"> <int-file:tail-inbound-channel-adapter id="apache" channel="exchangeSpringQueueChannel" task-executor="exchangeFileReplayTaskExecutor" file="C:\p2-test.txt" delay="1" end="false" reopen="true" file-delay="10000" /> <int:channel id="exchangeSpringQueueChannel" /> <task:executor id="exchangeFileReplayTaskExecutor" pool-size="1" /> </beans>
Have Spring Integration call into Spring Batch and use an
ItemReader
to process the file. Certainly allows more fine-grained controls over the whole process but a fair amount of work to setup what with the job repository and such (and I don't care about the job history so I'd either tell the job to not log status and/or or use the in-memoryMapJobRepository
).
4. Create my own
FileLineByLineInboundChannelAdapter
by extendingMessageProducerSupport
. Much of the code can be borrowed fromApacheCommonsFileTailingMessageProducer
(also see http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter). Below is a sample but needs some work to put the reading into it's ownThread
so that I honor thestop()
command while I read line-by-line.package com.xxx.exchgateway.common.util.springintegration; import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStreamReader; import org.apache.commons.io.IOUtils; import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.core.task.TaskExecutor; import org.springframework.integration.core.MessageSource; import org.springframework.integration.endpoint.MessageProducerSupport; import org.springframework.integration.file.FileHeaders; import org.springframework.messaging.Message; import org.springframework.util.Assert; /** * A lot of the logic for this class came from {@link #ApacheCommonsFileTailingMessageProducer}. * See {@link http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter} */ public class FileLineByLineInboundChannelAdapter extends MessageProducerSupport implements MessageSource<String> { private volatile File file; /** * The name of the file you wish to tail. * @param file The absolute path of the file. */ public void setFile(File file) { Assert.notNull("'file' cannot be null"); this.file = file; } protected File getFile() { if (this.file == null) { throw new IllegalStateException("No 'file' has been provided"); } return this.file; } @Override public String getComponentType() { return "file:line-by-line-inbound-channel-adapter"; } private void readFile() { FileInputStream fstream; try { fstream = new FileInputStream(getFile()); BufferedReader br = new BufferedReader(new InputStreamReader(fstream)); String strLine; // Read File Line By Line, make sure we honor if someone manually sets the isRunning=false (via clicking the stop() method in JMX) while ((strLine = br.readLine()) != null && isRunning()) { send(strLine); } //Close the input stream IOUtils.closeQuietly(br); IOUtils.closeQuietly(fstream); } catch (FileNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override protected void doStart() { super.doStart(); // TODO this needs to be moved into it's own thread since isRunning() will return "false" until this method has completed // and we want to honor the stop() command while we read line-by-line readFile(); } protected void send(String line) { Message<?> message = this.getMessageBuilderFactory().withPayload(line).setHeader(FileHeaders.FILENAME, this.file.getAbsolutePath()).build(); super.sendMessage(message); } @Override public Message<String> receive() { // TODO Auto-generated method stub return null; } }
It doesn't seem to me that my use-case is outside the realm of typical things people might like to do so I'm surprised that I can't find a solution to it out-of-the-box. I've searched quite a bit however and looked at a lot of the examples and unfortunately have yet to find something that suites my needs.
I'm assuming that perhaps I've missed something obvious that the framework already offers (though perhaps this falls into the blurry-line between Spring Integraton and Spring Batch). Can someone let me know if I'm totally off-base with my ideas or if there's a simple solution that I've missed, or offer alternative suggestions?
-
-
Tony Falabella over 9 yearsThanks for the reply Smollet. This is really helpful. I tried your solution (and even the 2nd updated version) and it has a glitch in that it reads the 1st line of the file and then keeps reading the same line. As such I edited your original post to have slightly modified methods for next() and readNextLine(). I assume (perhaps incorrectly) that most people will want to stop reading the file once all rows of the file have been read. If that is indeed a requirement for people then the update I made also tweaked the hasNext() method to close the file after the last line has been read.
-
Sergey Shcherbakov over 9 yearsInteresting point! I've been debugging similar problem (same file being read in a loop) last couple of hours. The reader needs to be closed indeed, but your change hasn't fixed my current problem. I suspect that the file:inbound-channel-adapter keeps sending a message with the same file over and over (if not to set prevent-duplicates="true" on the inbound-channel-adapter). Also it would be nice to delete the file after it has been processed.
-
Sergey Shcherbakov over 9 yearsI took liberty to update the code snippet with the changes that helped me to finish file processing in Spring XD environment. @TonyFalabella it should be compatible with your needs too (just comment file.delete() out if not needed).
-
Sergey Shcherbakov over 9 yearsHere is even a better Splitter implementation: stackoverflow.com/questions/27171978/…
-
Tony Falabella over 9 yearsI've looked at Artem's reply and like to do as much as possible within SpEL without writing code so I'll go with his suggestion. Thanks again Smollet.
-
Sergey Shcherbakov over 9 yearsJust be aware, that that code doesn't close the stream automatically. It also doesn't contain comments filtering (which can probably be done with a filter down-stream). It also doesn't have a place to delete file after it has been read. Plus I prefer to use SpEL as less as possible because of performance overhead.
-
Tony Falabella over 9 yearsYes I noticed that too. Definately a few benefits to what you wrote. Thanks for pointing that out.