.NET Reverse Semaphore?

Developer https://www.devze.com 2022-12-14 17:55 Source: Internet
Perhaps it's too late at night, but I can't think of a nice way to do this.

I've started a bunch of asynchronous downloads, and I want to wait until they all complete before the program terminates. This leads me to believe I should increment something when a download starts, and decrement it when it finishes. But then how do I wait until the count is 0 again?

Semaphores sort of work in the opposite way in that you block when there are no resources available, not when they're all available (blocks when count is 0, rather than non-zero).


Check out the CountdownLatch class in this magazine article.

Update: since version 4.0 the framework covers this itself with the CountdownEvent class.


In .NET 4 there is a special type for that purpose: CountdownEvent.
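
As a rough illustration of how CountdownEvent might be used for this, here is a minimal sketch. The class name CountdownEventSketch and the helper RunAndWait are hypothetical, and the work item body is only a placeholder for a real download.

```csharp
using System;
using System.Threading;

static class CountdownEventSketch
{
    // Hypothetical helper: queues workItemCount thread-pool work items,
    // blocks until all of them have signaled, and returns how many ran.
    public static int RunAndWait(int workItemCount)
    {
        int completed = 0;
        using (var countdown = new CountdownEvent(workItemCount))
        {
            for (int i = 0; i < workItemCount; i++)
            {
                ThreadPool.QueueUserWorkItem(_ =>
                {
                    try
                    {
                        // Placeholder for the real work, e.g. one download.
                        Interlocked.Increment(ref completed);
                    }
                    finally
                    {
                        // Decrements the count; the event is set when it reaches zero.
                        countdown.Signal();
                    }
                });
            }
            // Blocks until Signal() has been called workItemCount times.
            countdown.Wait();
        }
        return completed;
    }
}
```

Calling CountdownEventSketch.RunAndWait(10) returns only after all ten work items have called Signal(). Signaling in a finally block keeps a failing work item from leaving the waiter blocked forever.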

Or you can build a similar thing yourself like this:

const int workItemsCount = 10;
// Set remaining work items count to initial work items count
int remainingWorkItems = workItemsCount;

using (var countDownEvent = new ManualResetEvent(false))
{
    for (int i = 0; i < workItemsCount; i++)
    {
        ThreadPool.QueueUserWorkItem(delegate
        {
            // Work item body
            // At the end signal event
            if (Interlocked.Decrement(ref remainingWorkItems) == 0)
                countDownEvent.Set();
        });
    }
    // Wait for all work items to complete
    countDownEvent.WaitOne();
}


Looks like System.Threading.WaitHandle.WaitAll might be a pretty good fit:

Waits for all the elements in the specified array to receive a signal.
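
A minimal sketch of what that could look like, with one ManualResetEvent per work item. WaitAllSketch and RunAndWait are hypothetical names, and the sketch assumes between 1 and 64 work items, since WaitAll rejects empty arrays and arrays of more than 64 handles.

```csharp
using System;
using System.Threading;

static class WaitAllSketch
{
    // Hypothetical helper: one ManualResetEvent per work item.
    // Assumes 1 <= workItemCount <= 64 (the WaitAll handle limit).
    public static bool RunAndWait(int workItemCount)
    {
        var doneEvents = new ManualResetEvent[workItemCount];
        for (int i = 0; i < workItemCount; i++)
        {
            var done = new ManualResetEvent(false);
            doneEvents[i] = done;
            ThreadPool.QueueUserWorkItem(_ =>
            {
                try
                {
                    // Placeholder for the real work, e.g. one download.
                }
                finally
                {
                    // Mark this work item as finished.
                    done.Set();
                }
            });
        }

        // Blocks until every event in the array has been set.
        bool allSignaled = WaitHandle.WaitAll(doneEvents);

        foreach (var done in doneEvents)
        {
            done.Dispose();
        }
        return allSignaled;
    }
}
```

This needs one handle per download, so it fits best when the number of downloads is small and known up front.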


Well... you can snatch all the semaphore counters back on the main thread, so that it blocks until every counter has been returned (that is, until no downloads are running) rather than when the count is 0.

REVISED: Here I assumed 3 things:

  • While the program is running, a new download job may start at any time.
  • On exiting the program, there will be no more new downloads that need to be taken care of.
  • On exiting the program, you need to wait for all the files to finish downloading.

So here's my solution, revised:

Initialize the Semaphore with a counter large enough that you never hit the maximum (it could be simply 100 or just 10, depending on your situation). All counters must start out available, otherwise the first download would block forever:

var maxDownloads = 1000;
_semaphore = new Semaphore(maxDownloads, maxDownloads);

Then each download begins with WaitOne() before starting, so that once the program is exiting, no new downloads can start.

if (_semaphore.WaitOne())
    /* proceeds with downloads */
else
    /* we're terminating */

Then on download completion, release one counter (if we had acquired one):

finally { _semaphore.Release(1); }

And then on the "Exit" event, consume all the counters on the Semaphore:

for (var i = 0; i < maxDownloads; i++)
    _semaphore.WaitOne();

// all downloads are finished by this point.
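
Put together, the steps above might look roughly like the following sketch. DownloadGate, TryRun and DrainOnExit are hypothetical names. The sketch assumes the semaphore starts with all of its counters available, and it swaps the blocking WaitOne() for WaitOne(0): the parameterless overload only returns once a counter has been acquired, so it would never reach the "terminating" branch.

```csharp
using System;
using System.Threading;

sealed class DownloadGate : IDisposable
{
    private readonly int _maxDownloads;
    private readonly Semaphore _semaphore;

    public DownloadGate(int maxDownloads)
    {
        _maxDownloads = maxDownloads;
        // Assumption: every counter is available at start-up.
        _semaphore = new Semaphore(maxDownloads, maxDownloads);
    }

    // Runs the download if a counter is free; returns false otherwise
    // (for example after DrainOnExit has taken every counter).
    public bool TryRun(Action download)
    {
        // WaitOne(0) does not block: it returns false when no counter is free.
        if (!_semaphore.WaitOne(0))
        {
            return false;
        }
        try
        {
            download();
        }
        finally
        {
            // Hand the counter back once the download is over.
            _semaphore.Release();
        }
        return true;
    }

    // Takes every counter back, which blocks until running downloads release theirs.
    public void DrainOnExit()
    {
        for (int i = 0; i < _maxDownloads; i++)
        {
            _semaphore.WaitOne();
        }
    }

    public void Dispose()
    {
        _semaphore.Dispose();
    }
}
```

As in the description, this relies on no new downloads being started while the exit loop is draining; a download that starts mid-drain can still grab a counter that another download just released.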

...


I had a similar issue where I needed to reset a server upon some event, but had to wait for all the open requests to finish before killing it.

I used the CountdownEvent class, initializing it with 1 on server start, and inside each request I do:

// AddCount stays outside the try block: if it throws, Signal must not run.
counter.AddCount();
try
{
    //do request stuff
}
finally
{
    counter.Signal();
}

And upon receiving the ResetEvent I signal the counter once to eliminate the starting 1, and wait for live requests to signal they are done.

void OnResetEvent()
{
    counter.Signal();
    counter.Wait();
    ResetServer();
    //counter.Reset(); //if you want to reset everything again.
}

Basically, you initialize the CountdownEvent with one so that it starts in a non-signaled state. Each AddCount call increases the counter and each Signal call decreases it, so it never drops below 1 while the server is running. In your wait thread you first signal it once to remove the initial 1. If no requests are running, Wait() returns immediately; if some are still running, the wait thread blocks until they signal. Watch out: once the counter hits 0, all subsequent AddCount calls will throw an exception, so you need to Reset the counter first.
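
One way to sidestep the exception is CountdownEvent.TryAddCount, which returns false instead of throwing once the count has reached zero. The sketch below wraps the pattern in a hypothetical RequestTracker class; the names TryHandle and WaitForShutdown are assumptions for illustration, not part of the answer's code.

```csharp
using System;
using System.Threading;

sealed class RequestTracker : IDisposable
{
    // Starts at 1 so the event stays unsignaled while no requests are running.
    private readonly CountdownEvent _counter = new CountdownEvent(1);

    // Runs the request if shutdown has not completed; returns false otherwise.
    public bool TryHandle(Action request)
    {
        // TryAddCount returns false (rather than throwing) once the count is zero.
        if (!_counter.TryAddCount())
        {
            return false;
        }
        try
        {
            request();
        }
        finally
        {
            // Balance the TryAddCount above.
            _counter.Signal();
        }
        return true;
    }

    // Removes the initial 1, then blocks until every live request has signaled.
    public void WaitForShutdown()
    {
        _counter.Signal();
        _counter.Wait();
    }

    public void Dispose()
    {
        _counter.Dispose();
    }
}
```

Requests that arrive after the count has reached zero get false back from TryHandle and can be rejected or retried after a Reset.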


For each thread you start, Interlocked.Increment a counter. And for each callback on thread finish, Interlocked.Decrement it.

Then do a loop with a Thread.Sleep(10) or something until the count reaches zero.
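
A minimal sketch of that polling approach, using thread-pool work items in place of real downloads. PollingSketch and RunAndPoll are hypothetical names, and Volatile.Read assumes .NET 4.5 or later.

```csharp
using System;
using System.Threading;

static class PollingSketch
{
    // Hypothetical helper: starts workItemCount work items, polls until the
    // pending count is back to zero, and returns how many work items ran.
    public static int RunAndPoll(int workItemCount)
    {
        int pending = 0;
        int completed = 0;

        for (int i = 0; i < workItemCount; i++)
        {
            // Count the work item before it starts, so the poll below
            // cannot observe zero while work is still being queued.
            Interlocked.Increment(ref pending);
            ThreadPool.QueueUserWorkItem(_ =>
            {
                try
                {
                    // Placeholder for the real work, e.g. one download.
                    Interlocked.Increment(ref completed);
                }
                finally
                {
                    Interlocked.Decrement(ref pending);
                }
            });
        }

        // Poll until every work item has decremented the counter.
        while (Volatile.Read(ref pending) > 0)
        {
            Thread.Sleep(10);
        }
        return Volatile.Read(ref completed);
    }
}
```

Polling is simple, but the waiter wakes up every 10 ms and can notice completion up to one sleep interval late, which is why the event-based answers avoid it.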


Here is my C# 2.0 implementation of CountdownLatch:

using System;
using System.Threading;

public class CountdownLatch
{
    private int m_count;
    private EventWaitHandle m_waitHandle = new EventWaitHandle(true, EventResetMode.ManualReset);

    public CountdownLatch()
    {
    }

    public void Increment()
    {
        int count = Interlocked.Increment(ref m_count);
        if (count == 1)
        {
            m_waitHandle.Reset();
        }
    }

    public void Add(int value)
    {
        int count = Interlocked.Add(ref m_count, value);
        if (count == value)
        {
            m_waitHandle.Reset();
        }
    }

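    public void Decrement()
    {
        int count = Interlocked.Decrement(ref m_count);
        // Test the value returned by the atomic decrement, not the shared
        // field, which another thread may already have changed.
        if (count == 0)
        {
            m_waitHandle.Set();
        }
        else if (count < 0)
        {
            throw new InvalidOperationException("Count must be greater than or equal to 0");
        }
    }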

    public void WaitUntilZero()
    {
        m_waitHandle.WaitOne();
    }
}


Based on the suggestions here, this is what I came up with. In addition to waiting until the count is 0, it will sleep if you spawn too many threads (count > max). Warning: This is not fully tested.

public class ThreadCounter
{
    #region Variables
    private int currentCount, maxCount;
    private ManualResetEvent eqZeroEvent;
    private object instanceLock = new object();
    #endregion

    #region Properties
    public int CurrentCount
    {
        get
        {
            return currentCount;
        }
        set
        {
            lock (instanceLock)
            {
                currentCount = value;
                AdjustZeroEvent();
                AdjustMaxEvent();
            }
        }
    }

    public int MaxCount
    {
        get
        {
            return maxCount;
        }
        set
        {
            lock (instanceLock)
            {
                maxCount = value;
                AdjustMaxEvent();
            }
        }
    }
    #endregion

    #region Constructors
    public ThreadCounter() : this(0) { }
    public ThreadCounter(int initialCount) : this(initialCount, int.MaxValue) { }
    public ThreadCounter(int initialCount, int maximumCount)
    {
        currentCount = initialCount;
        maxCount = maximumCount;
        eqZeroEvent = currentCount == 0 ? new ManualResetEvent(true) : new ManualResetEvent(false);
    }
    #endregion

    #region Public Methods
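    public void Increment()
    {
        // Hold the lock across the read and the write so concurrent calls
        // cannot lose an update (the lock is reentrant, so the setter is fine).
        lock (instanceLock) { ++CurrentCount; }
    }

    public void Decrement()
    {
        lock (instanceLock) { --CurrentCount; }
    }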

    public void WaitUntilZero()
    {
        eqZeroEvent.WaitOne();
    }
    #endregion

    #region Private Methods
    private void AdjustZeroEvent()
    {
        if (currentCount == 0) eqZeroEvent.Set();
        else eqZeroEvent.Reset();
    }

    private void AdjustMaxEvent()
    {
        if (currentCount <= maxCount) Monitor.Pulse(instanceLock);
        else do { Monitor.Wait(instanceLock); } while (currentCount > maxCount);
    }
    #endregion
}