1 package de.test.thread.executor.future;
3 import java.util.concurrent.CancellationException;
4 import java.util.concurrent.CompletableFuture;
5 import java.util.concurrent.CompletionStage;
6 import java.util.concurrent.ExecutionException;
7 import java.util.function.BiFunction;
8 import java.util.function.Consumer;
9 import java.util.function.Function;
10 import java.util.function.Supplier;
/**
 * Walk-through of {@link CompletableFuture}, the JDK counterpart of a "promise".
 *
 * <p>The same five-stage chain is built twice: once with the {@code *Async} methods
 * ({@link #doRunThrowableAsync()}) and once with their non-async twins
 * ({@link #doRunThrowable()}). Every stage prints the name of the thread it runs on followed by
 * a message, so the scheduling of each callback can be read from the console output.
 *
 * <p>Anonymous classes are used on purpose for most callbacks (instead of lambdas) so that the
 * functional-interface types involved in each step stay visible.
 */
public class CompletableFutureExample {

    /**
     * Runs the asynchronous chain, prints its final value and translates failures.
     *
     * <p>An {@link InterruptedException} restores the interrupt flag; an {@link ExecutionException}
     * is unwrapped and its cause rethrown through {@link #launderThrowable(Throwable)}.
     *
     * <p>NOTE(review): the header of this method was not visible in the reviewed source; the name
     * and signature were reconstructed by analogy with {@code doRunThrowable*} - confirm against
     * callers.
     */
    public void doRun() {
        try {
            String someValue = doRunThrowableAsync();
            // Sample output of one asynchronous run. Each stage prints its thread name and then
            // its message; the pool worker names vary between runs:
            //   ForkJoinPool.commonPool-worker-1
            //   First promise
            //   ForkJoinPool.commonPool-worker-1
            //   Value from first promise: Hello World
            //   ForkJoinPool.commonPool-worker-2
            //   Second promise
            //   ForkJoinPool.commonPool-worker-2
            //   #1 First handleAsync value: Enchaining promises
            //   ForkJoinPool.commonPool-worker-1
            //   #2 First thenAcceptAsync value: First handleAsync value: Enchaining promises
            //   ForkJoinPool.commonPool-worker-1
            //   #3 Second handleAsync
            //   ForkJoinPool.commonPool-worker-1
            //   #4 Second thenAcceptAsync value: Second handleAsync
            //   ForkJoinPool.commonPool-worker-1
            //   #5 Third handleAsync
            //   Third handleAsync

            // The same chain with the non-async methods: replace the call above with
            //   String someValue = doRunThrowable();
            // The messages then read "handle"/"thenAccept" instead of "handleAsync"/"thenAcceptAsync".
            // Only the two supplyAsync bodies are sure to run on a pool worker; each dependent
            // callback runs on the thread that completes the previous stage, or on the calling
            // (main) thread when that stage is already complete at registration time.

            System.out.println(someValue);
        } catch (final InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
        } catch (final ExecutionException e) {
            final Throwable cause = e.getCause();
            throw this.launderThrowable(cause);
        }
    }

