Is it possible to change parallelOptions.MaxDegreeOfParallelism during execution of a Parallel.ForEach?

Developer https://www.devze.com 2023-01-15 20:21 Source: Internet

I am running a multi-threaded loop:

protected ParallelOptions parallelOptions = new ParallelOptions();

parallelOptions.MaxDegreeOfParallelism = 2;
Parallel.ForEach(items, parallelOptions, item =>
{
    // Loop code here
});

I want to change parallelOptions.MaxDegreeOfParallelism during the execution of the parallel loop, to reduce or increase the number of threads.

parallelOptions.MaxDegreeOfParallelism = 5;

It doesn't seem to increase the number of threads. Does anyone have any ideas?


The issue with even trying to do this is that it's a hard problem. For starters, how do you even observe CPU and disk utilization reliably? Sampling CPU infrequently will give a poor picture of what's actually going on, and sampling disk utilization is even harder. Secondly, what is the granularity of your tasks, and how quickly can you actually change the number that are running? Thirdly, things change rapidly over time, so you need to apply some kind of filtering to your observations. Fourthly, the ideal number of threads will depend on the CPU that the code is actually running on. Fifthly, if you allocate too many threads, you'll be thrashing between them instead of doing useful work.
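To make the first and third points concrete, here is a minimal sketch (not from the answer) that samples this process's CPU usage from Process.TotalProcessorTime and smooths the samples with an exponential moving average. The 200 ms sampling interval, the smoothing factor alpha = 0.5 and the sample series are arbitrary assumptions, and disk utilization is not covered at all.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Share of the whole machine's CPU used by this process during one interval:
// CPU time consumed / (elapsed wall time x number of logical processors).
double SampleProcessCpu(TimeSpan interval)
{
    using Process self = Process.GetCurrentProcess();
    TimeSpan cpuBefore = self.TotalProcessorTime;
    Stopwatch wall = Stopwatch.StartNew();
    Thread.Sleep(interval);
    self.Refresh(); // drop any cached process information before re-reading
    TimeSpan cpuAfter = self.TotalProcessorTime;
    double used = (cpuAfter - cpuBefore).TotalMilliseconds;
    double available = wall.Elapsed.TotalMilliseconds * Environment.ProcessorCount;
    return Math.Clamp(used / available, 0.0, 1.0);
}

// Exponential moving average: a single spike only moves the estimate part of
// the way, so a controller reading it would not react to every fluctuation.
double Smooth(double previous, double sample, double alpha) =>
    alpha * sample + (1 - alpha) * previous;

// Filtering a made-up series that contains one spike (0.9).
double[] samples = { 0.2, 0.9, 0.2, 0.2 };
double smoothed = samples[0];
for (int i = 1; i < samples.Length; i++)
    smoothed = Smooth(smoothed, samples[i], alpha: 0.5); // 0.55, then 0.375, then 0.2875
Console.WriteLine($"Smoothed estimate: {smoothed}");

// One real measurement of the current process.
double live = SampleProcessCpu(TimeSpan.FromMilliseconds(200));
Console.WriteLine($"Live CPU share: {live:P0}");
```

A smaller alpha filters more aggressively but reacts more slowly to genuine changes in load, which is exactly the trade-off the answer is warning about.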

See http://msdn.microsoft.com/en-us/magazine/ff960958.aspx for a discussion on how the Thread Pool in .NET handles the complex task of deciding how many threads to use.

You could also use Reflector to look at the code that the TPL uses to allocate threads and to avoid unnecessary context switching. It's complex, and that's not even taking disk access into account!

You could instead try executing the tasks on a lower-priority thread (creating your own TaskScheduler that runs threads with below-normal priority is actually quite easy). That will at least ensure that you can run up to 100% CPU without impacting the rest of the system. Messing with thread priorities is itself fraught with problems, but if this is a purely background task it can be straightforward and might help.
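The answer recommends wrapping this in a custom TaskScheduler (which could then be supplied through ParallelOptions.TaskScheduler). As a simpler sketch of just the core idea, the code below skips the scheduler and runs the work on dedicated threads whose Priority is ThreadPriority.BelowNormal, fed from a BlockingCollection<T>. The 100 items and the trivial work body are placeholders, and the sketch assumes an OS that honors thread priorities (the answer is written with Windows in mind).

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Placeholder work: 100 integers standing in for the loop's source items.
var queue = new BlockingCollection<int>();
int processed = 0;
int workerCount = Environment.ProcessorCount;

var workers = new Thread[workerCount];
for (int w = 0; w < workerCount; w++)
{
    workers[w] = new Thread(() =>
    {
        // Blocks until an item is available; the loop ends once
        // CompleteAdding() has been called and the queue is empty.
        foreach (int item in queue.GetConsumingEnumerable())
        {
            // Real CPU-bound work on `item` would go here.
            Interlocked.Increment(ref processed);
        }
    })
    {
        IsBackground = true,
        // The OS scheduler favours normal-priority threads over these.
        Priority = ThreadPriority.BelowNormal
    };
    workers[w].Start();
}

for (int i = 0; i < 100; i++) queue.Add(i);
queue.CompleteAdding();
foreach (Thread t in workers) t.Join();

Console.WriteLine($"Processed {processed} items on {workerCount} below-normal-priority threads");
```

A TaskScheduler built on the same threads would let Parallel.ForEach keep its partitioning logic while still running at the lower priority; the dedicated-thread version is shown only because it fits in a few lines.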

Often, though, disk utilization is the real culprit when other applications suffer at the hands of one greedy application. Windows can easily allocate CPU fairly between applications, but when relatively slow disk access is involved it's quite another matter. Instead of trying to dynamically adjust how many threads you have running, you may simply need to throttle your application so that it doesn't access the disk too often. That's something you can do without changing how many threads are active.
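A minimal sketch of that kind of throttling, assuming a simple rule of "at most one disk write every 20 ms for the whole process" (the interval, the temp-directory output and the file contents are all hypothetical). Every loop thread reserves a time slot before touching the disk, so the loop can keep as many threads as it likes for the CPU-side work while the disk only sees a bounded rate of writes.

```csharp
using System;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

TimeSpan minInterval = TimeSpan.FromMilliseconds(20); // hypothetical disk budget
object gate = new object();
Stopwatch clock = Stopwatch.StartNew();
TimeSpan nextSlot = TimeSpan.Zero;

// Reserves the next free time slot for a disk access and sleeps until it arrives.
void WaitForDiskSlot()
{
    TimeSpan wait;
    lock (gate)
    {
        TimeSpan now = clock.Elapsed;
        if (nextSlot < now) nextSlot = now; // disk has been idle: the slot is free now
        wait = nextSlot - now;
        nextSlot += minInterval;            // the following caller waits one interval more
    }
    if (wait > TimeSpan.Zero) Thread.Sleep(wait);
}

string dir = Path.Combine(Path.GetTempPath(), "disk-throttle-" + Guid.NewGuid().ToString("N"));
Directory.CreateDirectory(dir);

var options = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount };
Parallel.ForEach(Enumerable.Range(0, 10), options, i =>
{
    string data = new string('x', 1000); // CPU-side work is not throttled
    WaitForDiskSlot();                   // only the disk access is
    File.WriteAllText(Path.Combine(dir, $"item{i}.txt"), data);
});

TimeSpan elapsed = clock.Elapsed;
int filesWritten = Directory.GetFiles(dir).Length;
Directory.Delete(dir, recursive: true);
Console.WriteLine($"Wrote {filesWritten} files in {elapsed.TotalMilliseconds:F0} ms");
```

The number of active threads never changes here; only the rate at which they are allowed to hit the disk does.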

You could also look at SetPriorityClass as a way to inform the OS that your process is less important than other applications running on the system; see "How can I/O priority of a process be increased?" for more information. But that assumes your whole process is less important, not just this part of it.
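From .NET code the same effect is available through Process.PriorityClass, whose setter calls SetPriorityClass on Windows (on Unix-like systems .NET maps it to a nice value). A minimal sketch, assuming it's acceptable to lower the priority of the entire process; it deliberately does not raise the priority again, since raising it can require elevated rights on some systems.

```csharp
using System;
using System.Diagnostics;

// Mark the whole process as less important than other applications.
using (Process self = Process.GetCurrentProcess())
{
    self.PriorityClass = ProcessPriorityClass.BelowNormal;
}

// Read the value back through a fresh Process object, because a Process
// instance caches PriorityClass once it has been set or read.
ProcessPriorityClass effective;
using (Process check = Process.GetCurrentProcess())
{
    effective = check.PriorityClass;
}
Console.WriteLine($"Priority class is now {effective}");
```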


I wouldn't expect it to be possible to change the degree of parallelism after you've called ForEach. As I understand it, ForEach is going to determine how many threads it can create, create that many partitions, and create the threads to operate on those partitions. There's no point at which it can say, "Oh, wait, he changed our resource allocation, let me re-partition the array and re-allocate threads."
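This is easy to check. The sketch below (not from the answer) starts Parallel.ForEach with MaxDegreeOfParallelism = 2, raises the value to 5 from inside the body as the question does, and records the highest number of bodies that were ever running at the same time. The 40 items and the 20 ms sleep are arbitrary; they only make the bodies overlap.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };
int running = 0, peak = 0;

Parallel.ForEach(Enumerable.Range(0, 40), options, item =>
{
    // Attempt to raise the limit while the loop is running, as in the question.
    if (item == 0) options.MaxDegreeOfParallelism = 5;

    int now = Interlocked.Increment(ref running);

    // Lock-free "peak = max(peak, now)".
    int seen;
    while (now > (seen = Volatile.Read(ref peak)) &&
           Interlocked.CompareExchange(ref peak, now, seen) != seen) { }

    Thread.Sleep(20); // keep the bodies overlapping long enough to observe them
    Interlocked.Decrement(ref running);
});

Console.WriteLine($"Peak concurrency: {peak}");
```

In this sketch the peak never exceeds 2, which is consistent with the limit being read once when the loop starts.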


Below is a variant of the Parallel.ForEachAsync API (introduced in .NET 6) that allows the maximum degree of parallelism to be configured dynamically. It shares the same parameters and behavior as the native API, except that it takes a derived version of ParallelOptions (DynamicParallelOptions) as an argument. Changing MaxDegreeOfParallelism results in a rapid adaptation of the currently active degree of parallelism.

This implementation is based on the idea of throttling the source of the Parallel.ForEachAsync API. The API itself is configured with maximum parallelism (Int32.MaxValue), but the actual parallelism is effectively limited by denying the loop free access to the source elements: once the initial permits are used up, a new element is propagated forward only when another element has finished processing. The throttling itself is performed with an unbounded SemaphoreSlim, and the maximum degree of parallelism is changed by calling the Release/WaitAsync methods of the semaphore.
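The Release/WaitAsync trick can be seen in isolation with a bare SemaphoreSlim. In this sketch (not part of the answer's code) two simulated workers hold both permits, the limit is lowered from 2 to 1 by an un-awaited WaitAsync call that sits in the queue like a debt, and it is then raised from 1 to 5 with Release(4).

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// No maxCount argument, so Release can raise the count without an upper limit.
var throttler = new SemaphoreSlim(2);

// Two simulated workers take both permits.
await throttler.WaitAsync();
await throttler.WaitAsync();
int afterStart = throttler.CurrentCount;          // 0

// Lower the limit from 2 to 1 while both workers are busy. The un-awaited
// WaitAsync cannot complete yet, so it waits in the queue as a "debt".
Task debt = throttler.WaitAsync();
bool debtPaidEarly = debt.IsCompleted;            // false

// One worker finishes. Its permit pays the debt instead of becoming free.
throttler.Release();
bool debtPaid = debt.IsCompleted;                 // true
int afterFirstRelease = throttler.CurrentCount;   // 0

// Raise the limit from 1 to 5: four extra permits appear at once.
throttler.Release(4);
int afterGrow = throttler.CurrentCount;           // 4 (the other worker is still busy)

Console.WriteLine($"{afterStart} {debtPaidEarly} {debtPaid} {afterFirstRelease} {afterGrow}");
```

Because the pending WaitAsync absorbs a released permit, lowering the limit never interrupts work that is already running; it simply delays the next element.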

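using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// The static methods below must be placed inside a class.
/// <summary>
/// Executes a parallel for-each operation on an async-enumerable sequence,
/// enforcing a dynamic maximum degree of parallelism.
/// </summary>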
public static Task DynamicParallelForEachAsync<TSource>(
    IAsyncEnumerable<TSource> source,
    DynamicParallelOptions options,
    Func<TSource, CancellationToken, ValueTask> body)
{
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (options == null) throw new ArgumentNullException(nameof(options));
    if (body == null) throw new ArgumentNullException(nameof(body));

    var throttler = new SemaphoreSlim(options.MaxDegreeOfParallelism);
    options.DegreeOfParallelismChangedDelta += Options_ChangedDelta;
    void Options_ChangedDelta(object sender, int delta)
    {
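        if (delta > 0)
            throttler.Release(delta);
        else
            // Shrink by queuing one un-awaited WaitAsync per removed slot:
            // each pending wait absorbs the next permit that gets released.
            for (int i = delta; i < 0; i++) _ = throttler.WaitAsync();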
    }

    async IAsyncEnumerable<TSource> GetThrottledSource()
    {
        await foreach (var item in source.ConfigureAwait(false))
        {
            await throttler.WaitAsync().ConfigureAwait(false);
            yield return item;
        }
    }

    return Parallel.ForEachAsync(GetThrottledSource(), options, async (item, ct) =>
    {
        try { await body(item, ct).ConfigureAwait(false); }
        finally { throttler.Release(); }
    }).ContinueWith(t =>
    {
        options.DegreeOfParallelismChangedDelta -= Options_ChangedDelta;
        return t;
    }, default, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default)
        .Unwrap();
}

/// <summary>
/// Stores options that configure the DynamicParallelForEachAsync method.
/// </summary>
public class DynamicParallelOptions : ParallelOptions
{
    private int _maxDegreeOfParallelism;

    public event EventHandler<int> DegreeOfParallelismChangedDelta;

    public DynamicParallelOptions()
    {
        // Set the base DOP to the maximum.
        // That's what the native Parallel.ForEachAsync will see.
        base.MaxDegreeOfParallelism = Int32.MaxValue;
        _maxDegreeOfParallelism = Environment.ProcessorCount;
    }

    public new int MaxDegreeOfParallelism
    {
        get { return _maxDegreeOfParallelism; }
        set
        {
            if (value < 1) throw new ArgumentOutOfRangeException(nameof(value));
            if (value == _maxDegreeOfParallelism) return;
            int delta = value - _maxDegreeOfParallelism;
            DegreeOfParallelismChangedDelta?.Invoke(this, delta);
            _maxDegreeOfParallelism = value;
        }
    }
}

The DynamicParallelOptions.MaxDegreeOfParallelism property is not thread-safe. It is assumed that the maximum degree of parallelism will be controlled by a single thread, or at least that the calls to its setter will be synchronized.

Usage example, featuring a Channel<T> as the source of the parallel loop:

using System.Threading.Channels;

var channel = Channel.CreateUnbounded<int>();
var options = new DynamicParallelOptions() { MaxDegreeOfParallelism = 2 };

// Start the loop without awaiting it yet; awaiting here would block forever,
// because the channel would never be written to or completed.
Task loop = DynamicParallelForEachAsync(
    channel.Reader.ReadAllAsync(), options, async (item, ct) =>
    {
        Console.WriteLine($"Processing #{item}");
        await Task.Delay(1000, ct); // Simulate an I/O-bound operation
    });

// Push values to the channel from any thread
channel.Writer.TryWrite(1);
channel.Writer.TryWrite(2);
channel.Writer.TryWrite(3);
channel.Writer.Complete();

// Set the MaxDegreeOfParallelism to a positive value from a single thread
options.MaxDegreeOfParallelism = 5;

// Wait until all the values have been processed
await loop;

Some overloads with a synchronous source or a synchronous body:

public static Task DynamicParallelForEachAsync<TSource>(
    IEnumerable<TSource> source,
    DynamicParallelOptions options,
    Func<TSource, CancellationToken, ValueTask> body)
{
    if (source == null) throw new ArgumentNullException(nameof(source));

    #pragma warning disable CS1998
    async IAsyncEnumerable<TSource> GetSource()
    { foreach (var item in source) yield return item; }
    #pragma warning restore CS1998

    return DynamicParallelForEachAsync(GetSource(), options, body);
}

public static void DynamicParallelForEach<TSource>(
    IEnumerable<TSource> source,
    DynamicParallelOptions options,
    Action<TSource, CancellationToken> body)
{
    if (body == null) throw new ArgumentNullException(nameof(body));
    DynamicParallelForEachAsync(source, options, (item, ct) =>
    {
        body(item, ct); return ValueTask.CompletedTask;
    }).Wait();
}