1 package de.spring.webservices.rest.controller.adapters;
3 import org.slf4j.Logger;
4 import org.slf4j.LoggerFactory;
5 import org.springframework.web.context.request.async.DeferredResult;
7 import io.reactivex.Observable;
8 import io.reactivex.Single;
9 import io.reactivex.schedulers.Schedulers;
13 * Instead of using this class, you could create you own implementation of
14 * org.springframework.web.servlet.mvc.method.annotation.DeferredResultAdapter
15 * for Observable and Single.
18 public class RxJavaAdapter {
19 private static final Logger LOGGER = LoggerFactory.getLogger(RxJavaAdapter.class);
21 // With no value, we depend on the Tomcat/Jboss/Jetty/etc timeout value for asynchronous requests.
22 // Spring will answer after 60 secs with an empty response (by default) and HTTP 503 status (by default) when timeout.
23 private static final long ASYNC_TIMEOUT = 60000; /* milliseconds */
26 public static final <T> DeferredResult<T> deferredAdapter(Observable<T> observable) {
28 DeferredResult<T> deferredResult = new DeferredResult<>(ASYNC_TIMEOUT);
31 .subscribeOn(Schedulers.io())
32 .subscribe(deferredResult::setResult, exception -> {
33 Throwable realException = launderException(exception);
35 LOGGER.error("error: ", realException);
37 deferredResult.setErrorResult(realException);
40 return deferredResult;
43 public static final <T> DeferredResult<T> deferredAdapter(Single<T> single) {
45 DeferredResult<T> deferredResult = new DeferredResult<>(ASYNC_TIMEOUT);
48 .subscribeOn(Schedulers.io())
49 .subscribe(deferredResult::setResult, exception -> {
50 Throwable realException = launderException(exception);
52 LOGGER.error("error: ", realException);
54 deferredResult.setErrorResult(realException);
57 return deferredResult;
60 private static final Throwable launderException(Throwable exception) {
61 return exception.getCause() != null
62 ? exception.getCause()