zoukankan      html  css  js  c++  java
  • 网络编程——锁,信号量,事件机制,队列,队列实现进程间通信,生产者消费者模型

    学习锁机制

      l = LOCK()

      一把锁配一把钥匙

      拿钥匙,锁门,l.acquire()

      还钥匙,开门,i.release()

    from multiprocessing import Lock
    
    l = Lock()
    
    l.acquire()# 拿走钥匙,锁门,不让其他人进屋
    
    l.release()# 释放锁。  还钥匙,开门,允许其他人进屋
    View Code
    例子:
    from multiprocessing import Process,Value,Lock
    import time
    
    
    def get_money(num,l):# 取钱
        l.acquire()# 拿走钥匙,锁上门,不允许其他人进屋
        for i in range(100):
            num.value -= 1
            print(num.value)
            time.sleep(0.01)
        l.release()# 还钥匙,打开门,允许其他人进屋
    
    def put_money(num,l):# 存钱
        l.acquire()
        for i in range(100):
            num.value += 1
            print(num.value)
            time.sleep(0.01)
        l.release()
    
    if __name__ == '__main__':
        num = Value('i',100)
        l = Lock()
        p = Process(target=get_money,args=(num,l))
        p.start()
        p1 = Process(target=put_money, args=(num,l))
        p1.start()
        p.join()
        p1.join()
        print(num.value)
    View Code
    from multiprocessing import Process, Lock
    import json, time
    def rob_tickets (i, l):
        time.sleep(1)
        l.acquire() # 拿钥匙
        with open('file', 'r') as f:
            last_tickets_info = json.load(f)
            last_tickets = last_tickets_info['count']
            if last_tickets > 0:
                last_tickets = last_tickets - 1
                last_tickets_info['count'] = last_tickets
                with open('file', 'w') as f:
                    json.dump(last_tickets_info, f)
                print('33[32m%s 抢到票了 33[0m' % i)
            else :
                print('33[34m%s 没抢到票 33[0m' % i)
        l.release() # 还钥匙
    if __name__ == '__main__':
        l = Lock()
        for i in range(10):
            p = Process(target=rob_tickets, args=(i, l))
            p.start()
                    

    学习信号机制

      sem = Semaphore(n)

      n, 初始化一个锁配n把钥匙,一个int型

      拿钥匙,锁门,l.acquire() 

      还钥匙,开门,i.release()

      信号机制比锁机制多了一个计数器,这个计数器是用来记录当前剩余几把钥匙的

      当计数器为0时,表示没有钥匙了,此时acquire()处于阻塞,等待有人还了钥匙,才能继续执行。

      对于计数器来说,每acquire一次,计数器内部就减1,release一次 计数器就加1

     例子:

    from multiprocessing import Process,Semaphore
    import time
    import random
    
    def func(i,sem):
        sem.acquire()
        print('第%s个人进入小黑屋,拿了钥匙锁上门' % i)
        time.sleep(random.randint(3,5))
        print('第%s个人出去小黑屋,还了钥匙打开门' % i)
        sem.release()
    
    if __name__ == '__main__':
        sem = Semaphore(5)# 初始化了一把锁5把钥匙,也就是说允许5个人同时进入小黑屋
        # 之后其他人必须等待,等有人从小黑屋出来,还了钥匙,才能允许后边的人进入
        for i in range(20):
            p = Process(target=func,args=(i,sem,))
            p.start()
    View Code

    发廊?

    from multiprocessing import Process,Semaphore
    import time
    import random
    
    def func(i,sem):
        sem.acquire()
        print('第%s个人进入小黑屋,拿了钥匙锁上门' % i)
        time.sleep(random.randint(3,5))
        print('第%s个人出去小黑屋,还了钥匙打开门' % i)
        sem.release()
    
    if __name__ == '__main__':
        sem = Semaphore(5)# 初始化了一把锁5把钥匙,也就是说允许5个人同时进入小黑屋
        # 之后其他人必须等待,等有人从小黑屋出来,还了钥匙,才能允许后边的人进入
        for i in range(20):
            p = Process(target=func,args=(i,sem,))
            p.start()
    View Code
    from multiprocessing import Semaphore, Process
    import time, random
    def eat (i, s):
        s.acquire()
        print("33[36m 第 %s 个吃货吃饭 33[0m" % i)
        print('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
        time.sleep(random.randrange(3, 6))
        s.release()
        print("33[35m 第 %s 个吃货吃完饭了 33[0m" % i)
    if __name__ == '__main__':
        s = Semaphore(5)
        for i in range(10):
            p = Process(target=eat, args=(i, s))
            p.start()

     学习事件机制

      multprocessing 模块中的 Event

      e = Event

      e.set()

      e.clear()

      e.is_set()

      e.wait()

      事件时通过is_set()的bool值,来标识e.wait()的阻塞状态

      当 is_set()的bool值为True的时候,e.wait()是非阻塞状态

      当 is_set()的bool值为False的时候,e.wait()是阻塞状态

         当使用e.set()的时候,是将is_set()的bool值变为True

      当使用e.set()的时候,是将is_set()的bool值变为False

     举个栗子:

    from multiprocessing import Event
    
    e = Event()
    # e.set()
    # e.clear()
    # e.wait()
    # e.is_set()
    # 事件是通过is_set()的bool值,去标识e.wait() 的阻塞状态
    # 当is_set()的bool值为False时,e.wait()是阻塞状态
    # 当is_set()的bool值为True时,e.wait()是非阻塞状态
    # 当使用set()时,是把is_set的bool变为True
    # 当使用clear()时,是把is_set的bool变为False
    
    print(e.is_set())# False wait应该是阻塞住
    e.set()# 将is_set 的bool值变为True,将wait变为非阻塞
    e.wait()
    print(e.is_set())
    print(123)
    e.clear()
    print(e.is_set())
    e.wait()
    print(123)

    信号灯模拟:

    from multiprocessing import Process,Event
    import time
    import random
    
    def tra(e):
        '''信号灯函数'''
        # e.set()
        # print('33[32m 绿灯亮! 33[0m')
        while 1:# 红绿灯得一直亮着,要么是红灯要么是绿灯
            if e.is_set():# True,代表绿灯亮,那么此时代表可以过车
                time.sleep(5)# 所以在这让灯等5秒钟,这段时间让车过
                print('33[31m 红灯亮! 33[0m')# 绿灯亮了5秒后应该提示到红灯亮
                e.clear()# 把is_set设置为False
            else:
                time.sleep(5)# 此时代表红灯亮了,此时应该红灯亮5秒,在此等5秒
                print('33[32m 绿灯亮! 33[0m')# 红的亮够5秒后,该绿灯亮了
                e.set()# 将is_set设置为True
    
    def Car(i,e):
        e.wait()# 车等在红绿灯,此时要看是红灯还是绿灯,如果is_set为True就是绿灯,此时可以过车
        print('第%s辆车过去了'%i)
    
    if __name__ == '__main__':
        e = Event()
        triff_light = Process(target=tra,args=(e,))# 信号灯的进程
        triff_light.start()
        for i in range(50):# 描述50辆车的进程
            if i % 3 == 0:
                time.sleep(2)
            car = Process(target=Car,args=(i+1,e,))
            car.start()
    View Code

     方法2:

    from multiprocessing import Process, Event
    import time
    def light (e):
        while 1:
            print(' 红灯亮 ')
            time.sleep(5)
            e.set() # 把 is_set() 变为 True
            print(' 绿灯亮 ')
            time.sleep(3)
            e.clear() # 把 is_set() 变为 False
    def car (e,i):
        if not e.is_set(): # 如果为 False, 表示状态是阻塞状态 , 所以就需要等红绿灯
            print('33[33m 我们在等红灯 33[0m')
            e.wait() # 是将状态变为阻塞 ( 只有 e.is_set() 为 False 时候才管用 )
        else :
            print('33[036m 我们在通行 33[0m')
    if __name__ == '__main__':
        e = Event()
        p1 = Process(target=light, args=(e,))
        p1.start()
        while 1:
            time.sleep(1)
            for i in range(20):
                p2 = Process(target=car, args=(e, i))
                p2.start()

    队列:

      队列的方法

     

    from multiprocessing import Process, Queue, JoinableQueue
    import time
    q = Queue(5) # 设置队列缓冲区的大小
    for i in range(5): #添加的个数不得超过队列大小,否则添加不进去
        q.put(i) # 放数据
        print(q.get(i)) # 那数据
        print(q.full()) # q.full() 队列满了返回 True, 不满返回 False
        print(q.empty()) # 不可信 , 队列空了返回 True, 不为空返回 False
        print(q.qsize()) # 打印当前缓存区存放了多少数据
    while 1:
        try :
            s = q.get( False ) 
          # queue.Empty 相当于 q.get_nowait() #正常q.get()会在队列中没有东西的时候继续等待,阻塞住。
           q.get(False)是在没有东西可以拿的时候抛出Empty异常
    print('%s 正在拿 ' % s) except : print(' 已经没有东西可以拿了 ')

    2.队列实现进程间的通信

    from multiprocessing import Process, Queue, JoinableQueue
    import time
    def girl (q):
        print(' 来自男孩的消息 ', q.get())
        print(' 来自家长的忠告 ', q.get())
    def boy (q):
        q.put(' 约吗 ?')
    if __name__ == '__main__':
        q = Queue(2)
        girl_pro = Process(target=girl, args=(q,))
        boy_pro = Process(target=boy, args=(q,))
        girl_pro.start()
        boy_pro.start()
        # boy_pro.join()
        time.sleep(2) # 多进程执行的时候 , 父进程比子进程先执行 , 如果不阻塞或者先执行子进    程的话 , 会先执行下面的 q.put, 而
    子进程中的 q.put 会在之后才能执行
        q.put(' 好好学习 , 不要谈恋爱 !!')

     生产者消费者模型 

    例子1:

    from multiprocessing import Process, Queue, JoinableQueue
    import time
    def consumers (q, name):
    while 1:
        baozi = q.get()
        if baozi:
            time.sleep(1)
            print('%s 吃了 %s' % (name, baozi))
        else :
            print(' 吃成熊了 ')
            break
    def prodecers (q):
        for i in range(20):
            time.sleep(2)
            baozi = " 包子 %s 号 " % (i + 1)
            print(' 生产了 %s' % baozi)
            q.put(baozi)
        q.put( None )
    if __name__ == '__main__':
        q = Queue(5)
        pro = Process(target=prodecers, args=(q,))
        pro.start()
        con = Process(target=consumers, args=(q, ' 吕三儿 '))
        con.start()

    例子2:

    from multiprocessing import Process, Queue, JoinableQueue
    import time
    def consumers (q, name):
        while 1:
        time.sleep(2)
        try :
            baozi = q.get( False )
            if baozi == None :
                continue
            else :
                print('%s 吃了 %s' % (name, baozi))
        except :
            print(' 吃成熊了 ')
            break
    # def consumers(q, name):
        # while 1:
        # time.sleep(2)
        # baozi = q.get()
        # if baozi == None:
            # print(' 吃成熊了 ')
            # break
        # else:
            # print('%s 吃了 %s' % (name, baozi))
    def prodecers (q):
        for i in range(20):
            time.sleep(1)
            baozi = " 包子 %s 号 " % (i + 1)
            print(' 生产了 %s' % baozi)
            q.put(baozi)
        q.put( None )
    if __name__ == '__main__':
        q = Queue(5)
        pro = Process(target=prodecers, args=(q,))
        pro.start()
        con = Process(target=consumers, args=(q, ' 吕三儿 '))
        con.start()

    例子3:

    from multiprocessing import Process, Queue, JoinableQueue
    import time
    def consumers (q, name):
        while 1:
            baozi = q.get()
            if baozi:
                time.sleep(1)
                print('%s 吃了 %s' % (name, baozi))
            else :
                print(' 吃成熊了 ')
                break
    def prodecers (q):
        for i in range(20):
        time.sleep(2)
        baozi = " 包子 %s 号 " % (i + 1)
        print(' 生产了 %s' % baozi)
        q.put(baozi)
    if __name__ == '__main__':
        q = Queue(5)
        pro = Process(target=prodecers, args=(q,))
        pro.start()
        con = Process(target=consumers, args=(q, ' 吕三儿 '))
        con.start()
        pro.join() #生产者进程会先执行完毕,然后主进程会给消费者传一个已经生产完的标识,消费者接收到后结束进程
        q.put( None ) #给消费者发送一个已经生产完毕的标识

    例子4(JoinableQueue):

    from multiprocessing import Process, Queue, JoinableQueue
    import time
    def consumers (q, name):
        while 1:
            baozi = q.get()
            if baozi:
                time.sleep(2)
                print('%s 吃了 %s' % (name, baozi))
                q.task_done() # 消费者每消费一个数据 , 就会给 join 返回一个标识 , 但是即便信号全部发出去之后 , 还会继续 get 拿数
    据 , 没有数据就会一直等待 , 所以要给消费者设置为守护进程
    def prodecers (q):
        for i in range(20):
            time.sleep(1)
            baozi = " 包子 %s 号 " % (i + 1)
            print(' 生产了 %s' % baozi)
            q.put(baozi)
        q.join() # 记录了生产 20 个数据在队列中 , 此时会阻塞等待消费消费完队列中的所有数据
    
    if __name__ == '__main__':
        q = JoinableQueue(20)
        pro = Process(target=prodecers, args=(q,))
        pro.start()
        con = Process(target=consumers, args=(q, ' 吕三儿 '))
        con.daemon = True
        con.start()
        pro.join() # 父进程需要等待生产者执行完之后再执行结束
        
  • 相关阅读:
    CentOS部署ElasticSearch7.6.1集群
    Linux安装Elasticsearch7.x
    ElasticSearch安装为Windows服务
    SolrNet Group分组 实现
    ubuntu 下安装sublime
    LeetCode 3: Longest Substring Without Repeating Characters
    LeetCode 179: Largest Number
    LeetCode 1: Two Sum
    LeetCode 190: Reverse Bits
    LeetCode 7: Reverse Integer
  • 原文地址:https://www.cnblogs.com/Loren2o/p/9511782.html
Copyright © 2011-2022 走看看