开发者

How to implement Barrier class from .NET 4 functionality in .NET 3.5

开发者 https://www.devze.com 2023-03-25 09:42 出处:网络
For some reasons I have to stick to .NET 3.5 and I need a functionality of Barrier class from .NET 4. I have a bunch of threads that do some work and I want them to wait for each other until all are d

For some reasons I have to stick to .NET 3.5 and I need a functionality of Barrier class from .NET 4. I have a bunch of threads that do some work and I want them to wait for each other until all are done. When all are done I want that they do the job again and again in the similar manner. Encouraged by the thread Difference between Barrier in C# 4.0 and WaitHandle in C# 3.0? I have decided to implement the Barrier functionality with AutoResetEvent and WaitHandle classes. Altough I encounter a problem with my code:

class Program
{
    const int numOfThreads = 3;

    static AutoResetEvent[] barrier = new AutoResetEvent[numOfThreads];
    static Random random = new Random(System.DateTime.Now.Millisecond);

    static void barriers2(object barrierObj)
    {
        AutoResetEvent[] barrierLocal = (AutoResetEvent[])barrierObj;
        string name = Thread.CurrentThread.Name;
        for (int i = 0; i < 10; i++)
        {
            int sleepTime = random.Next(2000, 10000);
            System.Console.Out.WriteLine("Thread {0} at the 'barrier' will sleep for {1}.", name, sleepTime);
            Thread.Sleep(sleepTime);
            System.Console.Out.WriteL开发者_Python百科ine("Thread {0} at the 'barrier' with time {1}.", name, sleepTime);
            int currentId = Convert.ToInt32(name);
            //for(int z = 0; z < numOfThreads; z++)
                barrierLocal[currentId].Set();
            WaitHandle.WaitAll(barrier);
            /*
            for (int k = 0; k < numOfThreads; k++)
            {
                if (k == currentId)
                {
                    continue;
                }
                System.Console.Out.WriteLine("Thread {0} is about to wait for the singla from thread: {1}", name, k);
                barrierLocal[k].WaitOne();
                System.Console.Out.WriteLine("Thread {0} is about to wait for the singla from thread: {1}. done", name, k);
            }
            */
        }
    }

    static void Main(string[] args)
    {
        for (int i = 0; i < numOfThreads; i++)
        {
            barrier[i] = new AutoResetEvent(false);
        }
        for (int i = 0; i < numOfThreads; i++)
        {
            Thread t = new Thread(Program.barriers2);
            t.Name = Convert.ToString(i);
            t.Start(barrier);
        }
    }
}

The output I receive is as follows:

Thread 0 at the 'barrier' will sleep for 7564 Thread 1 at the 'barrier' will sleep for 5123 Thread 2 at the 'barrier' will sleep for 4237 Thread 2 at the 'barrier' with time 4237 Thread 1 at the 'barrier' with time 5123 Thread 0 at the 'barrier' with time 7564 Thread 0 at the 'barrier' will sleep for 8641 Thread 0 at the 'barrier' with time 8641

And that's it. After the last line there is no more output and the app does not terminate. It looks like there is some sort of deadlock. However can not find the issue. Any help welcome.

Thanks!


That's because you use AutoResetEvent. One of the thread's WaitAll() call is going to complete first. Which automatically causes Reset() on all the AREs. Which prevents the other threads from ever completing their WaitAll() calls.

A ManualResetEvent is required here.


Download the Reactive Extensions backport for .NET 3.5. You will find the Barrier class along with the other useful concurrent data structures and synchronization mechanisms that were released in .NET 4.0.


Here is my implementation I use for my XNA game. Barrier was not available when I wrote this, and I am still stuck with .Net 3.5. It requires three sets of ManualResetEvents, and a counter array to keep phase.

using System;
using System.Threading;

