771fb40d938b8a4fb69ebd640d7a93823a015f3e
[JavaForFun] /
1 package de.spring.webservices.rest.business.service.impl;
2
3 import java.io.IOException;
4
5 import javax.inject.Inject;
6
7 import org.slf4j.Logger;
8 import org.slf4j.LoggerFactory;
9 import org.springframework.data.domain.Page;
10 import org.springframework.data.domain.Pageable;
11 import org.springframework.stereotype.Service;
12
13 import de.spring.webservices.domain.Car;
14 import de.spring.webservices.rest.business.service.AwesomeBusinessLogic;
15 import de.spring.webservices.rest.business.service.RxJavaBusinessLogic;
16 import rx.Observable;
17 import rx.Observable.OnSubscribe;
18 import rx.Subscriber;
19 import rx.exceptions.Exceptions;
20 import rx.schedulers.Schedulers;
21
22
23 @Service("rxJavaBusinessLogic")
24 public class RxJavaBusinessLogicImpl implements RxJavaBusinessLogic {
25         private static final Logger LOGGER = LoggerFactory.getLogger(RxJavaBusinessLogicImpl.class);
26
27     private final AwesomeBusinessLogic awesomeBusinessLogic;
28     
29     @Inject
30         public RxJavaBusinessLogicImpl(AwesomeBusinessLogic awesomeBusinessLogic) {
31                 this.awesomeBusinessLogic = awesomeBusinessLogic;
32         }
33
34         @Override
35         public Observable<Page<Car>> findAll(Pageable pageRequest) {
36         return Observable.create(new OnSubscribe<Page<Car>>() {
37                         @Override
38                         public void call(Subscriber<? super Page<Car>> observer) {
39                                 observer.onNext( awesomeBusinessLogic.findAll(pageRequest));
40                                 observer.onCompleted();
41                         }
42                 }).subscribeOn(Schedulers.io());
43         }
44         
45         @Override
46         public Observable<Page<Car>> findAllStream(Pageable pageRequest) {
47         return Observable.create(new OnSubscribe<Page<Car>>() {
48                         @Override
49                         public void call(Subscriber<? super Page<Car>> observer) {
50                                 observer.onNext( awesomeBusinessLogic.findAll(pageRequest));
51                                 observer.onCompleted();
52                         }
53                 }).subscribeOn(Schedulers.io());
54         }
55
56         @Override
57         public Observable<Car> findById(long id) {
58         return Observable.create((Subscriber<? super Car> observer) -> {
59                                 observer.onNext( awesomeBusinessLogic.findById(id));
60                                 observer.onCompleted();
61                         }).subscribeOn(Schedulers.io());
62         }
63
64         @Override
65         public Observable<Car> create(Car car) {        
66                 return Observable.create((Subscriber<? super Car> observer) -> {
67                                         observer.onNext(awesomeBusinessLogic.create(car));
68                                         observer.onCompleted();
69                                 }).subscribeOn(Schedulers.io());
70         }
71         
72         @Override
73         public Observable<Car> createThrowable(Car car) {       
74                 return Observable.create((Subscriber<? super Car> observer) -> {
75
76                                 try {
77                                         observer.onNext(awesomeBusinessLogic.createThrowable(car));
78                                         observer.onCompleted();
79                                 } catch (IOException ex) {
80                                         // I could use this implementation. Instead, I will wrap my exception because
81                                         // that is what you would be doing if you were using any other method from RxJava (like map() for example)
82                                         // observer.onError(ex);
83                                         
84                                         LOGGER.error("createThrowable error: ", ex);
85                                         
86                                         Exceptions.propagate(ex);
87                                 }
88
89                 }).subscribeOn(Schedulers.io());
90         }
91 }