Spring 5 Reactive fails when extending Flux/implementing Publisher and calling s.onNext() more than once

Solution 1

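My Publisher implementation didn't follow the Reactive Streams spec: it called onNext() without first calling onSubscribe() and without waiting for the subscriber to request items. This is how I fixed it: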

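@GetMapping(value="/strings", produces="text/event-stream")
public Publisher<String> getStrings(){
    return new Publisher<String>() {

        private int loops = 5;

        @Override
        public void subscribe(Subscriber<? super String> s) {

            // Hand the subscriber a Subscription first; items are only emitted on request
            s.onSubscribe(new Subscription() {

                // Set once onComplete was sent or the subscriber cancelled
                private boolean done = false;

                @Override
                public void request(long n) {
                    // long counter: n may be Long.MAX_VALUE (unbounded demand)
                    for (long i = 0; i < n && !done; i++) {
                        if(loops-- > 0){
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            s.onNext("message"+Math.random());
                        }else{
                            // Signal completion exactly once
                            done = true;
                            s.onComplete();
                        }
                    }
                }
                @Override
                public void cancel() {
                    // Stop emitting; no further signals after cancellation
                    done = true;
                }
            });
        }
    };
}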

If you want to learn more about this, have a look at the issue I opened in Spring's JIRA and the helpful comment I got there.
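
To see the demand-driven contract in isolation, here is a minimal sketch that runs without Spring. It assumes the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the org.reactivestreams ones; the class name CountingPublisher and the collect helper are made up for illustration and are not part of any library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Sketch of a publisher that emits only what the subscriber has requested. */
class CountingPublisher implements Flow.Publisher<String> {

    private final int total; // number of items to emit before completing

    CountingPublisher(int total) {
        this.total = total;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super String> s) {
        // Hand over a Subscription first; never call onNext before this.
        s.onSubscribe(new Flow.Subscription() {
            private int emitted = 0;
            private long demand = 0;
            private boolean done = false;     // terminal signal sent or cancelled
            private boolean draining = false; // guards against re-entrant request()

            @Override
            public void request(long n) {
                if (done) return;
                if (n <= 0) {
                    done = true;
                    s.onError(new IllegalArgumentException("non-positive request"));
                    return;
                }
                demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
                if (draining) return; // the running loop picks up the new demand
                draining = true;
                while (!done) {
                    if (emitted == total) {
                        done = true;
                        s.onComplete(); // sent exactly once
                    } else if (demand > 0) {
                        demand--;
                        emitted++;
                        s.onNext("message " + emitted);
                    } else {
                        break; // no outstanding demand: wait for the next request()
                    }
                }
                draining = false;
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Subscribes with a one-at-a-time subscriber and returns everything it saw. */
    static List<String> collect(int total) {
        List<String> seen = new ArrayList<>();
        new CountingPublisher(total).subscribe(new Flow.Subscriber<String>() {
            private Flow.Subscription subscription;

            @Override public void onSubscribe(Flow.Subscription sub) {
                subscription = sub;
                sub.request(1);
            }
            @Override public void onNext(String item) {
                seen.add(item);
                subscription.request(1); // re-entrant call, handled by the drain flag
            }
            @Override public void onError(Throwable t) { seen.add("error: " + t); }
            @Override public void onComplete() { seen.add("complete"); }
        });
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(collect(3));
    }
}
```

Running main prints [message 1, message 2, message 3, complete]. The sketch is single-threaded; a production publisher would also need to make the demand bookkeeping thread-safe, which is one reason the Flux factory methods are usually the easier route.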

Solution 2

I haven't tested this yet as I'm a bit busy - will do later so if this doesn't work sorry! :)

From the above comments it looks like the problem is with your Flux creation.

I'm assuming that Spring Reactive Controllers are able to handle a Flux which emits multiple items without this being over WebSockets or SSE. Again, I'll have a play a bit later.

Flux has a lot of static methods for construction which will help you here.

How about doing it the following way:

return Flux.interval(Duration.ofSeconds(1)) //Emit an increasing long every second
.map(l -> new SensorRead(sensorId, Math.random()));

But this will give you a never ending stream which might not be what you want.

The other option is something like this:

return Flux.range(1, 5) //Spit out 5 values starting from 1
.delayElements(Duration.ofSeconds(1)) //Space the onNext calls 1 second apart
.map(l -> new SensorRead(sensorId, Math.random()));

Update

OK, so this question has changed quite significantly.

In answer to "Why can't we invoke onNext() multiple times? How could we do that?"

I didn't write the API, so I can't speak to the reasoning, but IMO there is ambiguity and complexity in how multiple emissions should be handled, given the myriad of ways they can be expressed.

HTTP 1.1 doesn't allow multiple responses per request, so the only valid options are to collect the emissions into a list or to write each onNext to the output stream at a low level as it arrives. Both have complexities around content type (e.g. XML vs JSON).

This is further complicated when we bring in HTTP/2, WebSockets and SSE, which can do some form of multiple responses per request, each again needing to be handled differently.

If you want to be able to do multiple emissions, then you'll need to look at WebSockets or SSE.

The Spring-Reactive project does have SSE classes, so it looks like it's implemented.

E.g.:

@RequestMapping("/sse/event")
Flux<SseEvent> sse() {
    return Flux.interval(Duration.ofMillis(100)).map(l -> {
        SseEvent event = new SseEvent();
        event.setId(Long.toString(l));
        event.setData("foo");
        event.setComment("bar");
        return event;
    }).take(2);
}
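
For reference, this is roughly what such events look like on the wire. The text/event-stream format is what makes multiple emissions per response unambiguous: each event is a group of "field: value" lines terminated by a blank line, and lines starting with a colon are comments. The sketch below is plain Java with a made-up class name (SseFrames); it is not how Spring's SseEvent is implemented, and the order of the comment and id lines is an arbitrary choice.

```java
import java.util.List;

/** Sketch: renders items as Server-Sent Events frames. */
class SseFrames {

    /** Formats one event; the trailing blank line terminates it. */
    static String frame(String id, String data, String comment) {
        StringBuilder sb = new StringBuilder();
        if (comment != null) {
            sb.append(": ").append(comment).append('\n'); // comment line, ignored by clients
        }
        if (id != null) {
            sb.append("id: ").append(id).append('\n');
        }
        // Multi-line payloads are sent as one "data:" line per line of text.
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        return sb.append('\n').toString();
    }

    /** Concatenates one frame per item, using the item's index as the event id. */
    static String body(List<String> items) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < items.size(); i++) {
            sb.append(frame(Integer.toString(i), items.get(i), null));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(frame("0", "foo", "bar"));
    }
}
```

Because every event is self-delimiting, the server can flush each onNext as soon as it arrives instead of buffering the whole stream into a list.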

Have a look at below for more examples:

https://github.com/spring-projects/spring-reactive/blob/master/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java

Hope this helps

Author by

codependent

By day: I code for 8 hours and a half. Cloud, Kubernetes, Spring, NodeJS... By night: I code a little more, work out and try to get some sleep.

Updated on June 17, 2022

Comments

  • codependent
    codependent almost 2 years

    I just started playing with the new Spring 5 reactive support and wanted to simulate some asynchronous data generation. I noticed two faulty behaviours:

    1) Calling s.onNext( String ) more than once:

    @GetMapping("/strings")
    public Publisher<String> getStrings(){
    
        return new Publisher<String>() {
    
            @Override
            public void subscribe(Subscriber<? super String> s) {
                int i = 0;
                while(++i <= 5){
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    s.onNext("message");
                }
                s.onComplete();
            }
        };
    }
    

    In this case the stacktrace is:

    2016-08-03 13:35:04.986 DEBUG 5136 --- [nio-8080-exec-1] s.w.r.r.m.a.RequestMappingHandlerMapping : Looking up handler method for path /strings
    2016-08-03 13:35:04.994 DEBUG 5136 --- [nio-8080-exec-1] s.w.r.r.m.a.RequestMappingHandlerMapping : Returning handler method [public org.reactivestreams.Publisher<java.lang.String> com.codependent.spring5.playground.reactive.web.AccountsController.getStrings()]
    2016-08-03 13:35:04.994 DEBUG 5136 --- [nio-8080-exec-1] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'accountsController'
    2016-08-03 13:35:07.120 DEBUG 5136 --- [nio-8080-exec-1] o.s.w.s.h.ExceptionHandlingWebHandler    : Could not complete request
    
    java.lang.IllegalStateException: RECEIVED
        at org.springframework.http.server.reactive.AbstractResponseBodyProcessor$State.onNext(AbstractResponseBodyProcessor.java:316) ~[spring-web-5.0.0.M1.jar:5.0.0.M1]
        at org.springframework.http.server.reactive.AbstractResponseBodyProcessor.onNext(AbstractResponseBodyProcessor.java:77) ~[spring-web-5.0.0.M1.jar:5.0.0.M1]
        at org.springframework.http.server.reactive.AbstractResponseBodyProcessor.onNext(AbstractResponseBodyProcessor.java:47) ~[spring-web-5.0.0.M1.jar:5.0.0.M1]
        at org.springframework.http.server.reactive.ChannelSendOperator$WriteWithBarrier.doNext(ChannelSendOperator.java:97) ~[spring-web-5.0.0.M1.jar:5.0.0.M1]
        at reactor.core.publisher.OperatorAdapter.onNext(OperatorAdapter.java:88) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:123) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at com.codependent.spring5.playground.reactive.web.AccountsController$4.subscribe(AccountsController.java:107) [classes/:na]
        at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:59) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:73) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at org.springframework.http.server.reactive.ChannelSendOperator.subscribe(ChannelSendOperator.java:54) [spring-web-5.0.0.M1.jar:5.0.0.M1]
        at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.MonoThenApply$MonoThenApplyManager.onNext(MonoThenApply.java:133) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.Operators$DeferredScalarSubscriber.complete(Operators.java:797) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.MonoThenApply$MonoThenApplyManager$SecondSubscriber.onNext(MonoThenApply.java:203) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.FluxResume$ResumeSubscriber.onNext(FluxResume.java:75) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:130) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1293) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:186) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1010) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.FluxResume$ResumeSubscriber.onSubscribe(FluxResume.java:70) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:100) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:169) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.MonoThenApply.subscribe(MonoThenApply.java:51) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:69) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.MonoThenApply$MonoThenApplyManager.onNext(MonoThenApply.java:133) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:71) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:383) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:192) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:96) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:60) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.FluxConcatMap.subscribe(FluxConcatMap.java:116) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.MonoNext.subscribe(MonoNext.java:45) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.MonoOtherwiseIfEmpty.subscribe(MonoOtherwiseIfEmpty.java:47) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.MonoThenApply.subscribe(MonoThenApply.java:58) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.MonoThenApply.subscribe(MonoThenApply.java:58) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.MonoThenSupply$MonoConcatIgnoreManager.drain(MonoThenSupply.java:167) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.MonoThenSupply.subscribe(MonoThenSupply.java:55) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at org.springframework.http.server.reactive.ServletHttpHandlerAdapter.service(ServletHttpHandlerAdapter.java:93) [spring-web-5.0.0.M1.jar:5.0.0.M1]
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:729) [tomcat-embed-core-8.5.4.jar:8.5.4]
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:230) [tomcat-embed-core-8.5.4.jar:8.5.4]
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:165) [tomcat-embed-core-8.5.4.jar:8.5.4]
        at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198) [tomcat-embed-core-8.5.4.jar:8.5.4]
        at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:108) [tomcat-embed-core-8.5.4.jar:8.5.4]
        at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:522) [tomcat-embed-core-8.5.4.jar:8.5.4]
        at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140) [tomcat-embed-core-8.5.4.jar:8.5.4]
        at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79) [tomcat-embed-core-8.5.4.jar:8.5.4]
        at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87) [tomcat-embed-core-8.5.4.jar:8.5.4]
        at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:349) [tomcat-embed-core-8.5.4.jar:8.5.4]
        at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:1110) [tomcat-embed-core-8.5.4.jar:8.5.4]
        at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) [tomcat-embed-core-8.5.4.jar:8.5.4]
        at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:785) [tomcat-embed-core-8.5.4.jar:8.5.4]
        at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1425) [tomcat-embed-core-8.5.4.jar:8.5.4]
        at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-8.5.4.jar:8.5.4]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
        at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-8.5.4.jar:8.5.4]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
    
    2016-08-03 13:35:07.121 DEBUG 5136 --- [nio-8080-exec-1] o.s.h.s.r.ServletServerHttpResponse      : Can't set the status 500 because the HTTP response has already been committed
    2016-08-03 13:35:08.127 ERROR 5136 --- [nio-8080-exec-1] a.c.c.C.[.[.0.0.0.[.[httpHandlerServlet] : Servlet.service() for servlet [httpHandlerServlet] in context with path [] threw exception
    
    reactor.core.Exceptions$BubblingException: java.lang.IllegalStateException: RECEIVED
        at reactor.core.Exceptions.bubble(Exceptions.java:97) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.Exceptions.onErrorDropped(Exceptions.java:263) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.MonoThenApply$MonoThenApplyManager$SecondSubscriber.onError(MonoThenApply.java:209) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.FluxResume$ResumeSubscriber.onError(FluxResume.java:105) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.OperatorAdapter.doOnSubscriberError(OperatorAdapter.java:113) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.OperatorAdapter.onNext(OperatorAdapter.java:91) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:123) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at com.codependent.spring5.playground.reactive.web.AccountsController$4.subscribe(AccountsController.java:107) ~[classes/:na]
        at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:59) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:73) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at org.springframework.http.server.reactive.ChannelSendOperator.subscribe(ChannelSendOperator.java:54) ~[spring-web-5.0.0.M1.jar:5.0.0.M1]
        at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.MonoThenApply$MonoThenApplyManager.onNext(MonoThenApply.java:133) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.Operators$DeferredScalarSubscriber.complete(Operators.java:797) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.MonoThenApply$MonoThenApplyManager$SecondSubscriber.onNext(MonoThenApply.java:203) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.FluxResume$ResumeSubscriber.onNext(FluxResume.java:75) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:130) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1293) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:186) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1010) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.FluxResume$ResumeSubscriber.onSubscribe(FluxResume.java:70) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:100) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:169) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.MonoThenApply.subscribe(MonoThenApply.java:51) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:69) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.MonoThenApply$MonoThenApplyManager.onNext(MonoThenApply.java:133) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:71) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:383) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:192) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:96) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:60) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.FluxConcatMap.subscribe(FluxConcatMap.java:116) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.MonoNext.subscribe(MonoNext.java:45) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.MonoOtherwiseIfEmpty.subscribe(MonoOtherwiseIfEmpty.java:47) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.MonoThenApply.subscribe(MonoThenApply.java:58) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.MonoThenApply.subscribe(MonoThenApply.java:58) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.MonoThenSupply$MonoConcatIgnoreManager.drain(MonoThenSupply.java:167) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at reactor.core.publisher.MonoThenSupply.subscribe(MonoThenSupply.java:55) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        at org.springframework.http.server.reactive.ServletHttpHandlerAdapter.service(ServletHttpHandlerAdapter.java:93) ~[spring-web-5.0.0.M1.jar:5.0.0.M1]
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:729) ~[tomcat-embed-core-8.5.4.jar:8.5.4]
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:230) ~[tomcat-embed-core-8.5.4.jar:8.5.4]
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:165) ~[tomcat-embed-core-8.5.4.jar:8.5.4]
        at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198) ~[tomcat-embed-core-8.5.4.jar:8.5.4]
        at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:108) [tomcat-embed-core-8.5.4.jar:8.5.4]
        at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:522) [tomcat-embed-core-8.5.4.jar:8.5.4]
        at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140) [tomcat-embed-core-8.5.4.jar:8.5.4]
        at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79) [tomcat-embed-core-8.5.4.jar:8.5.4]
        at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87) [tomcat-embed-core-8.5.4.jar:8.5.4]
        at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:349) [tomcat-embed-core-8.5.4.jar:8.5.4]
        at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:1110) [tomcat-embed-core-8.5.4.jar:8.5.4]
        at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) [tomcat-embed-core-8.5.4.jar:8.5.4]
        at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:785) [tomcat-embed-core-8.5.4.jar:8.5.4]
        at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1425) [tomcat-embed-core-8.5.4.jar:8.5.4]
        at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-8.5.4.jar:8.5.4]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
        at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-8.5.4.jar:8.5.4]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
    Caused by: java.lang.IllegalStateException: RECEIVED
        at org.springframework.http.server.reactive.AbstractResponseBodyProcessor$State.onNext(AbstractResponseBodyProcessor.java:316) ~[spring-web-5.0.0.M1.jar:5.0.0.M1]
        at org.springframework.http.server.reactive.AbstractResponseBodyProcessor.onNext(AbstractResponseBodyProcessor.java:77) ~[spring-web-5.0.0.M1.jar:5.0.0.M1]
        at org.springframework.http.server.reactive.AbstractResponseBodyProcessor.onNext(AbstractResponseBodyProcessor.java:47) ~[spring-web-5.0.0.M1.jar:5.0.0.M1]
        at org.springframework.http.server.reactive.ChannelSendOperator$WriteWithBarrier.doNext(ChannelSendOperator.java:97) ~[spring-web-5.0.0.M1.jar:5.0.0.M1]
        at reactor.core.publisher.OperatorAdapter.onNext(OperatorAdapter.java:88) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na]
        ... 57 common frames omitted
    

    2) Calling s.onNext( Alert.class -any DTO- ) more than once:

    @GetMapping("/alerts")
    public Publisher<Alert> getAlerts(){
    
        return new Publisher<Alert>() {
    
            @Override
            public void subscribe(Subscriber<? super Alert> s) {
                int i = 0;
                while(++i <= 5){
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    s.onNext(new Alert((long)1, "ms"));
                }
                s.onComplete();
            }
        };
    }
    

    Now it doesn't show any error in the logs, but the caller gets a 500 response code and the content '['.

    Log:

    2016-08-03 13:37:11.834 DEBUG 5136 --- [nio-8080-exec-3] o.s.web.reactive.DispatcherHandler       : Processing GET request for [http://localhost:8080/alerts]
    2016-08-03 13:37:11.835 DEBUG 5136 --- [nio-8080-exec-3] s.w.r.r.m.a.RequestMappingHandlerMapping : Looking up handler method for path /alerts
    2016-08-03 13:37:11.836 DEBUG 5136 --- [nio-8080-exec-3] s.w.r.r.m.a.RequestMappingHandlerMapping : Returning handler method [public org.reactivestreams.Publisher<com.codependent.spring5.playground.reactive.dto.Alert> com.codependent.spring5.playground.reactive.web.AccountsController.getAlerts()]
    2016-08-03 13:37:11.836 DEBUG 5136 --- [nio-8080-exec-3] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'accountsController'
    

    Why can't we invoke onNext() multiple times? How could we do that?

    NOTE: If I just call onNext once, it works OK:

    @Override
    public void subscribe(Subscriber<? super String> s) {
        s.onNext("my message" + Math.random());
        s.onComplete();
    }
    

    or

    @Override
    public void subscribe(Subscriber<? super Alert> s) {
        s.onNext(new Alert((long)1, "ms"));
        s.onComplete();
    }
    
    • Will
      Will almost 8 years
      What happens if you return something simpler - EG Flux.just(new SensorRead(sensorId, Math.random()));
    • codependent
      codependent almost 8 years
      In that case it works fine.
  • codependent
    codependent almost 8 years
    Will, thanks for your suggestion, it definitely works that way. Anyway, I'd appreciate it if someone could clear up why we can't just implement Publisher.subscribe(s) and invoke s.onNext().
  • Will
    Will almost 8 years
    All my experience is with RxJava, and I don't have any experience with Reactor sorry (although I'm just starting to teach myself) so can't immediately comment, but will have a look. In general, constructing with the static methods is easier, and in RxJava the Observable constructor is protected so you can't directly construct it.
  • codependent
    codependent over 7 years
    Oops, I missed your update, thank you for answering. Your example with the /sse/event endpoint + Flux works perfectly. However, it doesn't answer my question: Spring Reactive follows the Reactive Streams spec, so using the Publisher interface should be possible, and therefore so should calling s.onNext() multiple times. Following your advice I also added produces="text/event-stream" in my request mapping and it still throws the same exception :-(