Partition a Java 8 Stream

47,508

Solution 1

An arbitrary source stream cannot be partitioned into fixed-size batches without breaking parallel processing. When a stream is processed in parallel, you may not know how many elements fall into the first sub-task after a split, so you cannot form the partitions for the next sub-task until the first one has been fully processed.

However, it is possible to create a stream of partitions from a random-access List. This feature is available, for example, in my StreamEx library:

List<Type> input = Arrays.asList(...);

Stream<List<Type>> stream = StreamEx.ofSubLists(input, partitionSize);

Or if you really want the stream of streams:

Stream<Stream<Type>> stream = StreamEx.ofSubLists(input, partitionSize).map(List::stream);

If you don't want to depend on a third-party library, you can implement the ofSubLists method yourself:

public static <T> Stream<List<T>> ofSubLists(List<T> source, int length) {
    if (length <= 0)
        throw new IllegalArgumentException("length = " + length);
    int size = source.size();
    if (size <= 0)
        return Stream.empty();
    int fullChunks = (size - 1) / length;
    return IntStream.range(0, fullChunks + 1).mapToObj(
        n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length));
}

This implementation looks a little long, but it handles corner cases such as a list size close to Integer.MAX_VALUE.
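As a minimal, self-contained sketch of how the manual ofSubLists method behaves, the example below repeats the method and runs it on a seven-element list. The class name SubListsDemo and the sample input are assumptions made for illustration only. Because the chunk count is computed as (size - 1) / length, a list whose size is an exact multiple of the chunk length does not produce a trailing empty sublist.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical demo class; only ofSubLists comes from the answer above.
class SubListsDemo {
    static <T> Stream<List<T>> ofSubLists(List<T> source, int length) {
        if (length <= 0)
            throw new IllegalArgumentException("length = " + length);
        int size = source.size();
        if (size <= 0)
            return Stream.empty();
        // Index of the last chunk; the last chunk may be shorter than length.
        int fullChunks = (size - 1) / length;
        return IntStream.range(0, fullChunks + 1).mapToObj(
            n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length));
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        List<List<Integer>> parts = ofSubLists(input, 3).collect(Collectors.toList());
        System.out.println(parts); // [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```

Note that the sublists are views of the source list, so nothing is copied until you copy a partition yourself.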


If you want a parallel-friendly solution for an unordered stream (so you don't care which stream elements end up in the same batch), you can use a collector like this (thanks to @sibnick for the inspiration):

public static <T, A, R> Collector<T, ?, R> unorderedBatches(int batchSize, 
                   Collector<List<T>, A, R> downstream) {
    class Acc {
        List<T> cur = new ArrayList<>();
        A acc = downstream.supplier().get();
    }
    BiConsumer<Acc, T> accumulator = (acc, t) -> {
        acc.cur.add(t);
        if(acc.cur.size() == batchSize) {
            downstream.accumulator().accept(acc.acc, acc.cur);
            acc.cur = new ArrayList<>();
        }
    };
    return Collector.of(Acc::new, accumulator,
            (acc1, acc2) -> {
                acc1.acc = downstream.combiner().apply(acc1.acc, acc2.acc);
                for(T t : acc2.cur) accumulator.accept(acc1, t);
                return acc1;
            }, acc -> {
                if(!acc.cur.isEmpty())
                    downstream.accumulator().accept(acc.acc, acc.cur);
                return downstream.finisher().apply(acc.acc);
            }, Collector.Characteristics.UNORDERED);
}

Usage example:

List<List<Integer>> list = IntStream.range(0,20)
                                    .boxed().parallel()
                                    .collect(unorderedBatches(3, Collectors.toList()));

Possible result (which elements share a batch can vary between runs):

[[2, 3, 4], [7, 8, 9], [0, 1, 5], [12, 13, 14], [17, 18, 19], [10, 11, 15], [6, 16]]

This collector is thread-safe, and for a sequential stream it produces the batches in encounter order.
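The following sketch illustrates both claims under stated assumptions: the collector is repeated unchanged so the example is self-contained, while the class name UnorderedBatchesDemo and the helper method batches are hypothetical names introduced only for this demonstration. For a sequential stream the batches come out in encounter order; for a parallel stream the grouping is not fixed, but every element still lands in exactly one batch and at most one batch is shorter than the batch size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class; unorderedBatches is the collector from the answer above.
class UnorderedBatchesDemo {
    static <T, A, R> Collector<T, ?, R> unorderedBatches(int batchSize,
            Collector<List<T>, A, R> downstream) {
        class Acc {
            List<T> cur = new ArrayList<>();      // batch currently being filled
            A acc = downstream.supplier().get();  // downstream state holding full batches
        }
        BiConsumer<Acc, T> accumulator = (acc, t) -> {
            acc.cur.add(t);
            if (acc.cur.size() == batchSize) {
                downstream.accumulator().accept(acc.acc, acc.cur);
                acc.cur = new ArrayList<>();
            }
        };
        return Collector.of(Acc::new, accumulator,
                (acc1, acc2) -> {
                    acc1.acc = downstream.combiner().apply(acc1.acc, acc2.acc);
                    // Re-feed the leftover elements so that partial batches get merged.
                    for (T t : acc2.cur) accumulator.accept(acc1, t);
                    return acc1;
                }, acc -> {
                    if (!acc.cur.isEmpty())
                        downstream.accumulator().accept(acc.acc, acc.cur);
                    return downstream.finisher().apply(acc.acc);
                }, Collector.Characteristics.UNORDERED);
    }

    // Hypothetical helper: batches 0..19 into groups of 3, sequentially or in parallel.
    static List<List<Integer>> batches(boolean parallel) {
        IntStream source = IntStream.range(0, 20);
        if (parallel) source = source.parallel();
        return source.boxed().collect(unorderedBatches(3, Collectors.toList()));
    }

    public static void main(String[] args) {
        // Sequential: [[0, 1, 2], [3, 4, 5], ..., [15, 16, 17], [18, 19]]
        System.out.println(batches(false));
        // Parallel: always 7 batches covering 0..19, but the grouping may differ.
        System.out.println(batches(true));
    }
}
```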

If you want to apply an intermediate transformation to every batch, you can use the following version:

public static <T, AA, A, B, R> Collector<T, ?, R> unorderedBatches(int batchSize,
        Collector<T, AA, B> batchCollector,
        Collector<B, A, R> downstream) {
    return unorderedBatches(batchSize, 
            Collectors.mapping(list -> list.stream().collect(batchCollector), downstream));
}

For example, this way you can sum the numbers in every batch on the fly:

List<Integer> list = IntStream.range(0,20)
        .boxed().parallel()
        .collect(unorderedBatches(3, Collectors.summingInt(Integer::intValue), 
            Collectors.toList()));

Solution 2

Provided you want to use the Stream sequentially, it is possible to partition a Stream (as well as perform related operations such as windowing, which I think is what you really want in this case). Two libraries that support partitioning for standard Streams are cyclops-react (I am the author) and jOOλ, which cyclops-react extends (to add functionality such as windowing).

cyclops-streams has a collection of static functions, StreamUtils, for operating on Java Streams, including a series of functions such as splitAt, headAndTail, splitBy and partition for partitioning.

To the OP's point: in streaming terms, splitting a Stream into multiple Streams of a given size is a windowing operation (rather than a partitioning operation).

To window a Stream into a Stream of nested Streams of size 30, you can use the window method:

  Stream<Streamable<Integer>> streamOfStreams = StreamUtils.window(stream,30);

There is a Stream extension class called ReactiveSeq that extends jOOλ's Seq and adds windowing functionality, which may make the code a little cleaner.

  ReactiveSeq<Integer> seq;
  ReactiveSeq<ListX<Integer>> streamOfLists = seq.grouped(30);

As Tagir points out above, though, this isn't suitable for parallel Streams. If you want to window or batch a Stream that you wish to execute in a multithreaded fashion, LazyFutureStream in cyclops-react might be useful (windowing is on the to-do list, but plain old batching is available now).

In this case data will be passed from the multiple threads executing the Stream to a Multi-Producer/Single-Consumer wait-free Queue and the sequential data from that queue can be windowed before being distributed to threads again.

  Stream<List<Data>> batched = new LazyReact().range(0,1000)
                                              .grouped(30)
                                              .map(this::process);

Solution 3

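I found an elegant solution using Guava: Iterable<List<T>> parts = Iterables.partition(stream::iterator, size). Note that the resulting Iterable can be traversed only once, because a stream can be consumed only once.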

Solution 4

It seems that, as Jon Skeet has shown in his comment, it's not possible to make the partitions lazy. For non-lazy partitions, I already have this code (it uses Guava's Iterators class):

public static <T> Stream<Stream<T>> partition(Stream<T> source, int size) {
    final Iterator<T> it = source.iterator();
    final Iterator<Stream<T>> partIt = Iterators.transform(Iterators.partition(it, size), List::stream);
    final Iterable<Stream<T>> iterable = () -> partIt;

    return StreamSupport.stream(iterable.spliterator(), false);
}
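If Guava is not available, the same iterator-based idea can be sketched with the JDK alone. The version below is not the code above but a hand-written substitute for Iterators.partition; the class name IteratorPartitionDemo is hypothetical, and the method returns Stream<List<T>> rather than Stream<Stream<T>> to keep the example short. Each batch is materialized as a List only when the consumer asks for it, so it also works on an infinite sequential source.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical JDK-only substitute for the Guava-based partition method.
class IteratorPartitionDemo {
    static <T> Stream<List<T>> partition(Stream<T> source, int size) {
        if (size <= 0)
            throw new IllegalArgumentException("size = " + size);
        final Iterator<T> it = source.iterator();
        final Iterator<List<T>> batches = new Iterator<List<T>>() {
            @Override
            public boolean hasNext() {
                return it.hasNext();
            }

            @Override
            public List<T> next() {
                if (!it.hasNext())
                    throw new NoSuchElementException();
                // Pull up to 'size' elements from the source into one batch.
                List<T> batch = new ArrayList<>();
                while (batch.size() < size && it.hasNext())
                    batch.add(it.next());
                return batch;
            }
        };
        // Sequential only: the batches depend on the order in which elements are pulled.
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(batches,
                        Spliterator.ORDERED | Spliterator.NONNULL),
                false);
    }

    public static void main(String[] args) {
        // Only the first two batches of an infinite stream are ever built.
        List<List<Integer>> firstTwo = partition(Stream.iterate(0, i -> i + 1), 3)
                .limit(2)
                .collect(Collectors.toList());
        System.out.println(firstTwo); // [[0, 1, 2], [3, 4, 5]]
    }
}
```

Mapping the result with List::stream gives the Stream<Stream<T>> shape of the method above.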

Solution 5

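This is a pure Java solution that is evaluated lazily rather than first collecting the whole stream into a List. It works only sequentially, because the mapping function keeps mutable state.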

public static <T> Stream<List<T>> partition(Stream<T> stream, int batchSize){
    List<List<T>> currentBatch = new ArrayList<List<T>>(); //just to make it mutable 
    currentBatch.add(new ArrayList<T>(batchSize));
    return Stream.concat(stream
      .sequential()                   
      .map(new Function<T, List<T>>(){
          public List<T> apply(T t){
              currentBatch.get(0).add(t);
              return currentBatch.get(0).size() == batchSize ? currentBatch.set(0,new ArrayList<>(batchSize)): null;
            }
      }), Stream.generate(()->currentBatch.get(0).isEmpty()?null:currentBatch.get(0))
                .limit(1)
    ).filter(Objects::nonNull);
}

The method returns Stream<List<T>> for flexibility. You can easily convert it to Stream<Stream<T>> with partition(something, 10).map(List::stream).

Author: Trader001

Updated on July 05, 2022

Comments

  • Trader001
    Trader001 almost 2 years

    How can a "partition" operation be implemented on a Java 8 Stream? By partition I mean dividing a stream into sub-streams of a given size. It should behave like Guava's Iterators.partition() method, except that the partitions should be lazily evaluated Streams rather than Lists.

  • sibnick
    sibnick over 8 years
    You are right about the undocumented fact, and that is the reason why I called it a 'hack'. You are also right about the non-atomic computeIfAbsent. I will edit the code shortly. But why is it not lazy? It does not allocate all the lists before processing one batch. Also, it is common for concurrent batch processing to be unordered.
  • Tagir Valeev
    Tagir Valeev over 8 years
    For parallel stream it doesn't work at all. applyConcurrentBatchToStream(System.out::println, IntStream.range(0,100).boxed().parallel(), 3) prints garbage (randomly collected groups, some elements repeating, even number of groups differ between runs). For sequential-only streams there are far simpler and more efficient solutions (like one presented by OP).
  • sibnick
    sibnick over 8 years
    But you also show the source of bug: non-atomic computeIfAbsent.
  • Tagir Valeev
    Tagir Valeev over 8 years
    As for ordering, it's not so common that concurrent batch processing is unordered. There's even special Stream API method unordered() which you may use to explicitly state that you don't care about order. In many cases you do care. And, I believe there's a simpler alternative for creating fixed size unordered batches...
  • sibnick
    sibnick over 8 years
    I added fix for multithread issue, but I will think about it yet.
  • Tagir Valeev
    Tagir Valeev over 8 years
    I posted my own unordered-parallel solution.
  • Tomasz Stanczak
    Tomasz Stanczak almost 8 years
    I know it is an old topic, but I think it is worth mentioning that this is not pure Java 8: the Iterators class is from Guava.
  • Torque
    Torque about 6 years
    I would be very interested in seeing something like the parallel solution added to StreamEx (which has become a staple in my projects the way Guava and Lombok have). Less because I care about parallelism, and more because it works on streams: StreamEx.ofSubLists requires you to already have a collapsed list, whereas my use cases are usually ongoing streams that I don't want to collapse into a Collection and hold in memory all at once.
  • gouessej
    gouessej about 5 years
    Iterables comes from Guava: github.com/google/guava/blob/master/guava/src/com/google/common/… Some developers may have good reasons not to use it. You should mention the third-party libraries you use.