},
_ => {}
);
+ // The Parallel.ForEach loop will create a new instance of the Random class for each of its worker tasks.
+ // This instance will be passed as an argument to each partitioned iteration. Each partitioned iteration is responsible
+ // for returning the next value of the thread-local state. In this example, the returned value is always the same object
+ // that was passed in.
// Console.WriteLine("Custom Task Scheduler for a Parallel Loop");
// task threads. QUESTION: what methods throw AggregateException from some Task? I miss Javadoc... :(
// task.Result calls task.Wait and task.Wait throws AggregateException :)
Console.WriteLine(task.Result);
+
+
+ /**
+ * There are several ways to observe an unhandled task exception:
+ *
+ * Invoking the faulted task's Wait method causes the task's unhandled exception to be observed. The exception is also thrown
+ * in the calling context of the Wait method. The Task class's static WaitAll method allows you to observe the unhandled
+ * exceptions of more than one task with a single method invocation. The Parallel.Invoke method includes an implicit call
+ * to WaitAll. Exceptions from all of the tasks are grouped together in an AggregateException object and thrown in the
+ * calling context of the WaitAll or Wait method.
+ *
+ * Getting the Exception property of a faulted task causes the task's unhandled exception to be observed. The property returns
+ * the aggregate exception object. Getting the value does not automatically cause the exception to be thrown; however,
+ * the exception is considered to have been observed when you get the value of the Exception property. Use the Exception
+ * property instead of the Wait method or WaitAll method when you want to examine the unhandled exceptionbut do not want
+ * it to be rethrown in the current context.
+ *
+ * Special handling occurs if a faulted task's unhandled exceptions are not observed by the time the task object is
+ * garbage-collected. For more information, see the section, "Unobserved Task Exceptions," later in this chapter.
+ */
+
+
+ /**
+ * Unobserved Task Exceptions:
+ *
+ * If you don't give a faulted task the opportunity to propagate its exceptions (for example, by calling the Wait method),
+ * the runtime will escalate the task's unobserved exceptions according to the current .NET exception policy when the task
+ * is garbage-collected. Unobserved task exceptions will eventually be observed in the finalizer thread context.
+ * The finalizer thread is the system thread that invokes the Finalize method of objects that are ready to be garbage-collected.
+ * If an unhandled exception is thrown during the execution of a Finalize method, the runtime will, by default, terminate
+ * the current process, and no active try/finally blocks or additional finalizers will be executed, including finalizers
+ * that release handles to unmanaged resources. To prevent this from happening, you should be very careful that your
+ * application never leaks unobserved task exceptions. You can also elect to receive notification of any unobserved task
+ * exceptions by subscribing to the UnobservedTaskException event of the TaskScheduler class and choose to handle them as
+ * they propagate into the finalizer context.
+ *
+ * This last technique can be useful in scenarios such as hosting untrusted plug-ins that have benign exceptions that
+ * would be cumbersome to observe. For more information, see the section, "Further Reading," at the end of this chapter.
+ *
+ * During finalization, tasks that have a Status property of Faulted are treated differently from tasks with the status
+ * Canceled. The task's status determines how unobserved task exceptions that arise from task cancellation are treated
+ * during finalization. If the cancellation token that was passed as an argument to the StartNew method is the same
+ * token as the one embedded in the unobserved OperationCanceledException instance, the task does not propagate
+ * the operation-canceled exception to the UnobservedTaskException event or to the finalizer thread context. In other words,
+ * if you follow the cancellation protocol described in this chapter, unobserved cancellation exceptions will not be
+ * escalated into the finalizer's thread context.
+ */
}
--- /dev/null
+using System;
+using System.Linq;
+using System.Threading.Tasks;
+using System.Collections.Concurrent;
+
+namespace Threads
+{
+ /// <summary>
+ ///
+ /// Chapter 4.
+ /// Parallel Aggregation
+ ///
+ /// Taken from http://msdn.microsoft.com/en-us/library/ff963547.aspx
+ ///
+ /// </summary>
+ public class Chapter4
+ {
+ public static void Test()
+ {
+ Console.WriteLine("Chapter 4");
+
+ Console.WriteLine("Parallel Aggregation, the basics");
+ double[] sequence = { 2.22, 3.22, 4.33, 5.33, 6.33, 7.33, 8.33, 9.33, 10.33 };
+ double sum = 0.0d;
+ for (int i = 0; i < sequence.Length; i++)
+ {
+ sum += Normalize(sequence[i]);
+ }
+ Console.WriteLine("Result {0}", sum);
+
+
+ Console.WriteLine("Parallel Aggregation, the basics, LINQ");
+ sum = (from x in sequence select Normalize(x)).Sum();
+ Console.WriteLine("Result {0}", sum);
+
+
+ Console.WriteLine("Parallel Aggregation, the basics, PLINQ");
+ sum = (from x in sequence.AsParallel() select Normalize(x)).Sum();
+ Console.WriteLine("Result {0}", sum);
+
+
+ Console.WriteLine("Parallel Aggregation, the basics, PLINQ. Custom aggreate.");
+ sum = (from x in sequence.AsParallel() select Normalize(x)).Aggregate(1.0d, (y1, y2) => y1 * y2);
+ Console.WriteLine("Result {0}", sum);
+
+ /**
+ * If PLINQ doesn't meet your needs or if you prefer a less declarative style of coding, you can also use Parallel.For
+ * or Parallel.ForEach to implement the parallel aggregation pattern. The Parallel.For and Parallel.ForEach methods
+ * require more complex code than PLINQ. For example, the Parallel.ForEach method requires your code to include
+ * synchronization primitives to implement parallel aggregation. For examples and more information, see
+ * "Using Parallel Loops for Aggregation".
+ */
+
+
+ Console.WriteLine("Using Parallel Loops for Aggregation");
+ object lockObject = new object();
+ Parallel.ForEach(
+ // The values to be aggregated
+ sequence,
+
+ // The local initial partial result
+ () => 0.0d,
+
+ // The loop body
+ (x, loopState, partialResult) =>
+ {
+ return Normalize(x) + partialResult;
+ },
+
+ // The final step of each local context
+ (localPartialSum) =>
+ {
+ // Enforce serial access to single, shared result (sum is global)
+ lock (lockObject)
+ {
+ sum += localPartialSum;
+ }
+ });
+ Console.WriteLine("Result {0}", sum);
+
+
+ Console.WriteLine("Using a Range Partitioner for Aggregation");
+ var rangePartitioner = Partitioner.Create(0, sequence.Length);
+ Parallel.ForEach(
+ // The input intervals
+ rangePartitioner,
+
+ // The local initial partial result
+ () => 0.0d,
+
+ // The loop body for each interval
+ (range, loopState, initialValue) =>
+ {
+ Console.WriteLine("Partitioner Default range {0} {1}", range.Item1, range.Item2);
+ double partialSum = initialValue;
+ for (int i = range.Item1; i < range.Item2; i++)
+ {
+ partialSum += Normalize(sequence[i]);
+ }
+ return partialSum;
+ },
+
+ // The final step of each local context
+ (localPartialSum) =>
+ {
+ // Use lock to enforce serial access to shared result
+ lock (lockObject)
+ {
+ sum += localPartialSum;
+ }
+ });
+ Console.WriteLine("Result {0}", sum);
+
+
+ // Do not copy this code. This version will run much slower
+ // than the sequential version. It's included here to
+ // illustrate what not to do.
+ // BUG – don't do this
+ Parallel.For(0, sequence.Length, i =>
+ {
+ // BUG – don't do this
+ lock (lockObject)
+ {
+ sum += sequence[i];
+ }
+ });
+ Console.WriteLine("Result {0}", sum);
+
+ // If you forget to add the lock statement, this code fails to calculate the correct sum on a multicore computer.
+ // Adding the lock statement makes this code example correct with respect to serialization. If you run this code, it
+ // produces the expected sum. However, it fails completely as an optimization. This code is many times slower than
+ // the sequential version it attempted to optimize! The reason for the poor performance is the cost of synchronization.
+ }
+
+ public static double Normalize(double number)
+ {
+ return number * 2;
+ }
+ }
+}
+
--- /dev/null
+using System;
+using System.Threading.Tasks;
+
+namespace Threads
+{
+ /// <summary>
+ ///
+ /// Chapter 5.
+ /// Futures
+ ///
+ /// Taken from http://msdn.microsoft.com/en-us/library/ff963556.aspx
+ ///
+ /// </summary>
+ public class Chapter5
+ {
+ public static void Test()
+ {
+ Console.WriteLine("Futures");
+ int a = 1;
+ Task<int> futureB = Task.Factory.StartNew<int>(() => F1(a));
+ var c = F2(a);
+ var d = F3(c);
+ try {
+ var f = F4(futureB.Result, d);
+ Console.WriteLine("Result {0}", f);
+ }
+ catch (AggregateException e)
+ {
+ Console.WriteLine("Exception in parallel Task F1: {0} {1}", e.Message, e.StackTrace);
+ }
+
+
+ Console.WriteLine("Continuation Tasks");
+ futureB = Task.Factory.StartNew<int>(() => F1(a));
+ var futureD = Task.Factory.StartNew<int>(() => F3(F2(a)));
+ var futureF = Task.Factory.ContinueWhenAll<int, int>(
+ new[] { futureB, futureD },
+ (tasks) => F4(futureB.Result, futureD.Result));
+ // I do not know what could I use this thing for... :/
+ futureF.ContinueWith((t) =>
+ Console.WriteLine("Continuation Tasks result {0}", t)
+ );
+ Console.WriteLine ("Continuation Tasks result {0}", futureF.Result);
+ }
+
+ public static int F1(int number)
+ {
+ return 1 + number;
+ }
+
+ public static int F2(int number)
+ {
+ return 2 + number;
+ }
+
+ public static int F3(int number)
+ {
+ return 3 + number;
+ }
+
+ public static int F4(int numberA, int numberB)
+ {
+ return numberA + numberB;
+ }
+ }
+}
+
{
Chapter2.Test();
Chapter3.Test();
+ Chapter4.Test();
+ Chapter5.Test();
}
}
}
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Chapter2.cs" />
<Compile Include="Chapter3.cs" />
+ <Compile Include="Chapter4.cs" />
+ <Compile Include="Chapter5.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
</Project>
\ No newline at end of file