Having fun with CompletableFuture AKA Promise in Java
authorGustavo Martin Morcuende <gu.martinm@gmail.com>
Thu, 8 Dec 2016 00:25:42 +0000 (01:25 +0100)
committerGustavo Martin Morcuende <gu.martinm@gmail.com>
Thu, 8 Dec 2016 00:30:47 +0000 (01:30 +0100)
Allgemeines/Threads/Executor/ExecutorwithFuture/src/de/test/thread/executor/future/CompletableFutureExample.java [new file with mode: 0644]
Allgemeines/Threads/Executor/ExecutorwithFuture/src/de/test/thread/executor/future/ThreadMain.java

diff --git a/Allgemeines/Threads/Executor/ExecutorwithFuture/src/de/test/thread/executor/future/CompletableFutureExample.java b/Allgemeines/Threads/Executor/ExecutorwithFuture/src/de/test/thread/executor/future/CompletableFutureExample.java
new file mode 100644 (file)
index 0000000..ee7d477
--- /dev/null
@@ -0,0 +1,429 @@
+package de.test.thread.executor.future;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * AKA Promise in Java :)
+ *
+ */
+public class CompletableFutureExample {
+       
+
+       public void doRun() {
+                               
+        try {
+                       String someValue = doRunThrowableAsync();
+
+//          STDOUT:                    
+//                     ForkJoinPool.commonPool-worker-1
+//                     First promise
+//                     ForkJoinPool.commonPool-worker-1
+//                     Value from first promise: Hello World
+//                     ForkJoinPool.commonPool-worker-2
+//                     Second promise
+//                     ForkJoinPool.commonPool-worker-2
+//                     #1 First handleAsync value: Enchaining promises
+//                     ForkJoinPool.commonPool-worker-1
+//                     #2 First thenAcceptAsync value: First handleAsync value: Enchaining promises
+//                     ForkJoinPool.commonPool-worker-1
+//                     #3 Second handleAsync
+//                     ForkJoinPool.commonPool-worker-1
+//                     #4 Second thenAcceptAsync value: Second handleAsync
+//                     ForkJoinPool.commonPool-worker-1
+//                     #5 Third handleAsync
+//                     Third handleAsync
+                       
+                       
+                       // The same but running in the main thread.
+                       //String someValue = doRunThrowable();
+
+//          STDOUT:                    
+//                     ForkJoinPool.commonPool-worker-1
+//                     First promise
+//                     main
+//                     Value from first promise: Hello World
+//                     ForkJoinPool.commonPool-worker-1
+//                     Second promise
+//                     main
+//                     #1 First handle value: Enchaining promises
+//                     main
+//                     #2 First thenAccept value: First handle value: Enchaining promises
+//                     main
+//                     #3 Second handle
+//                     main
+//                     #4 Second thenAccept value: Second handle
+//                     main
+//                     #5 Third handle
+//                     Third handle
+                       
+                       System.out.println(someValue);
+        } catch (final InterruptedException e) {
+                       Thread.currentThread().interrupt();
+                       
+            e.printStackTrace();
+        } catch (final ExecutionException e) {
+            final Throwable cause = e.getCause();
+            
+            throw this.launderThrowable(cause);
+        }
+       }
+       
+       private String doRunThrowableAsync() throws InterruptedException, ExecutionException, CancellationException {
+               
+               // All async methods without an explicit Executor argument are performed using the ForkJoinPool.commonPool()
+               // (unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run each task).
+               
+               
+               // ForkJoinPool.makeCommonPool() tiene la lógica para calcular el parallelism. El parallelism indica el número de hilos
+               // en el pool del ForkJoinPool común (compartido en una misma JVM)
+               // Viene dado o por la propiedad del sistema java.util.concurrent.ForkJoinPool.common.parallelism
+               // o por el número de CPUs que tenemos Runtime.getRuntime().availableProcessors()
+               
+               // El problema puede venir si no sabemos quién está usando el pool común :( Si alguien lo usa mucho mis promesas
+               // puede que no se ejecuten nunca o tarden más en ejecutarse :(
+               // Supongo que en ese caso podría usar Executors pasados como parámetro a las promesas :/
+               
+               
+               // No using lambdas because I am learning this stuff.
+               
+               return CompletableFuture.supplyAsync(new Supplier<String>() {
+                       
+                       @Override
+                       public String get() {
+                               
+                               System.out.println(Thread.currentThread().getName());
+                               System.out.println("First promise");
+                               
+                               // If exception, we get into the rejected stage.
+                               // When exception the thenAcceptAsync does not run because it runs when no rejected stage.
+                               // throw new UnsupportedOperationException("getPage ERROR"); 
+                               
+                               return getPage();
+                       }
+                       
+               } /* IF YOU DO NOT WANT TO USE THE commonPool YOU SHOULD BE PASSING HERE YOUR OWN Executor */ )
+               
+                // consumes and returns new promise
+               .thenComposeAsync(new Function<String, CompletionStage<String>>() {
+
+                       @Override
+                       public CompletionStage<String> apply(String page) {
+                               
+                               // If exception, we get into the rejected stage.
+                               //throw new UnsupportedOperationException();
+                               
+                               System.out.println(Thread.currentThread().getName());
+                               System.out.println("Value from first promise: " + page);
+                               
+                               return CompletableFuture.supplyAsync(() -> {                 // The same as above but with lambdas
+                                       
+                                       // If exception, we get into the rejected stage.
+                                       //throw new UnsupportedOperationException();
+
+                                       System.out.println(Thread.currentThread().getName());
+                                       System.out.println("Second promise");
+                                       
+                                       return "Enchaining promises";
+                               });     
+                       }
+                       
+               }) 
+                // runs callback whether the stage was resolved or rejected. IT ALWAYS RUNS even in rejected stage.
+                // I guess, it can be used as a FINALLY for promises.
+               .handleAsync(new BiFunction<String, Throwable, String>() {
+
+                       @Override
+                       public String apply(String secondPromiseValue, /* the method before returns String */
+                                                               Throwable exception        /* not null if rejected stage I guess... */ ) {
+                               
+                               if (exception != null ) {
+                                       // rejected stage I guess...
+                                       
+                                       System.out.println("#1 First handleAsync exception: ");
+                                       exception.printStackTrace();
+                               } else {
+                                       // resolved stage I guess...
+                                       
+                                       System.out.println(Thread.currentThread().getName());
+                                       System.out.println("#1 First handleAsync value: " + secondPromiseValue);
+                               }
+                               
+                               // handleAsync runs always and enables us to return something
+                               return "First handleAsync value: " + secondPromiseValue;
+                               
+                               // If exception, we get into the rejected stage.
+                               // When exception the below thenAcceptAsync does not run because it runs when no rejected stage.
+                               // throw new UnsupportedOperationException("First handleAsync ERROR"); 
+                       }
+                       
+               })
+                // consumes and returns nothing. IT RUNS IF NO REJECTED STAGE (if there was no errors)
+               .thenAcceptAsync(new Consumer<String>() {
+                       
+                       @Override
+                       public void accept(String firstHandleAsyncValue) {
+                               
+                               System.out.println(Thread.currentThread().getName());
+                               System.out.println("#2 First thenAcceptAsync value: " + firstHandleAsyncValue);
+                               
+                               // If exception, we get into the rejected stage.
+                               // throw new UnsupportedOperationException("First thenAcceptAsync ERROR");
+                       }
+                       
+               })
+                // runs callback whether the stage was resolved or rejected. IT ALWAYS RUNS even in rejected stage.
+                // I guess, it can be used as a FINALLY for promises.
+               .handleAsync(new BiFunction<Void, Throwable, String>() {
+
+                       @Override
+                       public String apply(Void nothing,         /* the thenAcceptAsync before returns nothing */
+                                                   Throwable exception   /* not null if rejected stage I guess... */ ) {
+                               
+                               if (exception != null ) {
+                                       // rejected stage I guess...
+                                       
+                                       System.out.println("#3 Second handleAsync exception: ");
+                                       exception.printStackTrace();
+                               } else {
+                                       // resolved stage I guess...
+                                       
+                                       System.out.println(Thread.currentThread().getName());
+                                       System.out.println("#3 Second handleAsync");
+                               }
+                                                       
+                               return "Second handleAsync";
+                       }
+               })
+               
+                // Because the above handleAsync does not throw exception this method will always run even if
+                // the first handleAsync threw exception. The second handleAsync (above) cancels the rejected stage.
+               
+                // consumes and returns nothing. IT RUNS IF NO REJECTED STAGE (if there was no errors)
+               .thenAcceptAsync(new Consumer<String>() {
+                       
+                       @Override
+                       public void accept(String secondHandleAsyncValue) {
+                               
+                               System.out.println(Thread.currentThread().getName());
+                               System.out.println("#4 Second thenAcceptAsync value: " + secondHandleAsyncValue);
+                               
+                               // If exception, we get into the rejected stage.
+                               // throw new UnsupportedOperationException("Second thenAcceptAsync ERROR");
+                       }
+                       
+               })
+               
+                // runs callback whether the stage was resolved or rejected. Using lambdas. IT ALWAYS RUNS even in rejected stage.
+                // I guess, it can be used as a FINALLY for promises.
+               .handleAsync((nothing, exception) -> {
+                       
+                       if (exception != null ) {
+                               // rejected stage I guess...
+                               
+                               System.out.println("#5 Third handleAsync exception: ");
+                               exception.printStackTrace();
+                       } else {
+                               // resolved stage I guess...
+                               
+                               System.out.println(Thread.currentThread().getName());
+                               System.out.println("#5 Third handleAsync");
+                       }
+                                               
+                       return "Third handleAsync";
+               })
+               
+               // Synchronous point. So doRunThrowableAsync should be called doRunThrowable but who cares... :D
+               .get();
+               
+//        No need of this because I am using get()
+//             
+//        try {
+//            Thread.sleep(60000);
+//        } catch (final InterruptedException e) {
+//                     Thread.currentThread().interrupt();
+//            e.printStackTrace();
+//        }
+       
+       }
+
+       
+       private String doRunThrowable() throws InterruptedException, ExecutionException, CancellationException {
+               
+               return CompletableFuture.supplyAsync(new Supplier<String>() {
+                       
+                       @Override
+                       public String get() {
+                               
+                               System.out.println(Thread.currentThread().getName());
+                               System.out.println("First promise");
+                               
+                               // If exception, we get into the rejected stage.
+                               // When exception the thenAccept does not run because it runs when no rejected stage.
+                               // throw new UnsupportedOperationException("getPage ERROR"); 
+                               
+                               return getPage();
+                       }
+                       
+               } /* If you do not want to use the commonPool you should be passing here your own Executor */ )
+               
+                // consumes and returns new promise
+               .thenCompose(new Function<String, CompletionStage<String>>() {
+
+                       @Override
+                       public CompletionStage<String> apply(String page) {
+                               
+                               // If exception, we get into the rejected stage.
+                               //throw new UnsupportedOperationException();
+
+                               System.out.println(Thread.currentThread().getName());
+                               System.out.println("Value from first promise: " + page);
+                               
+                               return CompletableFuture.supplyAsync(() -> {                 // The same as above but with lambdas
+                                       
+                                       System.out.println(Thread.currentThread().getName());
+                                       System.out.println("Second promise");
+                                       
+                                       return "Enchaining promises";
+                               });     
+                       }
+                       
+               }) 
+                // runs callback whether the stage was resolved or rejected. IT ALWAYS RUNS even in rejected stage.
+                // I guess, it can be used as a FINALLY for promises.
+               .handle(new BiFunction<String, Throwable, String>() {
+
+                       @Override
+                       public String apply(String secondPromiseValue, /* the method before returns String */
+                                                               Throwable exception        /* not null if rejected stage I guess... */ ) {
+                               
+                               if (exception != null ) {
+                                       // rejected stage I guess...
+                                       
+                                       System.out.println("#1 First handle exception: ");
+                                       exception.printStackTrace();
+                               } else {
+                                       // resolved stage I guess...
+                                       
+                                       System.out.println(Thread.currentThread().getName());
+                                       System.out.println("#1 First handle value: " + secondPromiseValue);
+                               }
+                               
+                               // handleAsync runs always and enables us to return something
+                               return "First handle value: " + secondPromiseValue;
+                               
+                               // If exception, we get into the rejected stage.
+                               // When exception the below thenAccept does not run because it runs when no rejected stage.
+                               // throw new UnsupportedOperationException("First handle ERROR"); 
+                       }
+                       
+               })
+                // consumes and returns nothing. IT RUNS IF NO REJECTED STAGE (if there was no errors)
+               .thenAccept(new Consumer<String>() {
+                       
+                       @Override
+                       public void accept(String firstHandleValue) {
+                               
+                               System.out.println(Thread.currentThread().getName());
+                               System.out.println("#2 First thenAccept value: " + firstHandleValue);
+                               
+                               // If exception, we get into the rejected stage.
+                               // throw new UnsupportedOperationException("First thenAccept ERROR");
+                       }
+                       
+               })
+                // runs callback whether the stage was resolved or rejected. IT ALWAYS RUNS even in rejected stage.
+                // I guess, it can be used as a FINALLY for promises.
+               .handle(new BiFunction<Void, Throwable, String>() {
+
+                       @Override
+                       public String apply(Void nothing,         /* the thenAcceptAsync before returns nothing */
+                                                   Throwable exception   /* not null if rejected stage I guess... */ ) {
+                               
+                               if (exception != null ) {
+                                       // rejected stage I guess...
+                                       
+                                       System.out.println("#3 Second handle exception: ");
+                                       exception.printStackTrace();
+                               } else {
+                                       // resolved stage I guess...
+                                       
+                                       System.out.println(Thread.currentThread().getName());
+                                       System.out.println("#3 Second handle");
+                               }
+                                                       
+                               return "Second handle";
+                       }
+               })
+               
+                // Because the above handle does not throw exception this method will always run even if
+                // the first handle threw exception. The second handle (above) cancels the rejected stage.
+               
+                // consumes and returns nothing. IT RUNS IF NO REJECTED STAGE (if there was no errors)
+               .thenAccept(new Consumer<String>() {
+                       
+                       @Override
+                       public void accept(String secondHandleValue) {
+                               
+                               System.out.println(Thread.currentThread().getName());
+                               System.out.println("#4 Second thenAccept value: " + secondHandleValue);
+                               
+                               // If exception, we get into the rejected stage.
+                               // throw new UnsupportedOperationException("Second thenAccept ERROR");
+                       }
+                       
+               })
+               
+                // runs callback whether the stage was resolved or rejected. Using lambdas. IT ALWAYS RUNS even in rejected stage.
+                // I guess, it can be used as a FINALLY for promises.
+               .handle((nothing, exception) -> {
+                       
+                       if (exception != null ) {
+                               // rejected stage I guess...
+                               
+                               System.out.println("#5 Third handle exception: ");
+                               exception.printStackTrace();
+                       } else {
+                               // resolved stage I guess...
+                               
+                               System.out.println(Thread.currentThread().getName());
+                               System.out.println("#5 Third handle");
+                       }
+                                               
+                       return "Third handle";
+               })
+               
+               .get();
+               
+               
+//      No need of this because I am using ̶g̶̶e̶t̶(̶) synchronous methods. I am using the main thread :)
+//             
+//      try {
+//          Thread.sleep(60000);
+//      } catch (final InterruptedException e) {
+//                     Thread.currentThread().interrupt();
+//          e.printStackTrace();
+//      }
+       }
+       
+       private String getPage() {
+               
+               return "Hello World";
+       }
+       
+    private RuntimeException launderThrowable(final Throwable exception) {
+        exception.printStackTrace();
+        if (exception instanceof RuntimeException)
+            return (RuntimeException) exception;
+        else if (exception instanceof Error)
+            throw (Error) exception;
+        else
+            throw new IllegalStateException("Not unchecked", exception);
+    }
+}
index aab53b8..4be3a4c 100644 (file)
@@ -7,12 +7,11 @@ public class ThreadMain {
        
        public static void main(String[] args) {
 //             ThreadTest test = new ThreadTest();
-//
 //             test.start();
                
-               FutureTaskExample lol = new FutureTaskExample();
-               Car jeje = lol.test();
+               new CompletableFutureExample().doRun();
                
-               System.out.println(jeje.getId());
+               Car car = new FutureTaskExample().test();
+               System.out.println(car.getId());
        }
 }