线程进程概要:
1、最小工作单元为线程
2、一个应用程序至少有一个进程,一个进程里至少有一个线程
3、应用场景:
IO密集型适合用线程
计算密集型适合用进程
4、python中GIL全局解释器锁,保证同一个进程中只能有一个线程同时被调用
进程、线程:
1、进程内存独立,线程共享同一进程的资源
2、进程是资源的集合,线程是执行单位
3、进程之间不能直接互相访问,同一进程内的线程可以互相通信
4、创建新进程会消耗资源,线程非常轻量,只保存线程运行时的必要数据,如上下文,程序堆栈
5、同一进程里的线程可以互相控制,不同进程之间不能相互控制,但是父进程可以控制子进程
线程:
线程模块:threading
创建一个线程:t = threading.Thread(target=,args=[])
target:函数或类名
args:必须是可迭代的,如列表、元组
运行该线程: t.start()表示已经准备好等待cpu调度
1、线程的基本使用:
1 import threading,time 2 3 def task(args): 4 time.sleep(1) 5 print(args) 6 7 for i in range(20): 8 t = threading.Thread(target=task,args=[i,]) 9 t.start() 10 11 print("end") #主线程
2、线程的调用:
1 import threading,time 2 def task(args): 3 time.sleep(args) 4 print(args) 5 6 for i in range(20): 7 t = threading.Thread(target=task,args=[i,]) 8 # t.setDaemon(True) #主线程不等待子线程结束 9 # t.setDaemon(False)#默认值,主线程等子线程全部执行完再结束 10 #setDaemon()只能设置在start上方 11 t.start() 12 #t.join() #一个线程一个线程的执行,并且主线程等待所有子线程执行完再执行 13 t.join(2) #表示主线程只等2秒,如果2秒内子线程没有执行,则不再等待,又称等待的最大时间 14 15 print("end") #主线程
3、线程锁:
import threading,time
1 v = 10 2 lock = threading.Lock()# 3 # lock = threading.RLock()#递归锁,又称多重锁,同时可以释放多把锁 4 5 def task(args): 6 time.sleep(2) 7 lock.acquire()#调用锁(上锁) 8 #lock.acquire() 9 global v 10 v -= 1 11 print(v) 12 lock.release()#释放锁(解锁) 13 #lock.release()#只有使用RLock的时候才能开多把 14 15 for i in range(10): 16 t = threading.Thread(target=task,args=(i,)) 17 t.start()
1 v = 10 2 lock = threading.BoundedSemaphore(3) #可以通过的线程数量 3 4 def task(args): 5 lock.acquire()#调用锁(上锁) 6 time.sleep(2) 7 global v 8 v -= 1 9 print(v) 10 lock.release()#释放锁(解锁 11 12 for i in range(10): 13 t = threading.Thread(target=task,args=(i,)) 14 t.start()
1 lock = threading.Event() 2 3 def task(args): 4 time.sleep(1) 5 lock.wait()#锁住所有的线程,等待统一释放 6 print(args) 7 8 for i in range(10): 9 t = threading.Thread(target=task,args=(i,)) 10 t.start() 11 12 while True: 13 value = input(">>>>:") 14 if value == "1": 15 lock.set()#释放所有的线程 16 #lock.clear() #默认值,等待
1 lock = threading.Condition() 2 3 def task(args): 4 time.sleep(1) 5 lock.acquire() 6 lock.wait() #锁住所有的线程 7 print(args) 8 lock.release() 9 10 for i in range(10): 11 t = threading.Thread(target=task,args=(i,)) 12 t.start() 13 14 while True: 15 value = input(">>>>:") 16 lock.acquire() 17 lock.notify(int(value)) #通知lock.wait可以同时释放多少线程 18 lock.release()
4、线程池:
线程池模块:from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor
import threading,time
1 def task(args): 2 time.sleep(1) 3 print(args) 4 5 pool = ThreadPoolExecutor(5) #创建连接池,并指定连接池的连接数量 6 7 for i in range(30): 8 pool.submit(task,i) #到连接池中获取连接,并将数据交给task函数进行处理
1 import requests #如果没有此模块则使用:pip3 install requests进行安装 2 3 def task(url): 4 response = requests.get(url) 5 print("结果:",url,len(response.content)) #拿到结果后,可以根据自己的需求进行处理 6 7 pool = ThreadPoolExecutor(2) 8 9 url_list = [ 10 "http://www.baidu.com", 11 "http://www.open.com.cn", 12 "http://www.sina.com", 13 ] 14 15 for url in url_list: 16 pool.submit(task,url)
1 def download(url): 2 response = requests.get(url) 3 return response #此处的response包含了下载的所有内容 4 5 pool = ThreadPoolExecutor(2) 6 7 url_list = [ 8 "http://www.baidu.com", 9 "http://www.open.com.cn", 10 "http://www.sina.com", 11 ] 12 13 for url in url_list: 14 future = pool.submit(download,url) #此处只表示下载完成,并未执行其他操作(也可以理解为这里的结果是由download函数处理得到的) 15 future.add_done_callback(test)#当download函数返回response时,future就等于response,同时future也包含了response中所有的内容 16 # future.add_done_callback(test)固定写法,括号中是用户想将结果交给那个函数处理的函数名 17 18 def test(response): 19 download_response = response.result()#通过执行response.result()就相当于得到了download函数中返回值response(result()是固定写法) 20 print("结果:",download_response.url,download_response.status_code)#此处的download_response.url就等于download函数中的response.url
将以上过程写成两个文件(即将download和循环操作写成一个模块),在另一个文件中来引用并调用
1 from concurrent.futures import ThreadPoolExecutor 2 import requests 3 4 def down(url): 5 response = requests.get(url) 6 return response 7 8 def run(url_list): 9 pool = ThreadPoolExecutor(2) 10 for item in url_list: 11 url = item["url"] 12 call = item["call"] 13 futurt = pool.submit(down,url) 14 futurt.add_done_callback(call)
模块名为syan
1 import syan 2 3 def f1(f1_data): 4 response = f1_data.result()#拿到syan文件中处理的结果 5 print(response) 6 7 def f2(f2_data): 8 response = f1_data.result()#拿到syan文件中处理的结果 9 print(response) 10 11 def f3(f3_data): 12 response = f1_data.result()#拿到syan文件中处理的结果 13 print(response) 14 15 url_list = [ 16 {"url":"http://www.baidu.com","call":f1}, 17 {"url":"http://www.open.com.cn","call":f2}, 18 {"url":"http://www.sina.com","call":f3}, 19 ] 20 21 syan.run(url_list)
4.4、线程池整理:
共分为两种模式:(线程池是创建线程之后来处理批量任务)
一、将所有处理的任务写在一个函数中
二、将处理的任务分为两个函数来处理
武sir整理:
1 def task(url): 2 """ 3 任务执行两个操作:下载;保存本地 4 """ 5 # response中封装了Http请求响应的所有数据 6 # - response.url 请求的URL 7 # - response.status_code 响应状态码 8 # - response.text 响应内容(字符串格式) 9 # - response.content 响应内容(字节格式) 10 # 下载 11 response = requests.get(url) 12 13 # 下载内容保存至本地 14 f = open('a.log','wb') 15 f.write(response.content) 16 f.close() 17 18 19 pool = ThreadPoolExecutor(2) 20 url_list = [ 21 'http://www.oldboyedu.com', 22 'http://www.autohome.com.cn', 23 'http://www.baidu.com', 24 ] 25 26 27 for url in url_list: 28 print('开始请求',url) 29 # 去连接池中获取链接 30 pool.submit(task,url)
1 def save(future): 2 """ 3 只做保存 # future中包含response 4 """ 5 response = future.result() 6 7 # 下载内容保存至本地 8 f = open('a.log','wb') 9 f.write(response.content) 10 f.close() 11 12 def task(url): 13 """ 14 只做下载 requests 15 """ 16 # response中封装了Http请求响应的所有数据 17 # - response.url 请求的URL 18 # - response.status_code 响应状态码 19 # - response.text 响应内容(字符串格式) 20 # - response.content 响应内容(字节格式) 21 # 下载 22 response = requests.get(url) 23 return response 24 25 pool = ThreadPoolExecutor(2) 26 27 url_list = [ 28 'http://www.oldboyedu.com', 29 'http://www.autohome.com.cn', 30 'http://www.baidu.com', 31 ] 32 33 for url in url_list: 34 print('开始请求',url) 35 # 去连接池中获取链接 36 # future中包含response 37 future = pool.submit(task,url) 38 # 下载成功后,自动调用save方法 39 future.add_done_callback(save)
进程:
进程模块:from multiprocessing import Process
进程的创建与使用与线程完全一致
区别之处:
将线程中的threading.Thread改为Process
线程中的setDaemon(True)在进程中为daemon = True or False
进程之间的数据共享:
验证进程之间数据不共享:
1 from multiprocessing import Process 2 def task(num,li): 3 li.append(num) 4 print(li) 5 6 v = [] 7 8 for i in range(10): 9 p = Process(target=task,args=(i,v,)) 10 p.start() 11 12 13 结果: 14 [0] 15 [1] 16 [2] 17 [3] 18 [4] 19 [5] 20 [6] 21 [7] 22 [8] 23 [9]
1 线程: 2 from threading import Thread 3 def task(num,li): 4 li.append(num) 5 print(li) 6 7 v = [] 8 9 for i in range(10): 10 t = Thread(target=task,args=(i,v,)) 11 t.start() 12 13 14 结果: 15 [0] 16 [0, 1] 17 [0, 1, 2] 18 [0, 1, 2, 3] 19 [0, 1, 2, 3, 4] 20 [0, 1, 2, 3, 4, 5] 21 [0, 1, 2, 3, 4, 5, 6] 22 [0, 1, 2, 3, 4, 5, 6, 7] 23 [0, 1, 2, 3, 4, 5, 6, 7, 8] 24 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
实现进程之间数据的共享:
第一种方式:
需要的模块(Array):from multiprocessing import Process,Array
Array是c语言级别的共享,有一定的限制
Array("类型",长度)
1 from multiprocessing import Process,Array 2 3 def task(num,li): 4 li[num] = 1 #当num是1的时候,v的列表里就会有一个值变为1,当num是2到时候,v的列表里就会有两个值变为1 5 print(list(li)) 6 7 8 v = Array("i",10) #此处v的值为[0,0,0,0,0,0,0,0,0,0] 9 #Array("i",10)表示此列表的长度为10,并且数据类型为整数型 10 11 12 for i in range(10): 13 p = Process(target=task,args=(i,v,)) 14 p.start() 15 16 17 结果: 18 [1, 0, 0, 0, 0, 0, 0, 0, 0, 0] 19 [1, 1, 0, 0, 0, 0, 0, 0, 0, 0] 20 [1, 1, 1, 0, 0, 0, 0, 0, 0, 0] 21 [1, 1, 1, 1, 0, 0, 0, 0, 0, 0] 22 [1, 1, 1, 1, 1, 0, 0, 0, 0, 0] 23 [1, 1, 1, 1, 1, 1, 0, 0, 0, 0] 24 [1, 1, 1, 1, 1, 1, 1, 0, 0, 0] 25 [1, 1, 1, 1, 1, 1, 1, 1, 0, 0] 26 [1, 1, 1, 1, 1, 1, 1, 1, 1, 0] 27 [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
第二种方式:(通过socket实现通信)
需要的模块(Manager):from multiprocessing import Process,Manager
Manager().dict() / Manager().list()
1 from multiprocessing import Process,Manager 2 3 def task(num,li): 4 li.append(num) 5 print(li) 6 7 8 # obj = Manager()#表示创建两socket对象 9 # dic = obj.dict() #或者可以写成:Manager().dict();特殊的字典 10 v = Manager().list() #特殊的列表 11 12 13 for i in range(10): 14 p = Process(target=task,args=(i,v,)) 15 p.start() 16 17 18 结果:会有报错,是因为连接没有关闭 19 [0] 20 [0, 1, 2] 21 [0, 1, 2] 22 [0, 1, 2, 3] 23 [0, 1, 2, 3, 4, 5] 24 [0, 1, 2, 3, 4, 5] 25 [0, 1, 2, 3, 4, 5, 6] 26 [0, 1, 2, 3, 4, 5, 6, 7] 27 [0, 1, 2, 3, 4, 5, 6, 7, 8]
进程池:
模块:from concurrent.futures import ProcessPoolExecutor
跟线程池的区别就是将线程池中的ThreadPoolExecutor改为ProcessPoolExecutor即可