I am trying to create a concurrent queue with Qt's concurrent threading constructs.
#ifndef CONCURRENTQUEUE_H
#define CONCURRENTQUEUE_H
#include <QMutex>
#include <QWaitCondition>
#include <queue>
template<typename Data>
class ConcurrentQueue
{
private:
std::queue<Data> the_queue;
QMutex the_mutex;
QWaitCondition the_condition_variable;
bool closed;
public:
void setClosed(bool state)
{
QMutexLocker locker(&the_mutex);
closed = state;
}
bool getClosed()
{
QMutexLocker locker(&the_mutex);
return closed;
}
void push(Data const& data)
{
QMutexLocker locker(&the_mutex);
the_queue.push(data);
the_condition_variable.wakeOne();
}
bool empty()
{
QMutexLocker locker(&the_mutex);
return the_queue.empty();
}
bool try_pop(Data& popped_value)
{
QMutexLocker locker(&the_mutex);
if(the_queue.empty())
{
return false;
}
popped_value = the_queue.front();
the_queue.pop();
return true;
}
void wait_and_pop(Data& popped_value)
{
QMutexLocker locker(&the_开发者_高级运维mutex);
while(the_queue.empty())
{
the_condition_variable.wait(&the_mutex);
}
popped_value = the_queue.front();
the_queue.pop();
the_condition_variable.wakeOne();
}
//created to allow for a limited queue size
void wait_and_push(Data const& data, const int max_size)
{
QMutexLocker locker(&the_mutex);
while(the_queue.size() >= max_size)
{
the_condition_variable.wait(&the_mutex);
}
the_queue.push(data);
the_condition_variable.wakeOne();
}
};
#endif // CONCURRENTQUEUE_H
I have my producer thread using the wait_and_push method to push data into the queue, and I have been trying to get my consumer to read from the queue using try_pop
while(!tiles->empty() || !tiles->getClosed())
{
if(!tiles->try_pop(tile))
continue;
//do stuff with the tile
}
However, this deadlocks sometimes. The producer sets the closed boolean as a flag to the consumer threads that it is finished loading the queue. My consumer only has that as a way to know whether teh queue is being loaded, still in progress, or hasnt been started.
The reason the producer has a "wait_and_push" isntead of using the normal push is because I wanted to be able to make that thread block until some items had been processed to avoid eating up so much memory, and doing unnecessary disk I/O.
Can anyone point me to what is going wrong?
You forgot to add
the_condition_variable.wakeOne();
in try_pop
.
If there will be multiple producers / consumers accessing your queue, you should have a separate QWaitCondition
for producers and consumers, otherwise wakeOne
might not wake the correct thread.
EDIT:
If there will be multiple producers / consumers, then you should have a notFullCondvar
, and an notEmptyCondvar
.
- The
try_pop
method wakes thenotFullCondvar
. - The
wait_and_pop
method waits on thenotEmptyCondvar
, but wakes thenotFullCondvar
. - The
push
method wakes thenotEmptyCondvar
. - The
wait_and_push
method waits on thenotFullCondvar
, but wakes thenotEmptyCondvar
.
I hope this makes sense.
精彩评论