zoukankan      html  css  js  c++  java
  • 进程与线程

    生产者消费者模型

      主要用于解耦

    from multiprocessing import Queue
    #队列是安全的,不用加锁.
    q = Queue(num)
    num : 队列的最大长度
    
    q.get()#阻塞等待获取数据,如果有数据直接获取,如果没有数据,阻塞等待
    q.put()#阻塞,如果可以继续往队列中放数据就直接放,不能放就阻塞等待
    
    q.get_nowait()#不阻塞,如果有数据就直接获取,没有数据就报错
    q.put_nowait()#不阻塞, 如果能继续往队列中放数据,就直接放,不能放就报错
    
    
    
    q = Queue(3)
    q.put(1)
    q.put('abc')
    q.put([4,5,6])
    print('此时队列已不能再放入了')
    q.put('呵呵')#此处阻塞等待空位置放入
    #q.putnowait('呵呵')#队列已满,不再等待,直接报错
    print('此处不会被打印')

    print(q.get())#先进先出,先取出 1
    print(q.get())
    print(q.get())
    #print(q.get())#队列为空,取不出会阻塞等待新数据取出
    print(q.getnowait())#不再等待直接报错

      借助队列实现生产者消费者模型 (队列(First In First Out 简称 : FIFO) : 先进先出 )

    from multiprocessing import Queue ,Process
    
    def consumer(q,name):
        while 1:
            pro_info = q.get()#如果消费者不知道生产者停止生产,将会一直等待获取
            if pro_info :
                print('%s拿走了%s' % (name,pro_info))
            else:#当收到None时,结束获取,退出程序
                break
    
    def producer(q,product):
        for i in range(10):
          pro_info = product + '的成品%s号' % str(i)
          q.put(pro_info)
        q.put(None)#生产者停止生产的标识
    
    if __name__ == '__main__' :
        q = Queue(5)#规定队列最大为5
        pro = Process(target=producer, args=(q,'版本一'))
        con = Process(target=consumer, args=(q,'小潘'))
        pro.start()
        con.start()
    
    
    
    #把成产表示符放入父进程
    from multiprocessing import Queue ,Process
    
    def consumer(q,name):
        while 1:
            pro_info = q.get()#如果消费者不知道生产者停止生产,将会一直等待获取
            if pro_info :
                print('%s拿走了%s' % (name,pro_info))
            else:#当收到None时,结束获取,退出程序
                break
    
    def producer(q,product):
        for i in range(10):
          pro_info = product + '的成品%s号' % str(i)
          q.put(pro_info)
    
    
    if __name__ == '__main__' :
        q = Queue(5)#规定队列最大为5
        pro = Process(target=producer, args=(q,'版本一'))
        con = Process(target=consumer, args=(q,'小潘'))
        pro.start()
        con.start()
        pro.join()
        q.put(None)#生产者停止生产的标识



    #多个生产者消费者
    from multiprocessing import Queue ,Process

    def consumer(q,name):
    while 1:
    pro_info = q.get()#如果消费者不知道生产者停止生产,将会一直等待获取
    if pro_info :
    print('%s拿走了%s' % (name,pro_info))
    else:#当收到None时,结束获取,退出程序
    break

    def producer(q,product):
    for i in range(20):
    pro_info = product + '的成品%s号' % str(i)
    q.put(pro_info)

    if __name__ == '__main__' :
    q = Queue(5)#规定队列最大为5
    pro1 = Process(target=producer, args=(q,'版本一'))
    pro2 = Process(target=producer, args=(q, '版本二'))
    pro3 = Process(target=producer, args=(q, '版本三'))
    con1 = Process(target=consumer, args=(q,'小潘'))
    con2 = Process(target=consumer, args=(q, '李四'))
    li = [pro1,pro2,pro3,con1,con2]
    [i.start() for i in li]
    pro1.join()
    pro2.join()
    pro3.join()
    q.put(None)#生产者停止生产的标识
    q.put(None)
     

      joinablequeue模块

    from multiprocessing import JoinableQueue
    #继承了multiprocessing.Queue 类,新添加了join(),q.task_done()
    
    q = JoinableQueue()
    
    q.join()#等待q.task_done的返回结果
    q.task_done()#用于消费者,表示每消费队列中一个数据,就给join返回一个标识
    
    
    
    
    from multiprocessing import JoinableQueue ,Process
    
    def consumer(q,name):
        while 1:
            pro_info = q.get()#如果消费者不知道生产者停止生产,将会一直等待获取
            if pro_info :
                print('%s拿走了%s' % (name,pro_info))
                q.task_done()#从队列中,每拿走一个数据,就传给join发送一个标识,共十个数据,则十个标识
    
    def producer(q,product):
        for i in range(10):
          pro_info = product + '的成品%s号' % str(i)
          q.put(pro_info)
        q.join()#记录生产了20个数据在队列中,此时阻塞等待着对列中的所有数据均被拿取
    
    if __name__ == '__main__' :
        q = JoinableQueue(5)#规定队列最大为5
        pro = Process(target=producer, args=(q,'版本一'))
        con = Process(target=consumer, args=(q,'小潘'))
        con.daemon = True#把消费者进程设为守护进程,由于主进程等待成产者进程,生产者进程等待消费者进程,
        # 所以把消费者进程设为守护进程,主进程代码执行完毕,消费者进程结束,则程序结束.
        pro.start()
        con.start()
        pro.join()#等待生产者进程结束

    管道  

      管道是不安全的 , 一般单进程不要用管道

      用于多进程之间通信的一种方式

      如果在单进程中使用管道,那么就是con1收数据,con2发数据 ; 如果是con1发数据 , con2收数据

      如果是多进程中使用管道,那么必须是父进程使用con1收,子进程就必须使用con2发 ;

        父进程用con1发 , 子进程必须用con2收 ; 

        父进程用con2收 , 子进程必须用con1发 ;

        父进程用con2收 , 子进程必须用con1收

      管道中EOFError错误,是指父进程中如果关闭了发送端,子进程还继续接收数据,就会引

      发EOFError错误.

    from multiprocessing import Pipe,Process
    
    #单进程下的管道
    # con1 , con2 = Pipe()
    #
    # con1.send('adc')
    # print(con2.recv())
    # con2.send(123)
    # print(con1.recv())
    
    
    
    #多进程
    def func(con):
        con1,con2 = con
        con1.close()
        print(con2.recv())
        con2.send('主进程con2收')
        #print(con1.recv())#在同一进程中,con1和con2不能同时开启,否则程序不能关闭
    
    if __name__ == '__main__':
        con1 , con2 = Pipe()
        p = Process(target=func,args=((con1,con2),))
        p.start()
        con2.close()
        con1.send('子进程con2收')#con1发送,必须是con2接收
        print(con1.recv())
    
    
    
    
    
    def func(con):
        con1,con2 = con
        con1.close()
        con2.send('主进程con2收')
        while 1 :
            try :
                print(con2.recv())#如果父进程不关闭con1管道,则子进程一直阻塞在此处等待接收,报错
            except EOFError :#try 一下当报该类型错误时自动执行下面程序
                con2.close()
                break
    
    if __name__ == '__main__':
        con1 , con2 = Pipe()
        p = Process(target=func,args=((con1,con2),))
        p.start()
        con2.close()
        print(con1.recv())
        for i in range(10):
            con1.send('子进程con2收%s' % i)#con1发送,必须是con2接收
        con1.close()#发送完毕后,关闭管道

    进程之间的共享内存

      

    from multiprocessing import Manager , Process
    # m = Manager()
    # num = m.dict({'键':'值'})#数据可以是字典或者其他形式
    # num = m.list([1,2,3])
    
    def func(num):
        num[0] -= 1
        print('子进程中的num的值是', num)
    
    if __name__ == '__main__':
        m = Manager()
        num = m.list([1,2,3])
        p = Process(target=func , args=(num,))
        p.start()
        p.join()
        print('父进程中的num',num)

    进程池

      在实际业务中,任务量是有多有少的,如果任务量特别多,不可能要开对应那摩多的进程数,开启那摩多进程首先

      需要大量的时间让操作系统来为你管理他,其次还需要消耗大量时间让CPU帮你调度他.

      进程池还会帮程序员管理进程池中的进程

     进程池 : 一个形象化的池子,里面有给定的进程,这些进程一直处于待命状态,一旦有任务,就有进程去处理.

      进程池中的进程都是守护进程,主进程代码执行完毕,守护进程就结束了 

    from multiprocessing import Pool
    import os
    import time
    
    def func(num):
        num += 1
        print(num)
    
    # if __name__ == '__main__':
    #     p = Pool(os.cpu_count()+1)#oscpu_count+1 最佳进程数量
    #     start = time.time()
    #     p.map(func , [i for i in range(20)])
    #     p.close()#不允许再向进程池中添加任务
    #     p.join()#等待进程池中所有进程执行完所有任务
    
    #p.apply()#让进程池中的进程同步的做任务 # if __name__ == '__main__': # p = Pool(5) # for i in range(20):#同步处理20个任务,同步是指不管进程池中有多少个进程依然一个进程一个进程的执行,不需要join等待和close. # p.apply(func , args=(i ,)) # time.sleep(0.5)
    #p.apply_async()#让进程池中的进程异步做任务 if __name__ == '__main__': p = Pool(5) l = [] for i in range(20):#异步处理20个任务,异步是指进程池中有几个进程,一下就处理几个任务,那个进程任务处理完了,就接收下一个任务. re = p.apply_async(func , args=(i ,)) l.append(re)
       res= [i.get() for i in l]
       p.close()#不再接受新的任务,准备关闭
       p.join()#等待进程池中所有进程执行任务完毕.

       print(res) time.sleep(
    0.5)

      回调函数(只有异步有)

       在进程池中的回调函数是父进程调用的,和子进程无关.

    from multiprocessing import Pool
    import requests
    
    def func(url):
        re = requests.get()
        print(re.text)
        if re.status_code == 200:
            return url , re.text
    
    def call_back(sta):#func函数的返回值,会被回调函数的形参接收,
        url ,text = sta
        #print('回调函数',sta)
        print('回调',url)
    
    if __name__ == '__main__':
        p = Pool(4)
        l = ['https//www.baidu.com',
             'https // www.jd.com'
             'https // www.taobao'
             'https // www.mi.com'
             'https // www.bilibili'
             ]
        for i in l :
            p.apply_async(func,args=(i,),call_back=call_back)
            #异步执行func任务,每一个进程执行完任务,在func中return一个结果,结果会自动被callback指定的函数
            #当成形参来接收到.
        p.close()
        p.join()

    线程

      计算机的最小执行单位是线程;

      进程是资源分配的基本单位.线程是可执行的基本单位,是可被调度的基本单位.

      线程不可以自己独立拥有资源 ,线程的执行必须依赖于所属进程中的资源.

      线程被称为轻量级的进程, 线程的切换速度比进程快

      进程中必须至少有一个线程.

      线程分为用户级和内核级线程

        用户级线程 : 对于程序员来说,这样的线程完全被程序员控制执行和调度;

        内核级线程 : 对于计算机内核来说 , 这样的线程完全被内核调度.

        线程组成 : 代码段 ; 数据段 ; TCB(Thread  control  block)

      开启现成的方法

    #方法一 
    from threading import Thread import time def func () : print('子线程') time.sleep(1) #if __name__ == '__main__' :#线程中可以不用写这句代码 t = Thread(target = func , args=()) t.start()


    #方法二
    from threading import Thread
    import time 

    class Mythread(Thread)
      def __init__(self):
        super(
    Mythread,self).__init__()
      def run(self):
        print('我是子线程')

    t = Mythread()
    t.start()

      线程和进程的比较

       (1) CPU切换进程要比CPU切换线程慢得多

            在Python中,如果IO操作过多,最好使用线程 ; 

       (2) 在同一个进程中,所有线程共享这个进程的pid,也就是所有线程共享所属进程的资源和内存地址

       (3) 在同一个进程内,所有线程共享该进程中的全局变量(各个线程之间的局部变量不能共享)

       (4) 关于守护进程与守护线程

        守护进程 : 要摸自己正常结束,要摸根据父进程代码的执行结束而结束

        守护线程 : 要摸自己正常结束,要摸根据父进程的执行结束而结束

       (5) 全局解释器锁 , 只有cpython解释器才有,对于线程来说有了GIL,所以没有真正并行,但是有真正的

        多进程并行

        在cPython中,IO密集用多线程,计算密集用多进程

    from multiprocessing import Process
    from threading import Thread
    import time
    
    def func():
        pass
    
    if __name__ == '__main__':
        start = time.time()
        for i in range(50):
            p = Process(target=func)
            p.start()
        print('开50个进程的时间:',time.time() - start)
        start = time.time()
        for i in range(50):
            p = Thread(target=func)
            p.start()
        print('开50个线程的时间:', time.time() - start)

      GIL锁

        全局解释器锁 , 只有cpython解释器才有,对于线程来说有了GIL,所以没有真正并行,但是有真正的

        多进程并行

        强制线程放弃CPU

        在同一时间内它只允许一个线程执行.

        当你的任务是计算密集的情况下,使用多进程好

        

    from multiprocessing import Process
    from threading import Thread
    import time,os
    
    def func():
        global num
        number = num
        time.sleep(0.1)
        #执行此处会等待,GIL会令该线程退出执行,允许下一线程进入,这一线程也要等待,同样退出执行,依次循环.
        #当等待时间结束,第一个线程再次进入,会从上一断点开始执行,直接执行下一步,num = number -1 结果为99,
        #第二线程同样从上一断点执行,直接执行下一步,num = number -1,结果也为99,以此类推.
        num = number -1
    
    if __name__ == '__main__':
        num = 100
        t = []
        for i in range(50):
            p = Thread(target=func,)
            p.start()
            t.append(p)
        [p.join() for p in t]
        print(num)

      递归锁 

        RLock 可以有无数把锁,但是只有一把万能钥匙(一把钥匙配若干把锁)

        在同一个线程内,递归锁可以无止境的acquire , 但是互斥锁不行 

        在不同进程内,递归锁是保证只能被一个线程拿到钥匙,然后无止境的acquire,其它线程等待

       互斥锁 

        lock() 一把钥匙配一把锁

          一把钥匙配一把锁,主要用于保护数据安全;

          共享资源,又叫玲姐资源.

          共享带码,又叫临界代码.

          对临界资源进行操作时,一定要加锁.

       GIL : 全局锁

        锁的是线程,是cpy解释器上的一把锁,锁的是线程,意思是同一时间只允许一个线程访问CPU

    #>>>>>>>死锁
    from multiprocessing import Process
    from threading import Thread ,Lock
    import time,os
    
    def man(m_tot,m_pap):
        m_tot.acquire()#男的获得厕所资源,把厕所锁上了
        print('男在上厕所')
        time.sleep(1)
        m_pap.acquire()#男的拿纸资源
        print('男的拿到纸资源了')
        time.sleep(1)
        print('男的上完厕所了')
        m_tot.release()#男的还纸资源
        m_pap.release()#男的还厕所资源
    
    def woman(m_tot,m_pap):
        m_pap.acquire()  # 女的获得纸资源
        print('女的拿到纸资源了')
        time.sleep(1)
        m_tot.acquire()  # 女的拿厕所资源,把厕所锁上了
        print('女在上厕所')
        time.sleep(1)
        print('女的上完厕所了')
        m_tot.release()  # 女的还厕所资源
        m_pap.release()  # 女的还纸资源
    
    if __name__ == '__main__':
        m_tot = Lock()
        m_pop = Lock()
        m = Thread(target=man,args=(m_tot,m_pop))
        w = Thread(target=woman, args=(m_tot, m_pop))
        m.start()
        w.start()
    #结果
    #男在上厕所
    #女的拿到纸资源了




    #>>>>>>解决死锁
    #>>>>递归锁 : 只有一把钥匙,但是可以开所有锁,层层开锁

    from multiprocessing import Process
    from threading import Thread ,RLock
    import time,os
    
    def man(m_tot,m_pap):
        m_tot.acquire()#男的手中有一把钥匙获得厕所资源,把厕所锁上了
        print('男在上厕所')
        time.sleep(1)
        m_pap.acquire()#男的拿纸资源
        print('男的拿到纸资源了')
        time.sleep(1)
        print('男的上完厕所了')
        m_tot.release()#男的还纸资源
        m_pap.release()#男的还厕所资源
    
    def woman(m_tot,m_pap):
        m_pap.acquire()  # 女的拿到一把钥匙,获得纸资源
        print('女的拿到纸资源了')
        time.sleep(1)
        m_tot.acquire()  # 女的拿厕所资源,把厕所锁上了
        print('女在上厕所')
        time.sleep(1)
        print('女的上完厕所了')
        m_tot.release()  # 女的还厕所资源
        m_pap.release()  # 女的还纸资源
    
    if __name__ == '__main__':
        m_tot = RLock()
        m_pop = RLock()
        m = Thread(target=man,args=(m_tot,m_pop))
        w = Thread(target=woman, args=(m_tot, m_pop))
        m.start()
        w.start()
     

    线程间的通信与进程的用法一样(线程可以不写__main__)

      信号量

       from threading import Semaphore

      事件

      from threading import Event

      条件

      from threading import Condition

      条件是让程序员自行去调度线程的一个机制

                # Condition涉及4个方法

                # acquire()

                # release()

                # wait()    是指让线程阻塞住

                # notify(int)  是指给wait发一个信号,让wait变成不阻塞

                # int是指,你要给多少给wait发信号

      定时器

      from threading import Timer

       Timer(time , func )

       time :睡眠时间,(秒为单位)

       func : 睡眠过后,要执行的函数

    from threading import Timer
    
    def func():
        print('定时器')
        
     Timer(3,func).start()
  • 相关阅读:
    CF1539 VP 记录
    CF1529 VP 记录
    CF875C National Property 题解
    CF1545 比赛记录
    CF 1550 比赛记录
    CF1539E Game with Cards 题解
    CF1202F You Are Given Some Letters... 题解
    vmware Linux虚拟机挂载共享文件夹
    利用SOLR搭建企业搜索平台 之九(solr的查询语法)
    利用SOLR搭建企业搜索平台 之四(MultiCore)
  • 原文地址:https://www.cnblogs.com/panda-pandeyong/p/9519283.html
Copyright © 2011-2022 走看看