zoukankan      html  css  js  c++  java
  • 并发编程——进程同步

    锁——multiprocess.Lock:

      加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,会牺牲了速度却保证了数据安全。

      虽然可以用文件共享数据实现进程间通信,但问题是:

      1,效率低。2,需要自己加锁处理。

    multiprocess模块为我们提供的基于消息的IPC通信机制:队列和管道。

      队列和管道都是将数据存放于内存中:

      队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,我们应该避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可扩展性。

    import os
    import time
    import random
    from multiprocessing import Lock
    from multiprocessing import Process
    
    def work(n,lock):
        lock.acquire()
        print('%s:%s is running' % (n,os.getpid()))
        time.sleep(random.random())
        print('%s:%s is done' % (n,os.getpid()))
        lock.release()
    
    if __name__ == '__main__':
        lock = Lock()
        for i in range(10):
            p = Process(target=work,args=(i,lock))
            p.start()
    
    # 同步控制
    # 只要用到了锁,锁之内的代码就会变成同步的了
    # 锁:控制一段代码,同一时间 只能被一个进程执行
    import json
    import time
    import random
    from multiprocessing import Lock
    from multiprocessing import Process
    
    def check_ticket(i):
        with open('ticket') as f:
            ticket_count = json.load(f)     # 通过json获取文件中的信息
        print('person%s查询当前余票:'% i, ticket_count['count'])
    
    def buy_ticket(i,lock):
        check_ticket(i) # 先进行查票操作
        lock.acquire()  # 得到钥匙,进入程序
        with open('ticket') as f:   
            ticket_count = json.load(f)     # 这一步是为了再次判断是否还有余票
        time.sleep(random.random())
        if ticket_count['count']>0:
            print('person%s购票成功'% i)
            ticket_count -= 1       # 字典的赋值
        else:
            print('余票不足,person%s购票失败'% i)
        time.sleep(random.random())
        with open('ticket','w')as f:
            json.dump(ticket_count,f)       # 通过json.dump将字典转化成字符串形式,然后写入文件。
        lock.release()  # 归还钥匙
    
    if __name__ == '__main__':
        lock = Lock()
        for i in range(10):
            Process(target=buy_ticket,args = (i,lock)).start()
    模拟抢票程序

    信号量——multiprocess.Semaphore(了解)

    互斥锁同时只允许一个线程更爱数据,而信号量Semaphore是同时允许一定数量的线程更改数据。
    信号量同步基于内部计数器,每调用一次acquire(),计数器减1,每调用一次release(),计数器加1,当计数器为0时,acquire()调用被阻塞,这是迪科斯彻信号量概念p()和v()的python实现,信号量同步机制适用于访问像服务器这样的有限资源。
    信号量与进程池的概念很像,但是要区分开,信号量涉及到枷锁的概念。
    信号量介绍
    import time
    import random
    from multiprocessing import Process
    from multiprocessing import Semaphore
    
    def ktv(i,sema):
        sema.acquire()
        print('person%s 进来唱歌了'% i)
        time.sleep(random.randint(1,5))
        print('person%s 从ktv出去了'% i)
        sema.release()
    if __name__ == '__main__':
        sema = Semaphore(3)
        for i in range(5):
            Process(target=ktv,args=(i,sema)).start()
    
    # Semaphore  就是锁+计数器
    # acquire()     计数器-1
    
    # release()     计数器+1
    
    # 当计数器为0,acquire()就会阻塞
    实例

    事件——multiprocess.Event  (了解)

    python线程的时间用于主线程控制其他线程的执行,事件主要提供了三个方法:set(),wait(),clear().
    
    
    事件处理的机制,全局定义了一个flag,如果flag值为False,那么当程序执行,event.wait()方法时就会阻塞,如果flag值为True,那么event.wait 方法时便不再阻塞。
    
    clear:将flag设置为False.
    set:将flag设置为True.
    Event介绍
    import time
    import json
    import random
    from multiprocessing import Event
    from multiprocessing import Process
    
    def car(i,e):
        if not e.is_set():
            print('car%s正在等待'% i)
        e.wait()
        print('car%s正在通过'% i)
    
    def traffic_light(e):
        print('33[1;31m红灯亮了33[0m')
        time.sleep(2)
        while True:
            if not e.is_set():
                print('33[1;32m绿灯亮了33[0m')
                e.set()
            elif e.is_set():
                print('33[1;31m红灯亮了33[0m')
                e.clear()
            time.sleep(2)
    
    if __name__ == '__main__':
        e = Event()
        Process(target=traffic_light,args=(e,)).start()
        for i in range(10):
            time.sleep(random.randrange(1,5,3))
            Process(target=car,args=(i,e)).start()
    红绿灯实例

    进程之间的通信——队列和管道:        (multiprocess.Queue,multiprocess.Pipe)

      进程间的通信:IPC(Inter-Process Communication)

    队列:创建共享的进程队列,Queue时多进程安全的队列,可以使用Queue实现多进程之间的数据传递。

    Queue([maxsize])
    
    创建共享的进程队列。
    
    参数:maxsize是队列中允许的最大项数,如果省略此参数,则无大小限制。
    底层队列使用管道和锁定实现。
    
    Queue的实例q具有以下方法:
    
    q.get( [ block [ ,timeout ] ] ) 
    返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。
    
    q.get_nowait( ) 
    同q.get(False)方法。
    
    q.put(item [, block [,timeout ] ] ) 
    将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。
    
    q.qsize() 
    返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。
    
    
    q.empty() 
    如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。
    
    q.full() 
    如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。。
    Queue的方法介绍
    q.close() 
    关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
    
    q.cancel_join_thread() 
    不会再进程退出时自动连接后台线程。这可以防止join_thread()方法阻塞。
    
    q.join_thread() 
    连接队列的后台线程。此方法用于在调用q.close()方法后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread()方法可以禁止这种行为。
    Queue的其他方法
    from multiprocessing import Queue
    q = Queue(3)    # 限定队列中只能存在3项数据。
    
    q.put(1)
    q.put(2)
    q.put(3)
    # q.put(4)
    # 如果队列已经满了,程序就会停在这里,等待之前的数据被取走,再将数据放入q队列中
    # 如果数据没有被取走,那么程序将会一直停在这里。
    try:
        q.put_nowait(3) # 使用put_nowait()方法,如果队列满了不会阻塞而是会报错。
    except:
        print('队列已经满了')     # 报错则会打印
    
    print(q.full())     # 判断队列是否满了,返回bool值
    
    print(q.get())
    print(q.full())
    print(q.get())
    print(q.get())
    # print(q.get())        同放入一样,如果队列已经空了,继续取值,就会出现阻塞。
    try:
        print(q.get_nowait())   # 用get_nowait()方法取值,如果队列为空不会阻塞则会报错
    except:
        print('队列已经空了')
        
    print(q.empty())    # 判断队列是否为空,返回bool值
    Queue例子
    import time
    from multiprocessing import Process,Queue
    
    def f(q):
        q.put([time.asctime(),'from Eva','hello']) # 调用主函数中p进程传递过来的进程参数put向队列中添加数据。
    
    if __name__ == '__main__':
        q = Queue()     # 创建一个Queue对象
        p = Process(target=f,args=(q,))     # 创建一个进程
        p.start()
        print(q.get())  # ['Fri May 11 17:28:43 2018', 'from Eva', 'hello']
        p.join()
    子进程发送数据给父进程
    import os
    import time
    import multiprocessing
    
    # 向queue中输入数据的函数
    def inputQ(queue):
        info = str(os.getpid()) + '(put):' + str(time.asctime())
        queue.put(info)
    
    # 向queue中输出数据的函数
    def outputQ(queue):
        info = queue.get()
        print('%s%s33[32m%s33[0m'%(str(os.getpid()),'(get):',info))
    
    if __name__ == '__main__':
        multiprocessing.freeze_support()
        record1 = []
        record2 = []
        queue = multiprocessing.Queue(3)
    
        # 输入进程
        for i in range(10):
            process = multiprocessing.Process(target=inputQ,args=(queue,))
            process.start()
            record1.append(process)
    
        # 输出进程
        for i in range(10):
            process = multiprocessing.Process(target=outputQ, args=(queue,))
            process.start()
            record2.append(process)
    
        for p in record1:
            p.join()
    
        for p in record2:
            p.join()
    批量生产数据放入队列在批量获得结果
  • 相关阅读:
    总结报告的感想
    第14、15週PTA題目的處理
    PTA題目的處理(三)
    PTA题目的處理(四)
    PTA題目的處理(二)
    PTA題目的處理(一)
    國慶和中秋的學習成果
    剛進入大學一個月的總結和作業
    【接口平台】too many values to unpack
    【接口平台】生成静态模拟数据
  • 原文地址:https://www.cnblogs.com/stfei/p/9025870.html
Copyright © 2011-2022 走看看