c747c7fbd60d98f4245fbe87b66b2bea01cbb26a
[JavaForFun] /
1 package de.spring.webservices.rest.business.service.impl;
2
3 import java.io.IOException;
4
5 import javax.inject.Inject;
6
7 import org.slf4j.Logger;
8 import org.slf4j.LoggerFactory;
9 import org.springframework.data.domain.Page;
10 import org.springframework.data.domain.Pageable;
11 import org.springframework.stereotype.Service;
12
13 import de.spring.webservices.domain.Car;
14 import de.spring.webservices.rest.business.service.AwesomeBusinessLogic;
15 import de.spring.webservices.rest.business.service.RxJavaBusinessLogic;
16 import rx.Observable;
17 import rx.Observable.OnSubscribe;
18 import rx.Subscriber;
19 import rx.exceptions.Exceptions;
20 import rx.schedulers.Schedulers;
21
22
23 @Service("rxJavaBusinessLogic")
24 public class RxJavaBusinessLogicImpl implements RxJavaBusinessLogic {
25         private static final Logger LOGGER = LoggerFactory.getLogger(RxJavaBusinessLogicImpl.class);
26
27     private final AwesomeBusinessLogic awesomeBusinessLogic;
28     
29     @Inject
30         public RxJavaBusinessLogicImpl(AwesomeBusinessLogic awesomeBusinessLogic) {
31                 this.awesomeBusinessLogic = awesomeBusinessLogic;
32         }
33
34         @Override
35         public Observable<Page<Car>> findAll(Pageable pageRequest) {
36         return Observable.create(new OnSubscribe<Page<Car>>() {
37                         @Override
38                         public void call(Subscriber<? super Page<Car>> observer) {
39                                 observer.onNext( awesomeBusinessLogic.findAll(pageRequest));
40                         }
41                 }).subscribeOn(Schedulers.io());
42         }
43         
44         @Override
45         public Observable<Page<Car>> findAllStream(Pageable pageRequest) {
46         return Observable.create(new OnSubscribe<Page<Car>>() {
47                         @Override
48                         public void call(Subscriber<? super Page<Car>> observer) {
49                                 observer.onNext( awesomeBusinessLogic.findAll(pageRequest));
50                         }
51                 }).subscribeOn(Schedulers.io());
52         }
53
54         @Override
55         public Observable<Car> findById(long id) {
56         return Observable.create((Subscriber<? super Car> observer) ->
57                                 observer.onNext( awesomeBusinessLogic.findById(id)))
58                         .subscribeOn(Schedulers.io());
59         }
60
61         @Override
62         public Observable<Car> create(Car car) {        
63                 return Observable.create((Subscriber<? super Car> observer) ->
64                                         observer.onNext(awesomeBusinessLogic.create(car)))
65                                 .subscribeOn(Schedulers.io());
66         }
67         
68         @Override
69         public Observable<Car> createThrowable(Car car) {       
70                 return Observable.create((Subscriber<? super Car> observer) -> {
71
72                                 try {
73                                         observer.onNext(awesomeBusinessLogic.createThrowable(car));
74                                 } catch (IOException ex) {
75                                         // I could use this implementation. Instead, I will wrap my exception because
76                                         // that is what you would be doing if you were using any other method from RxJava (like map() for example)
77                                         // observer.onError(ex);
78                                         
79                                         LOGGER.error("createThrowable error: ", ex);
80                                         
81                                         Exceptions.propagate(ex);
82                                 }
83                                 
84                                 // No idea when to use this stuff :(
85                                 // observer.onCompleted();
86
87                 }).subscribeOn(Schedulers.io());
88         }
89 }