From: Gustavo Martin Morcuende Date: Sun, 11 Dec 2016 21:02:00 +0000 (+0100) Subject: Using RxJava 2.0 X-Git-Url: https://git.gumartinm.name/?a=commitdiff_plain;h=dfe82e50c90c4e7c99941beceb0d5de8f2020ed9;p=JavaForFun Using RxJava 2.0 --- diff --git a/Allgemeines/RxJava/pom.xml b/Allgemeines/RxJava/pom.xml index b011e85..6d6fb59 100644 --- a/Allgemeines/RxJava/pom.xml +++ b/Allgemeines/RxJava/pom.xml @@ -51,9 +51,9 @@ - io.reactivex + io.reactivex.rxjava2 rxjava - 1.2.3 + 2.0.2 diff --git a/Allgemeines/RxJava/src/main/java/de/rxjava/tests/MainRxJava.java b/Allgemeines/RxJava/src/main/java/de/rxjava/tests/MainRxJava.java index 1999493..dbc2e9b 100644 --- a/Allgemeines/RxJava/src/main/java/de/rxjava/tests/MainRxJava.java +++ b/Allgemeines/RxJava/src/main/java/de/rxjava/tests/MainRxJava.java @@ -6,7 +6,7 @@ import de.rxjava.tests.service.impl.FlatMapTestImpl; public class MainRxJava { public static void main(String[] args) { - new AsyncHTTPClient().getPages(); + //new AsyncHTTPClient().getPages(); new FlatMapTestImpl().getWords(); } } diff --git a/Allgemeines/RxJava/src/main/java/de/rxjava/tests/service/impl/AsyncHTTPClient.java b/Allgemeines/RxJava/src/main/java/de/rxjava/tests/service/impl/AsyncHTTPClient.java index 252a7d5..7c6fe80 100644 --- a/Allgemeines/RxJava/src/main/java/de/rxjava/tests/service/impl/AsyncHTTPClient.java +++ b/Allgemeines/RxJava/src/main/java/de/rxjava/tests/service/impl/AsyncHTTPClient.java @@ -2,16 +2,19 @@ package de.rxjava.tests.service.impl; import java.io.IOException; import java.net.URL; +import java.util.concurrent.atomic.AtomicLong; import de.rxjava.tests.httpclient.CustomHTTPClient; -import rx.Observable; -import rx.schedulers.Schedulers; +import io.reactivex.Observable; +import io.reactivex.schedulers.Schedulers; public class AsyncHTTPClient { public void getPages() { + AtomicLong streamCounter = new AtomicLong(); + - getDataAsync("https://github.com/gumartinm") + getDataStreamAsync("https://github.com/gumartinm") // fancy Java way of using lambdas. Called method reference :) .subscribe(System.out::println, //The subscribe method starts to run the code implemented in getDataSync Throwable::printStackTrace);//subscribeOn just declares who is going to run my code (a pool of threads) @@ -21,8 +24,10 @@ public class AsyncHTTPClient { //implemented by the Web Browser with its asynchronous callbacks) - getDataAsync("http://www.google.de"). + getDataStreamAsync("http://www.google.de"). subscribe(page -> { // It will be called on success :) + System.out.println("STREAM, counter " + streamCounter.getAndIncrement()); + System.out.println("Another way, no so cool (with lambdas)"); System.out.println(Thread.currentThread().getName()); System.out.println(page); @@ -31,7 +36,7 @@ public class AsyncHTTPClient { // The same with method reference :) - getDataAsync("http://www.google.es"). + getDataStreamAsync("http://www.google.es"). subscribe(System.out::println, // It will be called on success :) Throwable::printStackTrace); // It will be called on error. :) @@ -48,18 +53,20 @@ public class AsyncHTTPClient { } - private Observable getDataAsync(String uri) { - return getDataSync(uri) + private Observable getDataStreamAsync(String uri) { + return getDataStreamSync(uri) .subscribeOn(Schedulers.io()); // Creates a pool of threads for us which will run the code implemented below :) // THIS METHOD DOES NOT START TO RUN MY CODE!!! IT IS DONE BY subscribe METHOD!!! } - private Observable getDataSync(String uri) { + private Observable getDataStreamSync(String uri) { return Observable.create(observer -> { System.out.println(Thread.currentThread().getName()); - String data = ""; try { - data = CustomHTTPClient.newInstance("RxJavaTest").retrieveDataAsString(new URL(uri)); + observer.onNext(CustomHTTPClient.newInstance("RxJavaTest").retrieveDataAsString(new URL(uri))); + + // WE ARE EMITTING MORE THAN ONE VALUE :) + observer.onNext(CustomHTTPClient.newInstance("RxJavaTest").retrieveDataAsString(new URL(uri))); // Making it slower as if I had a bad connection :) Thread.sleep(2000); @@ -73,8 +80,7 @@ public class AsyncHTTPClient { } // When do you use this stuff? - // observer.onCompleted(); - observer.onNext(data); + observer.onComplete(); }); } } diff --git a/Allgemeines/RxJava/src/main/java/de/rxjava/tests/service/impl/FlatMapTestImpl.java b/Allgemeines/RxJava/src/main/java/de/rxjava/tests/service/impl/FlatMapTestImpl.java index 1c6b27d..460fbbe 100644 --- a/Allgemeines/RxJava/src/main/java/de/rxjava/tests/service/impl/FlatMapTestImpl.java +++ b/Allgemeines/RxJava/src/main/java/de/rxjava/tests/service/impl/FlatMapTestImpl.java @@ -3,8 +3,8 @@ package de.rxjava.tests.service.impl; import java.util.Arrays; import java.util.List; -import rx.Observable; -import rx.schedulers.Schedulers; +import io.reactivex.Observable; +import io.reactivex.schedulers.Schedulers; public class FlatMapTestImpl { @@ -13,7 +13,7 @@ public class FlatMapTestImpl { getWordsAsync() .flatMap(words -> { // flatmap method enables us to return one Observable. The subscribe method does not have such feature. - return Observable.from(words); + return Observable.fromArray(words.toArray()); }) .subscribe(word -> { //Unlike JavaScript Promises, we can call many times the same promise without resolving it. //This stuff would be impossible in JavaScript :) @@ -23,7 +23,9 @@ public class FlatMapTestImpl { // The same with method references!!! getWordsAsync() - .flatMap(Observable::from) + .flatMap((List words) -> { + return Observable.fromArray(words.toArray()); + }) .subscribe(System.out::println, Throwable::printStackTrace);