Purpose of third argument to 'reduce' function in Java 8 functional programming

10,679

Solution 1

Are you talking about this function?

<U> U reduce(U identity,
             BiFunction<U, ? super T, U> accumulator,
             BinaryOperator<U> combiner)

Performs a reduction on the elements of this stream, using the provided identity, accumulation and combining functions. This is equivalent to:

 U result = identity;
 for (T element : this stream)
     result = accumulator.apply(result, element)
 return result;   

but is not constrained to execute sequentially. The identity value must be an identity for the combiner function. This means that for all u, combiner(identity, u) is equal to u. Additionally, the combiner function must be compatible with the accumulator function; for all u and t, the following must hold:

 combiner.apply(u, accumulator.apply(identity, t)) == 
     accumulator.apply(u, t)   

This is a terminal operation.

API Note: Many reductions using this form can be represented more simply by an explicit combination of map and reduce operations. The accumulator function acts as a fused mapper and accumulator, which can sometimes be more efficient than separate mapping and reduction, such as when knowing the previously reduced value allows you to avoid some computation.

Type Parameters:
U - The type of the result

Parameters:
identity - the identity value for the combiner function
accumulator - an associative, non-interfering, stateless function for incorporating an additional element into a result
combiner - an associative, non-interfering, stateless function for combining two values, which must be compatible with the accumulator function

Returns: the result of the reduction

See Also: reduce(BinaryOperator), reduce(Object, BinaryOperator)
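The two requirements quoted above (the identity law and the combiner/accumulator compatibility law) can be checked directly for a concrete pair of functions. A minimal sketch, using a hypothetical length-summing reduction (not from the original question):

```java
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

public class ReduceContract {
    // Hypothetical accumulator/combiner pair: summing string lengths.
    static final Integer IDENTITY = 0;
    static final BiFunction<Integer, String, Integer> ACC =
            (len, s) -> len + s.length();
    static final BinaryOperator<Integer> COMB = Integer::sum;

    // Identity law: combiner(identity, u) must equal u.
    static boolean identityLaw(int u) {
        return COMB.apply(IDENTITY, u) == u;
    }

    // Compatibility law:
    // combiner(u, accumulator(identity, t)) must equal accumulator(u, t).
    static boolean compatibilityLaw(int u, String t) {
        return COMB.apply(u, ACC.apply(IDENTITY, t))
                   .equals(ACC.apply(u, t));
    }

    public static void main(String[] args) {
        System.out.println(identityLaw(42));             // true
        System.out.println(compatibilityLaw(42, "abc")); // true
    }
}
```

If either law fails for your functions, a parallel reduction may silently produce a different result than the sequential one.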

I assume its purpose is to allow parallel computation, and so my guess is that it's only used if the reduction is performed in parallel. If it's performed sequentially, there's no need to use combiner. I do not know this for sure -- I'm just guessing based on the doc comment "[...] is not constrained to execute sequentially" and the many other mentions of "parallel execution" in the comments.

Solution 2

I think the Reduction operations section of the java.util.stream package summary can answer this question. Let me quote the most important part here:


In its more general form, a reduce operation on elements of type <T> yielding a result of type <U> requires three parameters:

<U> U reduce(U identity,
              BiFunction<U, ? super T, U> accumulator,
              BinaryOperator<U> combiner);

Here, the identity element is both an initial seed value for the reduction and a default result if there are no input elements. The accumulator function takes a partial result and the next element, and produces a new partial result. The combiner function combines two partial results to produce a new partial result. (The combiner is necessary in parallel reductions, where the input is partitioned, a partial accumulation computed for each partition, and then the partial results are combined to produce a final result.) More formally, the identity value must be an identity for the combiner function. This means that for all u, combiner.apply(identity, u) is equal to u. Additionally, the combiner function must be associative and must be compatible with the accumulator function: for all u and t, combiner.apply(u, accumulator.apply(identity, t)) must be equals() to accumulator.apply(u, t).

The three-argument form is a generalization of the two-argument form, incorporating a mapping step into the accumulation step. We could re-cast the simple sum-of-weights example using the more general form as follows:

 int sumOfWeights = widgets.stream()
                           .reduce(0,
                                   (sum, b) -> sum + b.getWeight(),
                                   Integer::sum);

though the explicit map-reduce form is more readable and therefore should usually be preferred. The generalized form is provided for cases where significant work can be optimized away by combining mapping and reducing into a single function.


In other words, as far as I understand, the three-argument form is useful in two cases:

  1. When parallel execution matters.
  2. When significant performance optimization can be achieved by combining the mapping and accumulation steps. Otherwise the simpler and more readable explicit map-reduce form can be used.
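To make point 2 concrete, here is a sketch contrasting the two forms. It uses string lengths rather than the widget weights from the doc, since the Widget class is not available here:

```java
import java.util.List;

public class FusedReduce {
    // Three-argument reduce: the accumulator fuses the mapping
    // (String -> length) into the accumulation step.
    static int totalLength(List<String> words) {
        return words.stream().reduce(
                0,                            // identity
                (sum, s) -> sum + s.length(), // fused map + add
                Integer::sum);                // combiner (used when parallel)
    }

    // Equivalent explicit map-reduce form, usually preferred for readability.
    static int totalLengthMapped(List<String> words) {
        return words.stream().mapToInt(String::length).sum();
    }

    public static void main(String[] args) {
        List<String> words = List.of("abc", "mno", "xyz");
        System.out.println(totalLength(words));       // 9
        System.out.println(totalLengthMapped(words)); // 9
    }
}
```

Both produce the same result; the fused form only pays off when sharing state between the mapping and the accumulation saves real work.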

The explicit map-reduce form is mentioned earlier in the same doc:

int sumOfWeights = widgets.parallelStream()
        .filter(b -> b.getColor() == RED)
        .mapToInt(b -> b.getWeight())
        .sum();

Solution 3

Simple test code to confirm the usage of combiner:

import java.util.Arrays;
import java.util.List;

String[] strArray = {"abc", "mno", "xyz"};
List<String> strList = Arrays.asList(strArray);

System.out.println("stream test");
int streamResult = strList.stream().reduce(
        0, 
        (total,s) -> { System.out.println("accumulator: total[" + total + "] s[" + s + "] s.codePointAt(0)[" + s.codePointAt(0) + "]"); return total + s.codePointAt(0); }, 
        (a,b) -> { System.out.println("combiner: a[" + a + "] b[" + b + "]"); return 1000000;}
    );
System.out.println("streamResult: " + streamResult);

System.out.println("parallelStream test");
int parallelStreamResult = strList.parallelStream().reduce(
        0, 
        (total,s) -> { System.out.println("accumulator: total[" + total + "] s[" + s + "] s.codePointAt(0)[" + s.codePointAt(0) + "]"); return total + s.codePointAt(0); }, 
        (a,b) -> { System.out.println("combiner: a[" + a + "] b[" + b + "]"); return 1000000;}
    );
System.out.println("parallelStreamResult: " + parallelStreamResult);

System.out.println("parallelStream test2");
int parallelStreamResult2 = strList.parallelStream().reduce(
        0, 
        (total,s) -> { System.out.println("accumulator: total[" + total + "] s[" + s + "] s.codePointAt(0)[" + s.codePointAt(0) + "]"); return total + s.codePointAt(0); }, 
        (a,b) -> { System.out.println("combiner: a[" + a + "] b[" + b + "] a+b[" + (a+b) + "]"); return a+b;}
    );
System.out.println("parallelStreamResult2: " + parallelStreamResult2);

Output:

stream test
accumulator: total[0] s[abc] s.codePointAt(0)[97]
accumulator: total[97] s[mno] s.codePointAt(0)[109]
accumulator: total[206] s[xyz] s.codePointAt(0)[120]
streamResult: 326
parallelStream test
accumulator: total[0] s[mno] s.codePointAt(0)[109]
accumulator: total[0] s[abc] s.codePointAt(0)[97]
accumulator: total[0] s[xyz] s.codePointAt(0)[120]
combiner: a[109] b[120]
combiner: a[97] b[1000000]
parallelStreamResult: 1000000
parallelStream test2
accumulator: total[0] s[mno] s.codePointAt(0)[109]
accumulator: total[0] s[xyz] s.codePointAt(0)[120]
accumulator: total[0] s[abc] s.codePointAt(0)[97]
combiner: a[109] b[120] a+b[229]
combiner: a[97] b[229] a+b[326]
parallelStreamResult2: 326
Author: Garth Gilmour, freelance computer consultant

Updated on June 08, 2022

Comments

  • Garth Gilmour
    Garth Gilmour almost 2 years

    Under what circumstances is the third argument to 'reduce' called in Java 8 streams?

    The code below attempts to traverse a list of strings and add up the code point values of the first character of each. The value returned by the final lambda never seems to be used and, if you insert a println, it never seems to be invoked. The documentation describes it as a 'combiner' but I can't find more detail...

    int result =
      data.stream().reduce(0, (total,s) -> total + s.codePointAt(0), (a,b) -> 1000000); 
    
  • Garth Gilmour
    Garth Gilmour about 10 years
    That's the function indeed. Interesting thought on the parallelism - will try it out...
  • Garth Gilmour
    Garth Gilmour about 10 years
    That would appear to be it. Inserting ".parallel()" causes 1000000 to be returned.
  • Martin Andersson
    Martin Andersson about 10 years
    I'm not sure why you would need a combiner in sequential streams either. Oracle should definitely be a bit clearer on this topic in their JavaDocs.
  • Brian Goetz
    Brian Goetz almost 10 years
    The short answer is that it is needed for the parallel case. (Note an explicit combiner is only needed when the input and output types of the accumulator are different; the overload reduce(BinaryOperator) needs no combiner.) So, why not have two versions? Because we do not distinguish between sequential and parallel streams. There's only one stream time, Stream. Sequential execution is best thought of as a degenerate case of parallel execution, for which the combiner is needed.