Using RxJava 2.0
authorGustavo Martin Morcuende <gu.martinm@gmail.com>
Sun, 11 Dec 2016 21:02:00 +0000 (22:02 +0100)
committerGustavo Martin Morcuende <gu.martinm@gmail.com>
Sun, 11 Dec 2016 21:02:00 +0000 (22:02 +0100)
Allgemeines/RxJava/pom.xml
Allgemeines/RxJava/src/main/java/de/rxjava/tests/MainRxJava.java
Allgemeines/RxJava/src/main/java/de/rxjava/tests/service/impl/AsyncHTTPClient.java
Allgemeines/RxJava/src/main/java/de/rxjava/tests/service/impl/FlatMapTestImpl.java

index b011e85..6d6fb59 100644 (file)
@@ -51,9 +51,9 @@
         </dependency>
 
         <dependency>
-            <groupId>io.reactivex</groupId>
+            <groupId>io.reactivex.rxjava2</groupId>
             <artifactId>rxjava</artifactId>
-            <version>1.2.3</version>
+            <version>2.0.2</version>
         </dependency>
 
     </dependencies>
index 1999493..dbc2e9b 100644 (file)
@@ -6,7 +6,7 @@ import de.rxjava.tests.service.impl.FlatMapTestImpl;
 public class MainRxJava {
 
        public static void main(String[] args) {
-               new AsyncHTTPClient().getPages();
+               //new AsyncHTTPClient().getPages();
                new FlatMapTestImpl().getWords();
        }
 }
index 252a7d5..7c6fe80 100644 (file)
@@ -2,16 +2,19 @@ package de.rxjava.tests.service.impl;
 
 import java.io.IOException;
 import java.net.URL;
+import java.util.concurrent.atomic.AtomicLong;
 
 import de.rxjava.tests.httpclient.CustomHTTPClient;
-import rx.Observable;
-import rx.schedulers.Schedulers;
+import io.reactivex.Observable;
+import io.reactivex.schedulers.Schedulers;
 
 public class AsyncHTTPClient {
 
        public void getPages() {
+           AtomicLong streamCounter = new AtomicLong();
+           
                                
-               getDataAsync("https://github.com/gumartinm")
+               getDataStreamAsync("https://github.com/gumartinm")
                        // fancy Java way of using lambdas. Called method reference :)
                        .subscribe(System.out::println,         //The subscribe method starts to run the code implemented in getDataSync
                                        Throwable::printStackTrace);//subscribeOn just declares who is going to run my code (a pool of threads)
@@ -21,8 +24,10 @@ public class AsyncHTTPClient {
                                                                                                //implemented by the Web Browser with its asynchronous callbacks)
                                                                                                 
                
-               getDataAsync("http://www.google.de").
+               getDataStreamAsync("http://www.google.de").
                        subscribe(page -> {                               // It will be called on success :)
+                               System.out.println("STREAM, counter " + streamCounter.getAndIncrement());
+
                                System.out.println("Another way, no so cool (with lambdas)");
                                System.out.println(Thread.currentThread().getName());
                                System.out.println(page);
@@ -31,7 +36,7 @@ public class AsyncHTTPClient {
                
                
                // The same with method reference :)
-               getDataAsync("http://www.google.es").
+               getDataStreamAsync("http://www.google.es").
                subscribe(System.out::println,             // It will be called on success :)
                                  Throwable::printStackTrace);     // It will be called on error. :)
                
@@ -48,18 +53,20 @@ public class AsyncHTTPClient {
 
        }
        
-       private Observable<String> getDataAsync(String uri) {
-        return getDataSync(uri)
+       private Observable<String> getDataStreamAsync(String uri) {
+        return getDataStreamSync(uri)
                        .subscribeOn(Schedulers.io());  // Creates a pool of threads for us which will run the code implemented below :)
                                                 // THIS METHOD DOES NOT START TO RUN MY CODE!!! IT IS DONE BY subscribe METHOD!!!
     }
        
-       private Observable<String> getDataSync(String uri) {
+       private Observable<String> getDataStreamSync(String uri) {
         return Observable.create(observer -> {
                System.out.println(Thread.currentThread().getName());
-               String data = "";
                try {
-                               data = CustomHTTPClient.newInstance("RxJavaTest").retrieveDataAsString(new URL(uri));
+                       observer.onNext(CustomHTTPClient.newInstance("RxJavaTest").retrieveDataAsString(new URL(uri)));
+                       
+                       // WE ARE EMITTING MORE THAN ONE VALUE :)
+                       observer.onNext(CustomHTTPClient.newInstance("RxJavaTest").retrieveDataAsString(new URL(uri)));
                                
                                // Making it slower as if I had a bad connection :)
                                Thread.sleep(2000);
@@ -73,8 +80,7 @@ public class AsyncHTTPClient {
                        }
 
                // When do you use this stuff?
-               // observer.onCompleted();
-            observer.onNext(data);    
+               observer.onComplete();
         });
     }
 }
index 1c6b27d..460fbbe 100644 (file)
@@ -3,8 +3,8 @@ package de.rxjava.tests.service.impl;
 import java.util.Arrays;
 import java.util.List;
 
-import rx.Observable;
-import rx.schedulers.Schedulers;
+import io.reactivex.Observable;
+import io.reactivex.schedulers.Schedulers;
 
 public class FlatMapTestImpl {
 
@@ -13,7 +13,7 @@ public class FlatMapTestImpl {
                
                getWordsAsync()
                .flatMap(words -> {                // flatmap method enables us to return one Observable. The subscribe method does not have such feature.
-                       return Observable.from(words);
+                       return Observable.fromArray(words.toArray());
                })
                .subscribe(word -> {               //Unlike JavaScript Promises, we can call many times the same promise without resolving it.
                                                       //This stuff would be impossible in JavaScript :)
@@ -23,7 +23,9 @@ public class FlatMapTestImpl {
                
                // The same with method references!!!
                getWordsAsync()
-               .flatMap(Observable::from)
+               .flatMap((List<String> words) -> {
+                       return Observable.fromArray(words.toArray());
+               })
                .subscribe(System.out::println, Throwable::printStackTrace);