The Task Parallel Library Sampler – Part 10: Loop control of the Parallel.For and .ForEach

Previous Post in this series:
Part 9: Basic Exception handling with the AggregateException

In a sequential loop we’re used to having break and continue available. Try either one inside the body of a parallel loop, though, and the compiler rejects it:

No enclosing loop out of which to break or continue

“continue” is the easy case: just return from the delegate. Breaking is a bit more complex. Do you want to stop every iteration as soon as possible? Do you want to run every iteration up to the point where you break? Well, you have a choice. Below and in the included solution are two samples showing how to handle loop control.
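To make the "continue is just return" point concrete, here is a minimal sketch. The class and method names (ContinueSketch, CollectEvens) are made up for illustration and are not part of the included solution; the ConcurrentBag is only there so results can be collected safely from several threads.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class ContinueSketch
{
    // Hypothetical helper: collects the even numbers in [0, count),
    // skipping odd ones the way "continue" would in a sequential loop.
    public static int[] CollectEvens(int count)
    {
        var seen = new ConcurrentBag<int>();

        Parallel.For(0, count, i =>
        {
            if (i % 2 != 0)
                return; // acts like "continue": ends only this iteration

            seen.Add(i);
        });

        // Iterations run in no particular order, so sort before returning.
        return seen.OrderBy(n => n).ToArray();
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", CollectEvens(10))); // 0, 2, 4, 6, 8
    }
}
```

Returning early only ends the current iteration; every other iteration is still scheduled as usual.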

LoopBreakSample:

public class LoopBreakSample : Sample
{
	public override string SampleName
	{
		get { return "Loop Break Sample"; }
	}

	public override bool ImageRequired
	{
		get { return false; }
	}

	protected int MaxValue { get { return 50; } }
	protected int BreakValue { get { return 20; } }

	public override void Run(System.Drawing.Bitmap bmp = null, Action<string> UpdateLog = null)
	{
		Stopwatch s = new Stopwatch();
		s.Start();

		UpdateLog("Running to " + MaxValue);
		var loopResult = Parallel.For(0, MaxValue, (int i, ParallelLoopState loop) =>
		{
			UpdateLog("Starting " + i);
			if (i == BreakValue)
			{
				UpdateLog("Breaking " + i);
				// Ask the loop not to start iterations above i; lower ones still run.
				loop.Break();
				return;
			}

			Thread.Sleep(100);
		});
		UpdateLog("IsCompleted == " + loopResult.IsCompleted);
		if (!loopResult.LowestBreakIteration.HasValue)
			UpdateLog("LowestBreakIteration has no value");
		else
			UpdateLog("LowestBreakIteration.Value == " + loopResult.LowestBreakIteration.Value);

		s.Stop();
		RunTime = s.Elapsed;
	}
}

There are a few things going on here besides the normal delegate for the loop. First, the lambda that defines the delegate takes a second parameter, a ParallelLoopState. This loop state is the object we call .Break() on.

Second, we use the ParallelLoopResult returned by Parallel.For to see whether the loop ran to completion and, if it didn't, the lowest iteration from which Break() was called.

It is critical that you understand how break works. Per the documentation:

Break may be used to communicate to the loop that no other iterations after the current iteration need be run. For example, if Break() is called from the 100th iteration of a for loop iterating in parallel from 0 to 1000, all iterations less than 100 should still be run, but the iterations from 101 through to 1000 are not necessary.

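This is very important. After Break() is called, the loop keeps starting iterations below the breaking index until everything a standard loop would have run before the break has run. It only stops starting iterations above that index, and iterations that are already running are not interrupted. LowestBreakIteration is set so the caller knows the lowest iteration from which Break() was called.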
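The "lowest" part of LowestBreakIteration is easier to see when more than one iteration can call Break(). The following is a rough sketch, not code from the included solution: LowestBreakSketch and Run are hypothetical names, and every iteration at or above breakFrom that happens to run calls Break(). Because all iterations below any breaking index must still run, the lowest breaker should always end up being breakFrom itself.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class LowestBreakSketch
{
    // Hypothetical helper: runs [0, max) and calls Break() from every
    // iteration >= breakFrom that gets executed.
    public static (long? LowestBreak, bool AllLowerRan, bool IsCompleted) Run(int max, int breakFrom)
    {
        var ran = new ConcurrentDictionary<int, bool>();

        ParallelLoopResult result = Parallel.For(0, max, (i, loop) =>
        {
            ran[i] = true;
            if (i >= breakFrom)
                loop.Break(); // several iterations may call this; the lowest index wins
        });

        // Everything below the break point should have been executed.
        bool allLowerRan = Enumerable.Range(0, breakFrom).All(ran.ContainsKey);
        return (result.LowestBreakIteration, allLowerRan, result.IsCompleted);
    }

    public static void Main()
    {
        var outcome = Run(max: 50, breakFrom: 20);
        Console.WriteLine("LowestBreakIteration == " + outcome.LowestBreak);
        Console.WriteLine("All iterations below 20 ran: " + outcome.AllLowerRan);
        Console.WriteLine("IsCompleted == " + outcome.IsCompleted);
    }
}
```

Which iterations above 20 get to run before the break is noticed varies from run to run, so the sketch deliberately makes no claim about them.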

The result of running this will look similar to:

Starting Loop Break Sample
Running to 50
Starting 12
Starting 0
Starting 6
Starting 18
Starting 24
Starting 30
Starting 36
Starting 42
Starting 13
Starting 1
Starting 7
Starting 25
Starting 31
Starting 43
Starting 37
Starting 2
Starting 14
Starting 8
Starting 20
Breaking 20
Starting 3
Starting 15
Starting 9
Starting 16
Starting 10
Starting 17
Starting 5
Starting 11
IsCompleted == False
LowestBreakIteration.Value == 20
Completed Loop Break Sample

As you can see, Break() is called at 20 like it should have been. Iterations below 20 that had not started yet (3, 15, 9 and so on) were still started afterwards, while nothing above 20 was started once the break was called.

UPDATE: As you read the above list of started iterations, a couple of them (4 and 19) seem to be missing if we’re really running everything below 20. I limited the loop to 2 concurrent workers (with ParallelOptions) and fiddled with some other code to get it to break early. It worked as the documentation states, so I’m not sure why some numbers are missing from the above results.
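One way to check this kind of thing without relying on the log is to record every executed index in a thread-safe collection and look for gaps afterwards. The sketch below assumes the limit is applied through ParallelOptions.MaxDegreeOfParallelism; BreakGapCheck and MissingBelowBreak are hypothetical names, and this is not the code used for the experiment described in the update.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class BreakGapCheck
{
    // Hypothetical helper: returns the indices below breakAt that never ran.
    public static int[] MissingBelowBreak(int max, int breakAt, int maxWorkers)
    {
        // Assumption: "maximum number of threads" maps to MaxDegreeOfParallelism.
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxWorkers };
        var ran = new ConcurrentDictionary<int, bool>();

        Parallel.For(0, max, options, (i, loop) =>
        {
            ran[i] = true; // record the index in a thread-safe way
            if (i == breakAt)
                loop.Break();
        });

        return Enumerable.Range(0, breakAt)
                         .Where(i => !ran.ContainsKey(i))
                         .ToArray();
    }

    public static void Main()
    {
        int[] missing = MissingBelowBreak(max: 50, breakAt: 20, maxWorkers: 2);
        Console.WriteLine("Iterations below 20 that never ran: " + missing.Length);
    }
}
```

If the documented behavior holds, the returned array should be empty regardless of how many workers are allowed.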

So what if you just want to stop starting new iterations and not continue on? That is where Stop() is used.

LoopStopSample:

public class LoopStopSample : LoopBreakSample
{
	public override string SampleName
	{
		get { return "Loop Stop Sample"; }
	}

	public override void Run(System.Drawing.Bitmap bmp = null, Action<string> UpdateLog = null)
	{
		Stopwatch s = new Stopwatch();
		s.Start();

		UpdateLog("Running to " + MaxValue);
		var loopResult = Parallel.For(0, MaxValue, (int i, ParallelLoopState loop) =>
		{
			if (i > BreakValue)
			{
				UpdateLog("Stopping at " + i);
				loop.Stop();
				return;
			}
			UpdateLog("Starting at " + i);
			// Simulate work that exits cooperatively once any iteration calls Stop().
			while(!loop.IsStopped)
			{
				Thread.Sleep(10);
			}
		});
		UpdateLog("IsCompleted == " + loopResult.IsCompleted);
		if (!loopResult.LowestBreakIteration.HasValue)
			UpdateLog("LowestBreakIteration has no value");
		else
			UpdateLog("LowestBreakIteration.Value == " + loopResult.LowestBreakIteration.Value);

		s.Stop();
		RunTime = s.Elapsed;
	}
}

Here Stop() is called when we reach a value greater than BreakValue. Stop() differs from Break() in that no new iterations are started at all, not even ones with a lower index. Iterations that are already running continue to run. When Stop() is called, ParallelLoopState.IsStopped is also set to true so running iterations know that they should finish early. LowestBreakIteration will have no value, though. It is only set when Break() is called.

The result of running this will look similar to:
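Here is a smaller, self-contained sketch of the same idea, with hypothetical names (StopSketch, RunWithStop) that are not in the included solution. Unlike the sample, the simulated work polls IsStopped only a bounded number of times, which is an assumption made here so the sketch cannot wait forever on a machine with very few worker threads.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class StopSketch
{
    // Hypothetical helper: calls Stop() from iteration stopAt and reports
    // how many iterations were started in total.
    public static ParallelLoopResult RunWithStop(int max, int stopAt, out int started)
    {
        int count = 0;

        ParallelLoopResult result = Parallel.For(0, max, (i, loop) =>
        {
            Interlocked.Increment(ref count);

            if (i == stopAt)
            {
                loop.Stop(); // no new iterations are started after this
                return;
            }

            // Simulated work: check IsStopped between small steps and
            // finish early once another iteration has called Stop().
            for (int step = 0; step < 5 && !loop.IsStopped; step++)
                Thread.Sleep(1);
        });

        started = count;
        return result;
    }

    public static void Main()
    {
        ParallelLoopResult result = RunWithStop(max: 50, stopAt: 25, out int started);
        Console.WriteLine("IsCompleted == " + result.IsCompleted);
        Console.WriteLine("LowestBreakIteration has value: " + result.LowestBreakIteration.HasValue);
        Console.WriteLine("Iterations started: " + started);
    }
}
```

The number of iterations started will differ between runs; only IsCompleted being false and LowestBreakIteration having no value are stable.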

Starting Loop Stop Sample
Running to 50
Starting at 0
Starting at 6
Starting at 12
Starting at 18
Stopping at 24
Stopping at 30
IsCompleted == False
LowestBreakIteration has no value
Completed Loop Stop Sample

You can see that as soon as Stop() is called no more iterations are started, even though many iterations below the stopping point (1 through 5, for example) never ran.

So to sum up:

| Behavior | Break() | Stop() |
|---|---|---|
| Starting iterations | Keeps starting iterations below the breaking index, as a standard loop would have run them before the break. Iterations already running are allowed to finish. | No more iterations are started. Iterations already running are allowed to finish. |
| LowestBreakIteration | Set to the lowest iteration from which Break() was called. | Not set |
| IsStopped | Not set | Set to true when called |
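The same loop state is available in Parallel.ForEach. As a rough illustration (ForEachBreakSketch and IndexOfFirst are hypothetical names, not from the included solution), Break() can be used to find the position of the first matching item in an array, since LowestBreakIteration ends up as the lowest index that called Break().

```csharp
using System;
using System.Threading.Tasks;

public static class ForEachBreakSketch
{
    // Hypothetical helper: returns the index of the first item equal to
    // target, or null when nothing matched.
    public static long? IndexOfFirst(string[] items, string target)
    {
        ParallelLoopResult result = Parallel.ForEach(items, (item, loop) =>
        {
            if (item == target)
                loop.Break(); // items before this one are still examined
        });

        // No value means Break() was never called, i.e. no match.
        return result.LowestBreakIteration;
    }

    public static void Main()
    {
        string[] words = { "alpha", "beta", "gamma", "beta", "delta" };
        Console.WriteLine(IndexOfFirst(words, "beta"));          // 1
        Console.WriteLine(IndexOfFirst(words, "omega") == null); // True
    }
}
```

Stop() would not work for this search, because it makes no promise that items before the match have been looked at.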

Up next is a sample using CancellationTokenSource to cancel the loop from outside of it.
Thanks,
Brian
