模块:threading
Thread
开启方式,join()方法、互斥锁、守护线程与多进程相同
进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是cpu上的执行单位。
与多进程的区别:
1、开进程的开销远大于开线程
2、同一进程内的多个线程共享该进程的地址空间
# Mutex: trades some efficiency for data safety.
from threading import Thread, Lock
import time

n = 100
# Threads share the module's globals, so a single module-level Lock is visible
# to every thread without being passed in. Defining it here (rather than under
# the __main__ guard) also keeps task() usable when this module is imported.
mutex = Lock()


def task():
    """Decrement the shared counter ``n`` by one while holding ``mutex``.

    The read / sleep / write sequence below would lose updates if several
    threads interleaved inside it; the lock serializes that section.
    """
    global n
    with mutex:  # released even if the body raises
        temp = n
        time.sleep(0.01)  # widen the race window to show why the lock matters
        n = temp - 1


if __name__ == '__main__':
    t_l = []
    for i in range(100):
        t = Thread(target=task)
        t_l.append(t)
        t.start()
    for t in t_l:
        t.join()  # wait for every thread before reading the final value
    print('主', n)
GIL全局解释器锁
GIL是cpython解释器的特性,不是python的特性
GIL保证同一进程内同一时刻只能有一个线程执行Python字节码(因此CPython的多线程无法利用多核并行计算)
当多线程的python代码执行时,所有线程都去抢GIL锁,谁抢到执行谁
当该线程执行完、遇到IO操作,或者连续运行超过一个切换间隔(Python 3默认约5毫秒,见sys.getswitchinterval())时,释放GIL,其他线程去抢GIL
GIL只保护解释器内部的状态,并不保护用户数据:线程可能在"读取—修改—写回"的中途被切换,所以如果这多个线程操作同一个内存数据,就会导致数据错误
所以要保证数据安全,就要在多线程任务中自定义一把互斥锁,
即使抢到GIL的线程在持有互斥锁时遇到IO而释放了GIL,其他线程可以继续运行,但由于互斥锁未释放,它们无法进入被该锁保护的代码
# CPU-bound work: use processes.  IO-bound work: use threads.
from multiprocessing import Process
from threading import Thread
import threading
import os
import time


def cpu_work(n=100000000):
    """CPU-bound workload: return the sum of ``range(n)``.

    Timings noted for an equivalent 100,000,000-iteration loop on an 8-core
    machine: 8 Process workers took a bit over 8s, 8 Thread workers over 37s,
    because under the GIL the threads cannot run Python code in parallel.
    To repeat it: run_concurrently(Process, cpu_work, 8) vs
    run_concurrently(Thread, cpu_work, 8).
    """
    res = 0
    for i in range(n):
        res += i
    return res


def work():
    """IO-bound workload: block for 2 seconds, standing in for an IO wait."""
    time.sleep(2)


def run_concurrently(worker_cls, target, count):
    """Start *count* ``worker_cls(target=target)`` workers and join them all.

    Args:
        worker_cls: ``Thread`` or ``Process`` (anything constructible with
            ``target=`` and providing ``start``/``join``).
        target: callable that each worker runs.
        count: number of workers to start.

    Returns:
        Elapsed wall-clock seconds, including the cost of creating workers.
    """
    start = time.perf_counter()  # monotonic clock, unlike time.time()
    workers = []
    for _ in range(count):
        worker = worker_cls(target=target)
        workers.append(worker)
        worker.start()
    for worker in workers:
        worker.join()
    return time.perf_counter() - start


if __name__ == '__main__':
    # os.cpu_count() was 4 on the machine used for these timings:
    # 400 Process workers took ~2.697s, mostly spent creating the processes;
    # 400 Thread workers took ~2.02s.
    elapsed = run_concurrently(Thread, work, 400)
    print('run time is %s' % elapsed)
死锁与递归锁:
互斥锁只能acquire一次,释放之后才能被再次获取
递归锁(RLock):同一个线程可以连续acquire多次,每acquire一次计数器+1,每release一次计数器-1;只有计数为0时,其他线程才能acquire到它
from threading import Thread, Lock
import time

mutexA = Lock()
mutexB = Lock()


