zoukankan      html  css  js  c++  java
  • (7)进程间的通信-互斥锁、队列(生产者消费者模型)

    进程和进程之间的数据是物理隔绝的:

    比如qq进程和word进程的数据是不互通的,如果互通则会导致数据混乱,所以进程和进程之间是物理隔绝的,避免数据混乱

    进程和进程之间实现数据操作和共享

    1、通过文件(硬盘级别)

    2、IPC机制-队列(内存级别)

    1、通过文件模拟数据互通(硬盘级别)

    import random
    import time
    from multiprocessing import Process
    import os, json

    '''一个模拟抢票的简易小程序'''


    def check():
    '''查看票的剩余'''
    time.sleep(1) # 模拟网络延迟
    with open('db.txt', 'r', encoding='utf-8')as fb:
    dic = json.load(fb)
    print('%s 查看剩余票数 %s' % (os.getpid(), dic['count'])) # os.pid代表进程号,count就是文件内的剩余数字


    def getTicket():
    '''购票模块'''
    time.sleep(2)
    with open('db.txt', 'r', encoding='utf-8')as fb:
    dic = json.load(fb) #序列化读取文件内容

    if dic['count'] > 0: #如果输了大于1则开始购票
    print('抢票中')
    dic['count'] -= 1 #这里要将数量减去写入文件
    time.sleep(random.randint(0,10)) # random.randint就是随机产生指定范围的数
    with open('db.txt', 'w', encoding='utf-8')as fa:
    json.dump(dic, fa) # 直接序列化写入文件
    print('%s 购买成功! ' % os.getpid())
    else:
    print('%s 抢票失败!' % os.getpid())


    def task(): # 这个函数是执行另外两个函数的
    check() # 执行查看
    getTicket() # 执行抢票


    if __name__ == '__main__':
    for i in range(10): # for 循环开启进程,直接开启1-10个
    p = Process(target=task, name='抢票程序')
    p.start()

    PS:这个程序有个问题就是一个抢票成功,则其他的进程也会显示抢票成功,数据是互通了,但是数据混乱了

    通过文件模拟数据互通-并且一个抢票成功,其他的进程都显示抢票失败

    通过互斥锁实现并行,并且成功抢票,数据互通不混乱

    互斥锁原理:比喻就是将数据放入一个盒子内,锁起来,让所有的进程去抢,第一个抢到的自然能够修改盒子内的数据,修改后再将盒子给其他的进程,这样其他的进程获得到的数据就是一个更新的数据,这样进程间的数据互通了,而且不会混乱

    互斥锁实例

    import random
    import time
    from multiprocessing import Process, Lock # 互斥锁的功能就是调用multiprocessing包内的模块Lock
    import os, json

    '''一个模拟抢票的简易小程序'''


    def check():
    '''查看票的剩余模块'''
    time.sleep(1) # 模拟网络延迟
    with open('db.txt', 'r', encoding='utf-8')as fb:
    dic = json.load(fb)
    print('%s 查看剩余票数 %s' % (os.getpid(), dic['count'])) # os.pid代表进程号,count就是文件内的剩余数字


    def getTicket():
    '''购票模块'''
    time.sleep(2)
    with open('db.txt', 'r', encoding='utf-8')as fb:
    dic = json.load(fb)

    if dic['count'] > 0:
    print('%s 抢票中' % os.getpid())
    dic['count'] -= 1
    time.sleep(random.randint(0, 10)) # random.randint就是随机产生指定范围的数
    with open('db.txt', 'w', encoding='utf-8')as fa:
    json.dump(dic, fa) # 直接序列化写入文件
    print('%s 购买成功! ' % os.getpid())
    else:
    print('%s 抢票失败!' % os.getpid())


    def task(mutex): # 这个函数是执行另外两个函数的,并且接收互斥锁这个参数
    check() # 执行查看模块
    mutex.acquire() # 这个就是上锁,上锁一定要在有执行结果的程序前面
    PS:这时候只有一个子进程在操作这个文件,就是抢到这把锁的子进程将数据锁起来自己操作,完成后再释放这把锁让其他的子进程可以共享数据
    getTicket() # 执行抢票
    mutex.release() # 执行完后要释放这把锁让其他子进程用数据


    if __name__ == '__main__':
    mutex = Lock() # mutex就是互斥的意思,Lock()就是开启互斥锁
    for i in range(10): # for 循环开启进程,直接开启1-10个
    p = Process(target=task, args=(mutex,), name='抢票程序') # 要将互斥锁变成一个参数传入
    p.start()

    PS:acquire(上锁)不能连续,只能等release(释放)后才能在acquire(上锁)

    PS:日常程序中不会刻意去加锁,容易出现问题

    join和mutex的区别

    join:会等待所有的子进程执行完毕再执行主进程,程序就变成了串行

    mutex:互斥锁会将共享的数据操作,变成串行,虽然效率降低了,但是安全性和数据完整性提升,共享数据不会混乱

    2、通过队列实现数据的操作(内存级别)

    队列通过multiprocessing包内的模块Queue

    队列特点:先进先出(有点类似栈,先进先出)

    '''开启队列,就要用实例化对象去开启'''
    q = Queue(3) # 参数就是指定管道内有多少个队列

    '''数据放入队列中'''
    q.put('aaaa') # 括号内的参数就是要放入的数据

    '''从队列中获取数据'''
    q.get()

    开启队列和放入数据以及获取数据

    from multiprocessing import Queue

    '''开启队列放入数据'''
    q = Queue(3)
    q.put('aaaa')
    q.put(123)
    q.put({'data':'datas'})

    '''从队列中获取数据'''
    ''' 这里设置两个参数,第二个timeout就是等待的时间2秒,用处就是等待2秒,队列中如果没有新的数据进来直接报错退出'''
    print(q.get(block=True,timeout=2))
    print(q.get(block=True,timeout=2))
    print(q.get(block=True,timeout=2))
    print(q.get(block=True,timeout=2))

    队列引出的一个概念,生产者和消费者模型

    实现的方式:

    1、Python

    2、rabbitmq、kafka、rocketmq

    PS:rabbitmq完美实现这个模型,是一个实现了这个模型的产品

    这个模型解决的问题

    1、这个模型解决了生产者和消费者之间的耦合,完全做了解耦

    2、生产者只需要把数据放入队列,即可,不需要关心消费者的消费能力,消费者只要从队列内获取数据即可,不需要关心生产者的生产能力

    3、解决了大量数据一下子涌入服务器,导致服务器处理不过来而出现故障

    生产者消费者模型实例

    import random
    import time
    from multiprocessing import Queue,Process

    def producer(q,name,food):
    '''生产者模块'''
    for i in range(3):
    res = '%s%s'%(food,i)
    time.sleep(random.randint(1,3)) #模拟一个延迟
    q.put(res) #将生产内容放入队列
    print('%s 生产了 %s'%(name,res))


    def consumer(q,name):
    '''消费者模型'''
    while True:
    res = q.get() #取数据
    if res == None:break #这里获取到None就退出
    time.sleep(random.randint(1,3)) #模拟延迟
    print('%s 消费了 %s'%(name,res))


    if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer,args=(q,'egon','馒头'))

    c = Process(target=consumer,args=(q,'alex'))

    p.start()
    c.start()
    print('主。。。')

    生产者消费者模型实例(多消费者实例)

    import random
    import time
    from multiprocessing import Process, JoinableQueue


    def producer(q,name,food):
    '''生产者模块'''
    for i in range(3):
    res = '%s%s'%(food,i)
    time.sleep(random.randint(1,3)) #模拟一个延迟
    q.put(res) #将生产内容放入队列
    print('%s 生产了 %s'%(name,res))


    def consumer(q,name):
    '''消费者模型'''
    while True:
    res = q.get() #取数据
    time.sleep(random.randint(1,3)) #模拟延迟
    print('%s 消费了 %s'%(name,res))

    '''程序运行到这一行的时候就告诉程序我已经取完数据了可以退出,实现了数据取完进程结束的效果'''
    q.task_done() #告诉队列数据已经取完(消费完了)


    if __name__ == '__main__':
    q = JoinableQueue() #这个函数就是等待队列中的数据执行完
    p1 = Process(target=producer,args=(q,'egon','馒头'))
    p2 = Process(target=producer, args=(q, 'lisa', '蛋糕'))
    p3 = Process(target=producer, args=(q, 'jack', '煎饼'))

    c1 = Process(target=consumer,args=(q,'alex'))
    c2 = Process(target=consumer, args=(q, 'rouse'))


    p1.start()
    p2.start()
    p3.start()

    '''使用守护进程让'''
    c1.daemon = True
    c2.daemon = True
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()

    q.join() # 等着队列中数据全部取完,消费者就应该结束,这里需要用到守护进程

    ------------------

    -----------------

    
    
  • 相关阅读:
    Linq聚合操作之Aggregate,Count,Sum,Distinct源码分析
    Linq分区操作之Skip,SkipWhile,Take,TakeWhile源码分析
    Linq生成操作之DefautIfEmpty,Empty,Range,Repeat源码分析
    Linq基础操作之Select,Where,OrderBy,ThenBy源码分析
    PAT 1152 Google Recruitment
    PAT 1092 To Buy or Not to Buy
    PAT 1081 Rational Sum
    PAT 1084 Broken Keyboard
    PAT 1077 Kuchiguse
    PAT 1073 Scientific Notation
  • 原文地址:https://www.cnblogs.com/shizhengquan/p/10233316.html
Copyright © 2011-2022 走看看