线程概念
之前提到的进程有很多优点,它提供了多道编程,让我们感觉我们每个人都拥有自己的CPU和其他资源,可以提高计算机的利用率。但进程还是有一些缺陷存在:
进程只能在一个时间干一件事情,如果想同时干两个或多个任务,进程就达不到了.而且进程在执行的过程中如果发生阻塞,整个进程就会被挂起.
在实际的操作系统中,引入了类似的机制,就是线程, 线程是进程的一部分,一个线程必须依赖进程存在,一个进程内至少有一条线程,进程中默认的线程是主线程,也可以在一个进程中开启多个子线程来完成更多的事情.
进程和线程的关系
1. 进程间相互独立, 数据是隔离的, 同一进程的各线程之间数据共享; 进程是资源分配的最小单位,线程是CPU调度的最小单位,是计算机中最小的执行单位,是真正负责执行代码的
2. 进程之间可以实现通信IPC, 线程之间可以直接读写进程数据段来进行通信---> 需要进程同步和互斥来辅助
3. 进程之间开销大, 线程之间开销小
4. 多个进程之间的多个线程可以利用多核, 一个进程之间的多个线程是可以利用多核的
python中的线程
GIL全局解释器锁
在CPython解释器中,同一个进程中的多个线程不能利用多核,GIL锁能够让同一个进程中的多个线程在同一时刻只有一个线程被CPU调用,有了GIL锁,只要有socket通信,在高I/O操作时的多线程仍然比多进程要好用很多
python线程模块的选择:
python中提供了几个用于多线程编程的模块,包括thread,threading和Queue等, thread模块提供了基本的线程和锁的支持, threading提供了更为高级,功能更强的线程管理的功能, Queue模块允许用户创建一个可以用于多个线程之间的数据共享的队列数据结构
threading模块
线程的创建:
from threading import Thread import time def func(name): time.sleep(1) print('%s say hello'% name) if __name__ == '__main__': t = Thread(target=func,args=('小胖',)) t.start() print('主线程')
from threading import Thread import time class Say(Thread): def __init__(self,name): super().__init__() self.name = name def run(self): # 必须是固定写法,不能改变 time.sleep(2) print('%s say hello'%self.name) if __name__ == '__main__': t = Say('小胖') t.start() print('主线程')
多线程和多进程
from threading import Thread from multiprocessing import Process import os def work(): print('hello',os.getpid()) if __name__ == '__main__': #part1:在主进程下开启多个线程,每个线程都跟主进程的pid一样 t1=Thread(target=work) t2=Thread(target=work) t1.start() t2.start() print('主线程/主进程pid',os.getpid()) #part2:开多个进程,每个进程都有不同的pid p1=Process(target=work) p2=Process(target=work) p1.start() p2.start() print('主线程/主进程pid',os.getpid())
from multiprocessing import Process from threading import Thread import time def func(i): i += 1 if __name__ == '__main__': start = time.time() for i in range(20): Thread(target=func, args=(i,)).start() # 线程时间短,效率高 print(time.time() - start) end = time.time() for i in range(20): Process(target=func, args=(i,)).start() # 进程时间长,效率低 print(time.time() - end)
from threading import Thread n = 100 def func(): global n n -= 1 t_lst = [] for i in range(100): t = Thread(target=func) t.start() t_lst.append(t) for t in t_lst: t.join() print(n)
多线程实现socket通信:
from threading import Thread import socket s=socket.socket() s.bind(('127.0.0.1',8000)) s.listen(5) def action(conn): while True: data=conn.recv(1024) conn.send(data.upper()) if __name__ == '__main__': while True: conn,addr=s.accept() p=threading.Thread(target=action,args=(conn,)) p.start()
import socket client = socket.socket() client.connect(('127.0.0.1',8000)) while 1: msg = input('>>>').strip() if not msg: continue client.send(msg.encode('utf-8')) data = client.recv(1024) print(data)
查看线程名和线程id
from threading import Thread import time def func(): # 在函数外部 print('in func') t = Thread(target=func) t.start() print(t.ident) # 线程id print(t.name) # 线程名 from threading import currentThread def func(): # 函数内部 print('in func',currentThread().name,currentThread().ident) t = Thread(target = func) t.start()
Thread类的其他方法
Thread实例对象的方法 isAlive(): 返回线程是否活动的。 getName(): 返回线程名。 setName(): 设置线程名。 threading模块提供的一些方法: # threading.currentThread(): 返回当前的线程变量。 # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括
启动前和终止后的线程。 # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果
实例:
from threading import currentThread,active_count,enumerate import time def func(): time.sleep(1) # 方便系统检测线程个数 print('in func',currentThread().name,currentThread().ident) t = Thread(target=func) t.start() print(active_count()) # 当前所有活着的线程对象的个数 print(enumerate()) # 当前所有活着的线程对象组成的列表
守护线程
守护线程和守护进程的区别:
守护进程会等待主进程的代码执行完毕而结束
守护线程会等待主线程的结束而结束
主进程必须最后结束,回收子线程的资源
线程是属于进程的,主线程如果结束了,那整个进程就结束了
主线程的结束依赖两件事: 自身的代码执行完毕,非守护的子线程执行完毕
from threading import Thread import time def func(): while 1: time.sleep(1) print('in func') def son(): print('son start') time.sleep(1) print('son end') t = Thread(target=func) t.setDaemon(True) t.start() Thread(target=son).start() time.sleep(5) # 守护线程会等待主线程的结束而结束,如果主线程还开启了其他的子线程,守护线程会守护到最后
from threading import Thread import time def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") t1=Thread(target=foo) t2=Thread(target=bar) t1.daemon=True # 设置t1为守护线程 t1.start() t2.start() print("main-------")
线程数据安全:
1. GIL锁: 能保证在同一时刻不可能有两个线程同时执行CPU指令
from threading import Lock import dis lst = [] def func1(): lst.append(1) a = 0 def func2(): lock = Lock() global a with lock: # 上锁,执行完一段代码之后再执行另外一段代码 a += 1 dis.dis(func2)
2. 同步锁
from threading import Thread import time def work(): global n temp=n time.sleep(1) n=temp-1 if __name__ == '__main__': n=100 l=[] for i in range(100): p=Thread(target=work) l.append(p) p.start() for p in l: p.join() print(n) #结果为99
from threading import Thread,Lock r = threading.Lock() r.acquire() ''' 对公共数据的操作 ''' r.release()
from threading import Thread,Lock import time def work(): global n lock.acquire() temp=n time.sleep(0.1) n=temp-1 lock.release() if __name__ == '__main__': lock=Lock() n=100 l=[] for i in range(100): p=Thread(target=work) l.append(p) p.start() for p in l: p.join() print(n) # 结果为0, 由并发编程串行,保证了数据的安全
互斥锁和join的区别:
from threading import current_thread,Thread,Lock import time def task(): global n print('%s is running' %current_thread().getName()) temp=n time.sleep(0.5) n=temp-1 if __name__ == '__main__': n=100 lock=Lock() threads=[] start_time=time.time() for i in range(100): t=Thread(target=task) threads.append(t) t.start() for t in threads: t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' 结果是: Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:0.5220310688018799 n:99 '''
from threading import current_thread,Thread,Lock import time def task(): #未加锁的代码并发运行 time.sleep(3) print('%s start to run' %current_thread().getName()) global n #加锁的代码串行运行 lock.acquire() temp=n time.sleep(0.5) n=temp-1 lock.release() if __name__ == '__main__': n=100 lock=Lock() t_lst=[] start=time.time() for i in range(100): t=Thread(target=task) t_lst.append(t) t.start() for t in t_lst: t.join() stop=time.time() print('主:%s n:%s' %(stop-start,n)) ''' Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:53.037208557128906 n:0 '''
from threading import current_thread,Thread,Lock import time def task(): time.sleep(3) print('%s start to run' %current_thread().getName()) global n temp=n time.sleep(0.5) n=temp-1 if __name__ == '__main__': n=100 lock=Lock() start_time=time.time() for i in range(100): t=Thread(target=task) t.start() t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 start to run Thread-2 start to run ...... Thread-100 start to run 主:350.1151704788208 n:0 '''
在start之后立即join,也能让任务中的所有代码变为串行执行,而加锁只是在修改公共数据的部分是串行的,在数据安全上,两种方法都能实现,但很明显的是加锁的效率更高,join方法耗时很长.
3. 死锁和递归锁
死锁是由于多个锁对多个变量管理不当而产生的一种现象,是指两个或两个以上的进程或线程在执行过程中,因争夺系统资源而造成的一种相互等待的现象.
from threading import Thread, Lock noodle_lock = Lock() fork_lock = Lock() def eat1(name): noodle_lock.acquire() print('%s 抢到了面条'%name) fork_lock.acquire() print('%s 抢到了叉子'%name) print('%s 吃到了面'%name) frok_lock.release() noodle_lock.release() def eat2(name): fork_lock.acquire() print('%s 抢到了叉子'%name) noodle_lock.acquire() print('%s 抢到了面条'%name) print('%s 吃到了面'%name) noodle_lock.release() fork_lock.release() t1 = Thread(target=eat1, args=(name,)).start() t2 = Thread(target=eat2, args=(name,)).start() ''' 会发生死锁现象 '''
互斥锁Lock,用互斥锁可以解决死锁现象,将两个资源都锁起来,
from threading import Thread, Lock lock = Lock() def eat1(name): lock.acquire() print('%s 抢到了面条'%name) print('%s 抢到了叉子'%name) print('%s 吃到了面'%name) lock.release() def eat2(name): lock.acquire() print('%s 抢到了叉子'%name) print('%s 抢到了面条'%name) print('%s 吃到了面'%name) lock.release() Thread(target=eat1, args=('小一',)).start() Thread(target=eat2, args=('小二',)).start()
递归锁Rlock,同一个线程可以被acquire多次,必须acquire几次就release几次,才能保证其他的线程可以访问被锁住的代码,而且可以快速的解决死锁现象
from threading import Thread, RLock noodle_lock = fork_lock = RLock() def eat1(name): noodle_lock.acquire() print('%s 抢到了面条'%name) fork_lock.acquire() print('%s 抢到了叉子'%name) frok_lock.release() noodle_lock.release() def eat2(name): fork_lock.acquire() print('%s 抢到了叉子'%name) noodle_lock.acquire() print('%s 抢到了面条'%name) noodle_lock.release() fork_lock.release() Thread(target=eat1, args=('小一',)).start() Thread(target=eat2, args=('小二',)).start()
线程队列
Queue队列, 使用import queue,用法和进程中的Queue一样
FIFO(先进先出队列)
import queue q=queue.Queue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' (先进先出): first second third '''
LifoQueue(后进先出队列),也称为栈
import queue q=queue.LifoQueue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' (后进先出): third second first '''
优先级队列:
queue.PriorityQueue(),数字越小优先级越高,且优先出队列,放进队列的是元组,根据第一个参数数字的大小,如果是同等优先的,就会根据第二个参数在ASCII码位表中的顺序来决定
from queue import PriorityQueue pq = PriorityQueue() pq.put((3, '小三')) pq.put((2, '阿二')) pq.put((1, '大大')) print(pq.get()) print(pq.get()) print(pq.get())
concurrent模块
concurrent.futures 提供了一个高度封装的异步调用接口
ThreadPoolExecutor: 线程池, 提供异步调用
ProcessPoolExecutor: 进程池, 提供异步调用
基本方法:
submit ---> 相当于进程中的apply_async方法
shutdown ---> 相当于进程池中的 close + join
result ---> 相当于 get
add_done_callback ---> 相当于 callback
from concurrent.futures import ThreadPoolExecutor import time def func(i): time.sleep(1) print('in son thread') if __name__ == '__main__': tp = ThreadPoolExecutor() for i in range(20): tp.submit(func,i) # 在这儿可以直接传参 tp.shutdown() print('所有任务执行完毕') # 最后执行shutdown
使用Executor达到无缝衔接
from concurrent.futures import ThreadPoolExecutor as Executor, ProcessPoolExecutor as Executor import time def func(i): time.sleep(1) print('in son thread') if __name__ == '__main__': tp = Executor() # 可以通过改变上面的模块导入来切换进程池和线程池 for i in range(20): tp.submit(func,i) # 可以直接进行传参
map方法
from concurrent.futures import ThreadPoolExecutor import time def func(i): time.sleep(1) print('in son thread%s' % i) if __name__ == '__main__' tp = ThreadPoolExecutor() tp.map(func, range(20)) ''' in son thread 0 in son thread 1 ...... in son thread 19 '''
from concurrent.futures import ThreadPoolExecutor def func(i): return '*' * i if __name__ == '__main__': tp = ThreadPoolExecutor() ret_lst = tp.map(func, range(20)) for ret in ret_lst: print(ret) # 用map方法就省去下面的这些步骤 ret_l = [] for i in range(20): ret = tp.submit(func,i) ret_l.append(ret) for ret in ret_l: print(ret.result())
add_done_callback,回调函数
from concurrent.futures import ThreadPoolExecutor def back(ret): print(ret) print(ret.result()) # 拿到具体的结果 def son_func(i): return i**i tp = ThreadPoolExecutor() for i in range(20): ret = tp.submit(son_func, i) ret.add_done_callback(back) ''' <Future at 0x1ba6881da58 state=finished returned int> 361 '''