I'm using Boost threads to parallelize the calculations in my program. A controller object manages the calculation jobs and the results. I create a bunch of worker threads that get their jobs from the controller object while the main thread displays the results. The results need to be shown in the correct order. To achieve this I use Boost futures in a std::deque. GetNewJob() adds a new boost::promise to the end of the deque and returns a pointer to it. GetNextResult() takes a result from the front of the queue. If no result is ready yet, it blocks the calling thread.
The important parts of my Controller class:
class Controller
{
public:
Controller();
boost::shared_ptr<boost::promise<Results> > GetNewJob();
Results GetNextResult();
class NoJobsLeft{};
class NoResultsLeft{};
private:
bool JobsLeft() const;
bool ResultsLeft() const;
std::deque<boost::shared_ptr<boost::promise<Results> > > queue_;
boost::mutex mutex_;
boost::condition_variable condition_;
};
Worker function:
void DoWork()
{
try
{
while(true)
{
boost::shared_ptr<boost::promise<Results> >
promise(controller.GetNewJob());
//do calculations
promise->set_value(results);
}
}
catch(NoJobsLeft)
{
}
}
Main program code:
Controller controller(args);
boost::thread_group worker_threads;
for (unsigned i = 0; i < n_cpus; ++i)
worker_threads.create_thread(DoWork);
try
{
while(true)
{
Results results = controller.GetNextResult();
std::cout << results;
std::cout << std::endl;
}
}
catch(NoResultsLeft)
{
}
worker_threads.join_all();
Sometimes this works just fine and all results are displayed. But very often I cannot see any output at all. I do not use cout in the worker threads.
Implementations of GetNewJob() and GetNextResult():
boost::shared_ptr<boost::promise<Results> > Controller::GetNewJob()
{
boost::lock_guard<boost::mutex> lock(mutex_);
if (!JobsLeft())
throw NoJobsLeft();
//determine more information about the job, not important here
queue_.push_back(boost::make_shared<boost::promise<Results> >());
condition_.notify_one();
return queue_.back();
}
Results Controller::GetNextResult()
{
boost::shared_ptr<boost::promise<Results> > results;
{
boost::unique_lock<boost::mutex> lock(mutex_);
if (!ResultsLeft())
throw NoResultsLeft();
while(!queue_.size())
{
condition_.wait(lock);
}
results = queue_.front();
queue_.pop_front();
}
return results->get_future().get();
}
bool Controller::ResultsLeft() const
{
return (queue_.size() || JobsLeft()) ? true : false;
}
When you don't see any output, GetNextResult() could be throwing NoResultsLeft because nothing is in the queue on the first pass. The other possibility is that jobs are never added to the queue in the first place, or that GetNewJob() is throwing NoJobsLeft. Adding std::cout statements to your catch blocks might help identify what's going on.
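As a rough sketch of that kind of diagnostic, the display loop can report which exception ended it and how many results it had printed by then. StubController and DisplayAll are hypothetical names invented for this illustration, int stands in for Results, and the stub simply throws on the first call to mimic the "no output" case; none of this is the asker's actual code.

```cpp
#include <cassert>
#include <iostream>
#include <sstream>
#include <string>

// Hypothetical stand-in for the exception type nested in Controller.
struct NoResultsLeft {};

// Hypothetical stub: acts like a controller that believes it is finished
// before any job has been queued, so the very first call throws.
struct StubController {
    int GetNextResult() { throw NoResultsLeft(); }
};

// Runs the display loop and logs how it ended.
// Returns the number of results that were printed to `out`.
template <typename ControllerT>
int DisplayAll(ControllerT& controller, std::ostream& out, std::ostream& log) {
    int shown = 0;
    try {
        while (true) {
            out << controller.GetNextResult() << std::endl;
            ++shown;
        }
    } catch (const NoResultsLeft&) {
        // Without this line an early exit is indistinguishable from a hang.
        log << "NoResultsLeft after " << shown << " result(s)" << std::endl;
    }
    return shown;
}
```

Calling DisplayAll(controller, std::cout, std::cerr) in place of the bare loop would show whether the main thread gave up before any worker had queued a job. The same kind of log line in the worker's catch block would show whether the workers exit immediately.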
However, if the results can't be displayed asynchronously, there is little reason for this mechanism; you might as well wait for all of them. There is no guarantee about the order in which threads complete, only about the order in which you add the results to the queue via boost::promise, so you have to block in GetNextResult anyway, at least until the first thread completes.
If you want to display results sequentially, your controller could gather all results in the same way and fire a boost::function to display them in the correct order when everything is ready.
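A minimal sketch of that idea follows, using the standard-library std::promise, std::future and std::function in place of the Boost types so that it is self-contained. RunInOrder is a hypothetical name, int stands in for Results, and one thread per job is an assumption made to keep the example short. Unlike the suggestion above, it calls the callback as soon as the next result in line is ready rather than waiting for everything.

```cpp
#include <cassert>
#include <chrono>
#include <deque>
#include <functional>
#include <future>
#include <thread>
#include <vector>

// Runs each job on its own thread and passes the results to `display`
// in submission order, regardless of which thread finishes first.
// Assumes the jobs do not throw.
void RunInOrder(const std::vector<std::function<int()>>& jobs,
                const std::function<void(int)>& display) {
    std::deque<std::future<int>> pending;
    std::vector<std::thread> workers;

    for (const auto& job : jobs) {
        std::promise<int> promise;
        // Queue order equals submission order.
        pending.push_back(promise.get_future());
        // The promise is moved into the worker, which fulfils it when done.
        workers.emplace_back(
            [job](std::promise<int> p) { p.set_value(job()); },
            std::move(promise));
    }

    while (!pending.empty()) {
        // get() blocks until the front result is ready.
        display(pending.front().get());
        pending.pop_front();
    }

    for (auto& w : workers) w.join();
}
```

Because every future is queued before its worker starts, the display side never sees an empty queue while jobs remain, so no condition variable is needed in this simplified form.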
Edit:
BTW, while(!queue_.size()) really ought to be while(queue_.empty()). While technically anything non-zero is interpreted as true, methods named size(), length(), etc. look ugly when used as a condition. The same goes for return (queue_.size() || JobsLeft()) ? true : false; which could be return (!queue_.empty() || JobsLeft());