ZeroMQ pub/sub with multipart not working

Developer https://www.devze.com 2023-03-10 07:48 Source: the web
Here's my script.


#!/usr/bin/env python

import traceback
import sys
import zmq
from time import sleep

print "Creating the zmq.Context"
context = zmq.Context()

print "Binding the publisher to the local socket at port 5557"
sender = context.socket(zmq.PUB)
sender.bind("tcp://*:5557")

print "Binding the subscriber to the local socket at port 5557"
receiver = context.socket(zmq.SUB)
# connect() needs a concrete host; the "*" wildcard is only valid for bind()
receiver.connect("tcp://localhost:5557")

print "Setting the subscriber option to get only those originating from \"B\""
receiver.setsockopt(zmq.SUBSCRIBE, "B")

print "Waiting a second for the socket to be created."
sleep(1)

print "Sending messages"
for i in range(1,10):
    msg = "msg %d" % (i)
    env = None
    if i % 2 == 0:
        env = ["B", msg]
    else:
        env = ["A", msg]
    print "Sending Message:  ", env
    sender.send_multipart(env)

print "Closing the sender."
sender.close()

failed_attempts = 0
while failed_attempts < 3:
    try:
        print str(receiver.recv_multipart(zmq.NOBLOCK))
    except:
        print traceback.format_exception(*sys.exc_info())
        failed_attempts += 1

print "Closing the receiver."
receiver.close()

print "Terminating the context."
context.term()

"""
Output:

Creating the zmq.Context
Binding the publisher to the local socket at port 5557
Binding the subscriber to the local socket at port 5557
Setting the subscriber option to get only those originating from "B"
Waiting a second for the socket to be created.
Sending messages
Sending Message:   ['A', 'msg 1']
Sending Message:   ['B', 'msg 2']
Sending Message:   ['A', 'msg 3']
Sending Message:   ['B', 'msg 4']
Sending Message:   ['A', 'msg 5']
Sending Message:   ['B', 'msg 6']
Sending Message:   ['A', 'msg 7']
Sending Message:   ['B', 'msg 8']
Sending Message:   ['A', 'msg 9']
Closing the sender.
['B', 'msg 2']
['B', 'msg 4']
['B', 'msg 6']
['B', 'msg 8']
['Traceback (most recent call last):\n', '  File "./test.py", line 43, in <module>\n    print str(receiver.recv_multipart(zmq.NOBLOCK))\n', '  File "socket.pyx", line 611, in zmq.core.socket.Socket.recv_multipart (zmq/core/socket.c:5181)\n', '  File "socket.pyx", line 514, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4811)\n', '  File "socket.pyx", line 548, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4673)\n', '  File "socket.pyx", line 99, in zmq.core.socket._recv_copy (zmq/core/socket.c:1344)\n', 'ZMQError: Resource temporarily unavailable\n']
['Traceback (most recent call last):\n', '  File "./test.py", line 43, in <module>\n    print str(receiver.recv_multipart(zmq.NOBLOCK))\n', '  File "socket.pyx", line 611, in zmq.core.socket.Socket.recv_multipart (zmq/core/socket.c:5181)\n', '  File "socket.pyx", line 514, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4811)\n', '  File "socket.pyx", line 548, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4673)\n', '  File "socket.pyx", line 99, in zmq.core.socket._recv_copy (zmq/core/socket.c:1344)\n', 'ZMQError: Resource temporarily unavailable\n']
['Traceback (most recent call last):\n', '  File "./test.py", line 43, in <module>\n    print str(receiver.recv_multipart(zmq.NOBLOCK))\n', '  File "socket.pyx", line 611, in zmq.core.socket.Socket.recv_multipart (zmq/core/socket.c:5181)\n', '  File "socket.pyx", line 514, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4811)\n', '  File "socket.pyx", line 548, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4673)\n', '  File "socket.pyx", line 99, in zmq.core.socket._recv_copy (zmq/core/socket.c:1344)\n', 'ZMQError: Resource temporarily unavailable\n']
Closing the receiver.
Terminating the context.
"""

And, the question is... why doesn't this code work?

[EDIT] After getting a super quick response on the zeromq mailing list, I've updated the code above.


Credit: Chuck Remes

You may need a "sleep" between the socket creation steps (bind, connect, setsockopt) and the actual transmission of the messages. The bind & connect operations are asynchronous, so they may not complete by the time you get to the logic that sends all of the messages. In that case, any messages sent through the PUB socket will be dropped since a zmq_bind() operation does not create a queue until another socket has successfully connected to it.

As a side note, you don't need to create 2 contexts in this example. Both sockets can be created within the same context. It doesn't hurt, but it also isn't necessary.

Credit: Pieter

There is a "problem solver" at the end of Chapter 1 of the ZeroMQ Guide that explains this.

Some socket types (ROUTER and PUB) will silently drop messages for which they have no recipients. Connecting is, as Chuck said, asynchronous and takes approx 100msec. If you start two threads, bind one side, connect the other, and then start immediately to send data over such a socket type, you'll lose the first 100msec of data (approximately).

Doing a sleep is a brutal "prove that it works" option. Realistically you'd synchronize in some way, or (more typically) expect message loss as part of normal startup (i.e. see the published data as a pure broadcast with no definite start or end).

See the weather update example and the syncpub and syncsub examples for details.
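For anyone who wants to see what "synchronize in some way" could look like without a sleep, here is a minimal sketch. It is not the guide's syncpub/syncsub pair (those use a separate REQ/REP handshake). Instead it swaps the PUB socket for an XPUB socket, which hands incoming subscriptions to the application as messages, so the sender can hold off until the subscription for "B" has actually arrived. It assumes Python 3 and a reasonably recent pyzmq; the function name publish_after_subscribe and the random loopback port are just choices made for this illustration.

```python
import zmq


def publish_after_subscribe(topic=b"B", count=9, timeout_ms=2000):
    """Publish `count` enveloped messages only after the subscription is seen.

    Hypothetical helper for illustration; returns the multipart messages
    the subscriber received.
    """
    context = zmq.Context()
    try:
        # XPUB behaves like PUB, but subscriptions can be read from it.
        sender = context.socket(zmq.XPUB)
        port = sender.bind_to_random_port("tcp://127.0.0.1")

        receiver = context.socket(zmq.SUB)
        receiver.connect("tcp://127.0.0.1:%d" % port)
        receiver.setsockopt(zmq.SUBSCRIBE, topic)

        # A new subscription shows up as one frame: b"\x01" followed by the topic.
        if not sender.poll(timeout_ms):
            raise RuntimeError("no subscription arrived in time")
        if sender.recv() != b"\x01" + topic:
            raise RuntimeError("unexpected subscription frame")

        # Same envelopes as the script in the question: even -> "B", odd -> "A".
        for i in range(1, count + 1):
            envelope = b"B" if i % 2 == 0 else b"A"
            sender.send_multipart([envelope, ("msg %d" % i).encode()])

        # Drain whatever the subscriber got; stop after 500 ms of silence.
        received = []
        while receiver.poll(500):
            received.append(receiver.recv_multipart())
        return received
    finally:
        # Close every socket of this context without lingering, then terminate.
        context.destroy(linger=0)
```

Calling publish_after_subscribe() should hand back only the "B" messages, the same ones the script in the question prints. With several subscribers you would have to count subscriptions before sending, and Pieter's point still stands: for a real broadcast feed it is often simpler to accept that late joiners miss the start.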


Necro-posting, but for those interested in a solution other than sleeping, there are monitors.

You can set a monitor callback and get called on ZMQ_EVENT_CONNECTED events.

See details and example at http://api.zeromq.org/3-3:zmq-ctx-set-monitor.
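A rough idea of how the monitor approach might look from Python. The page linked above documents zmq_ctx_set_monitor from the 3.3 development line; as far as I know, later libzmq releases expose monitoring per socket instead, and pyzmq wraps that as get_monitor_socket() plus recv_monitor_message(). The sketch below assumes Python 3 and a recent pyzmq. The helper name connect_and_wait and the random loopback port are made up for this example.

```python
import zmq
from zmq.utils.monitor import recv_monitor_message


def connect_and_wait(timeout_ms=2000):
    """Return True once the SUB socket reports EVENT_CONNECTED, else False.

    Hypothetical helper for illustration only.
    """
    context = zmq.Context()
    sender = context.socket(zmq.PUB)
    receiver = context.socket(zmq.SUB)
    # Ask for EVENT_CONNECTED only, and do it before connect() so the
    # event cannot be missed. The monitor is a PAIR socket carrying events.
    monitor = receiver.get_monitor_socket(zmq.EVENT_CONNECTED)
    try:
        port = sender.bind_to_random_port("tcp://127.0.0.1")
        receiver.connect("tcp://127.0.0.1:%d" % port)
        receiver.setsockopt(zmq.SUBSCRIBE, b"B")

        # Block until the connection event arrives or the timeout expires.
        if not monitor.poll(timeout_ms):
            return False
        event = recv_monitor_message(monitor)
        return event["event"] == zmq.EVENT_CONNECTED
    finally:
        receiver.disable_monitor()
        context.destroy(linger=0)
```

One caveat: EVENT_CONNECTED only says the TCP connection is up. The subscription itself still has to travel to the publisher, so messages sent immediately after the event could, in principle, still be dropped. Treat the event as a better signal than a fixed sleep rather than a delivery guarantee.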
