zoukankan      html  css  js  c++  java
  • 并发编程——进程同步

    锁——multiprocess.Lock:

      加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,会牺牲了速度却保证了数据安全。

      虽然可以用文件共享数据实现进程间通信,但问题是:

      1,效率低。2,需要自己加锁处理。

    multiprocess模块为我们提供的基于消息的IPC通信机制:队列和管道。

      队列和管道都是将数据存放于内存中:

      队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,我们应该避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可扩展性。

    import os
    import time
    import random
    from multiprocessing import Lock
    from multiprocessing import Process
    
    def work(n,lock):
        lock.acquire()
        print('%s:%s is running' % (n,os.getpid()))
        time.sleep(random.random())
        print('%s:%s is done' % (n,os.getpid()))
        lock.release()
    
    if __name__ == '__main__':
        lock = Lock()
        for i in range(10):
            p = Process(target=work,args=(i,lock))
            p.start()
    
    # 同步控制
    # 只要用到了锁,锁之内的代码就会变成同步的了
    # 锁:控制一段代码,同一时间 只能被一个进程执行
    import json
    import time
    import random
    from multiprocessing import Lock
    from multiprocessing import Process
    
    def check_ticket(i):
        with open('ticket') as f:
            ticket_count = json.load(f)     # 通过json获取文件中的信息
        print('person%s查询当前余票:'% i, ticket_count['count'])
    
    def buy_ticket(i,lock):
        check_ticket(i) # 先进行查票操作
        lock.acquire()  # 得到钥匙,进入程序
        with open('ticket') as f:   
            ticket_count = json.load(f)     # 这一步是为了再次判断是否还有余票
        time.sleep(random.random())
        if ticket_count['count']>0:
            print('person%s购票成功'% i)
            ticket_count -= 1       # 字典的赋值
        else:
            print('余票不足,person%s购票失败'% i)
        time.sleep(random.random())
        with open('ticket','w')as f:
            json.dump(ticket_count,f)       # 通过json.dump将字典转化成字符串形式,然后写入文件。
        lock.release()  # 归还钥匙
    
    if __name__ == '__main__':
        lock = Lock()
        for i in range(10):
            Process(target=buy_ticket,args = (i,lock)).start()
    模拟抢票程序

    信号量——multiprocess.Semaphore(了解)

    互斥锁同时只允许一个线程更爱数据,而信号量Semaphore是同时允许一定数量的线程更改数据。
    信号量同步基于内部计数器,每调用一次acquire(),计数器减1,每调用一次release(),计数器加1,当计数器为0时,acquire()调用被阻塞,这是迪科斯彻信号量概念p()和v()的python实现,信号量同步机制适用于访问像服务器这样的有限资源。
    信号量与进程池的概念很像,但是要区分开,信号量涉及到枷锁的概念。
    信号量介绍
    import time
    import random
    from multiprocessing import Process
    from multiprocessing import Semaphore
    
    def ktv(i,sema):
        sema.acquire()
        print('person%s 进来唱歌了'% i)
        time.sleep(random.randint(1,5))
        print('person%s 从ktv出去了'% i)
        sema.release()
    if __name__ == '__main__':
        sema = Semaphore(3)
        for i in range(5):
            Process(target=ktv,args=(i,sema)).start()
    
    # Semaphore  就是锁+计数器
    # acquire()     计数器-1
    
    # release()     计数器+1
    
    # 当计数器为0,acquire()就会阻塞
    实例

    事件——multiprocess.Event  (了解)

    python线程的时间用于主线程控制其他线程的执行,事件主要提供了三个方法:set(),wait(),clear().
    
    
    事件处理的机制,全局定义了一个flag,如果flag值为False,那么当程序执行,event.wait()方法时就会阻塞,如果flag值为True,那么event.wait 方法时便不再阻塞。
    
    clear:将flag设置为False.
    set:将flag设置为True.
    Event介绍
    import time
    import json
    import random
    from multiprocessing import Event
    from multiprocessing import Process
    
    def car(i,e):
        if not e.is_set():
            print('car%s正在等待'% i)
        e.wait()
        print('car%s正在通过'% i)
    
    def traffic_light(e):
        print('33[1;31m红灯亮了33[0m')
        time.sleep(2)
        while True:
            if not e.is_set():
                print('33[1;32m绿灯亮了33[0m')
                e.set()
            elif e.is_set():
                print('33[1;31m红灯亮了33[0m')
                e.clear()
            time.sleep(2)
    
    if __name__ == '__main__':
        e = Event()
        Process(target=traffic_light,args=(e,)).start()
        for i in range(10):
            time.sleep(random.randrange(1,5,3))
            Process(target=car,args=(i,e)).start()
    红绿灯实例

    进程之间的通信——队列和管道:        (multiprocess.Queue,multiprocess.Pipe)

      进程间的通信:IPC(Inter-Process Communication)

    队列:创建共享的进程队列,Queue时多进程安全的队列,可以使用Queue实现多进程之间的数据传递。

    Queue([maxsize])
    
    创建共享的进程队列。
    
    参数:maxsize是队列中允许的最大项数,如果省略此参数,则无大小限制。
    底层队列使用管道和锁定实现。
    
    Queue的实例q具有以下方法:
    
    q.get( [ block [ ,timeout ] ] ) 
    返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。
    
    q.get_nowait( ) 
    同q.get(False)方法。
    
    q.put(item [, block [,timeout ] ] ) 
    将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。
    
    q.qsize() 
    返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。
    
    
    q.empty() 
    如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。
    
    q.full() 
    如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。。
    Queue的方法介绍
    q.close() 
    关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
    
    q.cancel_join_thread() 
    不会再进程退出时自动连接后台线程。这可以防止join_thread()方法阻塞。
    
    q.join_thread() 
    连接队列的后台线程。此方法用于在调用q.close()方法后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread()方法可以禁止这种行为。
    Queue的其他方法
    from multiprocessing import Queue
    q = Queue(3)    # 限定队列中只能存在3项数据。
    
    q.put(1)
    q.put(2)
    q.put(3)
    # q.put(4)
    # 如果队列已经满了,程序就会停在这里,等待之前的数据被取走,再将数据放入q队列中
    # 如果数据没有被取走,那么程序将会一直停在这里。
    try:
        q.put_nowait(3) # 使用put_nowait()方法,如果队列满了不会阻塞而是会报错。
    except:
        print('队列已经满了')     # 报错则会打印
    
    print(q.full())     # 判断队列是否满了,返回bool值
    
    print(q.get())
    print(q.full())
    print(q.get())
    print(q.get())
    # print(q.get())        同放入一样,如果队列已经空了,继续取值,就会出现阻塞。
    try:
        print(q.get_nowait())   # 用get_nowait()方法取值,如果队列为空不会阻塞则会报错
    except:
        print('队列已经空了')
        
    print(q.empty())    # 判断队列是否为空,返回bool值
    Queue例子
    import time
    from multiprocessing import Process,Queue
    
    def f(q):
        q.put([time.asctime(),'from Eva','hello']) # 调用主函数中p进程传递过来的进程参数put向队列中添加数据。
    
    if __name__ == '__main__':
        q = Queue()     # 创建一个Queue对象
        p = Process(target=f,args=(q,))     # 创建一个进程
        p.start()
        print(q.get())  # ['Fri May 11 17:28:43 2018', 'from Eva', 'hello']
        p.join()
    子进程发送数据给父进程
    import os
    import time
    import multiprocessing
    
    # 向queue中输入数据的函数
    def inputQ(queue):
        info = str(os.getpid()) + '(put):' + str(time.asctime())
        queue.put(info)
    
    # 向queue中输出数据的函数
    def outputQ(queue):
        info = queue.get()
        print('%s%s33[32m%s33[0m'%(str(os.getpid()),'(get):',info))
    
    if __name__ == '__main__':
        multiprocessing.freeze_support()
        record1 = []
        record2 = []
        queue = multiprocessing.Queue(3)
    
        # 输入进程
        for i in range(10):
            process = multiprocessing.Process(target=inputQ,args=(queue,))
            process.start()
            record1.append(process)
    
        # 输出进程
        for i in range(10):
            process = multiprocessing.Process(target=outputQ, args=(queue,))
            process.start()
            record2.append(process)
    
        for p in record1:
            p.join()
    
        for p in record2:
            p.join()
    批量生产数据放入队列在批量获得结果
  • 相关阅读:
    爬取校园新闻首页的新闻的详情,使用正则表达式,函数抽离
    网络爬虫基础练习
    Mysql 使用 select into outfile
    Mysql 使用CMD 登陆
    使用Clean() 去掉由函数自动生成的字符串中的双引号
    Get Resultset from Oracle Stored procedure
    获取引用某个主键的所有外键的表
    Entity Framework 丢失数据链接的绑定,在已绑好的EDMX中提示“Choose Your Data Connection”
    添加MySql Metat Database 信息
    at System.Data.EntityClient.EntityConnection.GetFactory(String providerString)
  • 原文地址:https://www.cnblogs.com/stfei/p/9025870.html
Copyright © 2011-2022 走看看