Threads and C#
authorgu.martinm@gmail.com <gu.martinm@gmail.com>
Thu, 22 May 2014 18:25:18 +0000 (20:25 +0200)
committergu.martinm@gmail.com <gu.martinm@gmail.com>
Thu, 22 May 2014 18:25:18 +0000 (20:25 +0200)
http://parallelpatterns.codeplex.com/

Chapter 4: Parallel Agregation
Chapter 5: Futures

Allgemeines/Threads/Threads/Chapter2.cs
Allgemeines/Threads/Threads/Chapter3.cs
Allgemeines/Threads/Threads/Chapter4.cs [new file with mode: 0644]
Allgemeines/Threads/Threads/Chapter5.cs [new file with mode: 0644]
Allgemeines/Threads/Threads/Program.cs
Allgemeines/Threads/Threads/Threads.csproj

index 5ef3feb..f771696 100644 (file)
@@ -286,6 +286,10 @@ namespace Threads
                              },
                              _ => {}
             );
+            // The Parallel.ForEach loop will create a new instance of the Random class for each of its worker tasks.
+            // This instance will be passed as an argument to each partitioned iteration. Each partitioned iteration is responsible
+            // for returning the next value of the thread-local state. In this example, the returned value is always the same object
+            // that was passed in.
 
 
             // Console.WriteLine("Custom Task Scheduler for a Parallel Loop");
index 6c82f0d..cb2311b 100644 (file)
@@ -210,6 +210,53 @@ namespace Threads
             // task threads. QUESTION: what methods throw AggregateException from some Task? I miss Javadoc... :(
             // task.Result calls task.Wait and task.Wait throws AggregateException :)
             Console.WriteLine(task.Result);
+
+
+            /**
+             * There are several ways to observe an unhandled task exception:
+             * 
+             * Invoking the faulted task's Wait method causes the task's unhandled exception to be observed. The exception is also thrown
+             * in the calling context of the Wait method. The Task class's static WaitAll method allows you to observe the unhandled
+             * exceptions of more than one task with a single method invocation. The Parallel.Invoke method includes an implicit call
+             * to WaitAll. Exceptions from all of the tasks are grouped together in an AggregateException object and thrown in the
+             * calling context of the WaitAll or Wait method.
+             * 
+             * Getting the Exception property of a faulted task causes the task's unhandled exception to be observed. The property returns
+             * the aggregate exception object. Getting the value does not automatically cause the exception to be thrown; however,
+             * the exception is considered to have been observed when you get the value of the Exception property. Use the Exception
+             * property instead of the Wait method or WaitAll method when you want to examine the unhandled exceptionbut do not want
+             * it to be rethrown in the current context.
+             * 
+             * Special handling occurs if a faulted task's unhandled exceptions are not observed by the time the task object is
+             * garbage-collected. For more information, see the section, "Unobserved Task Exceptions," later in this chapter.
+             */
+
+
+            /** 
+             * Unobserved Task Exceptions:
+             *
+             * If you don't give a faulted task the opportunity to propagate its exceptions (for example, by calling the Wait method),
+             * the runtime will escalate the task's unobserved exceptions according to the current .NET exception policy when the task
+             * is garbage-collected. Unobserved task exceptions will eventually be observed in the finalizer thread context.
+             * The finalizer thread is the system thread that invokes the Finalize method of objects that are ready to be garbage-collected.
+             * If an unhandled exception is thrown during the execution of a Finalize method, the runtime will, by default, terminate
+             * the current process, and no active try/finally blocks or additional finalizers will be executed, including finalizers
+             * that release handles to unmanaged resources. To prevent this from happening, you should be very careful that your
+             * application never leaks unobserved task exceptions. You can also elect to receive notification of any unobserved task
+             * exceptions by subscribing to the UnobservedTaskException event of the TaskScheduler class and choose to handle them as
+             * they propagate into the finalizer context.
+             *
+             * This last technique can be useful in scenarios such as hosting untrusted plug-ins that have benign exceptions that
+             * would be cumbersome to observe. For more information, see the section, "Further Reading," at the end of this chapter.
+             *
+             * During finalization, tasks that have a Status property of Faulted are treated differently from tasks with the status
+             * Canceled. The task's status determines how unobserved task exceptions that arise from task cancellation are treated
+             * during finalization. If the cancellation token that was passed as an argument to the StartNew method is the same
+             * token as the one embedded in the unobserved OperationCanceledException instance, the task does not propagate
+             * the operation-canceled exception to the UnobservedTaskException event or to the finalizer thread context. In other words,
+             * if you follow the cancellation protocol described in this chapter, unobserved cancellation exceptions will not be
+             * escalated into the finalizer's thread context.
+             */
         }
 
 
