1. 多任务
- 并行:真的多任务
- 并发:假的多任务
2. 多任务-线程
Python的 Thread模块是比较底层的模块,Python的 Threading模块 是对Thread做了一些包装,可以更加方便的被使用
2.1 使用threading模块
线程执行
# -*- coding: utf-8 -*- """ Created on Mon Mar 4 11:27:41 2019 @author: Douzi """ import threading import time def sing(): for i in range(5): print("sing...." + str(i)) time.sleep(1) def dance(): for i in range(5): print("dance...." + str(i)) time.sleep(1) def main(): t1 = threading.Thread(target=sing) t2 = threading.Thread(target=dance) t1.start() t2.start() if __name__=="__main__": main()
2.2 查看当前线程数量
-
当调用Thread的时候,不会创建 线程
-
当 调用 Thread创建出来的实例对象的 start方法地时候才会创建线程以及让这个线程开始运行
# -*- coding: utf-8 -*- """ Created on Mon Mar 4 18:48:16 2019 @author: Douzi """ import threading import time def sing(): for i in range(5): print("......sing...%d..." % i) time.sleep(1) # 当调用Thread的时候,不会创建 线程 # 当调用Thread创建出来的实例对象的 start方法地时候才会创建线程 # 以及让这个线程开始运行 def main(): # 打印当前线程信息 print(threading.enumerate()) t1 = threading.Thread(target=sing) # 在调用Thread之后打印 print(threading.enumerate()) t1.start() # 子线程开始 # 在调用start之后打印 print(threading.enumerate()) if __name__=="__main__": main()
3. 线程-注意点
3.1 线程执行代码的封装
# -*- coding: utf-8 -*- """ Created on Mon Mar 4 20:16:16 2019 @author: Douzi """ import threading import time class MyThread(threading.Thread): def run(self): for i in range(3): time.sleep(1) msg = "I'm "+self.name+' @ '+str(i) print(msg) self.login() def login(self): print("login.......") def register(self): print("register........") if __name__=="__main__": t = MyThread() t.start()
3.2 多线程共享全局变量
-
在一个函数中, 对 全局变量进行修改 的时候,到底是否需要使用global进行说明,要看 是否对全局变量的执行指向进行了修改
-
如果修改了执行,即让全局变量指向一个新的地方,那么必须使用global。
-
如果,仅仅是修改了 指向的空间的数据,此时不必用 global
# -*- coding: utf-8 -*- """ Created on Mon Mar 4 21:01:53 2019 @author: Douzi """ import threading import time # 定义一个全局变量 g_num = 100 def test1(): global g_num g_num += 1 print("......in test1 g_num=%d=....." % g_num) def test2(): print(".......in test2 g_num=%d=...." % g_num) def main(): t1 = threading.Thread(target=test1) t2 = threading.Thread(target=test2) t1.start() time.sleep(1) t2.start() time.sleep(1) print("......in main Thread g_num = %d...." % g_num) if __name__=="__main__": main()
3.3 多线程共享全局变量-args参数
# -*- coding: utf-8 -*- """ Created on Mon Mar 4 21:10:36 2019 @author: Douzi """ # -*- coding: utf-8 -*- """ Created on Mon Mar 4 21:01:53 2019 @author: Douzi """ import threading import time # 定义一个全局变量 g_num = 100 def test1(temp): temp.append(33) print("......in test1 g_num=%s=....." % str(temp)) def test2(temp): print(".......in test2 g_num=%s=...." % str(temp)) g_nums = [11, 22] def main(): # target指定将来 这个线程去哪个函数执行代码 # args指定将来调用 函数的时候 传递什么数据过去 t1 = threading.Thread(target=test1, args=(g_nums,)) t2 = threading.Thread(target=test2, args=(g_nums,)) t1.start() time.sleep(1) t2.start() time.sleep(1) print("......in main Thread g_nums = %s...." % str(g_nums)) if __name__=="__main__": main()
3.4 多线程-共享全局变量问题
# -*- coding: utf-8 -*- """ Created on Mon Mar 4 21:01:53 2019 @author: Douzi """ import threading import time # 定义一个全局变量 g_num = 0 def test1(num): global g_num for i in range(num): g_num += 1 print("......in test1 g_num=%d=....." % g_num) def test2(num): global g_num for i in range(num): g_num += 1 print(".......in test2 g_num=%d=...." % g_num) def main(): t1 = threading.Thread(target=test1, args=(1000000,)) t2 = threading.Thread(target=test2, args=(1000000,)) t1.start() t2.start() # 等待上面2个线程执行完毕..... time.sleep(5) print("......in main Thread g_num = %d...." % g_num) if __name__=="__main__": main()
3.5 互斥锁
当多个线程几乎同时修改某一个共享数据的时候,需要进行同步控制。
某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其他线程不能更改;直到该线程释放资源,将资源的状态变成“非锁定”
import threading # 创建锁 mutex = threading.Lock() # 锁定 mutex.acquice() # 释放 mutex.release()
注意:
- 如果这个锁之前是没有上锁的,那么acquire不会阻塞
- 如果在调用acquire对这个锁上锁之前,他已经被其他线程上了锁,那么此时acquire会阻塞,直到这个锁被解锁为止。
3.6 使用互斥锁完成2个线程对同一个全局变量各加100w次的操作
# -*- coding: utf-8 -*- """ Created on Mon Mar 4 22:01:29 2019 @author: Douzi """ """使用互斥锁完成2个线程对同一个全局变量各加100w次的操作""" import threading import time g_num = 0 def test1(num): global g_num for i in range(num): # 上锁 mutex.acquire() g_num += 1 # 解锁 mutex.release() print("----test1---g_num=%d" % g_num) def test2(num): global g_num for i in range(num): mutex.acquire() g_num += 1 mutex.release() print("----test2---g_num=%d" % g_num) # 创建一个互斥锁,默认是没有上锁的 mutex = threading.Lock() def main(): t1 = threading.Thread(target=test1, args=(100,)) t2 = threading.Thread(target=test2, args=(100,)) t1.start() t2.start() # 等待上面的2个线程执行完毕.... time.sleep(2) print("-----in main Thread g_num = %d" % g_num) if __name__=="__main__": main()
3.7 死锁、银行家算法
略
4. 案例:多任务版udp聊天器
说明
- 编写一个2个线程的程序
- 线程1用来接收数据然后显示
- 线程2用来检测数据然后通过udp发送数据
# -*- coding: utf-8 -*- """ Created on Thu Mar 7 10:42:01 2019 @author: Douzi """ import socket import threading def recv_Message(udp_socket): """接收数据显示""" # 接收数据 while True: recv_data = udp_socket.recvfrom(1024) print(recv_data) def send_Message(udp_socket, dest_ip, dest_port): # 发送数据 while True: send_data = input("输入要发送的数据:") udp_socket.sendto(send_data.encode("utf-8"), (dest_ip, dest_port)) def main(): """完成udp聊天器的整体控制""" # 1. 创建套接字 udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # 2. 绑定本地信息 udp_socket.bind(("172.20.10.5", 7891)) # 172.20.10.5, # 3. 输入对方ip dest_ip = input("输入对方ip:") dest_port = input("输入对方port:") # 4. 创建两个线程,去执行相应的功能 t_recv = threading.Thread(target=recv_Message, args=(udp_socket,)) t_send = threading.Thread(target=send_Message, args=(udp_socket,dest_ip, dest_port)) t_recv.start() t_send.start() if __name__=="__main__": main()
5. 多任务-进程
import multiprocessing import time def test1(): while True: print("1.........") time.sleep(1) def test2(): while True: print("2.........") time.sleep(1) def main(): p1 = multiprocessing.Process(target=test1) p2 = multiprocessing.Process(target=test2) p1.start() p2.start() if __name__=="__main__": main()
6. 进程和线程对比
- 线程不能独立执行,必须依存在进程中
优缺点:
- 线程和进程在使用上各有优缺点:线程执行开销小,但不利于资源的管理和保护;而进程相反
7. 通过队列完成进程间通信
from multiprocessing import Queue q = Queue(3) # 初始化一个Queue对象 q.put("消息1") q.put("消息2") print(q.full()) # False q.put("消息3") print(q.full()) # True # 因为消息队列已满,下面try会抛出异常,第一个try会等待2秒后抛出异常,第二个Try会立刻抛出异常 try: q.put("消息4", True, 2) except: print("消息列队已满,现有消息数量:%s" % q.qsize()) try: q.put_nowait("消息4") except: print("消息队列已满,现有:%s" % q.qsize())
# -*- coding: utf-8 -*-
"""
Created on Fri Mar 8 16:42:07 2019
@author: Douzi
"""
import multiprocessing
def download_from_web(q):
# 模拟从网上下载的数据
data = [11, 22, 33, 44]
for tmp in data:
q.put(tmp)
print("...下载器已经下载完了数据并且存入到队列中....")
def analysis_data(q):
# 数据处理
# 从队列中获取数据
waitting_analysis_data = list()
while True:
data = q.get()
waitting_analysis_data.append(data)
if q.empty():
break
# 模拟数据处理
print(waitting_analysis_data)
def main():
# 1.创建一个队列
q = multiprocessing.Queue()
# 2.创建多个进程,将队列的引用当作实参进行传递到里面
p1 = multiprocessing.Process(target=download_from_web, args=(q,))
p2 = multiprocessing.Process(target=analysis_data, args=(q,))
p1.start()
p2.start()
if __name__=="__main__":
main()
...下载器已经下载完了数据并且存入到队列中....
[11, 22, 33, 44]
8. 进程池Pool
当要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态生成多个进程,但如果是上百个目标,手动创建进程的工作量巨大,此时可以用到Pool方法。
初始化Pool时,可以指定一个最大的进程数,当有新的请求提交到Pool中时,如果池没有满,那么就会创建一个新的进程用来执行该请求;如果池达到指定的最大值,那么请求就会等待,直到池中有进程结束,池会用之前的进程执行新的任务,看下面实例:
#!/user/bin/python3 from multiprocessing import Pool import os, time, random def worker(msg): t_start = time.time() print("%s开始执行, 进程号为%d" % (msg, os.getpid())) # random.random()随机生成0~1之间的浮点数 time.sleep(random.random()*2) t_stop = time.time() print(msg, "执行完毕,耗时%0.2f" % (t_stop-t_start)) po = Pool(3) # 定义一个进程池,最大进程数3 for i in range(0, 10): # Pool().apply_async(要调用的目标, (传递给目标的参数元祖,)) # 每次循环将会用空闲出来的子进程去调用目标 po.apply_async(worker, (i,)) print("----start-----") po.close() # 关闭进程池,关闭后po不再接收新的请求 po.join() # 等待po中所有子进程执行完成,必须放在close语句之后 print("-----end------")
注意:如果 要保证进程池里的进程先结束,主进程再结束,需要加上 po.join()等待进程池中所有子进程执行完成(必须放在po.close()之后)
----start-----
0开始执行, 进程号为26270
1开始执行, 进程号为26271
2开始执行, 进程号为26272
0 执行完毕,耗时0.24
3开始执行, 进程号为26270
2 执行完毕,耗时0.65
4开始执行, 进程号为26272
3 执行完毕,耗时0.42
5开始执行, 进程号为26270
5 执行完毕,耗时0.43
6开始执行, 进程号为26270
1 执行完毕,耗时1.14
7开始执行, 进程号为26271
6 执行完毕,耗时0.37
8开始执行, 进程号为26270
8 执行完毕,耗时0.28
9开始执行, 进程号为26270
7 执行完毕,耗时0.82
4 执行完毕,耗时1.35
9 执行完毕,耗时1.59
-----end------
9. 文件夹copy器(多任务)
import multiprocessing import os def copy_file(file_name, old_folder_name, new_folder_name): """完成文件的复制""" print("=======模拟copy文件: 从%s--->到%s 文件名是:%s" % (old_folder_name, new_folder_name, file_name)) with open(old_folder_name + "/" + file_name, "rb") as old_f: content = old_f.read() with open(new_folder_name + "/" + file_name, "wb") as new_f: new_f.write(content) def main(): # 1.获取用户要copy的文件 old_folder_name = input("请输入要copy的文件夹名字:") # 2. 创建一个新的文件夹 try: new_folder_name = old_folder_name+"[复件]" os.mkdir(new_folder_name) except: pass # 3. 获取文件夹的所有待copy的文件名字 listdir() file_names = os.listdir(old_folder_name) print(file_names) # 4. 创建进程池 pools = multiprocessing.Pool(5) # 5. 向进程池中 添加copy文件的任务 for file_name in file_names: pools.apply_async(copy_file, args=(file_name, old_folder_name, new_folder_name)) # 复制源文件夹中的文件,到新文件夹重点文件去(进程池) pools.close() pools.join() if __name__ == '__main__': main()
=======模拟copy文件: 从test--->到test[复件] 文件名是:uuid.py
=======模拟copy文件: 从test--->到test[复件] 文件名是:smtpd.py
=======模拟copy文件: 从test--->到test[复件] 文件名是:copyreg.py
=======模拟copy文件: 从test--->到test[复件] 文件名是:random.py
=======模拟copy文件: 从test--->到test[复件] 文件名是:decimal.py
=======模拟copy文件: 从test--->到test[复件] 文件名是:tty.py
=======模拟copy文件: 从test--->到test[复件] 文件名是:csv.py(略)
10. 文件夹copy器v2_显示进度(进程间通信)
from multiprocessing import Manager, Pool import os, time, random def copy_file(queue, file_name, old_folder_name, new_folder_name): """完成文件的复制""" # print("=======模拟copy文件: 从%s--->到%s 文件名是:%s" % (old_folder_name, new_folder_name, file_name)) f_read = open(old_folder_name + "/" + file_name, "rb") f_write = open(new_folder_name + "/" + file_name, "wb") while True: time.sleep(0.001) content = f_read.read(1024) if content: f_write.write(content) else: break f_read.close() f_write.close() # 发送已经拷贝完毕的文件名字 queue.put(file_name) def main(): # 1.获取用户要copy的文件 old_folder_name = input("请输入要copy的文件夹名字:") # 2. 创建一个新的文件夹 try: new_folder_name = old_folder_name+"[复件]" os.mkdir(new_folder_name) except: pass # 3. 获取文件夹的所有待copy的文件名字 listdir() file_names = os.listdir(old_folder_name) # print(file_names) # 创建Queue queue = Manager().Queue() # 4. 创建进程池 pools = Pool(5) # 5. 向进程池中 添加copy文件的任务 for file_name in file_names: pools.apply_async(copy_file, args=(queue, file_name, old_folder_name, new_folder_name)) # 主进程显示进度 pools.close() # pools.join() all_file_num = len(file_names) while True: file_name = queue.get() # 子进程放一个名字,这里取一个名字,取不到则会等待。实现进程间通信 if file_name in file_names: file_names.remove(file_name) copy_rate = (all_file_num - len(file_names))*100 / all_file_num print(" %.2f%%...(%s)" % (copy_rate, file_name) + " "*50, end="") if copy_rate >= 100: break print() if __name__ == '__main__': main()
请输入要copy的文件夹名字:test
100.00%...(shelve.py)