线程
python 的 threading 模块提供了线程的相关操作,线程是应用程序中工作的最小单元。
import time import threading def process(arg): time.sleep(1) print(arg) if __name__ == '__main__': for i in range(10): t = threading.Thread(target=process, args=(i,)) t.start()
通过 threading 模块实现多线程,本身需要至少 10s 的程序会很快执行完成
start() # 线程准备就绪,等待 CPU 的调度 setDaemon() # 为线程命名 getName() # 获取线程名称 setDaemon(True) # 设置是否为后台进程,默认为 False # 为 True 时,主线程执行时,后台线程也在执行,当主线程执行完成后,无论后台的子线程是否执行完成,都会被停止 # 为 False 时,主线程为等待子线程执行完成后再中止 join() # 逐个执行线程,执行完毕后继续向下执行,该方法使多线程失去了意义 run() # 线程被 CPU 调度后自动执行线程对应的 run 方法
创建多线程的两种方法
import threading def f1(x): print(x) # 第一种方法:常用 t = threading.Thread(target=f1, args=(1,)) t.start() # t.start() 表示线程已经准备就绪,等待 CPU 调用, CPU 在调用的时候,其实就是调用 run() 方法 # 第二种方法: class MyThread(threading.Thread): def __init__(self, func, args): self.func = func self.args = args super(MyThread, self).__init__() def run(self): self.func(self.args) obj = MyThread(f1, 123) obj.start()
线程锁(Lock,RLock)
threading 的 Lock 和 RLock 方法提供了线程锁的功能,同时只允许一个线程更改数据
Lock 方法不支持多重锁
RLock 方法支持多重锁,一般使用 RLock 方法
由于线程之间的数据是共享的,当多个线程同时修改一个数据时,就会出现脏数据,此时就需要通过线程锁来让线程一个一个修改数据
""" 创建一个全局变量 NUM = 10, 通过 func 函数每次自减 1,1s 后打印数据 """ import time import threading NUM = 10 def func(): # 修改全局变量 global NUM NUM -= 1 # 等待 1s 后输出 NUM time.sleep(1) print(NUM) # 创建 10 个线程执行 func 函数 for i in range(10): t = threading.Thread(target=func) t.start() ################ 输出结果 ################ 0 0 0 0 0 0 0 0 0 0
无线程锁时,输出的值全部都为 0,输出的结果是所有线程修改后的数据。
通过线程锁,使数据同时只能让一个线程修改
import time import threading NUM = 10 # 创建锁 lock = threading.RLock() def func(): # 上锁 lock.acquire() # 修改全局变量 global NUM NUM -= 1 # 等待 1s 后输出 NUM time.sleep(1) print(NUM) # 解锁 lock.release() # 创建 10 个线程执行 func 函数 for i in range(10): t = threading.Thread(target=func) t.start() ################ 输出结果 ################ 9 8 7 6 5 4 3 2 1 0
信号量(Semaphore)
同时允许多个线程
import time import threading semaphore = threading.Semaphore(2) def func(x): semaphore.acquire() time.sleep(1) print(x) semaphore.release() # 创建 10 个线程执行 func 函数 for i in range(10): t = threading.Thread(target=func, args=(i,)) t.start()
事件(Event)
通过主线程控制子线程的执行,主要有三个方法 set,wait,clear
事件处理会在全局定义一个 Flag,如果 Flag 为 False,那么当执行到 event.wait 时就会阻塞,当 Flag 为 True 是,event.wait 就不会阻塞。和红绿灯机制相似
- claer 将 Flag 设置为 False
- set 将 Flag 设置为 True
- wait 阻塞
import time import threading event = threading.Event() def func(x): event.wait() time.sleep(1) print(x) event.wait() # 创建 10 个线程执行 func 函数 for i in range(10): t = threading.Thread(target=func, args=(i,)) t.start() # Flag 默认为 False inp = input('>') if inp: event.set()
条件(Condition)
线程默认等待,只有满足某个条件时,才会释放 n 个线程
import threading condition = threading.Condition() def func(x): condition.acquire() condition.wait() print(x) condition.release() # 创建 10 个线程执行 func 函数 for i in range(10): t = threading.Thread(target=func, args=(i,)) t.start() while True: inp = input('>') if inp: condition.acquire() # 释放 int(inp) 个线程 condition.notify(int(inp)) condition.release()
定时器(Timer)
指定 n 秒后执行某操作
import threading def func(): print("Hello World") t = threading.Timer(1, func) t.start()
线程池
使用 concurrent.futures 的 ThreadPoolExecutor 类实现线程池,但是如果获取返回值时会阻塞
import time from concurrent.futures import ThreadPoolExecutor def f(x): time.sleep(1) print(x) # return x+100 # 定义线程池的大小为 2 pool = ThreadPoolExecutor(5) for i in range(20): r = pool.submit(f, i) # print(r.result()) # 获取返回值,如果获取返回值则会阻塞
自定义线程池
""" 在线程池初始化的时候就需要先创建线程池 """ import queue import threading class ThreadPool(object): def __init__(self, maxsize = 5): self.maxsize = maxsize self._queue = queue.Queue(self.maxsize) for i in range(self.maxsize): self._queue.put(threading.Thread) def get_request(self): return self._queue.get() def add_request(self): self._queue.put(threading.Thread) def func(x, pool): time.sleep(1) print(x) pool.add_request() pool = ThreadPool(2) for i in range(20): thread = pool.get_request() t = thread(target=func, args=(i, pool)) t.start()
""" 支持传函数、传参、传回调函数、立即终止所有线程、最大优点:线程的循环利用,节省时间和资源 【来源网站 http://www.bkjia.com/Pythonjc/1135798.html】 """ import queue import threading import contextlib import time StopEvent = object() class ThreadPool(object): def __init__(self, max_num): self.q = queue.Queue() self.max_num = max_num self.terminal = False self.generate_list = [] self.free_list = [] def run(self, func, args, callback=None): """ 线程池执行一个任务 :param func: 任务函数 :param args: 任务函数所需参数 :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数) :return: 如果线程池已经终止,则返回True否则None """ if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: self.generate_thread() w = (func, args, callback,) self.q.put(w) def generate_thread(self): """ 创建一个线程 """ t = threading.Thread(target=self.call) t.start() def call(self): """ 循环去获取任务函数并执行任务函数 """ current_thread = threading.currentThread self.generate_list.append(current_thread) event = self.q.get() while event != StopEvent: func, arguments, callback = event try: result = func(*arguments) status = True except Exception as e: status = False result = e if callback is not None: try: callback(status, result) except Exception as e: pass if self.terminal: # False event = StopEvent else: with self.worker_state(self.free_list,current_thread): event = self.q.get() @contextlib.contextmanager def worker_state(self,x,v): x.append(v) try: yield finally: x.remove(v) def close(self): num = len(self.generate_list) while num: self.q.put(StopEvent) num -= 1 # 终止线程(清空队列) def terminate(self): self.terminal = True while self.generate_list: self.q.put(StopEvent) self.q.empty() def work(i): time.sleep(1) print(i) pool = ThreadPool(10) for item in range(50): pool.run(func=work, args=(item,)) pool.close()
进程
创建子进程需要使用 multiprocess 模块
from multiprocessing import Process def f(): print("Hello") for i in range(2): p = Process(target=f,) p.start()
进程间数据共享
进程间默认无数据共享,
import time from multiprocessing import Process li = [] def f(x): li.append(x) time.sleep(1) # 输出各自的结果 print(li) for i in range(10): p = Process(target=f, args=(i,)) p.start()
创建 Array 时必须指定类型
'c': ctypes.c_char, 'u': ctypes.c_wchar, 'b': ctypes.c_byte, 'B': ctypes.c_ubyte, 'h': ctypes.c_short, 'H': ctypes.c_ushort, 'i': ctypes.c_int, 'I': ctypes.c_uint, 'l': ctypes.c_long, 'L': ctypes.c_ulong, 'f': ctypes.c_float, 'd': ctypes.c_double
from multiprocessing import Process, Queue def f(): print(q.get()) if __name__ == '__main__': q = Queue() q.put(1) q.put(2) q.put(3) q.put(4) for i in range(4): p = Process(target=f) p.start()
进程池
from multiprocessing import Process, Pool def f(x): time.sleep(2) print(x) pool = Pool(5) for i in range(10): pool.apply_async(f,args=(i,)) print('end') pool.close() pool.join()
协程
线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。
协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。
协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程;
from greenlet import greenlet def func1(): print("func1 11") gr2.switch() print("func1 22") gr2.switch() def func2(): print("func2 11") gr1.switch() print("func2 22") if __name__ == '__main__': gr1 = greenlet(func1) gr2 = greenlet(func2) gr1.switch()
import gevent def func1(): print("func1 11") gevent.sleep() print("func1 22") def func2(): print("func2 11") gevent.sleep() print("func2 22") if __name__ == '__main__': gevent.joinall([ gevent.spawn(func1), gevent.spawn(func2), ])
请求 URL 实例:
from gevent import monkey import gevent import requests monkey.patch_all() def geturl(url): print("GET: %s" % url) resp = requests.get(url) data = resp.text print("%s types received from %s" % (len(data), url)) gevent.joinall([ gevent.spawn(geturl, 'https://www.baidu.com'), gevent.spawn(geturl, 'https://www.python.org'), gevent.spawn(geturl, 'https://github.com'), ])
用途
多线程:用于 IO 密集型操作
多进程:用于计算密集型操作
协程:用于线程内部,解决 IO 等待