zoukankan      html  css  js  c++  java
  • 进程

    1. 进程

    引入模块

    from multiprocessing import Process    #谋定噗rua赛斯听            #噗rua赛斯

    进程之间是空间隔离的,不共享资源

    进程的两种创建方法

    创建进程的第一种方式:

        p1 = Process(target=func1, args=(1,))      #target(他给特)#args(啊渴死)
        p1.start()                        #start四大特

    创建进行的第二种方式:

      自己定义一个类,继承Process类,必须写一个run方法,想传参数,自行写init方法,然后执行super父类的init方法

    class MyProcess(Process):
        def __init__(self,n,name):
            super().__init__()
            self.n = n
            self.name = name
        def run(self):
            # print(1+1)
            # print(123)
            print('子进程的进程ID',os.getpid())
            print('你看看n>>',self.n)
    
    if __name__ == '__main__':
        p1 = MyProcess(100,name='子进程1')
        p1.start() #给操作系统发送创建进程的指令,子进程创建好之后,要被执行,执行的时候就会执行run方法
        print('p1.name',p1.name)
        print('p1.pid',p1.pid)
        print('主进程结束')
    

     进程的其他方法 

    import time
    from multiprocessing import Process
    def func1():
        time.sleep(2)
        print()
        print('子进程')
    if __name__ == '__main__':
        p1 = Process(target=func1,)
        p1.start()
        p1.terminate() # 给操作系统发了一个关闭p1子进程的信号,关闭进程        terminate(涛妹雷特)
        time.sleep(1)              四离谱
        print('进程是否还活着:',p1.is_alive())#判断进程是否存活          is_alive(A子蓝付)
        print(p1.pid)
        print('主进程结束')
    class MyProcess(Process):
        def __init__(self,n,name):
            super().__init__()
            self.n = n
            self.name = name
        def run(self):
            # print(1+1)
            # print(123)
            print('子进程的进程ID',os.getpid())
            print('你看看n>>',self.n)
    if __name__ == '__main__':
        p1 = MyProcess(100,name='子进程1')
        p1.start() #给操作系统发送创建进程的指令,子进程创建好之后,要被执行,执行的时候就会执行run方法
        print('p1.name',p1.name)
        print('p1.pid',p1.pid)
        print('主进程结束')
    name和pid和ppid

    join方法

    global_num = 100
    def func1():
        time.sleep(2)
        global global_num
        global_num = 0
        print('子进程全局变量>>>',global_num)
    if __name__ == '__main__':
        p1 = Process(target=func1,)
        p1.start()
        print('子进程执行')
        #time.sleep(3)
        p1.join()  #阻塞住,等待你的p1子进程执行结束,主进程的程序才能从这里继续往下执行
        print('主进程的全局变量>>>',global_num)
    验证join方法
    global_num = 100
    def func1():
        start_time = time.time()
        time.sleep(2)
        global global_num
        global_num = 0
        print('子进程全局变量>>>',global_num)
        end_time = time.time()
        print(end_time - start_time)
    if __name__ == '__main__':
        p1 = Process(target=func1,)
        p1.start()
        print('子进程执行')
        #time.sleep(3)
        p1.join()  #阻塞住,等待你的p1子进程执行结束,主进程的程序才能从这里继续往下执行
        print('主进程的全局变量>>>',global_num)
    join方法验证代码执行时间的
    #for循环在创建进程中的应用
    def fun1(n):
        time.sleep(1)
        print(n)
    
    if __name__ == '__main__':
        pro_list = []
        for i in range(10):
            p1 = Process(target=fun1,args=(i,))
            p1.start()
            pro_list.append(p1)
    
        for p in pro_list:
            p.join()
        print('主进程结束')

    守护进程   一定要在start之前设置守护进程

    import time
    import os
    from multiprocessing import Process
    def func1():
        time.sleep(5)
        print(os.getpid())
        print('子进程')
    
    if __name__ == '__main__':
        p1 = Process(target=func1,)
        p1.daemon = True#将p1子进程设置为守护进程        daemon(地们)
        p1.start()
        # print('主进程的ID',os.getpid())
        print('主进程结束')
    

    三、进程同步(锁)

    问题1:为什么要加进程锁?

           线程锁是为了在线程不安全的时候,为一段代码加上锁来控制实现线程安全,即线程间数据隔离;

           进程间的数据本来就是隔离的,所以一般不用加锁,当进程间共用某个数据的时候需要加锁;

    Look(唠嗑)模块    acquire(额快也)#加锁    release(蕊李四)还锁

    加锁的另种形式:     with

    import json
    import time
    import random
    from multiprocessing import Process,Lock
    
    def get_ticket(i,ticket_lock):
        print('我们都到齐了,大家预备!!123')
        #所有进程异步执行,到这里等待,同时再去抢下面的代码执行
        time.sleep(1)
        ticket_lock.acquire()  #这里有个门,只有一个人能够抢到这个钥匙,加锁
        with open('ticket','r') as f:
            #将文件数据load为字典类型的数据
            last_ticket_info = json.load(f)
        #查看一下余票信息
        last_ticket = last_ticket_info['count']
        #如果看到余票大于0,说明你可以抢到票
        if last_ticket > 0:
            #模拟网络延迟时间
            time.sleep(random.random())
            #抢到一张票就减去1
            last_ticket = last_ticket - 1
            last_ticket_info['count'] = last_ticket
            #将修改后的票数写回文件
            with open('ticket','w') as f:
                #通过json.dump方法来写回文件,字符串的形式
                json.dump(last_ticket_info,f)
            print('%s号抢到了,丫nb!' % i)
        else:
            print('%s号傻逼,没票啦,明年再来!' % i)
        #释放锁,也就是还钥匙的操作
        ticket_lock.release()
    if __name__ == '__main__':
        #创建一个锁
        ticket_lock = Lock()
        for i in range(10):
            #将锁作为参数传给每个进程,因为每个进程都需要通过锁来进行限制,同步
            p = Process(target=get_ticket,args=(i,ticket_lock,))
            p.start()
    模拟抢票

    四、信号量(了解)    

    Semaphore(赛呢fao)

    import time
    import random
    from multiprocessing import Process,Semaphore
    
    def dbj(i,s):
        s.acquire()
        print('%s号男主人公来洗脚'%i)
        print('-------------')
        time.sleep(random.randrange(3,6))
        # print(time.time())
        s.release()
    
    if __name__ == '__main__':
        s = Semaphore(4) #创建一个计数器,每次acquire就减1,直到减到0,那么上面的任务只有4个在同时异步的执行,后面的进程需要等待.
        for i in range(10):
            p1 = Process(target=dbj,args=(i,s,))
            p1.start()
    洗脚

    五、事件(了解)

    Event(额外特)

    e.clear(可累儿)将e改成False

    e.wait(威特)等待

    e.set(赛特)将e是True

    e.is_set()查看当前e的状态是True或者False

    import time
    import random
    from multiprocessing import Process,Event
    
    #模拟红绿灯执行状态的函数
    def traffic_lights(e):
        while 1:
            print('红灯啦')
            time.sleep(5)
            e.set()
            print('绿灯亮')
            time.sleep(3)
            e.clear()  #将e改为了False
    def car(i,e):
    
        if not e.is_set(): #新来的车看到是红灯
            print('我们在等待.....')
            e.wait()
            print('走你')
        else:
            print('可以走了!!!')
    
    if __name__ == '__main__':
        e = Event()
        hld = Process(target=traffic_lights,args=(e,))
        hld.start()
        while 1:
            time.sleep(0.5)
            #创建10个车
            for i in range(3):
                # time.sleep(random.randrange(1,3))
                p1 = Process(target=car,args=(i,e,))
                p1.start()
    红绿灯
    复制代码
    from multiprocessing import Process,Semaphore,Event
    import time,random
    
    e = Event() #创建一个事件对象
    print(e.is_set())  #is_set()查看一个事件的状态,默认为False,可通过set方法改为True
    print('look here!')
    # e.set()          #将is_set()的状态改为True。
    # print(e.is_set())#is_set()查看一个事件的状态,默认为False,可通过set方法改为Tr
    # e.clear()        #将is_set()的状态改为False
    # print(e.is_set())#is_set()查看一个事件的状态,默认为False,可通过set方法改为Tr
    e.wait()           #根据is_set()的状态结果来决定是否在这阻塞住,is_set()=False那么就阻塞,is_set()=True就不阻塞
    print('give me!!')
    
    #set和clear  修改事件的状态 set-->True   clear-->False
    #is_set     用来查看一个事件的状态
    #wait       依据事件的状态来决定是否阻塞 False-->阻塞  True-->不阻塞
    事件

    六、进程间通信 IPC(重要)

      队列:(重点)

    full(fao)

    from multiprocessing import Process,Queue
    #先进后出
    '''
    要注意的事项
    q.full()队列满了返回True,不满返回False
    队列为空的时候,get会阻塞          盖特
    put超出了队列长度,你put插入数据的时候会阻塞     破特
    print('>>>',q.empty())  #不可信,队列空了返回True,不为空返回False
    '''
    q = Queue(3)
    q.put(1)#往队列里写入3个数据
    q.put(2)
    q.put(3)
    print(q.get())#获取值
    #可以用异常踹一下
    while 1:
        try:
            q.get(False)  #queue.Empty
        except:
            print('队列目前是空的')

    队列实现进程间的通信

    import time
    from multiprocessing import Process,Queue
    
    def girl(q):
        print('来自boy的消息',q.get())
        print('来自校领导的凝视',q.get())
    def boy(q):
        q.put('约吗')
    
    if __name__ == '__main__':
        q = Queue(5)
        boy_p = Process(target=boy,args=(q,))
        girl_p = Process(target=girl,args=(q,))
        boy_p.start()
        girl_p.start()
        time.sleep(1)
        q.put('好好工作,别乱搞')
    来自校领导的凝视

    生产者消费者模型

    import time
    from multiprocessing import Process,Queue
    
    def producer(q):
        for i in range(1,11):
            time.sleep(1)
            print('生产了包子%s号' % i)
            q.put(i)
        q.put(None)  #针对第三个版本的消费者,往队列里面加了一个结束信号
    def consumer(q):
        while 1:
            time.sleep(2)
            s = q.get()
            if s == None:
                break
            else:
                print('消费者吃了%s包子' % s)
    
    if __name__ == '__main__':
        #通过队列来模拟缓冲区,大小设置为20
        q = Queue(20)
        #生产者进程
        pro_p = Process(target=producer,args=(q,))
        pro_p.start()
        #消费者进程
        con_p = Process(target=consumer,args=(q,))
        con_p.start()
    

      

    生产者消费者模型主进程发送结束信号

    import time
    from multiprocessing import Process,Queue
    
    def producer(q):
        for i in range(1,11):
            time.sleep(1)
            print('生产了包子%s号' % i)
            q.put(i)
    
    def consumer(q):
        while 1:
            time.sleep(2)
            s = q.get()
            if s == None:
                break
            else:
                print('消费者吃了%s包子' % s)
    
    if __name__ == '__main__':
        #通过队列来模拟缓冲区,大小设置为20
        q = Queue(20)
        #生产者进程
        pro_p = Process(target=producer,args=(q,))
        pro_p.start()
        #消费者进程
        con_p = Process(target=consumer,args=(q,))
        con_p.start()
        pro_p.join()
    
        q.put(None)  

    JoinableQueue的生产着消费者模型     Joinable(准呢包)Queue   张波Q

     q.task_done(她可死但)     给q对象发送一个任务结束的信号

    import time
    from multiprocessing import Process,Queue,JoinableQueue
    
    def producer(q):
        for i in range(1,11):
            time.sleep(0.5)
            print('生产了包子%s号' % i)
            q.put(i)
        q.join()
        print('在这里等你')
    def consumer(q):
        while 1:
            time.sleep(1)
            s = q.get()
            print('消费者吃了%s包子' % s)
            q.task_done()  #给q对象发送一个任务结束的信号
    
    if __name__ == '__main__':
        #通过队列来模拟缓冲区,大小设置为20
        q = JoinableQueue(20)
        #生产者进程
        pro_p = Process(target=producer,args=(q,))
        pro_p.start()
        #消费者进程
        con_p = Process(target=consumer,args=(q,))
        con_p.daemon = True #
        con_p.start()
        pro_p.join()
        print('主进程结束')  

    管道:

      进程间通信(IPC)方式二:管道(了解即可)

     Pipe(牌破)

    from multiprocessing import Process,Pipe
    def func1(conn1,conn2):
        try:
            msg = conn2.recv()
            print('>>>',msg)
            #如果管道一端关闭了,那么另外一端在接收消息的时候会报错
            msg2 = conn2.recv() #EOFError
        except EOFError:
            print('对方管道一端已经关闭')
            conn2.close()
    
    if __name__ == '__main__':
        conn1, conn2 = Pipe()
        p = Process(target=func1,args=(conn1,conn2,))
        p.start()
        conn1.send('小鬼!')
        conn1.close()
        # conn1.recv()  #OSError: handle is closed
    管道报错模拟

    数据共享:(了解)

    Lock(啦个)

    from multiprocessing import Process,Manager,Lock
    
    def func(m_dic,ml):
        #不加锁的情况会出现数据错乱
        # m_dic['count'] -= 1
        #加锁,这是另外一种加锁形式
        with ml:
            m_dic['count'] -= 1
    
        #等同
        # ml.acquire()
        # m_dic['count'] -= 1
        # ml.release()
    
    if __name__ == '__main__':
        m = Manager()
        ml = Lock()
        m_dic = m.dict({'count':100})
        # print('主进程', m_dic)
        p_list = []
        #开启20个进程来对共享数据进行修改
        for i in range(20):
            p1 = Process(target=func,args=(m_dic,ml,))
            p1.start()
            p_list.append(p1)
        [ppp.join() for ppp in p_list]
    
        print('主进程',m_dic)
    manager不安全

    进程池和mutiprocess.Poll(噗奥)

      

    为什么要有进程池?进程池的概念。

      在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务。那么在成千上万个任务需要被执行的时候,我们就需要去创建成千上万个进程么?首先,创建进程需要消耗时间,销毁进程(空间,变量,文件信息等等的内容)也需要消耗时间。第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,维护一个很大的进程列表的同时,调度的时候,还需要进行切换并且记录每个进程的执行节点,也就是记录上下文(各种变量等等乱七八糟的东西,虽然你看不到,但是操作系统都要做),这样反而会影响程序的效率。因此我们不能无限制的根据任务开启或者结束进程。就看我们上面的一些代码例子,你会发现有些程序是不是执行的时候比较慢才出结果,就是这个原因,那么我们要怎么做呢?

      在这里,要给大家介绍一个进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果

    p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。
    '''需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()'''
    
    p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。
    '''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。'''
        
    p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
    
    P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用
    主要方法介绍

     apply(额扑来)      apply_async(额扑来森科)   Map(麦坡)  

    import time
    from multiprocessing import Pool,Process
    
    #针对range(100)这种参数的
    # def func(n):
    #     for i in range(3):
    #         print(n + 1)
    
    def func(n):
        print(n)
        # 结果:
        #     (1, 2)
        #     alex
    def func2(n):
        for i in range(3):
            print(n - 1)
    if __name__ == '__main__':
        #1.进程池的模式
        s1 = time.time()  #我们计算一下开多进程和进程池的执行效率
        poll = Pool(5) #创建含有5个进程的进程池
        # poll.map(func,range(100)) #异步调用进程,开启100个任务,map自带join的功能
        poll.map(func,[(1,2),'alex']) #异步调用进程,开启100个任务,map自带join的功能
        # poll.map(func2,range(100))  #如果想让进程池完成不同的任务,可以直接这样搞
        #map只限于接收一个可迭代的数据类型参数,列表啊,元祖啊等等,如果想做其他的参数之类的操作,需要用后面我们要学的方法。
        # t1 = time.time() - s1
        #
        # #2.多进程的模式
        # s2 = time.time()
        # p_list = []
        # for i in range(100):
        #     p = Process(target=func,args=(i,))
        #     p_list.append(p)
        #     p.start()
        # [pp.join() for pp in p_list]
        # t2 = time.time() - s2
        #
        # print('t1>>',t1) #结果:0.5146853923797607s 进程池的效率高
        # print('t2>>',t2) #结果:12.092015027999878s
    进程池的简单应用以及进程池的效率对比

    同步与异步两种执行方式:

    同步:

    import time
    from multiprocessing import Process,Pool
    
    
    def fun(i):
        time.sleep(0.5)
        # print(i)
        return i**2
    if __name__ == '__main__':
        p = Pool(4)
        for i in range(10):
            res = p.apply(fun,args=(i,))  #同步执行的方法,他会等待你的任务的返回结果,
            print(res)
    

      

    异步:

    import time
    from multiprocessing import Process,Pool
    
    def fun(i):
        time.sleep(1)
        print(i)
        return i**2
    if __name__ == '__main__':
        p = Pool(4)
        res_list = []
        for i in range(10):
            res = p.apply_async(fun,args=(i,))  
            res_list.append(res)
    
        p.close()  # 不是关闭进程池,而是不允许再有其他任务来使用进程池
        p.join()   # 这是感知进程池中任务的方法,进程池中所有的进程随着主进程的结束而结束了,等待进程池的任务全部执行完
        #循环打印结果
        for e_res in res_list:
            print('结果:',e_res.get())
    
        print('主进程结束')
    

      

    需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数,
    这是进程池特有的,普通进程没有这个机制,但是我们也可以通过进程通信来拿到返回值,进程池的这个回调也是进程通信的机制完成的。 我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果
    import os
    from multiprocessing import Pool
    
    def func1(n):
        print('func1>>',os.getpid())
        # print('func1')
        return n*n
    
    
    def func2(nn):
        print('func2>>',os.getpid())
        # print('func2')
        print(nn)
        # import time
        # time.sleep(0.5)
    if __name__ == '__main__':
        print('主进程:',os.getpid())
        p = Pool(4)
        p.apply_async(func1,args=(10,),callback=func2)
        p.close()
        p.join()
    

      

    回调函数在写的时候注意一点,回调函数的形参执行有一个,如果你的执行函数有多个返回值,那么也可以被回调函数的这一个形参接收,接收的是一个元祖,包含着你执行函数的所有返回值。

  • 相关阅读:
    《高校后勤管理信息系统设计与实现》论文笔记五
    《高校后勤管理系统的设计与实现》论文笔记三
    《高校后勤管理系统的设计与实现》论文笔记二
    如何利用React.js开发出强大Web应用
    关于啤酒和尿布故事的真相
    以生活例子说明单线程与多线程
    未来哪些领域WiFi将成为刚需?
    CSS开发中的10个不要
    10年后编程还有意义吗?
    JavaEE中遗漏的10个最重要的安全控制
  • 原文地址:https://www.cnblogs.com/xihuanniya/p/9838138.html
Copyright © 2011-2022 走看看