class MyThread(Thread):
    """Thread that deliberately demonstrates a lock-ordering deadlock.

    f1 takes A then B, while f2 takes B then A. If one thread sleeps in f2
    holding B while another thread has taken A in f1 and waits for B, each
    waits for the lock the other holds, and neither can ever proceed.
    """

    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        """Acquire A, then B (nested); both are released on exit."""
        with mutexA:
            print('%s 拿到了A锁' % self.name)
            with mutexB:
                print('%s 拿到了B锁' % self.name)

    def f2(self):
        """Acquire B, then A: the opposite order from f1."""
        with mutexB:
            print('%s 拿到了B锁' % self.name)
            time.sleep(0.1)  # gives another thread time to take A in f1
            with mutexA:
                print('%s 拿到了A锁' % self.name)


if __name__ == '__main__':
    # Expected outcome: the program hangs because the threads deadlock.
    for i in range(10):
        t = MyThread()
        t.start()
from threading import Thread, RLock
import time

# Both names refer to ONE reentrant lock. The owning thread may acquire it
# repeatedly (a counter goes up per acquire and down per release), and other
# threads can take it only once the counter is back to 0. With a single lock
# there is no A-then-B vs B-then-A ordering to deadlock on.
mutexB = mutexA = RLock()


class MyThread(Thread):
    """Runs f1 then f2, which take the "A" and "B" locks in opposite orders."""

    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        """Acquire A, then B (the same RLock twice); released on exit."""
        with mutexA:
            print('%s 拿到了A锁' % self.name)
            with mutexB:
                print('%s 拿到了B锁' % self.name)

    def f2(self):
        """Acquire B, then A, holding the lock through a 7-second sleep.

        While one thread sleeps here, the others block on the lock.
        """
        with mutexB:
            print('%s 拿到了B锁' % self.name)
            time.sleep(7)
            with mutexA:
                print('%s 拿到了A锁' % self.name)


if __name__ == '__main__':
    for i in range(10):
        t = MyThread()
        t.start()
信号量
也是锁,互斥锁可比喻为一间房子只有一把锁,一次只有一个人能进入
信号量可比喻为一间房安装了多把锁,抢到任何一把锁都能进入
from threading import Thread, Semaphore, current_thread
import time
import random

sm = Semaphore(3)  # at most three threads inside `with sm:` at the same time


def task(min_delay=1, max_delay=3):
    """Enter the semaphore-guarded section, announce it, then sleep a while.

    Args:
        min_delay: lower bound (whole seconds, inclusive) of the random sleep.
        max_delay: upper bound (whole seconds, inclusive) of the random sleep.
    """
    # `with sm:` is equivalent to sm.acquire() ... sm.release(), but the slot
    # is returned even if the body raises.
    with sm:
        # current_thread().name replaces the deprecated currentThread().getName().
        print('%s in' % current_thread().name)
        time.sleep(random.randint(min_delay, max_delay))


if __name__ == '__main__':
    for i in range(10):
        t = Thread(target=task)
        t.start()
Event事件
Event对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象,而这个Event对象的标志为假,那么这个线程将会被一直阻塞,直至该标志为真(或等待超时)。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件,继续执行
from threading import Event event.is_set():返回event的状态值(旧名isSet()自Python 3.10起已弃用); event.wait(timeout=None):如果 event.is_set()==False将阻塞线程,可传入超时时间(秒); event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度; event.clear():恢复event的状态值为False。
from threading import Thread, Event
import time

event = Event()


def student(name):
    """Attend class, then wait for the bell (at most 3 s) before recess."""
    print('学生%s 正在听课' % name)
    # Block until teacher() sets the event, but give up after 3 seconds.
    # The teacher teaches for 5 s, so students time out and leave early.
    event.wait(3)
    print('学生%s 课间活动' % name)


def teacher(name):
    """Teach for 5 seconds, announce the end of class, then set the event."""
    print('老师%s 正在授课' % name)
    time.sleep(5)
    print('下课')
    event.set()  # wakes every thread currently blocked in event.wait()


if __name__ == '__main__':
    pupils = [Thread(target=student, args=(who,)) for who in ('alex', 'wxx', 'yxx')]
    lecturer = Thread(target=teacher, args=('egon',))
    for worker in pupils + [lecturer]:
        worker.start()
定时器:指定n秒后执行某操作
from threading import Timer


def task(name):
    """Print a greeting for *name*."""
    print('hello %s' % name)


