1、并发
1.1、并发与并行
并行,parallel,同一时刻,执行不同任务,并且相互没有干扰;
并发,concurrency,一段时间内,交替执行不同的任务;
串行,一个任务执行完成后执行下一个任务;
1.2、并发的解决方法
“高并发模型”:例如早高峰的北京地铁,在同一时刻,需要处理大量任务,可以理解为高并发模型;
解决方法:
(1)队列,缓冲区:将任务排队,形成队列,先进先出,就解决了资源的使用问题;形成的队列其实就是一个缓冲区域,假设排队有一种优先机制,例如女士优先,则次队列为优先对列;
(2)争抢:谁抢到资源就上锁,排他性的锁,其他任务只能等待;争抢是一种高并发的解决方案,但是这样不好,因为可能有任务长期霸占资源,有人一直抢不到资源;
(3)预处理:将各个任务会用到的热点数据提前缓存,这样就减少了任务的执行时间;
(4)并行:开辟多的资源,例如银行办理业务,排队的人多了,可以多增加业务窗口;此方案为水平扩展的思想;
(5)提速:简单的就是提高资源性能,提高cpu性能或者单个服务器安装更多的cpu,此方案为垂直扩展
(6)消息中间件:模型可以理解为北京地铁站外面的排队走廊,具有缓冲人流量的功能;常见的消息队列有:RabbitMQ、ActiveMQ、RocketMQ(阿里)、kafka等
2、进程与线程
# 进程与线程的关系:
程序是源码编译后的文件,而这些文件被存放在磁盘上;当程序被操作系统加载到内存中,就是进程,进程中存放着指令和数据(资源),进程是线程的容器;线程是操作系统进行运算调度的最小单位;
2.1、线程的状态
2.2、python中的进程与线程
# 进程会启动一个解释器进程,线程共享一个解释器进程;会有一个GIL,全局解释器锁,来分配具体线程任务执行;python的GIL保证同一时刻只有一个线程被执行;
3、Python的多线程开发
3.1、Thread类
# Python的线程开发使用标准款threading;
# Thread类的初始化方法
def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
参数:
target 线程调用的对象,就是目标函数;
name 为线程起名
args 为目标函数传递位置参数,元祖;
kwargs 为目标函数传递关键字参数,字典
(1)线程的启动:
通过threading.Thread创建一个线程对象,target是目标函数,name指定线程名称,到此线程被创建完成,需要调用start() 方法来启动线程 ;
import threading
def work(): print('I m working') print('Finished') t=threading.Thread(target=work,name='workerthread') # 创建线程t t.start() # 启动线程t
(2)线程的退出:
# python没有提供线程的退出方法,但是线程在以下情况会退出:
①:线程函数内语句执行完成;
②:线程函数中抛出未处理异常;
# python的线程没有优先级、没有线程组的概念,也不能被销毁,停止,挂起,也没有恢复和中断;
(3)线程的传参
# 线程的传参本质上和函数没有区别
import threading def add(x,y): print(x+y) t=threading.Thread(target=add,args=(4,5),name='workerthread') # args=(4,5)传递位置参数; t.start()
(4)threading的属性和方法
# current_thread() 返回当前线程对象
# main_thread() 返回主线程对象
# active_count() 返回处于alive状态的线程数
# enumerate() 返回所有活着的线程列表,不包括已经终止的线程和未开始的线程
# get_ident() 返回当前线程的id,非0整数
(5)Thread实例的属性和方法
# name:线程名称
# ident:线程id,线程启动后才有id,否则为None
# is_alive():是否存活
# start()方法与run()方法
①:start() 启动线程,每一个线程必须并且只能执行一次该方法;
def start(self): if not self._initialized: raise RuntimeError("thread.__init__() not called") if self._started.is_set(): raise RuntimeError("threads can only be started once") with _active_limbo_lock: _limbo[self] = self try: _start_new_thread(self._bootstrap, ()) # 启动一个线程 except Exception: with _active_limbo_lock: del _limbo[self] raise self._started.wait()
②:run() 并不启动新线程,只是在主线程中调用了一个函数,调用定义线程对象中的target函数;
def run(self): try: if self._target: self._target(*self._args, **self._kwargs) # 执行target函数 finally: # Avoid a refcycle if the thread is running a function with # an argument that has a member that points to the thread. del self._target, self._args, self._kwargs
3.2、多线程
# 多线程,一个进程中如果有多个线程,就是多线程,实现一种并发;
# 一个进程中至少有一个线程,并作为程序的入口,这个线程就是主线程;一个进程至少有一个主线程,其他线程为工作线程;
# 线程安全与线程不安全
def worker(): for x in range(100): print('{} is runing'.format(threading.current_thread().name)) for x in range(1,5): name='work-{}'.format(x) t= threading.Thread(target=worker,name=name) t.start() 运行结果: work-1 is runing work-2 is runingwork-1 is runingwork-3 is runing work-4 is runing work-3 is runingwork-2 is runing work-1 is runing 分析: 看代码,应该是一行行打印,但是很多字符串打印在了一起? 说明print函数被打断了,被线程切换打断了,print函数分2步,第一步打印字符串,第二步打印换行,就在2步之间被打断,发生了线程切换 每个线程是根据系统调用cpu分时计算的,可能出现线程1的函数还没有执行完成,则cpu资源被线程2给抢占,则去执行线程2的函数; 解决方法:字符串是不可变数据类型,它作为一个整体不可分割输出,end=''不再让print换行,但没有解决根本问题,应为print函数是线程不安全
3.3、daemon线程与non-daemon线程
# 注意:daemon不是linux中的守护进程,daemon属性必须在start方法之前设置好;
在主线程运行结束后会检查是否含有,没有执行完成的non-daemon线程,
如果有则等待non-daemon线程运行完成;
没有则直接结束线程;
源码中Thread的 __init__方法中 if daemon is not None: self._daemonic=daemon # 设定daemon的布尔值 ,true为daemon,false为non-daemon else: self._daemonic=current_thread().daemon # 如果为None则从父线程获取;
# 总结:
①:线程具有一个daemon属性,可以显示设置为True或False,也可以不设置,则默认为None;
②:如果不设置daemon,就取当前线程的daemon来设置它;
③:主线程是non-daemon,即daemon=False;
④:从主线程创建的所有线程不设置daemon,则默认daemon=False,也就是non-daemon线程;
⑤:Python程序在没有活的non-daemon线程运行时退出,也就是剩下的只能是daemon线程,主线程才能退出,否则主线程只能等待;
3.4、join方法
import time import threading def foo(n): for i in range(n): print(i) time.sleep(1) t1=threading.Thread(target=foo,args=(10,),daemon=True) t1.start() t1.join() # 表示主线程,必须等到t1线程结束以后才结束; print('main thread end') ''' 运行结果: 0 1 2 . . 9 main thread end 分析:由于t1.join()方法后,将主线程阻塞,等待t1线程结束; '''
# join(timeout=None),是线程的标准方法之一;
# 一个线程中调用另外一个线程的join方法,调用着将被阻塞,直到被调用线程终止;
# 一个线程可以被join多次,timout参数指定调用则等待多久,没有设置超时时间,就一直等待到被调用者线程终止;设置超时时间如果被调线程没有结束,则调用者不等待,直接结束线程;
# 调用谁的join方法,就是join谁,就要等待谁;
4、threading.local类
# python提供的threading.local类,将这个类实例化得到一个全局对象,但是不同的线程使用这个对象存储数据其他线程看不见;
# 本质:threading.local类构建了一个大字典,存放所有线程相关的字典,定义如下: { id(Thread) -> (ref(Thread), thread-local dict) }
# 每一个线程的id为key,元祖为value;
# value中2部分为:线程对象引用,每个线程自己的字典;
from threading import local import threading import time a=local() def worker(): a.x=0 for i in range(10): # print(a.__dict__,current_thread().ident) time.sleep(0.0001) a.x+=1 print(threading.current_thread(),a.x,a.__dict__) for i in range(5): threading.Thread(target=worker).start() ''' 运行结构: <Thread(Thread-3, started 14488)> 10 {'x': 10} <Thread(Thread-4, started 14420)> 10 {'x': 10} <Thread(Thread-5, started 13620)> 10 {'x': 10} <Thread(Thread-1, started 11808)> 10 {'x': 10} <Thread(Thread-2, started 15348)> 10 {'x': 10} '''
5、定时器Timer
# Timer是线程Thread的子类,就是线程类,具有线程的属性和方法;
# 它的实例就是能延时执行目标函数的线程,在真正执行目标函数之前,都可以cancel它;
# cancel方法本质使用Event类实现;
# 自定义实现Timer类:
class MyTimer: def __init__(self,interval,func,args=(),kwargs={}): self.interval=interval self.func=func self.args=args self.kwargs=kwargs self.event=Event() def cancel(self): print("call cancel------>") self.event.set() def start(self): Thread(target=self._run).start() # 在这里起一个线程是为了避免在主线程被阻塞,执行cancel方法不生效的问题 def _run(self): self.event.wait(self.interval) if not self.event.is_set(): self.func(*self.args,**self.kwargs) # Thread(target=self.func,args=self.args,kwargs=self.kwargs).start() self.event.set() def add(x,y): print(3,'========>',x + y) t1=MyTimer(3,add,(4,5)) t1.start() t1.cancel() print('Main Thread end')