开发者

Any obvious problems or improvements for my producer consumer queue

开发者 https://www.devze.com 2023-01-07 16:18 出处:网络
I asked a previous question about producer/consumer code that was overly general (though the answers were certainly helpful). So I\'ve taken the suggestions from an earlier SO question by another auth

I asked a previous question about producer/consumer code that was overly general (though the answers were certainly helpful). So I've taken the suggestions from an earlier SO question by another author and converted them to C++ and boost. However I'm always a bit concerned about multithreaded code - so if anyone can see any obvious improvements I'd love to hear about them.

#include <pthread.h>
#include <deque>
#include <iostream>

#include "boost/thread.hpp"


class MyQueue
{
protected:
  boost::mutex mutex_;
  boost::condition_variable condition_;
  bool cancel_;
  std::deque<int> data_;

public:
  MyQueue() : mutex_(), condition_(), cancel_(false), data_()
  {
  }

  struct Canceled{};

  void push( int i )
  {
     boost::lock_guard<boost::mutex> l(mutex_);
     if(cancel_) throw Canceled();
     data_.push_back(i);
     condition_.notify_all();
  }

  void pop( int & i )
  {
     boost::unique_lock<boost::mutex> l(mutex_);
     while(! cancel_ && data_.size()==0 )
     {
        condition_.wait( l );
     }
     if(cancel_) throw Canceled();

     assert( data_.size() != 0 );
     i = data_.front();
     data_.pop_front();
  }

  void cancel()
  {
     boost::lock_guard<boost::mutex> l(mutex_);
     if( cancel_) throw Canceled();
     cancel_ = true;
     condition_.notify_all();
  }
};


boost::mutex iomutex;

void producer( MyQueue * q, const std::string & name )
try
{
  for(unsigned int i=0 ; i<20; ++i)
  {
    q->push( i );
    boost::lock_guard<boost::mutex> l(iomutex);
    std::cout<<name<<"  PRODUCED "<<i<<std::endl;
  }

  sleep(1);
  q->cancel();
  {
    boost::lock_guard<boost::mutex> l(iomutex);
    std::cout<<name<<"  PRODUCER EXITING NORMALLY"<<std::endl;
  }
}
catch( MyQueue::Canceled & c )
{
  boost::lock_guard<boost::mutex> l(iomutex);
  std::cout<<name<<"  PRODUCER CANCLED "<<std::endl;
}

void consumer( MyQueue * q, const std::string & name )
try
{
  while(true)
  {
    int i;
    q->pop( i );
    boost::lock_guard<boost::mutex> l(iomutex);
    std::cout<<name<<"  CONSUMED "<<i<<std::endl;
  }
}
catch( MyQueue::Canceled & c )
{
  boost::lock_guard<boost::mutex> l(iomutex);
  std::cout<<name<<"  CONSUMER CANCLED "<<std::endl;
}

int main()
{
  MyQueue q;
  boost::thread pr1( producer, &q, "pro1");
  boost::thread pr2( producer, &q, "pro2");
  boost::thread cons1( consumer, &q, "con1");
开发者_JS百科  boost::thread cons2( consumer, &q, "con2");

  pr1.join();
  pr2.join();
  cons1.join();
  cons2.join();
}

UPDATE: I ended up using a modified version of Anthony Williams' concurrent queue. My modified version can be found here.


If you're worried about potential pitfalls in your implementation, you can try using Anthony Williams' (maintainer of the Boost.Thread library) excellent thread-safe, multiple-producer, multiple-consumer queue.

0

精彩评论

暂无评论...
验证码 换一张
取 消