Improve multi-thread indexing with lucene


Solution 1

If you want to parallelize indexing, there are two things you can do:

  • parallelizing calls to addDocument,
  • increasing the maximum thread count of your merge scheduler.

You are on the right path to parallelize calls to addDocument, but spawning one thread per document will not scale as the number of documents grows. Instead, use a fixed-size ThreadPoolExecutor. Since this task is mainly CPU-intensive (depending on your analyzer and the way you retrieve your data), setting the maximum number of threads to the number of CPUs on your machine is a good starting point.
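As a minimal sketch of that advice: a fixed-size pool with one worker per CPU, where `indexDocument` is a stub standing in for building a Document and calling `writer.addDocument(doc)` (the names and the document count here are illustrative, not Lucene API):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ParallelIndexer {
    // Visible so callers can check how many documents were processed.
    static final AtomicInteger INDEXED = new AtomicInteger();

    // Stub for the real work: analyze one document and hand it to a shared
    // IndexWriter (whose addDocument method is thread-safe).
    static void indexDocument(int docId) {
        INDEXED.incrementAndGet();
    }

    public static void main(String[] args) throws InterruptedException {
        // One worker per CPU: analysis is CPU-bound, so more threads rarely help.
        int nThreads = Runtime.getRuntime().availableProcessors();
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        for (int i = 0; i < 100; i++) {
            final int docId = i;
            pool.submit(() -> indexDocument(docId));
        }
        pool.shutdown();                            // stop accepting new tasks
        pool.awaitTermination(1, TimeUnit.MINUTES); // wait for queued work
        System.out.println("Indexed " + INDEXED.get() + " documents");
    }
}
```

The pool is sized once, up front, so indexing a million files costs the same number of threads as indexing ten.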

Regarding the merge scheduler, you can increase the maximum number of threads it may use with the setMaxThreadCount method of ConcurrentMergeScheduler. Beware that disks handle sequential reads/writes much better than random ones; as a consequence, setting too high a maximum thread count on your merge scheduler is more likely to slow indexing down than to speed it up.
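For illustration, a sketch of that configuration, assuming the Lucene 3.x IndexWriterConfig API (the `dir` and `analyzer` variables are assumed to be set up as in the question):

```java
// Assumes Lucene 3.x on the classpath, e.g.:
// import org.apache.lucene.index.ConcurrentMergeScheduler;
// import org.apache.lucene.index.IndexWriter;
// import org.apache.lucene.index.IndexWriterConfig;
// import org.apache.lucene.util.Version;

ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
cms.setMaxThreadCount(2); // keep this low: merges are sequential-I/O heavy

IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_34, analyzer);
conf.setMergeScheduler(cms);
IndexWriter writer = new IndexWriter(dir, conf);
```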

But before trying to parallelize your indexing process, you should probably try to find where the bottleneck is. If your disk is too slow, the bottleneck is likely to be the flush and merge steps; as a consequence, parallelizing calls to addDocument (which essentially consists of analyzing a document and buffering the result of the analysis in memory) will not improve indexing speed at all.

Some side notes:

  • There is some ongoing work in the development version of Lucene to improve indexing parallelism (especially the flushing part; this blog entry explains how it works).

  • Lucene has a nice wiki page on improving indexing speed where you will find other ways to speed up indexing.

Solution 2

I think the more modern way to do this is to use a ThreadPoolExecutor and submit a Runnable that does your indexing. You can wait for all threads to terminate using .awaitTermination, or a CountDownLatch.

I'm not a big fan of having your main class extend Thread; just create a Runnable inner class that takes its dependencies in a constructor. This makes your code more readable, as the work the threads are doing is clearly separated from your application setup code.
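A minimal sketch of both points together: a Runnable that receives its dependencies through its constructor, submitted to an executor that is then shut down and awaited. The `INDEXED_PATHS` list is a stand-in for the shared IndexWriter; the class and file names are illustrative:

```java
import java.io.File;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class IndexingJob {
    // Stand-in for the shared IndexWriter; records which paths were "indexed".
    static final List<String> INDEXED_PATHS = new CopyOnWriteArrayList<>();

    // The worker is a plain Runnable that takes its dependencies (here just
    // the file) in its constructor, instead of the main class extending Thread.
    static class IndexTask implements Runnable {
        private final File file;
        IndexTask(File file) { this.file = file; }
        @Override public void run() {
            // Real code would build a Document and call writer.addDocument(doc).
            INDEXED_PATHS.add(file.getPath());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (String name : new String[] {"a.txt", "b.txt", "c.txt"}) {
            pool.submit(new IndexTask(new File(name)));
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println(INDEXED_PATHS.size() + " files indexed");
    }
}
```

The setup code in main never touches indexing details, and the task class can be unit-tested on its own.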

A note on style: I'm not a big fan of having your main class throw Exception either; that usually just means you don't have a clear idea of the different checked exceptions the code you are using can throw. It's usually not the right thing to do unless you have a very specific reason.

Author: orezvani

Updated on June 05, 2022

Comments

  • orezvani
    orezvani almost 2 years

    I am trying to build my indexes in Lucene with multiple threads. So, I started coding and wrote the following. First I find the files, and for each file I create a thread to index it. After that I join the threads and optimize the indexes. It works, but I'm not sure... can I trust it at large scale? Is there any way to improve it?

    import java.io.File;
    import java.io.FileFilter;
    import java.io.FileReader;
    import java.io.BufferedReader;
    import java.io.IOException;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.store.RAMDirectory;
    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.analysis.StopAnalyzer;
    import org.apache.lucene.index.IndexReader;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.FSDirectory;
    import org.apache.lucene.util.Version;
    import org.apache.lucene.index.TermFreqVector;
    
    public class mIndexer extends Thread {
    
        private File ifile;
        private static IndexWriter writer;
    
        public mIndexer(File f) {
            ifile = f.getAbsoluteFile();
        }
    
        public static void main(String args[]) throws Exception {
            System.out.println("here...");
    
            String indexDir;
            String dataDir;
            if (args.length != 2) {
                dataDir = new String("/home/omid/Ranking/docs/");
                indexDir = new String("/home/omid/Ranking/indexes/");
            } else {
                dataDir = args[0];
                indexDir = args[1];
            }
    
            long start = System.currentTimeMillis();
    
            Directory dir = FSDirectory.open(new File(indexDir));
            writer = new IndexWriter(dir,
                new StopAnalyzer(Version.LUCENE_34, new File("/home/omid/Desktop/stopwords.txt")),
                true,
                IndexWriter.MaxFieldLength.UNLIMITED);
            int numIndexed = 0;
            try {
                numIndexed = index(dataDir, new TextFilesFilter());
            } finally {
                long end = System.currentTimeMillis();
                System.out.println("Indexing " + numIndexed + " files took " + (end - start) + " milliseconds");
                writer.optimize();
                System.out.println("Optimization took place in " + (System.currentTimeMillis() - end) + " milliseconds");
                writer.close();
            }
            System.out.println("Enjoy your day/night");
        }
    
        public static int index(String dataDir, FileFilter filter) throws Exception {
            File[] dires = new File(dataDir).listFiles();
            for (File d : dires) {
                if (d.isDirectory()) {
                    File[] files = new File(d.getAbsolutePath()).listFiles();
                    for (File f : files) {
                        if (!f.isDirectory() &&
                            !f.isHidden() &&
                            f.exists() &&
                            f.canRead() &&
                            (filter == null || filter.accept(f))) {
                            Thread t = new mIndexer(f);
                            t.start();
                            t.join();
                        }
                    }
                }
            }
            return writer.numDocs();
        }
    
        private static class TextFilesFilter implements FileFilter {
            public boolean accept(File path) {
                return path.getName().toLowerCase().endsWith(".txt");
            }
        }
    
        protected Document getDocument() throws Exception {
            Document doc = new Document();
            if (ifile.exists()) {
                doc.add(new Field("contents", new FileReader(ifile), Field.TermVector.YES));
                doc.add(new Field("path", ifile.getAbsolutePath(), Field.Store.YES, Field.Index.NOT_ANALYZED));
                String cat = "WIR";
                cat = ifile.getAbsolutePath().substring(0, ifile.getAbsolutePath().length() - ifile.getName().length() - 1);
                cat = cat.substring(cat.lastIndexOf('/') + 1, cat.length());
                //doc.add(new Field("category", cat.subSequence(0, cat.length()), Field.Store.YES));
                //System.out.println(cat.subSequence(0, cat.length()));
            }
            return doc;
        }
    
        public void run() {
            try {
                System.out.println("Indexing " + ifile.getAbsolutePath());
                Document doc = getDocument();
                writer.addDocument(doc);
            } catch (Exception e) {
                System.out.println(e.toString());
            }
        }
    }
    

    Any help is appreciated.

  • orezvani
    orezvani about 12 years
    Thank you in advance. Actually, I implemented Runnable, which was a nice idea, and used ThreadPoolExecutor, which solved a real bug in the program mentioned by jpountz.
  • orezvani
    orezvani about 12 years
    I really appreciate your help. Your comment on the number of threads was really useful. I didn't consider that before...
  • Michael-O
    Michael-O almost 12 years
    The downside of awaitTermination is that it does not wait for all threads to finish but returns after n time units. :-( A loop is necessary.
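A small sketch of the loop that comment describes, with short sleeping tasks standing in for indexing work (the class name and timings are illustrative):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AwaitLoop {
    static volatile boolean finished = false;

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 4; i++) {
            pool.submit(() -> {
                // Stand-in for indexing one file.
                try { Thread.sleep(100); } catch (InterruptedException ignored) {}
            });
        }
        pool.shutdown();
        // awaitTermination only waits up to the given timeout and then
        // returns false, so loop until it reports that all tasks finished.
        while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
            System.out.println("still waiting for indexing tasks...");
        }
        finished = true;
    }
}
```

Only after the loop exits is it safe to close the IndexWriter, which avoids the dangling write lock mentioned in the next comment.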
  • Adams.H
    Adams.H almost 10 years
    Agreed. This can result in the IndexWriter not closing properly, and the write lock will still exist even though the index Directory is no longer being manipulated by the IndexWriter.