How can the first thread to enter a method signal its exit to other concurrent threads?
I have a method named, say, PollDPRAM(). It must make a trip over the network to some slow hardware and refresh the object's private data. If the same method is called simultaneously by other threads, they must not make the trip; they should wait for the first thread to complete the job and then simply exit, because the data is fresh (whether it is 10-30 ms old makes no difference). It's easy to detect inside the method that the second, third, etc. threads were not the first to enter: I use an Interlocked counter to detect concurrency.
Problem: I made a poor choice in how later threads detect the first thread's exit. Each thread n > 1 watches the counter (Interlocked.Read) and waits for it to drop below the value it saw on entry. This is flawed because the first thread can re-enter the method almost immediately after leaving, so the n > 1 threads may never see the dip in the counter.
So the question is: how can I correctly detect that the first thread has exited the method, even if that thread can immediately enter it again?
Thank you
P.S. Here is the code:
private void pollMotorsData()
{
// Execute single poll with "foreground" handshaking
DateTime start = DateTime.Now;
byte retryCount = 0;
// Pick old data atomically to detect change
uint motorsDataTimeStampPrev = this.MotorsDataTimeStamp;
bool changeDetected = false;
// The design goal of DPRAM is to ease the bottleneck
// Here is a sensor if bottleneck is actually that tight
long parallelThreads = Interlocked.Increment(ref this.motorsPollThreadCount);
try
{
// For first thread entering the counter will be 1
if (parallelThreads <= 1)
{
do
{
// Handshake signal to DPRAM write process on controller side that host PC is reading
this.controller.deltaTauTcpClient.Pmac_SetBit(OFFSET_0x006A_BIT15_FOREGROUND_READ, 15, true);
try
{
bool canReadMotors = false;
byte[] canReadFrozenDataFlag = new byte[2];
do
{
this.controller.deltaTauTcpClient.Pmac_GetMem(OFFSET_0x006E_BIT15_FOREGROUND_DONE, canReadFrozenDataFlag);
canReadMotors = (canReadFrozenDataFlag[1] & 0x80) == 0x80;
if (canReadMotors) break;
retryCount++;
Thread.Sleep(1);
} while (retryCount < 10);
if (!canReadMotors)
{
throw new DeltaTauControllerException(this.controller, "Timeout waiting on DPRAM Foreground Handshaking Bit");
}
// The lock is meaningless in the constructor as it is certainly single threaded
// but for practice sake the access to data should always be serialized
lock (motorsDataLock)
{
// Obtain fresh content of DPRAM
this.controller.deltaTauTcpClient.Pmac_GetMem(OFFSET_0x006A_394BYTES_8MOTORS_DATA, this.motorsData);
this.motorsDataBorn = DateTime.Now;
}
}
finally
{
// Handshake signal to DPRAM write process on controller side that host PC has finished reading
this.controller.deltaTauTcpClient.Pmac_SetBit(OFFSET_0x006A_BIT15_FOREGROUND_READ, 15, false);
}
// Check live change in a separate atom
changeDetected = this.MotorsDataTimeStamp != motorsDataTimeStampPrev;
} while ((!changeDetected) && ((DateTime.Now - start).TotalMilliseconds < 255));
// Assert that result is live
if (!changeDetected)
{
throw new DeltaTauControllerException(this.controller, "DPRAM Background Data timestamp is not updated. DPRAM foreground handshaking failed.");
}
}
else
{
// OK. Bottleneck! The concurrent polls have collided
// Give the controller a breather by waiting for the other thread to do the job
// Avoid aggressive polling of stale data, which cannot be rewritten while locked by a reader
// Just wait for the other thread to do the whole polling job and return with no action
// because the data is milliseconds fresh
do
{
// Amount of parallel threads must eventually decrease
// But no thread will leave and decrease the counter until job is done
if (Interlocked.Read(ref this.motorsPollThreadCount) < parallelThreads)
{
// Return is possible because a count lower than parallelThreads means that
// another thread finished the poll 1 millisecond ago at most
return;
}
Thread.Sleep(1);
retryCount++;
} while ((DateTime.Now - start).TotalMilliseconds < 255);
throw new DeltaTauControllerException(this.controller, "Timeout 255ms waiting on concurrent thread to complete DPRAM polling");
}
}
finally
{
// Signal to other threads that work is done
Interlocked.Decrement(ref this.motorsPollThreadCount);
// Trace the timing and bottleneck situations
TimeSpan duration = DateTime.Now - start;
if (duration.TotalMilliseconds > 50 || parallelThreads > 1 || retryCount > 0)
{
Trace.WriteLine(string.Format("Controller {0}, DPRAM poll {1:0} ms, threads {2}, retries {3}",
this.controller.number,
duration.TotalMilliseconds,
parallelThreads,
retryCount));
}
}
}
Synchronize the method and, inside it, check a record of when the network access was last done to determine whether it needs to be done again.
You can use the C# Monitor class, which is what the "lock" keyword is built on.
Basically your method can be wrapped in lock(lockobj) { CallMethod() }
This will give you protection, assuming all threads are in the same process.
You will need to use a Mutex if you need to lock across processes.
As for your program, I would look at adding a static timestamp field and a cached value next to your method. When the method is entered, if the timestamp is within the acceptable range, return the cached value; otherwise perform the fetch. Combined with a locking mechanism, this should do what you need.
Of course, this assumes that the time spent taking and blocking on the monitor is not going to affect the performance of your app.
UPDATE: I've updated your code to show what I meant about using a cache and timestamp. I have assumed that your "motorsData" variable is what the motor polling returns, so I don't have a separate variable for it. If I've misunderstood, simply add a variable that stores the data after the code returns it. Note that I haven't done any error checking, so you need to deal with your exceptions.
static DateTime lastMotorPoll;
// TimeSpan cannot be declared const in C#, so use static readonly instead
static readonly TimeSpan CACHE_PERIOD = new TimeSpan(0, 0, 0, 0, 250);
private object cachedCheckMotorsDataLock = new object();
private void CachedCheckMotorsData()
{
lock (cachedCheckMotorsDataLock) //Could refactor this to perform a try enter which returns quickly if required
{
//If the last time the data was polled is older than the cache period, poll
if (lastMotorPoll.Add(CACHE_PERIOD) < DateTime.Now)
{
pollMotorsData();
lastMotorPoll = DateTime.Now;
}
else //Data is fresh so don't poll
{
return;
}
}
}
private void pollMotorsData()
{
// Execute single poll with "foreground" handshaking
DateTime start = DateTime.Now;
byte retryCount = 0;
// Pick old data atomically to detect change
uint motorsDataTimeStampPrev = this.MotorsDataTimeStamp;
bool changeDetected = false;
try
{
do
{
// Handshake signal to DPRAM write process on controller side that host PC is reading
this.controller.deltaTauTcpClient.Pmac_SetBit(OFFSET_0x006A_BIT15_FOREGROUND_READ, 15, true);
try
{
bool canReadMotors = false;
byte[] canReadFrozenDataFlag = new byte[2];
do
{
this.controller.deltaTauTcpClient.Pmac_GetMem(OFFSET_0x006E_BIT15_FOREGROUND_DONE, canReadFrozenDataFlag);
canReadMotors = (canReadFrozenDataFlag[1] & 0x80) == 0x80;
if (canReadMotors) break;
retryCount++;
Thread.Sleep(1);
} while (retryCount < 10);
if (!canReadMotors)
{
throw new DeltaTauControllerException(this.controller, "Timeout waiting on DPRAM Foreground Handshaking Bit");
}
// Obtain fresh content of DPRAM
this.controller.deltaTauTcpClient.Pmac_GetMem(OFFSET_0x006A_394BYTES_8MOTORS_DATA, this.motorsData);
this.motorsDataBorn = DateTime.Now;
}
finally
{
// Handshake signal to DPRAM write process on controller side that host PC has finished reading
this.controller.deltaTauTcpClient.Pmac_SetBit(OFFSET_0x006A_BIT15_FOREGROUND_READ, 15, false);
}
// Check live change in a separate atom
changeDetected = this.MotorsDataTimeStamp != motorsDataTimeStampPrev;
} while ((!changeDetected) && ((DateTime.Now - start).TotalMilliseconds < 255));
// Assert that result is live
if (!changeDetected)
{
throw new DeltaTauControllerException(this.controller, "DPRAM Background Data timestamp is not updated. DPRAM foreground handshaking failed.");
}
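}
finally
{
// A try block needs a catch or finally to compile; keep the timing trace from the original
TimeSpan duration = DateTime.Now - start;
if (duration.TotalMilliseconds > 50 || retryCount > 0)
{
Trace.WriteLine(string.Format("Controller {0}, DPRAM poll {1:0} ms, retries {2}",
this.controller.number,
duration.TotalMilliseconds,
retryCount));
}
}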
}
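The "try enter" refactoring mentioned in the comment on the lock could look roughly like the sketch below. It is only an illustration under some assumptions: SingleFlightPoller, Refresh and the poll delegate are hypothetical names (poll stands in for pollMotorsData()), and the timeout is whatever the caller is willing to wait, for example the 255 ms used in the question. A late thread blocks on the lock until the in-flight poll finishes, then sees a fresh timestamp and returns without touching the network.

```csharp
using System;
using System.Threading;

// Hypothetical wrapper for illustration; "poll" stands in for pollMotorsData().
public sealed class SingleFlightPoller
{
    private readonly object gate = new object();
    private readonly Action poll;
    private readonly TimeSpan maxAge;
    private DateTime lastPollUtc = DateTime.MinValue;

    public SingleFlightPoller(Action poll, TimeSpan maxAge)
    {
        this.poll = poll;
        this.maxAge = maxAge;
    }

    // Returns true if this call performed the poll,
    // false if it found data that is still fresh enough.
    public bool Refresh(TimeSpan timeout)
    {
        bool lockTaken = false;
        try
        {
            // Wait at most "timeout" for a concurrent poll to finish.
            Monitor.TryEnter(gate, timeout, ref lockTaken);
            if (!lockTaken)
            {
                throw new TimeoutException("Timed out waiting for a concurrent poll to finish");
            }
            if (DateTime.UtcNow - lastPollUtc < maxAge)
            {
                return false; // another thread refreshed the data recently
            }
            poll();
            lastPollUtc = DateTime.UtcNow;
            return true;
        }
        finally
        {
            if (lockTaken) Monitor.Exit(gate);
        }
    }
}
```

Because waiting threads compete for the lock itself rather than watching a counter, it does not matter whether the first thread immediately calls the method again: whoever gets the lock next checks the timestamp and decides.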
There are lots of different ways you could do this. You could use a critical section (lock), as someone has already mentioned, but on its own that won't give you the "just exit" behavior, because the caller blocks while the other thread holds it. For that you need some kind of flag. You could go with a volatile bool and a lock around access to it, or you could use a semaphore with a single count. Finally, you can use a mutex. The benefit of using the synchronization objects is that you can wait on them with a timeout of 0 (WaitForSingleObject in Win32; in .NET, WaitHandle.WaitOne(0) or Monitor.TryEnter). Then you can check whether the wait was successful: if it was, the first thread has exited; if not, the first thread is still running.
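One way to flesh out the "some kind of flag" idea is a flag plus a generation counter guarded by a monitor. This goes beyond what the answer spells out, so treat it as a sketch under assumptions: PollGate and PollOrWait are hypothetical names, and poll stands in for the body of pollMotorsData(). A late thread records the generation it saw on arrival and waits until it changes, so it is released even if the first thread re-enters straight away.

```csharp
using System;
using System.Threading;

// Hypothetical helper for illustration; not part of the original code.
public sealed class PollGate
{
    private readonly object sync = new object();
    private bool pollInProgress; // true while some thread is doing the network trip
    private long generation;     // incremented each time a poll attempt finishes

    // Returns true if this call did the poll,
    // false if it waited for another thread's poll instead.
    public bool PollOrWait(Action poll, TimeSpan timeout)
    {
        lock (sync)
        {
            if (pollInProgress)
            {
                long seen = generation;
                DateTime deadline = DateTime.UtcNow + timeout;
                // Wait until the poll that was running on arrival has finished.
                while (generation == seen)
                {
                    TimeSpan remaining = deadline - DateTime.UtcNow;
                    if (remaining <= TimeSpan.Zero)
                    {
                        throw new TimeoutException("Timed out waiting for a concurrent poll");
                    }
                    Monitor.Wait(sync, remaining); // releases the lock while waiting
                }
                return false;
            }
            pollInProgress = true;
        }
        try
        {
            poll(); // slow network trip, done outside the lock
            return true;
        }
        finally
        {
            lock (sync)
            {
                pollInProgress = false;
                generation++;           // changes even if the same thread re-enters at once
                Monitor.PulseAll(sync); // wake every waiting thread
            }
        }
    }
}
```

Note that in this sketch waiters are also released when the poll throws, so they return as if the data were fresh; real code would probably want to record the failure and report it to them.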