-
并
并发并行
并发:单核不同程序间快速切换
任务数>核心数,通过操作系统的调度算法,实现多个任务'一起执行'(切换任务速度太快,所以单核心也有一种同一时间多任务的错觉)
一个时间段内发生若干事件的情况 一个段
并行:任务数<核心数,一个核心做一个任务。
同一时刻发生若干事件的情况 一个点同步异步
同步
并发或并行的各个任务不是独自运行的,任务之间有一定的交替顺序,可能在运行完一个任务得到结果后,另一个任务才会开始运行,就像接力赛跑一样,拿到交接棒后才可以跑
异步
并发或并行的各个任务可以独立运行,一个任务不受另一个任务的影响。
相对于多任务而言阻塞非阻塞
如果卡住了调用者,调用者不能继续往下执行,就是说调用者阻塞了
如果不会卡住,可以继续执行,就是说非阻塞的
相对于代码而言
-
多任务-线程
使用线程,一个进程下多个线程, 进程是君王,所有的线程为进程服务,一个进程自身就产生一个线程,如果在一个线程中更改了某个变量,其他线程马上就能看到这个改变,这是因为变量------实际上,所有的变量,在程序的所有线程中是共享的。
自定义线程
需求:同时执行多任务
解决1:核心是再自定义一个函数,把要执行的函数包含在里面,然后开线程执行这个自定义函数
# 解决方案1:
# 工作1
def work1():
print(threading.current_thread())
print("工作1...")
# 工作2
def work2():
print(threading.current_thread())
print("工作2...")
def func():
print(threading.current_thread())
work1()
work2()
if __name__ == '__main__':
# 创建一个子线程
sub_thread = threading.Thread(target=func)
sub_thread.start()
本质就是就是函数的递归
解决2:自定义一个类,继承于threading.Thread
一个线程执行多个任务
# 解决方案2: 自定义线程
class MyThread(threading.Thread):
# 重写init方法
def __init__(self, new_name):
# 在子类中调用父类的同名方法
# 不用super的话显示调用父类的__init__失败
super(MyThread, self).__init__()
# 添加属性并赋值
self.new_name = new_name
# 工作1
def work1(self):
print(self.new_name)
print("工作1...")
# 工作2
def work2(self):
print("工作2...")
# 需要重写线程类中的run方法
# 子线程的 .start 方法是调用的run()
def run(self):
print(threading.current_thread())
self.work1()
self.work2()
if __name__ == '__main__':
# 字符串
name = "自定义线程描述文字"
# 创建子线程
sub_thread = MyThread(name)
# 启动线程
sub_thread.start()
思想就是在自定义类中创建任务函数,再定义一个run函数,然后由 自定义类实例,start()启动线程
多个任务,同时访问一个变量,可能会造成资源竞争,造成数据出问题
- 线程同步,共享变量问题
同步就是协同步调,按预定的先后次序进行运行。如:你说完,我再说。
"同"字从字面上容易理解为一起动作
其实不是,"同"字应是指协同、协助、互相配合。
如进程、线程同步,可理解为进程或线程A和B一块配合,A执行到一定程度时要依靠B的某个结果,于是停下来,示意B运行;B依言执行,再将结果给A;A再继续操作。
sub_thread.join() 作用于多个线程启动时
#定义一个变量列表
import threading,time
my_list = list()
#一个传数据 一个读取
def write_():
for i in range(1,6):
time.sleep(0.1)
my_list.append(i)
print('write:',my_list)
def read_():
print('read',my_list)
# 开启两个线程,一个读一个写
if __name__ == '__main__':
write_thread = threading.Thread(target=write_)
read_thread = threading.Thread(target=read_) #进入新建状态
write_thread.start()
#线程同步 等第一个子线程执行完成后,后面的子线程才会工作
write_thread.join()
#执行到此时,相当于一个input,他在监听第一个子线程是否结束。结束了才进行下一个线程
#假如不用线程同步,读写线程同时作用于list,导致写入了,但是读却读不出来数据
read_thread.start() #进入就绪状态
#多个线程执行多个任务 而多个任务同时访问同一个变量,可能会导致出现资源竞争问题
#解决方案: 线程同步,(同一时间内,只有一个线程在访问这个变量)
#这是为了保证数据安全而牺牲性能。
import threading,time
#定义一个变量
num = 0
def sum1():
for _ in range(1000000):
global num
num+=1
print('num1',num)
def sum2():
for _ in range(1000000):
global num
num+=1
print('num2',num)
if __name__ == '__main__':
sum1_thread = threading.Thread(target=sum1)
sum2_thread = threading.Thread(target=sum2)
sum1_thread.start()
# sum1_thread.join()
sum2_thread.start()
两个线程同时对变量num进行操作,两个线程竞争,相互的赋值做自增,出现了重复性操作那么使结果达不到预期的1000...2000....,num1可能小于1000...大于1000....,num2大于1000....
应用场景:多个线程执行多个任务,而多个任务同时访问同一个变量,可能会出现资源竞争,导致数据出错
这样做是保证数据安全的前提下,牺牲了性能
2. 守护线程
守护
当主线程挂了所有的子线程也就没有意义了,不管子线程行进到哪一步,都得挂
sub_thread.setDaemon(True)
打开主线程层守护
sub_thread.setDaemon(True)
import threading
def a(num):
for i in num:
print('1111%s'%i)
t = threading.Thread(target=a,args=([1,2,3,4,6,],))
t.setDaemon(1)
t.start()
print('程序完成')
服务于非守护线程(主线程)
应用于主线程执行完任务后,无论子线程任务是否执行完成,程序都会退出
先设置线程守护,再启动线程,进程结束,不管线程有没有结束,程序都会关闭
都关闭
import threading
import time
def a(num):
for i in num:
time.sleep(1)
print('1111%s'%i)
def b():
print('bbbb')
t = threading.Thread(target=a,args=([1,2,3,4,6,],))
t2 = threading.Thread(target=b)
t.setDaemon(1)
t.start()
t2.start()
print('程序完成')
#bbbb 程序完成
t1都不带执行一个的。
启动线程守护,当前的守护不影响其他线程的执行,
两种设置方法
t = threading.Thread(target=a,daemon=True)
t.setDaemon(1)
设置了线程守护的子线程,在主线程完成之后,不管当前子线程是否结束,统统结束
3. mutex 互斥
import threading
# 在使用互斥锁的时候 上锁后 一定要解锁 如果不解锁 导致会出现死锁状态
# 定义一个锁对象
lock = threading.Lock()
# 获取数据
# 同一时间内 只允许一个线程执行任务通过下标索引获取列表的数据
def get_value(index):
# 定义一个列表
my_list = list("abcde")
# 上锁
lock.acquire()
# 防止发生列表越界
if index >= len(my_list):
# 解锁
lock.release() 如果这里不释放,将会造成死锁
return
# 获取数据
value = my_list[index]
print(value)
# 解锁
lock.release()
if __name__ == '__main__':
# 创建10个子线程
for i in range(10):
# 创建子线程
sub_thread = threading.Thread(target=get_value, args=(i,))
# 启动线程
sub_thread.start()
互斥锁
import threading
num = 0
# 创建一个互斥锁对象()
lock = threading.Lock()
# 任务1:
def work1():
# 上锁
lock.acquire()
global num
for _ in range(1000000):
num += 1
print("work1:", num)
# 解锁
lock.release()
# 任务1:
def work2():
# 上锁
lock.acquire()
global num
for _ in range(1000000):
num += 1
print("work2:", num)
# 解锁
lock.release()
if __name__ == '__main__':
# 定义两个子线程
work1_thread = threading.Thread(target=work1)
work2_thread = threading.Thread(target=work2)
# # 启动线程
work1_thread.start()
work2_thread.start()
import threading
num1 = 0
num2 = 0
lock=threading.RLock()
def fun1():
lock.acquire()
global num1
num1+=1
lock.release()
return num1
def fun2():
lock.acquire()
global num2
num2+=1
lock.release()
return num2
def fun3():
lock.acquire()
print('---------------')
a=fun1()
b=fun2()
lock.release()
print(a,b)
for i in range(3):
t=threading.Thread(target=fun3)
t.start()
print(num1,num2)
import threading,time
def fun(n):
semaphore.acquire()
time.sleep(1)
print('run the thread:%s\n'%n)
semaphore.release()
if __name__ == '__main__':
semaphore = threading.BoundedSemaphore(5)
for i in range(20):
t = threading.Thread(target=fun,args=(i,))
t.start()
全局解释器锁
线程之间同步交互
event = threading.Event()
线程间的通信消息队列队列 queue
生产者消费者模型
生产者消费者模式是通过一个容器来解决生产者和消费者的 强耦合 问题,
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,
所以生产者产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,直接从阻塞队列中取。
阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
import threading,time,queue
q = queue.Queue()
def produce():
for i in range(10):
q.put('骨头%s'%i)
def consumer(name):
while q.qsize()>0:
time.sleep(1)
print('[%s]拿到了[%s]并且吃掉'%(name,q.get()))
if __name__ == '__main__':
p = threading.Thread(target=produce)
c = threading.Thread(target=consumer,args=('vvv',))
p.start()
c.start()
注意点:
1 线程之间执行是无序的。所以先执行哪个是由CPU决定的
2 主线程会等待所有的子线程结束后才结束
3 线程为编写高效服务器提供了一种便利的方法。多数的多线程服务器有着同样的体系结构,主线程(Main Thread)负责侦听请求的线程,当收到一个请求的时候,一个新的工作者线程被建立,处理该客户端的请求。当客户端断开连接,工作者线程终止
1. 线程之间执行是无序的
# 定义一个任务
def work():
# 运行状态
time.sleep(1) # 线程休眠(阻塞状态)
print("工作....", threading.current_thread().name)
# 死亡状态(子线程会被销毁掉)
if __name__ == '__main__':
for _ in range(5):
# 定义一个子线程
# 新建状态
sub_thread = threading.Thread(target=work)
# 就绪状态
sub_thread.start()
-
多任务-进程
概念:
1 通俗理解一个运行的程序或者软件,进程是操作系统资源分配的基本单位,一个程序至少有一个进程,一个进程至少有一个线程,多进程可以完成多任务
2 一个进程默认有一个线程,进程里面可以创建线程,线程是依附在进程里的,没有进程就没有线程
3 进程是多任务处理的基本单元,同时可运行多个进程,他不能执行任务(具体的逻辑函数),只是分配资源(切换线程)
创建
import multiprocessing
import time
def run_pro():
"""子进程要执行的代码"""
while True:
print("----2----")
time.sleep(1)
if __name__=='__main__':
# 创建子进程
sub_process = multiprocessing.Process(target=run_pro)
# 启动子进程
sub_process.start()
while True:
print("----1----")
time.sleep(1)
在打开一个程序时执行一串代码,这个时候就已经存在两个进程,父进程开启程序的的,子进程是当前程序的。
当启动多进程的时候,以上个子进程为父进程进行创建子进程的操作。
所以说,父子进程是相对的。
主进程等待子进程
import multiprocessing,time
#任务
def work():
for i in range(10):
print('工作...')
time.sleep(0.1)
if __name__ == '__main__':
# 01 如果创建了子进程,主进程即使任务完成,也会等待子进程任务结束程序再退出
# sub_process = multiprocessing.Process(target=work)
# sub_process.start()
# 02 主进程守护
# 如果创建了子进程,给他设置了主进程守护,等主进程任务结束后,那么就会销毁子进程,程序退出
# sub_process = multiprocessing.Process(target=work)
#
# # 主进程守护
# sub_process.daemon = 1
# sub_process.start()
# time.sleep(0.2)
# print('主进程结束')
# 03 terminate
# 如果创建了子进程,设置了子进程terminate。即使主进程还没结束,子进程就被销毁
sub_process = multiprocessing.Process(target=work)
sub_process.start()
time.sleep(0.5)
print('terminate要执行')
sub_process.terminate()
time.sleep(0.2)
print('主进程结束')
设置子进程销毁
# 如果创建了子进程,设置了子进程terminate。即使主进程还没结束,子进程就被销毁
sub_process = multiprocessing.Process(target=work)
sub_process.start()
time.sleep(0.5)
print('terminate要执行')
sub_process.terminate()
time.sleep(1)
print('主进程结束')
子进程消息队列
import multiprocessing
def write(queue):
for i in range(10):
#判断消息队列是否已满
if queue.full():
print('已经满了,写不进去了')
break
queue.put(i)
print('write',i)
def read(queue):
for i in range(10):
#判断消息队列是否为空
if queue.empty():
print('空了,读不到了')
break
print('read',queue.get())
if __name__ == '__main__':
#消息队列
queue = multiprocessing.Queue(5)
#子进程
w_p = multiprocessing.Process(target=write,args=(queue,))
r_p = multiprocessing.Process(target=read,args=(queue,))
w_p.start()
#子进程运行无序,添加进程同步
w_p.join()
r_p.start()
queue = multiprocessing.Queue(maxsize = 0)
# 创建一个消息队列对象
# 消息队列 也是一个容器
# Queue(n) 可以保存的消息数量(容量)
# Queue() 默认值为0 ,表示保存消息数量没有上限
#因为进程启动的顺序不定,所以,可能出现,1,也可能出现队列为空,报错
queue = multiprocessing.Queue(3)
queue.put_nowait(1)
print(queue.get_nowait())
使用进程池
import multiprocessing
import time
# 定义一个任务
def work(num):
time.sleep(1)
print("任务:", num, multiprocessing.current_process().pid)
if __name__ == '__main__':
# 定义一个进程池
pool = multiprocessing.Pool(3)
for i in range(1, 11):
# 利用进程池执行任务(异步)
# pool.apply_async(work, args=(i,))
# 利用进程池执行任务(同步)
pool.apply(work, args=(i,))
# 关闭进程池 不再接收新的任务
pool.close()
# 默认情况下主进程不等待进程池执行完任务就停止了
# 解决: 进程池同步
pool.join()
##pool.apply_async(func=work, args=(i,))
if __name__ == '__main__':
# 定义一个进程池
pool = multiprocessing.Pool(3)
for i in range(1, 11):
# 利用进程池执行任务(异步)
pool.apply_async(work, args=(i,))
# 关闭进程池 不再接收新的任务
pool.close()
# 默认情况下主进程不等待进程池执行完任务就停止了
# 解决: 进程池同步
pool.join() #使用join以后就得关闭进程池,不再向里面添加新任务,就执行刚才既定的。
-
多任务-协程
又称微线程,纤程,也称为用户级线程,
在不开辟线程的基础上完成多任务,也就是在单线程的情况下完成多任务,多个任务按照一定顺序交替执行
通俗理解只要在def里面只看到一个yield关键字表示就是协程
基础
迭代生成
判断类型
a = 10
flag = isinstance(a, int)
print(flag)
#a 是否为整数类型
判断是否为可迭代对象
l1 = 1,2,3
l2=4,5,6
a = zip(l1,l2)
print(a,type(a))
print(a.__next__)
print(next(a))
from collections import Iterable
print(isinstance(a, Iterable))
可迭代对象所拥有的
# 自定义可迭代对象: 在类里面定义__iter__方法创建的对象就是可迭代对象
class MyList(object):
def __init__(self):
self.my_list = list()
# 添加指定元素
def append_item(self, item):
self.my_list.append(item)
def __iter__(self):
# 可迭代对象的本质:遍历可迭代对象的时候其实获取的是可迭代对象的迭代器, 然后通过迭代器获取对象中的数据
my_iterator = MyIterator(self.my_list)
return my_iterator
可迭代器对象
# 自定义类
class StudentList(object):
def __init__(self):
# 创建属性并赋值
self.items = []
# 添加名字
def append_item(self, name):
self.items.append(name)
# 重写
# 返回一个迭代器
def __iter__(self):
return StudentIterator(self.items)
# 自定义一个服务于StudentList 迭代器类
class StudentIterator(object):
def __init__(self, items):
# 保存可迭代对象的数据
self.items = items
# 定义一个下标索引 记录迭代的位置
self.current_index = 0
def __iter__(self):
return self
def __next__(self):
# 判断条件
if self.current_index < len(self.items):
self.current_index += 1
return self.items[self.current_index - 1]
else:
# 抛出异常
raise StopIteration
# 自定义可迭代对象函数
def func1():
s_list = StudentList()
s_list.append_item("小明")
s_list.append_item("小红")
for name in s_list.items:
pass
# 为什么for循环没有报错 是因为内部做了 异常捕获
# for循环中 之所以可以取值 因为执行了一次iter 和多次next函数
# for name in s_list:
# print(name)
# 如果是迭代对象 -> 转迭代器 然后 next
# 如果是迭代器 直接 next
# s_iter = iter(s_list)
# for name in s_iter:
# print(name)
# s_iter = iter(s_list)
# print(s_iter)
# print(next(s_iter))
# print(next(s_iter))
# print(next(s_iter))
# 测试函数
def func2():
# 定义列表
my_list = list()
my_list.append("小明")
my_list.append("小红")
# for name in my_list:
# print(name)
# 可迭代对象 之所以可以通过for循环遍历取值的原因
# 因为每个可迭代对象中 都有一个迭代器 由迭代器帮可迭代对象取值操作
# iter函数: 通过可迭代对象 获取到迭代器
list_iter = iter(my_list)
print(list_iter)
# next函数:取值
print(next(list_iter))
print(next(list_iter))
print(next(list_iter))
if __name__ == '__main__':
func1()
两个函数
# 自定义可迭代对象: 在类里面定义__iter__方法创建的对象就是可迭代对象
class MyList(object):
def __init__(self):
self.my_list = list()
# 添加指定元素
def append_item(self, item):
self.my_list.append(item)
def __iter__(self):
# 可迭代对象的本质:遍历可迭代对象的时候其实获取的是可迭代对象的迭代器, 然后通过迭代器获取对象中的数据
my_iterator = MyIterator(self.my_list)
return my_iterator
# 自定义迭代器对象: 在类里面定义__iter__和__next__方法创建的对象就是迭代器对象
# 迭代器是记录当前数据的位置以便获取下一个位置的值
class MyIterator(object):
def __init__(self, my_list):
self.my_list = my_list
# 记录当前获取数据的下标
self.current_index = 0
def __iter__(self):
return self
# 获取迭代器中下一个值
def __next__(self):
if self.current_index < len(self.my_list):
self.current_index += 1
return self.my_list[self.current_index - 1]
else:
# 数据取完了,需要抛出一个停止迭代的异常
raise StopIteration
# 创建了一个自定义的可迭代对象
my_list = MyList()
my_list.append_item(1)
my_list.append_item(2)
# 获取可迭代对象的迭代器
my_iterator = iter(my_list)
print(my_iterator)
# 获取迭代器中下一个值
# value = next(my_iterator)
# print(value)
# 循环通过迭代器获取数据
whileTrue:
try:
value = next(my_iterator)
print(value)
except StopIteration as e:
break
# # 列表推导式
import sys
# list = [i for i in range(100)]
# print(type(list))
# # 计算占用内存
# print(sys.getsizeof(list))
# # 生成器
# g = (i for i in range(100))
# print(type(g))
# print(sys.getsizeof(g))
yield
def fibonacci(num):
a = 0
b = 1
# 记录生成fibonacci数字的下标
current_index = 0
print("--11---")
while current_index < num:
result = a
a, b = b, a + b
current_index += 1
print("--22---")
# 代码执行到yield会暂停,然后把结果返回出去,下次启动生成器会在暂停的位置继续往下执行
yield result
print("--33---")
创建 yield
import time
def fun1():
while 1:
print('任务1')
yield
time.sleep(0.5)
def fun2():
while 1:
print('任务2')
yield
time.sleep(0.5)
if __name__ == '__main__':
g1 = fun1()
g2 = fun2()
启动
g1 = fun1()
g2 = fun2()
# 启动当前协程
# while 1:
# 方法1启动
# next(g1)
# next(g2)
#方法2启动
# g1.__next__()
# g2.__next__()
#方法3启动
# g1.send(None)
# g2.send(None)
# while 1:
#
# g1.send(111)
#
# g2.send(222)
greenlet
import time
import greenlet
# 任务1
def work1():
for i in range(5):
print("work1...")
time.sleep(0.2)
# 切换到协程2里面执行对应的任务
g2.switch()
# 任务2
def work2():
for i in range(5):
print("work2...")
time.sleep(0.2)
# 切换到第一个协程执行对应的任务
g1.switch()
if __name__ == '__main__':
# 创建协程指定对应的任务
g1 = greenlet.greenlet(work1)
g2 = greenlet.greenlet(work2)
# 切换到第一个协程执行对应的任务
g1.switch()
gevent
gevent内部封装的greenlet,其原理是当一个greenlet遇到IO(指的是input output 输入输出,比如网络、文件操作等)操作时,
比如访问网络,就自动切换到其他,由于IO操作非常耗时,经常使程序处于等待状态,
有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。
g1.join()
线程不会等协程完成再退出。所以需要join
gevent切换执行
import gevent
def work(n):
for i in range(n):
# 获取当前协程
print(gevent.getcurrent(), i)
#用来模拟一个耗时操作,注意不是time模块中的sleep
gevent.sleep(1)
g1 = gevent.spawn(work, 5)
g2 = gevent.spawn(work, 5)
g3 = gevent.spawn(work, 5)
g1.join()
g2.join()
g3.join()
给程序打补丁
import gevent
import time
from gevent import monkey
# 打补丁,让gevent框架识别耗时操作,比如:time.sleep,网络请求延时
monkey.patch_all()
# 任务1
def work1(num):
for i in range(num):
print("work1....")
time.sleep(0.2)
# gevent.sleep(0.2)
# 任务1
def work2(num):
for i in range(num):
print("work2....")
time.sleep(0.2)
# gevent.sleep(0.2)
if __name__ == '__main__':
# 创建协程指定对应的任务
g1 = gevent.spawn(work1, 3)
g2 = gevent.spawn(work2, 3)
# 主线程等待协程执行完成以后程序再退出
g1.join()
g2.join()
初次使用gevent的时候,模拟耗时操作,不适用time操作,使用gevent下的time方法模拟
打补丁之后 ,就可以用time模块下的time来模拟耗时操作
while True:
gevent.spawn(deal_client_request,tcp_client_socket)
当把协程放入一个死循环的时候就不需要在设置同步join了,因为这个线程本身就没有得到退出,
-
对比: