1 package de.spring.webservices.rest.business.service.impl;
3 import java.io.IOException;
5 import javax.inject.Inject;
7 import org.slf4j.Logger;
8 import org.slf4j.LoggerFactory;
9 import org.springframework.data.domain.Page;
10 import org.springframework.data.domain.Pageable;
11 import org.springframework.stereotype.Service;
13 import de.spring.webservices.domain.Car;
14 import de.spring.webservices.rest.business.service.AwesomeBusinessLogic;
15 import de.spring.webservices.rest.business.service.RxJavaBusinessLogic;
17 import rx.Observable.OnSubscribe;
19 import rx.exceptions.Exceptions;
20 import rx.schedulers.Schedulers;
23 @Service("rxJavaBusinessLogic")
24 public class RxJavaBusinessLogicImpl implements RxJavaBusinessLogic {
25 private static final Logger LOGGER = LoggerFactory.getLogger(RxJavaBusinessLogicImpl.class);
27 private final AwesomeBusinessLogic awesomeBusinessLogic;
30 public RxJavaBusinessLogicImpl(AwesomeBusinessLogic awesomeBusinessLogic) {
31 this.awesomeBusinessLogic = awesomeBusinessLogic;
35 public Observable<Page<Car>> findAll(Pageable pageRequest) {
36 return Observable.create(new OnSubscribe<Page<Car>>() {
38 public void call(Subscriber<? super Page<Car>> observer) {
39 observer.onNext( awesomeBusinessLogic.findAll(pageRequest));
40 observer.onCompleted();
42 }).subscribeOn(Schedulers.io());
46 public Observable<Page<Car>> findAllStream(Pageable pageRequest) {
47 return Observable.create(new OnSubscribe<Page<Car>>() {
49 public void call(Subscriber<? super Page<Car>> observer) {
50 observer.onNext( awesomeBusinessLogic.findAll(pageRequest));
51 observer.onCompleted();
53 }).subscribeOn(Schedulers.io());
57 public Observable<Car> findById(long id) {
58 return Observable.create((Subscriber<? super Car> observer) -> {
59 observer.onNext( awesomeBusinessLogic.findById(id));
60 observer.onCompleted();
61 }).subscribeOn(Schedulers.io());
65 public Observable<Car> create(Car car) {
66 return Observable.create((Subscriber<? super Car> observer) -> {
67 observer.onNext(awesomeBusinessLogic.create(car));
68 observer.onCompleted();
69 }).subscribeOn(Schedulers.io());
73 public Observable<Car> createThrowable(Car car) {
74 return Observable.create((Subscriber<? super Car> observer) -> {
77 observer.onNext(awesomeBusinessLogic.createThrowable(car));
78 observer.onCompleted();
79 } catch (IOException ex) {
80 // I could use this implementation. Instead, I will wrap my exception because
81 // that is what you would be doing if you were using any other method from RxJava (like map() for example)
82 // observer.onError(ex);
84 LOGGER.error("createThrowable error: ", ex);
86 Exceptions.propagate(ex);
89 }).subscribeOn(Schedulers.io());