Monday, February 9, 2009

Parallel Programming in .Net 4.0 and Visual Studio 2010: BlockingCollection<T>

Copyright 2008-2009, Paul Jackson, all rights reserved

The company I work for is making heavy use of smart-client technologies and we were early adopters of the Composite UI Application Block (CAB) and Smart Client Software Factory (SCSF) from Microsoft patterns & practices.  As such, I try to keep up on the discussion forums for those products, and I was on the Smart Client forum today when I saw a post from someone who was encountering an exception out of CAB: “System.InvalidOperationException: Collection was modified; enumeration operation may not execute.”

I spent a bit of time looking into it and found that the problem occurred when one thread was doing something that added to a particular CAB collection while another thread was in the process of iterating over the collection’s contents – which, of course, results in our friendly “Collection was modified” exception.  As I looked into this, I had a “damn, I wish” moment – one of those moments when I stop coding and mutter something like:

“Damn, I wish we could just go ahead and deploy .Net 4.0 now, because this task would be a lot easier with it.”

“Damn, I wish we’d had the Parallel Extensions when we’d first written this code, because it’d be a lot easier to maintain it now.”

“Damn, I wish I could stop dealing with threads – maybe I’ll move to Wyoming and become a shepherd.  Shepherds don’t have to deal with threads.”  Note: Yes, shepherds have to deal with wool, but it’s not thread yet, so that’s okay.  And, if you get really irritated, you can forget the wool and deal with mutton.

But the standard .Net collections aren’t threadsafe, so CAB’s collections aren’t either, and the alternatives open to the guy hitting the exception above in his application aren’t really attractive.
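To see the kind of race he was running into, here’s a minimal sketch – not CAB code, just a plain List<T> shared between two threads, with made-up variable names – that reproduces the same exception on most runs:

    // One thread enumerates the list while another adds to it.
    // List<T>'s enumerator detects the modification and throws
    // "Collection was modified; enumeration operation may not execute."
    var list = new List<int> { 1, 2, 3 };

    var writer = new Thread(() =>
    {
        for (int i = 0; i < 100000; i++)
        {
            list.Add(i);                  // mutating the list...
        }
    });
    writer.Start();

    foreach (var item in list)            // ...while this enumeration is in progress
    {
        Console.WriteLine(item);          // InvalidOperationException on most runs
    }
    writer.Join();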

So what does all this have to do with .Net 4.0 and Visual Studio 2010 and the Parallel Extensions?  Well, it got me thinking about the new threadsafe collections we’ll have access to when 4.0 is released.

In System.Collections.Concurrent there are currently (as of the Visual Studio 2010 CTP) four threadsafe collections that we’ll be able to use (and I’m assuming/hoping there’ll be more available before release):

ConcurrentQueue<T>

ConcurrentStack<T>

ConcurrentDictionary<TKey, TValue>

BlockingCollection<T>

The first three are threadsafe versions of existing .Net collections, which is cool enough, but the really cool one is BlockingCollection<T>. 
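Before getting to BlockingCollection, here’s a quick taste of the first three – a minimal sketch; the Try-style members shown are the shapes I’d expect in the final bits, so they may differ slightly in the CTP:

    // ConcurrentQueue<T>: threadsafe FIFO
    var queue = new ConcurrentQueue<int>();
    queue.Enqueue(1);
    int head;
    if (queue.TryDequeue(out head))        // false if the queue was empty
        Console.WriteLine(head);

    // ConcurrentStack<T>: threadsafe LIFO
    var stack = new ConcurrentStack<string>();
    stack.Push("top");
    string popped;
    stack.TryPop(out popped);

    // ConcurrentDictionary<TKey, TValue>: threadsafe key/value store
    var dict = new ConcurrentDictionary<string, int>();
    dict.TryAdd("answer", 42);             // false if the key is already present
    int value;
    dict.TryGetValue("answer", out value);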

 

One of the things we can do with BlockingCollection is enumerate it in a foreach loop and have it block and wait for more items when the collection is empty – so our foreach will simply sit and wait, rather than exiting, if the collection runs dry.

The other very cool thing we can do with BlockingCollection is set a maximum number of items, after which calls to Add will block until something is removed.

Let’s take a look at some of the things we can do with this.  Suppose we have an application that looks like this:

    // BlockingCollection<T> lives in the new System.Collections.Concurrent namespace
    static void Main(string[] args)
    {
        var c = new BlockingCollection<DateTime>();

        for (int i = 0; i < 100; i++)
        {
            c.Add(DateTime.Now);
        }

        Console.ReadLine();
    }

We can anticipate that running this will very quickly add 100 DateTimes to the BlockingCollection c.

But if we then create a Task to run on another thread to iterate over the collection:

    var t = Task.StartNew(delegate
    {
        foreach (var item in c.GetConsumingEnumerable())
        {
            Console.WriteLine(item);
        }
    });

GetConsumingEnumerable() is the magic method that enumerates the collection and blocks if there are no items.  And we’ll also want to change the loop that adds:

    for (int i = 0; i < 100; i++)
    {
        c.Add(DateTime.Now);
        Thread.Sleep(2000);
    }
    c.CompleteAdding();

    t.Wait();

The Thread.Sleep(2000) puts the main thread to sleep for two seconds between Adds, to simulate items being added far apart.

The call to c.CompleteAdding() tells the collection that no more items will be added.  Since we’re now in a situation where another thread will be blocking, waiting for more items to arrive, it becomes somewhat critical to be able to tell it not to expect any more.

And t.Wait() waits for the background thread that’s reading from the collection to finish.

So the new code in its entirety would look like:

    static void Main(string[] args)
    {
        var c = new BlockingCollection<DateTime>();

        var t = Task.StartNew(delegate
        {
            foreach (var item in c.GetConsumingEnumerable())
            {
                Console.WriteLine(item);
            }
        });

        for (int i = 0; i < 100; i++)
        {
            c.Add(DateTime.Now);
            Thread.Sleep(2000);
        }
        c.CompleteAdding();

        t.Wait();

        Console.ReadLine();
    }

And if we run it, we’ll get output like:

[Screenshot: console output showing a new timestamp printed roughly every two seconds]

The main thread runs and adds to the BlockingCollection once every two seconds while the background thread enumerates the collection and blocks when there are no items.

This, in itself, is pretty cool, but suppose it’s the process of pulling items from the collection and processing them that takes a long time, and we want to take advantage of multiple cores by having more than one thread do that work.  For instance, if we were reading records from a file and processing them, we could have one thread reading from disk (avoiding an IO bottleneck) and adding to the collection while multiple threads process the data.

So we’ll put the delay in the process reading from the collection and add a second worker thread – and just for variety we’ll make the threads sleep for random times:

    var t = Task.StartNew(delegate
    {
        var random = new Random();
        foreach (var item in c.GetConsumingEnumerable())
        {
            Console.WriteLine("1:" + item);
            Thread.Sleep(random.Next(500, 2000));
        }
    });
    var t2 = Task.StartNew(delegate
    {
        var random = new Random();
        foreach (var item in c.GetConsumingEnumerable())
        {
            Console.WriteLine("2:" + item);
            Thread.Sleep(random.Next(500, 2000));
        }
    });

If we then remove the Sleep() from the main thread where we’re adding items, and also Wait() on the second task to finish processing, we’ll wind up with output similar to:

[Screenshot: console output from both workers, with (almost) all items carrying the same timestamp]

The times are all the same (mostly) because the main thread very quickly added all 100 items to the collection.  The worker threads then pull items out in between their random delays, which is why one thread sometimes processes two items in a row before the other gets one.  If you run the full code, you’ll see the delays happen:

    static void Main(string[] args)
    {
        var c = new BlockingCollection<DateTime>();

        var t = Task.StartNew(delegate
        {
            var random = new Random();
            foreach (var item in c.GetConsumingEnumerable())
            {
                Console.WriteLine("1:" + item);
                Thread.Sleep(random.Next(500, 2000));
            }
        });
        var t2 = Task.StartNew(delegate
        {
            var random = new Random();
            foreach (var item in c.GetConsumingEnumerable())
            {
                Console.WriteLine("2:" + item);
                Thread.Sleep(random.Next(500, 2000));
            }
        });

        for (int i = 0; i < 100; i++)
        {
            c.Add(DateTime.Now);
        }
        c.CompleteAdding();

        t.Wait();
        t2.Wait();

        Console.ReadLine();
    }

So now we have two threads processing the data and taking some time about it, but the main thread immediately fills the collection with 100 items to be processed.  But suppose the data from our file is large – we may not want to load everything at once.  For instance, we may not want to have more than four items in the collection.  To demonstrate this, we’ll increase the time that the processing worker threads sleep (to 5-7 seconds) and change the creation of the BlockingCollection to:

    var c = new BlockingCollection<DateTime>(4);

Now the first several items in the collection will have the same time: the main thread quickly adds the first five items (the bound you specify in the constructor, plus one more Add that completes as soon as a worker takes an item) and then adds more as the worker threads process their first units of work:

[Screenshot: console output where the first few items share a timestamp and later items are spaced several seconds apart]

The main thread will block on the Add until one of the worker threads removes an item from the BlockingCollection.  So if our items are large, we can use this technique to limit the number of instances loaded into memory at any one time and throttle the application’s memory usage.  Run the full code and play around with it to explore this new feature:

    static void Main(string[] args)
    {
        // A bounded collection: Add blocks once four items are waiting to be consumed
        var c = new BlockingCollection<DateTime>(4);

        var t = Task.StartNew(delegate
        {
            var random = new Random();
            foreach (var item in c.GetConsumingEnumerable())
            {
                Console.WriteLine("1:" + item);
                Thread.Sleep(random.Next(5000, 7000));
            }
        });
        var t2 = Task.StartNew(delegate
        {
            var random = new Random();
            foreach (var item in c.GetConsumingEnumerable())
            {
                Console.WriteLine("2:" + item);
                Thread.Sleep(random.Next(5000, 7000));
            }
        });

        for (int i = 0; i < 100; i++)
        {
            c.Add(DateTime.Now);
        }
        c.CompleteAdding();

        t.Wait();
        t2.Wait();

        Console.ReadLine();
    }

BlockingCollection<T> opens up some really interesting possibilities for processing data in a manycore environment: publish/subscribe designs and pipelining spring to mind – one of the very cool new things we have to look forward to in .Net 4.0.
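As a sketch of the pipelining idea – this is just my own illustration, with made-up stage names, written against Task.Factory.StartNew rather than the CTP’s Task.StartNew used above – two BlockingCollections chained together give you a three-stage pipeline where each stage throttles the one before it:

    // Stage 1 produces raw strings, stage 2 parses them, stage 3 consumes the results.
    // The bounded capacities keep any one stage from racing too far ahead.
    var rawItems = new BlockingCollection<string>(100);
    var parsedItems = new BlockingCollection<int>(100);

    var produce = Task.Factory.StartNew(() =>
    {
        for (int i = 0; i < 1000; i++)
        {
            rawItems.Add(i.ToString());
        }
        rawItems.CompleteAdding();              // no more raw input
    });

    var parse = Task.Factory.StartNew(() =>
    {
        foreach (var s in rawItems.GetConsumingEnumerable())
        {
            parsedItems.Add(int.Parse(s));
        }
        parsedItems.CompleteAdding();           // propagate "done" to the next stage
    });

    var consume = Task.Factory.StartNew(() =>
    {
        foreach (var n in parsedItems.GetConsumingEnumerable())
        {
            Console.WriteLine(n);
        }
    });

    Task.WaitAll(produce, parse, consume);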

But wait, there’s more!

BlockingCollection also has some other interesting methods like:

    public static int TakeFromAny(BlockingCollection<T>[] collections, out T item);

Apparently we’ll be able to pass in an array of collections and ask for an item out of any of them.  Or:

    public static int AddToAny(BlockingCollection<T>[] collections, T item);

It looks like we’ll be able to add an item to whichever BlockingCollection in the array isn’t currently blocked from adding.
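I haven’t tried these yet, but going just by those signatures, usage would presumably look something like the sketch below (my assumption is that the int returned is the index of the collection that was used):

    var queues = new[]
    {
        new BlockingCollection<int>(),
        new BlockingCollection<int>()
    };

    // Add 42 to whichever collection will take it; addedTo is (presumably) its index.
    int addedTo = BlockingCollection<int>.AddToAny(queues, 42);

    // Take an item from whichever collection has one; takenFrom is (presumably) its index.
    int item;
    int takenFrom = BlockingCollection<int>.TakeFromAny(queues, out item);
    Console.WriteLine("Took " + item + " from collection " + takenFrom);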

I’m going to be playing with these a bit as soon as I have time and will definitely post the results.


2 comments:

evilhomer said...

A great post :-)

I'm looking forward to these new thread safe collections, they will make some of my life easier.

Great to see you are blogging again.
Your writing style is very easy to read and entertaining.

Jon Kragh said...

Hey man nice post. I was just about to write my own immutable collections because I was getting sick of writing locking code over and over. I figured 4.0 must have something with the parallel extensions coming out. Nice to see they do! Your examples are really clear BTW.

Good job!
Jon