From: Gustavo Martin Morcuende Date: Tue, 6 Dec 2016 22:24:56 +0000 (+0100) Subject: Trying to understand ForkJoinPool X-Git-Url: https://git.gumartinm.name/?a=commitdiff_plain;h=b6fcafd8615eed95a3a3bb2c57ec1a6c74362077;p=JavaForFun Trying to understand ForkJoinPool --- diff --git a/Allgemeines/Threads/ForkJoin/src/de/test/fork/join/ForkJoinTaskExample.java b/Allgemeines/Threads/ForkJoin/src/de/test/fork/join/ForkJoinTaskExample.java new file mode 100644 index 0000000..9932c42 --- /dev/null +++ b/Allgemeines/Threads/ForkJoin/src/de/test/fork/join/ForkJoinTaskExample.java @@ -0,0 +1,242 @@ +package de.test.fork.join; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.RecursiveTask; + +/** + * + * Explanation taken from: + * {@link http://www.javacreed.com/java-fork-join-example/} + * + * + * Ventajas respecto a Executors (esto es lo único que veo...): + * Minimiza el número de colas y tamaño de colas que normalmente tendría que lanzar yo + * vía Executors cuando queremos hacer algo recursivo. NO VEO OTRA VENTAJA A ESTO :/ + * + * Este ejemplo tendría que hacerlo lanzado ThreadPools a mano en cada subdirectorio y estos pools tendrían un + * tamaño reservado para sus colas (mínimo, máximo y umbral) y por tanto estarían consumiendo + * recursos que probablemente no se vayan a usar. + * + * Con ForkJoinPool optimizo porque él se encarga de distribuir las tareas por los + * pool de hilos que actualmente existen (sin necesidad de crear nuevos) Tendremos tantos hilos y pool de hilos como + * CPUs tenga mi PC y no más. + */ +public class ForkJoinTaskExample { + + + // The common pool is by default constructed with default parameters, but these may be controlled by setting three system properties: + // + // java.util.concurrent.ForkJoinPool.common.parallelism - the parallelism level, a non-negative integer + // java.util.concurrent.ForkJoinPool.common.threadFactory - the class name of a ForkJoinPool.ForkJoinWorkerThreadFactory + // java.util.concurrent.ForkJoinPool.common.exceptionHandler - the class name of a Thread.UncaughtExceptionHandler + + // By default there is no value for java.util.concurrent.ForkJoinPool.common.exceptionHandler + // SO BY DEFAULT, UNCAUGHT EXCEPTIONS ARE LOST FOREVER!!!! :/ + final ForkJoinPool pool = new ForkJoinPool(); + + + public void doRun(File file) { + + synchronous(file); + + // After pool.shutdown() and pool.shutdownNow() invocations to pool.invoke(task), pool.execute(task) and + // pool.submit(task) throw RejectedExecutionException + // synchronous(file); + + + + // asynchronous(file); + + // After pool.shutdown() and pool.shutdownNow() invocations to pool.invoke(task), pool.execute(task) and + // pool.submit(task) throw RejectedExecutionException + //asynchronous(file); + + + + // IF NO JOIN, ForkJoinTasks WILL BE KILLED BY THE JVM. + + // Unlike Executor, threads in ForkJoinPool are daemons and they are killed by the JVM when exiting. + + // Executor: NOT DAEMON THREADS. THEY NO DIE WHEN JVM EXITS. JVM MUST WAIT FOR THEM TO EXIT (we need a shutdown) + // ForkJoinPool: DAEMON THREADS. THEY DIE WHEN JVM EXITS. JVM DO NOT WAIT FOR THEM TO EXIT (we do not need shutdown + // and it also seems to do nothing :/ ) + + // So, if not pool.join() program will exit and your ForkJoinTasks will die :( + } + + /** + * synchronous execution + * + */ + private void synchronous(File file) { + SizeOfFileTask task = new SizeOfFileTask(file); + try { + + Long size = pool.invoke(task); + System.out.print("Synchronous result: " + size); + + } finally { + // Only stops submitting tasks. It does not try to cancel threads. + pool.shutdown(); + // What means "stops submitting tasks"? task.fork() does not throw exception after pool.shutdown() + // no idea what is pool.shutdown() for... It does not do anything at all... :/ + + // pool.shutdown() does not seem to do anything at all. No interruption of threads + // (interrupted status does not change to interrupted), task.fork() and task.get() do not throw exceptions... + // NO IDEA WHAT THE HECK IS IT FOR. + + // BUT AFTER pool.shutdown() AND pool.shutdownNow() INVOCATIONS TO pool.execute(task), pool.submit(task) and + // pool.invoke(task) THROW RejectedExecutionException. + // SO pool.shutdown() REALLY DOES SOMETHING :/ + // ANYWAY, ALWAYS CALL pool.shutdown() in finally block!!! + + + + + // pool.shutdownNow() Performs cancellation of threads (interrupted status changes to interrupted) and + // task.get() and task.join() throw CancellationException + // pool.shutdownNow(); + } + } + + /** + * asynchronous execution + * + */ + private void asynchronous(File file) { + SizeOfFileTask task = new SizeOfFileTask(file); + try { + + // pool.execute() and pool.submit() do the same, but submit() returns the same task as a ForkJoinTask :/ + pool.execute(task); + // If you want to do it synchronous (the same as pool.invoke()) + // task.join(); + + // pool.submit(task) does not do anything special. Just returns task as a ForkJoinTask... + // ForkJoinTask theSameTask = pool.submit(task); + // If you want to do it synchronous (the same as invoke()) + // task.join() or theSameTask.join(); + + } finally { + // Only stops submitting tasks. It does not try to cancel threads. + pool.shutdown(); + // What means "stops submitting tasks"? task.fork() does not throw exception after pool.shutdown() + // no idea what is pool.shutdown() for... It does not do anything at all... :/ + + // pool.shutdown() does not seem to do anything at all. No interruption of threads + // (interrupted status does not change to interrupted), task.fork() and task.get() do not throw exceptions... + // NO IDEA WHAT THE HECK IS IT FOR. + + // BUT AFTER pool.shutdown() AND pool.shutdownNow() INVOCATIONS TO pool.execute(task), pool.submit(task) and + // pool.invoke(task) THROW RejectedExecutionException. + // SO pool.shutdown() REALLY DOES SOMETHING :/ + // ANYWAY, ALWAYS CALL pool.shutdown() in finally block!!! + + + + + // pool.shutdownNow() Performs cancellation of threads (interrupted status changes to interrupted) and + // task.get() and task.join() throw CancellationException + // pool.shutdownNow(); + } + } + + private static class SizeOfFileTask extends RecursiveTask { + private final File file; + + public SizeOfFileTask(final File file) { + this.file = Objects.requireNonNull(file); + } + + @Override + protected Long compute() { + System.out.println("Computing size of: " + file.getAbsolutePath()); + System.out.println("Thread name: " + Thread.currentThread().getName()); + + if (file.isFile()) { + return file.length(); + } + + final List tasks = new ArrayList<>(); + final File[] children = file.listFiles(); + if (children != null) { + for (final File child : children) { + final SizeOfFileTask task = new SizeOfFileTask(child); + + // Current task puts new task on the queue of the thread + // which is running the current task. + // Every thread has one queue. + // No exception from here neither when using pool.shutdown() nor pool.shutdownNow() :( + task.fork(); + + tasks.add(task); + } + } + + long size = 0; + for (final SizeOfFileTask task : tasks) { + + // Current task makes space for the forked tasks above, + // one task is run by the current thread the other ones (if there are) + // will be stolen by other threads (if they are idle) + + // throws CancellationException after calling pool.shutdownNow() + size += task.join(); + + //size += get(task); + } + + return size; + } + + private Long get(SizeOfFileTask task) { + long size = 0; + try { + + // Current task makes space for the forked tasks above, + // one task is run by the current thread the other ones (if there are) + // will be stolen by other threads (if they are idle) + size = task.get(); + System.out.println(Thread.currentThread().getName() + " is interrupted?: " + Thread.currentThread().isInterrupted()); + + } catch (final CancellationException e) { + // When using pool.shutdownNow() in asynchronous mode, threads will be interrupted and there will be CancellationException + e.printStackTrace(); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + e.printStackTrace(); + } catch (final ExecutionException e) { + System.out.println("Exception from task " + task.file + Thread.currentThread().getName()); + e.printStackTrace(); + final Throwable cause = e.getCause(); + + throw this.launderThrowable(cause); + } finally { + task.cancel(true); + } + + return size; + } + + // THERE IS NO NEED OF CATCHING AND LAUNDER EXCEPTIONS BECAUSE BY DEFAULT UNCAUGHT EXCEPTIONS ARE LOST FOREVER!!!! + // I AM IMPLEMENTING THIS METHOD HERE BUT IN REAL LIFE I DO NOT THINK THIS STUFF IS REQUIRED :/ + // The same about catching exceptions in get method (above) in real life I do not think I should catch any exception + // (perhaps for logging them but nothing else) + private RuntimeException launderThrowable(final Throwable exception) { + exception.printStackTrace(); + if (exception instanceof RuntimeException) + return (RuntimeException) exception; + else if (exception instanceof Error) + throw (Error) exception; + else + throw new IllegalStateException("Not unchecked", exception); + } + } + +} diff --git a/Allgemeines/Threads/ForkJoin/src/de/test/fork/join/MainTest.java b/Allgemeines/Threads/ForkJoin/src/de/test/fork/join/MainTest.java new file mode 100644 index 0000000..223e534 --- /dev/null +++ b/Allgemeines/Threads/ForkJoin/src/de/test/fork/join/MainTest.java @@ -0,0 +1,12 @@ +package de.test.fork.join; + +import java.io.File; + +public class MainTest { + + public static void main(String[] args) { + ForkJoinTaskExample example = new ForkJoinTaskExample(); + example.doRun(new File("/tmp")); + } + +}