进程和线程
概念
进程就是操作系统中执行的一个程序,操作系统以进程为单位分配存储空间,每个进程都有自己的地址空间、数据栈以及其他用于跟踪进程执行的辅助数据,操作系统管理所有进程的执行,为它们合理的分配资源。进程可以通过fork或spawn的方式来创建新的进程来执行其他的任务,不过新的进程也有自己独立的内存空间,因此必须通过进程间通信机制(IPC,Inter-Process Communication)来实现数据共享,具体的方式包括管道、信号、套接字、共享内存区等。
Python既支持多进程又支持多线程,因此使用Python实现并发编程主要有3种方式:多进程、多线程、多进程+多线程。
Python中的多进程
没有用多进程:
1 from random import randint 2 from time import time, sleep 3 4 5 def download_task(filename): 6 print('开始下载%s...' % filename) 7 time_to_download = randint(5, 10) 8 sleep(time_to_download) 9 print('%s下载完成! 耗费了%d秒' % (filename, time_to_download)) 10 11 12 def main(): 13 start = time() 14 download_task('Python从入门到住院.pdf') 15 download_task('Peking Hot.avi') 16 end = time() 17 print('总共耗费了%.2f秒.' % (end - start)) 18 19 20 if __name__ == '__main__': 21 main()
下面是运行程序得到的一次运行结果。
开始下载Python从入门到住院.pdf...
Python从入门到住院.pdf下载完成! 耗费了6秒
开始下载Peking Hot.avi...
Peking Hot.avi下载完成! 耗费了7秒
总共耗费了13.01秒.
多进程:
1 from multiprocessing import Process # 导入Process 2 from os import getpid 3 from random import randint 4 from time import time, sleep 5 6 7 def download_task(filename): 8 print('启动下载进程,进程号[%d].' % getpid()) 9 print('开始下载%s...' % filename) 10 time_to_download = randint(5, 10) 11 sleep(time_to_download) 12 print('%s下载完成! 耗费了%d秒' % (filename, time_to_download)) 13 14 15 def main(): 16 start = time() 17 p1 = Process(target=download_task, args=('Python从入门到住院.pdf', )) 18 p1.start() 19 p2 = Process(target=download_task, args=('Peking Hot.avi', )) # 创建进程 创建进程和创建线程的方法差不多,只不过用到的方法不一样 20 p2.start() # 线程导入 from threading import Thread 创建线程用Thread() 和创建进程类似 21 p1.join() 22 p2.join() 23 end = time() 24 print('总共耗费了%.2f秒.' % (end - start)) 25 26 27 if __name__ == '__main__': 28 main()
结果:
启动下载进程,进程号[1530].
开始下载Python从入门到住院.pdf...
启动下载进程,进程号[1531].
开始下载Peking Hot.avi...
Peking Hot.avi下载完成! 耗费了7秒
Python从入门到住院.pdf下载完成! 耗费了10秒
总共耗费了10.01秒.
在上面的代码中,我们通过Process
类创建了进程对象,通过target
参数我们传入一个函数来表示进程启动后要执行的代码,
后面的args
是一个元组,它代表了传递给函数的参数。Process
对象的start
方法用来启动进程,而join
方法表示等待进程执行结束。
运行上面的代码可以明显发现两个下载任务“同时”启动了,而且程序的执行时间将大大缩短,不再是两个任务的时间总和。
Python中的多线程
在Python早期的版本中就引入了thread模块(现在名为_thread)来实现多线程编程,然而该模块过于底层,而且很多功能都没有提供,因此目前的多线程开发我们推荐使用threading模块,该模块对多线程编程提供了更好的面向对象的封装。我们把刚才下载文件的例子用多线程的方式来实现一遍。
1 from random import randint 2 from threading import Thread # 注意这里 3 from time import time, sleep 4 5 6 def download(filename): 7 print('开始下载%s...' % filename) 8 time_to_download = randint(5, 10) 9 sleep(time_to_download) 10 print('%s下载完成! 耗费了%d秒' % (filename, time_to_download)) 11 12 13 def main(): 14 start = time() 15 t1 = Thread(target=download, args=('Python从入门到住院.pdf',)) 16 t1.start() 17 t2 = Thread(target=download, args=('Peking Hot.avi',)) # 注意这里与进程之间 18 t2.start() # 的巧妙之处 19 t1.join() 20 t2.join() 21 end = time() 22 print('总共耗费了%.3f秒' % (end - start)) 23 24 25 if __name__ == '__main__': 26 main()
我们可以直接使用threading模块的Thread
类来创建线程,但是我们之前讲过一个非常重要的概念叫“继承”,我们可以从已有的类创建新类,因此也可以通过继承Thread
类的方式来创建自定义的线程类,然后再创建线程对象并启动线程。代码如下所示。
1 from random import randint 2 from threading import Thread 3 from time import time, sleep 4 5 6 class DownloadTask(Thread): # 这个类继承Thread 上一个用的是方法 7 8 def __init__(self, filename): 9 super().__init__() # 初始化先写父类的构造方法 10 self._filename = filename 11 12 def run(self): 13 print('开始下载%s...' % self._filename) 14 time_to_download = randint(5, 10) # 模拟下载,睡眠 15 sleep(time_to_download) 16 print('%s下载完成! 耗费了%d秒' % (self._filename, time_to_download)) 17 18 19 def main(): 20 start = time() 21 t1 = DownloadTask('Python从入门到住院.pdf') # 看这里与上面的区别 22 t1.start() 23 t2 = DownloadTask('Peking Hot.avi') 24 t2.start() 25 t1.join() # 等待线程结束 26 t2.join() 27 end = time() 28 print('总共耗费了%.2f秒.' % (end - start)) 29 30 31 if __name__ == '__main__': 32 main()
因为多个线程可以共享进程的内存空间,因此要实现多个线程间的通信相对简单,大家能想到的最直接的办法就是设置一个全局变量,多个线程共享这个全局变量即可。但是当多个线程共享同一个变量(我们通常称之为“资源”)的时候,很有可能产生不可控的结果从而导致程序失效甚至崩溃。如果一个资源被多个线程竞争使用,那么我们通常称之为“临界资源”,对“临界资源”的访问需要加上保护,否则资源会处于“混乱”的状态。下面的例子演示了100个线程向同一个银行账户转账(转入1元钱)的场景,在这个例子中,银行账户就是一个临界资源,在没有保护的情况下我们很有可能会得到错误的结果。这里加了临界锁
1 from time import sleep 2 from threading import Thread, Lock 3 4 5 class Account(object): 6 7 def __init__(self): 8 self._balance = 0 9 self._lock = Lock() # 这里定义 10 11 def deposit(self, money): 12 # 先获取锁才能执行后续的代码 13 self._lock.acquire() 14 try: 15 new_balance = self._balance + money 16 sleep(0.01) 17 self._balance = new_balance 18 finally: 19 # 在finally中执行释放锁的操作保证正常异常锁都能释放 20 self._lock.release() 21 22 @property 23 def balance(self): 24 return self._balance 25 26 27 class AddMoneyThread(Thread): 28 29 def __init__(self, account, money): 30 super().__init__() 31 self._account = account 32 self._money = money 33 34 def run(self): 35 self._account.deposit(self._money) 36 37 38 def main(): 39 account = Account() 40 threads = [] 41 for _ in range(100): 42 t = AddMoneyThread(account, 1) 43 threads.append(t) 44 t.start() 45 for t in threads: 46 t.join() 47 print('账户余额为: ¥%d元' % account.balance) 48 49 50 if __name__ == '__main__': 51 main()
示例:多线程求和(这里用8个线程求(1,100000001)的和
1 from multiprocessing import Process, Queue 2 from random import randint 3 from time import time 4 5 6 def task_handler(curr_list, result_queue): 7 total = 0 8 for number in curr_list: 9 total += number 10 result_queue.put(total) 11 12 13 def main(): 14 processes = [] 15 number_list = [x for x in range(1, 100000001)] # 如果溢出适当减小数字 16 result_queue = Queue() 17 index = 0 18 # 启动8个进程将数据切片后进行运算 19 for _ in range(8): 20 p = Process(target=task_handler, 21 args=(number_list[index:index + 12500000], result_queue)) 22 index += 12500000 23 processes.append(p) 24 p.start() 25 # 开始记录所有进程执行完成花费的时间 26 start = time() 27 for p in processes: 28 p.join() 29 # 合并执行结果 30 total = 0 31 while not result_queue.empty(): 32 total += result_queue.get() 33 print(total) 34 end = time() 35 print('Execution time: ', (end - start), 's', sep='') 36 37 38 if __name__ == '__main__': 39 main()