    /**
     * Builds the demo chain with the {@code *Async} methods and blocks until it has completed.
     *
     * @return the value produced by the last stage, {@code "Third handleAsync"}
     * @throws InterruptedException if the calling thread is interrupted while waiting in {@code get()}
     * @throws ExecutionException if the chain completed exceptionally
     * @throws CancellationException if the future was cancelled
     */
    private String doRunThrowableAsync()
            throws InterruptedException, ExecutionException, CancellationException {

        // All async methods without an explicit Executor argument are performed using the
        // ForkJoinPool.commonPool() (unless it does not support a parallelism level of at least
        // two, in which case a new Thread is created to run each task).
        //
        // ForkJoinPool.makeCommonPool() holds the logic that computes the parallelism, i.e. the
        // number of threads of the common pool, which is shared by the whole JVM. The value comes
        // either from the system property java.util.concurrent.ForkJoinPool.common.parallelism
        // or is derived from the CPU count, Runtime.getRuntime().availableProcessors().
        //
        // The catch: we may not know who else is using the common pool. If somebody keeps it
        // busy, these promises may run late. In that case pass a dedicated Executor to the
        // async methods instead.

        // Anonymous classes instead of lambdas on purpose: this is learning material.
        return CompletableFuture.supplyAsync(new Supplier<String>() {

            @Override
            public String get() {
                System.out.println(Thread.currentThread().getName());
                System.out.println("First promise");

                // Throwing here puts the chain into the rejected (exceptionally completed) state;
                // the thenAcceptAsync stages are skipped while the chain is rejected.
                // throw new UnsupportedOperationException("getPage ERROR");

                return getPage();
            }

        } /* pass your own Executor here as a second argument to stay off the commonPool */)

        // Consumes the previous value and returns a new promise, which is flattened into the chain.
        .thenComposeAsync(new Function<String, CompletionStage<String>>() {

            @Override
            public CompletionStage<String> apply(String page) {
                // Throwing here rejects the chain.
                // throw new UnsupportedOperationException();

                System.out.println(Thread.currentThread().getName());
                System.out.println("Value from first promise: " + page);

                return CompletableFuture.supplyAsync(() -> { // same as above, but with a lambda
                    // Throwing here rejects the chain.
                    // throw new UnsupportedOperationException();

                    System.out.println(Thread.currentThread().getName());
                    System.out.println("Second promise");

                    return "Enchaining promises";
                });
            }
        })

        // Runs whether the previous stage was resolved or rejected, much like a "finally" for
        // promises, and its return value puts the chain back into the resolved state.
        .handleAsync(new BiFunction<String, Throwable, String>() {

            @Override
            public String apply(String secondPromiseValue, /* value of the stage before */
                                Throwable exception /* non-null when the stage before was rejected */) {
                if (exception != null) {
                    // rejected stage
                    System.out.println("#1 First handleAsync exception: ");
                    exception.printStackTrace();
                } else {
                    // resolved stage
                    System.out.println(Thread.currentThread().getName());
                    System.out.println("#1 First handleAsync value: " + secondPromiseValue);
                }

                // handleAsync always runs and lets us return something. After a rejection
                // secondPromiseValue is null, so the text ends in "null".
                return "First handleAsync value: " + secondPromiseValue;

                // Throwing here instead rejects the chain again, and the thenAcceptAsync below
                // is skipped.
                // throw new UnsupportedOperationException("First handleAsync ERROR");
            }
        })

        // Consumes the value and returns nothing. Runs only when the chain is not rejected.
        .thenAcceptAsync(new Consumer<String>() {

            @Override
            public void accept(String firstHandleAsyncValue) {
                System.out.println(Thread.currentThread().getName());
                System.out.println("#2 First thenAcceptAsync value: " + firstHandleAsyncValue);

                // Throwing here rejects the chain.
                // throw new UnsupportedOperationException("First thenAcceptAsync ERROR");
            }
        })

        // Runs whether the previous stage was resolved or rejected.
        .handleAsync(new BiFunction<Void, Throwable, String>() {

            @Override
            public String apply(Void nothing, /* the thenAcceptAsync before returns nothing */
                                Throwable exception /* non-null when the stage before was rejected */) {
                if (exception != null) {
                    // rejected stage
                    System.out.println("#3 Second handleAsync exception: ");
                    exception.printStackTrace();
                } else {
                    // resolved stage
                    System.out.println(Thread.currentThread().getName());
                    System.out.println("#3 Second handleAsync");
                }

                return "Second handleAsync";
            }
        })

        // The handleAsync above never throws, so it clears any earlier rejection and the stage
        // below always runs, even if the first handleAsync threw.
        //
        // Consumes the value and returns nothing. Runs only when the chain is not rejected.
        .thenAcceptAsync(new Consumer<String>() {

            @Override
            public void accept(String secondHandleAsyncValue) {
                System.out.println(Thread.currentThread().getName());
                System.out.println("#4 Second thenAcceptAsync value: " + secondHandleAsyncValue);

                // Throwing here rejects the chain.
                // throw new UnsupportedOperationException("Second thenAcceptAsync ERROR");
            }
        })

        // Runs whether the previous stage was resolved or rejected. This time with a lambda.
        .handleAsync((nothing, exception) -> {
            if (exception != null) {
                // rejected stage
                System.out.println("#5 Third handleAsync exception: ");
                exception.printStackTrace();
            } else {
                // resolved stage
                System.out.println(Thread.currentThread().getName());
                System.out.println("#5 Third handleAsync");
            }

            return "Third handleAsync";
        })

        // Synchronous point: get() blocks the caller until the whole chain has completed, so
        // despite its name this method is synchronous for its caller. No sleep is needed to
        // keep the JVM alive while the pool threads work.
        .get();
    }

