How to use RxJava Interval Operator

20,757

Solution 1

You have to block until the observable is consumed:

public static void main(String[] args) throws Exception {

    CountDownLatch latch = new CountDownLatch(1);

    Observable
    .interval(1, TimeUnit.SECONDS)
    .subscribe(new Subscriber<Long>() {
        @Override
        public void onCompleted() {
            System.out.println("onCompleted");
            // make sure to complete only when observable is done
            latch.countDown();
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("onError -> " + e.getMessage());
        }

        @Override
        public void onNext(Long l) {
            System.out.println("onNext -> " + l);
        }
    });

    // wait for observable to complete (never in this case...)
    latch.await();
}

You can add .take(10) for example to see the observable complete.

Solution 2

Put Thread.sleep(1000000) after the subscribe and you will see it working. Observable.interval operates by default on Schedulers.computation() so your stream is being run on a thread other than the main thread.

Share:
20,757

Related videos on Youtube

HuangDong.Li
Author by

HuangDong.Li

I love Programming. Programming just gives me peace. I want to be a programmer.

Updated on June 18, 2020

Comments

  • HuangDong.Li
    HuangDong.Li almost 4 years

    I'm learning about RxJava operator, and I found these code below did not print anything:

    public static void main(String[] args) {
    
        Observable
        .interval(1, TimeUnit.SECONDS)
        .subscribe(new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }
    
            @Override
            public void onError(Throwable e) {
                System.out.println("onError -> " + e.getMessage());
            }
    
            @Override
            public void onNext(Long l) {
                System.out.println("onNext -> " + l);
            }
        });
    }
    

    As ReactiveX, interval

    create an Observable that emits a sequence of integers spaced by a particular time interval

    Did I make a mistake or forget about something?

Related