diff --git a/Allgemeines/Threads/Threads/Chapter4.cs b/Allgemeines/Threads/Threads/Chapter4.cs
new file mode 100644 (file)
index 0000000..6d6f539
--- /dev/null
@@ -0,0 +1,141 @@
+using System;
+using System.Linq;
+using System.Threading.Tasks;
+using System.Collections.Concurrent;
+
+namespace Threads
+{
+    /// <summary>
+    /// 
+    /// Chapter 4.
+    /// Parallel Aggregation
+    /// 
+    /// Taken from http://msdn.microsoft.com/en-us/library/ff963547.aspx
+    /// 
+    /// </summary>
+    public class Chapter4
+    {
+        public static void Test()
+        {
+            Console.WriteLine("Chapter 4");
+
+            Console.WriteLine("Parallel Aggregation, the basics");
+            double[] sequence = { 2.22, 3.22, 4.33, 5.33, 6.33, 7.33, 8.33, 9.33, 10.33 };
+            double sum = 0.0d;
+            for (int i = 0; i < sequence.Length; i++)
+            {
+                sum += Normalize(sequence[i]);
+            }
+            Console.WriteLine("Result {0}", sum);
+
+
+            Console.WriteLine("Parallel Aggregation, the basics, LINQ");
+            sum = (from x in sequence select Normalize(x)).Sum();
+            Console.WriteLine("Result {0}", sum);
+
+
+            Console.WriteLine("Parallel Aggregation, the basics, PLINQ");
+            sum = (from x in sequence.AsParallel() select Normalize(x)).Sum();
+            Console.WriteLine("Result {0}", sum);
+
+
+            Console.WriteLine("Parallel Aggregation, the basics, PLINQ. Custom aggreate.");
+            sum = (from x in sequence.AsParallel() select Normalize(x)).Aggregate(1.0d, (y1, y2) => y1 * y2);
+            Console.WriteLine("Result {0}", sum);
+
+            /**
+             * If PLINQ doesn't meet your needs or if you prefer a less declarative style of coding, you can also use Parallel.For
+             * or Parallel.ForEach to implement the parallel aggregation pattern. The Parallel.For and Parallel.ForEach methods
+             * require more complex code than PLINQ. For example, the Parallel.ForEach method requires your code to include
+             * synchronization primitives to implement parallel aggregation. For examples and more information, see
+             * "Using Parallel Loops for Aggregation".
+             */
+
+
+            Console.WriteLine("Using Parallel Loops for Aggregation");
+            object lockObject = new object();
+            Parallel.ForEach(
+                // The values to be aggregated 
+                sequence,
+
+                // The local initial partial result
+                () => 0.0d,
+
+                // The loop body
+                (x, loopState, partialResult) =>
+                {
+                    return Normalize(x) + partialResult;
+                },
+
+                // The final step of each local context            
+                (localPartialSum) =>
+                {
+                    // Enforce serial access to single, shared result (sum is global)
+                    lock (lockObject)
+                    {
+                        sum += localPartialSum;
+                    }
+                });
+            Console.WriteLine("Result {0}", sum);
+
+
+            Console.WriteLine("Using a Range Partitioner for Aggregation");
+            var rangePartitioner = Partitioner.Create(0, sequence.Length);
+            Parallel.ForEach(
+                // The input intervals
+                rangePartitioner, 
+
+                // The local initial partial result
+                () => 0.0d, 
+
+                // The loop body for each interval
+                (range, loopState, initialValue) =>
+                {
+                    Console.WriteLine("Partitioner Default range {0} {1}", range.Item1, range.Item2);
+                    double partialSum = initialValue;
+                    for (int i = range.Item1; i < range.Item2; i++)
+                    {
+                        partialSum += Normalize(sequence[i]);
+                    }
+                    return partialSum;
+                },
+
+                // The final step of each local context
+                (localPartialSum) =>
+                {
+                    // Use lock to enforce serial access to shared result
+                    lock (lockObject)
+                    {
+                        sum += localPartialSum;
+                    }
+                });
+            Console.WriteLine("Result {0}", sum);
+
+
+            // Do not copy this code. This version will run much slower
+            // than the sequential version. It's included here to 
+            // illustrate what not to do.
+            // BUG – don't do this
+            Parallel.For(0, sequence.Length, i =>
+                {
+                    // BUG – don't do this
+                    lock (lockObject)
+                    {
+                        sum += sequence[i];
+                    }
+                });
+            Console.WriteLine("Result {0}", sum);
+
+            // If you forget to add the lock statement, this code fails to calculate the correct sum on a multicore computer.
+            // Adding the lock statement makes this code example correct with respect to serialization. If you run this code, it
+            // produces the expected sum. However, it fails completely as an optimization. This code is many times slower than
+            // the sequential version it attempted to optimize! The reason for the poor performance is the cost of synchronization.         
+        }
+
+        public static double Normalize(double number)
+        {
+            return number * 2;
+        }
+    }
+}
+
diff --git a/Allgemeines/Threads/Threads/Chapter5.cs b/Allgemeines/Threads/Threads/Chapter5.cs
new file mode 100644 (file)
index 0000000..7b53cce
--- /dev/null
@@ -0,0 +1,67 @@
+using System;
+using System.Threading.Tasks;
+
+namespace Threads
+{
+    /// <summary>
+    /// 
+    /// Chapter 5.
+    /// Futures
+    /// 
+    /// Taken from http://msdn.microsoft.com/en-us/library/ff963556.aspx
+    /// 
+    /// </summary>
+    public class Chapter5
+    {
+        public static void Test()
+        {
+            Console.WriteLine("Futures");
+            int a = 1;
+            Task<int> futureB = Task.Factory.StartNew<int>(() => F1(a));
+            var c = F2(a); 
+            var d = F3(c);
+            try {
+                var f = F4(futureB.Result, d);
+                Console.WriteLine("Result {0}", f);
+            }
+            catch (AggregateException e)
+            {
+                Console.WriteLine("Exception in parallel Task F1: {0} {1}", e.Message, e.StackTrace);
+            }
+
+
+            Console.WriteLine("Continuation Tasks");
+            futureB = Task.Factory.StartNew<int>(() => F1(a));
+            var futureD = Task.Factory.StartNew<int>(() => F3(F2(a)));
+            var futureF = Task.Factory.ContinueWhenAll<int, int>(
+                new[] { futureB, futureD },
+                (tasks) => F4(futureB.Result, futureD.Result));
+            // I do not know what could I use this thing for... :/
+            futureF.ContinueWith((t) =>
+                Console.WriteLine("Continuation Tasks result {0}", t)
+            );
+            Console.WriteLine ("Continuation Tasks result {0}", futureF.Result);
+        }
+
+        public static int F1(int number)
+        {
+            return 1 + number;
+        }
+
+        public static int F2(int number)
+        {
+            return 2 + number;
+        }
+
+        public static int F3(int number)
+        {
+            return 3 + number;
+        }
+
+        public static int F4(int numberA, int numberB)
+        {
+            return numberA + numberB;
+        }
+    }
+}
+
index 24c2963..37a927f 100644 (file)
@@ -20,6 +20,8 @@ namespace Threads
         {
             Chapter2.Test();
             Chapter3.Test();
+            Chapter4.Test();
+            Chapter5.Test();
         }
     }
 }
index 4ab2a62..e158fea 100644 (file)
@@ -41,6 +41,8 @@ See: http://parallelpatterns.codeplex.com/</Description>
     <Compile Include="Properties\AssemblyInfo.cs" />
     <Compile Include="Chapter2.cs" />
     <Compile Include="Chapter3.cs" />
+    <Compile Include="Chapter4.cs" />
+    <Compile Include="Chapter5.cs" />
   </ItemGroup>
   <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
 </Project>
\ No newline at end of file