zoukankan      html  css  js  c++  java
  • (7)进程间的通信-互斥锁、队列(生产者消费者模型)

    进程和进程之间的数据是物理隔绝的:

    比如qq进程和word进程的数据是不互通的,如果互通则会导致数据混乱,所以进程和进程之间是物理隔绝的,避免数据混乱

    进程和进程之间实现数据操作和共享

    1、通过文件(硬盘级别)

    2、IPC机制-队列(内存级别)

    1、通过文件模拟数据互通(硬盘级别)

    import random
    import time
    from multiprocessing import Process
    import os, json

    '''一个模拟抢票的简易小程序'''


    def check():
    '''查看票的剩余'''
    time.sleep(1) # 模拟网络延迟
    with open('db.txt', 'r', encoding='utf-8')as fb:
    dic = json.load(fb)
    print('%s 查看剩余票数 %s' % (os.getpid(), dic['count'])) # os.pid代表进程号,count就是文件内的剩余数字


    def getTicket():
    '''购票模块'''
    time.sleep(2)
    with open('db.txt', 'r', encoding='utf-8')as fb:
    dic = json.load(fb) #序列化读取文件内容

    if dic['count'] > 0: #如果输了大于1则开始购票
    print('抢票中')
    dic['count'] -= 1 #这里要将数量减去写入文件
    time.sleep(random.randint(0,10)) # random.randint就是随机产生指定范围的数
    with open('db.txt', 'w', encoding='utf-8')as fa:
    json.dump(dic, fa) # 直接序列化写入文件
    print('%s 购买成功! ' % os.getpid())
    else:
    print('%s 抢票失败!' % os.getpid())


    def task(): # 这个函数是执行另外两个函数的
    check() # 执行查看
    getTicket() # 执行抢票


    if __name__ == '__main__':
    for i in range(10): # for 循环开启进程,直接开启1-10个
    p = Process(target=task, name='抢票程序')
    p.start()

    PS:这个程序有个问题就是一个抢票成功,则其他的进程也会显示抢票成功,数据是互通了,但是数据混乱了

    通过文件模拟数据互通-并且一个抢票成功,其他的进程都显示抢票失败

    通过互斥锁实现并行,并且成功抢票,数据互通不混乱

    互斥锁原理:比喻就是将数据放入一个盒子内,锁起来,让所有的进程去抢,第一个抢到的自然能够修改盒子内的数据,修改后再将盒子给其他的进程,这样其他的进程获得到的数据就是一个更新的数据,这样进程间的数据互通了,而且不会混乱

    互斥锁实例

    import random
    import time
    from multiprocessing import Process, Lock # 互斥锁的功能就是调用multiprocessing包内的模块Lock
    import os, json

    '''一个模拟抢票的简易小程序'''


    def check():
    '''查看票的剩余模块'''
    time.sleep(1) # 模拟网络延迟
    with open('db.txt', 'r', encoding='utf-8')as fb:
    dic = json.load(fb)
    print('%s 查看剩余票数 %s' % (os.getpid(), dic['count'])) # os.pid代表进程号,count就是文件内的剩余数字


    def getTicket():
    '''购票模块'''
    time.sleep(2)
    with open('db.txt', 'r', encoding='utf-8')as fb:
    dic = json.load(fb)

    if dic['count'] > 0:
    print('%s 抢票中' % os.getpid())
    dic['count'] -= 1
    time.sleep(random.randint(0, 10)) # random.randint就是随机产生指定范围的数
    with open('db.txt', 'w', encoding='utf-8')as fa:
    json.dump(dic, fa) # 直接序列化写入文件
    print('%s 购买成功! ' % os.getpid())
    else:
    print('%s 抢票失败!' % os.getpid())


    def task(mutex): # 这个函数是执行另外两个函数的,并且接收互斥锁这个参数
    check() # 执行查看模块
    mutex.acquire() # 这个就是上锁,上锁一定要在有执行结果的程序前面
    PS:这时候只有一个子进程在操作这个文件,就是抢到这把锁的子进程将数据锁起来自己操作,完成后再释放这把锁让其他的子进程可以共享数据
    getTicket() # 执行抢票
    mutex.release() # 执行完后要释放这把锁让其他子进程用数据


    if __name__ == '__main__':
    mutex = Lock() # mutex就是互斥的意思,Lock()就是开启互斥锁
    for i in range(10): # for 循环开启进程,直接开启1-10个
    p = Process(target=task, args=(mutex,), name='抢票程序') # 要将互斥锁变成一个参数传入
    p.start()

    PS:acquire(上锁)不能连续,只能等release(释放)后才能在acquire(上锁)

    PS:日常程序中不会刻意去加锁,容易出现问题

    join和mutex的区别

    join:会等待所有的子进程执行完毕再执行主进程,程序就变成了串行

    mutex:互斥锁会将共享的数据操作,变成串行,虽然效率降低了,但是安全性和数据完整性提升,共享数据不会混乱

    2、通过队列实现数据的操作(内存级别)

    队列通过multiprocessing包内的模块Queue

    队列特点:先进先出(有点类似栈,先进先出)

    '''开启队列,就要用实例化对象去开启'''
    q = Queue(3) # 参数就是指定管道内有多少个队列

    '''数据放入队列中'''
    q.put('aaaa') # 括号内的参数就是要放入的数据

    '''从队列中获取数据'''
    q.get()

    开启队列和放入数据以及获取数据

    from multiprocessing import Queue

    '''开启队列放入数据'''
    q = Queue(3)
    q.put('aaaa')
    q.put(123)
    q.put({'data':'datas'})

    '''从队列中获取数据'''
    ''' 这里设置两个参数,第二个timeout就是等待的时间2秒,用处就是等待2秒,队列中如果没有新的数据进来直接报错退出'''
    print(q.get(block=True,timeout=2))
    print(q.get(block=True,timeout=2))
    print(q.get(block=True,timeout=2))
    print(q.get(block=True,timeout=2))

    队列引出的一个概念,生产者和消费者模型

    实现的方式:

    1、Python

    2、rabbitmq、kafka、rocketmq

    PS:rabbitmq完美实现这个模型,是一个实现了这个模型的产品

    这个模型解决的问题

    1、这个模型解决了生产者和消费者之间的耦合,完全做了解耦

    2、生产者只需要把数据放入队列,即可,不需要关心消费者的消费能力,消费者只要从队列内获取数据即可,不需要关心生产者的生产能力

    3、解决了大量数据一下子涌入服务器,导致服务器处理不过来而出现故障

    生产者消费者模型实例

    import random
    import time
    from multiprocessing import Queue,Process

    def producer(q,name,food):
    '''生产者模块'''
    for i in range(3):
    res = '%s%s'%(food,i)
    time.sleep(random.randint(1,3)) #模拟一个延迟
    q.put(res) #将生产内容放入队列
    print('%s 生产了 %s'%(name,res))


    def consumer(q,name):
    '''消费者模型'''
    while True:
    res = q.get() #取数据
    if res == None:break #这里获取到None就退出
    time.sleep(random.randint(1,3)) #模拟延迟
    print('%s 消费了 %s'%(name,res))


    if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer,args=(q,'egon','馒头'))

    c = Process(target=consumer,args=(q,'alex'))

    p.start()
    c.start()
    print('主。。。')

    生产者消费者模型实例(多消费者实例)

    import random
    import time
    from multiprocessing import Process, JoinableQueue


    def producer(q,name,food):
    '''生产者模块'''
    for i in range(3):
    res = '%s%s'%(food,i)
    time.sleep(random.randint(1,3)) #模拟一个延迟
    q.put(res) #将生产内容放入队列
    print('%s 生产了 %s'%(name,res))


    def consumer(q,name):
    '''消费者模型'''
    while True:
    res = q.get() #取数据
    time.sleep(random.randint(1,3)) #模拟延迟
    print('%s 消费了 %s'%(name,res))

    '''程序运行到这一行的时候就告诉程序我已经取完数据了可以退出,实现了数据取完进程结束的效果'''
    q.task_done() #告诉队列数据已经取完(消费完了)


    if __name__ == '__main__':
    q = JoinableQueue() #这个函数就是等待队列中的数据执行完
    p1 = Process(target=producer,args=(q,'egon','馒头'))
    p2 = Process(target=producer, args=(q, 'lisa', '蛋糕'))
    p3 = Process(target=producer, args=(q, 'jack', '煎饼'))

    c1 = Process(target=consumer,args=(q,'alex'))
    c2 = Process(target=consumer, args=(q, 'rouse'))


    p1.start()
    p2.start()
    p3.start()

    '''使用守护进程让'''
    c1.daemon = True
    c2.daemon = True
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()

    q.join() # 等着队列中数据全部取完,消费者就应该结束,这里需要用到守护进程

    ------------------

    -----------------

    
    
  • 相关阅读:
    RramSim2
    DiskSim
    FTL2
    Durable NAND flash memory management
    node系列:全局与本地
    CSS系列:less备忘
    Sql Server 2008:调试
    JavaScript系列:再巩固-原型链
    移动端 :meta标签1万个作用
    Asp.Net:Repeater 详情 备用
  • 原文地址:https://www.cnblogs.com/shizhengquan/p/10233316.html
Copyright © 2011-2022 走看看