zoukankan      html  css  js  c++  java
  • 38 线程

    回顾:

    生产者消费者模型
    主要为了解决强耦耦合的问题


    队列

     from multiprocessing import Queue


    先进先出
      队列本身是安全的

    from multiprocessing import JoinableQueue

    task_done()    每消费一个数据,就返回一个标识
    join()          接收task_done返回的标识,以便可以知道队列中的数据什么时候被消费完了

    管道(了解)
    本身是不安全的 

    from multiprocessing import Pipe
    
    
    con1,con2 = Pipe()

    con1可以收发数据,con2也可以收发数据(全双工)


    队列 = 管道 + 锁

    多进程之间共享内存数据:

    from multiprocessing import Manager,Value

    进程池

    from multiprocessing import Pool
    
    
    p.map(func,iterable)
    
    p.apply(func,args=())   同步的做任务
                池中的进程是普通进程,不会跟随着主进程的结束而结束。
    
    p.apply_async(func,args=(),callback=None)    异步,记得加上close和join
    close:                   不再接受新的任务,准备关闭
    join:                    等待进程池中所有进程执行完毕任务。

    池中的进程是守护进程,主进程的代码执行完毕,守护进程就结束了

    在进程池中的回调函数是主进程调用的,和子进程无关。


    1 学习线程

    线程被称作轻量级的进程。 GIL:全局解释锁(只有Cpython解释器才有)
    对于线程来说,因为有了GIL,所以没有真正的并行

    计算机的执行单位以线程为单位。计算机的最小可执行是线程。
    进程是资源分配的基本单位。线程是可执行的基本单位,是可被调度的基本单位。
    线程不可以自己独立拥有资源。线程的执行,必须依赖于所属进程中的资源。
    进程中必须至少应该有一个线程。

    线程又分为用户级线程和内核级线程(了解)
    用户级线程:对于程序员来说的,这样的线程完全被程序员控制执行,调度
    内核级线程:对于计算机内核来说的,这样的线程完全被内核控制调度。

    进程由 代码段 数据段 PCB组成(process control block)
    线程由 代码段 数据段 TCB组成(thread control block)

    线程和进程的比较
    thread - 线程
    import thread 操作线程的模块
    import threading 用这个去操作线程

          (1) cpu切换进程要比cpu切换线程 慢很多
    在python中,如果IO操作过多的话,使用多线程最好了
    from multiprocessing import Process
    from threading import Thread
    import time
    
    
    
    def func():
        pass
    
    if __name__ == '__main__':
        start = time.time()
        for i in range(100):
            p = Process(target=func)
            p.start()
        print('开100个进程的时间:', time.time() - start)
    
        start = time.time()
        for i in range(100):
            p = Thread(target=func)
            p.start()
        print('开100个线程的时间:', time.time() - start)
    
    ============
    开100个进程的时间: 0.2928299903869629
    开100个线程的时间: 3.7468421459198
    开启的时间比较
       (2) 在同一个进程内,所有线程共享这个进程的pid,也就是说所有线程共享所属进程的所有资源和内存地址
    from multiprocessing import Process
    from threading import Thread
    import time,os
    
    
    
    def func(name):
        print('我是一个%s,我的pid是%s'%(name,os.getpid()))
    
    
    if __name__ == '__main__':
    
        print('我是main,我的pid是%s'%(os.getpid()))
        for i in range(10):
            p = Process(target=func,args=('进程',))
            p.start()
    
        for i in range(10):
            p = Thread(target=func,args=('线程',))
            p.start()
    pid 差异

          主进程和子进程的pid 是不同的  

          主线程和子线程的pid 是相同的

     (3) 在同一个进程内,所有线程共享该进程中的全局变量
    from multiprocessing import Process
    from threading import Thread,Lock
    import time,os
    
    
    
    def func():
        global num
        tmp = num
        time.sleep(0.00001)
        num = tmp - 1
    
    if __name__ == '__main__':
        num = 100
        t_l = []
        for i in range(100):
            t = Thread(target=func)
            t.start()
            t_l.append(t)
        # time.sleep(1)
        [t.join() for t in t_l]
        print(num)
    线程可以共享全局变量?




    (4) 因为有GIL锁的存在,在Cpython中,没有真正的线程并行。但是有真正的多进程并行
    当你的任务是计算密集的情况下,使用多进程好
    总结:在CPython中,IO密集用多线程,计算密集用多进程

    (5)关于守护线程和守护进程的事情(注意:代码执行结束并不代表程序结束)
    守护进程:要么自己正常结束,要么根据父进程的代码执行结束而结束
    守护线程:要么自己正常结束,要么根据父线程的执行结束而结束


    2线程的使用方法


    import threading
    from threading import Thread
    import time
    
    
    
    
    def func():
        print('这是一个子线程')
        time.sleep(2)
    
    
    if __name__ == '__main__':
        t = Thread(target=func,args=())
        t.start()
    开启线程的一个比较low的方法
    import threading
    from threading import Thread
    import time
    
    class MyThread(Thread):
        def __init__(self):
            super(MyThread, self).__init__()
        def run(self):
            print('我是一个子线程')
    
    
    t = MyThread()
    t.start()   # 运行类中的run()方法  ,也开启了线程
    装逼的方法 类的方式
    from multiprocessing import Process
    from threading import Thread,Lock
    import time,os
    
    l = Lock()# 一把钥匙配一把锁
    l.acquire()
    print('abc')
    l.acquire()# 程序会阻塞住   陷入死锁了
    print(123)
    Lock() 锁
    from multiprocessing import Process
    from threading import Thread,Lock
    import time,os
    
    
    
    def man(l_tot,l_pap):
        l_tot.acquire()# 是男的获得厕所资源,把厕所锁上了
        print('alex在厕所上厕所')
        time.sleep(1)
        l_pap.acquire()# 男的拿纸资源
        print('alex拿到卫生纸了!')
        time.sleep(0.5)
        print('alex完事了!')
        l_pap.release()# 男的先还纸
        l_tot.release()# 男的还厕所
    
    def woman(l_tot,l_pap):
        l_pap.acquire()  # 女的拿纸资源
        print('小雪拿到卫生纸了!')
        time.sleep(1)
        l_tot.acquire()  # 是女的获得厕所资源,把厕所锁上了
        print('小雪在厕所上厕所')
        time.sleep(0.5)
        print('小雪完事了!')
        l_tot.release()  # 女的还厕所
        l_pap.release()  # 女的先还纸
    
    
    if __name__ == '__main__':
        l_tot = Lock()
        l_pap = Lock()
        t_man = Thread(target=man,args=(l_tot,l_pap))
        t_woman = Thread(target=woman,args=(l_tot,l_pap))
        t_man.start()
        t_woman.start()
    死锁演示-厕所纸
    from multiprocessing import Process
    from threading import Thread,RLock
    import time,os
    # RLock是递归锁 --- 是无止尽的锁,但是所有锁都有一个共同的钥匙
    # 想解决死锁,配一把公共的钥匙就可以了。
    
    def man(l_tot,l_pap):
        l_tot.acquire()# 是男的获得厕所资源,把厕所锁上了
        print('alex在厕所上厕所')
        time.sleep(1)
        l_pap.acquire()# 男的拿纸资源
        print('alex拿到卫生纸了!')
        time.sleep(0.5)
        print('alex完事了!')
        l_pap.release()# 男的先还纸
        l_tot.release()# 男的还厕所
    
    def woman(l_tot,l_pap):
        l_pap.acquire()  # 女的拿纸资源
        print('小雪拿到卫生纸了!')
        time.sleep(1)
        l_tot.acquire()  # 是女的获得厕所资源,把厕所锁上了
        print('小雪在厕所上厕所')
        time.sleep(0.5)
        print('小雪完事了!')
        l_tot.release()  # 女的还厕所
        l_pap.release()  # 女的先还纸
    
    
    if __name__ == '__main__':
        l_tot = l_pap = RLock()
        t_man = Thread(target=man,args=(l_tot,l_pap))
        t_woman = Thread(target=woman,args=(l_tot,l_pap))
        t_man.start()
        t_woman.start()
    解决死锁RLock()
    # 第一种情况 在同一个线程内,递归锁可以无止尽的acquire,但是互斥锁不行
    # 第二种情况,在不同的线程内,递归锁是保证只能被一个线程拿到钥匙,然后无止尽的acquire,其他线程等待

    (1)锁机制
    递归锁
    RLock() 可以有无止尽的锁,但是会有一把万能钥匙(获得的不是同一个锁,而是锁中锁,不停迭代)
    互斥锁:
    Lock() 一把钥匙配一把锁
    GIL:全局解释器锁
    锁的是线程,是CPython解释器上的一个锁,锁的是线程,意思是在同一时间只允许一个线程访问cpu


    (2) 信号量:
    from threading import Semaphore
    去看多进程的信号量

    from threading import Semaphore,Thread
    import time
    
    
    def func(sem,i):
        sem.acquire()
        print('第%s个人进入屋子'%i)
        time.sleep(2)
        print('第%s个人离开屋子'%i)
        sem.release()
    
    
    sem = Semaphore(20)
    for i in range(20):
        t = Thread(target=func,args=(sem,i))
        t.start()
    信号量 semaphore

    (3) 事件
    from threading import Event
    去看多进程的事件机制



    (4) 条件
    from threading import Condition
    条件是让程序员自行去调度线程的一个机制
    # Condition涉及4个方法
    # acquire()
    # release()
    # wait() 是指让线程阻塞住
    # notify(int) 是指给wait发一个信号,让wait变成不阻塞
    # int是指,你要给多少给wait发信号
    from threading import Thread,Condition
    import time
    # Condition涉及4个方法
    # acquire()
    # release()
    # wait()    是指让线程阻塞住
    # notify(int)  是指给wait发一个信号,让wait变成不阻塞
    #     int是指,你要给多少给wait发信号
    
    
    def func(con,i):
        con.acquire()
        con.wait()# 线程执行到这里,会阻塞住,等待notify发送信号,来唤醒此线程
        con.release()
        print('第%s个线程开始执行了!'%i)
    
    if __name__ == '__main__':
        con = Condition()
        for i in range(10):
            t = Thread(target=func,args=(con,i))
            t.start()
        while 1:
            num = int(input(">>>"))
            con.acquire()
            con.notify(num)# 发送一个信号给num个正在阻塞在wait的线程,让这些线程正常执行
            con.release()
    View Code
    from threading import Condition,Thread
    import time
    
    
    
    
    def func(con,i):
        con.acquire()# 主线程和10个子线程都在抢夺递归锁的一把钥匙。
        # 如果主线程抢到钥匙,主线程执行while 1,input,然后notify发信号,还钥匙。但是,此时如果主线程执行特别快
        # 极有可能接下来主线程又会拿到钥匙,那么此时哪怕其他10个子线程的wait接收到信号,但是因为没有拿到钥匙,所以其他子线程还是不会执行
        con.wait()
        print('第%s个线程执行了'%i)
        con.release()
    
    con = Condition()
    for i in range(10):
        t = Thread(target=func,args = (con,i))
        t.start()
    while 1:
        # print(123)
        con.acquire()
        num = input('>>>')
        con.notify(int(num))
        con.release()
        time.sleep(0.5)    # 主线程执行太快了 ,子线程还没来得及摸到到钥匙(还没到门口),咱们就在这搞循环,子线程根本穿不进去
    
    # 条件 涉及 4个方法:
    #    con.acquire()
    #    con.release()
    #    con.wait()  # 假设有一个初始状态为False,阻塞。一旦接受到notify的信号后,变为True,不再阻塞
    #    con.notify(int)  给wait发信号,发int个信号,会传递给int个wait,让int个线程正常执行
    解释版 条件

    (5) 定时器
    from threading import Timer
    # Timer(time,func)
    # time:睡眠的时间,以秒为单位
    # func:睡眠时间之后,需要执行的任务
    from threading import Timer# 定时器
    
    
    def func():
        print('就是这么nb!')
    
    Timer(2.5,func).start()
    # Timer(time,func)
    # time:睡眠的时间,以秒为单位
    # func:睡眠时间之后,需要执行的任务
    定时器


    今天的面试题:
    进程和线程的区别?
    你认为什么时候用多线程好?什么时候用多进程好?
    给你一个任务场景,让你去分析,如果让你去研发,你是选择用多线程还是多进程?
    解释以下GIL锁?
  • 相关阅读:
    Java8 Stream Function
    PLINQ (C#/.Net 4.5.1) vs Stream (JDK/Java 8) Performance
    罗素 尊重 《事实》
    小品 《研发的一天》
    Java8 λ表达式 stream group by max then Option then PlainObject
    这人好像一条狗啊。什么是共识?
    TOGAF TheOpenGroup引领开发厂商中立的开放技术标准和认证
    OpenMP vs. MPI
    BPMN2 online draw tools 在线作图工具
    DecisionCamp 2019, Decision Manager, AI, and the Future
  • 原文地址:https://www.cnblogs.com/zhuangdd/p/12819099.html
Copyright © 2011-2022 走看看