0mq one-to-many connection

Developer https://www.devze.com 2023-03-31 00:03 Source: Internet

What is the most correct way to establish two-way communication between processes using 0mq? I need to create several background processes that will wait for commands from the main process, perform some calculations and return the result back to the main process.


There are a few ways to do this. The most straightforward approach might be to use REQ/REP sockets. Each background process/worker would have a REP socket, and you would use a REQ socket to communicate with them:

import zmq

def worker(addr):
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind(addr)
    while True:
        # get message from boss
        msg = socket.recv()
        # ...do smth
        # send back results
        socket.send(msg)

if __name__ == '__main__':
    # spawn 5 workers
    from multiprocessing import Process
    for i in range(5):
        Process(target=worker, args=('tcp://127.0.0.1:500%d' % i,)).start()

You'd have to connect to each worker to send them a message, and get back results:

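context = zmq.Context()
socket = context.socket(zmq.REQ)
# worker_addr is one of the addresses a worker bound to, e.g. 'tcp://127.0.0.1:5000'
socket.connect(worker_addr)
# payloads must be bytes on Python 3
socket.send(b'message')
# REQ is strictly send-then-receive: this blocks until the worker replies
msg = socket.recv()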

Another approach would be to use PUB/SUB to fire off messages to the workers and PUSH/PULL to harvest results:

import zmq

def worker(worker_id, publisher_addr, results_addr):
    context = zmq.Context()
    sub = context.socket(zmq.SUB)
    sub.connect(publisher_addr)
    sub.setsockopt(zmq.SUBSCRIBE, worker_id)
    push = context.socket(zmq.PUSH)
    push.connect(results_addr)

    while True:
        msg = sub.recv_multipart()[1]
        # do smth, send off results
        push.send_multipart([worker_id, msg])

if __name__ == '__main__':
    publisher_addr = 'tcp://127.0.0.1:5000'
    results_addr = 'tcp://127.0.0.1:5001'

    # launch some workers into space
    from multiprocessing import Process
    for i in range(5):
        # the worker id doubles as the subscription prefix, so pass it as bytes
        Process(target=worker, args=(('worker-%d' % i).encode(), publisher_addr, results_addr)).start()

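To send a command to a specific worker, publish it with that worker's id as the first frame; only the worker subscribed to that prefix receives it. Note that a PUB socket silently drops messages for subscribers that have not finished connecting yet, so give the workers a moment to connect before publishing: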

context = zmq.Context()
pub = context.socket(zmq.PUB)
pub.bind(publisher_addr)
# send message to worker-1 (frames must be bytes on Python 3)
pub.send_multipart([b'worker-1', b'hello'])

Pull in results:

context = zmq.Context()
pull = context.socket(zmq.PULL)
pull.bind(results_addr)

while True:
    worker_id, result = pull.recv_multipart()
    print(worker_id, result)


Consider using the Request-Reply Broker pattern, but replace the REQ socket with a DEALER. A DEALER does not have to wait for a reply before sending the next message, and it automatically load-balances outgoing messages across your workers in round-robin fashion.

In the picture, Client would be your main process and Service A/B/C would be your background processes (workers). The main process should bind to an endpoint. The workers should connect to the main process's endpoint to receive work items.
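A minimal sketch of that layout, under some assumptions that are not in the answer: the workers are threads rather than processes and the endpoint is a made-up `inproc://work` address, purely so the example runs in one file (with real processes you would use a `tcp://` endpoint and a separate context per process). The names `run_tasks` and `worker` are illustrative. The workers use plain REP sockets, so the DEALER has to add the empty delimiter frame that a REQ socket would normally add for it.

```python
import threading
import zmq

ENDPOINT = "inproc://work"  # hypothetical endpoint, inproc only for the demo


def worker(context, endpoint, stop):
    """Connect to the main process's endpoint and answer work items."""
    sock = context.socket(zmq.REP)
    sock.connect(endpoint)
    while not stop.is_set():
        # wait up to 100 ms for a work item so the stop flag is noticed
        if sock.poll(100):
            task = sock.recv()
            sock.send(task.upper())  # stand-in for the real calculation
    sock.close(linger=0)


def run_tasks(tasks, n_workers=3):
    """Bind a DEALER, hand the tasks to the workers, collect all replies."""
    context = zmq.Context()
    dealer = context.socket(zmq.DEALER)
    dealer.bind(endpoint := ENDPOINT)

    stop = threading.Event()
    threads = [
        threading.Thread(target=worker, args=(context, endpoint, stop))
        for _ in range(n_workers)
    ]
    for t in threads:
        t.start()

    # DEALER does not wait for a reply between sends; outgoing messages
    # are distributed round-robin over the connected workers.
    for task in tasks:
        dealer.send_multipart([b"", task])  # empty frame = REQ-style delimiter

    results = []
    for _ in tasks:
        _empty, reply = dealer.recv_multipart()
        results.append(reply)  # replies may arrive in any order

    stop.set()
    for t in threads:
        t.join()
    dealer.close(linger=0)
    context.term()
    return results
```

Calling `run_tasks([b"a", b"b", b"c"])` returns the three upper-cased payloads, in whatever order the workers finished them. Because replies are unordered, a real main process would normally put a task id into the payload so results can be matched to work items.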

In the main process, keep a list of outstanding work items together with the time each one was sent. If no answer arrives within some timeout, resend the work item, since the worker has probably died.
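The bookkeeping for that resend rule could look roughly like the sketch below. It is plain Python with no 0mq calls; the class name `PendingTasks`, its methods, and the idea of identifying work items by a task id are assumptions made for illustration, not part of the answer. The clock is injectable so the timeout logic can be exercised without sleeping.

```python
import time


class PendingTasks:
    """Tracks work items that were sent but not yet answered."""

    def __init__(self, timeout, clock=time.monotonic):
        self.timeout = timeout      # seconds to wait before resending
        self.clock = clock          # injectable for testing
        self._pending = {}          # task_id -> (payload, sent_at)

    def sent(self, task_id, payload):
        """Record (or refresh) the send time of a work item."""
        self._pending[task_id] = (payload, self.clock())

    def answered(self, task_id):
        """Forget a work item; returns False if it was not outstanding."""
        return self._pending.pop(task_id, None) is not None

    def overdue(self):
        """Return (task_id, payload) pairs whose timeout has expired."""
        now = self.clock()
        return [
            (task_id, payload)
            for task_id, (payload, sent_at) in self._pending.items()
            if now - sent_at >= self.timeout
        ]
```

In the main loop you would call `sent()` after each send, `answered()` for each reply, and periodically resend everything returned by `overdue()` (calling `sent()` again to restart its timer). Since a slow worker may still answer after a resend, results for ids that are no longer outstanding should simply be ignored.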
