Archives for: December 2013

Previous Post in this series:
Part 9: Basic Exception handling with the AggregateException

Generally we’re used to being able to break out of or continue a loop. If you try to continue or break out of a parallel loop, you get a compiler error:

No enclosing loop out of which to break or continue

“continue” is the easy case: just return from the delegate. Breaking is a bit more complex. Do you want to stop all iterations immediately? Do you want to run all iterations up to the point where you break? Well, you have a choice. Below, and in the included solution, are two samples showing how to handle loop control.
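For the “continue” case, here is a minimal standalone sketch (not part of the sample solution) showing return acting as continue:

```csharp
using System;
using System.Threading.Tasks;

class ContinueSketch
{
	static void Main()
	{
		// Inside the loop delegate, "return" plays the role of "continue":
		// it ends the current iteration without affecting any others.
		Parallel.For(0, 10, i =>
		{
			if (i % 2 != 0)
				return; // skip odd values, like "continue" in a standard for loop

			Console.WriteLine("Processing " + i);
		});
	}
}
```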

LoopBreakSample:

public class LoopBreakSample : Sample
{
	public override string SampleName
	{
		get { return "Loop Break Sample"; }
	}

	public override bool ImageRequired
	{
		get { return false; }
	}

	protected int MaxValue { get { return 50; } }
	protected int BreakValue { get { return 20; } }

	public override void Run(System.Drawing.Bitmap bmp = null, Action<string> UpdateLog = null)
	{
		Stopwatch s = new Stopwatch();
		s.Start();

		UpdateLog("Running to " + MaxValue);
		var loopResult = Parallel.For(0, MaxValue, (int i, ParallelLoopState loop) =>
		{
			UpdateLog("Starting " + i);
			if (i == BreakValue)
			{
				UpdateLog("Breaking " + i);
				loop.Break();
				return;
			}

			Thread.Sleep(100);
		});
		UpdateLog("IsCompleted == " + loopResult.IsCompleted);
		if (!loopResult.LowestBreakIteration.HasValue)
			UpdateLog("LowestBreakIteration has no value");
		else
			UpdateLog("LowestBreakIteration.Value == " + loopResult.LowestBreakIteration.Value);

		s.Stop();
		RunTime = s.Elapsed;
	}
}

There are a few things going on here besides the normal delegate for the loop. First, the parameters of the lambda that defines the delegate include a ParallelLoopState. It is on this loop state that we call .Break().

Second, we use the returned ParallelLoopResult to see whether the loop completed and what the lowest iteration was when Break() was called.

It is critical that you understand how break works. Per the documentation:

Break may be used to communicate to the loop that no other iterations after the current iteration need be run. For example, if Break() is called from the 100th iteration of a for loop iterating in parallel from 0 to 1000, all iterations less than 100 should still be run, but the iterations from 101 through to 1000 are not necessary.

This is very important. Break() does not stop the loop immediately: iterations continue to be scheduled so that every index below the one where Break() was called still runs, just as if this were a standard loop that broke at that index. LowestBreakIteration is set so the caller knows at what point Break() was called.

The result of running this will look similar to:

Starting Loop Break Sample
Running to 50
Starting 12
Starting 0
Starting 6
Starting 18
Starting 24
Starting 30
Starting 36
Starting 42
Starting 13
Starting 1
Starting 7
Starting 25
Starting 31
Starting 43
Starting 37
Starting 2
Starting 14
Starting 8
Starting 20
Breaking 20
Starting 3
Starting 15
Starting 9
Starting 16
Starting 10
Starting 17
Starting 5
Starting 11
IsCompleted == False
LowestBreakIteration.Value == 20
Completed Loop Break Sample

As you can see, the break is called at 20 as it should be. New iterations, however, were still scheduled to make sure that every index below 20, where Break() was called, still ran.

UPDATE: As you read the above list of started iterations, a couple of numbers seem to be missing if we’re really running everything up to 20. I changed the maximum number of concurrent iterations (with ParallelOptions) to 2 and fiddled with some other code to get it to break early. It worked as the documentation states, so I’m not sure why there are some missing numbers in the above results.
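That ParallelOptions experiment isn’t in the solution, but limiting concurrency looks roughly like this; note the property is named MaxDegreeOfParallelism and it caps the number of concurrently running iterations rather than threads as such:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class MaxDegreeSketch
{
	static void Main()
	{
		// At most two iterations run concurrently.
		var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };

		Parallel.For(0, 50, options, (int i, ParallelLoopState loop) =>
		{
			Console.WriteLine("Starting " + i);
			if (i == 20)
				loop.Break();
			Thread.Sleep(100);
		});
	}
}
```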

So what if you want to stop scheduling new iterations and not run the remaining ones at all? That is where Stop() comes in.

LoopStopSample:

public class LoopStopSample : LoopBreakSample
{
	public override string SampleName
	{
		get { return "Loop Stop Sample"; }
	}

	public override void Run(System.Drawing.Bitmap bmp = null, Action<string> UpdateLog = null)
	{
		Stopwatch s = new Stopwatch();
		s.Start();

		UpdateLog("Running to " + MaxValue);
		var loopResult = Parallel.For(0, MaxValue, (int i, ParallelLoopState loop) =>
		{
			if (i > BreakValue)
			{
				UpdateLog("Stopping at " + i);
				loop.Stop();
				return;
			}
			UpdateLog("Starting at " + i);
			while(!loop.IsStopped)
			{
				Thread.Sleep(10);
			}
		});
		UpdateLog("IsCompleted == " + loopResult.IsCompleted);
		if (!loopResult.LowestBreakIteration.HasValue)
			UpdateLog("LowestBreakIteration has no value");
		else
			UpdateLog("LowestBreakIteration.Value == " + loopResult.LowestBreakIteration.Value);

		s.Stop();
		RunTime = s.Elapsed;
	}
}

Here Stop() is called when we reach a value greater than BreakValue. Stop() differs from Break() in that no further iterations are scheduled at all, not even ones below the stopping index. Any iterations already running are allowed to finish. When Stop() is called, ParallelLoopState.IsStopped is also set so other iterations know they should stop. LowestBreakIteration will have no value, though; it is only set when Break() is called.

The result of running this will look similar to:

Starting Loop Stop Sample
Running to 50
Starting at 0
Starting at 6
Starting at 12
Starting at 18
Stopping at 24
Stopping at 30
IsCompleted == False
LowestBreakIteration has no value
Completed Loop Stop Sample

You can see that as soon as Stop() is called no more iterations are started, even though many iterations below MaxValue had not yet been scheduled.

So to sum up:

Break():
- Thread creation: iterations continue to be scheduled up to the point where Break() was called, as if this were a standard loop; iterations already running are allowed to finish.
- LowestBreakIteration: set to the iteration at which Break() was first called.
- IsStopped: not set.

Stop():
- Thread creation: no further iterations are scheduled; iterations already running are allowed to finish.
- LowestBreakIteration: not set.
- IsStopped: set to true when Stop() is called.

Up next is a sample using a CancellationTokenSource to cancel the loop from outside of it.
Thanks,
Brian

Previous Post in this series:
Part 8: Adding a New Sample, Matrices Multiplication

In the updated solution you’ll find two new models, AggregateExceptionNoCatchSample and AggregateExceptionCatchSample. The TPL provides a convenient exception handling mechanism in the form of an AggregateException.

If an exception is thrown in one of the iterations of a Parallel.For or .ForEach, no new iterations are started, and any exceptions thrown across all iterations are wrapped in an AggregateException, which is thrown when the Parallel.For or .ForEach returns.
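Before the bitmap samples, here is a minimal standalone sketch of that wrapping behavior (not part of the sample solution):

```csharp
using System;
using System.Threading.Tasks;

class WrappingSketch
{
	static void Main()
	{
		try
		{
			Parallel.For(0, 10, i =>
			{
				if (i % 3 == 0)
					throw new InvalidOperationException("Failed on " + i);
			});
		}
		catch (AggregateException ae)
		{
			// Every exception thrown across the iterations that ran ends up
			// in InnerExceptions; there may be one or several depending on
			// how many iterations had already started.
			foreach (Exception inner in ae.InnerExceptions)
				Console.WriteLine(inner.Message);
		}
	}
}
```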

The first sample here, AggregateExceptionNoCatchSample, throws an exception upon reaching row 100. Of course, remember that we’re running iterations in parallel, so the order in which rows are processed is effectively random. The code could run all rows up to 100 before the exception is thrown, or it could hit row 400 after processing only row 8.

AggregateExceptionNoCatchSample:

public class AggregateExceptionNoCatchSample : Sample
{
	//used by DrawGreyScale to set the number of rows we started
	private int rowsStarted = 0;

	public override string SampleName
	{
		get { return "Aggregate Exception - Don't Catch Exceptions Sample"; }
	}

	public override bool ImageRequired
	{
		get { return true; }
	}

	public override void Run(System.Drawing.Bitmap bmp = null, Action<string> UpdateLog = null)
	{
		Stopwatch s = new Stopwatch();
		s.Start();

		try
		{
			rowsStarted = 0;
			DrawGreyScale(bmp, UpdateLog);
		}
		catch (AggregateException ae)
		{
			UpdateLog("Started " + rowsStarted + " rows.");
			//if an exception is handled and true is returned then nothing happens
			//if an exception isn't handled and false is returned then all unhandled exceptions are rewrapped in
			//a new AggregateException and thrown again.
			ae.Handle((x) =>
			{
				if (x is StackOverflowException) // This we know how to handle.
				{
					UpdateLog("Handling a stack overflow exception.");
					return true;
				}
				else
				{
					UpdateLog("Unhandled exception.");
				}
				return false; // Let anything else stop the application.
			});
		}

		s.Stop();
		RunTime = s.Elapsed;
	}

	private void DrawGreyScale(System.Drawing.Bitmap bmp, Action<string> UpdateLog)
	{
		System.Drawing.Imaging.BitmapData bmData = bmp.LockBits(new System.Drawing.Rectangle(0, 0, bmp.Width, bmp.Height), System.Drawing.Imaging.ImageLockMode.ReadWrite, System.Drawing.Imaging.PixelFormat.Format24bppRgb);
		int stride = bmData.Stride;
		System.IntPtr Scan0 = bmData.Scan0;
		unsafe
		{
			byte* start = (byte*)(void*)Scan0;

			int height = bmp.Height;
			int width = bmp.Width;

			Parallel.For(0, height, y =>
			{
				UpdateLog("Starting line " + y.ToString());
				Interlocked.Increment(ref rowsStarted);
				byte* p = start + (y * stride);
				for (int x = 0; x < width; ++x)
				{
					byte blue = p[0];
					byte green = p[1];
					byte red = p[2];

					p[0] = p[1] = p[2] = (byte)(.299 * red
						+ .587 * green
						+ .114 * blue);

					p += 3;
				}

				if (y >= 100)
				{
					UpdateLog("Throwing an exception at " + y);
					if (y % 2 == 0)
						throw new StackOverflowException("yeah, we got a stack overflow.");
					else
						throw new ArgumentNullException("yeah, we got a null argument.");
				}
			});
		}
		bmp.UnlockBits(bmData);
	}
}

As mentioned, an exception is thrown in DrawGreyScale upon reaching row 100. In all likelihood multiple rows greater than or equal to 100 will have been started. All of the exceptions these rows throw get combined into an AggregateException. Inside the AggregateException is a property titled “InnerExceptions” that contains all of them. For even rows a StackOverflowException is thrown (for no reason other than I wanted to throw that type) and for odd rows an ArgumentNullException is thrown (for the same reason as the StackOverflowException).

In the .Run of AggregateExceptionNoCatchSample, the call to .DrawGreyScale is wrapped in a try/catch for an AggregateException. Reading the documentation on exception handling in the TPL, it is recommended that you don’t wrap a call like this in a try/catch and then do nothing with the exceptions. Uh, yeah, well, I hope you wouldn’t catch exceptions and not do something with them.

In the catch, .Handle is called, which invokes the predicate passed in once for each exception in the InnerExceptions of the AggregateException, so you can handle each one individually. If you handle an exception, the predicate should return true so the AggregateException knows not to do anything further with it. If you don’t handle it, return false. Any unhandled exceptions are wrapped in a new AggregateException and thrown out of the Handle method. This way, if you get multiple types of exceptions and one appears that you weren’t expecting, it can be handled higher up the stack.
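A small standalone sketch of that rewrapping behavior (not part of the sample solution), built from an AggregateException constructed by hand so the result is deterministic:

```csharp
using System;

class HandleSketch
{
	static void Main()
	{
		var ae = new AggregateException(
			new InvalidOperationException("expected"),
			new ArgumentNullException("unexpected"));

		try
		{
			// Handle invokes the predicate once per inner exception.
			ae.Handle(x =>
			{
				if (x is InvalidOperationException)
				{
					Console.WriteLine("Handled: " + x.Message);
					return true;  // handled; Handle swallows it
				}
				return false;     // unhandled; rewrapped and rethrown
			});
		}
		catch (AggregateException rewrapped)
		{
			// Only the unhandled ArgumentNullException remains.
			Console.WriteLine(rewrapped.InnerExceptions.Count + " unhandled.");
		}
	}
}
```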

In this sample that may actually kill the app, because I only handle the StackOverflowException and not the ArgumentNullException, which will get re-thrown.

AggregateExceptionCatchSample:

public class AggregateExceptionCatchSample : Sample
{
	//used by DrawGreyScale to set the number of rows we started
	private int rowsStarted = 0;

	public override string SampleName
	{
		get { return "Aggregate Exception - Catch Exceptions Sample"; }
	}

	public override bool ImageRequired
	{
		get { return true; }
	}

	public override void Run(System.Drawing.Bitmap bmp = null, Action<string> UpdateLog = null)
	{
		Stopwatch s = new Stopwatch();
		s.Start();

		try
		{
			rowsStarted = 0;
			DrawGreyScale(bmp, UpdateLog);
		}
		catch (AggregateException ae)
		{
			UpdateLog("Started " + rowsStarted + " rows.");

			ae.Handle((x) =>
			{
				UpdateLog("Handling an exception.");
				return true;
			});
		}

		s.Stop();
		RunTime = s.Elapsed;
	}

	private void DrawGreyScale(System.Drawing.Bitmap bmp, Action<string> UpdateLog)
	{
		ConcurrentQueue<Exception> exceptions = new ConcurrentQueue<Exception>();
		System.Drawing.Imaging.BitmapData bmData = bmp.LockBits(new System.Drawing.Rectangle(0, 0, bmp.Width, bmp.Height), System.Drawing.Imaging.ImageLockMode.ReadWrite, System.Drawing.Imaging.PixelFormat.Format24bppRgb);
		int stride = bmData.Stride;
		System.IntPtr Scan0 = bmData.Scan0;
		unsafe
		{
			byte* start = (byte*)(void*)Scan0;

			int height = bmp.Height;
			int width = bmp.Width;

			Parallel.For(0, height, y =>
			{
				try
				{
					UpdateLog("Starting line " + y.ToString());
					Interlocked.Increment(ref rowsStarted);
					byte* p = start + (y * stride);
					for (int x = 0; x < width; ++x)
					{
						byte blue = p[0];
						byte green = p[1];
						byte red = p[2];

						p[0] = p[1] = p[2] = (byte)(.299 * red
							+ .587 * green
							+ .114 * blue);

						p += 3;
					}

					if (y >= 100)
					{
						UpdateLog("Throwing an exception at " + y);
						if (y % 2 == 0)
							throw new StackOverflowException("yeah, we got a stack overflow.");
						else
							throw new ArgumentNullException("yeah, we got a null argument.");
					}
				}
				catch (StackOverflowException)
				{
					UpdateLog("Internally handled the StackOverflowException.");
				}
				catch (Exception e)
				{
					exceptions.Enqueue(e);
				}
			});
		}
		bmp.UnlockBits(bmData);

		if (exceptions.Count > 0)
			throw new AggregateException(exceptions);
	}
}

In the AggregateExceptionCatchSample the exceptions are caught inside the Parallel.For and, if we don’t handle one, it is dropped into a ConcurrentQueue. Then, upon exiting the Parallel.For, if any exceptions were queued we throw a new AggregateException, passing in the queue so they can be handled above us.

So why do this? The biggest advantage is that you may be able to handle an exception right in the iteration, with no reason to kill the whole loop. Any exceptions you can’t handle still bubble up in the AggregateException. In this sample we assume we can handle all exceptions in .Run and just return true in the .Handle of the catch so the app isn’t killed, but in real code you should only return true if you actually handled the exception and false if you didn’t. Up next is stopping and breaking in a Parallel.For and .ForEach.

Thanks,
Brian

Part One: Starting with MVVM
Part Two: The MVVM solution structure and basic framework
Part Three: Base Classes
Part 4: Sampler View, View Model and Model
Part 5: Running and working with the TPL samples
Part 6: Parallel.For Sample
Part 7: Using Parallel.For effectively

We’re going to add two new samples that continue showing the benefits of the TPL by doing some matrix multiplication. As mentioned before, I added this sample to the initial code I created for the mentoring session on the TPL because it was directly applicable to some of the work we do. These derive from a sample provided by Microsoft, How to: Write a Simple Parallel.For Loop. The samples themselves are pretty straightforward; what’s more important in this post is what I had to do to add these new samples to the MVVM solution.

MatricesMultiplicationSample:

public class MatricesMultiplicationSample : Sample
{
	// Set up matrices. Use small values to better view 
	// result matrix. Increase the counts to see greater 
	// speedup in the parallel loop vs. the sequential loop.
	protected static readonly int colCount = 180;
	protected static readonly int rowCount = 2000;
	protected static readonly int colCount2 = 270;

	//protected statics so these can be used in the other sample
	static readonly Lazy<double[,]> _lazyMatrix1 = new Lazy<double[,]>(() => { return InitializeRandomMatrix(rowCount, colCount); });
	protected static double[,] Matrix1
	{
		get { return _lazyMatrix1.Value; }
	}

	static readonly Lazy<double[,]> _lazyMatrix2 = new Lazy<double[,]>(() => { return InitializeRandomMatrix(colCount, colCount2); });
	protected static double[,] Matrix2
	{
		get { return _lazyMatrix2.Value; }
	}

	static Random ran = new Random();
	static double[,] InitializeRandomMatrix(int rows, int cols)
	{
		double[,] matrix = new double[rows, cols];

		for (int i = 0; i < rows; i++)
		{
			for (int j = 0; j < cols; j++)
			{
				matrix[i, j] = ran.Next(100);
			}
		}
		return matrix;
	}

	public override string SampleName
	{
		get { return "Matrices Multiplication"; }
	}

	public override bool ImageRequired
	{
		get { return false; }
	}

	public override void Run(System.Drawing.Bitmap bmp = null, Action<string> updateLog = null)
	{
		double[,] matA = Matrix1;
		double[,] matB = Matrix2;
		double[,] result = new double[rowCount, colCount2];

		Stopwatch s = new Stopwatch();
		s.Start();

		int matACols = matA.GetLength(1);
		int matBCols = matB.GetLength(1);
		int matARows = matA.GetLength(0);

		for (int i = 0; i < matARows; i++)
		{
			for (int j = 0; j < matBCols; j++)
			{
				for (int k = 0; k < matACols; k++)
				{
					result[i, j] += matA[i, k] * matB[k, j];
				}
			}
		}

		s.Stop();
		RunTime = s.Elapsed;
	}
}

We need to share the matrices between the two models that demonstrate matrix multiplication. As such, I’ve put these values in a class that extends Sample and have the second model extend this class. We’re using the Lazy<> class to initialize the matrices; because of that, I don’t start the stopwatch until after the matrices have been retrieved, so initialization doesn’t affect the timing. Since we’re not using an image, ImageRequired returns false.
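If you haven’t used Lazy<> before, here is a quick standalone sketch of its behavior (not part of the sample solution):

```csharp
using System;

class LazySketch
{
	static void Main()
	{
		var lazyMatrix = new Lazy<double[,]>(() =>
		{
			Console.WriteLine("Initializing now");
			return new double[2, 2];
		});

		Console.WriteLine("Created, but the factory has not run yet");

		// The factory runs on the first access to .Value; later accesses
		// return the same cached instance (thread-safe by default).
		double[,] m1 = lazyMatrix.Value;
		double[,] m2 = lazyMatrix.Value;
		Console.WriteLine(object.ReferenceEquals(m1, m2)); // True
	}
}
```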

MatricesMultiplicationParallelSample:

public class MatricesMultiplicationParallelSample : MatricesMultiplicationSample
{
	public override string SampleName
	{
		get { return "Matrices Multiplication Parallel"; }
	}

	public override void Run(System.Drawing.Bitmap bmp = null, Action<string> updateLog = null)
	{
		double[,] matA = Matrix1;
		double[,] matB = Matrix2;
		double[,] result = new double[rowCount, colCount2];

		Stopwatch s = new Stopwatch();
		s.Start();

		int matACols = matA.GetLength(1);
		int matBCols = matB.GetLength(1);
		int matARows = matA.GetLength(0);

		// A basic matrix multiplication.
		// Parallelize the outer loop to partition the source array by rows.
		Parallel.For(0, matARows, i =>
		{
			for (int j = 0; j < matBCols; j++)
			{
				double temp = 0;
				for (int k = 0; k < matACols; k++)
				{
					temp += matA[i, k] * matB[k, j];
				}
				result[i, j] = temp;
			}
		});

		s.Stop();
		RunTime = s.Elapsed;
	}
}

For the second model, the only big difference is replacing the outer loop with a Parallel.For. I’ve also overridden SampleName, but other than that it just uses the values it inherits from the base.

What is really exciting to me is that to add a new sample there are only two places you have to modify.

SamplerViewModel.ctor:

public SamplerViewModel()
{
	Samples = new ObservableCollection<SampleViewModel>();
	Sampler = new Sampler();
	Sampler.Samples.Add(new LineSample());
	Sampler.Samples.Add(new LineParallelSample());
	Sampler.Samples.Add(new GreyScaleSample());
	Sampler.Samples.Add(new GreyScaleParallelSample());
	Sampler.Samples.Add(new GreyScaleDoubleParallelSample());
	Sampler.Samples.Add(new MatricesMultiplicationSample());
	Sampler.Samples.Add(new MatricesMultiplicationParallelSample());
	ResetSampler();
}

SamplerViewModelFactory.maps dictionary:

private static Dictionary<Type, Func<Sample, SampleViewModel>> maps = new Dictionary<Type, Func<Sample, SampleViewModel>>()
{
	{ typeof(LineSample), (q) => new SampleViewModel((Sample)q)},
	{ typeof(LineParallelSample), (q) => new SampleViewModel((Sample)q)},
	{ typeof(GreyScaleSample), (q) => new SampleViewModel((Sample)q)},
	{ typeof(GreyScaleParallelSample), (q) => new SampleViewModel((Sample)q)},
	{ typeof(GreyScaleDoubleParallelSample), (q) => new SampleViewModel((Sample)q)},
	{ typeof(MatricesMultiplicationSample), (q) => new SampleViewModel((Sample)q)},
	{ typeof(MatricesMultiplicationParallelSample), (q) => new SampleViewModel((Sample)q)}
};

In the first code sample I add an instance of each of the two new models to the constructor of our ViewModel. In the second code sample I add a mapping of the sample type to the view model.

And that is it. Because we’re using the same ViewModel and View for these two models, adding them is easy; binding takes care of wiring everything up.

Starting Matrices Multiplication
Completed Matrices Multiplication
Matrices Multiplication ran in 00:00:01.8307101

Starting Matrices Multiplication Parallel
Completed Matrices Multiplication Parallel
Matrices Multiplication Parallel ran in 00:00:00.3797019

As with our other samples, when we give the parallel loop enough work we get significant benefits from using the TPL and Parallel.For: nearly 5 times faster in my case, though as always you may see better or worse times depending on your situation.

In the next post we’ll go over a sample that shows how to handle exceptions in a Parallel.For and Parallel.ForEach using AggregateException.

Thanks,
Brian