if __name__ == '__main__':
    # Run task('egon') once, 5 seconds from now, on a separate timer thread.
    # Guarded so that importing this module does not start the timer.
    t = Timer(5, task, args=('egon',))
    t.start()
from threading import Lock, Timer
import random


class Code:
    """Console verification-code demo whose code is refreshed on a timer.

    A new code is generated and printed every ``interval`` seconds until
    ``check`` reads the correct code from stdin or ``stop`` is called.
    """

    def __init__(self):
        self._lock = Lock()     # serializes timer refreshes against stop()
        self._stopped = False   # once True, no further refresh is scheduled
        self.t = None           # the pending refresh Timer, if any
        self.make_cache()

    def make_cache(self, interval=5):
        """Generate and print a new code, then schedule the next refresh.

        Args:
            interval: seconds until the next refresh; it is passed on to the
                timer so every later refresh keeps the same interval.
        """
        with self._lock:
            if self._stopped:
                return
            if self.t is not None:
                # On a manual refresh, cancel the old timer; otherwise two
                # independent refresh chains would run. (When called by the
                # timer itself, cancelling the already-firing timer is a no-op.)
                self.t.cancel()
            self.cache = self.make_code()
            print(self.cache)
            self.t = Timer(interval, self.make_cache, args=(interval,))
            self.t.start()

    def make_code(self, n=4):
        """Return *n* random characters, each a digit or an uppercase A-Z letter.

        Each position picks between one random digit and one random letter
        with equal probability.
        NOTE(review): ``random`` is not cryptographically secure; use the
        ``secrets`` module if these codes ever protect anything real.
        """
        return ''.join(
            random.choice([str(random.randint(0, 9)), chr(random.randint(65, 90))])
            for _ in range(n)
        )

    def check(self):
        """Prompt until the user types the current code (case-insensitive),
        then stop the refresh timer."""
        while True:
            code = input('请输入你的验证码>>: ').strip()
            if code.upper() == self.cache:
                print('验证码输入正确')
                self.stop()
                break

    def stop(self):
        """Cancel the pending refresh and prevent any further ones.

        Holding the lock ensures a refresh that is firing concurrently either
        finishes first (its new timer is then cancelled here) or sees the stop
        flag and does nothing.
        """
        with self._lock:
            self._stopped = True
            if self.t is not None:
                self.t.cancel()


if __name__ == '__main__':
    obj = Code()
    obj.check()
队列
import queue


def drain(q):
    """Remove and return every item currently in *q*, in ``get()`` order.

    Uses ``get_nowait()`` (the same as ``get(block=False)``), which raises
    ``queue.Empty`` instead of blocking once the queue is empty.
    """
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items


if __name__ == '__main__':
    q = queue.Queue(3)  # FIFO: first in, first out
    q.put('first')
    q.put(2)
    q.put('third')
    print(3)
    # The queue is now full. A plain q.put(4) (block=True, no timeout) would
    # wait forever, because nothing is consuming. q.put(4, block=False), which
    # is the same as q.put_nowait(4), raises queue.Full at once; with a timeout
    # it raises after waiting.
    try:
        q.put(4, block=True, timeout=3)
    except queue.Full:
        print('queue full, 4 not added')
    print(4)
    print(drain(q))  # ['first', 2, 'third']
    # Likewise, q.get() on an empty queue blocks forever.
    # q.get(block=True, timeout=3) raises queue.Empty after 3 seconds.

    q = queue.LifoQueue(3)  # LIFO: last in, first out (a stack)
    q.put('first')
    q.put(2)
    q.put('third')
    print(drain(q))  # ['third', 2, 'first']

    q = queue.PriorityQueue(3)  # the smallest entry (priority number) comes out first
    q.put((10, 'one'))
    q.put((40, 'two'))
    q.put((30, 'three'))
    print(drain(q))  # [(10, 'one'), (30, 'three'), (40, 'two')]
