开发者

Incorrect use of boost::asio and boost::thread

开发者 https://www.devze.com 2023-03-17 02:53 出处:网络
I am using boost::asio and boost::thread to realize a message service which accepts messages, send them asynchronously if there is no message being processed or queues the message if there are message

I am using boost::asio and boost::thread to realize a message service which accepts messages, send them asynchronously if there is no message being processed or queues the message if there are messages being processed.

The message rate is to my mind high, about 2.000 messages per second. With so many messages I face corrupted message, very seldom though. In 2.000 messages about 4-8 are corrupted. I believe the problem is due to incorrect use of the boost::asio an开发者_Go百科d/or boost::thread library.

The code I've implemented is mainly based on this boost tutorial. I cannot find a mistake and since the major messages work out find it's hard for me to narrow down the problem.

Maybe someone else has an idea what's going wrong here?

Basically this class is used in the following way:

(1) The constructor is called in the beginning of my program in order to launch the thread, thus the service to accept and transmit messages

(2) Whenever I want to transmit a message I make a call to MessageService::transmitMessage() which delegates the task with async_write to the thread processing the message queue.

using namespace google::protobuf::io;
using boost::asio::ip::tcp;

MessageService::MessageService(std::string ip, std::string port) :
    work(io_service), resolver(io_service), socket(io_service) {

    messageQueue = new std::deque<AgentMessage>;
    tcp::resolver::query query(ip, port);
    endpoint_iterator = resolver.resolve(query);

    tcp::endpoint endpoint = *endpoint_iterator;

    socket.async_connect(endpoint, boost::bind(&MessageService::handle_connect,
            this, boost::asio::placeholders::error, ++endpoint_iterator));

    boost::thread t(boost::bind(&boost::asio::io_service::run, &io_service));
}

void MessageService::await() {

    while (!messageQueue->empty()) {

        signal(SIGINT, exit);

        int messagesLeft = messageQueue->size();
        sleep(3);
        std::cout << "Pending Profiler Agents Messages: "
                << messageQueue->size() << std::endl;
        if (messagesLeft == messageQueue->size()) {
            std::cout << "Connection Error" << std::endl;
            break;
        }
    }

    std::cout << i << std::endl;
}

void MessageService::write(AgentMessage agentMessage, long systemTime,
        int JVM_ID) {
    agentMessage.set_timestamp(Agent::Helper::getCurrentClockCycle());
    agentMessage.set_jvm_id(JVM_ID);
    agentMessage.set_systemtime(systemTime);
    io_service.post(boost::bind(&MessageService::do_write, this, agentMessage));
}

void MessageService::do_close() {
    socket.close();
}

void MessageService::transmitMessage(AgentMessage agentMessage) {

    ++i;

    boost::asio::streambuf b;
    std::ostream os(&b);

    ZeroCopyOutputStream *raw_output = new OstreamOutputStream(&os);
    CodedOutputStream *coded_output = new CodedOutputStream(raw_output);

    coded_output->WriteVarint32(agentMessage.ByteSize());
    agentMessage.SerializeToCodedStream(coded_output);

    delete coded_output;
    delete raw_output;

    boost::system::error_code ignored_error;

    boost::asio::async_write(socket, b.data(), boost::bind(
            &MessageService::handle_write, this,
            boost::asio::placeholders::error));
}

void MessageService::do_write(AgentMessage agentMessage) {

    bool write_in_progress = !messageQueue->empty();
    messageQueue->push_back(agentMessage);

    if (!write_in_progress) {
        transmitMessage(agentMessage);
    }
}

void MessageService::handle_write(const boost::system::error_code &error) {

    if (!error) {
        messageQueue->pop_front();
        if (!messageQueue->empty()) {
            transmitMessage(messageQueue->front());
        }
    } else {
        std::cout << error << std::endl;
        do_close();
    }
}

void MessageService::handle_connect(const boost::system::error_code &error,
        tcp::resolver::iterator endpoint_iterator) {
    // can be used to receive commands from the Java profiler interface
}

MessageService::~MessageService() {
    // TODO Auto-generated destructor stub
}

The header file:

    using boost::asio::ip::tcp;

class MessageService {
public:
    MessageService(std::string ip, std::string port);
    virtual ~MessageService();
    void write(AgentMessage agentMessage, long systemTime, int JVM_ID);
    void await();

private:
    boost::asio::io_service io_service;
    boost::asio::io_service::work work;
    tcp::resolver resolver;
    tcp::resolver::iterator endpoint_iterator;
    tcp::socket socket;
    std::deque<AgentMessage> *messageQueue;

    void do_write(AgentMessage agentMessage);

    void do_close();

    void handle_write(const boost::system::error_code &error);

    void handle_connect(const boost::system::error_code &error,
            tcp::resolver::iterator endpoint_iterator);

    void transmitMessage(AgentMessage agentMessage);
};


this method seems dubious to me

void MessageService::transmitMessage(AgentMessage agentMessage) {
    ++i;

    boost::asio::streambuf b;
    std::ostream os(&b);

    ZeroCopyOutputStream *raw_output = new OstreamOutputStream(&os);
    CodedOutputStream *coded_output = new CodedOutputStream(raw_output);

    coded_output->WriteVarint32(agentMessage.ByteSize());
    agentMessage.SerializeToCodedStream(coded_output);

    delete coded_output;
    delete raw_output;

    boost::system::error_code ignored_error;

    boost::asio::async_write(socket, b.data(), boost::bind(
            &MessageService::handle_write, this,
            boost::asio::placeholders::error));
}

You appear to be serializing an AgentMessage (which should be passed via const reference btw) into a streambuf. Yet this serialized data does is not guaranteed to exist until the async_write completion handler is invoked, which is explicitly described in the async_write documentation

buffers

One or more buffers containing the data to be written. Although the buffers object may be copied as necessary, ownership of the underlying memory blocks is retained by the caller, which must guarantee that they remain valid until the handler is called.

to solve this behavior, ensure the buffer remains in scope until the completion handler is invoked. One way to do this is to pass the buffer as an argument to your bounded completion handler:

boost::asio::async_write(socket, b.data(), boost::bind(
            &MessageService::handle_write, this,
            boost::asio::placeholders::error,
            coded_output
            // ^^^ buffer goes here
            ));

then delete it from within the completion handler. I'd suggest you also look at using a shared_ptr instead of naked pointers as well.

0

精彩评论

暂无评论...
验证码 换一张
取 消

关注公众号