Thursday, February 19, 2009

Parallel.For(…): A Deeper Dive – Parallel Programming in .Net 4.0 and Visual Studio 2010

Copyright 2008-2009, Paul Jackson, all rights reserved

In a previous post, I showed the performance benefits that Parallel.For(…), part of the .Net 4.0 Parallel Extensions, can deliver on a multicore system.  In this article, we’ll take a deeper dive into Parallel.For, exploring more of its options and parameters.  There’s also a brief section on Parallel.ForEach(…) at the end of this article, since the two are very similar.

As we saw previously, Parallel.For, at its most basic, requires three arguments:

    Parallel.For(0, 100, delegate(int i)
        {
            doWork(i);
        }
    );

These are an inclusive “from” value (0), an exclusive “to” value (100) and a delegate containing the work to be performed.  So the above code will execute the delegate 100 times, iterating “i” from 0 to 99.  This is useful, but it’s far from all Parallel.For can do, as evidenced by its 18 overloads (not quite the 21 overloads of MessageBox.Show, but close, and there’s still time; one day I’m going to do some research to find the most overloaded method in the .Net framework).
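As a minimal sketch of the three-argument form, the following sums the squares of 0 through 99 in parallel. It assumes the released .Net 4 version of Parallel.For (which keeps this overload), and Square is a hypothetical stand-in for doWork. Because iterations run concurrently, the shared total is updated with Interlocked.Add rather than a plain +=.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class BasicLoopSketch
{
    // Hypothetical stand-in for the article's doWork(i).
    private static long Square(int i)
    {
        return (long)i * i;
    }

    public static long SumOfSquares(int fromInclusive, int toExclusive)
    {
        long total = 0;

        // The delegate runs once for each i in [fromInclusive, toExclusive).
        Parallel.For(fromInclusive, toExclusive, i =>
        {
            // Several iterations may run at the same time, so the shared
            // total must be updated atomically.
            Interlocked.Add(ref total, Square(i));
        });

        return total;
    }
}
```

Calling BasicLoopSketch.SumOfSquares(0, 100) visits i = 0 through 99, in no particular order.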

At its most complex, the method signature looks like this:

    public static ParallelLoopResult For<TLocal>(
        int fromInclusive, 
        int toExclusive, 
        int step, 
        Func2<TLocal> threadLocalInit, 
        Action2<int, ParallelState<TLocal>> body, 
        Action<TLocal> threadLocalFinally, 
        TaskManager manager, 
        TaskCreationOptions options);

In this article, we’ll look at the return value (ParallelLoopResult) and all of the parameters except manager (TaskManager) and options (TaskCreationOptions) – those last two will be covered in a separate article. 

fromInclusive, toExclusive and step

The first three parameters should be self-explanatory, as they’re very similar to a regular for-loop:

    for (int i = 0; i < 100; i++)
     
    Parallel.For(0, 100, 1  ...

There is one important difference, however: the step of Parallel.For must be positive, while a regular for-loop can count down with a negative step:

    for (int i = 100; i > 0; i--)
     
    Parallel.For(100, 0, -1 ,

The for-loop above will count down from 100 to 1, but the Parallel.For will throw an ArgumentOutOfRangeException.  At first glance, this may seem like an artificial and unnecessary limitation, but consider that a for-loop implemented this way is likely doing so for some ordering purpose, and ordering of the loop iterations is completely moot in a parallel implementation.
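If the body really needs the descending values, one possible workaround is to run an ascending loop and map the index inside the body. The sketch below assumes the released .Net 4 overload of Parallel.For without a step parameter; the class and method names are hypothetical.

```csharp
using System;
using System.Threading.Tasks;

public static class DescendingIndexSketch
{
    // Visits the values 100, 99, ..., 1 (in no particular order) by mapping
    // an ascending index k onto i = 100 - k.
    public static int[] CountDownValues()
    {
        var seen = new int[100];

        Parallel.For(0, 100, k =>
        {
            int i = 100 - k;   // k = 0 gives i = 100, k = 99 gives i = 1
            seen[k] = i;       // each iteration writes only its own slot
        });

        return seen;
    }
}
```

The values match those of the countdown for-loop, but the order in which they are visited is still up to the scheduler.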

There also appears to be no way to represent something like:

    for (int i = 1; i < 1000; i = i * 2)

in a Parallel.For and, obviously, you wouldn’t be able to modify the iterator inside the loop as you can in for:

    for (int i = 1; i < 1000; i++)
    {
        i = i + 5;
        Console.WriteLine(i);
    }

But code that manipulates the iterator in these ways is probably not a good candidate for parallelization anyway.
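That said, when the sequence of index values can be generated up front, one possible approach is to produce it with an ordinary iterator and hand it to Parallel.ForEach. This is only a sketch, assuming the released .Net 4 API; PowersOfTwoBelow and Visit are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class NonLinearStepSketch
{
    // Yields 1, 2, 4, ... while the value stays below the limit, mirroring
    // "for (int i = 1; i < limit; i = i * 2)".
    private static IEnumerable<int> PowersOfTwoBelow(int limit)
    {
        for (int i = 1; i < limit; i = i * 2)
            yield return i;
    }

    public static int[] Visit(int limit)
    {
        var visited = new ConcurrentBag<int>();

        // Each generated value becomes one parallel iteration.
        Parallel.ForEach(PowersOfTwoBelow(limit), i => visited.Add(i));

        // Sort only so the result is easy to inspect; execution order is arbitrary.
        int[] result = visited.ToArray();
        Array.Sort(result);
        return result;
    }
}
```

The sequence itself is still generated sequentially; only the work done for each value runs in parallel.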

Func2<TLocal> threadLocalInit

This parameter allows you to pass in a delegate which will run once for each thread.  That last bit is important.  The code in the delegate will run a number of times equal to the number of threads being used for the loop and each execution will be isolated to that thread, with the returned object being available to iterations of the loop (Tasks) running on that thread.  To demonstrate that, let’s go back to the original Console code from my first post on Parallel.For:

    static void Main(string[] args)
    {
        Stopwatch watch = new Stopwatch();
        watch.Start();
     
        Parallel.For(0, 100, (i) =>
            {
                doWork(i);
            }
        );
     
        watch.Stop();
        Console.WriteLine(String.Format("Entire process took {0} milliseconds", watch.ElapsedMilliseconds));
        Console.ReadLine();
     
    }
     
    private static void doWork(int instance)
    {
        Stopwatch watch = new Stopwatch();
        watch.Start();
        double result = Math.Acos(new Random().NextDouble()) * Math.Atan2(new Random().NextDouble(), new Random().NextDouble());
        for (int i = 0; i < 20000; i++)
        {
            result += (Math.Cos(new Random().NextDouble()) * Math.Acos(new Random().NextDouble()));
        }
        watch.Stop();
        Console.WriteLine(String.Format("{0} took {1} milliseconds", instance, watch.ElapsedMilliseconds));
    }

What I want to do is assign each thread executing my delegate a Guid to identify it, so we change the Parallel.For to initialize each thread this way:

   1: Parallel.For(0, 100, 1,
   2:     () =>
   3:         {
   4:             return Guid.NewGuid();
   5:         },
   6:     (i, loopState) =>
   7:         {
   8:             doWork(i);
   9:         }
  10: );

There are three changes in this version.  First, on line 1 we have to specify the step; this is simply because the overload that allows threadLocalInit requires it.  Second, on line 6 we have to add a parameter to our main delegate; this winds up being of type ParallelState<Guid>, and it’s how we can access the value returned by the threadLocalInit delegate.  Third, lines 2 through 5 are the delegate that will execute once for each thread that a Task from our loop is assigned to.

The type for ParallelState<Guid> (loopState in the example above) is inferred from the return value of the threadLocalInit delegate, but we could also specify it:

    Parallel.For<Guid>(0, 100, 1,
    ...
    );

We’ll look more at ParallelState<T> later, but for now, we’ll concentrate on the fact that the value returned from threadLocalInit will be stored in loopState.ThreadLocalState (which will be of type Guid in this example).  Knowing this, we can pass it into the doWork method and add the thread’s Guid to the update we write to the Console:

    Parallel.For(0, 100, 1,
        () =>
            {
                return Guid.NewGuid();
            },
        (i, loopState) =>
            {
                doWork(i, loopState.ThreadLocalState);
            }
    );

[image: console output]

This will allow you to create non-threadsafe objects that can be safely used within your parallel task, because there will be a separate instance of them for each thread.
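System.Random is a typical example of such a non-threadsafe object. The sketch below gives every worker its own instance. It assumes the released .Net 4 API rather than the CTP shown in this article: there the initializer is called localInit, it runs once per participating task, and the local value arrives as a third argument of the body and is returned from it instead of being read from loopState.ThreadLocalState. The class and method names are hypothetical.

```csharp
using System;
using System.Threading.Tasks;

public static class ThreadLocalRandomSketch
{
    public static double[] Fill(int count)
    {
        var values = new double[count];

        Parallel.For(0, count,
            // localInit: create one Random per worker, seeded from a Guid so
            // workers started at the same moment do not share a seed.
            () => new Random(Guid.NewGuid().GetHashCode()),
            (i, loopState, rng) =>
            {
                // rng is only ever used by the worker that created it.
                values[i] = rng.NextDouble();
                return rng;   // handed to this worker's next iteration
            },
            // localFinally: nothing to clean up in this sketch.
            rng => { });

        return values;
    }
}
```

No locking is needed here: the Random instances are never shared, and each iteration writes only to its own array slot.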

Suppose we wanted to enhance our application to track the time each thread spent processing, in addition to the total time and time for each loop iteration that we track now.  To do this, we’ll create a class to hold both a Guid and the cumulative time spent in the thread:

    public class ThreadTimeTracker
    {
        public Guid ThreadGuid { get; set; }
        public long CumulativeThreadTime { get; set; }
    }

Change the Parallel.For to use this class instead of a Guid:

    Parallel.For(0, 100, 1,
        () =>
            {
                return new ThreadTimeTracker()
                {
                    ThreadGuid = Guid.NewGuid()
                };
            },
        (i, loopState) =>
            {
                doWork(i, loopState.ThreadLocalState);
            }
    );

And change the doWork method to add its time to the CumulativeThreadTime:

    private static void doWork(int instance, ThreadTimeTracker threadInstance)
    {
        Stopwatch watch = new Stopwatch();
        watch.Start();
        ...
        watch.Stop();
        threadInstance.CumulativeThreadTime += watch.ElapsedMilliseconds;
        // Print the thread's Guid; ToString() on the tracker itself would only print the type name.
        Console.WriteLine(String.Format("Thread {0} - {1} took {2} milliseconds", threadInstance.ThreadGuid.ToString(), instance, watch.ElapsedMilliseconds));
    }

So now each thread processing our work will have a cumulative count of the amount of time it spent processing, which we can access in …

Action<TLocal> threadLocalFinally

Just as threadLocalInit allows us to pass in a delegate that runs once on each thread before any Tasks are processed, threadLocalFinally executes once on each thread after the last Task for that thread is complete.  The delegate will receive a single parameter of the same type returned by threadLocalInit, and we can output the cumulative time for each thread to the Console:

    Parallel.For(0, 100, 1,
        () =>
            {
                return new ThreadTimeTracker()
                {
                    ThreadGuid = Guid.NewGuid()
                };
            },
        (i, loopState) =>
            {
                doWork(i, loopState.ThreadLocalState);
            },
        (threadLocalState) =>
            {
                Console.WriteLine(String.Format(
                    "Thread {0} processed for {1} ms.",
                    threadLocalState.ThreadGuid.ToString(),
                    threadLocalState.CumulativeThreadTime
                    ));
            }
    );

[image: console output]

But wait … why are the cumulative thread times all scattered in amongst the loop iteration times?  Remember that threadLocalFinally executes on each thread after all of the Tasks for that thread have completed – it’s virtually guaranteed that all of the threads running your Tasks won’t finish at exactly the same time, so there’s some coordination we’ll have to do if we want to group the results for all threads at the end of all Tasks.

What we can do is create a collection and add the results as each thread completes:

    var coordination = new ConcurrentDictionary<Guid, long>();
    Parallel.For(0, 100, 1,
         ...
        (threadLocalState) =>
            {
                coordination.Add(
                    threadLocalState.ThreadGuid,
                    threadLocalState.CumulativeThreadTime
                    );
            }
    );

ConcurrentDictionary is one of the new, thread-safe collections available in System.Collections.Concurrent in .Net 4.

Each thread will now add its results to the ConcurrentDictionary and we can write them all to the Console after the Parallel.For loop finishes, achieving the results we want:

[image: console output]

Following is the complete source code for this example.  ConcurrentDictionary isn’t available in the June 2008 CTP of the Parallel Extensions, only as part of the Visual Studio 2010 CTP – if you’d like to run this sample in VS2008 with the June CTP, you can use ConcurrentQueue or ConcurrentStack instead.

    class Program
    {
        static void Main(string[] args)
        {
            Stopwatch watch = new Stopwatch();
            watch.Start();
     
            var coordination = new ConcurrentDictionary<Guid, long>();
            Parallel.For(0, 100, 1,
                () =>
                    {
                        return new ThreadTimeTracker()
                        {
                            ThreadGuid = Guid.NewGuid()
                        };
                    },
                (i, loopState) =>
                    {
                        doWork(i, loopState.ThreadLocalState);
                    },
                (threadLocalState) =>
                    {
                        coordination.Add(
                            threadLocalState.ThreadGuid,
                            threadLocalState.CumulativeThreadTime
                            );
                    }
            );
     
            foreach (var item in coordination)
            {
                Console.WriteLine(String.Format(
                    "Thread {0} processed for {1} ms.",
                    item.Key.ToString(),
                    item.Value
                    ));
            }
            watch.Stop();
            Console.WriteLine(String.Format("Entire process took {0} milliseconds", watch.ElapsedMilliseconds));
            Console.ReadLine();
     
        }
     
        private static void doWork(int instance, ThreadTimeTracker threadInstance)
        {
            Stopwatch watch = new Stopwatch();
            watch.Start();
            double result = Math.Acos(new Random().NextDouble()) * Math.Atan2(new Random().NextDouble(), new Random().NextDouble());
            for (int i = 0; i < 20000; i++)
            {
                result += (Math.Cos(new Random().NextDouble()) * Math.Acos(new Random().NextDouble()));
            }
            watch.Stop();
            threadInstance.CumulativeThreadTime += watch.ElapsedMilliseconds;
            Console.WriteLine(String.Format("Thread {0} - {1} took {2} milliseconds", threadInstance.ThreadGuid.ToString(), instance, watch.ElapsedMilliseconds));
        }
    }
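For comparison, here is a rough sketch of the same init / body / finally pattern against the released .Net 4 API, which differs from the CTP code above. The assumptions: there is no step parameter, the local state is passed into the body as a third argument and returned from it, the per-worker delegates run once per participating task, and results are collected with ConcurrentDictionary.TryAdd. WorkerTimeTracker, PerWorkerTimingSketch and the SpinWait placeholder for doWork are all hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public sealed class WorkerTimeTracker
{
    public Guid WorkerId { get; set; }
    public long CumulativeMilliseconds { get; set; }
    public int Iterations { get; set; }
}

public static class PerWorkerTimingSketch
{
    public static ConcurrentDictionary<Guid, WorkerTimeTracker> Run(int iterations)
    {
        var coordination = new ConcurrentDictionary<Guid, WorkerTimeTracker>();

        Parallel.For(0, iterations,
            // localInit: one tracker per worker.
            () => new WorkerTimeTracker { WorkerId = Guid.NewGuid() },
            (i, loopState, tracker) =>
            {
                Stopwatch watch = Stopwatch.StartNew();
                Thread.SpinWait(10000);   // placeholder for the real work
                watch.Stop();

                // The tracker belongs to this worker alone, so plain updates are safe.
                tracker.CumulativeMilliseconds += watch.ElapsedMilliseconds;
                tracker.Iterations++;
                return tracker;
            },
            // localFinally: publish each worker's totals once it has finished.
            tracker => coordination.TryAdd(tracker.WorkerId, tracker));

        return coordination;
    }
}
```

After Run returns, the dictionary holds one entry per worker, and the per-worker iteration counts add up to the total number of iterations.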

Parallel.For returns ParallelLoopResult

The return value of Parallel.For is another difference between the two available CTPs of the Parallel Extensions.  In the June CTP, Parallel.For returns void – the September (VS2010) CTP changes this to ParallelLoopResult:

    public struct ParallelLoopResult
    {
        public bool Completed { get; }
        public long? LowestBreakIteration { get; }
    }

Completed will be true if the Parallel.For ran to completion, but will be false if it’s stopped for any reason, such as calling Stop() on the ParallelState object from within an iteration:

    var result = Parallel.For(1, 100, (i, loopState) =>
        {
            loopState.Stop();
        });

LowestBreakIteration will be set to the lowest iteration that called Break() on ParallelState:

    var result = Parallel.For(1, 100, (i, loopState) =>
        {
            if (i == 10)
                loopState.Break();
        });

In the above example, result.LowestBreakIteration will be 10.  We look at Break() and Stop() in more detail in …
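A sketch of the same check against the released .Net 4 API, where (as an assumption relative to this article's CTP) the loop-state type is named ParallelLoopState and the result property is IsCompleted rather than Completed. LoopResultSketch is a hypothetical name.

```csharp
using System;
using System.Threading.Tasks;

public static class LoopResultSketch
{
    public static ParallelLoopResult BreakAtTen()
    {
        return Parallel.For(1, 100, (i, loopState) =>
        {
            // Only iteration 10 ever asks the loop to break.
            if (i == 10)
                loopState.Break();
        });
    }
}
```

Because iteration 10 is the only one that calls Break(), the returned result reports IsCompleted as false and LowestBreakIteration as 10.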

ParallelState and ParallelState<TLocal>

Note: In the June 2008 CTP of the Parallel Extensions, ParallelState is limited to the Stop() and IsStopped functionality below.  Other methods and properties were added in the September 2008 CTP as part of the VS2010 CTP and .Net 4.

ParallelState is the base loop-state type:

    public class ParallelState
    {
        public bool IsExceptional { get; }
        public bool IsStopped { get; }
        public virtual long? LowestBreakIteration { get; }
        public virtual bool ShouldExitCurrentIteration { get; }
     
        public virtual void Break();
        public void Stop();
    }

And ParallelState<TLocal> adds the ThreadLocalState property, which we saw in the threadLocalInit example:

    public class ParallelState<TLocal> : ParallelState
    {
        public TLocal ThreadLocalState { get; set; }
    }

The two methods, Break() and Stop(), are used to manipulate program flow and, at first glance, appear to be synonymous.  The documentation for them reads:

Break(): This is shared with all other concurrent threads in the system which are participating in the loop's execution. After calling Break(), no additional iterations past the iteration of the caller will be executed on the current thread, and other parallel workers will be stopped at their earliest convenience.

Stop(): This is shared with all other concurrent threads in the system which are participating in the loop's execution. After calling Stop(), no additional iterations will be executed on the current thread, and other parallel workers will be stopped at their earliest convenience.

But there’s a subtle difference – we’ll look at Stop() first.

When Stop() is called, any iterations queued for processing, but not yet executing, will be removed from the thread queues and will never execute, but any iterations already processing will run to completion.  So given a loop of:

    Parallel.For(1, 100, (i, loopState) =>
    {
        if (i < 5)
            Thread.Sleep(100);
        if (i == 8)
        {
            loopState.Stop();
            return;
        }
        if (loopState.IsStopped)
        {
            Console.WriteLine("{0} was stopped", i);
            return;
        }
        Console.WriteLine("{0} ran to completion", i);
    });

The result will be similar to:

[image: console output]

Some iterations will run to completion, some will never execute and some will be stopped because they checked the IsStopped property.  It’s important to remember that order of execution for the iterations isn’t guaranteed, so stopping on iteration 8 doesn’t mean that iterations 9, 10, 11, etc. will never execute – if those iterations have started processing before Stop() is called, they’ll run to completion unless the IsStopped property is explicitly checked. 
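The same cooperative pattern can be sketched against the released .Net 4 API, assuming its ParallelLoopState type in place of the CTP's ParallelState. StopSketch is a hypothetical name, and the bag simply records which iterations ran to completion.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class StopSketch
{
    public static ParallelLoopResult StopAtEight(ConcurrentBag<int> completed)
    {
        return Parallel.For(1, 100, (i, loopState) =>
        {
            if (i == 8)
            {
                loopState.Stop();   // ask the loop to stop scheduling new iterations
                return;
            }

            // Iterations that are already running have to check for
            // themselves whether a stop was requested.
            if (loopState.IsStopped)
                return;

            completed.Add(i);       // this iteration ran to completion
        });
    }
}
```

Which iterations end up in the bag varies from run to run. The only certainties are that iteration 8 is never among them and that the loop reports it did not complete.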

Break() behaves very similarly, but with an important difference: it removes from the thread queues only those Tasks whose iteration number is higher than that of the iteration in which Break() was called, and it sets ShouldExitCurrentIteration to true only in already-running iterations with a higher number.

So given 100 iterations, if Break() is called in iteration 45:

  • All Tasks for iterations 46 to 100 that haven’t started executing yet will be removed from the thread queues and will never execute;
  • ShouldExitCurrentIteration will equal True in all already executing iterations between 46 and 100;
  • Iterations 1 to 44 will be unaffected;
  • Iterations between 46 and 100 which have already completed are done and nothing happens to them;
  • Iterations between 46 and 100 which are currently executing when Break() is called will run to completion unless you explicitly check for ShouldExitCurrentIteration.

The important thing to remember about either Stop() or Break() is this:  No Guarantees.  Once a Task has started executing, it’s up to you to exit if that’s the behavior you’re looking for, and, even then, it’s possible for higher-numbered iterations to be scheduled and complete execution before Stop() or Break() is called.
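One thing Break() does promise, at least in the released .Net 4 API that this sketch assumes, is that every iteration below the breaking one still runs. The example records which iterations executed; BreakSketch and BreakAt are hypothetical names.

```csharp
using System;
using System.Threading.Tasks;

public static class BreakSketch
{
    public static bool[] BreakAt(int breakIndex, int count)
    {
        var executed = new bool[count];

        Parallel.For(0, count, (i, loopState) =>
        {
            executed[i] = true;     // each iteration marks only its own slot

            if (i == breakIndex)
                loopState.Break();  // iterations above breakIndex need not start

            // A long-running body could poll loopState.ShouldExitCurrentIteration
            // here and return early once it becomes true.
        });

        return executed;
    }
}
```

After BreakSketch.BreakAt(45, 100), slots 0 through 45 are all true, while the slots above 45 are a mix that depends on how far the other workers had progressed.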

As with all things Parallel, if order of execution is important to you, parallel might not be the answer.

Parallel.ForEach(…)

Although this article is about Parallel.For, it’s worth also mentioning Parallel.ForEach, because the optional arguments are very similar:

    public static ParallelLoopResult ForEach<TSource, TLocal>(
        IEnumerable<TSource> source, 
        Func2<TLocal> threadLocalInit, 
        Action2<TSource, int, ParallelState<TLocal>> body, 
        Action<TLocal> threadLocalFinally, 
        TaskManager manager, 
        TaskCreationOptions options);

Parallel.ForEach has the same threadLocalInit and threadLocalFinally parameters and returns the same ParallelLoopResult, and ParallelState<TLocal> is again an optional argument to the body delegate.  The only real differences have to do with the difference between for and foreach.
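A short sketch of the pattern with Parallel.ForEach, again assuming the released .Net 4 signatures (localInit, a body that receives and returns the local value, and localFinally) rather than the CTP ones listed above. ForEachLocalStateSketch and TotalLength are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ForEachLocalStateSketch
{
    public static long TotalLength(IEnumerable<string> words)
    {
        long total = 0;

        Parallel.ForEach(words,
            // localInit: every worker starts with its own subtotal of zero.
            () => 0L,
            // body: add this word's length to the worker's private subtotal.
            (word, loopState, subtotal) => subtotal + word.Length,
            // localFinally: fold each worker's subtotal into the shared total.
            subtotal => Interlocked.Add(ref total, subtotal));

        return total;
    }
}
```

Keeping a private subtotal per worker means the shared total is touched only once per worker instead of once per item.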

Conclusion

A deeper dive into the Parallel class shows even more of the functionality we’ll have available to us in .Net 4.

This is a very rich library which will improve the parallel-development experience, with options to create thread-local objects via the threadLocalInit delegate, perform finalization and aggregation via the threadLocalFinally delegate, and options to control the loops with the ParallelState class.

In a future article, we’ll look at the last two arguments to Parallel.For, Parallel.ForEach and Parallel.Invoke: the TaskManager and TaskCreationOptions.


2 comments:

Gonzo Programmer said...

Thank you for an excellent article.

Leonel Gurdian said...

wow ! Excellent detailed explanation in helping me understand usage of this class and patterns!