Default ForkJoinPool executor taking long time

Solution 1

Check the size of ForkJoinPool.commonPool(). By default, its parallelism (the number of worker threads) is

Runtime.getRuntime().availableProcessors() - 1
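To check these values on a given machine, something like the following minimal sketch can be used. The class name CommonPoolSize is only illustrative, and the reported parallelism will differ from the default formula if the java.util.concurrent.ForkJoinPool.common.parallelism system property has been set.

```java
import java.util.concurrent.ForkJoinPool;

class CommonPoolSize {
    // Parallelism the common pool is actually configured with on this JVM.
    static int commonPoolParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        int processors = Runtime.getRuntime().availableProcessors();
        System.out.println("availableProcessors     = " + processors);
        System.out.println("common pool parallelism = " + commonPoolParallelism());
    }
}
```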

I ran your example on my Intel i7-4800MQ (4 cores + 4 virtual cores, so 8 logical processors). The size of the common pool in my case is 7, so the 10 one-second tasks ran in two rounds and the whole computation took ~2000 ms:

ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-4
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-6
ForkJoinPool.commonPool-worker-5
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-7
ForkJoinPool.commonPool-worker-4
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-1
Processed 10 tasks in 2005 millis
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

In the second case you used

Executors.newFixedThreadPool(Math.min(tasks.size(), 10));

so the pool has 10 threads ready, one per task, and all tasks run concurrently in ~1000 ms:

pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
pool-1-thread-5
pool-1-thread-6
pool-1-thread-7
pool-1-thread-8
pool-1-thread-9
pool-1-thread-10
Processed 10 tasks in 1002 millis
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
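The two timings follow from simple arithmetic: equal-length blocking tasks run in ceil(tasks / threads) rounds. The sketch below is only a rough estimate under that assumption (it ignores scheduling overhead), and the class and method names are hypothetical. The last line assumes a common pool of 3 workers, which would be consistent with the ~4 seconds reported in the question, although the question does not state the core count.

```java
class BatchEstimate {
    // Rough wall-clock estimate for equal-length blocking tasks:
    // the tasks run in ceil(tasks / threads) rounds of taskMillis each.
    static long estimateMillis(int tasks, int threads, long taskMillis) {
        if (tasks < 0 || threads <= 0) {
            throw new IllegalArgumentException("tasks must be >= 0 and threads > 0");
        }
        long rounds = (tasks + threads - 1L) / threads; // integer ceiling
        return rounds * taskMillis;
    }

    public static void main(String[] args) {
        System.out.println(estimateMillis(10, 7, 1000));  // prints 2000 (common pool of 7)
        System.out.println(estimateMillis(10, 10, 1000)); // prints 1000 (fixed pool of 10)
        System.out.println(estimateMillis(10, 3, 1000));  // prints 4000 (assumed pool of 3)
    }
}
```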

Difference between ForkJoinPool and ExecutorService

Eugene also mentioned one more important thing in his comment: ForkJoinPool uses a work-stealing approach. From its Javadoc:

A ForkJoinPool differs from other kinds of ExecutorService mainly by virtue of employing work-stealing: all threads in the pool attempt to find and execute tasks submitted to the pool and/or created by other active tasks (eventually blocking waiting for work if none exist). This enables efficient processing when most tasks spawn other subtasks (as do most ForkJoinTasks), as well as when many small tasks are submitted to the pool from external clients. Especially when setting asyncMode to true in constructors, ForkJoinPools may also be appropriate for use with event-style tasks that are never joined.

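An ExecutorService created with Executors.newFixedThreadPool(), by contrast, feeds all of its threads from a single shared task queue. It does not split tasks itself; "divide and conquer" is a strategy that ForkJoinTasks implement by forking subtasks.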
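For comparison, here is a minimal sketch of the kind of workload the quoted Javadoc has in mind, where a task spawns subtasks that idle workers can steal. The class RangeSum and its threshold are hypothetical choices for illustration; the sleeping tasks from the question do not fork anything, so they gain nothing from this mechanism.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Sums the integers in the half-open range [from, to) by splitting it recursively.
class RangeSum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // illustrative cut-off, not tuned
    private final int from;
    private final int to;

    RangeSum(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: sum sequentially.
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += i;
            }
            return sum;
        }
        int mid = from + (to - from) / 2;
        RangeSum left = new RangeSum(from, mid);
        RangeSum right = new RangeSum(mid, to);
        left.fork();                      // queue the left half; another worker may steal it
        long rightSum = right.compute();  // process the right half in the current thread
        return left.join() + rightSum;
    }

    static long sum(int from, int to) {
        return ForkJoinPool.commonPool().invoke(new RangeSum(from, to));
    }

    public static void main(String[] args) {
        System.out.println(sum(0, 1_000_001)); // prints 500000500000
    }
}
```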

How to determine pool size?

There is an existing question about the best thread pool size; you may find useful information there:

Setting Ideal size of Thread Pool

This thread is also worth reading:

Custom thread pool in Java 8 parallel stream
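As a starting point, one commonly cited rule of thumb (from the book Java Concurrency in Practice, not from this answer) is threads = cpus * targetUtilization * (1 + wait/compute). The sketch below just encodes that heuristic; the class and method names are hypothetical, and the wait-to-compute ratio has to be measured or estimated for the actual workload.

```java
class PoolSizeHeuristic {
    // Rule-of-thumb pool size: cpus * targetUtilization * (1 + wait/compute).
    // waitToComputeRatio is time spent blocked divided by time spent on the CPU.
    static int suggestedPoolSize(int cpus, double targetUtilization, double waitToComputeRatio) {
        if (cpus <= 0 || targetUtilization <= 0 || targetUtilization > 1 || waitToComputeRatio < 0) {
            throw new IllegalArgumentException("invalid sizing parameters");
        }
        double raw = cpus * targetUtilization * (1 + waitToComputeRatio);
        return (int) Math.max(1, Math.round(raw));
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        // Purely CPU-bound work: about one thread per processor.
        System.out.println(suggestedPoolSize(cpus, 1.0, 0.0));
        // Work that waits nine times longer than it computes: about ten threads per processor.
        System.out.println(suggestedPoolSize(cpus, 1.0, 9.0));
    }
}
```

For tasks that only sleep, as in the question, the ratio is very large, so in practice the pool is simply capped at the number of tasks.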

Solution 2

After checking further solutions online, I found that the default size of the common ForkJoinPool can be changed with the following JVM system property:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=16

So this property raises the parallelism of the common ForkJoinPool, which lets it run more of these tasks at the same time.
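The same setting can be sketched in code. The property only takes effect if it is set before the common pool is first used, so the value printed below may not be 16 if something has already initialized the pool. A dedicated ForkJoinPool with explicit parallelism passed to supplyAsync avoids that JVM-wide dependency. The class DedicatedPoolDemo and the method squaresOn are hypothetical names used only for this illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class DedicatedPoolDemo {
    // Computes i * i for i in [0, count) on the given pool and collects the results in order.
    static List<Integer> squaresOn(ForkJoinPool pool, int count) {
        List<CompletableFuture<Integer>> futures = IntStream.range(0, count)
                .mapToObj(i -> CompletableFuture.supplyAsync(() -> i * i, pool))
                .collect(Collectors.toList());
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Programmatic equivalent of the -D flag; must run before the common pool is touched.
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "16");
        System.out.println("common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());

        // Alternative: a dedicated pool whose parallelism does not depend on JVM-wide settings.
        ForkJoinPool pool = new ForkJoinPool(16);
        try {
            System.out.println(squaresOn(pool, 5));
        } finally {
            pool.shutdown();
        }
    }
}
```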

Author: KayV

Software Architect with experience in developing and designing highly scalable applications using JAVA, Spring, Hibernate, Multi-threading, Tomcat, JBOSS, Oracle, Cassandra, MongoDB, Elastic Search, Apache Camel, Web Services, Pentaho BI & Reporting Tool, Big Data and Hadoop, Map Reduce, HDFS, Sqoop, HIVE, PIG, SPARK, FLUME, HBASE, ZooKeeper, ActiveMQ, Aerospike, Redis, Caching, Distributed Architecture, AWS Cloud, Kafka, Spark SQL, Spark Streaming, Scala.

Updated on June 28, 2022

Comments

  • KayV
    KayV almost 2 years

    I am working with CompletableFuture for asynchronous execution of a stream generated from a list source.

    So I am testing the overloaded method supplyAsync of CompletableFuture: one overload takes only a Supplier parameter, and the other takes a Supplier and an Executor. Here is the documentation for both:

    one

    supplyAsync(Supplier supplier)

    Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool.commonPool() with the value obtained by calling the given Supplier.

    second

    supplyAsync(Supplier supplier, Executor executor)

    Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor with the value obtained by calling the given Supplier.

    And here is my test class:

    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;

    public class TestCompleteableAndParallelStream {

        public static void main(String[] args) {
            // Ten tasks, each of which blocks for one second.
            List<MyTask> tasks = IntStream.range(0, 10)
                    .mapToObj(i -> new MyTask(1))
                    .collect(Collectors.toList());

            useCompletableFuture(tasks);
            useCompletableFutureWithExecutor(tasks);
        }

        // Runs the tasks on a dedicated fixed pool with up to 10 threads.
        public static void useCompletableFutureWithExecutor(List<MyTask> tasks) {
            long start = System.nanoTime();
            ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
            // Submit every task first, then wait for all results.
            List<CompletableFuture<Integer>> futures =
                    tasks.stream()
                         .map(t -> CompletableFuture.supplyAsync(() -> t.calculate(), executor))
                         .collect(Collectors.toList());

            List<Integer> result =
                    futures.stream()
                           .map(CompletableFuture::join)
                           .collect(Collectors.toList());
            long duration = (System.nanoTime() - start) / 1_000_000;
            System.out.printf("Processed %d tasks in %d millis%n", tasks.size(), duration);
            System.out.println(result);
            executor.shutdown();
        }

        // Runs the tasks on ForkJoinPool.commonPool(), the default for supplyAsync.
        public static void useCompletableFuture(List<MyTask> tasks) {
            long start = System.nanoTime();
            List<CompletableFuture<Integer>> futures =
                    tasks.stream()
                         .map(t -> CompletableFuture.supplyAsync(() -> t.calculate()))
                         .collect(Collectors.toList());

            List<Integer> result =
                    futures.stream()
                           .map(CompletableFuture::join)
                           .collect(Collectors.toList());
            long duration = (System.nanoTime() - start) / 1_000_000;
            System.out.printf("Processed %d tasks in %d millis%n", tasks.size(), duration);
            System.out.println(result);
        }
    }

    class MyTask {
        private final int duration; // seconds

        public MyTask(int duration) {
            this.duration = duration;
        }

        // Prints the executing thread's name, sleeps for the duration, then returns it.
        public int calculate() {
            System.out.println(Thread.currentThread().getName());
            try {
                Thread.sleep(duration * 1000L);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                throw new RuntimeException(e);
            }
            return duration;
        }
    }

    While the "useCompletableFuture" method takes around 4 seconds to complete, the "useCompletableFutureWithExecutor" method takes only 1 second.

    Now my question is: what does ForkJoinPool.commonPool() do differently that could cause this overhead? Given that, shouldn't we always prefer a custom executor pool over ForkJoinPool?

  • Szymon Stepniak
    Szymon Stepniak almost 7 years
    KayV, good point. It was explained in detail here: stackoverflow.com/a/21172732/2194470
  • Eugene
    Eugene almost 7 years
    @SzymonStepniak you actually need to warm up the VM properly to draw any sane conclusions about speed, and this code does not. Also, having more threads than actual CPUs (even virtual ones) is bad.
  • Eugene
    Eugene almost 7 years
    @SzymonStepniak also, the underlying idea of how these pools work is completely different: one divides the work (divide and conquer) and the other does work stealing. They are completely different implementations.
  • Holger
    Holger almost 7 years
    It must be emphasized that using more threads than the number of cores helps greatly in this contrived example, where the job consists of Thread.sleep. For real tasks, the default parallelism of the common pool might be more reasonable. The work-stealing has no relevance here, as in the end, the worker threads just process the queued tasks in either case. Neither uses a “divide and conquer approach” here.
  • Holger
    Holger almost 7 years
    @Eugene: I suggest you re-read the posts there. First of all, “work-stealing” and “divide and conquer” are not contradictory; in fact, both are usually attributed to the F/J framework. Since other thread pool executors, like the one constructed via newFixedThreadPool(), have only a single queue, they always do some kind of “work-stealing”, though it doesn’t make much sense to call it that, as you need local queues for it to be actual stealing. On the other hand, “divide and conquer” is a problem-solving strategy you could implement atop any executor, but preferably atop F/J.