zoukankan      html  css  js  c++  java
  • 进程之间的通信

    互斥锁

    from multiprocessing import Process,Lock
    import os,time
    def work(lock):
        lock.acquire()    上锁
        print('%s is running' %os.getpid())
        time.sleep(2)
        print('%s is done' %os.getpid())
        lock.release()    解锁
    if __name__ == '__main__':
        lock=Lock()      造锁(实例化一个锁)
        for i in range(3):
            p=Process(target=work,args=(lock,))
            p.start()
    调用“锁”的方式

    示例背景

    三台电脑同一时刻共同调用打印机完成打印任务,即三个进程同一时刻共抢同一个资源(输出平台)

    多个进程共抢一个资源,应结果第一位,效率第二位,所以应该牺牲效率,保求结果(串行)

    示例代码

    以上的代码虽然完成串行结果,但没有实现公平,执行的顺序都是人为写好的,应该做到公平的抢占资源,谁先抢到就执行谁

    from multiprocessing import Process
    from multiprocessing import Lock
    import time
    import random
    
    
    def task1(lock):
        print('task1')  # 验证cpu遇到io切换了
        lock.acquire()
        print('task1: 开始打印')
        time.sleep(random.randint(1, 3))
        print('task1: 打印完成')
        lock.release()
    
    def task2(lock):
        print('task2')  # 验证cpu遇到io切换了
        lock.acquire()
        print('task2: 开始打印')
        time.sleep(random.randint(1, 3))
        print('task2: 打印完成')
        lock.release()
    
    
    def task3(lock):
        print('task3') # 验证cpu遇到io切换了
        lock.acquire()
        print('task3: 开始打印')
        time.sleep(random.randint(1, 3))
        print('task3: 打印完成')
        lock.release()
    
    
    if __name__ == '__main__':
    
        lock = Lock()
    
        p1 = Process(target=task1, args=(lock,))
        p2 = Process(target=task2, args=(lock,))
        p3 = Process(target=task3, args=(lock,))
    
        p1.start()
        p2.start()
        p3.start()
    正确代码

    上锁时一定要给进程上同一把锁,而且上锁一次,就一定要解锁一次

     互斥锁和join的区别

    共同点

    都是完成了进程之间的串行

    区别

    join是通过人为控制使进程串行

    互斥锁是随机抢占资源,保证了公平性

    业务需求分析:
    
    买票之前先要查票,必须经历的流程: 你在查票的同时,100个人也在查本此列票.
    买票时,你要先从服务端获取到票数,票数>0 ,买票,然后服务端票数减一. 中间肯定有网络延迟.
    from multiprocessing import Process
    from multiprocessing import Lock
    import time
    import json
    import os
    import random
    多进程原则上是不能互相通信的,它们在内存级别数据隔离的.不代表磁盘上数据隔离.
    它们可以共同操作一个文件.
    
    def search():
        time.sleep(random.random())
        with open('db.json',encoding='utf-8') as f1:
            dic = json.load(f1)
        print(f'剩余票数{dic["count"]}')
    
    
    def get():
        with open('db.json',encoding='utf-8') as f1:
            dic = json.load(f1)
        time.sleep(random.randint(1,3))
        if dic['count'] > 0:
            dic['count'] -= 1
            with open('db.json', encoding='utf-8', mode='w') as f1:
                json.dump(dic,f1)
            print(f'{os.getpid()}用户购买成功')
        else:
            print('没票了.....')
    
    def task(lock):
        search()
        lock.acquire()
        get()
        lock.release()
    
    
    if __name__ == '__main__':
        lock = Lock()
        for i in range(5):
            p = Process(target=task,args=(lock,))
            p.start()
    模拟抢票系统

    队列

    进程之间的通信最好的方式是基于队列

    from multiprocessing import Process
    from multiprocessing import Queue
    import time
    import os
    import random
    # 多进程原则上是不能互相通信的,它们在内存级别数据隔离的.不代表磁盘上数据隔离.
    # 它们可以共同操作一个文件.
    
    def search(ticket):
        time.sleep(random.random())
        print(f'剩余票数{ticket}')
    
    
    def _get(q):
        dic = q.get()
        time.sleep(random.randint(1,3))
        if dic['count'] > 0:
            dic['count'] -= 1
            print(f'{os.getpid()}用户购买成功')
        else:
            print('没票了.....')
        q.put(dic)
    
    def task(ticket,q):
        search(ticket)
        _get(q)
    
    
    if __name__ == '__main__':
        q = Queue(1)
        q.put({'count': 1})
        ticket = 1
        for i in range(5):
            p = Process(target=task,args=(ticket,q))
            p.start()
    通过队列模拟抢票系统

    什么是队列

    队列是存在与内存中的一个容器,最大的特点:先进先出(FIFO)

    利用队列进行进程之间的通信,简单,方便,不用自己手动加锁,队列自带优质阻塞,可持续化取数据

    from multiprocessing import Queue
    
    q = Queue(3)  # 可以设置元素个数
    
    def func():
         print('in func')
    
    q.put('alex')
    q.put({'count': 1})
    q.put(func)
    q.put(666)  # 当队列数据已经达到上限,在插入数据的时候,程序就会夯住.
                         当你将数据全部取完继续再取值时,程序也会夯住
    
    put中有block参数 默认为True 当你插入的数据超过最大限度,默认阻塞.改成False 数据超过最大限度,不阻塞了直接报错.
    
    put中有timeout参数 延时报错,比如q.put(3,timeout = 3) 超过三秒再put不进,程序就会报错
    
    get也有这两个参数
    队列的基本方法
    # 小米:抢手环4.预期发售10个.
    # 有100个人去抢.
    import os
    from multiprocessing import Queue
    from multiprocessing import Process
    
    def task(q):
        try:
            q.put(f'{os.getpid()}',block=False)
        except Exception:
            return
    
    
    if __name__ == '__main__':
        q = Queue(10)
        for i in range(100):
            p = Process(target=task,args=(q,))
            p.start()
        for i in range(1,11):
            print(f'排名第{i}的用户: {q.get()}',)
    用进程通信队列模拟实例

    栈与队列性质类似,特点与队列相反:先进后出(FILO)

    import queue
    
    q = queue.LifoQueue()
    q.put(1)
    q.put(3)
    q.put('barry')
    
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    栈的基本方法

    优先级队列

    需要通过元组的形式放入,(int,数据) int 代表优先级,数字越低,优先级越高

    import queue
    q = queue.PriorityQueue(3)
    
    q.put((10, '垃圾消息'))
    q.put((-9, '紧急消息'))
    q.put((3, '一般消息'))
    
    print(q.get())
    print(q.get())
    print(q.get())
    队列的基本方法

    生产者消费者模型(多应用于并发)

    生产者:生产数据进程

    消费者:对生产者生产出来的数据做进一步处理进程

    from multiprocessing import Process
    from multiprocessing import Queue
    import time
    import random
    
    
    def producer(name,q):
        for i in range(1,6):
            time.sleep(random.randint(1,3))
            res = f'{i}号包子'
            q.put(res)
    
            print(f'33[0;32m 生产者{name}: 生产了{res}33[0m')
    
    
    
    def consumer(name,q):
        while 1:
            try:
                time.sleep(random.randint(1,3))
                ret = q.get(timeout=5)
                print(f'消费者{name}: 吃了{ret}')
            except Exception:
                return
    
    
    
    
    if __name__ == '__main__':
    
        q = Queue()
    
        p1 = Process(target=producer, args=('alex',q))
        p2 = Process(target=consumer, args=('barry',q))
    
        p1.start()
        p2.start()

     模型中有三个主体:生产者,消费者,容器队列

    合理的调控多个进程去生成数据以及提取数据,中间有个必不可少的环节就是容器队列

    如果没有容器,生产者与消费者形成强耦合性,不合理,所以要有一个缓冲区(容器),平衡了生产力与消费力

    总结

    进程之间的通信

    基于文件 + 锁    效率低,麻烦

    基于队列       推荐

    基于管道       管道自己加锁,底层可能会出现数据丢失损坏

    应用

    多个进程抢占一个资源

    串行,有序以及数据安全

    多个进程实现并发效果

    生产者消费者模型

  • 相关阅读:
    css计数器
    使用area标签模仿a标签
    移动端判断触摸的方向
    简单圆形碰撞检测
    冒泡排序和二分查找算法
    基本数据类型float和double的区别
    HTML5-form表单
    代码块以及它们的执行顺序
    反射
    Excel表格的导入导出
  • 原文地址:https://www.cnblogs.com/biulo/p/11232892.html
Copyright © 2011-2022 走看看