import java.io.IOException;
import java.net.URL;
+import java.util.concurrent.atomic.AtomicLong;
import de.rxjava.tests.httpclient.CustomHTTPClient;
-import rx.Observable;
-import rx.schedulers.Schedulers;
+import io.reactivex.Observable;
+import io.reactivex.schedulers.Schedulers;
public class AsyncHTTPClient {
public void getPages() {
+ AtomicLong streamCounter = new AtomicLong();
+
- getDataAsync("https://github.com/gumartinm")
+ getDataStreamAsync("https://github.com/gumartinm")
// fancy Java way of using lambdas. Called method reference :)
.subscribe(System.out::println, //The subscribe method starts to run the code implemented in getDataSync
Throwable::printStackTrace);//subscribeOn just declares who is going to run my code (a pool of threads)
//implemented by the Web Browser with its asynchronous callbacks)
- getDataAsync("http://www.google.de").
+ getDataStreamAsync("http://www.google.de").
subscribe(page -> { // It will be called on success :)
+ System.out.println("STREAM, counter " + streamCounter.getAndIncrement());
+
System.out.println("Another way, no so cool (with lambdas)");
System.out.println(Thread.currentThread().getName());
System.out.println(page);
// The same with method reference :)
- getDataAsync("http://www.google.es").
+ getDataStreamAsync("http://www.google.es").
subscribe(System.out::println, // It will be called on success :)
Throwable::printStackTrace); // It will be called on error. :)
}
- private Observable<String> getDataAsync(String uri) {
- return getDataSync(uri)
+ private Observable<String> getDataStreamAsync(String uri) {
+ return getDataStreamSync(uri)
.subscribeOn(Schedulers.io()); // Creates a pool of threads for us which will run the code implemented below :)
// THIS METHOD DOES NOT START TO RUN MY CODE!!! IT IS DONE BY subscribe METHOD!!!
}
- private Observable<String> getDataSync(String uri) {
+ private Observable<String> getDataStreamSync(String uri) {
return Observable.create(observer -> {
System.out.println(Thread.currentThread().getName());
- String data = "";
try {
- data = CustomHTTPClient.newInstance("RxJavaTest").retrieveDataAsString(new URL(uri));
+ observer.onNext(CustomHTTPClient.newInstance("RxJavaTest").retrieveDataAsString(new URL(uri)));
+
+ // WE ARE EMITTING MORE THAN ONE VALUE :)
+ observer.onNext(CustomHTTPClient.newInstance("RxJavaTest").retrieveDataAsString(new URL(uri)));
// Making it slower as if I had a bad connection :)
Thread.sleep(2000);
}
// When do you use this stuff?
- // observer.onCompleted();
- observer.onNext(data);
+ observer.onComplete();
});
}
}
import java.util.Arrays;
import java.util.List;
-import rx.Observable;
-import rx.schedulers.Schedulers;
+import io.reactivex.Observable;
+import io.reactivex.schedulers.Schedulers;
public class FlatMapTestImpl {
getWordsAsync()
.flatMap(words -> { // flatmap method enables us to return one Observable. The subscribe method does not have such feature.
- return Observable.from(words);
+ return Observable.fromArray(words.toArray());
})
.subscribe(word -> { //Unlike JavaScript Promises, we can call many times the same promise without resolving it.
//This stuff would be impossible in JavaScript :)
// The same with method references!!!
getWordsAsync()
- .flatMap(Observable::from)
+ .flatMap((List<String> words) -> {
+ return Observable.fromArray(words.toArray());
+ })
.subscribe(System.out::println, Throwable::printStackTrace);