多线程和多进程
并发编程(线程 进程 协程 io多路复用 )
为什么要有操作系统
操作系统:位于底层硬件与应用软件之间的一层
工作方式:向下管理硬件,向上管理接口
1.当只有一个cpu的时候多个程序运行其实是在很快的时间内快速切换实现的,这种叫并发。
2.当有多个cpu的时候每个cpu单独执行一个程序这种运行方式叫并行
(程序切换状态是非常快的,只有0.01秒,人是无法感知的,所以感觉在同时跑多个任务,真正的多进程需要有多个cpu,这就是并行)
进程定义:
进程是由程序、数据集、进程控制块组成
在程序切换时,需要将当前程序状态保存这个功能由进程控制块完成
进程和线程是由操作系统切换:
切换条件:
1 出现io操作
2 固定时间
进程:资源管理单位(容器)
管理多个线程
线程:最小执行单位
被进程所管理
进程和线程的关系:
(1)一个线程只能属于一个进程,而一个进程可以有多个线程,但至少有一个线程。
(2)资源分配给进程,同一进程的所有线程共享该进程的所有资源。
(3)CPU分给线程,即真正在CPU上运行的是线程。
并行和并发
并行处理(Parallel Processing)是计算机系统中能同时执行两个或更多个处理的一种计算方法。并行处理可同时工作于同一程序的不同方面。并行处理的主要目的是节省大型和复杂问题的解决时间。并发处理(concurrency Processing):指一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机(CPU)上运行,但任一个时刻点上只有一个程序在处理机(CPU)上运行
并发的关键是你有处理多个任务的能力,不一定要同时。并行的关键是你有同时处理多个任务的能力。所以说,并行是并发的子集
同步与异步
在计算机领域,同步就是指一个进程在执行某个请求的时候,若该请求需要一段时间才能返回信息,那么这个进程将会一直等待下去,直到收到返回信息才继续执行下去;异步是指进程不需要一直等下去,而是继续执行下面的操作,不管其他进程的状态。当有消息返回时系统会通知进程进行处理,这样可以提高执行的效率。举个例子,打电话时就是同步通信,发短息时就是异步通信。
(python中没办法实现真正的线程并行,进程并行没问题)
python的多线程:由于GIL锁,导致同一时刻同一进程只能有一个进程运行
线程对象的创建:
Thread类直接创建
import threading#(线程)
import time
def tingge():
print('听歌')
time.sleep(3)
print('听歌结束')
def xiebike():
print('写博客')
time.sleep(5)
print('写博客结束')
t1=threading.Thread(target=tingge)
t2=threading.Thread(target=xieboke)
t1.start()#开启子线程1
t2.start()#开启子线程2
print('ending!')#主线程结束
(执行效果是并发执行两个子线程和主线程)
Thread类继承创建
#继承Thread式创建
import threading
import time
class MyThread(threading.Thread):
def __init__(self,num):
threading.Thread.__init__(self)
self.num=num
def run(self):#重写父类的run方法
print("running on number:%s" %self.num)
time.sleep(3)
t1=MyThread(56)
t2=MyThread(78)
t1.start()
t2.start()
print("ending")
Thread类的实例方法
join()和setDaemon()
# join():在子线程完成运行之前,这个子线程的父线程将一直被阻塞。
# setDaemon(True):
'''
将线程声明为守护线程,必须在start() 方法调用之前设置,如果不设置为守护线程程序会被无限挂起。
当我们在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程 就分兵两路,分别运行,那么当主线程完成
想退出时,会检验子线程是否完成。如果子线程未完成,则主线程会等待子线程完成后再退出。但是有时候我们需要的是只要主线程
完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以 用setDaemon方法啦'''
import threading
from time import ctime,sleep
import time
def Music(name):
print ("Begin listening to {name}. {time}".format(name=name,time=ctime()))
sleep(3)
print("end listening {time}".format(time=ctime()))
def Blog(title):
print ("Begin recording the {title}. {time}".format(title=title,time=ctime()))
sleep(5)
print('end recording {time}'.format(time=ctime()))
threads = []
t1 = threading.Thread(target=Music,args=('FILL ME',))
t2 = threading.Thread(target=Blog,args=('',))
threads.append(t1)
threads.append(t2)
if __name__ == '__main__':
#t2.setDaemon(True)
for t in threads:
#t.setDaemon(True) #注意:一定在start之前设置
t.start()
#t.join()
#t1.join() #添加join是等待主线程
#t2.join() # 考虑这三种join位置下的结果?
print ("all over %s" %ctime())
其他方法:
Thread实例对象的方法
# isAlive(): 返回线程是否活动的。
# getName(): 返回线程名。
# setName(): 设置线程名。
threading模块提供的一些方法:
# threading.currentThread(): 返回当前的线程变量。
# threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
# threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
GIL全局解释器锁:(全局解释器是加在cpython解释器上的)
Python中的线程是操作系统的原生线程,Python虚拟机使用一个全局解释器锁(Global Interpreter Lock)来互斥线程对Python虚拟机的使用。为了支持多线程机制,一个基本的要求就是需要实现不同线程对共享资源访问的互斥,所以引入了GIL。
GIL:在一个线程拥有了解释器的访问权之后,其他的所有线程都必须等待它释放解释器的访问权,即使这些线程的下一条指令并不会互相影响。
在调用任何Python C API之前,要先获得GIL
GIL缺点:多处理器退化为单处理器;优点:避免大量的加锁解锁操作
总结:
无论你启多少个线程,你有多少个cpu, Python在执行一个进程的时候会淡定的在同一时刻只允许一个线程运行。
所以,python是无法利用多核CPU实现多线程的。
这样,python对于计算密集型的任务开多线程的效率甚至不如串行(没有大量切换),但是,对于IO密集型的任务效率还是有显著提升的。
python非要使用多核:只能开多个进程(因为GIL锁),弊端:开销大切切换复杂
着重点:只有协程+多进程才是解决的办法
方向2:io多路复用
终极解决:换C模块实现多线程
同步锁:
import time
import threading
def addNum():
global num #在每个线程中都获取这个全局变量
#num-=1
lock.acquire()
temp=num
time.sleep(0.1)
num =temp-1 # 对此公共变量进行-1操作
lock.release()
num = 100 #设定一个共享变量
thread_list = []
lock=threading.Lock()
for i in range(100):
t = threading.Thread(target=addNum)
t.start()
thread_list.append(t)
for t in thread_list: #等待所有线程执行完毕
t.join()
print('Result: ', num)
死锁和递归锁:
import threading
import time
mutexA = threading.Lock()
mutexB = threading.Lock()
class MyThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
self.fun1()
self.fun2()
def fun1(self):
mutexA.acquire() # 如果锁被占用,则阻塞在这里,等待锁的释放
print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
mutexB.acquire()
print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
mutexB.release()
mutexA.release()
def fun2(self):
mutexB.acquire()
print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
time.sleep(0.2)
mutexA.acquire()
print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
mutexA.release()
mutexB.release()
if __name__ == "__main__":
print("start---------------------------%s"%time.time())
for i in range(0, 10):
my_thread = MyThread()
my_thread.start()
#如果在锁的争夺中出现有的锁还没有被释放的过程,就会出现死锁
这个时候可以把两个锁替换为一个递归锁Rlock=threading.RLock()
Event线程通信标志位:
线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就 会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行
event.isSet():返回event的状态值;
event.wait():如果 event.isSet()==False将阻塞线程;
#wait括号内可以加入一个最长等待时间的参数
event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
event.clear():恢复event的状态值为False。
import threading
import time
import logging
logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',)
def worker(event):
logging.debug('Waiting for redis ready...')
event.wait()
logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime())
time.sleep(1)
def main():
readis_ready = threading.Event()
t1 = threading.Thread(target=worker, args=(readis_ready,), name='t1')
t1.start()
t2 = threading.Thread(target=worker, args=(readis_ready,), name='t2')
t2.start()
logging.debug('first of all, check redis server, make sure it is OK, and then trigger the redis ready event')
time.sleep(3) # simulate the check progress
readis_ready.set()
if __name__=="__main__":
main()
Semaphore(信号量)
(相当于数量锁,一次可以锁几个线程,限制线程同时运行的数量)
Semaphore管理一个内置的计数器,
每当调用acquire()时内置计数器-1;
调用release() 时内置计数器+1;
计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。
实例:(同时只有5个线程可以获得semaphore,即可以限制最大连接数为5):
import threading
import time
semaphore = threading.Semaphore(5)
def func():
if semaphore.acquire():
print (threading.currentThread().getName() + ' get semaphore')
time.sleep(2)
semaphore.release()
for i in range(20):
t1 = threading.Thread(target=func)
t1.start()
multiprocessing模块
由于GIL的存在,python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。
multiprocessing包是Python中的多进程管理包。与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。该进程可以运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象可以像多线程那样,通过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。所以,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境
# Process类调用
from multiprocessing import Process
import time
def f(name):
print('hello', name,time.ctime())
time.sleep(1)
if __name__ == '__main__':
p_list=[]
for i in range(3):
p = Process(target=f, args=('alvin:%s'%i,))
p_list.append(p)
p.start()
for i in p_list:
i.join()
print('end')
# 继承Process类调用
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self):
super(MyProcess, self).__init__()
# self.name = name
def run(self):#重写父类中的run方法
print ('hello', self.name,time.ctime())
time.sleep(1)
if __name__ == '__main__':
p_list=[]
for i in range(3):
p = MyProcess()
p.start()
p_list.append(p)
for p in p_list:
p.join()
print('end')
process类
构造方法:
Process([group [, target [, name [, args [, kwargs]]]]])
group: 线程组,目前还没有实现,库引用中提示必须是None;
target: 要执行的方法;
name: 进程名;
args/kwargs: 要传入方法的参数。
实例方法:
is_alive():返回进程是否在运行。
join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。
start():进程准备就绪,等待CPU调度
run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。
terminate():不管任务是否完成,立即停止工作进程
属性:
daemon:和线程的setDeamon功能一样
name:进程名字。
pid:进程号。
from multiprocessing import Process
import os
import time
def info(name):
print("name:",name)
print('parent process:', os.getppid())
print('process id:', os.getpid())
print("------------------")
time.sleep(1)
def foo(name):
info(name)
if __name__ == '__main__':
info('main process line')
p1 = Process(target=info, args=('alvin',))
p2 = Process(target=foo, args=('egon',))
p1.start()
p2.start()
p1.join()
p2.join()
print("ending")
协程
(携程因为进程内只有单个线程,所有不存在切换)
(因为只有一个线程所以不存在锁的概念)
yield与协程
import time
"""
传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。
如果改用协程,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高。
"""
# 注意到consumer函数是一个generator(生成器):
# 任何包含yield关键字的函数都会自动成为生成器(generator)对象
def consumer():
r = ''
while True:
# 3、consumer通过yield拿到消息,处理,又通过yield把结果传回;
# yield指令具有return关键字的作用。然后函数的堆栈会自动冻结(freeze)在这一行。
# 当函数调用者的下一次利用next()或generator.send()或for-in来再次调用该函数时,
# 就会从yield代码的下一行开始,继续执行,再返回下一次迭代结果。通过这种方式,迭代器可以实现无限序列和惰性求值。
n = yield r
if not n:
return
print('[CONSUMER] ←← Consuming %s...' % n)
time.sleep(1)
r = '200 OK'
def produce(c):
# 1、首先调用c.next()启动生成器
next(c)
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] →→ Producing %s...' % n)
# 2、然后,一旦生产了东西,通过c.send(n)切换到consumer执行;
cr = c.send(n)
# 4、produce拿到consumer处理的结果,继续生产下一条消息;
print('[PRODUCER] Consumer return: %s' % cr)
# 5、produce决定不生产了,通过c.close()关闭consumer,整个过程结束。
c.close()
if __name__=='__main__':
# 6、整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。
c = consumer()
produce(c)
'''
result:
[PRODUCER] →→ Producing 1...
[CONSUMER] ←← Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 2...
[CONSUMER] ←← Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 3...
[CONSUMER] ←← Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 4...
[CONSUMER] ←← Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 5...
[CONSUMER] ←← Consuming 5...
[PRODUCER] Consumer return: 200 OK
'''
基于greenlet的框架
gevent模块实现协程
Python通过yield提供了对协程的基本支持,但是不完全。而第三方的gevent为Python提供了比较完善的协程支持。
gevent是第三方库,通过greenlet实现协程,其基本思想是:
当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。
由于切换是在IO操作时自动完成,所以gevent需要修改Python自带的一些标准库,这一过程在启动时通过monkey patch完成:
import gevent
import time
def foo():
print("running in foo")
gevent.sleep(2)
print("switch to foo again")
def bar():
print("switch to bar")
gevent.sleep(5)
print("switch to bar again")
start=time.time()
gevent.joinall(
[gevent.spawn(foo),
gevent.spawn(bar)]
)
print(time.time()-start)
当然,实际代码里,我们不会用gevent.sleep()去切换协程,而是在执行到IO操作时,gevent自动切换,代码如下:
from gevent import monkey
monkey.patch_all()
import gevent
from urllib import request
import time
def f(url):
print('GET: %s' % url)
resp = request.urlopen(url)
data = resp.read()
print('%d bytes received from %s.' % (len(data), url))
start=time.time()
gevent.joinall([
gevent.spawn(f, 'https://itk.org/'),
gevent.spawn(f, 'https://www.github.com/'),
gevent.spawn(f, 'https://zhihu.com/'),
])
# f('https://itk.org/')
# f('https://www.github.com/')
# f('https://zhihu.com/')
print(time.time()-start)