How to process a list of objects in parallel in Java

16,639

Solution 1

You can use an ExecutorService (backed by a ThreadPoolExecutor); it takes care of load balancing, and tasks will be distributed across the pool's threads.

Here is an example:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Test {

    public static void main(String[] args) {

        // Fixed number of threads
        ExecutorService service = Executors.newFixedThreadPool(10);

        // Or an unbounded pool: the number of threads grows with the
        // number of pending tasks (note: newCachedThreadPool takes no argument)
        // ExecutorService service = Executors.newCachedThreadPool();

        List<Object> objects = new ArrayList<>();
        for (Object o : objects) {
            service.execute(new MyTask(o));
        }

        // shutdown() stops accepting new tasks; the awaitTermination call
        // below blocks until all submitted tasks finish
        service.shutdown();
        try {
            service.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static class MyTask implements Runnable {
        Object target;

        public MyTask(Object target) {
            this.target = target;
        }

        @Override
        public void run() {
            // business logic goes here
        }
    }
}

Solution 2

There are many options for processing a list in parallel:

Use a parallel stream:

objects.stream().parallel().forEach(object -> {
    //Your work on each object goes here, using object
});

Use an executor service to submit tasks if you want a pool with more threads than the common fork-join pool:

ExecutorService es = Executors.newFixedThreadPool(10);
for (Object o : objects) {
    es.submit(() -> {
        //code here using Object o...
    });
}

The preceding example is the traditional executor-service approach, running tasks on a fixed pool of threads.

As an alternative to these, you can also submit using the completable future:

//You can also just run a for-each and manually add each
//future to a list
List<CompletableFuture<Void>> futures =
    objects.stream().map(object -> CompletableFuture.runAsync(() -> {
        //Your work on each object goes here, using object
    })).collect(Collectors.toList());

You can then use the futures object to check the status of each execution if that's required.
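For example, the futures list can be combined with CompletableFuture.allOf to block until every task completes before inspecting individual futures. A minimal, self-contained sketch (the sample objects and print statements are illustrative only):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class AllOfDemo {
    public static void main(String[] args) {
        List<Object> objects = List.of("a", "b", "c"); // sample data for illustration

        // Launch one task per object on the common fork-join pool
        List<CompletableFuture<Void>> futures = objects.stream()
                .map(object -> CompletableFuture.runAsync(() -> {
                    // per-object work goes here
                    System.out.println("processed " + object);
                }))
                .collect(Collectors.toList());

        // Block until every task has completed
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

        // Each future can now be inspected individually
        futures.forEach(f -> System.out.println("done: " + f.isDone()));
    }
}
```

After the allOf(...).join() call, every future in the list is guaranteed to be completed, so isDone() returns true for each.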

Solution 3

Split the list into multiple sub-lists and use multiple threads to process the sub-lists in parallel.

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

public class ParallelProcessListElements {

    public void processList(int numberOfThreads, List<Object> tempList,
            Object obj, Method method) {

        final int sizeOfList = tempList.size();
        final int sizeOfSublist = sizeOfList / numberOfThreads;
        List<Thread> threadList = new ArrayList<>();

        for (int i = 0; i < numberOfThreads; i++) {
            int firstIndex = i * sizeOfSublist;
            int lastIndex = firstIndex + sizeOfSublist;
            // The last thread also takes any remainder elements
            if (i == numberOfThreads - 1)
                lastIndex = sizeOfList;

            List<Object> subList = tempList.subList(firstIndex, lastIndex);

            Thread th = new Thread(() -> {
                try {
                    method.invoke(obj, subList);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });

            threadList.add(th);
        }

        threadList.forEach(Thread::start);
        // Wait for every sub-list to be fully processed
        threadList.forEach(th -> {
            try { th.join(); } catch (InterruptedException e) { e.printStackTrace(); }
        });
    }
}

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

public class Demo {
    public static void main(String[] args) {

        List<Object> tempList = new ArrayList<>();
        // Adding values to the list... for demo purposes
        for (int i = 0; i < 500; i++)
            tempList.add(i);

        ParallelProcessListElements process = new ParallelProcessListElements();
        final int numberOfThreads = 5;
        Object obj = new Demo();
        Method method = null;

        try {
            method = Demo.class.getMethod("printList", List.class);
        } catch (Exception e) {
            e.printStackTrace();
        }

        // Method call...
        process.processList(numberOfThreads, tempList, obj, method);
    }

    public void printList(List<Integer> list) {
        // Business logic to process the list...
        list.forEach(item -> {
            try { Thread.sleep(1000); } catch (InterruptedException ignored) {}
            System.out.println(item);
        });
    }
}
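The same sub-list partitioning can be expressed with the standard executor API instead of raw Thread objects. A hedged sketch, assuming the same 5-thread, 500-element setup as the demo above (the per-item work is left as a placeholder):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SublistDemo {
    public static void main(String[] args) throws InterruptedException {
        List<Integer> tempList = new ArrayList<>();
        for (int i = 0; i < 500; i++) tempList.add(i);

        int numberOfThreads = 5;
        int chunk = tempList.size() / numberOfThreads;
        ExecutorService pool = Executors.newFixedThreadPool(numberOfThreads);

        // One Callable per sub-list; the last chunk absorbs any remainder
        List<Callable<Object>> tasks = new ArrayList<>();
        for (int i = 0; i < numberOfThreads; i++) {
            int from = i * chunk;
            int to = (i == numberOfThreads - 1) ? tempList.size() : from + chunk;
            List<Integer> subList = tempList.subList(from, to);
            tasks.add(() -> {
                subList.forEach(item -> { /* process item here */ });
                return null;
            });
        }

        pool.invokeAll(tasks); // blocks until every sub-list is processed
        pool.shutdown();
        System.out.println("all sub-lists processed");
    }
}
```

invokeAll handles starting and waiting in one call, so no manual join (or sleep) bookkeeping is needed.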

Author: SSV

Software Engineer with technical expertise in C++, Python, Java, MySQL, Shell, Core PHP, Pro C

Updated on June 04, 2022

Comments

  • SSV
    SSV almost 2 years

    I have a list of objects in Java, around a thousand objects in a List, and I am iterating over the List and doing further processing for every object. The same processing happens for every object. This sequential approach is taking too much time, so I want to do this with parallel processing in Java. I checked the executor framework in Java but I got stuck with it.

    I thought of one approach to implement my requirement.

    I want each thread to process some fixed minimum number of objects, so that each thread does its work and processes objects quickly. How can I achieve this? Or if there is any other approach for implementing my requirement, please share.

    Eg:

    List<Object> objects = new ArrayList<>();

    for (Object object : objects) {
        // Doing some common operation for all objects
    }

  • SSV
    SSV about 6 years
    The executor service you have mentioned contains a pool of 10 threads. In my case, suppose I have a thousand objects in a list (it can vary). Each thread will process one object, so 10 threads will process 10 objects simultaneously. What would be a good approach for deciding the size of the thread pool, given that the number of list elements will vary?
  • SSV
    SSV about 6 years
    If I take a pool with more threads, will it create memory issues? I am thinking of converting the list of objects to sub-lists
  • ernest_k
    ernest_k about 6 years
    @SSV I picked 10 randomly. Obviously, you'll consider the correct number of threads based on your actual input and also based on the available number of cores on your system....
  • SSV
    SSV about 6 years
    Say I have a list of 1000 objects and I create 10 sub-lists of 100 objects each, then pass each sub-list to a thread. Would that be a good approach?
  • ernest_k
    ernest_k about 6 years
    Why would you want to batch them in sub-lists of elements? I'm of the opinion that processing your list at the finest level allows you to make optimal use of your CPU power. If you're worried about memory-related consequences, then the first option (parallel stream) is going to be the safest for you. It will run cpuCount - 1 threads in parallel and process in that fashion until your list is exhausted. You probably need to test and find out for a fact which works best for you
  • SSV
    SSV about 6 years
    The thread-pooling approach looks good, but in my case I have a list of 1000 objects; would creating a pool with a large number of threads be a good approach to process each object?
  • xingbin
    xingbin about 6 years
    What will you do to every single object?
  • SSV
    SSV about 6 years
    I am thinking of creating sub-lists of elements of the parent list. Say I have a list of a thousand objects; each sub-list will have some fixed number of objects, and then I will pass each sub-list to the execute method of the executor service so that each thread processes more objects rather than only one. Would this approach be good?
  • SSV
    SSV about 6 years
    I need to fetch one attribute of each object, pass that attribute to some third-party REST API, get the response, and then, on the basis of the response, update the same attribute in each object
  • SSV
    SSV about 6 years
    I am thinking of batching them into sub-lists because for each object I need to do further processing which may consume some time. I need to fetch one attribute of each object, pass that attribute to some third-party REST API, get the response, and then, on the basis of the response, update the same attribute in each object. That is why I am worried about memory consequences if I use an executor service with a large thread pool, and why I wanted to convert the parent list into sub-lists, so that the thread pool size will not grow too large
  • ernest_k
    ernest_k about 6 years
    Well, that depends on your logic. But if the downstream processing doesn't offer any benefit for processing in batches, then you shouldn't. You would be keeping threads busy for their allocated batches, but if some batches are processed more quickly than others, you would waste resources and probably wait unnecessarily.
  • xingbin
    xingbin about 6 years
    @SSV It seems the majority of the procedure is waiting for the response from the third-party API, so an executor is good for handling this kind of asynchronous job. You do not need to create sub-lists; it does not help. The threads in the executor will get reused. What you need to do is find a suitable thread number: Executors.newFixedThreadPool(threadNumber);
  • SSV
    SSV about 6 years
    Okay. The list of objects also comes from a third-party API, which is a one-time process, and each object needs further processing as mentioned in the above comment, so it is difficult to decide the exact thread pool number. The third-party API sometimes returns a thousand objects, sometimes two thousand, sometimes only hundreds.
  • SSV
    SSV about 6 years
    Another approach I am thinking of: I will only direct my program to parallel processing if I get more than some number of objects, like 500. But even then, deciding the thread pool number is difficult, and I need to check the cores of the system on which it will be hosted, to see whether it supports a large thread pool or not
  • SSV
    SSV about 6 years
    Yes, the downstream processing API takes approximately 2 seconds to return the response for the parameter I am passing for each object.
  • xingbin
    xingbin about 6 years
  • ernest_k
    ernest_k about 6 years
    @SSV Then it's better NOT to batch in sub-lists if you know it takes some time to process each entry. If processing per-element takes exactly the same amount of time, it could make sense (then again you would have to use as many threads as there are cpus max), but the better choice would be not to batch as each element's processing time would probably differ.
  • SSV
    SSV about 6 years
    Thanks for the mentioned link; it is helpful. The whole idea for improving parallelism in the program is to find the number of processors in your system. Creating more threads cannot improve performance unless you have a system that provides multiple cores. I will check which system we are going to use for deployment. Thanks
  • user2862544
    user2862544 almost 6 years
    @SSV, which solution did you finally choose?
  • user2862544
    user2862544 almost 6 years
    @Ernest, which solution among the above three gives better performance? In the case of a fixed thread pool, if subsequent requests come, will a new pool get created each time, so that the thread count keeps increasing with every subsequent request? What do you suggest in that case?
  • ernest_k
    ernest_k almost 6 years
    @user2862544 that will have to be tested, although I believe the bottom two will have better performance given that they offer more control over thread pool size. I'd personally use the last one, but calling runAsync with an executor service (second argument). The first one is OK too, but it uses the default fork-join pool.
  • user2862544
    user2862544 almost 6 years
    Dear Ernest, could you please give your thoughts on scenarios where, with a fixed thread pool, new pools get created again and again as subsequent requests come in, so the thread count keeps increasing with every subsequent request
  • ernest_k
    ernest_k almost 6 years
    @user2862544 One shouldn't need to create a thread pool each time. There should be one shared thread pool with sufficient number of threads in it. Then all concurrent tasks will be submitted to the same executor service. It will just be important to carefully choose pool size and test according to anticipated load.
  • user2862544
    user2862544 almost 6 years
    @Ernest, suppose I need to write a method abc(List<Object> list) which gets a list of objects, and I need to do some processing on each object, so I need to pass each object to a task and submit it to the executor. But that method abc() may get called multiple times in parallel with different lists. Will a shared pool work in that scenario, and is it the right approach?
  • ernest_k
    ernest_k almost 6 years
    @user2862544 Absolutely. abc() will maintain the list iteration and will end up returning consistent results for its input. But what matters more is that you make sure that the maximum number of tasks to be submitted to the executor service at any given time doesn't cause an abnormal wait queue. And for this, you need to test in order to pick the right pool sizes.
  • user2862544
    user2862544 almost 6 years
    @Ernest, in that case, if abc() gets called 5 times with lists of more than 1000 elements, and processing one element takes 2-3 seconds, then with thread pool size = number of cores, tasks will be kept waiting for a long time. Will tasks get rejected? What do you suggest? What should the thread pool size be in this scenario?
  • ernest_k
    ernest_k almost 6 years
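The suggestion in the comments above, calling runAsync with an executor service as the second argument so that tasks run on a shared, appropriately sized pool rather than the default fork-join pool, can be sketched as follows (the pool size of 10 and the sample data are assumptions for illustration):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class RunAsyncWithPoolDemo {
    public static void main(String[] args) {
        List<Object> objects = List.of(1, 2, 3); // sample data

        // A shared pool sized for the workload instead of the default fork-join pool
        ExecutorService pool = Executors.newFixedThreadPool(10);

        List<CompletableFuture<Void>> futures = objects.stream()
                .map(o -> CompletableFuture.runAsync(() -> {
                    // per-object work (e.g. the REST call described in the comments)
                }, pool)) // the second argument routes the task to the shared pool
                .collect(Collectors.toList());

        // Wait for all tasks, then release the pool's threads
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        pool.shutdown();
        System.out.println("finished " + futures.size() + " tasks");
    }
}
```

Because the pool is passed explicitly, the same executor can be shared across repeated calls, so the thread count stays bounded regardless of how many requests come in.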