zoukankan      html  css  js  c++  java
  • 进程之间的通行

    互斥锁

    • 锁:from multiprocessing import Lock,Lock即为锁

    • from multiprocessing import Process
      from multiprocessing import Lock
      import os
      import time
      import random
      
      
      def task1(p,lock):
          lock.acquire()
          print(f'{p}开始打印了')
          time.sleep(random.randint(1, 4))
          print(f'{p}打印结束了')
          lock.release()
      
      
      def task2(p,lock):
          lock.acquire()
          print(f'{p}开始打印了')
          time.sleep(random.randint(1, 4))
          print(f'{p}打印结束了')
          lock.release()
      
      
      def task3(p,lock):
          lock.acquire()
          print(f'{p}开始打印了')
          time.sleep(random.randint(1, 4))
          print(f'{p}打印结束了')
          lock.release()
      
      
      if __name__ == '__main__':
          mutex = Lock()
          p1 = Process(target=task1, args=('p1', mutex))
          p2 = Process(target=task2, args=('p2', mutex))
          p3 = Process(target=task3, args=('p3', mutex))
          p1.start()
          p1.join()
          p2.start()
          p2.join()
          p3.start()
          p3.join()
      
      
    • lock与join的区别

      • 共同点:都可以把并发变成串行,保证了顺序
      • 不同点:join人为设定顺序,lock让其争抢顺序,保证了公平性

    进程之间的通信

    1,基于文件通信

    • 效率低

    • 应用到了互斥锁:可以公平性的保证顺序,以数据的安全,串行

    • 自己加锁麻烦且很容易出现死锁

    • from multiprocessing import Process
      from multiprocessing import Lock
      import os
      import time
      import json
      
      
      def search():
          time.sleep(1)
          with open("ticket.json", 'r', encoding='utf-8') as f:
              dic = json.load(f)
              print(f'{os.getpid()}查询了票,剩余{dic["count"]}张票')
      
      
      def paid():
          with open("ticket.json", 'r', encoding='utf-8') as f:
              dic = json.load(f)
          if dic["count"] > 0:
              dic["count"] -= 1
              time.sleep(1)
              with open("ticket.json", 'w', encoding='utf-8') as f1:
                  json.dump(dic, f1)
              print(f'{os.getpid()}购票成功')
      
      
      def task(lock):
          search()
          lock.acquire()
          paid()
          lock.release()
      
      
      if __name__ == '__main__':
          mutex = Lock()
          for i in range(6):
              p = Process(target=task, args=(mutex,))
              p.start()
      

    2,基于队列的通行

    • 进程彼此之间互相隔离,要实现进程间通信,multiprocessing模块支持两种形式:队列和管道,传递消息

    • queue:创建共享的进程队列,queue是多进程安全带队列,可以使用queue实现多进程之间的数据传递

    • from multiprocessing import Queue

    • 队列:q = Queue()

    • Queue(object),队列是一个继承object的类,

      • 默认参数:maxsize,不传参表示零到无穷大。
      • 传参以后,put方法和get方法的最大上限就是参数的值,超过参数的值,程序进入阻塞
    • 主要方法:

      • put方法

      •     def put(self, obj, block=True, timeout=None):
                assert not self._closed, "Queue {0!r} has been closed".format(self)
                if not self._sem.acquire(block, timeout):
                    raise Full
        
      • get方法

      •     def get(self, block=True, timeout=None):
                if block and timeout is None:
                    with self._rlock:
                        res = self._recv_bytes()
                    self._sem.release()
                else:
                    if block:
                        deadline = time.monotonic() + timeout
                    if not self._rlock.acquire(block, timeout):
                        raise Empty
        
    • 方法的默认参数

      • blocked
        • 默认为blocked = True ,主动设置为False则抛出FullEmpty异常
      • timeout
        • 默认为timeout = 0,如果指定时间,方法进入阻塞,超出时间,则会抛出FullE异常
    希望你眼眸有星辰,心中有山海,从此以梦为马,不负韶华
  • 相关阅读:
    Jenkins运行完Test后,把ngreport生成的测试报告 拷贝到相应的文件夹
    解析xml报classnotfound错误
    配置NGReport 报告中文
    fork()调用使子进程先于父进程被调度
    堆排序
    良序原理
    高速缓冲区初始化
    Python3:输出当前目录所有目录和文件--walk()函数
    Python3:输出当前目录所有文件的第二种方式-walk()函数
    Python3:递归实现输出目录下所有的文件
  • 原文地址:https://www.cnblogs.com/daviddd/p/12034397.html
Copyright © 2011-2022 走看看