threading模块 |
GIL 全局解释器锁(锁定同一个进程内的线程同时只有一个被执行)
t = threading.Thread(target=函数名, args=(i,)) # 创建一个线程运行函数
t.start() # 激活线程,
t.getName() # 获取线程的名称
t.setName() # 设置线程的名称
t.name # 获取或设置线程的名称
t.is_alive() # 判断线程是否为激活状态
t.isAlive() # 判断线程是否为激活状态
t.setDaemon() # 默认:False 守护线程(前台线程),主线程等待子线程结束后方退出.必须在执行start()前设置
# 设置:True 后台线程,主线程执行完后无论子线程完成与否均退出.
t.isDaemon() :# 判断是否为守护线程
t.ident : # 获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法之后该属性才有效,否则它只返回None。
t.join() : # 逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
run() : # 线程被cpu调度后自动执行线程对象的run方法
线程锁threading.RLock和threading.Lock |
我们使用线程对数据进行操作的时候,如果多个线程同时修改某个数据,可能会出现不可预料的结果,所以就引入的锁的概念.
lock = threading.RLock() # 创建一个线程锁对象
lock.acquire() # 获得锁(可以获得多把锁,但获得几次就需释放几次)
lock.release() # 释放锁
lock = threading.Lock # 用法同上,但其只能获得一把锁,多次获得锁会形成死锁,不推荐使用.
线程锁threading.Condition |
使用Condition对象可以在某些事件触发或者达到特定的条件后才处理数据,Condition除了具有Lock对象的acquire方法
和release方法外,还有wait方法、notify方法、notifyAll方法等用于条件处理。
threading.Condition([lock]):# 创建一个condition,支持从外界引用一个Lock对象(适用于多个condtion共用一个Lock的情况)
# 默认是创建一个新的Lock对象。
acquire()/release():# 获得/释放 Lock
wait([timeout]) # 线程挂起,直到收到一个notify通知或者超时(可选的,浮点数,单位是秒s)才会被唤醒继续运行。
# wait()必须在已获得Lock前提下才能调用,否则会触发RuntimeError。
# 调用wait()会释放Lock,直至该线程被Notify()、NotifyAll()或者超时线程又重新获得Lock.
notify(n=1) # 通知其他线程,那些挂起的线程接到这个通知之后会开始运行,默认是通知一个正等待该condition的线程
# notify()必须在已获得Lock前提下才能调用,否则会触发RuntimeError。notify()不会主动释放Lock。
notifyAll() # 如果wait状态线程比较多,notifyAll的作用就是通知所有线程
import threading def consumer(cond): with cond: print("aaa") cond.wait() # 线程阻塞等待通知 print("bbb") def producer(cond): with cond: # 自动管理释放锁 print("ccc") cond.notifyAll() # 通知所有等待通知的线程 print("ddd") condition = threading.Condition() c1 = threading.Thread(name="c1", target=consumer, args=(condition,)) c2 = threading.Thread(name="c2", target=consumer, args=(condition,)) p = threading.Thread(name="p", target=producer, args=(condition,)) c1.start() c2.start() p.start()
线程通信 threading.Event |
Event是线程间之间通信的机制之一:一个线程发送一个event信号,其他的线程则等待这个信号。用于主线程控制其他线程的执行。
event_obj = threading.Event() # 创建一个Event对象
Event.wait([timeout]) : # 堵塞线程,直到Event对象内部标识位被设为True或超时(如果提供了参数timeout)。
Event.set() : # 将标识位设为Ture
Event.clear() : # 将标识伴设为False。
Event.isSet() : # 判断标识位是否为Ture。
from threading import Thread, Event # 导入模块下的类 def func(event): # 每个线程都要传递一个 event 对象进去 print('开始') event.wait() # 默认为Flase(即阻塞) print('结束') event_obj = Event() for i in range(10): t = Thread(target=func, args=(event_obj, )) t.start() # 线程开始 event_obj.clear() # 标志位设置为Flase inp = input('是否开始') if inp == 'true': event_obj.set() # 标志位设置为True(即非阻塞)
多进程 multiprocessing |
from multiprocessing import Process # 导入进程模块
p = Process(target=f1, args=(11,), callback=fend) # 新建一个进程,运行f1函数,给其传参11,并指定回调函数
p.start() # 开始执行进程
进程共享数据的方法一
from multiprocessing import Process, Array def foo(i, temp): temp[i] = 100+i for item in temp: print(i, '----->', item) if __name__ == "__main__": temp = Array('i', [11, 22, 33, 44]) # 定义 'i' 为c语言中的整型 [11,22,33,44] 为数组 for i in range(2): p = Process(target=foo, args=(i, temp)) p.start() #****************************************************** # 'c': ctypes.c_char, 'u': ctypes.c_wchar, # 'b': ctypes.c_byte, 'B': ctypes.c_ubyte, # 'h': ctypes.c_short, 'H': ctypes.c_ushort, # 'i': ctypes.c_int, 'I': ctypes.c_uint, # 'l': ctypes.c_long, 'L': ctypes.c_ulong, # 'f': ctypes.c_float, 'd': ctypes.c_double #******************************************************
进程共享数据的方法二
from multiprocessing import Process,Manager def foo(num, dic): dic[num] = 100+num print(dic.values(), num) if __name__ == "__main__": manage = Manager() dic = manage.dict() for i in range(2): p = Process(target=foo, args=(i, dic)) p.start() p.join() print(dic) 结果:[100] 0 [100, 101] 1 {0: 100, 1: 101}
进程池 Pool |
p = Pool(5) # 新建一个进程池,最大进程数为5
p.apply(func[, args[, kwds]]) # 使用arg和kwds参数调用func函数,结果返回前会一直阻塞(daemon = False)
p.apply_async(func[, args[, kwds[, callback[, error_callback]]]])
# 1.字进程返回结果前不会被阻塞(daemon = True)
# 2.执行func函数(为其传参args()元组,keds{}字典)
# 3.func函数正常结束会将其返回值传递给 callback 回调函数。
# 4.func函数异常时,其结果会传递给 err_callback 回调函数。
pool.close() # 关闭进程池,阻止更多的任务提交到pool,待任务完成后,工作进程会退出。
pool.terminate()# 不管任务是否完成,立即停止工作进程。在对pool对象进程垃圾回收的时候,会立即调用terminate()。
pool.join() # 进程池等待所有的进程执行完
join() : 等待工作线程的退出,在调用join()前,必须调用close()或terminate()。
这样是因为被终止的进程需要被父进程调用wait(join等价与wait),否则进程会成为僵尸进程。
协程 gevent (依赖greenlet) |
线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。
协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。
协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。
协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程;
gevent协程模块 import gevent def foo(): print('111') # 执行顺序1 gevent.sleep(0) # 自动切换下一个协程 print('222') # 执行顺序3 def bar(): print('333') # 执行顺序2 gevent.sleep(0) print('444') # 执行顺序4 gevent.joinall([ gevent.spawn(foo), gevent.spawn(bar), ]) 实现并发访问: from gevent import monkey;monkey.patch_all() import gevent import requests def fun(url): print('GET: %s' % url) resp = requests.get(url) data = resp.text print('%d bytes received from %s.' % (len(data), url)) gevent.joinall([ gevent.spawn(fun, 'https://www.python.org/'), # 调用fun函数并为其传参url gevent.spawn(fun, 'https://www.yahoo.com/'), gevent.spawn(fun, 'https://github.com/'), ])