计算机的执行单位以线程为单位.计算机的最小可执行单位是线程.
进程是资源分配的基本单位,线程是可执行的基本单位.是可被调度的基本单位.
线程不可以自己独立拥有资源,线程的执行必须依赖于所属进程的资源.
线程被称作轻量级的进程.线程的切换速度不进程块.
进程中必须至少要有一个线程
GIL:全局解释锁,用来锁解释器(只有CPython解释器才有),限制只能有一个线程来访问CPU.对于线程来说,因为有了GIL所以只有并发没有并行,CPU允许一个线程执行5毫秒
线程又分为:
用户级线程:对于程序员来说的.这样的线程完全被程序员控制执行调度
内核级线程:对于计算机内核来说的,这样的线程完全被内核控制调度
线程的模块:threading
线程是由:代码块,数据段,TCB(线程控制块)组成.
实现线程的两种方法:
1.直接实现
例:
from threading import Thread
def func(i):
print(i+1)
for i in range(100):
t = Thread(target = func,args = (i,)) #开启100个线程
t.start()
2.继承Thread类
例:
from threading import Thread
class Mythread(Thread): #继承Thread类来开启线程
def __init__(self):
super(Mythread, self).__init__()
def run(self):
print(1)
t = Mythread()
t.start()
进程和线程的区别:
1.CPU的切换进程要比CPU切换线程慢很多 所有在python中,如果I/O操作密集时,最好使用线程;计算密集情况下,最好使用进程
2.在同一进程内,所有线程共享这个进程的pid,也就是说这个线程共享这个进程的所有资源和内存地址.
3.在同一进程内,所有进程共享这个进程中的全局变量
例:
from threading import Thread,Lock
import time
def func():
global num
l.acquire()
tem = num
time.sleep(0.01)
num = tem - 1
l.release()
num = 100
lis = []
l = Lock()
for i in range(100):
t = Thread(target = func, )
t.start()
lis.append(t)
[i.join() for i in lis]
print(num)
4.因为GIL锁的存在,在CPython中,没有真正的线程并行,但是有真正的多进程并行.
5.守护线程是根据主线程执行结束才结束;守护进程是根据主进程代码执行结束而结束
主线程会等待普通线程执行而结束
守护线程会等待主线程结束而结束.一般把不重要的事情设置为守护线程
线程受被强制放弃CPU的原因:1.线程会受到时间片影响
2.GIL会限制每个线程的执行时间一般都是5毫秒左右
3.限制执行固定的bytecode(字节码)
线程的使用方法:
1.锁
Lock是互斥锁,一把钥匙配一把锁
Rlock是递归锁,是一个无止尽的锁,但是所有锁都有一个公用的钥匙
在同一个线程内,递归锁可以无止尽的acquire,但互斥锁不行
在不同的线程内,递归锁是保证只能被一个线程拿到钥匙.其他线程等待
GIL是全局解释器锁,锁的是线程,是解释器上的锁,锁的是线程.意思是在同一时间只允许一个线程访问CPU
例:
from threading import Thread,RLock,Lock
def func1():
r.acquire()
print(456)
r.acquire()
print(123)
r.release()
r.release()
def func2():
r.acquire()
print("ABC")
r.acquire()
print("abc")
r.release()
r.release()
r = RLock() #实例化一个万能锁
t1 = Thread(target = func1)
t2 = Thread(target = func2)
t1.start()
t2.start()
2.信号量
例:
from threading import Semaphore,Thread
import time,random
def func(sem,i):
sem.acquire()
print(" 33[32m 第%s个人进入 33[0m" % i)
time.sleep(random.randint(1,3))
print(" 33[35m 第%s个人出去了 33[31m" % i)
sem.release()
sem = Semaphore(5)
for i in range(20):
t = Thread(target = func,args = (sem,i))
t.start()
3.事件
例:
from threading import Thread,Event
import time
def Traffic_lights(e,):
while 1:
if e.is_set():
time.sleep(5)
print(" 33[35m 红灯亮了 33[0m")
e.clear()
else:
time.sleep(5)
print(" 33[36m 绿灯亮了 33[0m")
e.set()
def car(e,i):
e.wait()
print("第%s辆车过去了" % i)
e = Event()
t_l = Thread(target = Traffic_lights,args = (e,))
t_l.start()
for i in range(20):
c = Thread(target = car,args = (e,i+1))
c.start()
4.条件
条件是让程序员自行去调度线程的一个机制
Condition的四个方法:acquire,release,wait,notify
wait是指让线程阻塞住
notify(n)是给wait发一个信号,让wait变不阻塞,n是指要给多少wait发信号
例:
from threading import Condition,Thread
def func(con,i):
con.acquire() #主线程和100个子线程都在抢递归锁的一把钥匙
con.wait() #阻塞
con.release()
print("第%s个线程开始执行了" % i)
con = Condition()
for i in range(10):
t = Thread(target = func,args = (con,i))
t.start()
while 1:
num = int(input(">>>"))
con.acquire()
con.notify(num) #发送一个标识让线程不阻塞num次
con.release()
结果:
>>>5
>>>第0个线程开始执行了
第1个线程开始执行了
第3个线程开始执行了
第4个线程开始执行了
第2个线程开始执行了
4
>>>第7个线程开始执行了
第6个线程开始执行了
第5个线程开始执行了
第8个线程开始执行了
2
>>>第9个线程开始执行了
注意:如果主线程执行顺序是拿到钥匙->input->notify发送信号->换钥匙.如果主线程执行特别快,接下来极有可能主线程又拿到钥匙.
那么此时即使有十个子线程的wait接收到信号,也拿不到钥匙执行不了程序.
5.定时器
语法:Timer(time,func).star()
time:睡眠时间
func:要执行的函数名
例:
from threading import Timer
def func():
print("hello world")
Timer(0.1, func).start()
线程的队列:
同一进程(多线程)的队列:不能做多进程的通信
例:先进先出队列
import queue
q = queue.Queue()
q.put("1")
q.put("2")
q.put("3")
print(q.get())
print(q.get())
结果:
1
2
例:后进先出队列
import queue
q = queue.LifoQueue()
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
结果:
3
2
例:优先级队列
import queue
q = queue.PriorityQueue()
q.put((1,"a"))
q.put((0,"b"))
q.put((9,"c"))
print(q.get())
print(q.get())
结果:
(0, 'b')
(1, 'a')
例:
import queue
q = queue.Queue(3)
q.put(1)
q.put(2)
q.put(3)
# q.put_nowait("a")
while 1:
try:
print(q.get_nowait())
except:
print("队列空了")
break
for i in range(0):
print(i)
优先级队列的put()接收的是元组(,):第一个元素是优先级,第二个位置是存入队列的数据
首先保证整个队列中,所有优先级的类型一致
优先级是数字(int)类型的直接比较大小,数字越小优先级越高
优先级是字符串类型的按照ASCII码比较的,比较字符串的第一个位置的ASCII,如果字符串的ASCII码相同会按照先进先出原则
线程池:并发
定义:在一个池子里放固定数量的线程,这些线程处于等待任务状态,一旦有任务来,就有线程自动执行
concurrent.futures这个模块是异步的调用机制
导入进程池和线程池:from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
==> 提交任务都用submit,多个任务用for循环+submit shutdown()==pool进程池中的close+join
例:线程,进程和Pool进程异步的效率对比
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time
from multiprocessing import Pool
def func(num):
sum = 0
for i in range(num):
sum += i**2
# print(sum)
if __name__ == '__main__':
t_start = time.time()
t = ThreadPoolExecutor(20)
for i in range(10000):
t.submit(func,i)
t.shutdown()
t_time = time.time() - t_start
p_start = time.time()
p = ProcessPoolExecutor(5)
for i in range(10000):
p.submit(func,i)
p.shutdown()
p_time = time.time() - p_start
po_start = time.time()
po = Pool(5)
for i in range(10000):
po.apply_async(func,i)
po.close()
po.join()
po_time = time.time() - po_start
print("进程执行的时间:%s,Pool进程中异步执行的时间:%s,线程执行的时间:%s" % (p_time,po_time,t_time))
针对计算密集时:
不管是Pool进程池的异步还是ProcessPoolExecutor的进程池执行效率相当
ThreadPoolExecutor的效率差很多,所以计算密集时使用进程池
shutdown等效于Pool进程池中异步处理进程中的close+join 是指不允许向线程池中添加任务,然后让父进程(线程)等待池中所以进程(线程)执行任务
提交多个任务:1.使用for循环+submit
2.使用map
例:for循环+submit方法拿到返回结果用result
from concurrent.futures import ThreadPoolExecutor
def func(i):
return i+1
t = ThreadPoolExecutor(20)
lis = []
for i in range(1000):
res = t.submit(func,i)
lis.append(res)
t.shutdown()
[print(i.result()) for i in lis]
例:map方法拿到返回结果用__next__() 返回值是一个生成器可以直接用for循环
from concurrent.futures import ThreadPoolExecutor
def func(i):
return i+1
t = ThreadPoolExecutor(20)
obj = t.map(func,[i for i in range(1000)])
t.shutdown()
while 1 :
try :
print(obj.__next__())
except StopIteration:
break
回调函数:
例:
from concurrent.futures import ThreadPoolExecutor
def func(num):
sum = 0
for i in range(num):
sum += i**2
return sum
def call_back(ret):
print(ret.result())
t = ThreadPoolExecutor(20)
for i in range(1000):
t.submit(func,i).add_done_callback(call_back)
t.shutdown()