ee7d4776cf334d7705a092217ffd46cd5eaea516
[JavaForFun] /
1 package de.test.thread.executor.future;
2
3 import java.util.concurrent.CancellationException;
4 import java.util.concurrent.CompletableFuture;
5 import java.util.concurrent.CompletionStage;
6 import java.util.concurrent.ExecutionException;
7 import java.util.function.BiFunction;
8 import java.util.function.Consumer;
9 import java.util.function.Function;
10 import java.util.function.Supplier;
11
12 /**
13  * AKA Promise in Java :)
14  *
15  */
16 public class CompletableFutureExample {
17         
18
19         public void doRun() {
20                                 
21         try {
22                         String someValue = doRunThrowableAsync();
23
24 //          STDOUT:                     
25 //                      ForkJoinPool.commonPool-worker-1
26 //                      First promise
27 //                      ForkJoinPool.commonPool-worker-1
28 //                      Value from first promise: Hello World
29 //                      ForkJoinPool.commonPool-worker-2
30 //                      Second promise
31 //                      ForkJoinPool.commonPool-worker-2
32 //                      #1 First handleAsync value: Enchaining promises
33 //                      ForkJoinPool.commonPool-worker-1
34 //                      #2 First thenAcceptAsync value: First handleAsync value: Enchaining promises
35 //                      ForkJoinPool.commonPool-worker-1
36 //                      #3 Second handleAsync
37 //                      ForkJoinPool.commonPool-worker-1
38 //                      #4 Second thenAcceptAsync value: Second handleAsync
39 //                      ForkJoinPool.commonPool-worker-1
40 //                      #5 Third handleAsync
41 //                      Third handleAsync
42                         
43                         
44                         // The same but running in the main thread.
45                         //String someValue = doRunThrowable();
46
47 //          STDOUT:                     
48 //                      ForkJoinPool.commonPool-worker-1
49 //                      First promise
50 //                      main
51 //                      Value from first promise: Hello World
52 //                      ForkJoinPool.commonPool-worker-1
53 //                      Second promise
54 //                      main
55 //                      #1 First handle value: Enchaining promises
56 //                      main
57 //                      #2 First thenAccept value: First handle value: Enchaining promises
58 //                      main
59 //                      #3 Second handle
60 //                      main
61 //                      #4 Second thenAccept value: Second handle
62 //                      main
63 //                      #5 Third handle
64 //                      Third handle
65                         
66                         System.out.println(someValue);
67         } catch (final InterruptedException e) {
68                         Thread.currentThread().interrupt();
69                         
70             e.printStackTrace();
71         } catch (final ExecutionException e) {
72             final Throwable cause = e.getCause();
73             
74             throw this.launderThrowable(cause);
75         }
76         }
77         
78         private String doRunThrowableAsync() throws InterruptedException, ExecutionException, CancellationException {
79                 
80                 // All async methods without an explicit Executor argument are performed using the ForkJoinPool.commonPool()
81                 // (unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run each task).
82                 
83                 
84                 // ForkJoinPool.makeCommonPool() tiene la lógica para calcular el parallelism. El parallelism indica el número de hilos
85                 // en el pool del ForkJoinPool común (compartido en una misma JVM)
86                 // Viene dado o por la propiedad del sistema java.util.concurrent.ForkJoinPool.common.parallelism
87                 // o por el número de CPUs que tenemos Runtime.getRuntime().availableProcessors()
88                 
89                 // El problema puede venir si no sabemos quién está usando el pool común :( Si alguien lo usa mucho mis promesas
90                 // puede que no se ejecuten nunca o tarden más en ejecutarse :(
91                 // Supongo que en ese caso podría usar Executors pasados como parámetro a las promesas :/
92                 
93                 
94                 // No using lambdas because I am learning this stuff.
95                 
96                 return CompletableFuture.supplyAsync(new Supplier<String>() {
97                         
98                         @Override
99                         public String get() {
100                                 
101                                 System.out.println(Thread.currentThread().getName());
102                                 System.out.println("First promise");
103                                 
104                                 // If exception, we get into the rejected stage.
105                                 // When exception the thenAcceptAsync does not run because it runs when no rejected stage.
106                                 // throw new UnsupportedOperationException("getPage ERROR"); 
107                                 
108                                 return getPage();
109                         }
110                         
111                 } /* IF YOU DO NOT WANT TO USE THE commonPool YOU SHOULD BE PASSING HERE YOUR OWN Executor */ )
112                 
113                  // consumes and returns new promise
114                 .thenComposeAsync(new Function<String, CompletionStage<String>>() {
115
116                         @Override
117                         public CompletionStage<String> apply(String page) {
118                                 
119                                 // If exception, we get into the rejected stage.
120                                 //throw new UnsupportedOperationException();
121                                 
122                                 System.out.println(Thread.currentThread().getName());
123                                 System.out.println("Value from first promise: " + page);
124                                 
125                                 return CompletableFuture.supplyAsync(() -> {                 // The same as above but with lambdas
126                                         
127                                         // If exception, we get into the rejected stage.
128                                         //throw new UnsupportedOperationException();
129
130                                         System.out.println(Thread.currentThread().getName());
131                                         System.out.println("Second promise");
132                                         
133                                         return "Enchaining promises";
134                                 });     
135                         }
136                         
137                 }) 
138                  // runs callback whether the stage was resolved or rejected. IT ALWAYS RUNS even in rejected stage.
139                  // I guess, it can be used as a FINALLY for promises.
140                 .handleAsync(new BiFunction<String, Throwable, String>() {
141
142                         @Override
143                         public String apply(String secondPromiseValue, /* the method before returns String */
144                                                                 Throwable exception        /* not null if rejected stage I guess... */ ) {
145                                 
146                                 if (exception != null ) {
147                                         // rejected stage I guess...
148                                         
149                                         System.out.println("#1 First handleAsync exception: ");
150                                         exception.printStackTrace();
151                                 } else {
152                                         // resolved stage I guess...
153                                         
154                                         System.out.println(Thread.currentThread().getName());
155                                         System.out.println("#1 First handleAsync value: " + secondPromiseValue);
156                                 }
157                                 
158                                 // handleAsync runs always and enables us to return something
159                                 return "First handleAsync value: " + secondPromiseValue;
160                                 
161                                 // If exception, we get into the rejected stage.
162                                 // When exception the below thenAcceptAsync does not run because it runs when no rejected stage.
163                                 // throw new UnsupportedOperationException("First handleAsync ERROR"); 
164                         }
165                         
166                 })
167                  // consumes and returns nothing. IT RUNS IF NO REJECTED STAGE (if there was no errors)
168                 .thenAcceptAsync(new Consumer<String>() {
169                         
170                         @Override
171                         public void accept(String firstHandleAsyncValue) {
172                                 
173                                 System.out.println(Thread.currentThread().getName());
174                                 System.out.println("#2 First thenAcceptAsync value: " + firstHandleAsyncValue);
175                                 
176                                 // If exception, we get into the rejected stage.
177                                 // throw new UnsupportedOperationException("First thenAcceptAsync ERROR");
178                         }
179                         
180                 })
181                  // runs callback whether the stage was resolved or rejected. IT ALWAYS RUNS even in rejected stage.
182                  // I guess, it can be used as a FINALLY for promises.
183                 .handleAsync(new BiFunction<Void, Throwable, String>() {
184
185                         @Override
186                         public String apply(Void nothing,         /* the thenAcceptAsync before returns nothing */
187                                                     Throwable exception   /* not null if rejected stage I guess... */ ) {
188                                 
189                                 if (exception != null ) {
190                                         // rejected stage I guess...
191                                         
192                                         System.out.println("#3 Second handleAsync exception: ");
193                                         exception.printStackTrace();
194                                 } else {
195                                         // resolved stage I guess...
196                                         
197                                         System.out.println(Thread.currentThread().getName());
198                                         System.out.println("#3 Second handleAsync");
199                                 }
200                                                         
201                                 return "Second handleAsync";
202                         }
203                 })
204                 
205                  // Because the above handleAsync does not throw exception this method will always run even if
206                  // the first handleAsync threw exception. The second handleAsync (above) cancels the rejected stage.
207                 
208                  // consumes and returns nothing. IT RUNS IF NO REJECTED STAGE (if there was no errors)
209                 .thenAcceptAsync(new Consumer<String>() {
210                         
211                         @Override
212                         public void accept(String secondHandleAsyncValue) {
213                                 
214                                 System.out.println(Thread.currentThread().getName());
215                                 System.out.println("#4 Second thenAcceptAsync value: " + secondHandleAsyncValue);
216                                 
217                                 // If exception, we get into the rejected stage.
218                                 // throw new UnsupportedOperationException("Second thenAcceptAsync ERROR");
219                         }
220                         
221                 })
222                 
223                  // runs callback whether the stage was resolved or rejected. Using lambdas. IT ALWAYS RUNS even in rejected stage.
224                  // I guess, it can be used as a FINALLY for promises.
225                 .handleAsync((nothing, exception) -> {
226                         
227                         if (exception != null ) {
228                                 // rejected stage I guess...
229                                 
230                                 System.out.println("#5 Third handleAsync exception: ");
231                                 exception.printStackTrace();
232                         } else {
233                                 // resolved stage I guess...
234                                 
235                                 System.out.println(Thread.currentThread().getName());
236                                 System.out.println("#5 Third handleAsync");
237                         }
238                                                 
239                         return "Third handleAsync";
240                 })
241                 
242                 // Synchronous point. So doRunThrowableAsync should be called doRunThrowable but who cares... :D
243                 .get();
244                 
245 //        No need of this because I am using get()
246 //              
247 //        try {
248 //            Thread.sleep(60000);
249 //        } catch (final InterruptedException e) {
250 //                      Thread.currentThread().interrupt();
251 //            e.printStackTrace();
252 //        }
253         
254         }
255
256         
257         private String doRunThrowable() throws InterruptedException, ExecutionException, CancellationException {
258                 
259                 return CompletableFuture.supplyAsync(new Supplier<String>() {
260                         
261                         @Override
262                         public String get() {
263                                 
264                                 System.out.println(Thread.currentThread().getName());
265                                 System.out.println("First promise");
266                                 
267                                 // If exception, we get into the rejected stage.
268                                 // When exception the thenAccept does not run because it runs when no rejected stage.
269                                 // throw new UnsupportedOperationException("getPage ERROR"); 
270                                 
271                                 return getPage();
272                         }
273                         
274                 } /* If you do not want to use the commonPool you should be passing here your own Executor */ )
275                 
276                  // consumes and returns new promise
277                 .thenCompose(new Function<String, CompletionStage<String>>() {
278
279                         @Override
280                         public CompletionStage<String> apply(String page) {
281                                 
282                                 // If exception, we get into the rejected stage.
283                                 //throw new UnsupportedOperationException();
284
285                                 System.out.println(Thread.currentThread().getName());
286                                 System.out.println("Value from first promise: " + page);
287                                 
288                                 return CompletableFuture.supplyAsync(() -> {                 // The same as above but with lambdas
289                                         
290                                         System.out.println(Thread.currentThread().getName());
291                                         System.out.println("Second promise");
292                                         
293                                         return "Enchaining promises";
294                                 });     
295                         }
296                         
297                 }) 
298                  // runs callback whether the stage was resolved or rejected. IT ALWAYS RUNS even in rejected stage.
299                  // I guess, it can be used as a FINALLY for promises.
300                 .handle(new BiFunction<String, Throwable, String>() {
301
302                         @Override
303                         public String apply(String secondPromiseValue, /* the method before returns String */
304                                                                 Throwable exception        /* not null if rejected stage I guess... */ ) {
305                                 
306                                 if (exception != null ) {
307                                         // rejected stage I guess...
308                                         
309                                         System.out.println("#1 First handle exception: ");
310                                         exception.printStackTrace();
311                                 } else {
312                                         // resolved stage I guess...
313                                         
314                                         System.out.println(Thread.currentThread().getName());
315                                         System.out.println("#1 First handle value: " + secondPromiseValue);
316                                 }
317                                 
318                                 // handleAsync runs always and enables us to return something
319                                 return "First handle value: " + secondPromiseValue;
320                                 
321                                 // If exception, we get into the rejected stage.
322                                 // When exception the below thenAccept does not run because it runs when no rejected stage.
323                                 // throw new UnsupportedOperationException("First handle ERROR"); 
324                         }
325                         
326                 })
327                  // consumes and returns nothing. IT RUNS IF NO REJECTED STAGE (if there was no errors)
328                 .thenAccept(new Consumer<String>() {
329                         
330                         @Override
331                         public void accept(String firstHandleValue) {
332                                 
333                                 System.out.println(Thread.currentThread().getName());
334                                 System.out.println("#2 First thenAccept value: " + firstHandleValue);
335                                 
336                                 // If exception, we get into the rejected stage.
337                                 // throw new UnsupportedOperationException("First thenAccept ERROR");
338                         }
339                         
340                 })
341                  // runs callback whether the stage was resolved or rejected. IT ALWAYS RUNS even in rejected stage.
342                  // I guess, it can be used as a FINALLY for promises.
343                 .handle(new BiFunction<Void, Throwable, String>() {
344
345                         @Override
346                         public String apply(Void nothing,         /* the thenAcceptAsync before returns nothing */
347                                                     Throwable exception   /* not null if rejected stage I guess... */ ) {
348                                 
349                                 if (exception != null ) {
350                                         // rejected stage I guess...
351                                         
352                                         System.out.println("#3 Second handle exception: ");
353                                         exception.printStackTrace();
354                                 } else {
355                                         // resolved stage I guess...
356                                         
357                                         System.out.println(Thread.currentThread().getName());
358                                         System.out.println("#3 Second handle");
359                                 }
360                                                         
361                                 return "Second handle";
362                         }
363                 })
364                 
365                  // Because the above handle does not throw exception this method will always run even if
366                  // the first handle threw exception. The second handle (above) cancels the rejected stage.
367                 
368                  // consumes and returns nothing. IT RUNS IF NO REJECTED STAGE (if there was no errors)
369                 .thenAccept(new Consumer<String>() {
370                         
371                         @Override
372                         public void accept(String secondHandleValue) {
373                                 
374                                 System.out.println(Thread.currentThread().getName());
375                                 System.out.println("#4 Second thenAccept value: " + secondHandleValue);
376                                 
377                                 // If exception, we get into the rejected stage.
378                                 // throw new UnsupportedOperationException("Second thenAccept ERROR");
379                         }
380                         
381                 })
382                 
383                  // runs callback whether the stage was resolved or rejected. Using lambdas. IT ALWAYS RUNS even in rejected stage.
384                  // I guess, it can be used as a FINALLY for promises.
385                 .handle((nothing, exception) -> {
386                         
387                         if (exception != null ) {
388                                 // rejected stage I guess...
389                                 
390                                 System.out.println("#5 Third handle exception: ");
391                                 exception.printStackTrace();
392                         } else {
393                                 // resolved stage I guess...
394                                 
395                                 System.out.println(Thread.currentThread().getName());
396                                 System.out.println("#5 Third handle");
397                         }
398                                                 
399                         return "Third handle";
400                 })
401                 
402                 .get();
403                 
404                 
405 //      No need of this because I am using ̶g̶̶e̶t̶(̶) synchronous methods. I am using the main thread :)
406 //              
407 //      try {
408 //          Thread.sleep(60000);
409 //      } catch (final InterruptedException e) {
410 //                      Thread.currentThread().interrupt();
411 //          e.printStackTrace();
412 //      }
413         }
414         
415         private String getPage() {
416                 
417                 return "Hello World";
418         }
419         
420     private RuntimeException launderThrowable(final Throwable exception) {
421         exception.printStackTrace();
422         if (exception instanceof RuntimeException)
423             return (RuntimeException) exception;
424         else if (exception instanceof Error)
425             throw (Error) exception;
426         else
427             throw new IllegalStateException("Not unchecked", exception);
428     }
429 }