How to read huge CSV file in Mule

Solution 1

As SteveS said, the csv-to-maps-transformer might try to load the entire file into memory before processing it. What you can do instead is split the CSV file into smaller parts and send those parts to a VM endpoint to be processed individually. First, create a component that performs the split:

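import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;

import org.mule.api.MuleEventContext;
import org.mule.api.client.MuleClient;
import org.mule.api.lifecycle.Callable;

public class CSVReader implements Callable {
    @Override
    public Object onCall(MuleEventContext eventContext) throws Exception {
        // With streaming="true" the file connector delivers the payload as an InputStream.
        InputStream fileStream = (InputStream) eventContext.getMessage().getPayload();
        MuleClient muleClient = eventContext.getMuleContext().getClient();

        // No charset is given, so the platform default is used.
        BufferedReader br = new BufferedReader(new InputStreamReader(fileStream));
        try {
            String line;
            // Read one line at a time and dispatch it asynchronously to the VM queue.
            while ((line = br.readLine()) != null) {
                muleClient.dispatch("vm://in", line, null);
            }
        } finally {
            // Closing the reader also closes the underlying file stream,
            // even if a dispatch fails halfway through.
            br.close();
        }
        return null;
    }
}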

Then, split your main flow in two:

<file:connector name="File" 
    workDirectory="yourWorkDirPath" autoDelete="false" streaming="true"/>

<flow name="CsvToFile" doc:name="Split and dispatch">
    <file:inbound-endpoint path="inboxPath"
        moveToDirectory="processedPath" pollingFrequency="60000"
        doc:name="CSV" connector-ref="File">
        <file:filename-wildcard-filter pattern="*.csv"
            caseSensitive="true" />
    </file:inbound-endpoint>
    <component class="it.aizoon.grpBuyer.AddMessageProperty" doc:name="Add Message Property" />
    <component class="com.dgonza.CSVReader" doc:name="Split the file and dispatch every line to VM" />
</flow>

<flow name="storeInDatabase" doc:name="receive lines and store in database">
    <vm:inbound-endpoint exchange-pattern="one-way"
        path="in" doc:name="VM" />
    <choice>
        <!-- ... your JDBC stuff ... -->
    </choice>
</flow>

Keep your current file connector configuration so that streaming stays enabled. With this solution the CSV data can be processed without loading the entire file into memory first. HTH
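
The component above dispatches one VM message per line. If that turns out to be too chatty, one option is to group lines into chunks before dispatching. The following is a minimal sketch in plain Java, not Mule API: `LineBatcher`, `ChunkSink` and the chunk size are hypothetical names chosen for illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback that receives one chunk of lines at a time.
interface ChunkSink {
    void accept(List<String> chunk);
}

// Hypothetical helper: groups the lines of a stream into fixed-size chunks.
class LineBatcher {

    /**
     * Reads the source line by line and hands every full chunk to the sink.
     * Only the chunk currently being filled is held by this method.
     * Returns the total number of lines read.
     */
    static long forEachChunk(Reader source, int chunkSize, ChunkSink sink)
            throws IOException {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        long total = 0;
        List<String> chunk = new ArrayList<String>(chunkSize);
        BufferedReader br = new BufferedReader(source);
        try {
            String line;
            while ((line = br.readLine()) != null) {
                chunk.add(line);
                total++;
                if (chunk.size() == chunkSize) {
                    sink.accept(chunk);
                    // Start a fresh list because the sink may keep the old one.
                    chunk = new ArrayList<String>(chunkSize);
                }
            }
        } finally {
            br.close();
        }
        if (!chunk.isEmpty()) {
            sink.accept(chunk); // remaining lines that did not fill a whole chunk
        }
        return total;
    }
}
```

Inside `onCall`, the sink could call `muleClient.dispatch("vm://in", chunk, null)`, in which case the receiving flow would have to iterate over a `List<String>` payload instead of a single line. That wiring is an assumption and is not part of the original answer.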

Solution 2

I believe that the csv-to-maps-transformer is going to force the whole file into memory. Since you are dealing with one large file, I would personally just write a Java class to handle it. The File endpoint will pass a file stream to your custom transformer. You can then make a JDBC connection and pick off the information one row at a time without having to load the whole file. I have used OpenCSV to parse the CSV for me. So your Java class would contain something like the following:

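// Goes inside a class that extends org.mule.transformer.AbstractTransformer.
// CSVReader is OpenCSV's reader (au.com.bytecode.opencsv in 2.x, com.opencsv in later versions).
protected Object doTransform(Object src, String enc) throws TransformerException {
    CSVReader reader = null;
    try {
        // Make a JDBC connection here

        // The File endpoint passes the file contents as a stream, not as a FileReader
        InputStream csvFileData = (InputStream) src;
        reader = new CSVReader(new BufferedReader(new InputStreamReader(csvFileData, enc)));

        // Read the CSV one row at a time
        int rows = 0;
        String[] nextLine;
        while ((nextLine = reader.readNext()) != null) {
            // Push your data into the database through your JDBC connection
            rows++;
        }
        return rows; // number of rows processed
    } catch (Exception e) {
        // Do not swallow the error: report the failure to Mule
        throw new TransformerException(this, e);
    } finally {
        // Close the JDBC connection here
        if (reader != null) {
            try { reader.close(); } catch (IOException ignored) { }
        }
    }
}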
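
For the database step, row-at-a-time inserts can be grouped with standard JDBC batching (`PreparedStatement.addBatch` and `executeBatch`). This is a minimal sketch, assuming every CSV column maps to one string parameter of the INSERT statement. `BatchInserter` and `RowSource` are hypothetical names, and `RowSource.next()` simply mirrors the null-at-end contract of OpenCSV's `readNext()`.

```java
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Hypothetical row supplier: returns the next parsed row, or null at end of input.
interface RowSource {
    String[] next() throws IOException;
}

// Hypothetical helper: inserts rows through JDBC in batches.
class BatchInserter {

    /**
     * Binds each row to the prepared statement and executes a batch
     * every batchSize rows. Returns the number of rows sent.
     */
    static long insertAll(Connection conn, String sql, RowSource rows, int batchSize)
            throws SQLException, IOException {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        long sent = 0;
        int pending = 0;
        PreparedStatement ps = conn.prepareStatement(sql);
        try {
            String[] row;
            while ((row = rows.next()) != null) {
                for (int i = 0; i < row.length; i++) {
                    ps.setString(i + 1, row[i]); // JDBC parameters are 1-based
                }
                ps.addBatch();
                sent++;
                pending++;
                if (pending == batchSize) {
                    ps.executeBatch();
                    pending = 0;
                }
            }
            if (pending > 0) {
                ps.executeBatch(); // flush the final partial batch
            }
        } finally {
            ps.close();
        }
        return sent;
    }
}
```

With OpenCSV, the `RowSource` could be a small wrapper whose `next()` delegates to `reader.readNext()`. Only the rows of the current batch are pending at any time, so memory use does not grow with the file size.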

Author: Paride Letizia

Updated on June 04, 2022

Comments

  • Paride Letizia, almost 2 years

    I'm using Mule Studio 3.4.0 Community Edition. I have a big problem parsing a large CSV file that arrives through a File endpoint. The scenario is that I have 3 CSV files and I want to put the files' content into a database. But when I try to load a huge file (about 144 MB) I get an "OutOfMemory" exception. As a solution I thought of splitting the large CSV into smaller CSVs (I don't know if this is the best solution), or trying to find a way to process the CSV without throwing an exception.

    <file:connector name="File" autoDelete="true" streaming="true" validateConnections="true" doc:name="File"/>
    
    <flow name="CsvToFile" doc:name="CsvToFile">
            <file:inbound-endpoint path="src/main/resources/inbox" moveToDirectory="src/main/resources/processed"  responseTimeout="10000" doc:name="CSV" connector-ref="File">
                <file:filename-wildcard-filter pattern="*.csv" caseSensitive="true"/>
            </file:inbound-endpoint>
            <component class="it.aizoon.grpBuyer.AddMessageProperty" doc:name="Add Message Property"/>
            <choice doc:name="Choice">
                <when expression="INVOCATION:nome_file=azienda" evaluator="header">
                    <jdbc-ee:csv-to-maps-transformer delimiter="," mappingFile="src/main/resources/companies-csv-format.xml" ignoreFirstRecord="true" doc:name="CSV2Azienda"/>
                    <jdbc-ee:outbound-endpoint exchange-pattern="one-way" queryKey="InsertAziende" queryTimeout="-1" connector-ref="jdbcConnector" doc:name="Database Azienda">
                        <jdbc-ee:query key="InsertAziende" value="INSERT INTO aw006_azienda VALUES (#[map-payload:AW006_ID], #[map-payload:AW006_ID_CLIENTE], #[map-payload:AW006_RAGIONE_SOCIALE])"/>
                    </jdbc-ee:outbound-endpoint>
                </when>
                <when expression="INVOCATION:nome_file=servizi" evaluator="header">
                    <jdbc-ee:csv-to-maps-transformer delimiter="," mappingFile="src/main/resources/services-csv-format.xml" ignoreFirstRecord="true" doc:name="CSV2Servizi"/>
                    <jdbc-ee:outbound-endpoint exchange-pattern="one-way" queryKey="InsertServizi" queryTimeout="-1" connector-ref="jdbcConnector" doc:name="Database Servizi">
                        <jdbc-ee:query key="InsertServizi" value="INSERT INTO ctrl_aemd_unb_servizi VALUES (#[map-payload:CTRL_ID_TIPO_OPERAZIONE], #[map-payload:CTRL_DESCRIZIONE], #[map-payload:CTRL_COD_SERVIZIO])"/>
                    </jdbc-ee:outbound-endpoint>
                </when>
                <when expression="INVOCATION:nome_file=richiesta" evaluator="header">
                    <jdbc-ee:csv-to-maps-transformer delimiter="," mappingFile="src/main/resources/requests-csv-format.xml" ignoreFirstRecord="true" doc:name="CSV2Richiesta"/>
                    <jdbc-ee:outbound-endpoint exchange-pattern="one-way" queryKey="InsertRichieste" queryTimeout="-1" connector-ref="jdbcConnector" doc:name="Database Richiesta">
                        <jdbc-ee:query key="InsertRichieste" value="INSERT INTO ctrl_aemd_unb_richiesta VALUES (#[map-payload:CTRL_ID_CONTROLLER], #[map-payload:CTRL_NUM_RICH_VENDITORE], #[map-payload:CTRL_VENDITORE], #[map-payload:CTRL_CANALE_VENDITORE], #[map-payload:CTRL_CODICE_SERVIZIO], #[map-payload:CTRL_STATO_AVANZ_SERVIZIO], #[map-payload:CTRL_DATA_INSERIMENTO])"/>
                    </jdbc-ee:outbound-endpoint>
                </when>
            </choice>   
        </flow>
    

    Please, I do not know how to fix this problem. Thanks in advance for any kind of help.