Python线程
1. 操作系统/应用程序
操作系统分为: 硬件 , 装系统和安装软件
硬件:
-- 硬盘
-- CPU
-- 主板
-- 显卡
-- 内存
-- 电源
.......
装系统: 系统就是一个由程序员写出来的软件, 该软件用于控制计算机的硬件,让他们之间进行相互配合.
安软件:
-- 百度云
.....
2. 操作中的"并发"
并发(伪) : 由于执行熟读特别快 , 感觉不到停顿 , 是由同一个CPU执行
并行(真) : 创建多个线程同时操作
3. Python中线程和进程
print("111")
Python中没有线程和进程 , 它调用的是操作系统的线程和进程 .
import threading def func(arg): print(arg) t = threading.Thread(target = func) t.start() print("end")
一个应用程序(软件) , 可以有多个进程(默认只有一个) , 一个进程中可以创建多个线程(默认一个) .
总结 :
1. 操作系统帮助开发者操作硬件
2. 程序员写好代码在操作系统上运行(解释器)
Python多线程情况下 :
-- 计算密集型操作 : 效率低(GIL锁)
-- IO操作 : 效率高
Python多进程的情况下 :
-- 计算密集型操作 : 效率高(浪费资源)
-- IO操作 : 效率高(浪费资源)
写Python时 :
IO密集型用多线程 : 文件/输入输出/socket网络通信
计算密集型用多进程
进程与线程的区别:
线程 , CPU工作的最小单元
进程 , 为线程提供一个资源共享的空间
一个进程中默认是由一个主线程
进程和线程的使用准则:
计算密集型 : 多进程
IO密集型 : 多线程
线程创建的不是越多越好 , 线程之间切换时 , 要做上下文管理 .
4. Python线程编写
GIL锁 : 全局解释器锁 . 用于限制一个进程中同一时刻只有一个县城北CPU调度 . 默认GIL锁在执行100个CPU指令(过期时间)后 , 执行下一个线程 .
由于线程是CPU工作的最小单元 , 创建线程可以利用多核优势实现并行操作 .
注意 : 线程是为了工作 .
线程的应用 :
import threading def func(arg): print(arg) t = threading.Thread(target=func,args=(11,)) t.start() print(123) """ 结果: 11 123 """
import threading import time def func(arg): time.sleep(arg) print(arg) t1 = threading.Thread(target=func,args=(3,)) t1.start() t2 = threading.Thread(target=func,args=(9,)) t2.start() print(123) """ 结果: 123 3 9 """
import threading import time def func(arg): time.sleep(arg) print(arg) t1 = threading.Thread(target=func,args=(3,)) t1.setDaemon(True) t1.start() t2 = threading.Thread(target=func,args=(9,)) t2.setDaemon(True) t2.start() print(123) """ 结果: 123 """
import threading import time def func(arg): time.sleep(0.01) print(arg) print('创建子线程t1') t1 = threading.Thread(target=func,args=(3,)) t1.start() # 无参数,让主线程在这里等着,等到子线程t1执行完毕,才可以继续往下走。 # 有参数,让主线程在这里最多等待n秒,无论是否执行完毕,会继续往下走。 t1.join(2) print('创建子线程t2') t2 = threading.Thread(target=func,args=(9,)) t2.start() t2.join(2) # 让主线程在这里等着,等到子线程t2执行完毕,才可以继续往下走。 print(123)
import threading def func(arg): # 获取当前执行该函数的线程的对象 t = threading.current_thread() # 根据当前线程对象获取当前线程名称 name = t.getName() print(name,arg) t1 = threading.Thread(target=func,args=(11,)) t1.setName('姚明') t1.start() t2 = threading.Thread(target=func,args=(22,)) t2.setName('刘翔') t2.start() print(123) """ 结果: 姚明 11 刘翔 22 123 """
import threading def func(arg): print(arg) t1 = threading.Thread(target=func,args=(11,)) t1.start() # start 是开始运行线程吗?不是 # start 告诉cpu,我已经准备就绪,你可以调度我了。 print(123)
# 1. 常见 def func(arg): print(arg) t1 = threading.Thread(target=func,args=(11,)) t1.start() # 2. 不常见 class MyThread(threading.Thread): def run(self): print(11111,self._args,self._kwargs) t1 = MyThread(args=(11,)) t1.start() t2 = MyThread(args=(22,)) t2.start() print('end')
- start 线程准备就绪,等待CPU调度
- setName 为线程设置名称
- getName 获取线程名称
- setDaemon 设置为后台线程或前台线程(默认)
如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止
如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止 - join 逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
5. 锁
Python内置的一个全局解释器锁,锁的作用就是保证同一时刻一个进程中只有一个线程可以被CPU调度.
Python语言的创始人在开发这门语言时 , 目的是快速把语言开发出来 , 如果加上GIL锁(C语言加锁) , 切换时按照100条字节指令来进行线程间的切换.
1. Lock
一次放一个
线程安全 , 多线程操作时 , 内部会让所有线程排队处理 . 如 : list/dict/Queue
线程不安全 + 人 => 排队处理
import threading import time v = [] lock = threading.Lock() def func(arg): lock.acquire() v.append(arg) time.sleep(0.01) m = v[-1] print(arg,m) lock.release() for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start() """ 结果: 0 0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9 """
2. RLock
一次放一个
import threading import time v = [] lock = threading.RLock() def func(arg): lock.acquire() v.append(arg) time.sleep(0.01) m = v[-1] print(arg,m) lock.release() for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start() """ 结果: 0 0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9 """
3. BoundedSemaphore
一次放任意量
import time import threading num = int(input(">>>")) lock = threading.BoundedSemaphore(num) def func(arg): lock.acquire() print(arg) time.sleep(1) lock.release() for i in range(20): t =threading.Thread(target=func,args=(i,)) t.start()
4. Condition
每次放任意量
import time import threading lock = threading.Condition() def func(arg): print('线程进来了') lock.acquire() lock.wait() # 加锁 print(arg) time.sleep(1) lock.release() for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start() while True: inp = int(input('>>>')) lock.acquire() lock.notify(inp) lock.release()
import time import threading lock = threading.Condition() def xxxx(): print('来执行函数了') input(">>>") # ct = threading.current_thread() # 获取当前线程 # ct.getName() return True def func(arg): print('线程进来了') lock.wait_for(xxxx) print(arg) time.sleep(1) for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start()
5. Event
一次放所有
import time import threading lock = threading.Event() def func(arg): print('线程来了') lock.wait() # 加锁:红灯 print(arg) for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start() input(">>>>") lock.set() # 绿灯 lock.clear() # 再次变红灯 for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start() input(">>>>") lock.set()
总结 :
线程安全 , 列表和字典线程安全
加锁的原因 :
-- 非线程安全
-- 控制一段代码
6. threading.local
作用 : 内部自动为每个线程维护一个空间(字典)
import time import threading v = threading.local() def func(arg): # 内部会为当前线程创建一个空间用于存储:phone=自己的值 v.phone = arg time.sleep(2) print(v.phone,arg) # 去当前线程自己空间取值 for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start()
import time import threading DATA_DICT = {} def func(arg): ident = threading.get_ident() DATA_DICT[ident] = arg time.sleep(1) print(DATA_DICT[ident],arg) for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start()
""" 以后:Flask框架内部看到源码 上下文管理 """ import time import threading INFO = {} class Local(object): def __getattr__(self, item): ident = threading.get_ident() return INFO[ident][item] def __setattr__(self, key, value): ident = threading.get_ident() if ident in INFO: INFO[ident][key] = value else: INFO[ident] = {key:value} obj = Local() def func(arg): obj.phone = arg # 调用对象的 __setattr__方法(“phone”,1) time.sleep(2) print(obj.phone,arg) for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start()
7. 线程池
为了限制线程个数 , 线程不是越多越好 , 也不是越少越好 , 控制线程数可以提高效率.
from concurrent.futures import ThreadPoolExecutor import time def task(a1,a2): time.sleep(2) print(a1,a2) # 创建了一个线程池(最多5个线程) pool = ThreadPoolExecutor(5) for i in range(40): # 去线程池中申请一个线程,让线程执行task函数。 pool.submit(task,i,8)
8. 生产者消费者模型
三部件 :
生产者 :
队列 : 先进先出
栈 : 后进先出
消费者
生产者消费者模型解决了不用一直等待的问题 .
import time import queue import threading q = queue.Queue() # 线程安全 def producer(id): """ 生产者 :return: """ while True: time.sleep(2) q.put('包子') print('厨师%s 生产了一个包子' %id ) for i in range(1,4): t = threading.Thread(target=producer,args=(i,)) t.start() def consumer(id): """ 消费者 :return: """ while True: time.sleep(1) v1 = q.get() print('顾客 %s 吃了一个包子' % id) for i in range(1,3): t = threading.Thread(target=consumer,args=(i,)) t.start()
Python进程
进程是CPU资源分配的最小单元.
进程之间数据不共享.
data_list = [] def task(arg): data_list.append(arg) print(data_list) def run(): for i in range(10): p = multiprocessing.Process(target=task,args=(i,)) # p = threading.Thread(target=task,args=(i,)) p.start() if __name__ == '__main__': run()
class MyProcess(multiprocessing.Process): def run(self): print('当前进程', multiprocessing.current_process()) def run(): p1 = MyProcess() p1.start() p2 = MyProcess() p2.start() if __name__ == '__main__': run()
1. 进程之间资源共享
linux: q = multiprocessing.Queue() def task(arg, q): q.put(arg) def run(): for i in range(10): p = multiprocessing.Process(target=task, args=(i, q,)) p.start() while True: v = q.get() print(v) run() windows: def task(arg, q): q.put(arg) if __name__ == '__main__': q = multiprocessing.Queue() for i in range(10): p = multiprocessing.Process(target=task, args=(i, q,)) p.start() while True: v = q.get() print(v)
Linux: m = multiprocessing.Manager() dic = m.dict() def task(arg): dic[arg] = 100 def run(): for i in range(10): p = multiprocessing.Process(target=task, args=(i,)) p.start() input('>>>') print(dic.values()) if __name__ == '__main__': run() windows: def task(arg, dic): time.sleep(2) dic[arg] = 100 if __name__ == '__main__': m = multiprocessing.Manager() dic = m.dict() process_list = [] for i in range(10): p = multiprocessing.Process(target=task, args=(i, dic,)) p.start() process_list.append(p) while True: count = 0 for p in process_list: if not p.is_alive(): count += 1 if count == len(process_list): break print(dic)
2. 进程锁
import time import threading import multiprocessing lock = multiprocessing.RLock() def task(arg): print('鬼子来了') lock.acquire() time.sleep(2) print(arg) lock.release() if __name__ == '__main__': p1 = multiprocessing.Process(target=task,args=(1,)) p1.start() p2 = multiprocessing.Process(target=task, args=(2,)) p2.start()
进程锁跟线程锁的编写方式类似 , 线程锁的所有方法 , 进程锁都有.
3. 进程池
import time from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor def task(arg): time.sleep(2) print(arg) if __name__ == '__main__': pool = ProcessPoolExecutor(5) for i in range(10): pool.submit(task,i)
4. 初识爬虫
安装 : pip3 install requests 和 pip3 install beautifulsoup4 模块
requests 模块模拟浏览器发送请求
--- 本质 requests.get(...):
-- 创建socket客户端
-- 连接[阻塞]
-- 发送请求
-- 接受请求[阻塞]
-- 断开连接
import requests from bs4 import BeautifulSoup from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor # 模拟浏览器发送请求 # 内部创建 sk = socket.socket() # 和抽屉进行socket连接 sk.connect(...) # sk.sendall('...') # sk.recv(...) def task(url): print(url) r1 = requests.get( url=url, headers={ 'User-Agent':'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.92 Safari/537.36' } ) # 查看下载下来的文本信息 soup = BeautifulSoup(r1.text,'html.parser') print(soup.text) # content_list = soup.find('div',attrs={'id':'content-list'}) # for item in content_list.find_all('div',attrs={'class':'item'}): # title = item.find('a').text.strip() # target_url = item.find('a').get('href') # print(title,target_url) def run(): pool = ThreadPoolExecutor(5) for i in range(1,50): pool.submit(task,'https://dig.chouti.com/all/hot/recent/%s' %i) if __name__ == '__main__': run()