From 77ad7ffc64f022c0b278c9e9fd8be365329a38ae Mon Sep 17 00:00:00 2001 From: Gustavo Martin Morcuende Date: Thu, 8 Dec 2016 18:54:19 +0100 Subject: [PATCH] More about working with Promises in Java 8 --- .../executor/future/CompletableFutureExample.java | 338 +++++++++++++++++---- 1 file changed, 287 insertions(+), 51 deletions(-) diff --git a/Allgemeines/Threads/Executor/ExecutorwithFuture/src/de/test/thread/executor/future/CompletableFutureExample.java b/Allgemeines/Threads/Executor/ExecutorwithFuture/src/de/test/thread/executor/future/CompletableFutureExample.java index ee7d477..6c37196 100644 --- a/Allgemeines/Threads/Executor/ExecutorwithFuture/src/de/test/thread/executor/future/CompletableFutureExample.java +++ b/Allgemeines/Threads/Executor/ExecutorwithFuture/src/de/test/thread/executor/future/CompletableFutureExample.java @@ -1,9 +1,11 @@ package de.test.thread.executor.future; +import java.util.Optional; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; +import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; @@ -19,7 +21,10 @@ public class CompletableFutureExample { public void doRun() { try { + + // Using ForkJoinPool String someValue = doRunThrowableAsync(); + System.out.println(someValue); // STDOUT: // ForkJoinPool.commonPool-worker-1 @@ -30,40 +35,60 @@ public class CompletableFutureExample { // Second promise // ForkJoinPool.commonPool-worker-2 // #1 First handleAsync value: Enchaining promises -// ForkJoinPool.commonPool-worker-1 -// #2 First thenAcceptAsync value: First handleAsync value: Enchaining promises -// ForkJoinPool.commonPool-worker-1 -// #3 Second handleAsync -// ForkJoinPool.commonPool-worker-1 -// #4 Second thenAcceptAsync value: Second handleAsync -// ForkJoinPool.commonPool-worker-1 -// #5 Third handleAsync +// ForkJoinPool.commonPool-worker-2 +// #2 First whenCompleteAsync value: First handleAsync value: Enchaining promises +// ForkJoinPool.commonPool-worker-2 +// #3 First thenAcceptAsync value: First handleAsync value: Enchaining promises +// ForkJoinPool.commonPool-worker-2 +// #4 Second handleAsync +// ForkJoinPool.commonPool-worker-2 +// #5 Second thenAcceptAsync value: Second handleAsync +// ForkJoinPool.commonPool-worker-2 +// #6 Third handleAsync // Third handleAsync - - // The same but running in the main thread. - //String someValue = doRunThrowable(); + // The same but running everything in the main thread. + System.out.println(); + System.out.println("#################################################### doRunThrowable ####################################################"); + System.out.println(); + someValue = doRunThrowable(); + System.out.println(someValue); + // STDOUT: -// ForkJoinPool.commonPool-worker-1 +// ForkJoinPool.commonPool-worker-2 // First promise // main // Value from first promise: Hello World -// ForkJoinPool.commonPool-worker-1 +// ForkJoinPool.commonPool-worker-2 // Second promise // main // #1 First handle value: Enchaining promises // main -// #2 First thenAccept value: First handle value: Enchaining promises +// #2 First whenComplete value: First handle value: Enchaining promises +// main +// #3 First thenAccept value: First handle value: Enchaining promises // main -// #3 Second handle +// #4 Second handle // main -// #4 Second thenAccept value: Second handle +// #5 Second thenAccept value: Second handle // main -// #5 Third handle +// #6 Third handle // Third handle - System.out.println(someValue); + + System.out.println(); + System.out.println("#################################################### doRunThrowableSimpleAsync ####################################################"); + System.out.println(); + doRunThrowableSimpleAsync(); + + + System.out.println(); + System.out.println("#################################################### doRunThrowableComplexAsync ####################################################"); + System.out.println(); + doRunThrowableComplexAsync().ifPresent(value -> System.out.println("doRunThrowableComplexAsync value: " + value)); + + } catch (final InterruptedException e) { Thread.currentThread().interrupt(); @@ -93,6 +118,8 @@ public class CompletableFutureExample { // No using lambdas because I am learning this stuff. + + return CompletableFuture.supplyAsync(new Supplier() { @Override @@ -110,7 +137,8 @@ public class CompletableFutureExample { } /* IF YOU DO NOT WANT TO USE THE commonPool YOU SHOULD BE PASSING HERE YOUR OWN Executor */ ) - // consumes and returns new promise + // returns new promise + // IT DOES NOT RUN IN REJECTED STAGE (if there was exception) .thenComposeAsync(new Function>() { @Override @@ -134,9 +162,11 @@ public class CompletableFutureExample { }); } - }) + }) + + // Creates a new CompletableFuture with the returned value FROM HERE (unlike whenCompleteAsync) // runs callback whether the stage was resolved or rejected. IT ALWAYS RUNS even in rejected stage. - // I guess, it can be used as a FINALLY for promises. + // Could be used as a complex finally. .handleAsync(new BiFunction() { @Override @@ -146,6 +176,7 @@ public class CompletableFutureExample { if (exception != null ) { // rejected stage I guess... + // Because we do not rethrow exception we are canceling the rejected stage :( System.out.println("#1 First handleAsync exception: "); exception.printStackTrace(); } else { @@ -164,22 +195,58 @@ public class CompletableFutureExample { } }) - // consumes and returns nothing. IT RUNS IF NO REJECTED STAGE (if there was no errors) + + // Creates a new CompletableFuture with the value returned FROM handleAsync or thenComposeAsync + // (depending on what was run before) THAT IS THE DIFERENCE BETWEEN handleAsync AND whenCompleteAsync + // IT DOES NOT CREATE A NEW CompletableFuture with a different value, it creates a new CompletableFuture + // with the same value as before. + + // Runs callback whether the stage was resolved or rejected. IT ALWAYS RUNS even in rejected stage. + // Could be used as a finally neither as complex as handleAsync nor as simple as exceptionally method, + .whenCompleteAsync(new BiConsumer() { + + @Override + public void accept(String firstHandleAsyncValue, /* the method before returns String */ + Throwable exception /* not null if rejected stage I guess... */) { + + + if (exception != null ) { + // rejected stage I guess... + + // Because we do not rethrow exception we are canceling the rejected stage :( + System.out.println("#2 First whenCompleteAsync exception: "); + exception.printStackTrace(); + } else { + // resolved stage I guess... + + System.out.println(Thread.currentThread().getName()); + System.out.println("#2 First whenCompleteAsync value: " + firstHandleAsyncValue); + } + + // If exception, we get into the rejected stage. + // throw new UnsupportedOperationException(); + } + }) + + // Creates a new CompletableFuture with Void value. + // IT RUNS IF NO REJECTED STAGE (if there was no errors) .thenAcceptAsync(new Consumer() { @Override public void accept(String firstHandleAsyncValue) { System.out.println(Thread.currentThread().getName()); - System.out.println("#2 First thenAcceptAsync value: " + firstHandleAsyncValue); + System.out.println("#3 First thenAcceptAsync value: " + firstHandleAsyncValue); // If exception, we get into the rejected stage. // throw new UnsupportedOperationException("First thenAcceptAsync ERROR"); } }) + + // Creates a new CompletableFuture with the returned value FROM HERE (unlike whenCompleteAsync) // runs callback whether the stage was resolved or rejected. IT ALWAYS RUNS even in rejected stage. - // I guess, it can be used as a FINALLY for promises. + // Could be used as a complex finally. .handleAsync(new BiFunction() { @Override @@ -189,13 +256,14 @@ public class CompletableFutureExample { if (exception != null ) { // rejected stage I guess... - System.out.println("#3 Second handleAsync exception: "); + // Because we do not rethrow exception we are canceling the rejected stage :( + System.out.println("#4 Second handleAsync exception: "); exception.printStackTrace(); } else { // resolved stage I guess... System.out.println(Thread.currentThread().getName()); - System.out.println("#3 Second handleAsync"); + System.out.println("#4 Second handleAsync"); } return "Second handleAsync"; @@ -205,14 +273,15 @@ public class CompletableFutureExample { // Because the above handleAsync does not throw exception this method will always run even if // the first handleAsync threw exception. The second handleAsync (above) cancels the rejected stage. - // consumes and returns nothing. IT RUNS IF NO REJECTED STAGE (if there was no errors) + // Creates a new CompletableFuture with Void value. + // IT RUNS IF NO REJECTED STAGE (if there was no errors) .thenAcceptAsync(new Consumer() { @Override public void accept(String secondHandleAsyncValue) { System.out.println(Thread.currentThread().getName()); - System.out.println("#4 Second thenAcceptAsync value: " + secondHandleAsyncValue); + System.out.println("#5 Second thenAcceptAsync value: " + secondHandleAsyncValue); // If exception, we get into the rejected stage. // throw new UnsupportedOperationException("Second thenAcceptAsync ERROR"); @@ -220,25 +289,47 @@ public class CompletableFutureExample { }) - // runs callback whether the stage was resolved or rejected. Using lambdas. IT ALWAYS RUNS even in rejected stage. - // I guess, it can be used as a FINALLY for promises. + // Creates a new CompletableFuture with the returned value FROM HERE (unlike whenCompleteAsync) + // runs callback whether the stage was resolved or rejected. IT ALWAYS RUNS even in rejected stage. + // Could be used as a complex finally. .handleAsync((nothing, exception) -> { if (exception != null ) { // rejected stage I guess... - System.out.println("#5 Third handleAsync exception: "); + // Because we do not rethrow exception we are canceling the rejected stage :( + System.out.println("#6 Third handleAsync exception: "); exception.printStackTrace(); } else { // resolved stage I guess... System.out.println(Thread.currentThread().getName()); - System.out.println("#5 Third handleAsync"); + System.out.println("#6 Third handleAsync"); } return "Third handleAsync"; }) + // IF NOT REJECTED STAGE: creates a new CompletableFuture with the value returned FROM whatever was run before. + // IF REJECTED STAGE: creates a new CompletableFuture with the value returned from exceptionally. + // It is almost like handleAsync but with less functionality because we have no access to the value returned by the stage before. + + // IT ONLY RUNS IN REJECTED STAGE!! + // Could be used as a simple finally + .exceptionally(new Function() { + + @Override + public String apply(Throwable exception) { + + // Because we do not rethrow exception we are canceling the rejected stage :( + System.out.println(Thread.currentThread().getName()); + System.out.println("#7 You only see me if rejected stage."); + exception.printStackTrace(); + + return "You will not see me if not rejected stage"; + } + }) + // Synchronous point. So doRunThrowableAsync should be called doRunThrowable but who cares... :D .get(); @@ -254,6 +345,84 @@ public class CompletableFutureExample { } + private void doRunThrowableSimpleAsync() throws InterruptedException, ExecutionException, CancellationException { + + Void nothing = // There is nothing because of thenAcceptAsync + + CompletableFuture.supplyAsync(() -> { + + System.out.println(Thread.currentThread().getName()); + System.out.println("First promise"); + + // If exception, we get into the rejected stage. + // When exception the thenAcceptAsync does not run because it runs + // when no rejected stage. + // throw new UnsupportedOperationException("getPage ERROR"); + + return getPage(); + }) + + .thenAcceptAsync(page -> { + + System.out.println(Thread.currentThread().getName()); + System.out.println("Page: " + page); + + }) + + .exceptionally(exception -> { + + System.out.println(Thread.currentThread().getName()); + System.out.println("exceptionally"); + exception.printStackTrace(); + + return null; + }) + + // Synchronization for the sake of this example. + // RETURNS VOID BECAUSE thenAcceptAsync creates a new CompletableFuture with Void value :( + .get(); + + } + + + private Optional doRunThrowableComplexAsync() throws InterruptedException, ExecutionException, CancellationException { + + return CompletableFuture.supplyAsync(() -> { + + System.out.println(Thread.currentThread().getName()); + System.out.println("First promise"); + + // If exception, we get into the rejected stage. + // When exception the thenAcceptAsync does not run because it runs + // when no rejected stage. + // throw new UnsupportedOperationException("getPage ERROR"); + + return Optional.ofNullable(getPage()); + }) + + // Changing the returned value from String to Long :) + // Remember, if we do not rethrow exception in handleAsync the rejected stage is cancelled :( + .handleAsync((page, exception) -> Optional.of(999666L)) + + .exceptionally(exception -> { + + // Because we do not rethrow exception we are canceling the rejected stage :( + + // This code will never be run because the handleAsync before is cancelling exceptions :( + System.out.println(Thread.currentThread().getName()); + System.out.println("exceptionally"); + exception.printStackTrace(); + + // null sucks :) + return Optional.empty(); + }) + + // For the sake of this example. + // RETURNS Optional BECAUSE handleAsync creates a new CompletableFuture. + // IT WILL RETURN Optional.of(999666L) IF EVERYTHING WENT OK OR Optional.empty() IF ERROR + .get(); + } + private String doRunThrowable() throws InterruptedException, ExecutionException, CancellationException { return CompletableFuture.supplyAsync(new Supplier() { @@ -265,15 +434,16 @@ public class CompletableFutureExample { System.out.println("First promise"); // If exception, we get into the rejected stage. - // When exception the thenAccept does not run because it runs when no rejected stage. + // When exception the thenAcceptAsync does not run because it runs when no rejected stage. // throw new UnsupportedOperationException("getPage ERROR"); return getPage(); } - } /* If you do not want to use the commonPool you should be passing here your own Executor */ ) + } /* IF YOU DO NOT WANT TO USE THE commonPool YOU SHOULD BE PASSING HERE YOUR OWN Executor */ ) - // consumes and returns new promise + // returns new promise + // IT DOES NOT RUN IN REJECTED STAGE (if there was exception) .thenCompose(new Function>() { @Override @@ -281,12 +451,15 @@ public class CompletableFutureExample { // If exception, we get into the rejected stage. //throw new UnsupportedOperationException(); - + System.out.println(Thread.currentThread().getName()); System.out.println("Value from first promise: " + page); return CompletableFuture.supplyAsync(() -> { // The same as above but with lambdas + // If exception, we get into the rejected stage. + //throw new UnsupportedOperationException(); + System.out.println(Thread.currentThread().getName()); System.out.println("Second promise"); @@ -294,9 +467,11 @@ public class CompletableFutureExample { }); } - }) + }) + + // Creates a new CompletableFuture with the returned value FROM HERE (unlike whenComplete) // runs callback whether the stage was resolved or rejected. IT ALWAYS RUNS even in rejected stage. - // I guess, it can be used as a FINALLY for promises. + // Could be used as a complex finally. .handle(new BiFunction() { @Override @@ -306,6 +481,7 @@ public class CompletableFutureExample { if (exception != null ) { // rejected stage I guess... + // Because we do not rethrow exception we are canceling the rejected stage :( System.out.println("#1 First handle exception: "); exception.printStackTrace(); } else { @@ -324,22 +500,58 @@ public class CompletableFutureExample { } }) - // consumes and returns nothing. IT RUNS IF NO REJECTED STAGE (if there was no errors) + + // Creates a new CompletableFuture with the value returned FROM handle or thenCompose + // (depending on what was run before) THAT IS THE DIFERENCE BETWEEN handle AND whenComplete + // IT DOES NOT CREATE A NEW CompletableFuture with a different value, it creates a new CompletableFuture + // with the same value as before. + + // Runs callback whether the stage was resolved or rejected. IT ALWAYS RUNS even in rejected stage. + // Could be used as a finally neither as complex as handle nor as simple as exceptionally method, + .whenComplete(new BiConsumer() { + + @Override + public void accept(String firstHandleValue, /* the method before returns String */ + Throwable exception /* not null if rejected stage I guess... */) { + + + if (exception != null ) { + // rejected stage I guess... + + // Because we do not rethrow exception we are canceling the rejected stage :( + System.out.println("#2 First whenComplete exception: "); + exception.printStackTrace(); + } else { + // resolved stage I guess... + + System.out.println(Thread.currentThread().getName()); + System.out.println("#2 First whenComplete value: " + firstHandleValue); + } + + // If exception, we get into the rejected stage. + // throw new UnsupportedOperationException(); + } + }) + + // Creates a new CompletableFuture with Void value. + // IT RUNS IF NO REJECTED STAGE (if there was no errors) .thenAccept(new Consumer() { @Override public void accept(String firstHandleValue) { System.out.println(Thread.currentThread().getName()); - System.out.println("#2 First thenAccept value: " + firstHandleValue); + System.out.println("#3 First thenAccept value: " + firstHandleValue); // If exception, we get into the rejected stage. - // throw new UnsupportedOperationException("First thenAccept ERROR"); + // throw new UnsupportedOperationException("First thenAcceptAsync ERROR"); } }) + + // Creates a new CompletableFuture with the returned value FROM HERE (unlike whenComplete) // runs callback whether the stage was resolved or rejected. IT ALWAYS RUNS even in rejected stage. - // I guess, it can be used as a FINALLY for promises. + // Could be used as a complex finally. .handle(new BiFunction() { @Override @@ -349,30 +561,32 @@ public class CompletableFutureExample { if (exception != null ) { // rejected stage I guess... - System.out.println("#3 Second handle exception: "); + // Because we do not rethrow exception we are canceling the rejected stage :( + System.out.println("#4 Second handle exception: "); exception.printStackTrace(); } else { // resolved stage I guess... System.out.println(Thread.currentThread().getName()); - System.out.println("#3 Second handle"); + System.out.println("#4 Second handle"); } return "Second handle"; } }) - // Because the above handle does not throw exception this method will always run even if + // Because the above handleAsync does not throw exception this method will always run even if // the first handle threw exception. The second handle (above) cancels the rejected stage. - // consumes and returns nothing. IT RUNS IF NO REJECTED STAGE (if there was no errors) + // Creates a new CompletableFuture with Void value. + // IT RUNS IF NO REJECTED STAGE (if there was no errors) .thenAccept(new Consumer() { @Override public void accept(String secondHandleValue) { System.out.println(Thread.currentThread().getName()); - System.out.println("#4 Second thenAccept value: " + secondHandleValue); + System.out.println("#5 Second thenAccept value: " + secondHandleValue); // If exception, we get into the rejected stage. // throw new UnsupportedOperationException("Second thenAccept ERROR"); @@ -380,25 +594,47 @@ public class CompletableFutureExample { }) - // runs callback whether the stage was resolved or rejected. Using lambdas. IT ALWAYS RUNS even in rejected stage. - // I guess, it can be used as a FINALLY for promises. + // Creates a new CompletableFuture with the returned value FROM HERE (unlike whenComplete) + // runs callback whether the stage was resolved or rejected. IT ALWAYS RUNS even in rejected stage. + // Could be used as a complex finally. .handle((nothing, exception) -> { if (exception != null ) { // rejected stage I guess... - System.out.println("#5 Third handle exception: "); + // Because we do not rethrow exception we are canceling the rejected stage :( + System.out.println("#6 Third handle exception: "); exception.printStackTrace(); } else { // resolved stage I guess... System.out.println(Thread.currentThread().getName()); - System.out.println("#5 Third handle"); + System.out.println("#6 Third handle"); } return "Third handle"; }) + // IF NOT REJECTED STAGE: creates a new CompletableFuture with the value returned FROM whatever was run before. + // IF REJECTED STAGE: creates a new CompletableFuture with the value returned from exceptionally. + // It is almost like handle but with less functionality because we have no access to the value returned by the stage before. + + // IT ONLY RUNS IN REJECTED STAGE!! + // Could be used as a simple finally + .exceptionally(new Function() { + + @Override + public String apply(Throwable exception) { + + // Because we do not rethrow exception we are canceling the rejected stage :( + System.out.println(Thread.currentThread().getName()); + System.out.println("#7 You only see me if rejected stage."); + exception.printStackTrace(); + + return "You will not see me if not rejected stage"; + } + }) + .get(); -- 2.1.4