zoukankan      html  css  js  c++  java
  • 多线程

    多线程初识

    threading 包 :threading.Thread创建多线程的两种方式

    多线程的组件:threading包中的模块:注意只有队列不是用 threading包

    线程池: concurrent.futures

    GIL—全局解释器锁

    一、多线程初识:

    #线程特点
    #1.轻型实体,占用非常小的资源
    #2.独立调度和分派的基本单位:即cup实际上调度的是线程
    #3.共享进程资源
    #4.可并发执行
    线程与进程的区别可以归纳为以下4点:
      1)地址空间和其它资源(如打开文件):进程间相互独立,同一进程的各线程间共享。某进程内的线程在其它进程不可见。
      2)通信:进程间通信IPC,线程间可以直接读写进程数据段(如全局变量)来进行通信——需要进程同步和互斥手段的辅助,以保证数据的一致性。
      3)调度和切换:线程上下文切换比进程上下文切换要快得多。
      4)在多线程操作系统中,进程不是一个可执行的实体。
    线程与进程的区别
    #进程是最小的内存分配单位
    #线程是操作系统调度最小单位
    #线程被CPU执行了
    #进程内至少含有一个线程,称为主线程
    #进程中可以开启多个线程
        #开启一个线程所需要的时间要远远小于开启一个进程
        #多个线程内部有自己的数据栈,数据不共享。
        #全局变量在多个线程之间是共享的
    #注意:子进程中不能用input,子线程中是可以使用的
    import time
    import os
    from threading import Thread
    
    def func(a,b):
        n = a + b
        print('in the n',n)
        # print('func pid: ', os.getpid())
        global g
        g = 0
        print(g)
    
    
    g = 100
    t_lst = []
    for i in range(10):
        # print('pid',os.getpid())
        t = Thread(target=func,args=(i,5,))
        t_lst.append(t)
        t.start()
    [t.join() for t in t_lst]
    print('g ',g)
    
    
    #进程
    #当前写的代码,导入的模块,文件所在的位置,内置函数
    #这些和操作系统,python解释器,占用内存大的都放进程里,线程不放
    
    #线程
    #主线程:i呀,t呀这些参数
    #子线程:
        # 栈:图在有道笔记0928。这里用文字描述。
        #     栈用矩形表示,先进后出。比如在子线程中 a=0,b=5,操作方式为add,输出为n,这些一次入栈
        #     由于n后入栈,n出来后,得到add,再得到 a,b从而就获得了n的值
    线程,进程存储了哪些信息?

     进程,线程,协程概览:

    #进程的缺点:
    # 进程只能在一个时间干一件事,如果想同时干两件事或多件事,进程就无能为力了。
    # 进程在执行的过程中如果阻塞,例如等待输入,整个进程就会挂起,即使进程中有些工作不依赖于输入的数据,也将无法执行。
    
    #   60年代,在OS中能拥有资源和独立运行的基本单位是进程,然而随着计算机技术的发展,进程出现了很多弊端,
    # 一是由于进程是资源拥有者,创建、撤消与切换存在较大的时空开销,因此需要引入轻型进程;二是由于对称多处理机(SMP)出现,
    # 可以满足多个运行单位,而多个进程并行开销过大。
    #   因此在80年代,出现了···能独立运行的基本单位——线程(Threads)。
    # ······注意:进程是资源分配的最小单位,线程是CPU调度的最小单位.
    #所以说进程间都有各自的内存空间,只能通信来获取数值
    #      每一个进程中至少有一个线程
    
    #可以把进程看做一个车间,每个进程肯定有一个主线程,类比车间必定有一个工人(主线程)
    #把线程看作车间的工人
    #CPU就是生产零件的机器,工人搬材料给cpu制作。此时如果要多生产零件,我们平时都是在一直增加车间(即增加进程),从而多了工人来运作零件制造
    #但是增加线程,就好比只在一个车间里增加工人,就每必要每次都要新开车间,时空开销这么大了
    
    #在操作系统中的 分时调度系统里,我们所说的切换进程,其实就是切换进程的主线程
    
    #内存中的进程:
    #从头到尾   栈,堆,数据,文本
    
    #进程中有:
    #代码,数据,文件 线程(线程有 寄存器,栈,线程本身)
    # 线程有 寄存器,栈,线程本身
    
    
    # 线程与进程的区别可以归纳为以下4点:
    #   1)地址空间和其它资源(如打开文件):进程间相互独立,同一进程的各线程间共享。某进程内的线程在其它进程不可见。
    #   2)通信:进程间通信IPC,线程间可以直接读写进程数据段(如全局变量)来进行通信——需要进程同步和互斥手段的辅助,以保证数据的一致性。
    #   3)调度和切换:线程上下文切换比进程上下文切换要快得多。
                #有时也称做进程切换或任务切换,是指CPU 从一个进程或线程切换到另一个进程或线程。
    #   4)在多线程操作系统中,进程不是一个可执行的实体。
                #真正执行的是线程
    #http://www.ruanyifeng.com/blog/2013/04/processes_and_threads.html
    
    #线程特点
    #1.轻型实体,占用非常小的资源
    #2.独立调度和分派的基本单位:即cup实际上调度的是线程
    #3.共享进程资源
    #4.可并发执行
    
    
    #协程:
        #本质上是一个线程
        #能够在多个任务之间切换来节省一些IO时间
        #协程中任务之间的切换也消耗时间,但是开销远远小于进程线程之间的切换
    
    #协程的意义:
        #在遇到IO操作的时候,切换到另外一个任务
        #规避之前任务的IO时间,来提高cpu的利用率
        #在实际工作中会采用:进程+线程+协程,来提高代码的并发效果
        #进程是cpu核数+1,线程是cpu核数*5,每个线程中协程最多可以起500个
    #比如:
        #发送了一个网页请求后,在网络延时,等待网页响应的时间(等待IO),就可以用协程去切换任务利用等待的时间,继续发送多个网页请求,从而提高效率
        #进程5,线程20,协程500 = 总共可以有50000个协程:一台4c的机器最多可以接收的并发数
    #数据库,负载均衡,让很多个请求,平均分摊给各个服务器
        #nginx组件 大型互联网公司会用到,就是用来帮你分发任务的,并发最大承载量就是50000,用的就是协程机制
        #一般情况下就是根据这个规则,上下浮动
    进程,线程,协程

    二.使用threading.Thread 创建多线程:


    (1)和进程一样,多线程也有两种创建方法:

    #主要用threading
    # Python提供了几个用于多线程编程的模块,包括thread、threading和Queue等。
    # thread和threading模块允许程序员创建和管理线程。thread模块提供了基本的线程和锁的支持,threading提供了更高级别、功能更强的线程管理的功能。
    # Queue模块允许用户创建一个可以用于多个线程之间共享数据的队列数据结构。
    #
    import time
    import os
    from threading import Thread
    
    def func(n):
        time.sleep(1)
        print('in the n',n)
        print('func pid: ',os.getpid())
    
    for i in range(10):
        print('pid',os.getpid())
        t = Thread(target=func,args=(1,))
        t.start()
    
    #因为不开启进程,所以不用 if __name__ == '__main__'
    #其余的启动线程,start线程,就和期用进程一模一样了。各个线程之间也有三态,
    # 执行的时候因为是并发的,所以不一定按顺序
    
    
    #第二种方式,也是启动进程一样。1.继承Thread,2.有run方法,里面是线程代码,3.参数用__init__传入,父类参数记得super().__init__()
    # class Mythread(Thread):
    #     def __init__(self,n):
    #         super().__init__()
    #         self.n = n
    #     def run(self):
    #         print('in the run',self.n+1)
    #
    # m = Mythread(1)
    # m.start()
    
    
    # https://docs.python.org/3/library/threading.html?highlight=threading# 线程官方说明
    # import requests
    # url = r'https://docs.python.org/3/library/threading.html?highlight=threading#'
    # res = requests.get(url)
    # print(res.status_code)
    # print(res.content)
    多线程的两种创建方法
    import time
    from multiprocessing import Process
    from threading import Thread
    
    def func(n):
        n + 1
    
    if __name__ == '__main__':
        start = time.time()
        p_lst = []
        for i in range(100):
            p = Process(target=func,args=(i,))
            p.start()
            p_lst.append(p)
        for p in p_lst:p.join()
        t1 = time.time() - start
    
        start2 = time.time()
        t_lst = []
        for i in range(100):
            t = Thread(target=func, args=(i,))
            t.start()
            t_lst.append(t)
        for t in t_lst: t.join()
        t2 = time.time() - start2
        print(t1,t2)
    线程和进程的效率比较

    threading.current_thread(), .active_count(), .enumerate()

    import threading,time
    def haha(n):
        # print(n)
        time.sleep(0.5)
        print('名字和ID,',threading.current_thread())
        print('id',threading.get_ident())
    
    for i in range(10):
        threading.Thread(target=haha,args=(1,)).start()
    print(threading.current_thread())
    print(threading.active_count())#当前活着的线程数,记得算上主线程
    print(threading.enumerate()) #存储所有线程名字和id的一个列表
    Thread的其他方法

    (2)守护线程:

    #1 主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),
      # 然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束,
    #2 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。
      # 因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。

    #和进程一样,主线程会等到子线程执行完才结束。 设置守护线程也是 t.daemon=Ture
    #守护进程会随着主进程的结束而结束
    #守护线程会在主线程结束之后等待子线程的结束才结束,主进程只是代码运行结束,但是没关闭,因为主进程要留着内存资源给子线程用
    #但是如果进程既有子线程,也有子进程,会等到所有代码结束完,才结束。因为主进程没结束,所以守护线程还会运行
    import time
    from threading import Thread
    from multiprocessing import Process
    def func1():
        while True:
            print('*'*10)
            time.sleep(1)
    
    def func2():
        print('fun222')
        time.sleep(3)
        # print('funcccc')
    
    t= Thread(target=func1)
    t.daemon=True #设置守护线程,随着主线程结束而结束
                  #守护线程会在主线程结束之后等待其他子线程的结束才结束
    t.start()
    t2= Thread(target=func2)
    # t2.daemon=True
    t2.start()
    t2.join()#感知func2结束,才会继续执行主线程代码
    print('主线程')
    
    # if __name__ == '__main__':
    #     t = Process(target=func1)
    #     # t = Thread(target=func1)
    #     t.daemon = True  # 设置守护线程,随着主线程结束而结束
    #     # 守护进程会随着主进程的结束而结束
    #     t.start()
    #     t2 = Thread(target=func2)
    #     # t2 = Process(target=func2)
    #     # t2.daemon = True
    #     t2.start()
    #     print('主线程')
    守护线程

     (3)使用多线程实现的socket客户端的并发:

    import socket
    from threading import Thread
    
    def func(conn):
        conn.send(b'hello threading')
        msg = conn.recv(1024).decode('utf8')
        print(msg)
        conn.close()
    
    sk = socket.socket()
    sk.bind(('127.0.0.1',8080))
    sk.listen()
    
    while True:
        conn,addr = sk.accept()
        t = Thread(target=func,args=(conn,))
        t.start()
    sk.close()
    server端
    import socket
    
    sk = socket.socket()
    sk.connect(('127.0.0.1',8080))
    
    msg = sk.recv(1024)
    print(msg)
    
    ret = input('>>>> ')
    sk.send(ret.encode('utf8'))
    
    sk.close()
    client端

    三.threading包中的其他组件:


    (1)锁: 同步锁/互斥锁threading.Lock   以及  递归锁threading.RLock .

    所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁

    import time
    from threading import Thread,Lock
    noodle_lock = Lock()
    fork_lock = Lock()
    def eat1(name):
        noodle_lock.acquire()
        print('%s 抢到了面条'%name)
        fork_lock.acquire()
        print('%s 抢到了叉子'%name)
        print('%s 吃面'%name)
        fork_lock.release()
        noodle_lock.release()
    
    def eat2(name):
        fork_lock.acquire()
        print('%s 抢到了叉子' % name)
        time.sleep(1)
        noodle_lock.acquire()
        print('%s 抢到了面条' % name)
        print('%s 吃面' % name)
        noodle_lock.release()
        fork_lock.release()
    
    for name in ['哪吒','egon','yuan']:
        t1 = Thread(target=eat1,args=(name,))
        t2 = Thread(target=eat2,args=(name,))
        t1.start()
        t2.start()
    
    死锁问题
    死锁

    著名的科学家吃面问题:

    #Lock互斥锁,但凡互斥锁都有死锁问题。(当用到两把以上锁,可能会出现死锁现象)
    
    #进程三大消息中间件:一般不用我们之前学习的那些IPC。所以进程一般用不到我们自己加锁
    # kafka:大数据的 消息中间件 可保留数据
    # rebbitmq
    # memcache:不可保留
    
    #由于存在操作系统时间片的轮换,刚好线程拿到数据的时候,时间片轮换了,(其他线程也拿到了这个数据)
    # 不是GIL让线程休眠,而是系统时间片轮换强制休眠,所以线程把数据还回去了(操作完了)。其他线程就可以使用数据了(前提是这个代码足够到让时间片轮换)
    #所以即使有GIL锁还是存在数据不安全性的
    
    import time
    from threading import Thread,Lock
    #
    # def func(lock):
    #     global n
    #     with lock:  #lock.acquire()  lock.release()  #没有锁结果为9
    #         temp = n
    #         time.sleep(0.2)
    #         n = temp - 1
    #
    # n = 10
    # t_lst = []
    # lock = Lock()
    # for i in range(10):
    #     t = Thread(target=func,args=(lock,))
    #     t.start()
    #     t_lst.append(t)
    # [t.join() for t in t_lst]
    # print(n)
    
    
    #科学家吃面,经典的死锁问题(进程也有死锁问题)
    # noodle_lock  = Lock()
    # fork_lock = Lock()
    # def eat1(name):
    #     noodle_lock.acquire()
    #     print('%s拿到面条啦'%name)
    #     fork_lock.acquire()
    #     print('%s拿到叉子了'%name)
    #     print('%s吃面'%name)
    #     fork_lock.release()
    #     noodle_lock.release()
    #
    # def eat2(name):
    #     fork_lock.acquire()
    #     print('%s拿到叉子了'%name)
    #     time.sleep(1)
    #     noodle_lock.acquire()
    #     print('%s拿到面条啦'%name)
    #     print('%s吃面'%name)
    #     noodle_lock.release()
    #     fork_lock.release()
    
    #在只有同步锁,没有递归锁的情况下,会产生如下注释
    # Thread(target=eat1,args=('alex',)).start()   #1.拿到面条钥匙,拿到叉子钥匙,吃面成功
    # Thread(target=eat2,args=('Egon',)).start()   #1.拿到叉子钥匙,阻塞
    # Thread(target=eat1,args=('bossjin',)).start() # 1.alex吃完面,因为叉子钥匙在egon手里,所以jin 拿到了面条钥匙
    # Thread(target=eat2,args=('nezha',)).start()  # 1.通过前3步,此时nezha啥也没拿到
    
    
    #递归锁,解决死锁问题
    from threading import RLock #递归锁
    # lock2 = RLock() # 理解为一串钥匙
    # lock2.acquire()
    # lock2.acquire()
    # lock2.acquire()
    # lock2.acquire()
    # print(123)
    #在只有一个线程的时候,Rlock不起效果
    #但是如果多个线程,其中一个线程拿了acqure,其他子线程就拿不到了
    #Rlock 可以理解为一串钥匙,假设进3道门。一个线程拿着这一串钥匙(3把),要逐一进到门里,拿到结果,(流程图在有道词典0928)
    #然后逐一返回回来,下一个进程才可以继续用这钥匙串,去进入这3道门
    
    
    noodle_lock2 = fork_lock2 = RLock() #一串钥匙,一旦拿在手里,别人就拿不到钥匙了,不存在死锁
    def eat1(name):
        noodle_lock2.acquire()
        print('%s拿到面条啦'%name)
        fork_lock2.acquire()
        print('%s拿到叉子了'%name)
        print('%s吃面'%name)
        fork_lock2.release()
        noodle_lock2.release()
    
    def eat2(name):
        fork_lock2.acquire()
        print('%s拿到叉子了'%name)
        time.sleep(1)
        noodle_lock2.acquire()
        print('%s拿到面条啦'%name)
        print('%s吃面'%name)
        noodle_lock2.release()
        fork_lock2.release()
    
    #在只有同步锁,没有递归锁的情况下,会产生如下注释
    Thread(target=eat1,args=('alex',)).start()   #1.拿到面条钥匙,拿到叉子钥匙,吃面成功
    Thread(target=eat2,args=('Egon',)).start()   #1.拿到叉子钥匙,阻塞
    Thread(target=eat1,args=('bossjin',)).start() # 1.alex吃完面,因为叉子钥匙在egon手里,所以jin 拿到了面条钥匙
    Thread(target=eat2,args=('nezha',)).start()  # 1.通过前3步,此时nezha啥也没拿到
    同步锁和递归锁

    (2)信号量: threading.Semaphore,同进程一样.

    Semaphore管理一个内置的计数器,
    每当调用acquire()时内置计数器-1;
    调用release() 时内置计数器+1;
    计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。

    import time
    from threading import Thread,Semaphore
    
    def fun(sem,a,b):
        sem.acquire()
        time.sleep(0.5)
        n = a+b
        print(n)
        sem.release()
    
    sem = Semaphore(4)
    for i in range(10):
        t = Thread(target=fun,args=(sem,i,i+5,))
        t.start()
    threading.Semaphore

    (3)事件: threading.Event

    import time,random
    from threading import Thread,Event
    
    def check_db(e):
        count = 0
        while count<3:
            e.wait(1) #在状态为false的情况下,只等待1秒钟
            if e.is_set() == True:
                print('第%s次连接,连接成功'%(count+1))
                break
            else:
                print('第%s次连接,连接失败'%(count+1))
            count += 1
        else:
            raise TimeoutError('33[31m超时,请重新尝试33[0m')
        # if count == 3 :
    
    
    def check_web(e):
        time.sleep(random.randint(0,4))
        e.set()
    
    e = Event()
    t = Thread(target=check_web,args=(e,))
    t2 = Thread(target=check_db,args=(e,))
    t.start()
    t2.start()
    
    #写错的地方
    #1.target里的函数竟然加了括号,2.t2没有start 3.e.wait()没放在while里,4.count忘了+1
    Event-连接数据库的例子

    (4)条件: threading.Condition

    使得线程等待,只有满足某条件时,才释放n个线程

    Python提供的Condition对象提供了对复杂线程同步问题的支持。Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法。线程首先acquire一个条件变量,然后判断一些条件。如果条件不满足则wait;如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件。不断的重复这一过程,从而解决复杂的同步问题。
    Condition初识
    #有点像 semaphore,不过condition钥匙是一次性的。生产中不太会使用这个组件
    from threading import Condition,Thread
    #条件 类似锁
    #有acquire release
    #一个条件被创立之初默认有一个falese状态
    #falese状态会影响wait一直出于等待状态
    #notify(int) 接收一个int类型 ,造一次性使用的钥匙
    
    def func(con,i):
        con.acquire()  #首先 wait和notify都要在 acquire和release中间实现
        con.wait(2)     #等钥匙,没钥匙就一直wait,但是如果wait里给参数,比如wait(2)那么就只等待2秒
        print('在第%s个循环里'%i)
        con.release()
    
    con = Condition()
    for i in range(10):
        t = Thread(target=func,args=(con,i,))
        t.start()
    while True:
        num = int(input('>>> '))
        con.acquire()
        # num = int(input('>>> '))
        con.notify(num)  #首先 wait和notify都要在 acquire和release中间实现
        con.release()    #notify表示造了多少把一次性钥匙
                         #当这些线程都被锁阻挡在外面,那一把一次性钥匙进去后,钥匙就没了也不返回,其他子线程要等着notify继续造钥匙才能进去
    Condition例子和解释

    (5)定时器:  from threading import Timer

    定时器,指定n秒后执行某个操作

    #定时器不需要启动thread,#interval间隔的意思
    #和thread一样,是异步的
    #timer #初始化2个参数,定时秒数和func
    import time
    from threading import Timer
    
    def func():
        print('时间同步')
    
    while True:
        Timer(2,func).start() #注意,此处Timer是异步不阻塞的,这里的2秒是针对函数来说的,它起了线程后,会马上执行后面的代码
        time.sleep(2)
    #为什么此处还需要time.sleep,因为timer是异步的,用while True会瞬间开启很多个进程,睡2秒后,同时一起运行 func。
    #所以为了实现每2秒进行一次同步,就要让 while True 每2秒才开启一个线程。
    #同时巧妙的 让定时器等待的2秒时间和time.sleep等待的2秒时间,并发等待。达到2秒执行一次func的目的
    
    #为什么不在func中sellp2秒?
    #因为时间同步也有时间,因此在函数外头实现,比较好
    定时器例子和解释

    (6)队列: import queue ---->  queue.Queue  注意这里使用的是普通的队列模块

    #队列中内置了很多锁,是数据安全的
    #由于GIL 全局解释性锁的不安全性
    # 存在操作系统时间片的轮换,刚好线程拿到数据的时候,时间片轮换了,(其他线程也拿到了这个数据)

    #所以线程间通信,有时候为了让线程数据保证安全,还是要依赖队列

    #队列中内置了很多锁,是数据安全的
    #由于GIL 全局解释性锁的不安全性
    # 存在操作系统时间片的轮换,刚好线程拿到数据的时候,时间片轮换了,(其他线程也拿到了这个数据)
    
    #所以线程间通信,有时候为了让线程数据保证安全,还是要依赖队列
    
    #注意与进程导入不同,这里直接import queue即可
    import queue
    # q = queue.Queue()  #队列 先进先出
    # q.put()
    # q.put_nowait() #如果队列是有长度得,那么放满了此时不会阻塞,而是直接报错
    # q.get()
    # q.get_nowait() #如果队列已经空了,那么继续get不会阻塞,而是直接报错
    
    #可以用异常处理,来让队列不阻塞也不报错
    #     try:
    #         q.get_nowait() #为空会报错
    #     except:
    #         print('队列已空')
    #         time.sleep(0.5)
    
    #LifoQueue 类似栈
    # q = queue.LifoQueue #先进后出
    # q.put(1)
    # q.put(2)
    # q.put(3)
    # print(q.get()) #最先打印3,因为3最后进
    
    #PrioritityQueue
    q2 = queue.PriorityQueue()  #优先队列,put参数传入为一个元组 (优先级,要传入得值)
    q2.put((20,'a'))
    q2.put((10,'b'))
    q2.put((30,'c'))
    q2.put((3,'d'))
    print(q2.get())
    #数字越小,代表优先级越高。当优先级一样的时候,根据传入的值的 ASCII码值的顺序,进行排列
    队列例子和解释

    #有普通的先进先出队列, LifoQueue 类似栈,先进后出  , PrioritityQueue队列,优先队列,数值越小优先级越高

    四.线程池: concurrent.futures


    concurrent.futures模块提供了高度封装的异步调用接口

    既可以用来开启多线程,也可以用来开启多进程

    #1 介绍
    concurrent.futures模块提供了高度封装的异步调用接口
    ThreadPoolExecutor:线程池,提供异步调用
    ProcessPoolExecutor: 进程池,提供异步调用
    Both implement the same interface, which is defined by the abstract Executor class.
    
    #2 基本方法
    #submit(fn, *args, **kwargs)
    异步提交任务
    
    #map(func, *iterables, timeout=None, chunksize=1) 
    取代for循环submit的操作
    
    #shutdown(wait=True) 
    相当于进程池的pool.close()+pool.join()操作
    wait=True,等待池内所有任务执行完毕回收完资源后才继续
    wait=False,立即返回,并不会等待池内的任务执行完毕
    但不管wait参数为何值,整个程序都会等到所有任务执行完毕
    submit和map必须在shutdown之前
    
    #result(timeout=None)
    取得结果
    
    #add_done_callback(fn)
    回调函数
    concurrent.futures模块介绍
    #早期的时候压根没有线程池
    #concurrent.futures 新出的模块
    # ThreadPoolExecutor:线程池,提供异步调用
    # ProcessPoolExecutor: 进程池,提供异步调用
    
    import time
    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    def func(n):
        time.sleep(1)
        print(n)
        return n*n
    
    
    t_lst = []
    t = ThreadPoolExecutor(max_workers=5)  #注意和进程池建议个数为CPU核数+1不同,这里建议最大为 cpu核数*5
    #ProcessPoolExecutor 如果想改为进程池,只要在这里改为 t = #ProcessPoolExecutor(max_workers=5),以下完全不用变
    for i in range(20):
        t_result = t.submit(func,i) ##异步开启线程,类似 apply_async
        t_lst.append(t_result)
    # t.shutdown()      #功能类似 pool.close()+pool.join() 组合效果
    print('主线程')
    for t_result in t_lst:print('***',t_result.result()) #类似 res = pool.apply_async.......  res.get()
    
    # map的用法
    # t.map(func,range(20)) #但是用map是拿不到返回值的,# multiprocessing.Pool中的进程池是可以拿到的
    
    #····把 t.shutdown()注释掉,建议要注释掉
    #那么20个线程,线程池里是5个,每个睡2秒,也就是总共需要8秒时间。如果有shutdown,主线程要8秒后才能给我 result的值
    #但是如果注释掉shutdown,那么 每次5个线程一运行完,t_slt马上有值,则 result马上能给我传值
    #在这20个线程运行期间就可以 边运行边传值,不用再等待所有线程运行完了。提高了效率
    
    #注意 for循环里肯定是按顺序开启线程的,但是for循环太快了,所以导致启动的线程,有些是进入就绪态,有些是进入执行态的,所有线程执行结果顺序是乱的
    #但是在for循环里 list.append list里肯定是按顺序的
    
    
    # 死锁的产生
    # import time
    # def wait_on_b():
    #     time.sleep(5)
    #     print(b.result())  # b will never complete because it is waiting on a.
    #     return 5
    #
    # def wait_on_a():
    #     time.sleep(5)
    #     print(a.result())  # a will never complete because it is waiting on b.
    #     return 6
    #
    #
    # executor = ThreadPoolExecutor(max_workers=2)
    # a = executor.submit(wait_on_b)
    # b = executor.submit(wait_on_a)
    例子和解释

    该模块中拿到返回值要用 result方法,不是进程池中的get方法了,具体看上面的例子

    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    from multiprocessing import Pool
    import requests
    import json
    import os
    
    def get_page(url):
        print('<进程%s> get %s' %(os.getpid(),url))
        respone=requests.get(url)
        if respone.status_code == 200:
            return {'url':url,'text':respone.text}
    
    def parse_page(res):
        res=res.result()
        print('<进程%s> parse %s' %(os.getpid(),res['url']))
        parse_res='url:<%s> size:[%s]
    ' %(res['url'],len(res['text']))
        with open('db.txt','a') as f:
            f.write(parse_res)
    
    
    if __name__ == '__main__':
        urls=[
            'https://www.baidu.com',
            'https://www.python.org',
            'https://www.openstack.org',
            'https://help.github.com/',
            'http://www.sina.com.cn/'
        ]
    
        # p=Pool(3)
        # for url in urls:
        #     p.apply_async(get_page,args=(url,),callback=pasrse_page)
        # p.close()
        # p.join()
    
        p=ProcessPoolExecutor(3)
        for url in urls:
            p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一个future对象obj,需要用obj.result()拿到结果
            
    线程池小爬虫
    from concurrent.futures import ThreadPoolExecutor
    
    def func(n):
        print('***',n)
        return n*n
    
    def func2(m):
        print('++',m.result())  #返回值是一个对象,要用 result去获取
    
    tpool = ThreadPoolExecutor(max_workers=5)
    tpool.submit(func,5).add_done_callback(func2)
    #进程池中的  pool.apply_async(func,args=(5,),callback=func2)
    线程池-回调函数

     五.GIL—全局解释器锁:

    #GIL 全局解释器锁
    #我们现在的CPU基本上是4核以上,完全可以同一时刻同时执行几个线程
    #但是同一时刻执行,就有可能发生,两个不同线程,同时向进程获取全局变量,操作完后写入这个全局变量后,会产生混乱
    #所以在 Cpython解释器里加了一把锁,这个锁就是  全局解释器锁,简称GIL
    #当A线程找进程拿了数据后,此时其他线程就拿不到对应的数据,只有当A线程处理完数据,返回给进程后,其他线程才能使用
    #因此GIL 使得 同一时刻只能有一个线程访问CPU
    #GIL锁住的是线程,
    #那是因为python语言的问题吗,不是的 是 cpython解释器的特性。jpython就 java语言写的解释器,就没有GIL。不过我们现在用的都是cpython
    
    #编译型解释器,会在编译过程中大大的回避这个问题
    #但是目前还没有完善的方案,去解决GIL的问题
    
    #高CPU:所以在高CPU使用率的的程序中,比如计算类,确实python不占优势
    #但是我们一般写的程序很少用到这么连续使用CPU的情况。如果非要用到高计算类的,那就可以使用多进程
    #高IO:我们主要写的是高IO的程序,#而且在高计算类的时候,一般也不使用多线程
            #高并发:爬取200个网页,大部分时间都在等待网络延迟,不是高计算
                     # QQ聊天
                     #处理日志文件,大部分时间在等待读文件,写文件(IO)
                     #处理web请求, 还有比如登陆验证,读写数据库
    
    
    #GIL运行步骤,再次强调锁是锁住线程
    # Python代码的执行由Python虚拟机(也叫解释器主循环)来控制。Python在设计之初就考虑到要在主循环中,
    # 同时只有一个线程在执行。虽然 Python 解释器中可以“运行”多个线程,但在任意时刻只有一个线程在解释器中运行。
    #   对Python虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同一时刻只有一个线程在运行。
    #   在多线程环境中,Python 虚拟机按以下方式执行:
    #   a、设置 GIL;
    #   b、切换到一个线程去运行;
    #   c、运行指定数量的字节码指令或者线程主动让出控制(可以调用 time.sleep(0));
    #   d、把线程设置为睡眠状态;
    #   e、解锁 GIL;
    #   d、再次重复以上所有步骤。
    
    #协程
    #由于cpython解释器中的GIL原因,导致python中多线程被弱化了,而且切换多个线程之间也要时间开销
    #所以就出现了协程,协程的切换效率更快,把1个线程的作用发挥到了极致,提高1个cpu的利用率。减少线程的时间开销
    #java里也有协程,但是没有这么被重视
  • 相关阅读:
    Audacious 1.3.0
    medit-多平台的 GTK 文本编辑器
    PIDA:基于 Gtk 的 Python IDE
    Xfce 4.4.0 for Ubuntu Edgy Eft
    Lemonrip-DVD 提取对象
    Tomboy 0.5.5
    网管经验谈:成功网管必备素质硬件篇
    再学 GDI+[35]: TGPPen 虚线画笔位移 SetDashOffset
    再学 GDI+[33]: TGPPen 画笔的几何变换 SetTransform
    再学 GDI+[37]: TGPPen TGPCustomLineCap、SetCustomStartCap、SetCustomEndCap
  • 原文地址:https://www.cnblogs.com/gkx0731/p/9744671.html
Copyright © 2011-2022 走看看