Python自动化 【第九篇】:Python基础-线程、进程及python GIL全局解释器锁
1. 进程与线程区别
进程:以一个整体的形式暴露给操作系统管理,里面包含对各种资源的调用,内存的管理,网络接口的调用等,对各种资源管理的集合就可称为 进程。进程要操作cpu, 必须先创建一个线程。
- 线程共享内存空间,进程的内存是独立的
- 线程共用数据,进程数据独立
- 同一个进程的线程之间可以直接交流,两个进程必须通过中间代理实现通信
- 新线程容易创建,新进程需要克隆父进程
- 一个线程可以控制和操作同一进程里的其他线程, 进程只能操作子进程
2. 线程(threading模块)
a) 语法
import threading
import time


def run(n):
    """Thread target: print the task label, then simulate 2 seconds of work.

    Args:
        n: Label identifying the task in the printed output.
    """
    print("task", n)
    time.sleep(2)


# Thread(target=..., args=...) wraps a plain callable; args must be a tuple.
t1 = threading.Thread(target=run, args=("t1",))
t2 = threading.Thread(target=run, args=("t2",))
t1.start()
t2.start()

# Get the thread names. `name` is an attribute; printing `t1.getName`
# without calling it would only show the bound-method object.
print(t1.name)
print(t2.name)
import threading
import time


class MyThread(threading.Thread):
    """Thread subclass whose work is defined by overriding run()."""

    def __init__(self, num):
        """Store the number this thread reports when it runs.

        Args:
            num: Value printed by run().
        """
        super().__init__()
        self.num = num

    def run(self):
        """Body executed in the new thread once start() is called."""
        print("running on number:%s" % self.num)
        time.sleep(3)


if __name__ == '__main__':
    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()
import threading
import time


def run(n):
    """Print the task label and then sleep for two seconds."""
    print("task", n)
    time.sleep(2)


# Launch 50 threads back to back; nothing waits for them here.
for i in range(50):
    label = "t_%s" % i
    t = threading.Thread(target=run, args=(label,))
    t.start()
b) join
join & Daemon用法:
import threading
import time


def run(n):
    """Announce the task, sleep two seconds, then announce completion."""
    print("task", n)
    time.sleep(2)
    print("task done ", n)


start_time = time.time()

# Keep every Thread object so each one can be joined afterwards.
t_objs = []
for i in range(50):
    label = "t_%s" % i
    t = threading.Thread(target=run, args=(label,))
    t.start()
    t_objs.append(t)

# join() blocks until the given thread has finished, so the elapsed time
# printed below covers all 50 workers.
for t in t_objs:
    t.join()

elapsed = time.time() - start_time
print("cost_time:", elapsed)
import threading
import time


def run(n):
    """Announce the task, sleep two seconds, then report which thread ran it."""
    print("task", n)
    time.sleep(2)
    print("task done ", n, threading.current_thread())


start_time = time.time()

t_objs = []
for i in range(50):
    label = "t_%s" % i
    t = threading.Thread(target=run, args=(label,))
    t.start()
    t_objs.append(t)

# Deliberately no join() loop over t_objs here: the main thread carries on
# immediately, so the two prints below run while the workers are still alive.
main_thread = threading.current_thread()
alive = threading.active_count()
print("===all threads has finished", main_thread, alive)
print("cost_time:", time.time() - start_time)
c) 信号量:
互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 。
d) 把子线程变成守护线程setDaemon()方法:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: zhoujunlong
import threading
import time


def run(n):
    """Announce the task, sleep two seconds, then report which thread ran it.

    Args:
        n: Label identifying the task in the printed output.
    """
    print("task", n)
    time.sleep(2)
    print("task done ", n, threading.current_thread())


start_time = time.time()
for i in range(50):
    t = threading.Thread(target=run, args=("t_%s" % i,))
    # Mark the thread as a daemon; this must happen before start().
    # The `daemon` attribute replaces the deprecated setDaemon(True) call.
    t.daemon = True
    t.start()

# The main thread does not join the workers, and daemon threads do not keep
# the interpreter alive once the main thread ends.
print("===all threads has finished", threading.current_thread(), threading.active_count())
print("cost_time:", time.time() - start_time)
e) 事件:
event = threading.Event()
event.wait() 等待标志位被设定
event.set() 设置标志位
event.clear() 清除标志位
event.is_set() 判断标志位是否设定
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: zhoujunlong
import threading
import time

# Shared flag: set means the light is green, cleared means it is red.
event = threading.Event()


