1 package de.spring.webservices.rest.business.service.impl;
3 import java.io.IOException;
5 import javax.inject.Inject;
7 import org.slf4j.Logger;
8 import org.slf4j.LoggerFactory;
9 import org.springframework.data.domain.Page;
10 import org.springframework.data.domain.Pageable;
11 import org.springframework.stereotype.Service;
13 import de.spring.webservices.domain.Car;
14 import de.spring.webservices.rest.business.service.AwesomeBusinessLogic;
15 import de.spring.webservices.rest.business.service.RxJavaBusinessLogic;
17 import rx.Observable.OnSubscribe;
19 import rx.exceptions.Exceptions;
20 import rx.schedulers.Schedulers;
23 @Service("rxJavaBusinessLogic")
24 public class RxJavaBusinessLogicImpl implements RxJavaBusinessLogic {
25 private static final Logger LOGGER = LoggerFactory.getLogger(RxJavaBusinessLogicImpl.class);
27 private final AwesomeBusinessLogic awesomeBusinessLogic;
30 public RxJavaBusinessLogicImpl(AwesomeBusinessLogic awesomeBusinessLogic) {
31 this.awesomeBusinessLogic = awesomeBusinessLogic;
35 public Observable<Page<Car>> findAll(Pageable pageRequest) {
36 return Observable.create(new OnSubscribe<Page<Car>>() {
38 public void call(Subscriber<? super Page<Car>> observer) {
39 observer.onNext( awesomeBusinessLogic.findAll(pageRequest));
41 }).subscribeOn(Schedulers.io());
45 public Observable<Page<Car>> findAllStream(Pageable pageRequest) {
46 return Observable.create(new OnSubscribe<Page<Car>>() {
48 public void call(Subscriber<? super Page<Car>> observer) {
49 observer.onNext( awesomeBusinessLogic.findAll(pageRequest));
51 }).subscribeOn(Schedulers.io());
55 public Observable<Car> findById(long id) {
56 return Observable.create((Subscriber<? super Car> observer) ->
57 observer.onNext( awesomeBusinessLogic.findById(id)))
58 .subscribeOn(Schedulers.io());
62 public Observable<Car> create(Car car) {
63 return Observable.create((Subscriber<? super Car> observer) ->
64 observer.onNext(awesomeBusinessLogic.create(car)))
65 .subscribeOn(Schedulers.io());
69 public Observable<Car> createThrowable(Car car) {
70 return Observable.create((Subscriber<? super Car> observer) -> {
73 observer.onNext(awesomeBusinessLogic.createThrowable(car));
74 } catch (IOException ex) {
75 // I could use this implementation. Instead, I will wrap my exception because
76 // that is what you would be doing if you were using any other method from RxJava (like map() for example)
77 // observer.onError(ex);
79 LOGGER.error("createThrowable error: ", ex);
81 Exceptions.propagate(ex);
84 // No idea when to use this stuff :(
85 // observer.onCompleted();
87 }).subscribeOn(Schedulers.io());