进程池、线程池
基本方法 1、submit(fn, *args, **kwargs) 异步提交任务,返回一个Future对象 2、map(func, *iterables, timeout=None, chunksize=1) 取代for循环submit的操作 3、shutdown(wait=True) 相当于进程池的pool.close()+pool.join()操作 wait=True,等待池内所有任务执行完毕回收完资源后才继续 wait=False,立即返回,并不会等待池内的任务执行完毕 但不管wait参数为何值,整个程序都会等到所有任务执行完毕 submit和map必须在shutdown之前(也可以用with语句使用池,退出with时自动调用shutdown(wait=True)) 以下两个是submit返回的Future对象的方法: 4、result(timeout=None) 取得结果 5、add_done_callback(fn) 回调函数,任务完成时以该Future对象为参数调用fn
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from threading import current_thread
import os
import time
import random

# Process-pool variant of the same demo: pool = ProcessPoolExecutor(4)
# (max_workers defaults to the CPU count when omitted); os.getpid() then
# reports the pid of whichever worker process runs the task.
# pool.shutdown(wait=True) plays the role of join() for a pool.


def task(min_delay=1, max_delay=3):
    """Report which worker thread runs this task, then sleep a while.

    Args:
        min_delay: lower bound (whole seconds, inclusive) of the random sleep.
        max_delay: upper bound (whole seconds, inclusive) of the random sleep.
    """
    # current_thread().name replaces the deprecated currentThread().getName().
    print('name:%s pid:%s run' % (current_thread().name, os.getpid()))
    time.sleep(random.randint(min_delay, max_delay))


if __name__ == '__main__':
    # Leaving the with-block calls pool.shutdown(wait=True): it waits until
    # every submitted task has finished before moving on.
    with ThreadPoolExecutor(5) as pool:
        for i in range(10):
            pool.submit(task)
    print('主')
提交任务的两种方式
1、同步调用:提交完任务后,就在原地等待任务执行完毕,拿到结果,再执行下一行代码,导致程序是串行执行
同步 != 阻塞:同步/异步描述的是提交任务的方式(是否原地等待结果),阻塞/非阻塞描述的是程序的运行状态(如遇到IO时被挂起)
from concurrent.futures import ThreadPoolExecutor
import time
import random


def la(name):
    """Simulate a slow job for *name* and return its result.

    Sleeps 3-5 seconds, then returns ``{'name': name, 'res': '#' * k}`` where
    k is a random integer in [7, 13].
    """
    print('%s is laing' % name)
    delay = random.randint(3, 5)
    time.sleep(delay)
    pile = '#' * random.randint(7, 13)
    return {'name': name, 'res': pile}


def weigh(shit):
    """Print the name and the size (length of ``'res'``) of a la() result dict."""
    print('%s 拉了 《%s》kg' % (shit['name'], len(shit['res'])))


if __name__ == '__main__':
    pool = ThreadPoolExecutor(13)
    # Synchronous calls: .result() waits in place for each job to finish, so
    # the three jobs run one after another (serially).
    for who in ('alex', 'wupeiqi', 'yuanhao'):
        weigh(pool.submit(la, who).result())
2、异步调用:提交完任务后,不在原地等待任务执行完毕,异步通常伴随着回调
举个例子,某部门经理X手下原本只有一个员工A,X每天早上给A安排了任务(先干这再干那再再干那)后就回去喝茶了,后来老板又给X安排了两个员工B和C,以后X就要给三个人安排任务了,A去干这个,B去干那个,C再去干什么什么,X怎么知道他们三个有没有完成任务呢?他们主动给经理报告呗。
from concurrent.futures import ThreadPoolExecutor
import time
import random


def la(name):
    """Simulate a slow job for *name*; return ``{'name': ..., 'res': '#'*k}``.

    Sleeps 3-5 seconds; k is a random integer in [7, 13].
    """
    print('%s is laing' % name)
    time.sleep(random.randint(3, 5))
    return {'name': name, 'res': '#' * random.randint(7, 13)}


def weigh(shit):
    """Done-callback: print the name and size of a finished la() job.

    Args:
        shit: the completed Future handed over by add_done_callback. Its
            result is the dict returned by la(); if la() raised, .result()
            re-raises that exception here.
    """
    outcome = shit.result()
    print('%s 拉了 《%s》kg' % (outcome['name'], len(outcome['res'])))


if __name__ == '__main__':
    pool = ThreadPoolExecutor(13)
    # Asynchronous calls: submit() returns at once, and each job reports back
    # through its callback when it completes.
    for who in ('alex', 'wupeiqi', 'yuanhao'):
        pool.submit(la, who).add_done_callback(weigh)