第一部分 线程
"""
一、线程的基本概念
程序:一些指令,静态的,没有运行的。
进程:程序运行起来,创建进程,有的程序可以创建很多进程,有的程序只创建一个。
线程:线程是进程基本执行单元,一个进行至少要有一个线程,一个进程包含多个线程。
进程和线程的关系,特性
一个进程:至少有一个线程,对于一个线程来说,只能在一个进程中。
进程独立,线程共享:特性
从资源的角度看:
每个进程之间,相互独立。对于资源不共享。没关系。
在同一个进程中,每个线程之间,是相互联系,对于资源共享,是有关系
进程有独立的空间和资源,但是线程不具有独立的系统资源,而同一个进程下多线程共享该进程下资源。
"""
"""
二、多线程:就是同时处理并发执行的多个任务,并发?并行?
多线程适合的场合:io密集型
1. 计算密集型:一根筋计算
2. io密集型:上传文件,下载文件,input、print、爬虫、html代码下载
多线程的缺点:
1. 线程也是程序,需要占用内存
2. 多线程之间需要cpu时间片的切换,切片也需要大量的时间和监控
3. 最重要的问题,线程不安全,数据不一致
"""
"""
三、创建线程
1. 使用threading模块、Thread,创建线程,指定target、args(元组)
2. 使用threading模块,创建一个线程类,继承Thread类,重写run方法
"""
import threading
import time
# 1. 方式一:
# 相当于扫地方法
def mission(): print("执行mission方法") import time def mission(_start,_end): for i in range(_start,_end): print(i) time.sleep(1) t1=threading.Thread(target=mission,args=(1,10)) # t1 t2 t3都是线程对象 t2=threading.Thread(target=mission,args=(11,20)) t3=threading.Thread(target=mission,args=(21,30)) # start方法,把当前线程加入到cpu待执行的任务列表,不是立即执行,执行的时间cpu决定。 t1.start() t2.start() t3.start()
# 注意,在单核cpu下,从宏观的角度。虽然mission方法也可以看成一起执行的。
# 但是从微观的角度,仍然是cpu单个去处理
# t1的mission t2的mission t3的mission
# 方式2:
class Mythread(threading.Thread): def run(self): print("执行mission方法") time.sleep(1) class Mythread(threading.Thread): def __init__(self,_start,_end): self._start=_start self._end=_end super().__init__() def run(self): for i in range(self._start,self._end): print(i) time.sleep(1) t1=Mythread(1,10) t2=Mythread(1,10) t3=Mythread(1,10) t1.start() t2.start() t3.start() t1.run() t2.run() t3.run()
# 四、线程的生命周期
# 1. 新建:新建一个线程对象。没有执行能力。
# 2. 就绪:当调用了start方法之后,线程处于就绪状态,把线程加入到cpu待执行任务的列表中
# 具体什么时间执行。由cpu说了算
# 3. 运行:获得了cpu的时间片,开始运行线程对象run方法。
# 4. 阻塞:曾经运行状态,cpu把时间片分给别人,当前线程就处于阻塞状态,等待cpu下一次分给时间片。
# 5. 死亡:run已经执行完毕,或者run方法中抛出了异常。
# 注意start和run方法的区别
"""
start方法不是真正执行线程的程序,而是将任务加入到cpu的待执行任务列表中,具体什么时间执行由cpu说了算
run方法:真正的执行线程程序(一般不是主动调用,是由cpu调用的。)
"""
"""
五、 线程的相关操作
"""
1. 活跃线程的数量:活跃线程:就绪之后----死亡之前 threading.active_count()
print(threading.active_count())
# 在py里没有创建任何子线程,也会有一个主线程 main
2. 返回活跃线程列表 threading.enumerate()
print(threading.enumerate())
3. 当前执行的线程 threading.current_thead()
print(threading.current_thread())
4. 获得线程标志threading.get_ident(),每一个线程都有一个id
5. 返回解释器中的主线程 threading.main_thread()
t=threading.Thread(target=mission)
print(threading.main_thread())
6. run和start
7.join(参数):抢占时间片
# 在A线程中,调用b.join方法,b抢占A线程,必须b执行完毕,才能执行a线程
# 被抢占时,a线程处于阻塞状态
# 如果不写参数:无限等待
# 写了参数:等待n秒
def mission(): print("正在进行修路") time.sleep(1) print("修路完毕") print("想要过马路") t=threading.Thread(target=mission) t.start() t.join(2)
# 如果join不写参数,相当于mission必须执行完毕,也就是t线程对象死亡。
# 才可以继续执行下面的主线程。
# 如果join写了参数,就相当于当前的主线程,只等待mission线程2s,2s之后
# 无论mission有没有执行完毕,都会继续执行主线程的内容
print("路修好了,过马路")
8. name:在线程中的一个私有属性:代表线程的名字
def mission(): print("正在进行修路") time.sleep(1) print("修路完毕") # 获得当前线程的名字 t=threading.current_thread() print(t.name) print("想要过马路") t=threading.Thread(target=mission) # 相当于调用setname方法给线程name赋值 t.name="修马路的线程" t.start() t.join(2)
9. ident 线程的标志:属性,直接访问就相当于调用get_ident方法
# 也可以进行赋值
10. is_alive:判断是否是存活的线程。存活True,死亡False
print(t.is_alive())
11. daemon设置线程是否是守护线程
# 非守护线程:前台线程
# 守护线程:后台线程
# 线程默认情况下就是非守护线程
# 乐队,伴奏,唱歌(非守护线程)
# 如果把伴奏看成【非守护线程】:歌唱完了,伴奏还没完,继续;歌没唱完,伴奏完了,伴奏走了,各走各的
# 如果把伴奏看成是【守护线程】:歌唱完了,伴奏还没完,伴奏也结束了,
# 如果歌没唱完,伴奏已经完毕,也会等待唱歌完毕,整个程序才结束。
# 如果将一个线程设置为【守护线程】:
# 相当于告诉cpu,这个线程不重要,当主线程结束时,所有程序运行完毕,
# 当主线程没有结束时,守护线程结束了,主线程依然执行
def music(): print("乐队线程正在开始") time.sleep(1) print("乐队还在伴奏") print("乐队线程结束") if __name__=="__main__": print("开始唱歌") t=threading.Thread(target=music) # 设置一个线程是守护线程的时候,必须是在start之前 t.setDaemon(True) # 将t设置为守护线程 t.start() print("歌唱完了") #有时候主线程结束了,守护线程依然会执行,原因是cpu时间片在主线程刚结束时就切换到了守护线程,cpu没有确定 # 主线程是否结束,当再次切换到主线程时,才能判断主线程已经结束,这时守护线程就不再执行了
六、线程应用及线程锁
"""
线程同步
为什么?线程对于同一个进程中的资源,是共享。
"""
# 并发修改现场
# 例子:各个售票点可以多线程,同一趟车对于各个售票点,剩余票应该一致,作为共享资源
import threading
import time
# 某一趟车总票数
ticket=100
def buy_ticket():
global ticket
while ticket>0:
t=threading.current_thread()
time.sleep(0.5)
print("{}抢到了第{}张车票".format(t.name,ticket))
ticket-=1
# 多个售票点相当于开启了多个线程
t1=threading.Thread(target=buy_ticket)
t1.name="张三"
t2=threading.Thread(target=buy_ticket)
t2.name="李四"
t3=threading.Thread(target=buy_ticket)
t3.name="王五"
t1.start()
t2.start()
t3.start()
# 多线程会出现资源共享ticket,多个人买到同一张票,或者出现-1,0的情况
# 解决:线程锁
# 线程锁:在一个时间点内,一片共享资源只希望被一个线程访问,这是可以给资源加锁,
# 锁包含两个部分:锁,钥匙
# 同步概念:排队。
# 当一个线程拿到锁之后,只有当前这个线程可以访问这一片资源(买票的方法)
# 即使cpu将时间片分给其他线程,其他线程也无法访问这一片资源,
# 直到当前的线程释放了锁,其他才可以继续访问
# 创建一个锁的对象
lock=threading.Lock()
# lock.acquire()------加锁
# lock.release()------释放锁
# 某一趟车总票数
方式一:不能成功解决:结果是会产生0张,第-1张票。原因多个线程还是可以进入到while中执行
ticket=100 def buy_ticket(): global ticket while ticket>0: # 获得线程锁 lock.acquire() t=threading.current_thread() time.sleep(0.5) print("{}抢到了第{}张车票".format(t.name,ticket)) ticket-=1 lock.release() # 多个售票点相当于开启了多个线程 t1=threading.Thread(target=buy_ticket) t1.name="张三" t2=threading.Thread(target=buy_ticket) t2.name="李四" t3=threading.Thread(target=buy_ticket) t3.name="王五" t1.start() t2.start() t3.start()
方式二:锁加载while外侧的时候,票都会被同一个线程抢走了。
ticket=100
def buy_ticket():
global ticket
# 获得线程锁
lock.acquire()
while ticket>0:
t=threading.current_thread()
time.sleep(0.5)
print("{}抢到了第{}张车票".format(t.name,ticket))
ticket-=1
lock.release()
# 多个售票点相当于开启了多个线程
t1=threading.Thread(target=buy_ticket)
t1.name="张三"
t2=threading.Thread(target=buy_ticket)
t2.name="李四"
t3=threading.Thread(target=buy_ticket)
t3.name="王五"
t1.start()
t2.start()
t3.start()
方式三、将while写成True,把锁加在ticket使用之前,循环内部判断ticket>0...
# 不成功:当最后一个人抢票出去之后,还有其他人还在while里获得了锁之后,发现ticket=0,
# 不能继续向下买票,执行else,跳出了循环,再也没有人释放锁。
ticket=100 def buy_ticket(): global ticket while True: t=threading.current_thread() lock.acquire() if ticket>0: time.sleep(0.5) print("{}抢到了第{}张车票".format(t.name,ticket)) ticket-=1 lock.release() else: break # 多个售票点相当于开启了多个线程 t1=threading.Thread(target=buy_ticket) t1.name="张三" t2=threading.Thread(target=buy_ticket) t2.name="李四" t3=threading.Thread(target=buy_ticket) t3.name="王五" t1.start() t2.start() t3.start()
方式四:使用finally,使得程序无论走break ,还是正常执行抢票,都能够释放锁。除非发生没有捕获的异常
ticket=100 def buy_ticket(): global ticket while ticket>0: try: lock.acquire() if ticket > 0: t=threading.current_thread() print("{}抢到了第{}张车票".format(t.name,ticket)) ticket-=1 else: break finally: lock.release() time.sleep(1) # 多个售票点相当于开启了多个线程 t1=threading.Thread(target=buy_ticket) t1.name="张三" t2=threading.Thread(target=buy_ticket) t2.name="李四" t3=threading.Thread(target=buy_ticket) t3.name="王五" t1.start() t2.start() t3.start()
死锁
"""
当两个或者多个线程同时拥有自己的资源,二互相等待对方释放资源,试图拿到对方的锁,
导致程序进入僵持状态,这就是死锁。
"""
import threading lock1=threading.Lock() lock2=threading.Lock() # lock1是棉花糖 # lock2是棒棒糖 def mission(l1,l2): l1.acquire() t=threading.current_thread() print("{}已经获得了一把锁,希望获得另外一把锁".format(t.name)) time.sleep(1) l2.acquire() l2.release() l1.release() t1=threading.Thread(target=mission,args=(lock1,lock2)) t2=threading.Thread(target=mission,args=(lock2,lock1)) t1.start() t2.start()
通知和等待
"""
等待:wait,等待状态,既释放锁,又进行等待。
通知:
notifyall:通知其他所有等待的线程
notify:任选一个等待线程
"""
"""
sleep和wait有什么区别?
sleep:只是等待,但是不会释放锁
wait:等待而且释放锁。
"""
# 生产者和消费者。
# 商品存在 list
# 生产者:生产商品,会让商品+1
# 消费者:消费产品,会让产品—1
# 如果生产者生产太快,供过于求,如果仓库中只能放3件产品,需要生产阻塞。
# 如果把生产者阻塞选用wait,释放锁,又等待。
# 希望有消费者通知notify生产者继续生产。
# 如果一旦消费有某种原因,不能进行消费,或者消费之后,无法通知生产者继续生产,或者
# 生产者不再生产了,程序会被停止
import threading from threading import Condition lock=Condition() def produce(li): for i in range(4): try: lock.acquire() if len(li)==3: print("仓库已满,生产阻塞") lock.wait() # 既能阻塞当前进程,还能释放锁 else: print("生产了商品{}".format(i)) li.append("商品{}".format(i)) lock.notify_all() finally: lock.release() def consume(li): for i in range(4): try: lock.acquire() if len(li)==0: print("仓库已空,消费阻塞") lock.wait() else: print("消费了{}".format(i)) li.pop(0) lock.notify_all() finally: lock.release() if __name__=="__main__": li=[] t1=threading.Thread(target=produce,args=(li,)) t2=threading.Thread(target=consume,args=(li,)) t1.start() t2.start()
将生产者消费者程序修改成while
import threading from threading import Condition lock=Condition() def produce(li): count=1 while True: try: lock.acquire() if len(li)==3: print("仓库已满,生产阻塞") lock.wait() # 既能阻塞当前进程,还能释放锁 else: time.sleep(0.5) print("生产了商品{}".format(count)) li.append("商品{}".format(count)) lock.notify_all() count+=1 finally: lock.release() def consume(li): while True: try: lock.acquire() if len(li)==0: print("仓库已空,消费阻塞") lock.wait() else: time.sleep(0.5) print("消费了{}".format(li.pop(0))) lock.notify_all() finally: lock.release() if __name__=="__main__": li=[] t1=threading.Thread(target=produce,args=(li,)) t2=threading.Thread(target=consume,args=(li)) t1.start() t2.start()
第二部分 队列
"""
队列:线性队列,queue模块下的一个数据类。
应用队列去解决多线程问题,不需要考虑锁。
"""
"""
按照三种算法,有三种队列:
size:指定队列的最大长度,参数不写,默认0(传入负数)
代表创建无限长度队列
1. 先进先出。 queue.Queue(size)
2. 先进后出(堆栈) queue.LifoQueue(size)
3. 优先队列(可以按照优先级别排队) queue.ProrityQueue(size)
"""
import queue
一、以先进先出队列为例:
1. 创建队列:queue.Queue(参数)
# 参数不写,创建无限长度的空队列
q=queue.Queue()
2. 获得队列的大小
print(q.qsize())
3. 判断队列是否为空,为空True,否则False
print(q.empty())
# 指定参数,能够指定队列的长度
q3=queue.Queue(3)
4. 向队列中加入元素,put方法
q.put(block=True,timeout=0)
# block:指定添加时是否阻塞,默认值为阻塞状态
# timeout:指定添加时,阻塞的最大时间,超过了最大时间,就会报错
q3.put("abc")
q3.put("def")
q3.put("ghi")
print(q3.qsize())
print(q3.empty())
full判断队列是否已满
print(q3.full())
# 当队列已满,再加元素,会处于阻塞。
# block=True 默认阻塞
q3.put("1122")
# q3.put("1222",block=False)
# put 还可以指定超时时间。当put元素时,会阻塞n秒,当队列仍然无法put时,会报错
q3.put("1222",block=True,timeout=3)
5. put_nowait和q3.put("eee",block=False)等价
# 相当于只要无法执行put,就会报错。
# q3.put("eee",block=False)
q3.put_nowait("ddd")
6. get方法,获得队列中的元素
# 以先进先出为顺序
print(q3.get())
print(q3.get())
print(q3.get())
# 当get出队列中的全部元素之后,仍然get,会默认阻塞
# block:如果不能执行get方法,会采用block形式执行
# True:阻塞,默认
# False:直接报错
# timeout参数:指定阻塞的最大时间,如果超过了时间仍然无法执行get,则报错
print(q3.get(block=False))
二、先进后出队列:堆栈
q2=queue.LifoQueue()
print(q2.empty())
print(q2.qsize())
q2.put(1)
q2.put(2)
q2.put(3)
while not q2.empty():
print(q2.get())
q2.put(4)
三、优先队列
qp=queue.PriorityQueue()
# qp.put("bee")
# qp.put("apple")
# qp.put("cat")
# qp.put("egg")
# qp.put("face")
# while not qp.empty():
# print(qp.get())
qp=queue.PriorityQueue()
qp.put((2,"bee"))
qp.put((1,"apple"))
qp.put((3,"cat"))
qp.put((4,"egg"))
qp.put((5,"face"))
while not qp.empty():
print(qp.get())
# 使用队列完成生产者消费者
# 使用队列存储商品
import queue import threading def produce(q): for i in range(4): q.put(i) print("生产商品{}".format(i)) def consume(q): for i in range(4): print("消费了{}".format(q.get())) # 指定队列的最大值就是3 q=queue.Queue(3) t1=threading.Thread(target=produce,args=(q,)) t2=threading.Thread(target=produce,args=(q,)) t1.start() t2.start() import queue import threading import time def produce(q): count=1 while True: q.put(count) print("生产商品{}".format(count)) count+=1 time.sleep(0.5) def consume(q,name): while True: time.sleep(0.5) print("{}消费了{}".format(name,q.get())) # 指定队列的最大值就是3 q=queue.Queue(3) t1=threading.Thread(target=produce,args=(q,)) t2=threading.Thread(target=consume,args=(q,"tom")) t3=threading.Thread(target=consume,args=(q,"jerry")) t1.start() t2.start() t3.start()
第三部分 GIL全局锁
"""
GIL全局锁:全局解释锁
调用操作系统中原生下完成,将底层加了一个全局锁,目的是可以将解释器中所有代码中的共享资源全部保护
保护。任何一个时刻,只有一个线程能够通过python解释器拿到数据。
即使有GIL全局锁,也不影响多线程在单核cpu下的应用。时间片切换
多进程。
"""
第四部分 进程
"""
进程:multiprocessing下来实现进程的创建和操作
"""
import multiprocessing
import time
# 进程的创建
"""
1. 使用Process 类构造方法创建进程,指定target,args
2. 继承Process类,重写run方法
"""
# 在windows下创建进程,程序会将当前的模块导入。Linux下不需要。
# 注意:在创建进程的时候,在windows一定要加入if__name__=="__main__",在Linux下不需要
input()
# 方式一:
def mission(): time.sleep(10) print("子进程正在执行,mission函数正在执行") if __name__=="__main__": p=multiprocessing.Process(target=mission) p.start() print("主进程正在执行") time.sleep(10)
# 方式二:
import os class MyProcess(multiprocessing.Process): def run(self): time.sleep(1) print("子进程正在执行") print("当前运行进程的id:{}".format(os.getpid())) print("当前进程的父进程id:{}".format(os.getppid())) print("在py顶端输出一句话") if __name__=="__main__": p=MyProcess() p.start() print("p对象的pid:{}".format(p.pid)) p.join() print("主进程正在执行") print("当前运行的进程id:{}".format(os.getpid()))
# 1. 练习创建进程
# 2.在py文件的顶级模块上print("输出一句话")
# 进程下的相关操作
# pid能进程对象的id
# 进程中不涉及锁。
# 进程队列
# multiprocessing.Queue
# 进程中的资源不共享,涉及到的不是锁
# 进程队列解决了资源共享的问题。如何解决进程之间的资源沟通问题。
# 原理使用pickle,将共享资源序列化之后,同步给另外一个资源。------字节