namespace Colin.Threading
{
    /// <summary>
    /// Threading primitive for "barrier" sync, where N threads must stop at certain points 
    /// and wait for all their bretheren before continuing.
    /// </summary>
    public sealed class NThreadGate
    {
        public int mNumThreads;
        private ManualResetEvent[] mEventsA;
        private ManualResetEvent[] mEventsB;
        private ManualResetEvent[] mEventsC;
        private ManualResetEvent[] mEventsBootStrap;
        private Object mLockObject;
        private int[] mCounter;
        private int mCurrentThreadIndex = 0;

        public NThreadGate(int numThreads)
        {
            this.mNumThreads = numThreads;

            this.mEventsA = new ManualResetEvent[this.mNumThreads];
            this.mEventsB = new ManualResetEvent[this.mNumThreads];
            this.mEventsC = new ManualResetEvent[this.mNumThreads];
            this.mEventsBootStrap = new ManualResetEvent[this.mNumThreads];
            this.mCounter = new int[this.mNumThreads];
            this.mLockObject = new Object();

            for (int i = 0; i < this.mNumThreads; i++)
            {
                this.mEventsA[i] = new ManualResetEvent(false);
                this.mEventsB[i] = new ManualResetEvent(false);
                this.mEventsC[i] = new ManualResetEvent(false);
                this.mEventsBootStrap[i] = new ManualResetEvent(false);
                this.mCounter[i] = 0;
            }
        }

        /// <summary>
        /// Adds a new thread to the gate system.
        /// </summary>
        /// <returns>Returns a thread ID for this thread, to be used later when waiting.</returns>
        public int AddThread()
        {
            lock (this.mLockObject)
            {
                this.mEventsBootStrap[this.mCurrentThreadIndex].Set();
                this.mCurrentThreadIndex++;
                return this.mCurrentThreadIndex - 1;
            }
        }

        /// <summary>
        /// Stop here and wait for all the other threads in the NThreadGate. When all the threads have arrived at this call, they
        /// will unblock and continue.
        /// </summary>
        /// <param name="myThreadID">The thread ID of the caller</param>
        public void WaitForOtherThreads(int myThreadID)
        {
            // Make sure all the threads are ready.
            WaitHandle.WaitAll(this.mEventsBootStrap);

            // Rotate between three phases.
            int phase = this.mCounter[myThreadID];
            if (phase == 0)        // Flip
            {
                this.mEventsA[myThreadID].Set();
                WaitHandle.WaitAll(this.mEventsA);
                this.mEventsC[myThreadID].Reset();
            }
            else if (phase == 1)    // Flop
            {
                this.mEventsB[myThreadID].Set();
                WaitHandle.WaitAll(this.mEventsB);
                this.mEventsA[myThreadID].Reset();
            }
            else    // Floop
            {
                this.mEventsC[myThreadID].Set();
                WaitHandle.WaitAll(this.mEventsC);
                this.mEventsB[myThreadID].Reset();
                this.mCounter[myThreadID] = 0;
                return;
            }

            this.mCounter[myThreadID]++;
        }
    }
}

Setting up the thread gate:

private void SetupThreads()
{
    // Make an NThreadGate for N threads.
    this.mMyThreadGate = new NThreadGate(Environment.ProcessorCount);

    // Make some threads...
    // e.g. new Thread(new ThreadStart(this.DoWork);
}

Thread worker method:

private void DoWork()
{
    int localThreadID = this.mMyThreadGate.AddThread();

    while (this.WeAreStillRunning)
    {
        // Signal this thread as waiting at the barrier
        this.mMyThreadGate.WaitForOtherThreads(localThreadID);

        // Synchronized work here...

        // Signal this thread as waiting at the barrier
        this.mMyThreadGate.WaitForOtherThreads(localThreadID);

        // Synchronized work here...

        // Signal this thread as waiting at the barrier
        this.mMyThreadGate.WaitForOtherThreads(localThreadID);
    }
}
0

精彩评论

暂无评论...
验证码 换一张
取 消