
How can the first thread to enter a method signal its exit to other concurrent threads?

Developer https://www.devze.com 2023-02-05 00:13 Source: Internet

I have a method named, say, PollDPRAM(). It must make a trip over the network to some slow hardware and refresh the object's private data. If the same method is called simultaneously by other threads, they must not make the trip. Instead they should wait for the first thread to complete the job and then simply exit, because the data is fresh (being 10-30 ms old makes no difference). It is easy to detect inside the method that the second, third, etc. threads were not the first to enter: I use an Interlocked counter to detect concurrency.

Problem: I made a poor choice in how the later threads detect the exit of the first thread. Each later thread (n > 1) watches the counter with Interlocked.Read and waits for it to drop below the value it saw on entry. This is a bad choice because the first thread can re-enter the method almost immediately after leaving it, so the n > 1 threads may never observe the dip in the counter.

So the question is: how do I correctly detect that the first thread has exited the method, even if that thread can immediately enter it again?

Thank you

P.S. The code in question:

    private void pollMotorsData()
    {
        // Execute single poll with "foreground" handshaking 
        DateTime start = DateTime.Now;
        byte retryCount = 0;
        // Pick old data atomically to detect change
        uint motorsDataTimeStampPrev = this.MotorsDataTimeStamp;
        bool changeDetected = false;
        // The design goal of DPRAM is to ease the bottleneck
        // Here is a sensor if bottleneck is actually that tight
        long parallelThreads = Interlocked.Increment(ref this.motorsPollThreadCount);
        try
        {
            // For first thread entering the counter will be 1
            if (parallelThreads <= 1)
            {
                do
                {
                    // Handshake signal to DPRAM write process on controller side that host PC is reading
                    this.controller.deltaTauTcpClient.Pmac_SetBit(OFFSET_0x006A_BIT15_FOREGROUND_READ, 15, true);
                    try
                    {
                        bool canReadMotors = false;
                        byte[] canReadFrozenDataFlag = new byte[2];
                        do
                        {
                            this.controller.deltaTauTcpClient.Pmac_GetMem(OFFSET_0x006E_BIT15_FOREGROUND_DONE, canReadFrozenDataFlag);
                            canReadMotors = (canReadFrozenDataFlag[1] & 0x80) == 0x80;
                            if (canReadMotors) break;
                            retryCount++;
                            Thread.Sleep(1);
                        } while (retryCount < 10);
                        if (!canReadMotors)
                        {
                            throw new DeltaTauControllerException(this.controller, "Timeout waiting on DPRAM Foreground Handshaking Bit");
                        }
                        // The lock is meaningless in the constructor as it is certainly single threaded
                        // but for practice sake the access to data should always be serialized
                        lock (motorsDataLock)
                        {
                            // Obtain fresh content of DPRAM
                            this.controller.deltaTauTcpClient.Pmac_GetMem(OFFSET_0x006A_394BYTES_8MOTORS_DATA, this.motorsData);
                            this.motorsDataBorn = DateTime.Now;
                        }
                    }
                    finally
                    {
                        // Handshake signal to DPRAM write process on controller side that host PC has finished reading
                        this.controller.deltaTauTcpClient.Pmac_SetBit(OFFSET_0x006A_BIT15_FOREGROUND_READ, 15, false);
                    }
                    // Check live change in a separate atom
                    changeDetected = this.MotorsDataTimeStamp != motorsDataTimeStampPrev;
                } while ((!changeDetected) && ((DateTime.Now - start).TotalMilliseconds < 255));
                // Assert that result is live
                if (!changeDetected)
                {
                    throw new DeltaTauControllerException(this.controller, "DPRAM Background Data timestamp is not updated. DPRAM foreground handshaking failed.");
                }
            }
            else
            {
                // OK. Bottleneck! The concurrent polls have collided.
                // Give the controller a breather by waiting for the other thread to do the job.
                // Avoid aggressive polling of stale data, which cannot be rewritten while locked by the reader.
                // Just wait for the other thread to do the whole polling job and return with no action,
                // because the data is only milliseconds old.
                do
                {
                    // Amount of parallel threads must eventually decrease
                    // But no thread will leave and decrease the counter until job is done
                    if (Interlocked.Read(ref this.motorsPollThreadCount) < parallelThreads)
                    {
                        // Return is possible because decreased value of concurrentThreads means that
                        // this very time other thread has finished the poll 1 millisecond ago at most
                        return;
                    }
                    Thread.Sleep(1);
                    retryCount++;
                } while ((DateTime.Now - start).TotalMilliseconds < 255);
                throw new DeltaTauControllerException(this.controller, "Timeout 255ms waiting on concurrent thread to complete DPRAM polling");
            }
        }
        finally
        {
            // Signal to other threads that work is done
            Interlocked.Decrement(ref this.motorsPollThreadCount);
            // Trace the timing and bottleneck situations
            TimeSpan duration = DateTime.Now - start;
            if (duration.TotalMilliseconds > 50 || parallelThreads > 1 || retryCount > 0)
            {
                Trace.WriteLine(string.Format("Controller {0}, DPRAM poll {1:0} ms, threads {2}, retries {3}",
                    this.controller.number,
                    duration.TotalMilliseconds,
                    parallelThreads,
                    retryCount));
            }
        }
    }


Synchronize the method and inside the method check a record of the time that the network access was last done to determine if it needs to be done again.


You can use the .NET Monitor class, which is what the C# "lock" keyword is built on.

Basically, the call to your method can be wrapped as lock (lockobj) { CallMethod(); }

This gives you protection as long as all the threads are in the same process.

You will need to use a Mutex if you need to lock across processes.

As for your program, I would look at adding a static timestamp and a cached value to your method. When the method is entered, if the timestamp is within the acceptable range, return the cached value; otherwise perform the fetch. Combined with a locking mechanism, this should do what you need.

Of course this assumes that the time to take and block on the C# monitor is not going to affect the performance of your app.

UPDATE: I've updated your code to show what I meant about using a cache and a timestamp. I have assumed that your "motorsData" variable is what the motor polling returns, so I have not added a separate variable for it. If I've misunderstood, simply add a variable that stores the data after it is returned. Note that I haven't added any error checking, so you still need to deal with your exceptions.

    static DateTime lastMotorPoll;
    // TimeSpan is not a compile-time constant type, so it cannot be declared const
    static readonly TimeSpan CACHE_PERIOD = new TimeSpan(0, 0, 0, 0, 250);
    private object cachedCheckMotorsDataLock = new object();

    private void CachedCheckMotorsData()
    {
        lock (cachedCheckMotorsDataLock)  //Could refactor this to perform a try enter which returns quickly if required
        {
            //If the last time the data was polled is older than the cache period, poll
            if (lastMotorPoll.Add(CACHE_PERIOD) < DateTime.Now)
            {
                pollMotorsData();
                lastMotorPoll = DateTime.Now;
            }
            else //Data is fresh so don't poll
            {
                return;
            }
        }       
    }

    private void pollMotorsData()
    {
        // Execute single poll with "foreground" handshaking 
        DateTime start = DateTime.Now;
        byte retryCount = 0;
        // Pick old data atomically to detect change
        uint motorsDataTimeStampPrev = this.MotorsDataTimeStamp;
        bool changeDetected = false;
        try
        {
            do
            {
                // Handshake signal to DPRAM write process on controller side that host PC is reading
                this.controller.deltaTauTcpClient.Pmac_SetBit(OFFSET_0x006A_BIT15_FOREGROUND_READ, 15, true);
                try
                {
                    bool canReadMotors = false;
                    byte[] canReadFrozenDataFlag = new byte[2];
                    do
                    {
                        this.controller.deltaTauTcpClient.Pmac_GetMem(OFFSET_0x006E_BIT15_FOREGROUND_DONE, canReadFrozenDataFlag);
                        canReadMotors = (canReadFrozenDataFlag[1] & 0x80) == 0x80;
                        if (canReadMotors) break;
                        retryCount++;
                        Thread.Sleep(1);
                    } while (retryCount < 10);
                    if (!canReadMotors)
                    {
                        throw new DeltaTauControllerException(this.controller, "Timeout waiting on DPRAM Foreground Handshaking Bit");
                    }
                    // Obtain fresh content of DPRAM
                    this.controller.deltaTauTcpClient.Pmac_GetMem(OFFSET_0x006A_394BYTES_8MOTORS_DATA, this.motorsData);
                    this.motorsDataBorn = DateTime.Now;
                }
                finally
                {
                    // Handshake signal to DPRAM write process on controller side that host PC has finished reading
                    this.controller.deltaTauTcpClient.Pmac_SetBit(OFFSET_0x006A_BIT15_FOREGROUND_READ, 15, false);
                }
                // Check live change in a separate atom
                changeDetected = this.MotorsDataTimeStamp != motorsDataTimeStampPrev;
            } while ((!changeDetected) && ((DateTime.Now - start).TotalMilliseconds < 255));

            // Assert that result is live
            if (!changeDetected)
            {
                throw new DeltaTauControllerException(this.controller, "DPRAM Background Data timestamp is not updated. DPRAM foreground handshaking failed.");
            }
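        }
        finally
        {
            // A try block needs a catch or finally to compile;
            // keep the timing trace from the original method here
            TimeSpan duration = DateTime.Now - start;
            if (duration.TotalMilliseconds > 50 || retryCount > 0)
            {
                Trace.WriteLine(string.Format("Controller {0}, DPRAM poll {1:0} ms, retries {2}",
                    this.controller.number,
                    duration.TotalMilliseconds,
                    retryCount));
            }
        }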
    }
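
The comment in CachedCheckMotorsData() mentions refactoring the lock into a "try enter". A minimal sketch of that idea follows, assuming a late caller should wait a bounded time (as the original 255 ms limit suggests) rather than block indefinitely. The class name CachedPoller, its constructor parameters and the Action delegate standing in for pollMotorsData() are all hypothetical and not part of the original code.

```csharp
using System;
using System.Threading;

// Hypothetical wrapper: names and parameters are illustrative assumptions.
public sealed class CachedPoller
{
    private readonly object gate = new object();
    private readonly Action poll;            // stands in for pollMotorsData()
    private readonly TimeSpan cachePeriod;   // how long a result counts as fresh
    private readonly TimeSpan waitTimeout;   // how long a late caller may block
    private DateTime lastPollUtc = DateTime.MinValue;

    public CachedPoller(Action poll, TimeSpan cachePeriod, TimeSpan waitTimeout)
    {
        this.poll = poll;
        this.cachePeriod = cachePeriod;
        this.waitTimeout = waitTimeout;
    }

    // Returns true if this call performed the slow poll,
    // false if another caller had refreshed the data recently enough.
    public bool Refresh()
    {
        bool lockTaken = false;
        try
        {
            // Wait for the lock, but only up to waitTimeout
            Monitor.TryEnter(gate, waitTimeout, ref lockTaken);
            if (!lockTaken)
            {
                throw new TimeoutException("Timed out waiting for a concurrent poll to finish");
            }
            // A thread that waited behind the first one lands here
            // right after the poll and finds the data fresh
            if (DateTime.UtcNow - lastPollUtc < cachePeriod)
            {
                return false;
            }
            poll();
            lastPollUtc = DateTime.UtcNow;
            return true;
        }
        finally
        {
            if (lockTaken) Monitor.Exit(gate);
        }
    }
}
```

Because the freshness check happens under the lock, a waiting thread does not need to notice the first thread leaving: by the time it acquires the lock, the timestamp already tells it the data is fresh, even if the first thread re-enters immediately afterwards.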


There are lots of different ways you could do this. You could use a critical section, as someone has already mentioned, but that won't give you the "just exit" behavior when another thread is holding it. For that you need some kind of flag. You could use a volatile bool and lock around access to it, or you could use a semaphore with a count of one. Finally, you could use a mutex. The benefit of the synchronization objects is that you can do a WaitForSingleObject (WaitOne in .NET) with the timeout set to 0 and then check whether the wait succeeded (in which case the first thread has exited) or not (in which case the first thread is still running).
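
A rough sketch of the single-count semaphore with a zero-timeout wait is shown below. It uses SemaphoreSlim rather than a Win32 handle, and the class name SingleFlightPoller, the Action delegate and the timeout parameter are assumptions made for illustration only.

```csharp
using System;
using System.Threading;

// Hypothetical helper: names and parameters are illustrative assumptions.
public sealed class SingleFlightPoller
{
    // One permit: whoever holds it is the thread doing the poll
    private readonly SemaphoreSlim gate = new SemaphoreSlim(1, 1);
    private readonly Action poll;     // stands in for the slow hardware trip
    private readonly int timeoutMs;   // upper bound for waiting callers

    public SingleFlightPoller(Action poll, int timeoutMs)
    {
        this.poll = poll;
        this.timeoutMs = timeoutMs;
    }

    // Returns true if this thread polled, false if it only waited
    // for another thread's poll to finish.
    public bool PollOrWait()
    {
        // Zero timeout: succeeds only if no other thread is polling right now
        if (gate.Wait(0))
        {
            try
            {
                poll();
                return true;
            }
            finally
            {
                gate.Release();
            }
        }

        // Another thread holds the permit: block until it is released
        if (!gate.Wait(timeoutMs))
        {
            throw new TimeoutException("Timed out waiting for a concurrent poll to finish");
        }
        // The permit was only taken to observe the release; hand it straight back
        gate.Release();
        return false;
    }
}
```

One caveat of this sketch: SemaphoreSlim makes no fairness guarantee, so if the first thread re-enters immediately it may take the permit again before a waiter wakes up. The waiter then simply waits for that next poll as well, bounded by the timeout.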
