开发者

Python多进程之进程同步及通信详解

开发者 https://www.devze.com 2022-12-05 12:36 出处:网络 作者: 程序员-夏天
目录进程同步Lock(锁)进程通信Queue(队列)编程客栈Pipe(管道)Semaphore(信号量)Event(事件)总结上篇文章介绍了什么是进程、进程与程序的关系、进程的创建与使用、创建进程池等,接下来就来介绍一下进程同步
目录
  • 进程同步
    • Lock(锁)
  • 进程通信
    • Queue(队列)编程客栈
    • Pipe(管道)
    • Semaphore(信号量)
    • Event(事件)
  • 总结

    上篇文章介绍了什么是进程、进程与程序的关系、进程的创建与使用、创建进程池等,接下来就来介绍一下进程同步及进程通信。

    进程同步

    当多个进程使用同一份数据资源的时候,因为进程的运行没有顺序,运行起来也无法控制,如果不加以干预,往往会引发数据安全或顺序混乱的问题,所以要在多个进程读写共享数据资源的时候加以适当的策略,来保证数据的一致性问题。

    Lock(锁)

    一个Lock对象有两个方法:acquire()和release()来控制共享数据的读写权限, 看下面这张图片,使用多进程的时候会经常出现这种情况,这是因为多个进程都在抢占输出资源,共享同一打印终端,从而造成了输出信息的错乱。

    Python多进程之进程同步及通信详解

    那么就可以使用Lock机制:

    import multiwww.cppcns.comprocessing
    import random
    import time
    def work(lock, i):
        lock.acquire()
        print("work'{}'执行中......".format(i), multiprocessing.current_process().name, multiprocessing.current_process().pid)
        time.sleep(random.randint(0, 2))
        print("work'{}'执行完毕......".format(i))
        lock.release()
    if __name__ == '__main__':
        lock = multiprocessing.Lock()
        for i in range(5):
            p = multiprocessing.Process(target=work, args=(lock, i))
            p.start()

    由于引入了Lock机制,同一时间只能有一个进程抢占到输出资源,其他进程等待该进程结束,锁释放到,才可以抢占,这样会解决多进程间资源竞争导致数据错乱的问题,但是由并发执行变成了串行执行,会牺牲运行效率。

    进程通信

    上篇文章说过,进程之间互相隔离,数据是独立的,默认情况下互不影响,那要如何实现进程间通信呢?python提供了多种进程通信的方式,下面就来说一下。

    Queue(队列)

    muhttp://www.cppcns.comltiprocessing模块提供的Queue多进程安全的消息队列,可以实现多进程之间的数据传递。

    说明

    • 初始化Queue()对象时(例如:q=Queue()),若括号中没有指定最⼤可接收的消息数量,或数量为负值,那么就代表可接受的消息数量没有上限(直到内存的尽头)。
    • Queue.qsize():返回当前队列包含的消息数量。
    • Queue.empty():如果队列为空,返回True,反之False。
    • Queue.full():如果队列满了,返回True,反之False。
    • Queue.get(block, timeout):获取队列中的⼀条消息,然后将其从列队中移除,block默认值为True。如果block使⽤默认值,且没有设置timeout(单位秒),消息列队如果为空,此时程序将被阻塞(停在读取状态),直到从消息列队读到消息为⽌,如果设置了timeout,则会等待timeout秒,若还没读取到任何消息,则抛出Queue.Empty异常;如果block值为False,消息列队如果为空,则会⽴刻抛出Queue.Empty异常。
    • Queue.get_nowait():相当Queue.get(False)。
    • Queue.put(item, block, timeout):将item消息写⼊队列,b编程客栈lock默认值为True,如果block使⽤默认值,且没有设置timeout(单位秒),消息列队如果已经没有空间可写⼊,此时程序将被阻塞(停在写⼊状态),直到消息列队腾出空间为⽌,如果设置了timeout,则会等待timeout秒,若还没空间,则抛出Queue.Full异常;如果block值为False,消息列队如果没有空间可写⼊,则会⽴刻抛出Queue.Full异常。
    • Queue.put_nowait(item):相当于Queue.put(item, False)。
    from multiprocessing import Process, Queue
    import time
    def write_task(queue):
        """
        向队列中写入数据
        :param queue: 队列
        :return:
        """
        for i in range(5):
            if queue.full():
                print("队列已满!")
            message = "消息{}".format(str(i))
            queue.put(message)
            print("消息{}写入队列".format(str(i)))
    def read_task(queue):
        """
        从队列读取数据
        :param queue: 队列
        :return:
        """
        while True:
            print("从队列读取:{}".format(queue.get(True)))
    if __name__ == '__main__':
        print("主进程执行......")
        # 主进程创建Queue,最大消息数量为3
        queue = Queue(3)
        pw = Process(target=write_task, args=(queue, ))
        pr = Process(target=read_task, args=(queue, ))
        pw.start()
        pr.start()

    运行结果为:

    Python多进程之进程同步及通信详解

    从结果我们可以看出,队列最大可以放入3条消息,后面再来消息,要等read_task从队列里取出后才行。

    Pipe(管道)

    Pipe常用于两个进程,两个进程分别位于管道的两端,Pipe(duplex)方法返回(conn1,conn2)代表一个管道的两端,duplex参数默认为True,即全双工模式,若为False,conn1只负责接收信息,conn2负责发送。

    send()和recv()方法分别是发送和接受消息的方法。

    import multiprocessing
    import time
    import random
    def proc_send(pipe):
        """
        发送消息
        :param pipe:管道一端
        :return:
        """
        for i in range(10):
            print("process send:{}".format(str(i)))
            pipe.send(i)
            time.sleep(random.random())
    def proc_recv(pipe):
        """
        接收消息
        :param pipe:管道一端
        :return:
        """
        while True:
            print("Process recv:{}".format(pipe.recv()))
            time.sleep(random.random())
    if __name__ == '__main__':
        # 主进程创建pipe
        pipe = multiprocessing.Pipe()
        p1 = multiprocessing.Process(target=proc_send,args=(pipe[0], ))
        p2 = multiprocessing.Process(target=proc_recv,args=(pipe[1], ))
        p1.start()
        p2.start()
        p1.join()
        p2.terminate()

    执行结果为:

    Python多进程之进程同步及通信详解

    Semaphore(信号量)

    Semaphore用来控制对共享资源的访问数量,和进程池的最大连接数类似。

    import multiprocessing
    import random
    import time
    def work(s, i):
        s.acquire()
        print("work'{}'执行中......".format(i), multiprocessing.currhttp://www.cppcns.coment_process().name, multiprocessing.current_process().pid)
        time.sleep(i*2)
        print("work'{}'执行完毕......".format(i))
        s.release()
    if __name__ == '__main__':
        s = multiprocessing.Semaphore(2)
        for i in range(1, 7):
            p = multiprocessing.Process(target=work, args=(s, i))
            p.start()

    上面的代码中使用Semaphore限制了最多有2个进程同时执行,那么来一个进程获得一把锁,计数加1,当计数等于2时,后面再来的进程均需要等待,等前面的进程释放掉,才可以获得锁。

    信号量与进程池的概念上类似,但是要区分开来,信号量涉及到加锁的概念。

    Event(事件)

    Event用来实现进程间同步通信的。运行的机制是:全局定义了一个flag,如果flag值为False,当程序执行event.wait()方法时就会阻塞,如果flag值为True时,程序执行event.wait()方法时不会阻塞继续执行。

    Event常⽤函数:

    • event.wait():在进程中插入一个标记(flag),默认为False,可以设置timeout。
    • event.set():使flag为Ture。
    • event.clear():使flag为False。
    • event.is_set():判断flag是否为True。
    import multiprocessing
    import time
    def wait_for_event(e):
        print("wait_for_event执行")
        e.wait()
        print("wait_for_event: e.is_set():{}".format(e.is_set()))
    def wait_for_event_timeout(e, t):
        print("wait_for_event_timeout执行")
        # 只会阻塞2s
        e.wait(t)
        print("wait_for_event_timeout:e.is_set:{}".format(e.is_set()))
    if __name__ == "__main__":
        e = multiprocessing.Event()
        p1 = multiprocessing.Process(target=wait_for_event, args=(e,))
        p1.start()
        p2 = multiprocessing.Process(target=wait_for_event_timeout, args=(e, 2))
        p2.start()
        time.sleep(4)
        # 4s之后使用e.set()将flag设为Ture
        e.set()
        print("主进程:flag设置为True")

    执行结果如下:

    Python多进程之进程同步及通信详解

    总结

    本篇文章就到这里了,希望能够给你带来帮助,也希望您能够多多关注我们的更多内容!

    0

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    关注公众号