From 332383877f29a15b50cc235786fe26744f64a4d5 Mon Sep 17 00:00:00 2001 From: Gustavo Martin Morcuende Date: Sun, 1 Jan 2017 12:42:14 +0100 Subject: [PATCH] RxJava, Spring REST example --- .../web-services-spring-rxjava-server/pom.xml | 11 +- .../rest/business/service/RxJavaBusinessLogic.java | 4 +- .../service/impl/RxJavaBusinessLogicImpl.java | 40 +++++-- .../rest/controller/RxJavaCarController.java | 29 ++--- .../rest/controller/adapters/RxJavaAdapter.java | 9 +- .../cloud/netflix/rx/DeferredResultSubscriber.java | 69 ++++++++++++ .../cloud/netflix/rx/ObservableSseEmitter.java | 46 ++++++++ .../netflix/rx/ResponseBodyEmitterSubscriber.java | 91 +++++++++++++++ .../cloud/netflix/rx/RxResponse.java | 58 ++++++++++ .../cloud/netflix/rx/SingleDeferredResult.java | 48 ++++++++ .../cloud/netflix/rx/SingleReturnValueHandler.java | 125 +++++++++++++++++++++ .../spring-configuration/mvc/rest/rest-config.xml | 20 +++- .../controller/CarControllerIntegrationTest.java | 2 + ...pletableFutureCarControllerIntegrationTest.java | 2 + .../RxJavaCarControllerIntegrationTest.java | 4 +- 15 files changed, 521 insertions(+), 37 deletions(-) create mode 100644 SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/org/springframework/cloud/netflix/rx/DeferredResultSubscriber.java create mode 100644 SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/org/springframework/cloud/netflix/rx/ObservableSseEmitter.java create mode 100644 SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/org/springframework/cloud/netflix/rx/ResponseBodyEmitterSubscriber.java create mode 100644 SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/org/springframework/cloud/netflix/rx/RxResponse.java create mode 100644 SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/org/springframework/cloud/netflix/rx/SingleDeferredResult.java create mode 100644 SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/org/springframework/cloud/netflix/rx/SingleReturnValueHandler.java diff --git a/SpringJava/RxJava/web-services-spring-rxjava-server/pom.xml b/SpringJava/RxJava/web-services-spring-rxjava-server/pom.xml index ce56a31..35b50e1 100644 --- a/SpringJava/RxJava/web-services-spring-rxjava-server/pom.xml +++ b/SpringJava/RxJava/web-services-spring-rxjava-server/pom.xml @@ -18,9 +18,9 @@ - io.reactivex.rxjava2 - rxjava - 2.0.2 + io.reactivex + rxjava + 1.2.4 @@ -35,7 +35,8 @@ org.springframework.data spring-data-commons - + + javax.servlet @@ -51,7 +52,7 @@ - org.springframework + org.springframework spring-oxm 4.2.4.RELEASE diff --git a/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/de/spring/webservices/rest/business/service/RxJavaBusinessLogic.java b/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/de/spring/webservices/rest/business/service/RxJavaBusinessLogic.java index d86922f..dff4d5e 100644 --- a/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/de/spring/webservices/rest/business/service/RxJavaBusinessLogic.java +++ b/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/de/spring/webservices/rest/business/service/RxJavaBusinessLogic.java @@ -4,11 +4,13 @@ import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; import de.spring.webservices.domain.Car; -import io.reactivex.Observable; +import rx.Observable; public interface RxJavaBusinessLogic { + public Observable> findAllStream(Pageable pageRequest); + public Observable> findAll(Pageable pageRequest); public Observable findById(long id); diff --git a/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/de/spring/webservices/rest/business/service/impl/RxJavaBusinessLogicImpl.java b/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/de/spring/webservices/rest/business/service/impl/RxJavaBusinessLogicImpl.java index 930972f..c747c7f 100644 --- a/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/de/spring/webservices/rest/business/service/impl/RxJavaBusinessLogicImpl.java +++ b/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/de/spring/webservices/rest/business/service/impl/RxJavaBusinessLogicImpl.java @@ -13,8 +13,11 @@ import org.springframework.stereotype.Service; import de.spring.webservices.domain.Car; import de.spring.webservices.rest.business.service.AwesomeBusinessLogic; import de.spring.webservices.rest.business.service.RxJavaBusinessLogic; -import io.reactivex.Observable; -import io.reactivex.exceptions.Exceptions; +import rx.Observable; +import rx.Observable.OnSubscribe; +import rx.Subscriber; +import rx.exceptions.Exceptions; +import rx.schedulers.Schedulers; @Service("rxJavaBusinessLogic") @@ -30,23 +33,41 @@ public class RxJavaBusinessLogicImpl implements RxJavaBusinessLogic { @Override public Observable> findAll(Pageable pageRequest) { - return Observable.create(observer -> observer.onNext( awesomeBusinessLogic.findAll(pageRequest))); - + return Observable.create(new OnSubscribe>() { + @Override + public void call(Subscriber> observer) { + observer.onNext( awesomeBusinessLogic.findAll(pageRequest)); + } + }).subscribeOn(Schedulers.io()); + } + + @Override + public Observable> findAllStream(Pageable pageRequest) { + return Observable.create(new OnSubscribe>() { + @Override + public void call(Subscriber> observer) { + observer.onNext( awesomeBusinessLogic.findAll(pageRequest)); + } + }).subscribeOn(Schedulers.io()); } @Override public Observable findById(long id) { - return Observable.create(observer -> observer.onNext( awesomeBusinessLogic.findById(id))); + return Observable.create((Subscriber observer) -> + observer.onNext( awesomeBusinessLogic.findById(id))) + .subscribeOn(Schedulers.io()); } @Override public Observable create(Car car) { - return Observable.create(observer -> observer.onNext(awesomeBusinessLogic.create(car))); + return Observable.create((Subscriber observer) -> + observer.onNext(awesomeBusinessLogic.create(car))) + .subscribeOn(Schedulers.io()); } @Override public Observable createThrowable(Car car) { - return Observable.create(observer -> { + return Observable.create((Subscriber observer) -> { try { observer.onNext(awesomeBusinessLogic.createThrowable(car)); @@ -59,7 +80,10 @@ public class RxJavaBusinessLogicImpl implements RxJavaBusinessLogic { Exceptions.propagate(ex); } + + // No idea when to use this stuff :( + // observer.onCompleted(); - }); + }).subscribeOn(Schedulers.io()); } } diff --git a/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/de/spring/webservices/rest/controller/RxJavaCarController.java b/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/de/spring/webservices/rest/controller/RxJavaCarController.java index 7a1029c..744928f 100644 --- a/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/de/spring/webservices/rest/controller/RxJavaCarController.java +++ b/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/de/spring/webservices/rest/controller/RxJavaCarController.java @@ -8,6 +8,7 @@ import javax.inject.Inject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.cloud.netflix.rx.RxResponse; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageRequest; import org.springframework.http.HttpHeaders; @@ -23,10 +24,12 @@ import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.async.DeferredResult; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import de.spring.webservices.domain.Car; import de.spring.webservices.rest.business.service.RxJavaBusinessLogic; -import io.reactivex.Observable; +import rx.Observable; +import rx.Single; @RestController @RequestMapping("/api/rxjava/cars/") @@ -44,20 +47,19 @@ public class RxJavaCarController { @RequestMapping(produces = { MediaType.APPLICATION_JSON_UTF8_VALUE }, method = RequestMethod.GET) @ResponseStatus(HttpStatus.OK) - public DeferredResult> cars() { - - // BE CAREFUL: I am returning Page object but when using io.reactivex.Observable (stream) instead of io.reactivex.Single (only one element) - // if you want this code to work you will have to return DeferredResult> and you will have to call - // the toList() method of Observable. - // The toList() method is the only way I know for returning Observable (stream) perhaps in Spring 5.0.0 there will be something better. - // Until then, this is the only way I know for using Observable with Spring. - - return deferredAdapter(rxJavaBusinessLogic.findAll(new PageRequest(PAGE, PAGE_SIZE)) /** .toList() **/); + public Single> cars() { + return rxJavaBusinessLogic.findAll(new PageRequest(PAGE, PAGE_SIZE)).toSingle(); + } + + @RequestMapping(path = "stream", produces = { MediaType.APPLICATION_JSON_UTF8_VALUE }, method = RequestMethod.GET) + @ResponseStatus(HttpStatus.OK) + public SseEmitter carsStream() { + return RxResponse.sse(rxJavaBusinessLogic.findAllStream(new PageRequest(PAGE, PAGE_SIZE))); } @RequestMapping(value = "{id}", produces = MediaType.APPLICATION_JSON_UTF8_VALUE, method = RequestMethod.GET) @ResponseStatus(HttpStatus.OK) - public DeferredResult car(@RequestHeader(value = "MY_HEADER", required = false) String specialHeader, + public ResponseEntity> car(@RequestHeader(value = "MY_HEADER", required = false) String specialHeader, @PathVariable("id") long id, @RequestParam Map params, @RequestParam(value = "wheel", required = false) String[] wheelParams) { @@ -85,8 +87,9 @@ public class RxJavaCarController { // the toList() method of Observable. // The toList() method is the only way I know for returning Observable (stream) perhaps in Spring 5.0.0 there will be something better. // Until then, this is the only way I know for using Observable with Spring. - - return deferredAdapter(rxJavaBusinessLogic.findById(id) /** .toList() **/); + Single car = rxJavaBusinessLogic.findById(id).toSingle(); + + return ResponseEntity.status(HttpStatus.OK).body(car); } diff --git a/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/de/spring/webservices/rest/controller/adapters/RxJavaAdapter.java b/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/de/spring/webservices/rest/controller/adapters/RxJavaAdapter.java index 4191239..f0d4985 100644 --- a/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/de/spring/webservices/rest/controller/adapters/RxJavaAdapter.java +++ b/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/de/spring/webservices/rest/controller/adapters/RxJavaAdapter.java @@ -4,9 +4,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.context.request.async.DeferredResult; -import io.reactivex.Observable; -import io.reactivex.Single; -import io.reactivex.schedulers.Schedulers; +import rx.Observable; +import rx.Single; /** * @@ -14,6 +13,8 @@ import io.reactivex.schedulers.Schedulers; * org.springframework.web.servlet.mvc.method.annotation.DeferredResultAdapter * for Observable and Single. * + * spring netflix is already doing this stuff for me. + * */ public class RxJavaAdapter { private static final Logger LOGGER = LoggerFactory.getLogger(RxJavaAdapter.class); @@ -34,7 +35,6 @@ public class RxJavaAdapter { DeferredResult deferredResult = new DeferredResult<>(/** ASYNC_TIMEOUT **/); observable - .subscribeOn(Schedulers.io()) .subscribe(deferredResult::setResult, exception -> { Throwable realException = launderException(exception); @@ -51,7 +51,6 @@ public class RxJavaAdapter { DeferredResult deferredResult = new DeferredResult<>(/** ASYNC_TIMEOUT **/); single - .subscribeOn(Schedulers.io()) .subscribe(deferredResult::setResult, exception -> { Throwable realException = launderException(exception); diff --git a/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/org/springframework/cloud/netflix/rx/DeferredResultSubscriber.java b/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/org/springframework/cloud/netflix/rx/DeferredResultSubscriber.java new file mode 100644 index 0000000..2763aec --- /dev/null +++ b/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/org/springframework/cloud/netflix/rx/DeferredResultSubscriber.java @@ -0,0 +1,69 @@ +/* + * Copyright 2013-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.netflix.rx; + +import org.springframework.web.context.request.async.DeferredResult; + +import rx.Observable; +import rx.Subscriber; +import rx.Subscription; + +/** + * A subscriber that sets the single value produced by the {@link Observable} on the {@link DeferredResult}. + * + * @author Jakub Narloch + * @see DeferredResult + */ +class DeferredResultSubscriber extends Subscriber implements Runnable { + + private final DeferredResult deferredResult; + + private final Subscription subscription; + + private boolean completed; + + public DeferredResultSubscriber(Observable observable, DeferredResult deferredResult) { + + this.deferredResult = deferredResult; + this.deferredResult.onTimeout(this); + this.deferredResult.onCompletion(this); + this.subscription = observable.subscribe(this); + } + + @Override + public void onNext(T value) { + if (!completed) { + deferredResult.setResult(value); + } + } + + @Override + public void onError(Throwable e) { + deferredResult.setErrorResult(e); + } + + @Override + public void onCompleted() { + completed = true; + } + + @Override + public void run() { + this.subscription.unsubscribe(); + } +} + diff --git a/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/org/springframework/cloud/netflix/rx/ObservableSseEmitter.java b/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/org/springframework/cloud/netflix/rx/ObservableSseEmitter.java new file mode 100644 index 0000000..6070fb2 --- /dev/null +++ b/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/org/springframework/cloud/netflix/rx/ObservableSseEmitter.java @@ -0,0 +1,46 @@ +/* + * Copyright 2013-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.netflix.rx; + +import org.springframework.http.MediaType; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import rx.Observable; + +/** + * A specialized {@link SseEmitter} that handles {@link Observable} return types. + * + * @author Jakub Narloch + * @see SseEmitter + */ +class ObservableSseEmitter extends SseEmitter { + + public ObservableSseEmitter(Observable observable) { + this(null, observable); + } + + public ObservableSseEmitter(MediaType mediaType, Observable observable) { + this(null, mediaType, observable); + } + + public ObservableSseEmitter(Long timeout, MediaType mediaType, + Observable observable) { + super(timeout); + new ResponseBodyEmitterSubscriber<>(mediaType, observable, this); + } +} + diff --git a/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/org/springframework/cloud/netflix/rx/ResponseBodyEmitterSubscriber.java b/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/org/springframework/cloud/netflix/rx/ResponseBodyEmitterSubscriber.java new file mode 100644 index 0000000..2902b4c --- /dev/null +++ b/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/org/springframework/cloud/netflix/rx/ResponseBodyEmitterSubscriber.java @@ -0,0 +1,91 @@ +/* + * Copyright 2013-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.netflix.rx; + +import java.io.IOException; + +import org.springframework.http.MediaType; +import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter; + +import rx.Observable; +import rx.Subscriber; +import rx.Subscription; + + +/** + * Subscriber that emits any value produced by the {@link Observable} into the delegated {@link ResponseBodyEmitter}. + * + * @author Jakub Narloch + */ +class ResponseBodyEmitterSubscriber extends Subscriber implements Runnable { + + private final MediaType mediaType; + + private final Subscription subscription; + + private final ResponseBodyEmitter responseBodyEmitter; + + private boolean completed; + + /** + * Creates new instance of {@link ResponseBodyEmitterSubscriber} with response media type, observable and response + * emitter. + * + * @param mediaType the marshaled object media type + * @param observable the observable + * @param responseBodyEmitter the response emitter + */ + public ResponseBodyEmitterSubscriber(MediaType mediaType, Observable observable, ResponseBodyEmitter responseBodyEmitter) { + + this.mediaType = mediaType; + this.responseBodyEmitter = responseBodyEmitter; + this.responseBodyEmitter.onTimeout(this); + this.responseBodyEmitter.onCompletion(this); + this.subscription = observable.subscribe(this); + } + + @Override + public void onNext(T value) { + + try { + if(!completed) { + responseBodyEmitter.send(value, mediaType); + } + } catch (IOException e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + @Override + public void onError(Throwable e) { + responseBodyEmitter.completeWithError(e); + } + + @Override + public void onCompleted() { + if(!completed) { + completed = true; + responseBodyEmitter.complete(); + } + } + + @Override + public void run() { + subscription.unsubscribe(); + } +} + diff --git a/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/org/springframework/cloud/netflix/rx/RxResponse.java b/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/org/springframework/cloud/netflix/rx/RxResponse.java new file mode 100644 index 0000000..48a966a --- /dev/null +++ b/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/org/springframework/cloud/netflix/rx/RxResponse.java @@ -0,0 +1,58 @@ +package org.springframework.cloud.netflix.rx; + +import org.springframework.http.MediaType; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import rx.Observable; + +/** + * A convenient class allowing to wrap either the {@link Observable} into a response supported by the + * Spring MVC. + * + * @author Jakub Narloch + */ +public final class RxResponse { + + private RxResponse() { + + } + + /** + * Wraps the {@link Observable} into a {@link SseEmitter}. Every value produced by the observable will be emitted + * as server side event. + * + * @param observable the observable instance + * @param the result type + * @return the sse emitter + */ + public static SseEmitter sse(Observable observable) { + return new ObservableSseEmitter<>(observable); + } + + /** + * Wraps the {@link Observable} into a {@link SseEmitter}. Every value produced by the observable will be emitted + * as server side event. + * + * @param mediaType the media type of produced entry + * @param observable the observable instance + * @param the result type + * @return the sse emitter + */ + public static SseEmitter sse(MediaType mediaType, Observable observable) { + return new ObservableSseEmitter<>(mediaType, observable); + } + + /** + * Wraps the {@link Observable} into a {@link SseEmitter}. Every value produced by the observable will be emitted + * as server side event. + * + * @param timeout the response timeout + * @param mediaType the media type of produced entry + * @param observable the observable instance + * @param the result type + * @return the sse emitter + */ + public static SseEmitter sse(long timeout, MediaType mediaType, Observable observable) { + return new ObservableSseEmitter<>(timeout, mediaType, observable); + } +} diff --git a/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/org/springframework/cloud/netflix/rx/SingleDeferredResult.java b/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/org/springframework/cloud/netflix/rx/SingleDeferredResult.java new file mode 100644 index 0000000..2c6682d --- /dev/null +++ b/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/org/springframework/cloud/netflix/rx/SingleDeferredResult.java @@ -0,0 +1,48 @@ +/* + * Copyright 2013-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.netflix.rx; + +import org.springframework.util.Assert; +import org.springframework.web.context.request.async.DeferredResult; + +import rx.Single; + +/** + * A specialized {@link DeferredResult} that handles {@link Single} return type. + * + * @author Jakub Narloch + * @see DeferredResult + */ +class SingleDeferredResult extends DeferredResult { + + private static final Object EMPTY_RESULT = new Object(); + + public SingleDeferredResult(Single single) { + this(null, EMPTY_RESULT, single); + } + + public SingleDeferredResult(long timeout, Single single) { + this(timeout, EMPTY_RESULT, single); + } + + public SingleDeferredResult(Long timeout, Object timeoutResult, Single single) { + super(timeout, timeoutResult); + Assert.notNull(single, "single can not be null"); + new DeferredResultSubscriber<>(single.toObservable(), this); + } +} + diff --git a/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/org/springframework/cloud/netflix/rx/SingleReturnValueHandler.java b/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/org/springframework/cloud/netflix/rx/SingleReturnValueHandler.java new file mode 100644 index 0000000..1db91b7 --- /dev/null +++ b/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/java/org/springframework/cloud/netflix/rx/SingleReturnValueHandler.java @@ -0,0 +1,125 @@ +/* + * Copyright 2013-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.netflix.rx; + +import org.springframework.core.MethodParameter; +import org.springframework.core.ResolvableType; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.context.request.NativeWebRequest; +import org.springframework.web.context.request.async.DeferredResult; +import org.springframework.web.context.request.async.WebAsyncUtils; +import org.springframework.web.method.support.AsyncHandlerMethodReturnValueHandler; +import org.springframework.web.method.support.ModelAndViewContainer; + +import rx.Single; +import rx.functions.Func1; + +/** + * A specialized {@link AsyncHandlerMethodReturnValueHandler} that handles {@link Single} + * return types. + * + * @author Spencer Gibb + * @author Jakub Narloch + */ +public class SingleReturnValueHandler implements AsyncHandlerMethodReturnValueHandler { + + @Override + public boolean isAsyncReturnValue(Object returnValue, MethodParameter returnType) { + return returnValue != null && supportsReturnType(returnType); + } + + @Override + public boolean supportsReturnType(MethodParameter returnType) { + return Single.class.isAssignableFrom(returnType.getParameterType()) + || isResponseEntity(returnType); + } + + private boolean isResponseEntity(MethodParameter returnType) { + if (ResponseEntity.class.isAssignableFrom(returnType.getParameterType())) { + Class bodyType = ResolvableType.forMethodParameter(returnType) + .getGeneric(0).resolve(); + return bodyType != null && Single.class.isAssignableFrom(bodyType); + } + return false; + } + + @Override + public void handleReturnValue(Object returnValue, MethodParameter returnType, + ModelAndViewContainer mavContainer, NativeWebRequest webRequest) + throws Exception { + + if (returnValue == null) { + mavContainer.setRequestHandled(true); + return; + } + + ResponseEntity> responseEntity = getResponseEntity(returnValue); + if (responseEntity != null) { + returnValue = responseEntity.getBody(); + if (returnValue == null) { + mavContainer.setRequestHandled(true); + return; + } + } + + final Single single = Single.class.cast(returnValue); + WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing( + convertToDeferredResult(responseEntity, single), mavContainer); + } + + @SuppressWarnings("unchecked") + private ResponseEntity> getResponseEntity(Object returnValue) { + if (ResponseEntity.class.isAssignableFrom(returnValue.getClass())) { + return (ResponseEntity>) returnValue; + + } + return null; + } + + protected DeferredResult convertToDeferredResult( + final ResponseEntity> responseEntity, Single single) { + + // TODO: use lambda when java8 :-) + Single> singleResponse = single + .map(new Func1>() { + @Override + public ResponseEntity call(Object object) { + return new ResponseEntity(object, + getHttpHeaders(responseEntity), + getHttpStatus(responseEntity)); + } + }); + + return new SingleDeferredResult<>(singleResponse); + } + + private HttpStatus getHttpStatus(ResponseEntity responseEntity) { + if (responseEntity == null) { + return HttpStatus.OK; + } + return responseEntity.getStatusCode(); + } + + private HttpHeaders getHttpHeaders(ResponseEntity responseEntity) { + if (responseEntity == null) { + return new HttpHeaders(); + } + return responseEntity.getHeaders(); + } +} diff --git a/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/resources/spring-configuration/mvc/rest/rest-config.xml b/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/resources/spring-configuration/mvc/rest/rest-config.xml index ce5b5a7..5bf22fe 100644 --- a/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/resources/spring-configuration/mvc/rest/rest-config.xml +++ b/SpringJava/RxJava/web-services-spring-rxjava-server/src/main/resources/spring-configuration/mvc/rest/rest-config.xml @@ -16,12 +16,25 @@ See: org.springframework.web.servlet.config.AnnotationDrivenBeanDefinitionParser --> - - + + + + + + + + - +