From da7e43138d866d2882db33b5785b5036285839bd Mon Sep 17 00:00:00 2001 From: Gustavo Martin Morcuende Date: Fri, 2 Dec 2016 23:44:05 +0100 Subject: [PATCH] RxJava: using flatMap method --- .../src/main/java/de/rxjava/tests/MainRxJava.java | 6 +-- .../rxjava/tests/service/impl/AsyncHTTPClient.java | 7 ++- .../rxjava/tests/service/impl/FlatMapTestImpl.java | 61 ++++++++++++++++++++++ 3 files changed, 67 insertions(+), 7 deletions(-) create mode 100644 Allgemeines/RxJava/src/main/java/de/rxjava/tests/service/impl/FlatMapTestImpl.java diff --git a/Allgemeines/RxJava/src/main/java/de/rxjava/tests/MainRxJava.java b/Allgemeines/RxJava/src/main/java/de/rxjava/tests/MainRxJava.java index 09c0d70..1999493 100644 --- a/Allgemeines/RxJava/src/main/java/de/rxjava/tests/MainRxJava.java +++ b/Allgemeines/RxJava/src/main/java/de/rxjava/tests/MainRxJava.java @@ -1,12 +1,12 @@ package de.rxjava.tests; import de.rxjava.tests.service.impl.AsyncHTTPClient; +import de.rxjava.tests.service.impl.FlatMapTestImpl; public class MainRxJava { public static void main(String[] args) { - AsyncHTTPClient asyncHTTPClient = new AsyncHTTPClient(); - - asyncHTTPClient.getPages(); + new AsyncHTTPClient().getPages(); + new FlatMapTestImpl().getWords(); } } diff --git a/Allgemeines/RxJava/src/main/java/de/rxjava/tests/service/impl/AsyncHTTPClient.java b/Allgemeines/RxJava/src/main/java/de/rxjava/tests/service/impl/AsyncHTTPClient.java index 331356d..0a0f11e 100644 --- a/Allgemeines/RxJava/src/main/java/de/rxjava/tests/service/impl/AsyncHTTPClient.java +++ b/Allgemeines/RxJava/src/main/java/de/rxjava/tests/service/impl/AsyncHTTPClient.java @@ -1,7 +1,6 @@ package de.rxjava.tests.service.impl; import java.io.IOException; -import java.net.MalformedURLException; import java.net.URL; import de.rxjava.tests.httpclient.CustomHTTPClient; @@ -27,7 +26,7 @@ public class AsyncHTTPClient { }); - System.out.println("YOU SEE ME FIRST!!!!"); + System.out.println("AsyncHTTPClient: YOU SEE ME FIRST!!!!"); try { @@ -42,7 +41,7 @@ public class AsyncHTTPClient { private Observable getDataAsync(String uri) { return getDataSync(uri) .subscribeOn(Schedulers.io()); // Creates a pool of threads for us which will run the code implemented below :) - // THIE METHOD DOES NOT START TO RUN MY CODE!!! IT IS DONE BY subscribe METHOD!!! + // THIS METHOD DOES NOT START TO RUN MY CODE!!! IT IS DONE BY subscribe METHOD!!! } private Observable getDataSync(String uri) { @@ -52,7 +51,7 @@ public class AsyncHTTPClient { try { data = CustomHTTPClient.newInstance("RxJavaTest").retrieveDataAsString(new URL(uri)); - // Making it slower as if having a bad connection :) + // Making it slower as if I had a bad connection :) Thread.sleep(2000); } catch (InterruptedException exception) { // Do not forget good patterns when dealing with InterruptedException :( diff --git a/Allgemeines/RxJava/src/main/java/de/rxjava/tests/service/impl/FlatMapTestImpl.java b/Allgemeines/RxJava/src/main/java/de/rxjava/tests/service/impl/FlatMapTestImpl.java new file mode 100644 index 0000000..a06bf33 --- /dev/null +++ b/Allgemeines/RxJava/src/main/java/de/rxjava/tests/service/impl/FlatMapTestImpl.java @@ -0,0 +1,61 @@ +package de.rxjava.tests.service.impl; + +import java.util.Arrays; +import java.util.List; + +import rx.Observable; +import rx.schedulers.Schedulers; + +public class FlatMapTestImpl { + + + public void getWords() { + + getWordsAsync() + .flatMap(words -> { // flatmap method enables us to return one Observable. The subscribe method does not have such feature. + return Observable.from(words); + }) + .subscribe(word -> { //Unlike JavaScript Promises, we can call many times the same promise without resolving it. + //This stuff would be impossible in JavaScript :) + System.out.println(word); + }); + + System.out.println("FlatMapTestImpl: YOU SEE ME FIRST!!!!"); + + try { + Thread.sleep(30000); + } catch (InterruptedException exception) { + // Do not forget good patterns when dealing with InterruptedException :( + Thread.currentThread().interrupt(); + } + + } + + private Observable> getWordsAsync() { + return getWordsSync() + .subscribeOn(Schedulers.io()); // Creates a pool of threads for us which will run the code implemented below :) + // THIS METHOD DOES NOT START TO RUN MY CODE!!! IT IS DONE BY subscribe METHOD!!! + } + + private Observable> getWordsSync() { + return Observable.create(observer -> { + System.out.println(Thread.currentThread().getName()); + + String[] words = { "gus", "stuff", "given", "no", "way" }; + try { + // Making it slower as if having a bad connection :) + Thread.sleep(2000); + } catch (InterruptedException exception) { + // Do not forget good patterns when dealing with InterruptedException :( + Thread.currentThread().interrupt(); + + observer.onError(exception); + } + + // When do you use this stuff? + // observer.onCompleted(); + + observer.onNext(Arrays.asList(words)); + }); + } +} -- 2.1.4