1.1 进程和线程
1.1.1 进程简介
计算机程序不过是磁盘中的可执行的二进制的数据,它们只有在被读到内存中,被操作系统调用的时候才开始它们的生命期。进程是程序的一次执行,每个进程都有自己的地址空间、内存、数据栈及其他记录其运行轨迹和辅助数据。操作系统管理其上运行的所有进程,并为这些进程公平的分配时间,进程也可以通过fork哈spawn操作来完成其他的任务,不过各个进程有自己的内存空间、数据栈等,所以只能使用进程间通信(IPC),而不能直接共享信息。
1.1.2 线程简介
线程(有时被称为轻量级线程)与进程类似,不过所有的线程运行在同一个进程中,共享相同的运行环境。
线程有开始、顺序执行和结束三个部分。它有一个自己的指令指针,记录自己运行到了什么地方。线程的运行可能被抢占,或暂时的被挂起(也叫睡眠),让其他的线程运行,这也叫让步。一个进程中的各个线程间共享同一片数据空间。线程一般都是并发执行的。
由于多个线程共享一片数据,并且数据访问的顺序不一样,可能导致数据结果的不一致的问题,这叫做竞争条件,所以大多数线程库都带有一系列的同步原语,来控制线程的执行和数据的访问。
1.2 python线程和全局解释器锁
1.2.1 全局解释器锁
python代码的执行由python虚拟机(也叫解释器主循环)来控制。在主循环中,同时只有一个线程在执行,虽然python解释器中可以“运行”多个线程,但在任意时刻,只有一个线程在解释器中运行。
对python虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同一时刻只有一个线程在运行。在多线程环境中,python解释器按以下方式执行。
1. 设置GIL。
2. 切换到一个线程去执行。
3. 运行:
a.指定数量的字节码的指令,或者
b.线程主动让出控制(可以调用time.sleep(0))。
4. 把线程设置为睡眠状态。
5. 解锁GIL。
6. 再次重复以上所有步骤。
1.2.2 没有线程执行的情况
示例1:使用time.sleep()函数演示线程是怎么工作的,该函数需要一个浮点型的参数,来指定“睡眠”的时间(以秒为单位)。
1 # -*- coding:utf-8 -*- 2 3 ''' 4 创建两个计时循环,一个睡眠4秒,一个睡眠2秒,分别是loop0()和loop1() 5 在一个线程或进程中,顺序执行loop0()和loop1()。 6 ''' 7 8 from time import sleep, ctime 9 10 def loop0(): 11 print 'start loop 0 at:', ctime() 12 sleep(4) 13 print 'loop 0 done at:', ctime() 14 15 def loop1(): 16 print 'start loop 1 at:', ctime() 17 sleep(2) 18 print 'loop 1 done at:', ctime() 19 20 def main(): 21 print 'starting at:', ctime() 22 loop0() 23 loop1() 24 print 'all DONE at:', ctime() 25 26 if __name__ == '__main__': 27 main()
输出结果:
1 starting at: Sun Jan 26 23:46:05 2020 2 29 start loop 0 at: Sun Jan 26 23:46:05 2020 3 30 loop 0 done at: Sun Jan 26 23:46:09 2020 4 31 start loop 1 at: Sun Jan 26 23:46:09 2020 5 32 loop 1 done at: Sun Jan 26 23:46:11 2020 6 33 all DONE at: Sun Jan 26 23:46:11 2020
可以看出loop0()和loop1()并不是并行执行的。
1.2.3 python的threading模块
python提供了几个用于多线程编程的模块:thread、threading和Queue。thread和threading模块允许程序员创建和管理线程。thread模块提供了基本的线程和锁的支持,而threading提供了更高级别,功能更强的线程管理的功能。Queue模块允许用户创建一个可以用于多个线程之间共享数据的队列数据结构。
1.2.4 hread模块
thread模块提供了基本的同步数据数据结构锁对象(也叫原语锁、简单锁、互斥锁、互斥量、二值信号量)。下表是常用的线程函数以及LockType类型的锁对象的方法。
函数 |
描述 |
模块函数 |
|
start_new_thread(function, args[, kwargs]) |
产生一个新的线程,在新线程中用指定的参数和可选的kwargs来调用这个函数。 |
allocate_lock() |
分配一个LockType类型的锁对象 |
exit() |
让线程退出 |
LockType类型锁对象方法 |
|
acquire(wait=None) |
尝试获取锁对象 |
locked() |
如果获取了锁对象返回True,否则返回False |
release() |
释放锁 |
start_new_thread()函数的参数为:函数,函数的参数以及可选的关键字参数。函数不是在主线程里运行,而是产生一个新的线程来运行这个函数。
示例如下:
1 # -*- coding:utf-8 -*- 2 3 ''' 4 loop0和loop1并发执行 5 ''' 6 7 import thread 8 from time import sleep, ctime 9 10 def loop0(): 11 print 'start loop 0 at:', ctime() 12 sleep(4) 13 print 'loop 0 done at:', ctime() 14 15 def loop1(): 16 print 'start loop 1 at:', ctime() 17 sleep(2) 18 print 'loop 1 done at:', ctime() 19 20 def main(): 21 print 'starting at:', ctime() 22 thread.start_new_thread(loop0, ()) 23 thread.start_new_thread(loop1, ()) 24 sleep(6) 25 print 'all DONE at:', ctime() 26 27 if __name__ == '__main__': 28 main()
输出结果:
1 输出结果:loop1先结束,因为运行时间短。 2 starting at: Mon Jan 27 10:51:32 2020 3 start loop 0 at:start loop 1 at: Mon Jan 27 10:51:32 2020 4 Mon Jan 27 10:51:32 2020 5 loop 1 done at: Mon Jan 27 10:51:34 2020 6 loop 0 done at: Mon Jan 27 10:51:36 2020 7 all DONE at: Mon Jan 27 10:51:38 2020
上面的代码中,我们在main()函数中使用sleep(6)来作为同步机制,等待loop0()和loop1()执行结束之后再结束main()函数。这样做了一个额外的延时。下面我们用锁来做。
1 # -*- coding:utf-8 -*- 2 3 import thread 4 from time import sleep, ctime 5 6 loops = [4, 2] 7 8 def loop(nloop, nsec, lock): 9 print 'start loop', nloop, 'at:', ctime() 10 sleep(nsec) 11 print 'loop', nloop, 'done at:', ctime() 12 lock.release() #释放锁 13 14 def main(): 15 print 'starting at:', ctime() 16 locks = [] 17 nloops = range(len(loops)) 18 19 for i in nloops: 20 lock = thread.allocate_lock() #创建锁 21 lock.acquire() #获得锁 22 locks.append(lock) 23 24 for i in nloops: 25 thread.start_new_thread(loop, (i, loops[i], locks[i])) #创建线程 26 27 for i in nloops: 28 while locks[i].locked():pass #死等锁释放 29 30 print 'all DONE at:', ctime() 31 32 if __name__ == '__main__': 33 main()
输出结果:
1 输出结果:loop1先结束,因为运行时间短。 2 starting at: Mon Jan 27 10:51:32 2020 3 start loop 0 at:start loop 1 at: Mon Jan 27 10:51:32 2020 4 Mon Jan 27 10:51:32 2020 5 loop 1 done at: Mon Jan 27 10:51:34 2020 6 loop 0 done at: Mon Jan 27 10:51:36 2020 7 all DONE at: Mon Jan 27 10:51:38 2020
1.3 threading模块
threading模块不仅提供了Thread类,还提供了各种非常好用的同步机制。下表是threading模块里的所有的对象。
threading模块对象 |
描述 |
Thread |
表示一个线程的执行的对象 |
Lock |
锁原语对象(跟thread模块里的锁对象相同) |
RLock |
可重入锁对象,使单线程可以再次获得已经获得了的锁(递归锁定)。 |
Condition |
条件变量对象能让一个线程停下来,等待其他线程满足了某个“条件”,如,状态的改变或值的改变。 |
Event |
通用的条件变量,多个线程可以等待某个事件的发生,在事件发生后,所有的线程都会被激活 |
Semaphore |
为等待锁的线程提供一个类似“等候室”的结构 |
BounderSemaphore |
与Semaphore类似,只是它不允许超过初始值 |
Timer |
与Thread类似,只是它要等待一段时间后才执行 |
核心笔记:守护线程
Thread模块不支持守护线程,Threading模块支持守护线程,它们是这样工作的:守护线程一般是一个等待客户请求服务器,如果没有客户提出请求,它就在那等着。如果你设定某个线程为守护线程,就表示你在说这个线程是不重要的,在进程退出的时候,不用等待这个线程退出。
如果你的主线程要退出的时候,不用等待那些子线程完成,那就设定这些线程的daemon属性,即,在线程开始(调用thread.start())之前,调用setDaemon()函数设定线程的daemon标志(thread.setDaemon(True))就表示这个线程“不重要”。
1.3.1 Thread类
threading的Thread类是你主要运行对象。用Thread类,可以有多种方法来创建线程。
Ø 创建一个Thread的实例,传给它一个函数;
Ø 创建一个Thread的实例,传给它一个可调用的类对象;
Ø 从Thread派生出一个子类,创建一个这个子类的实例。
下表是Thread类的一些函数。
函数 |
描述 |
start() |
开始进程的运行 |
run() |
定义线程的功能的函数(一般会被子类重写) |
join(timeout=None) |
线程挂起,直到线程结束:如果给了timeout,则最多阻塞timeout |
getName() |
返回线程的名字 |
setName(name) |
设置线程的名字 |
isAlive() |
布尔标志,表示这个线程是否在运行 |
isDaemon() |
返回线程的daemon标志 |
setDaemon(daemonic) |
把线程的daemon标志设为daemonic(一定要在调用start()函数前调用) |
下面举例进行说明:
(1)创建一个Thread的实例,传给它一个函数。
1 # -*- coding:utf-8 -*- 2 3 import threading 4 from time import sleep, ctime 5 6 loops = [4, 2] 7 8 def loop(nloop, nsec): 9 print 'start loop', nloop, 'at:', ctime() 10 sleep(nsec) 11 print 'loop', nloop, 'done at:', ctime() 12 13 def main(): 14 print 'starting at:', ctime() 15 threads = [] 16 nloops = range(len(loops)) 17 18 for i in nloops: 19 t = threading.Thread(target=loop, args=(i, loops[i])) 20 threads.append(t) 21 22 for i in nloops: 23 threads[i].start() 24 25 for i in nloops: 26 threads[i].join() 27 28 print 'all DONE at:', ctime() 29 30 if __name__ == '__main__': 31 main()
输出结果:
1 starting at: Mon Jan 27 12:02:06 2020 2 start loopstart loop 1 at: 0 at: Mon Jan 27 12:02:06 2020 3 Mon Jan 27 12:02:06 2020 4 loop 1 done at: Mon Jan 27 12:02:08 2020 5 loop 0 done at: Mon Jan 27 12:02:10 2020 6 all DONE at: Mon Jan 27 12:02:10 2020
在实例化每个Thread对象的时候,我们把函数(target)和参数(args)传进去,得到返回的Thread实例。实例化一个Thread与调用thread.start_new_thread()之间的最大的区别是,新的线程不会立即开始。
join()会等到线程结束,或者在给了timeout参数的时候,等到超时为止。join()的另一个比较重要的方法是它可以完全不用调用。一旦线程启动后,就会一直运行,直到线程的函数结束,退出为止。如果你的主线程除了等线程结束外,还有其他的事情要做,那就不用调用join(),只有在你要等待线程结束的时候菜肴调用join()。
(2)创建一个Thread的实例,传给它一个可调用的类对象。
1 # -*- coding:utf-8 -*- 2 3 import threading 4 from time import sleep, ctime 5 6 loops = [4, 2] 7 8 class ThreadFunc(object): 9 def __init__(self, func, args, name=''): 10 self.name = name 11 self.func = func 12 self.args = args 13 14 def __call__(self): 15 self.res = self.func(*self.args) 16 17 def loop(nloop, nsec): 18 print 'start loop', nloop, 'at:', ctime() 19 sleep(nsec) 20 print 'loop', nloop, 'done at:', ctime() 21 22 def main(): 23 print 'starting at:', ctime() 24 threads = [] 25 nloops = range(len(loops)) 26 27 for i in nloops: # create all threads 28 t = threading.Thread(target=ThreadFunc(loop, (i, loops[i]), loop.__name__)) 29 threads.append(t) 30 31 for i in nloops: 32 threads[i].start() 33 34 for i in nloops: 35 threads[i].join() 36 37 print 'all DONE at:', ctime() 38 39 if __name__ == '__main__': 40 main()
输出结果:
1 starting at: Mon Jan 27 12:31:28 2020 2 start loopstart loop 1 at: 0 at: Mon Jan 27 12:31:28 2020 3 Mon Jan 27 12:31:28 2020 4 loop 1 done at: Mon Jan 27 12:31:30 2020 5 loop 0 done at: Mon Jan 27 12:31:32 2020 6 all DONE at: Mon Jan 27 12:31:32 2020
(3)子类化Thread类
1 import threading 2 from time import sleep, ctime 3 4 loops = [4, 2] 5 6 class MyTread(threading.Thread): 7 def __init__(self, func, args, name=''): 8 threading.Thread.__init__(self) 9 self.name = name 10 self.func = func 11 self.args = args 12 13 def run(self): 14 self.res = self.func(*self.args) 15 16 def loop(nloop, nsec): 17 print 'start loop', nloop, 'at:', ctime() 18 sleep(nsec) 19 print 'loop', nloop, 'done at:', ctime() 20 21 def main(): 22 print 'starting at:', ctime() 23 threads = [] 24 nloops = range(len(loops)) 25 26 for i in nloops: # create all threads 27 t = MyTread(loop, (i, loops[i]), loop.__name__) 28 threads.append(t) 29 30 for i in nloops: 31 threads[i].start() 32 33 for i in nloops: 34 threads[i].join() 35 36 print 'all DONE at:', ctime() 37 38 if __name__ == '__main__': 39 main() 40
输出结果:
1 starting at: Mon Jan 27 12:57:32 2020 2 start loopstart loop 1 at: 0 Mon Jan 27 12:57:32 2020 3 at: Mon Jan 27 12:57:32 2020 4 loop 1 done at: Mon Jan 27 12:57:34 2020 5 loop 0 done at: Mon Jan 27 12:57:36 2020 6 all DONE at: Mon Jan 27 12:57:36 2020
与(2)不同的两点重要的改变:1.MyThread子类的构造器一定要先调用基类的构造器。2.之前的特殊函数__call__()在子类中,名字要改为run()。
1.3.2 threading模块中的其他函数
函数 |
描述 |
activeCount() |
当前活动的线程对象的数量 |
currentThread() |
返回当前线程对象 |
enumerate() |
返回当前活动线程的列表 |
settrace(func) |
为所有线程设置一个跟踪函数 |
setprofile(func) |
为所有线程设置一个profile函数 |
1.3.3 生产者-消费者问题和Queue模块
Queue模块可以用来进行线程间通讯,让各个线程之间共享数据。
Queue模块具有以下属性:
函数 |
描述 |
Queue模块函数 |
|
queue(size) |
创建一个大小为size的Queue对象 |
Queue对象函数 |
|
qsize() |
返回队列的大小(由于在返回的时候,队列可能会被其他线程修改,所以这个值是近似值) |
empty() |
如果队列为空返回True,否则返回False |
full() |
如果队列已满返回True,否则返回False |
put(item, block=0) |
把item放到队列中,如果给了block(不为0),函数会一直阻塞到队列中有空间为止。 |
get(block=0) |
从队列中取一个对象,如果给了block(不为0),函数会一直阻塞到队列中有对象为止。 |
生产者-消费者问题:生产者生产货物,然后把货物放到一个队列之类的数据结构中,生产货物所需要花费的时间无法预先确定,消费者消耗生产者生产的货物的时间也不确定。
下面使用Queue模块来演示生产者和消费者的场景。
1 myThread.py中的代码: 2 import threading 3 from time import sleep, ctime 4 5 class MyTread(threading.Thread): 6 def __init__(self, func, args, name=''): 7 threading.Thread.__init__(self) 8 self.name = name 9 self.func = func 10 self.args = args 11 12 def getResult(self): 13 return self.res 14 15 def run(self): 16 print 'starting', self.name, 'at:', ctime() 17 self.res = self.func(*self.args) 18 print self.name, 'finished at:', ctime()
1 # -*- coding:utf-8 -*- 2 3 from random import randint 4 from Queue import Queue 5 from time import sleep 6 from myThread import MyTread 7 8 def writeQ(queue): # 把对象放入队列 9 print 'producing object for Q...' 10 queue.put('xxx', 1) 11 print 'size now ', queue.qsize() 12 13 def readQ(queue): #消耗队列的一个对象 14 val = queue.get(1) 15 print 'consumed object from Q... size now', queue.qsize() 16 17 def writer(queue, loops): #写入对象 18 for i in range(loops): 19 writeQ(queue) 20 sleep(randint(1, 3)) 21 22 def reader(queue, loops): #消耗对象 23 for i in range(loops): 24 readQ(queue) 25 sleep(randint(2, 5)) 26 27 funcs = [writer, reader] 28 nfuncs = range(len(funcs)) 29 30 def main(): 31 nloops = randint(2, 5) 32 q = Queue(32) 33 34 threads = [] 35 for i in nfuncs: 36 t = MyTread(funcs[i], (q, nloops), funcs[i].__name__) 37 threads.append(t) 38 39 for i in nfuncs: 40 threads[i].start() 41 42 for i in nfuncs: 43 threads[i].join() 44 45 print 'all DONE' 46 47 if __name__ == '__main__': 48 main()
输出结果:
1 输出结果: 2 startingstarting reader at: writer at: Mon Jan 27 16:38:31 2020 3 Mon Jan 27 16:38:31 2020 4 producing object for Q... 5 size now consumed object from Q... size now 0 6 0 7 producing object for Q... 8 size now 1 9 producing object for Q... 10 size now 2 11 producing object for Q... 12 size now 3 13 consumed object from Q... size now 2 14 writer finished at: Mon Jan 27 16:38:36 2020 15 consumed object from Q... size now 1 16 consumed object from Q... size now 0 17 reader finished at: Mon Jan 27 16:38:46 2020 18 all DONE
1.4 相关模块
下表是一些多线程编程中可能用得到的模块:
模块 |
描述 |
thread |
基本的、低级别的线程模块 |
threading |
高级别的线程和同步对象 |
Queue |
供多线程使用的同步先进先出(FIFO)队列 |
SocketServer |
具有线程控制的TCP和UDP管理器 |