概念梳理
进程:
一个程序的执行实例就是一个进程。每一个进程提供执行程序所需的所有资源。(进程本质上是资源的集合)
每个进程有自己的内存空间、数据栈等,只能使用进程间通讯,而不能直接共享信息。
在linux中,每个进程都是由父进程提供的。每启动一个子进程就从父进程克隆一份数据,但是进程之间的数据本身是不能共享的。
线程:
所有线程运行在同一个进程中,共享相同的运行环境。
每个独立的线程有一个程序运行的入口、顺序执行序列和程序的出口。每条线程可以执行多个任务。
线程的运行可以被抢占(中断),或暂时被挂起(睡眠),让其他线程运行(让步)。
一个进程中的各个线程间共享同一片数据空间。
进程和线程的区别:
1.同一个进程中的线程共享同一内存空间,但是进程之间是独立的。
2.同一个进程中的所有线程的数据是共享的(进程通讯),进程之间的数据是独立的。
3.对主线程的修改可能会影响其他线程的行为,但是父进程的修改(除了删除以外)不会影响其他子进程。
4.线程是一个上下文的执行指令,而进程则是与运算相关的一簇资源。
5.同一个进程的线程之间可以直接通信,但是进程之间的交流需要借助中间代理来实现。
6.创建新的线程很容易,但是创建新的进程需要对父进程做一次复制。
7.一个线程可以操作同一进程的其他线程,但是进程只能操作其子进程。
8.线程启动速度快,进程启动速度慢(但是两者运行速度没有可比性)。
单核情况下,多线程都是并发的,多核可以产生多线程并发
单核情况下,多进程都是并发的,多核可以产生多进程并发
多线程
常用方法:
start() 线程准备就绪,等待CPU调度
setName() 为线程设置名称
getName() 获取线程名称
setDaemon(True) 设置为守护线程, 使用setDaemon(True)把所有的子线程都变成了主线程的守护线程,因此当主线程结束后,子线程也会随之结束。所以当主线程结束后,整个程序就退出了。
join() 逐个执行每个线程,执行完毕后继续往下执行
run() 线程被cpu调度后自动执行线程对象的run方法,如果想自定义线程类,直接重写run方法就行了
GIL:
在非python环境中,单核情况下,同时只能有一个任务执行,即单个CPU一个时刻只能运行一个线程,多CPU时可以支持多个线程同时执行。但是在python中,无论有多少核,同时只能执行一个线程。究其原因,这就是由于GIL的存在导致的。
GIL是一把全局排他锁,同一时刻只有一个线程在运行。毫无疑问全局锁的存在会对多线程的效率有不小影响。甚至就几乎等于Python是个单线程的程序。
multiprocessing库的出现很大程度上是为了弥补thread库因为GIL而低效的缺陷。它完整的复制了一套thread所提供的接口方
便迁移。唯一的不同就是它使用了多进程而不是多线程。每个进程有自己的独立的GIL,因此也不会出现进程之间的GIL争
抢。
线程锁:互斥锁、递归锁、信号量:
1)由于线程之间是进行随机调度,如果有多个线程同时操作一个对象,如果没有很好地保护该对象,会造成程序结果的不可预期,我们也称此为“线程不安全”。
多线程并发不安全举例:
程序 1 执行 i++,程序 2 也执行 i++
当程序 1 将 i 值读取出来并运算后改为写入的时候,系统抢占式把控制权给个程序 2,程序 2 完整的执行完了 i++,随后系统将控制权交回给程序 1,此时的程序 1 并不知道自己被打断了,也不知道 i 已经被修改,还把之前计算好的值写入,最后结果就是 i 只加了 1,而不是加了 2
2)为了防止上面情况的发生,就出现了互斥锁(Lock)。
3)RLcok类的用法和Lock类一模一样,但它支持嵌套,在多个锁没有释放的时候一般会使用使用RLcok类。
4)互斥锁同时只允许一个线程更改数据,而信号量(Semaphore)是同时允许一定数量的线程更改数据
def run(n):
lock.acquire() #获取锁
global num
num += 1
lock.release() #释放锁
lock = threading.Lock() #实例化一个锁对象
num = 0
t_obj = []
for i in range(20000):
t = threading.Thread(target=run, args=("t-%s" % i,))
t.start()
t_obj.append(t)
for t in t_obj:
t.join()
print "num:", num
信号量
def run(n):
semaphore.acquire() #加锁
time.sleep(1)
print("run the thread:%s " % n)
semaphore.release() #释放
semaphore = threading.BoundedSemaphore(5) # 最多允许5个线程同时运行
事件(Event类)
python线程的事件用于主线程控制其他线程的执行,事件是一个简单的线程同步对象。
事件处理的机制:全局定义了一个“Flag”,当flag值为“False”,那么event.wait()就会阻塞,当flag值为“True”,那么event.wait()便不再阻塞。
#利用Event类模拟红绿灯
import threading
import time
event = threading.Event()
def lighter():
count = 0
event.set() #初始值为绿灯
while True:
if 5 < count <=10 :
event.clear() # 红灯,清除标志位
print("33[41;1mred light is on... 33[0m")
elif count > 10:
event.set() # 绿灯,设置标志位
count = 0
else:
print("33[42;1mgreen light is on... 33[0m")
time.sleep(1)
count += 1
def car(name):
while True:
if event.is_set(): #判断是否设置了标志位
print("[%s] running..."%name)
time.sleep(1)
else:
print("[%s] sees red light,waiting..."%name)
event.wait()
print("[%s] green light is on,start going..."%name)
light = threading.Thread(target=lighter,)
light.start()
car = threading.Thread(target=car,args=("MINI",))
car.start()
定时器
定时器,指定n秒后执行某操作
from threading import Timer
def hello():
print("hello, world")
t = Timer(1, hello)
t.start() # after 1 seconds, "hello, world" will be printed
多进程
进程间通信
l queue():
from multiprocessing import Process, Queue
import time
def write(q):
for i in ['A','B','C','D','E']:
print('Put %s to queue' % i)
q.put(i)
time.sleep(0.5)
def read(q):
while True:
v = q.get(True)
print('get %s from queue' %v)
if(v == 'E'): break;
if __name__ == '__main__':
q = Queue()
pw = Process(target=write,args=(q,))
pr = Process(target=read,args=(q,))
pw.start()
pr.start()
pr.join()
pr.terminate()
l pipe():
Pipe的本质是进程之间的数据传递,而不是数据共享。pipe()返回两个连接对象分别表示管道的两端,每端都有send()和recv()方法。
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv()) # prints "[42, None, 'hello']"
p.join()
进程锁
数据输出的时候保证不同进程的输出内容在同一块屏幕正常显示,防止数据乱序的情况。
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
进程池
由于进程启动的开销比较大,使用多进程的时候会导致大量内存空间被消耗。为了防止这种情况发生可以使用进程池,(由于启动线程的开销比较小,所以不需要线程池这种概念,多线程只会频繁得切换cpu导致系统变慢,并不会占用过多的内存空间)
进程池中常用方法:
apply() 同步执行(串行)
apply_async() 异步执行(并行)
terminate() 立刻关闭进程池
join() 主进程等待所有子进程执行完毕。必须在close或terminate()之后。
close() 等待所有进程结束后,才关闭进程池。
进程池内部维护一个进程序列,当使用时,去进程池中获取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。在上面的程序中产生了10个进程,但是只能有5同时被放入进程池,剩下的都被暂时挂起,并不占用内存空间,等前面的五个进程执行完后,再执行剩下5个进程。
from multiprocessing import Pool
import time
def f(x):
print x*x
time.sleep(2)
return x*x
if __name__ == '__main__':
'''定义启动的进程数量'''
pool = Pool(processes=5)
res_list = []
for i in range(10):
'''以异步并行的方式启动进程,如果要同步等待的方式,可以在每次启动进程之后调用res.get()方法,也可以使用Pool.apply'''
res = pool.apply_async(f,[i,])
print('-------:',i)
res_list.append(res)
pool.close()
pool.join()
for r in res_list:
print "result",(r.get(timeout=5))
>>>
0
4
1
9
16
('-------:', 0)
('-------:', 1)
('-------:', 2)
('-------:', 3)
('-------:', 4)
('-------:', 5)
('-------:', 6)
('-------:', 7)
('-------:', 8)
('-------:', 9)
25
36
49
64
81
result 0
result 1
result 4
result 9
result 16
result 25
result 36
result 49
result 64
result 81