What is the most correct way to establish two-way communication between processes using 0mq? I need to create several background processes that will wait for commands from the main process, perform some calculations, and return the results to the main process.
There are a few ways to do this. The most straightforward approach might be to use REQ/REP sockets. Each background process/worker would have a REP socket, and you would use a REQ socket to communicate with them:
import zmq
from multiprocessing import Process

def worker(addr):
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind(addr)
    while True:
        # get message from boss
        msg = socket.recv()
        # ...do something with it
        # send back results (here the message is simply echoed)
        socket.send(msg)

if __name__ == '__main__':
    # spawn 5 workers, bound to ports 5000 through 5004
    for i in range(5):
        Process(target=worker, args=('tcp://127.0.0.1:500%d' % i,)).start()
You'd have to connect to each worker to send them a message, and get back results:
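import zmq

worker_addr = 'tcp://127.0.0.1:5000'  # one of the addresses the workers bound to
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect(worker_addr)
socket.send(b'message')  # on Python 3, pyzmq sends bytes rather than str
msg = socket.recv()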
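As a rough sketch of the main-process side for this layout, assuming the five workers above are bound to ports 5000 to 5004: the names `worker_addrs` and `ask_all` are made up for illustration, and the poll timeout is an addition so that one dead worker does not block the loop forever.

```python
def worker_addrs(count, host='127.0.0.1', base_port=5000):
    """Build the endpoints the workers bind to (hypothetical helper)."""
    return ['tcp://%s:%d' % (host, base_port + i) for i in range(count)]


def ask_all(addrs, payload, timeout_ms=1000):
    """Send payload (bytes) to every worker.

    Returns {addr: reply}, with None for workers that did not answer in time.
    """
    import zmq  # pyzmq; imported lazily so worker_addrs() stays dependency-free

    context = zmq.Context()
    sockets = {}
    for addr in addrs:
        # A REQ socket must alternate send/recv, so use one socket per worker.
        socket = context.socket(zmq.REQ)
        socket.setsockopt(zmq.LINGER, 0)  # don't hang on close if a worker is gone
        socket.connect(addr)
        socket.send(payload)
        sockets[addr] = socket

    replies = {}
    for addr, socket in sockets.items():
        # poll() returns 0 if nothing arrived within timeout_ms
        replies[addr] = socket.recv() if socket.poll(timeout_ms) else None
        socket.close()
    context.term()
    return replies
```

With the workers running, `ask_all(worker_addrs(5), b'compute')` would return one entry per worker. Sending to all workers before collecting any reply lets them work in parallel.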
Another approach would be to use PUB/SUB to fire off messages to the workers and PUSH/PULL to harvest results:
import zmq
from multiprocessing import Process

def worker(worker_id, publisher_addr, results_addr):
    context = zmq.Context()
    sub = context.socket(zmq.SUB)
    sub.connect(publisher_addr)
    # only receive messages whose first frame starts with this worker's id
    sub.setsockopt(zmq.SUBSCRIBE, worker_id)
    push = context.socket(zmq.PUSH)
    push.connect(results_addr)
    while True:
        # frame 0 is the worker id, frame 1 is the command
        msg = sub.recv_multipart()[1]
        # do something, then send off results tagged with our id
        push.send_multipart([worker_id, msg])

if __name__ == '__main__':
    publisher_addr = 'tcp://127.0.0.1:5000'
    results_addr = 'tcp://127.0.0.1:5001'
    # launch some workers into space
    for i in range(5):
        # ids are bytes because they are sent as ZeroMQ frames
        worker_id = ('worker-%d' % i).encode()
        Process(target=worker, args=(worker_id, publisher_addr, results_addr)).start()
To send a command to a specific worker, you'd do something like:
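context = zmq.Context()
pub = context.socket(zmq.PUB)
pub.bind(publisher_addr)
# send message to worker-1 (frames must be bytes on Python 3)
# note: anything published before a subscriber has connected is dropped
pub.send_multipart([b'worker-1', b'hello'])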
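One thing to watch with this addressing scheme: a SUB socket's subscription is a prefix match on the first frame, so a worker subscribed to `worker-1` would also receive messages addressed to `worker-10`. That does not matter with five workers, but it would with more than ten. The sketch below mimics the filter in plain Python and shows one possible fix, fixed-width ids; `would_receive` and `worker_topic` are hypothetical helpers, not part of pyzmq.

```python
def would_receive(subscription, first_frame):
    """Mimic the SUB filter: a message passes if its first frame
    starts with the subscription bytes."""
    return first_frame.startswith(subscription)


def worker_topic(index, width=3):
    """Zero-padded worker id, so no id is a prefix of another
    (as long as index has at most `width` digits)."""
    return ('worker-%0*d' % (width, index)).encode('ascii')


# Unpadded ids collide: worker-1 also sees worker-10's messages.
print(would_receive(b'worker-1', b'worker-10'))
# Padded ids do not collide.
print(would_receive(worker_topic(1), worker_topic(10)))
```

The same ids would then be used on both sides: in `setsockopt(zmq.SUBSCRIBE, ...)` in the worker and as the first frame in `send_multipart` in the main process.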
Pull in results:
context = zmq.Context()
pull = context.socket(zmq.PULL)
pull.bind(results_addr)
while True:
    worker_id, result = pull.recv_multipart()
    print(worker_id, result)
Consider using the Request-Reply Broker pattern, but replace the REQ socket with a DEALER. Unlike REQ, a DEALER does not have to wait for a reply before sending the next message, and it automatically load-balances traffic across your workers.
In the picture, Client would be your main process and Service A/B/C would be your background processes (workers). The main process should bind to an endpoint. Workers should connect to the main process's endpoint to receive work items.
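A minimal sketch of the main-process side of this setup, assuming the workers use REP sockets that `connect()` to the main process and reply with `[item_id, result]`. A REP socket expects an empty delimiter frame ahead of the payload. REQ adds that frame automatically and DEALER does not, so it has to be added by hand. The endpoint, the `item_id` frame, and the function names are illustrative assumptions.

```python
def frame_request(item_id, payload):
    """Frames for a DEALER -> REP message; the empty first frame is the
    delimiter a REP socket expects."""
    return [b'', item_id, payload]


def parse_reply(frames):
    """REP sends the delimiter back, so a reply arrives as
    [b'', item_id, result]."""
    if len(frames) != 3 or frames[0] != b'':
        raise ValueError('unexpected reply framing: %r' % (frames,))
    return frames[1], frames[2]


def dispatch(items, endpoint='tcp://127.0.0.1:5000'):
    """Send every {item_id: payload} entry to the workers and block
    until each one has been answered (no timeout handling here)."""
    import zmq  # pyzmq; imported lazily so the helpers above need no dependency

    context = zmq.Context()
    dealer = context.socket(zmq.DEALER)
    dealer.bind(endpoint)
    # DEALER distributes outgoing messages round-robin over connected workers
    for item_id, payload in items.items():
        dealer.send_multipart(frame_request(item_id, payload))
    results = {}
    while len(results) < len(items):
        item_id, result = parse_reply(dealer.recv_multipart())
        results[item_id] = result
    dealer.close()
    context.term()
    return results
```

On the worker side this assumes something like `item_id, payload = socket.recv_multipart()` followed by `socket.send_multipart([item_id, result])`. Carrying an id in each message is what lets the main process match replies to work items, since replies can arrive in any order.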
In the main process, keep a list of work items along with the time each one was sent. If there is no answer after some time, resend the work item, since the worker probably died.
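The bookkeeping described here could be sketched in plain Python roughly as follows. The class name `PendingWork`, its methods, and the 5-second default are assumptions for illustration; the actual sending and receiving over the socket is left out.

```python
import time


class PendingWork:
    """Track sent work items so that unanswered ones can be resent."""

    def __init__(self, timeout=5.0, clock=time.monotonic):
        self.timeout = timeout  # seconds to wait before resending
        self.clock = clock      # injectable so the logic can be tested
        self._pending = {}      # item_id -> (payload, time last sent)

    def sent(self, item_id, payload):
        """Record that an item was just sent (or resent)."""
        self._pending[item_id] = (payload, self.clock())

    def answered(self, item_id):
        """Forget an item once its reply arrives. Returns False for a
        duplicate or unknown reply, which the caller can ignore."""
        return self._pending.pop(item_id, None) is not None

    def overdue(self):
        """Return (item_id, payload) pairs that have waited too long."""
        now = self.clock()
        return [(item_id, payload)
                for item_id, (payload, sent_at) in self._pending.items()
                if now - sent_at >= self.timeout]
```

The main loop would call `sent()` after each send, `answered()` for each reply, and periodically resend whatever `overdue()` returns, calling `sent()` again to reset the timer. A worker that was only slow may still answer later, so duplicate replies are possible and should be tolerated.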