开发者

lost messages on zeromq pub sub

开发者 https://www.devze.com 2023-04-05 16:21 出处:网络
I\'m trying to implement the pub sub design pattern using zeromq framework. The idea is to launch a subscriber and afterwards to launch the publisher.

I'm trying to implement the pub sub design pattern using zeromq framework. The idea is to launch a subscriber and afterwards to launch the publisher. The subscriber will listen to 100 messages and the publisher will publish 100 messages. So far so good... However what actually happens is that even that the subscriber is already up and running when the publisher is launched , not all of the messages are received by the subscriber (a 100 messages will be picked up by the subscriber if the publisher will send at least 500 message). It seems that the first messages sent by the publisher are not sent to the subscriber.

Any ideas?

Thanks in advance, Omer.

Subscriber code (launched before the publisher)

int i=0;
zmq::context_t context (1);
zmq::socket_t subscriber (context, ZMQ_SUB);
subscriber.connect("tcp://localhost:5556");
subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0);

for (int update_nbr = 0; update_nbr < 100; update_nbr++) 
{        
    zmq::message_t update;
    subscriber.recv(&update);
    i++;
    std::cout<<"receiving  :"<<i<<std::endl;
}

Publisher code (launched after the subscriber)

zmq::context_t context (1);
zmq::socket_t publisher (context, ZMQ_PUB);
publisher.bind("tcp://*:5556");

int i = 0;
for (int update_nbr = 0; update_nbr < 100; update_nbr++) 
{        
    //  Send message to all subscribers
    zmq::message_t request (20);

    time_t seconds;
    seconds = time (NULL);

    char update [20]="";
    sprintf (update, "%ld", seconds);

    memcpy ((void *) request.data (), update,strlen(update));
开发者_JS百科    publisher.send(request);
    i++;
    std::cout << "sending :" << i << std::endl;

}


See https://zguide.zeromq.org/docs/chapter2/#Missing-Message-Problem-Solver (flowchart in Figure 25, and explanations below)

Basically, it takes a little time (a few milliseconds) for the connection to be set up, and in that time lots of messages can be lost. The publisher needs to sleep a little before starting to publish, or (better) it needs to explicitly synchronize with the subscriber.


Please look the guide.

  1. publisher sends "hello"
  2. each subscriber that receive "hello", send a message to publisher via REQ/REP socket
  3. when publisher gets enough REQ/REP message, then it begins to publish data


In 0MQ, successful send() does not mean that data is sent immediately over network. http://api.zeromq.org/2-1:zmq-send. Your messages are pretty small, and AFAIR 0MQ does some sort of buffering for small messages to use network more effectively.

If I remember correctly , out_batch_size in config.hpp of 0MQ controls such behavior.


One thing to look at (beyond what previous commenters have noted) is your shutdown procedure.

The code snippets might simply be incomplete, but I don't see how you are handling shutdown. In particular you might actually be losing the last messages that are sent. Take a look at the documentation for zmq_close, zmq_term and ZMQ_LINGER. If you are not actually calling these functions and instead are simply terminating the application, then there is a chance that the messages that have been sent with zmq_send() but haven't been transferred to the network are being lost at shutdown.

To check which messages are being lost, you might try to send a sequence number in addition to the timestamp.

0

精彩评论

暂无评论...
验证码 换一张
取 消