    /**
     * Builds the same chain as {@link #doRunThrowableAsync()} with the non-async methods
     * ({@code thenCompose}, {@code handle}, {@code thenAccept}) and blocks until it has completed.
     *
     * @return the value produced by the last stage, {@code "Third handle"}
     * @throws InterruptedException if the calling thread is interrupted while waiting in {@code get()}
     * @throws ExecutionException if the chain completed exceptionally
     * @throws CancellationException if the future was cancelled
     */
    private String doRunThrowable()
            throws InterruptedException, ExecutionException, CancellationException {

        return CompletableFuture.supplyAsync(new Supplier<String>() {

            @Override
            public String get() {
                System.out.println(Thread.currentThread().getName());
                System.out.println("First promise");

                // Throwing here puts the chain into the rejected state; the thenAccept stages
                // are skipped while the chain is rejected.
                // throw new UnsupportedOperationException("getPage ERROR");

                return getPage();
            }

        } /* pass your own Executor here as a second argument to stay off the commonPool */)

        // Consumes the previous value and returns a new promise, which is flattened into the chain.
        .thenCompose(new Function<String, CompletionStage<String>>() {

            @Override
            public CompletionStage<String> apply(String page) {
                // Throwing here rejects the chain.
                // throw new UnsupportedOperationException();

                System.out.println(Thread.currentThread().getName());
                System.out.println("Value from first promise: " + page);

                return CompletableFuture.supplyAsync(() -> { // same as above, but with a lambda
                    System.out.println(Thread.currentThread().getName());
                    System.out.println("Second promise");

                    return "Enchaining promises";
                });
            }
        })

        // Runs whether the previous stage was resolved or rejected, much like a "finally" for
        // promises, and its return value puts the chain back into the resolved state.
        .handle(new BiFunction<String, Throwable, String>() {

            @Override
            public String apply(String secondPromiseValue, /* value of the stage before */
                                Throwable exception /* non-null when the stage before was rejected */) {
                if (exception != null) {
                    // rejected stage
                    System.out.println("#1 First handle exception: ");
                    exception.printStackTrace();
                } else {
                    // resolved stage
                    System.out.println(Thread.currentThread().getName());
                    System.out.println("#1 First handle value: " + secondPromiseValue);
                }

                // handle always runs and lets us return something. After a rejection
                // secondPromiseValue is null, so the text ends in "null".
                return "First handle value: " + secondPromiseValue;

                // Throwing here instead rejects the chain again, and the thenAccept below is
                // skipped.
                // throw new UnsupportedOperationException("First handle ERROR");
            }
        })

        // Consumes the value and returns nothing. Runs only when the chain is not rejected.
        .thenAccept(new Consumer<String>() {

            @Override
            public void accept(String firstHandleValue) {
                System.out.println(Thread.currentThread().getName());
                System.out.println("#2 First thenAccept value: " + firstHandleValue);

                // Throwing here rejects the chain.
                // throw new UnsupportedOperationException("First thenAccept ERROR");
            }
        })

        // Runs whether the previous stage was resolved or rejected.
        .handle(new BiFunction<Void, Throwable, String>() {

            @Override
            public String apply(Void nothing, /* the thenAccept before returns nothing */
                                Throwable exception /* non-null when the stage before was rejected */) {
                if (exception != null) {
                    // rejected stage
                    System.out.println("#3 Second handle exception: ");
                    exception.printStackTrace();
                } else {
                    // resolved stage
                    System.out.println(Thread.currentThread().getName());
                    System.out.println("#3 Second handle");
                }

                return "Second handle";
            }
        })

        // The handle above never throws, so it clears any earlier rejection and the stage below
        // always runs, even if the first handle threw.
        //
        // Consumes the value and returns nothing. Runs only when the chain is not rejected.
        .thenAccept(new Consumer<String>() {

            @Override
            public void accept(String secondHandleValue) {
                System.out.println(Thread.currentThread().getName());
                System.out.println("#4 Second thenAccept value: " + secondHandleValue);

                // Throwing here rejects the chain.
                // throw new UnsupportedOperationException("Second thenAccept ERROR");
            }
        })

        // Runs whether the previous stage was resolved or rejected. This time with a lambda.
        .handle((nothing, exception) -> {
            if (exception != null) {
                // rejected stage
                System.out.println("#5 Third handle exception: ");
                exception.printStackTrace();
            } else {
                // resolved stage
                System.out.println(Thread.currentThread().getName());
                System.out.println("#5 Third handle");
            }

            return "Third handle";
        })

        // get() hands the final value back to the caller, waiting if the chain has not completed
        // yet, so no sleep is needed here either.
        .get();
    }

    /**
     * Stands in for the "remote call" performed by the first promise.
     *
     * @return the constant page content {@code "Hello World"}
     */
    private String getPage() {
        return "Hello World";
    }

    /**
     * Turns the cause of an {@link ExecutionException} into something that can be rethrown
     * without a checked-exception declaration. The stack trace is always printed first.
     *
     * @param exception the cause to classify
     * @return {@code exception} itself when it is a {@link RuntimeException}, for the caller to throw
     * @throws Error when {@code exception} is an {@link Error} (rethrown as is)
     * @throws IllegalStateException when {@code exception} is a checked exception (kept as cause)
     */
    private RuntimeException launderThrowable(final Throwable exception) {
        exception.printStackTrace();
        if (exception instanceof RuntimeException) {
            return (RuntimeException) exception;
        } else if (exception instanceof Error) {
            throw (Error) exception;
        } else {
            throw new IllegalStateException("Not unchecked", exception);
        }
    }
}