def lighter():
    """Traffic light: cycle the shared event between green and red forever.

    The counter advances once per second. Counts 0-5 are green, 6-10 are
    red, and on 11 the light turns green again and the counter restarts.
    """
    count = 0
    event.set()  # start on green
    while True:
        if 10 >= count > 5:  # change to red light
            event.clear()  # clear the flag
            print("\033[41;1m红灯了\033[0m")
        elif count > 10:
            event.set()  # change to green light
            count = 0
        else:
            print("\033[42;1m绿灯了\033[0m")
        time.sleep(1)
        count += 1


def car(name):
    """Car: drive while the light is green, otherwise wait for it.

    Args:
        name: Label identifying the car in the printed output.
    """
    while True:
        if event.is_set():  # green light
            print("[%s] running..." % name)
            time.sleep(1)
        else:
            print("[%s] sees red light, waiting" % name)
            event.wait()  # blocks until lighter() sets the flag again
            print("\033[34;1m[%s] 绿灯了,gogogo\033[0m" % name)


light = threading.Thread(target=lighter)
light.start()

car1 = threading.Thread(target=car, args=("QQ",))
car2 = threading.Thread(target=car, args=("TT",))
car1.start()
car2.start()
这里还有一个event使用的例子,员工进公司门要刷卡, 我们这里设置一个线程是“门”, 再设置几个线程为“员工”,员工看到门没打开,就刷卡,刷完卡,门开了,员工就可以通过。
import threading
import time
import random


def door():
    """Door: stay open while the swipe event is set, close after a few ticks.

    The loop ticks every 0.5 seconds. Each tick with the event set counts
    towards the open time; once the counter exceeds 3 the event is cleared
    and the door waits for the next swipe.
    """
    door_open_time_counter = 0
    while True:
        if door_swiping_event.is_set():
            print("\033[32;1mdoor opening....\033[0m")
            door_open_time_counter += 1
        else:
            print("\033[31;1mdoor closed...., swipe to open.\033[0m")
            door_open_time_counter = 0  # reset the timer
            door_swiping_event.wait()  # block until somebody swipes

        if door_open_time_counter > 3:  # open long enough, close it
            door_swiping_event.clear()

        time.sleep(0.5)


def staff(n):
    """Staff member: walk through if the door is open, otherwise swipe first.

    Args:
        n: Label identifying the staff member in the printed output.
    """
    print("staff [%s] is comming..." % n)
    while True:
        if door_swiping_event.is_set():
            print("\033[34;1mdoor is opened, passing.....\033[0m")
            break
        else:
            print("staff [%s] sees door got closed, swipping the card....." % n)
            door_swiping_event.set()
            # Event.set() returns None, so print the flag's state instead of
            # the return value of set().
            print("after set ", door_swiping_event.is_set())
        time.sleep(0.5)


door_swiping_event = threading.Event()  # create the shared event

door_thread = threading.Thread(target=door)
door_thread.start()

for i in range(5):
    p = threading.Thread(target=staff, args=(i,))
    time.sleep(random.randrange(3))  # staff arrive 0-2 seconds apart
    p.start()
f) queue(队列)
class queue.Queue(maxsize=0) #先入先出
class queue.LifoQueue(maxsize=0) #后入先出 #last in first out
class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列
- 实现程序的解耦
- 提高运行效率
>>> import queue
>>> q = queue.Queue()
>>> q.put("disk1")
>>> q.put("disk2")
>>> q.put("disk3")
>>> q.qsize()
3
>>> q.get()
'disk1'
>>> q.get()
'disk2'
>>> q.get()
'disk3'
>>> q.get_nowait()
>>> q = queue.Queue(maxsize=3)
>>> q.put(1)
>>> q.put(2)
>>> q.put(3)
>>> q.put(4)
import queue

# LifoQueue hands items back in reverse order of insertion.
q = queue.LifoQueue()
for value in (1, 2, 3):
    q.put(value)

for _ in range(3):
    print(q.get())
3 2 1
import queue

# PriorityQueue returns the smallest entry first. Entries are
# (priority, item) tuples so that ordering follows the numeric priority;
# plain strings such as "5, a2" would be compared character by character.
q = queue.PriorityQueue()
q.put((-1, "a1"))
q.put((5, "a2"))
q.put((2, "a3"))

for _ in range(3):
    priority, item = q.get()
    print("%s, %s" % (priority, item))
-1, a1 2, a3 5, a2
Queue.task_done() 以下为解释:
Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
Raises a ValueError if called more times than there were items placed in the queue.
g) 生产者消费者模型
import threading
import queue


