多线程
多任务可以由多进程完成,也能够用一个进程的多线程完成。线程(Thread)也叫轻量级进程,是操作系统能够进行运算调度的最小单位,它被包涵在进程之中,是进程中的实际运作单位。线程自己不拥有系统资源,只拥有一点儿在运行中必不可少的资源(程序计数器、寄存器和栈),但它可与同属一个进程的其它线程共享进程所拥有的全部资源。一个线程可以创建和撤消另一个线程,同一进程中的多个线程之间可以并发执行。
Python中使用thread、threading来创建线程,thread是比较底层的模块,threading模块对thread做了一些封装,更加方便使用。
import threading import time class MyThread(threading.Thread): def run(self): for i in range(3): info = "线程 %s running"%self.name +"***"+ str(i) print(info) time.sleep(1) if __name__ == '__main__': print("%s 开始"%threading.current_thread().name) t = MyThread() t.start() t.join() print("%s 结束"%threading.current_thread().name) 输出: MainThread 开始 线程 Thread-1 running***0 线程 Thread-1 running***1 线程 Thread-1 running***2 MainThread 结束
例子中使用一个类去继承threading.Thread,然后重写run方法,这点很类似于Java中的Thread类。任何进程都会默认启动一个线程,这个线程便是主进程,主进程又可以启动子线线程,主线程会等待子进程结束后结束。
互斥锁
多进程会复制父进程所有信息独立一块内存运行,进程之间彼此没有影响。一个进程运行时,各个线程也会拥有一份独立的内存空间,但是线程之间共享全局变量,当多个线程同时修改共享数据时,会引起数据混乱,即非线程安全。为了保证多个线程安全访问共享数据资源,引入同步机制的互斥锁。
- threading模块中定义Lock类来实现互斥锁:
#创建锁
lock = threading.Lock()
#锁定
lock.acquire([blocking])
#释放
lock.release()
如果设定blocking为True,则当前线程会堵塞,直到获取到这个锁为止(如果没有指定,那么默认为True)
如果设定blocking为False,则当前线程不会堵塞
非线程安全引起数据混乱:
from threading import Thread,Lock import time num = 0 def test1(): global num for i in range(1000000): num += 1 print('----test1----num=%d'%num) def test2(): global num for i in range(1000000): num += 1 print('----test2----num=%d'%num) if __name__ == '__main__': t1 = Thread(target=test1) t1.start() t2 = Thread(target=test2) t2.start() print('---num=%d---'%num) 输出: ---num=586426--- ----test1----num=1308133 ----test2----num=1460511
引入互斥锁,线程安全:
from threading import Thread,Lock import time num = 0 def test1(): global num for i in range(1000000): lockFlag = lock.acquire() if lockFlag: num += 1 lock.release() print('----test1----num=%d'%num) def test2(): global num for i in range(1000000): lockFlag = lock.acquire() if lockFlag: num += 1 lock.release() print('----test2----num=%d'%num) if __name__ == '__main__':
lock = Lock() t1 = Thread(target=test1) t1.start() t2 = Thread(target=test2) t2.start() print('---num=%d---'%num) 输出: ---num=88200--- ----test1----num=1921024 ----test2----num=2000000
对比上述两种情况,加入互斥锁后能够获得预期结果。当一个线程调用acquire()时,锁进入locked状态,每次只有一个线程能够获取到锁。另外一个线程试图去获取已经被其他线程持有的锁时,该线程会立即变为blocked状态,即被阻塞,直到拥有锁的线程调用锁的release()方法释放锁之后,锁进入unlocked状态,这时线程调度程序从处于同步阻塞状态的线程中选择一个来获得锁,使得该线程进入运行(running)状态。
锁的优点:
- 确保某段代码同一时刻只能由一个线程从头到尾执行
锁的缺点:
- 阻止了多线程并发执行,包含锁的代码实际上只能单线程模式执行,效率大大降低,所以锁包裹的代码尽可能少。
- 由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁
死锁
线程之间共享多个资源时,如果两个线程分别占有一部分资源并且同时等待对方持有的锁,就会造成死锁。死锁一旦发生就会造成应用停止响应。
import threading import time class MyThread1(threading.Thread): def run(self): if aLock.acquire(): print(self.name+' A锁被持有了---') time.sleep(1) if bLock.acquire(): print(self.name+'获取B锁---') bLock.release() aLock.release() class MyThread2(threading.Thread): def run(self): if bLock.acquire(): print(self.name+' B锁被持有了---') time.sleep(1) if aLock.acquire(): print(self.name+'获取A锁---') aLock.release() bLock.release() if __name__ == '__main__': aLock = threading.Lock() bLock = threading.Lock() t1 = MyThread1() t2 = MyThread2() t1.start() t2.start() 输出: Thread-1 A锁被持有了--- Thread-2 B锁被持有了---
解决死锁简单方法可以使用超时算法:aLock.acquire(timeout=2)即等待2s,超过timeout直接往下执行。
线程之间通信
在谈线程通信时,大部分语言都会列举生产者消费模式,本篇博客也不例外。生产者就是产生数据的线程,消费者就是消费数据的线程。在多线程开发中,如果生产数据的线程很快,消费者处理数据速度很慢,那么生产者就要等待消费者处理完成之后产能继续生产。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种强耦合,在生产者和消费者之间加入容器使他们彼此不直接通信,而是通过阻塞队列来进行通信,这时消费者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列中取,阻塞队列起到缓冲作用,即平衡两者处理数据能力。
import threading import time from queue import Queue class Producer(threading.Thread): def run(self): global queue count = 0 while True: if queue.qsize() < 1000: for i in range(100): count = count + 1 msg = self.name + '制造产品' + str(count) queue.put(msg) print(msg) time.sleep(0.5) class Consumer(threading.Thread): def run(self): global queue while True: if queue.qsize() > 100: for i in range(3): msg = self.name + ', 消费' + queue.get() print(msg) time.sleep(1) if __name__ == '__main__': queue = Queue() for i in range(300): queue.put('初始产品'+str(i)) for i in range(2): p = Producer() p.start() for i in range(5): c = Consumer() c.start()
ThreadLocal
在多线程开发中,每个线程都有自己的数据,如果使用加锁的全局变量意味着所有线程共享数据;如果想要实现每一个线程都有自己的共享变量,即存储每个线程的私有数据,这时ThreadLocal就是主要解决每个线程绑定的值,也就是ThreadLocal在不同线程中只对自己的线程可见,即具有隔离性。
使用全局变量:
import threading import time global g_name def process_student(): std = g_name print('Hello,%s (in %s)'%(std,threading.current_thread().name)) def process_thread(name): global g_name g_name = name time.sleep(1) process_student() t1 = threading.Thread(target=process_thread,args=('jack',),name='Therad-1') t2 = threading.Thread(target=process_thread,args=('Ming',),name='Therad-2') t1.start() t2.start() 输出: Hello,Ming (in Therad-1) Hello,Ming (in Therad-2)
使用ThreadLocal:
import threading import time local_school = threading.local() def process_student(): std = local_school.student print('Hello,%s (in %s)'%(std,threading.current_thread().name)) def process_thread(name): local_school.student = name time.sleep(1) process_student() t1 = threading.Thread(target=process_thread,args=('jack',),name='Therad-1') t2 = threading.Thread(target=process_thread,args=('Ming',),name='Therad-2') t1.start() t2.start() 输出: Hello,jack (in Therad-1) Hello,Ming (in Therad-2)
根据以上对比很清楚:全局变量被所有线程共享,ThreadLocal定义的变量只对自己线程可见,即具有隔离性。ThreadLocal解决了参数在一个线程中各个函数之间互相传递的问题。全局变量local_school就是一个ThreadLocal对象,每个Thread对它都可以读写student属性,但互不影响。你可以把local_school看成全局变量,但每个属性如local_school.student都是线程的局部变量,可以任意读写而互不干扰,也不用管理锁的问题,ThreadLocal内部会处理。
ThreadLocal最常用的地方就是为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等,这样一个线程的所有调用到的处理函数都可以非常方便地访问这些资源。