From ba4ddf870318166582f82874723df370cd83fe92 Mon Sep 17 00:00:00 2001
From: "gu.martinm@gmail.com" <gu.martinm@gmail.com>
Date: Thu, 22 May 2014 20:25:18 +0200
Subject: [PATCH] Threads and C#

http://parallelpatterns.codeplex.com/
Chapter 4: Parallel Aggregation
Chapter 5: Futures
---
 Allgemeines/Threads/Threads/Chapter2.cs    |   4 +
 Allgemeines/Threads/Threads/Chapter3.cs    |  47 ++++++++++
 Allgemeines/Threads/Threads/Chapter4.cs    | 141 +++++++++++++++++++++++++++++
 Allgemeines/Threads/Threads/Chapter5.cs    |  67 ++++++++++++++
 Allgemeines/Threads/Threads/Program.cs     |   2 +
 Allgemeines/Threads/Threads/Threads.csproj |   2 +
 6 files changed, 263 insertions(+)
 create mode 100644 Allgemeines/Threads/Threads/Chapter4.cs
 create mode 100644 Allgemeines/Threads/Threads/Chapter5.cs

diff --git a/Allgemeines/Threads/Threads/Chapter2.cs b/Allgemeines/Threads/Threads/Chapter2.cs
index 5ef3feb..f771696 100644
--- a/Allgemeines/Threads/Threads/Chapter2.cs
+++ b/Allgemeines/Threads/Threads/Chapter2.cs
@@ -286,6 +286,10 @@ namespace Threads
                 },
                 _ => {}
             );
+            // The Parallel.ForEach loop will create a new instance of the Random class for each of its worker tasks.
+            // This instance will be passed as an argument to each partitioned iteration. Each partitioned iteration is responsible
+            // for returning the next value of the thread-local state. In this example, the returned value is always the same object
+            // that was passed in.
 
             //
             Console.WriteLine("Custom Task Scheduler for a Parallel Loop");
diff --git a/Allgemeines/Threads/Threads/Chapter3.cs b/Allgemeines/Threads/Threads/Chapter3.cs
index 6c82f0d..cb2311b 100644
--- a/Allgemeines/Threads/Threads/Chapter3.cs
+++ b/Allgemeines/Threads/Threads/Chapter3.cs
@@ -210,6 +210,53 @@ namespace Threads
             // task threads. QUESTION: what methods throw AggregateException from some Task? I miss Javadoc... :(
             // task.Result calls task.Wait and task.Wait throws AggregateException :)
             Console.WriteLine(task.Result);
+
+
+            /**
+             * There are several ways to observe an unhandled task exception:
+             *
+             * Invoking the faulted task's Wait method causes the task's unhandled exception to be observed. The exception is also thrown
+             * in the calling context of the Wait method. The Task class's static WaitAll method allows you to observe the unhandled
+             * exceptions of more than one task with a single method invocation. The Parallel.Invoke method includes an implicit call
+             * to WaitAll. Exceptions from all of the tasks are grouped together in an AggregateException object and thrown in the
+             * calling context of the WaitAll or Wait method.
+             *
+             * Getting the Exception property of a faulted task causes the task's unhandled exception to be observed. The property returns
+             * the aggregate exception object. Getting the value does not automatically cause the exception to be thrown; however,
+             * the exception is considered to have been observed when you get the value of the Exception property. Use the Exception
+             * property instead of the Wait method or WaitAll method when you want to examine the unhandled exception but do not want
+             * it to be rethrown in the current context.
+             *
+             * Special handling occurs if a faulted task's unhandled exceptions are not observed by the time the task object is
+             * garbage-collected. For more information, see the section, "Unobserved Task Exceptions," later in this chapter.
+             */
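+
+            // A minimal illustrative sketch (not part of the book's sample code): observing a fault through
+            // the Exception property described above, so it counts as observed without being rethrown here.
+            // The task, the variable name and the demo exception are made up for this example; it relies only
+            // on the usings already present in this file.
+            var faultedDemo = Task.Factory.StartNew(() => { throw new InvalidOperationException("demo failure"); });
+            while (!faultedDemo.IsCompleted)
+            {
+                // Poll instead of calling Wait(), so the exception is not rethrown in this context.
+                System.Threading.Thread.Sleep(1);
+            }
+            if (faultedDemo.IsFaulted && faultedDemo.Exception != null)
+            {
+                // Reading Exception marks the fault as observed; InnerException is the original exception.
+                Console.WriteLine("Observed via Exception property: {0}", faultedDemo.Exception.InnerException.Message);
+            }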
+
+
+            /**
+             * Unobserved Task Exceptions:
+             *
+             * If you don't give a faulted task the opportunity to propagate its exceptions (for example, by calling the Wait method),
+             * the runtime will escalate the task's unobserved exceptions according to the current .NET exception policy when the task
+             * is garbage-collected. Unobserved task exceptions will eventually be observed in the finalizer thread context.
+             * The finalizer thread is the system thread that invokes the Finalize method of objects that are ready to be garbage-collected.
+             * If an unhandled exception is thrown during the execution of a Finalize method, the runtime will, by default, terminate
+             * the current process, and no active try/finally blocks or additional finalizers will be executed, including finalizers
+             * that release handles to unmanaged resources. To prevent this from happening, you should be very careful that your
+             * application never leaks unobserved task exceptions. You can also elect to receive notification of any unobserved task
+             * exceptions by subscribing to the UnobservedTaskException event of the TaskScheduler class and choose to handle them as
+             * they propagate into the finalizer context.
+             *
+             * This last technique can be useful in scenarios such as hosting untrusted plug-ins that have benign exceptions that
+             * would be cumbersome to observe. For more information, see the section, "Further Reading," at the end of this chapter.
+             *
+             * During finalization, tasks that have a Status property of Faulted are treated differently from tasks with the status
+             * Canceled. The task's status determines how unobserved task exceptions that arise from task cancellation are treated
+             * during finalization. If the cancellation token that was passed as an argument to the StartNew method is the same
+             * token as the one embedded in the unobserved OperationCanceledException instance, the task does not propagate
+             * the operation-canceled exception to the UnobservedTaskException event or to the finalizer thread context. In other words,
+             * if you follow the cancellation protocol described in this chapter, unobserved cancellation exceptions will not be
+             * escalated into the finalizer's thread context.
+             */
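+
+            // A minimal illustrative sketch (not part of the book's sample code): the subscription to
+            // TaskScheduler.UnobservedTaskException mentioned above. The handler body and its message are
+            // made up; SetObserved() keeps an unobserved fault from escalating when the task is finalized.
+            TaskScheduler.UnobservedTaskException += (sender, eventArgs) =>
+            {
+                eventArgs.SetObserved();
+                Console.WriteLine("Unobserved task exception: {0}", eventArgs.Exception.InnerException.Message);
+            };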
         }
diff --git a/Allgemeines/Threads/Threads/Chapter4.cs b/Allgemeines/Threads/Threads/Chapter4.cs
new file mode 100644
index 0000000..6d6f539
--- /dev/null
+++ b/Allgemeines/Threads/Threads/Chapter4.cs
@@ -0,0 +1,141 @@
+using System;
+using System.Linq;
+using System.Threading.Tasks;
+using System.Collections.Concurrent;
+
+namespace Threads
+{
+    /// <summary>
+    /// Chapter 4.
+    /// Parallel Aggregation
+    ///
+    /// Taken from http://msdn.microsoft.com/en-us/library/ff963547.aspx
+    /// </summary>
+    public class Chapter4
+    {
+        public static void Test()
+        {
+            Console.WriteLine("Chapter 4");
+
+            Console.WriteLine("Parallel Aggregation, the basics");
+            double[] sequence = { 2.22, 3.22, 4.33, 5.33, 6.33, 7.33, 8.33, 9.33, 10.33 };
+            double sum = 0.0d;
+            for (int i = 0; i < sequence.Length; i++)
+            {
+                sum += Normalize(sequence[i]);
+            }
+            Console.WriteLine("Result {0}", sum);
+
+
+            Console.WriteLine("Parallel Aggregation, the basics, LINQ");
+            sum = (from x in sequence select Normalize(x)).Sum();
+            Console.WriteLine("Result {0}", sum);
+
+
+            Console.WriteLine("Parallel Aggregation, the basics, PLINQ");
+            sum = (from x in sequence.AsParallel() select Normalize(x)).Sum();
+            Console.WriteLine("Result {0}", sum);
+
+
+            Console.WriteLine("Parallel Aggregation, the basics, PLINQ. Custom aggregate.");
+            sum = (from x in sequence.AsParallel() select Normalize(x)).Aggregate(1.0d, (y1, y2) => y1 * y2);
+            Console.WriteLine("Result {0}", sum);
+
+            /**
+             * If PLINQ doesn't meet your needs or if you prefer a less declarative style of coding, you can also use Parallel.For
+             * or Parallel.ForEach to implement the parallel aggregation pattern. The Parallel.For and Parallel.ForEach methods
+             * require more complex code than PLINQ. For example, the Parallel.ForEach method requires your code to include
+             * synchronization primitives to implement parallel aggregation. For examples and more information, see
+             * "Using Parallel Loops for Aggregation".
+             */
+
+
+            Console.WriteLine("Using Parallel Loops for Aggregation");
+            object lockObject = new object();
+            // Reset the shared result so this aggregation starts from zero
+            sum = 0.0d;
+            Parallel.ForEach(
+                // The values to be aggregated
+                sequence,
+
+                // The local initial partial result
+                () => 0.0d,
+
+                // The loop body
+                (x, loopState, partialResult) =>
+                {
+                    return Normalize(x) + partialResult;
+                },
+
+                // The final step of each local context
+                (localPartialSum) =>
+                {
+                    // Enforce serial access to the single result (sum is shared by all worker tasks)
+                    lock (lockObject)
+                    {
+                        sum += localPartialSum;
+                    }
+                });
+            Console.WriteLine("Result {0}", sum);
+
+
+            Console.WriteLine("Using a Range Partitioner for Aggregation");
+            var rangePartitioner = Partitioner.Create(0, sequence.Length);
+            // Reset the shared result so this aggregation starts from zero
+            sum = 0.0d;
+            Parallel.ForEach(
+                // The input intervals
+                rangePartitioner,
+
+                // The local initial partial result
+                () => 0.0d,
+
+                // The loop body for each interval
+                (range, loopState, initialValue) =>
+                {
+                    Console.WriteLine("Partitioner Default range {0} {1}", range.Item1, range.Item2);
+                    double partialSum = initialValue;
+                    for (int i = range.Item1; i < range.Item2; i++)
+                    {
+                        partialSum += Normalize(sequence[i]);
+                    }
+                    return partialSum;
+                },
+
+                // The final step of each local context
+                (localPartialSum) =>
+                {
+                    // Use lock to enforce serial access to shared result
+                    lock (lockObject)
+                    {
+                        sum += localPartialSum;
+                    }
+                });
+            Console.WriteLine("Result {0}", sum);
+
+
+            // Do not copy this code. This version will run much slower
+            // than the sequential version. It's included here to
+            // illustrate what not to do.
+            // BUG – don't do this
+            sum = 0.0d;
+            Parallel.For(0, sequence.Length, i =>
+            {
+                // BUG – don't do this
+                lock (lockObject)
+                {
+                    sum += Normalize(sequence[i]);
+                }
+            });
+            Console.WriteLine("Result {0}", sum);
+
+            // If you forget to add the lock statement, this code fails to calculate the correct sum on a multicore computer.
+            // Adding the lock statement makes this code example correct with respect to serialization. If you run this code, it
+            // produces the expected sum. However, it fails completely as an optimization. This code is many times slower than
+            // the sequential version it attempted to optimize! The reason for the poor performance is the cost of synchronization.
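+
+            // A minimal illustrative sketch (not part of the book's sample code): the same sum written with
+            // the Parallel.For overload that carries thread-local state, so the lock is taken once per worker
+            // task instead of once per element. It reuses sequence, sum, lockObject and Normalize from above.
+            sum = 0.0d;
+            Parallel.For(
+                0,
+                sequence.Length,
+                // The local initial partial result for each worker task
+                () => 0.0d,
+                // The loop body accumulates into the thread-local value without taking the lock
+                (i, loopState, localSum) => localSum + Normalize(sequence[i]),
+                // The final step merges each thread-local value into the shared result
+                (localSum) =>
+                {
+                    lock (lockObject)
+                    {
+                        sum += localSum;
+                    }
+                });
+            Console.WriteLine("Result {0}", sum);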
+        }
+
+        public static double Normalize(double number)
+        {
+            return number * 2;
+        }
+    }
+}
+
diff --git a/Allgemeines/Threads/Threads/Chapter5.cs b/Allgemeines/Threads/Threads/Chapter5.cs
new file mode 100644
index 0000000..7b53cce
--- /dev/null
+++ b/Allgemeines/Threads/Threads/Chapter5.cs
@@ -0,0 +1,67 @@
+using System;
+using System.Threading.Tasks;
+
+namespace Threads
+{
+    /// <summary>
+    /// Chapter 5.
+    /// Futures
+    ///
+    /// Taken from http://msdn.microsoft.com/en-us/library/ff963556.aspx
+    /// </summary>
+    public class Chapter5
+    {
+        public static void Test()
+        {
+            Console.WriteLine("Futures");
+            int a = 1;
+            Task<int> futureB = Task.Factory.StartNew(() => F1(a));
+            var c = F2(a);
+            var d = F3(c);
+            try
+            {
+                var f = F4(futureB.Result, d);
+                Console.WriteLine("Result {0}", f);
+            }
+            catch (AggregateException e)
+            {
+                Console.WriteLine("Exception in parallel Task F1: {0} {1}", e.Message, e.StackTrace);
+            }
+
+
+            Console.WriteLine("Continuation Tasks");
+            futureB = Task.Factory.StartNew(() => F1(a));
+            var futureD = Task.Factory.StartNew(() => F3(F2(a)));
+            var futureF = Task.Factory.ContinueWhenAll(
+                new[] { futureB, futureD },
+                (tasks) => F4(futureB.Result, futureD.Result));
+            // A continuation like this runs once futureF has completed; it is useful for follow-up work
+            // (logging, chaining further tasks) that should not block the current thread.
+            futureF.ContinueWith((t) =>
+                Console.WriteLine("Continuation Tasks result {0}", t.Result)
+            );
+            Console.WriteLine("Continuation Tasks result {0}", futureF.Result);
+        }
+
+        public static int F1(int number)
+        {
+            return 1 + number;
+        }
+
+        public static int F2(int number)
+        {
+            return 2 + number;
+        }
+
+        public static int F3(int number)
+        {
+            return 3 + number;
+        }
+
+        public static int F4(int numberA, int numberB)
+        {
+            return numberA + numberB;
+        }
+    }
+}
+
diff --git a/Allgemeines/Threads/Threads/Program.cs b/Allgemeines/Threads/Threads/Program.cs
index 24c2963..37a927f 100644
--- a/Allgemeines/Threads/Threads/Program.cs
+++ b/Allgemeines/Threads/Threads/Program.cs
@@ -20,6 +20,8 @@ namespace Threads
         {
             Chapter2.Test();
             Chapter3.Test();
+            Chapter4.Test();
+            Chapter5.Test();
         }
     }
 }
diff --git a/Allgemeines/Threads/Threads/Threads.csproj b/Allgemeines/Threads/Threads/Threads.csproj
index 4ab2a62..e158fea 100644
--- a/Allgemeines/Threads/Threads/Threads.csproj
+++ b/Allgemeines/Threads/Threads/Threads.csproj
@@ -41,6 +41,8 @@ See: http://parallelpatterns.codeplex.com/
+    <Compile Include="Chapter4.cs" />
+    <Compile Include="Chapter5.cs" />
\ No newline at end of file
-- 
2.1.4