def producer():
    """Enqueue ten bones, then block until every one has been processed."""
    for i in range(10):
        q.put("骨头 %s" % i)
    print("开始等待所有的骨头被取走...")
    # join() returns once task_done() has been called for every item put().
    q.join()
    print("所有的骨头被取完了...")


def consumer(n):
    """Take bones off the queue forever, acknowledging each one.

    get() blocks while the queue is empty, so the consumer cannot give up
    before the producer has queued anything. Checking qsize() first could
    see an empty queue, return early, and leave producer() stuck in
    q.join().

    Args:
        n: Name of the consumer, used in the printed output.
    """
    while True:
        bone = q.get()
        print("%s 取到" % n, bone)
        q.task_done()  # tell the queue this item has been handled


q = queue.Queue()

p = threading.Thread(target=producer)
p.start()

# Run the consumer in its own thread. It is a daemon because it loops
# forever and must not keep the program alive once the producer is done.
c1 = threading.Thread(target=consumer, args=("Jack",), daemon=True)
c1.start()

p.join()
import time
import random
import queue
import threading

q = queue.Queue()


def Producer(name):
    """Produce 20 baozi, pausing 0-2 seconds before each one.

    Args:
        name: Name of the producer, used in the printed output.
    """
    count = 0
    while count < 20:
        time.sleep(random.randrange(3))
        q.put(count)
        print('Producer %s has produced %s baozi..' % (name, count))
        count += 1


def Consumer(name):
    """Make 20 attempts to eat a baozi, pausing 0-3 seconds before each.

    An attempt that finds the queue empty still counts towards the 20.

    Args:
        name: Name of the consumer, used in the printed output.
    """
    count = 0
    while count < 20:
        time.sleep(random.randrange(4))
        if not q.empty():
            data = q.get()
            print(data)
            print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' % (name, data))
        else:
            print("-----no baozi anymore----")
        count += 1


p1 = threading.Thread(target=Producer, args=('A',))
c1 = threading.Thread(target=Consumer, args=('B',))
p1.start()
c1.start()
3. GIL全局解释器锁(面试必会)
import threading
import time


def run(n):
    """Sleep two seconds, then increment the shared counter without a lock."""
    global num
    time.sleep(2)
    num += 1  # unsynchronised read-modify-write on shared state


num = 0

t_objs = []
for i in range(1000):
    label = "t_%s" % i
    t = threading.Thread(target=run, args=(label,))
    t.start()
    t_objs.append(t)

# Wait for every worker before reading the final counter value.
for t in t_objs:
    t.join()

print("===all threads has finished")
print("num:", num)
正常来讲,这个num结果应该是1000, 但在python 2.7上多运行几次,会发现,最后打印出来的num结果不总是1000,为什么每次运行的结果不一样呢? 哈,很简单,假设你有A,B两个线程,此时都要对num进行加1操作, 由于2个线程是并发同时运行的,所以2个线程很有可能同时拿走了num=0这个初始变量交给cpu去运算,当A线程运算完的结果是1,但此时B线程运算完的结果也是1,两个线程同时CPU运算的结果再赋值给num变量后,结果就都是1(而不是期望的2)。那怎么办呢? 很简单,每个线程在要修改公共数据时,为了避免自己在还没改完的时候别人也来修改此数据,可以给这个数据加一把锁, 这样其它线程想修改此数据时就必须等待你修改完毕并把锁释放掉后才能再访问此数据。
import threading
import time


def run1():
    """Increment the shared counter `num` under the lock and return it."""
    print("grab the first part data")
    global num
    # `with` releases the lock even if the body raises.
    with lock:
        num += 1
        return num


def run2():
    """Increment the shared counter `num2` under the lock and return it."""
    print("grab the second part data")
    global num2
    with lock:
        num2 += 1
        return num2


def run3():
    """Call run1() and run2() while already holding the lock.

    run1() and run2() acquire the same lock again from this thread, which
    is why the script below creates it as an RLock.
    """
    with lock:
        res = run1()
        print('--------between run1 and run2-----')
        res2 = run2()
    print(res, res2)


if __name__ == '__main__':
    num, num2 = 0, 0
    lock = threading.RLock()
    for i in range(10):
        t = threading.Thread(target=run3)
        t.start()

    # Poll until only the main thread is left.
    while threading.active_count() != 1:
        print(threading.active_count())
    else:
        print('----all threads done---')
        print(num, num2)
GIL VS Lock :
既然Python已经有一个GIL来保证同一时间只能有一个线程来执行了,为什么这里还需要lock?注意,这里的lock是用户级的lock,跟那个GIL没关系 ,具体我们通过下图来看一下: