一、什么是线程?什么是进程?
第一,进程是一个实体。每一个进程都有它自己的地址空间,一般情况下,包括文本区域(text region)、数据区域(data region)和堆栈(stack region)。文本区域存储处理器执行的代码;数据区域存储变量和进程执行期间使用的动态分配的内存;堆栈区域存储着活动过程调用的指令和本地变量。 第二,进程是一个“执行中的程序”。程序是一个没有生命的实体,只有处理器赋予程序生命时(操作系统执行之),它才能成为一个活动的实体,我们称其为进程。[3] 进程是操作系统中最基本、重要的概念。是多道程序系统出现后,为了刻画系统内部出现的动态情况,描述系统内部各道程序的活动规律引进的一个概念,所有多道程序设计操作系统都建立在进程的基础上。 进程的概念
线程是进程中的一个实体,是被系统独立调度和分派的基本单位,线程自己不拥有系统资源,只拥有一点儿在运行中必不可少的资源,但它可与同属一个进程的其它线程共享进程所拥有的全部资源。
在单个程序中同时运行多个线程完成不同的工作,称为多线程。
协程,是由程序员创造出来的一个不是真实存在的东西;可以看做是一个微线程,对一个线程进程分片,使得线程在代码块之间进行来回切换执行,而不是在逐行执行。
注意:
对于python而言,其自己没有进程和线程,在执行的时候需要调用操作系统的进程和线程。
二、应用软件、进程和线程的关系
程序和进程的区别就在于:程序是指令的集合,它是进程运行的静态描述文本;进程是程序的一次执行活动,属于动态概念。在多道编程中,我们允许多个程序同时加载到内存中,在操作系统的调度下,可以实现并发地执行。这是这样的设计,大大提高了CPU的利用率。进程的出现让每个用户感觉到自己独享CPU,因此,进程就是为了在CPU上实现多道编程而提出的。
简单来说:一个应用程序(软件),可以有多个进程(默认只有一个),一个进程中可以创建多个线程(默认一个)。
三、进程、线程、协程的区别
进程是资源分配的最小单元,其作用是进行数据隔离,线程是cpu调度的最小单元,其作用主要是执行某个任务,一个应用程序可以有多个进程,一个进程可以有多个线程,在其他语言中几乎很少用进程,都是用多线程,而对于python来说,IO操作主要是用多线程实现,计算密集型操作主要是用多进程实现,主要原因是python中存在GIL锁,GIL锁的作用就是保证一个进程中同一时刻只有一个线程被cpu调度,所以在python中想要利用cpu的多核优势就是开多个进程。在后来程序员级别的人为了让代码更牛逼,创建了一个东西叫协程,这个东西本身不存在,是程序员自己创造出来的,协程可以对代码的执行顺序进行控制,本身这个东西存在没有意义,但是协程要是和IO切换放在一起就了不得了,遇到IO操作就切换,相当于把一个线程分片,所达到的效果是线程一直没有停,一直在工作,这就是进程、线程、协程的本质区别,在python中用协程的时候会有一个模块叫greenlet,协程加IO自动切换的模块叫gevent。
注意:IO密集型操作可以使用多线程;计算密集型可以使用多进程;
*****通过漫画了解进程与线程
四、线程
(一)线程的基本使用
#主程序默认等待子程序执行完毕 import threading import time def func(arg): time.sleep(arg) print(arg) t1 = threading.Thread(target=func,args=(3,)) t1.start() t2 = threading.Thread(target=func,args=(2,)) t2.start() print(123)
(二)线程的常用方法
1、start 线程准备就绪,等待CPU调度
2、setDaemon(True) 主程序不再等,主程序终止则所有子程序终止
import threading import time def func(arg): time.sleep(2) print(arg) t1 = threading.Thread(target=func,args=(3,)) t1.setDaemon(True) #设置主程序不在等待子程序执行完毕,主程序终止则子程序终止 t1.start() t2 = threading.Thread(target=func,args=(9,)) t2.setDaemon(True) t2.start() print(123)
3、join 开发者可以控制主线程等待子线程(最多等待时间)
import threading import time def func(arg): time.sleep(arg) print(arg) print('创建子线程t1') t1 = threading.Thread(target=func,args=(3,)) t1.start() # join()无参数,让主线程在这里等着,等到子线程t1执行完毕,才可以继续往下走。 # join(n)有参数,让主线程在这里最多等待n秒,无论是否执行完毕,会继续往下走,但最终会等所有子程序执行完毕主程序在终止。 t1.join(6) print('创建子线程t2') t2 = threading.Thread(target=func,args=(6,)) t2.start() t2.join(6) print(123)
4、其它方法
import threading import time def func(arg): # 获取当前执行该函数的线程的对象 t = threading.current_thread() # 根据当前线程对象获取当前线程名称 name = t.getName() print(name,arg) t1 = threading.Thread(target=func,args=(11,)) t1.setName('线程1') #设置线程名称 t1.start() t2 = threading.Thread(target=func,args=(22,)) t2.setName('线程2') #设置线程名称 t2.start() print(123)
(三)两种方法创建多线程
#多线程方式一(常见):
import threading def func(arg): print(arg) for i in range(4): t = threading.Thread(target=func,args=(i,)) t.start()
#多线程方式二(通过面向对象创建):
import threading class MyThread(threading.Thread): def run(self): print(11111,self._args,self._kwargs) t1 = MyThread(args=(11,)) t1.start() t2 = MyThread(args=(22,)) t2.start()
(四)几种常见的线程锁
问题:为什么要加线程锁?
#如果不加锁就可能出现多个线程抢占资源的情况,从而出现脏数据,加上锁之后可以保证在同一时刻只有一个线程在执行。
*****同步锁******
import threading import time lock = threading.Lock() v = [] def task(i): lock.acquire() v.append(i) time.sleep(0.01) m = v[-1] print(m,i) lock.release() for i in range(10): t = threading.Thread(target=task,args=(i,)) t.start()
*****递归锁:可以多次获得锁,多次释放锁******
import threading import time lock = threading.RLock() v = [] def task(i): lock.acquire() lock.acquire() v.append(i) time.sleep(0.01) m = v[-1] print(m,i) lock.release() lock.release() for i in range(10): t = threading.Thread(target=task,args=(i,)) t.start()
*****信号锁:可以控制一次最多有n个线程执行代码******
import threading import time lock = threading.BoundedSemaphore(5) def task(i): lock.acquire() print(i) time.sleep(1) lock.release() for i in range(10): t = threading.Thread(target=task,args=(i,)) t.start()
*****条件锁:简单理解,满足条件时,释放线程,根据输入数量释放相应的线程数量******
import threading import time lock = threading.Condition() def task(i): print('线程来了',i) lock.acquire() lock.wait() print(i) time.sleep(1) lock.release() for i in range(10): t = threading.Thread(target=task,args=(i,)) t.start() while 1: num = int(input('请输入一次要执行的线程数量>>>')) lock.acquire() lock.notify(num) lock.release()
*****事件锁:一次放所有线程******
import threading import time lock = threading.Event() def task(i): print('线程来了') lock.wait() #“Flag”值为False print(i) time.sleep(1) for i in range(10): t = threading.Thread(target=task,args=(i,)) t.start() input('>>>') lock.set() #“Flag”值为True lock.clear #“Flag”值为False
(五)线程池
from concurrent.futures import ThreadPoolExecutor import time def task(a1,a2): time.sleep(2) print(a1,a2) # 创建了一个线程池(最多5个线程) pool = ThreadPoolExecutor(5) for i in range(40): # 去线程池中申请一个线程,让线程执行task函数。 pool.submit(task,i,8)
(六)生产者消费者模型
import time import queue import threading q = queue.Queue() # 线程安全 def producer(id): """ 生产者 :return: """ while True: time.sleep(2) q.put('包子') print('厨师%s 生产了一个包子' %id ) for i in range(1,4): t = threading.Thread(target=producer,args=(i,)) t.start() def consumer(id): """ 消费者 :return: """ while True: time.sleep(1) v1 = q.get() print('顾客 %s 吃了一个包子' % id) for i in range(1,3): t = threading.Thread(target=consumer,args=(i,)) t.start()
(七)threadinglocal 原理
作用:为每个线程创建一个独立的空间,使得线程对自己的空间中的数据进行操作(数据隔离)。
import time import threading INFO = {} class Local(object): def __getattr__(self, item): ident = threading.get_ident() return INFO[ident][item] def __setattr__(self, key, value): ident = threading.get_ident() if ident in INFO: INFO[ident][key] = value else: INFO[ident] = {key:value} obj = Local() def func(arg): obj.phone = arg # 调用对象的 __setattr__方法(“phone”,1) time.sleep(2) print(obj.phone,arg) for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start()
import time import threading class Local(object): def __init__(self): object.__setattr__(self,'INFO',{}) def __getattr__(self, item): ident = threading.get_ident() return self.INFO[ident][item] def __setattr__(self, key, value): ident = threading.get_ident() if ident in self.INFO: self.INFO[ident][key] = value else: self.INFO[ident] = {key:value} obj = Local() def func(arg): obj.phone = arg # 调用对象的 __setattr__方法(“phone”,1) time.sleep(2) print(obj.phone,arg) for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start()
五、进程
(一)进程的常用功能----------------其方法和线程类似
- join
- daemon
- name
- multiprocessing.current_process()
- multiprocessing.current_process().ident/pid
import time import multiprocessing def task(arg): # time.sleep(3) print(arg) p = multiprocessing.current_process() #获取当前工作进程 print(p.name) #获取进程名称 if __name__ == '__main__': p1 = multiprocessing.Process(target=task,args=(1,)) p1.name = 'pp1' #设置进程名称 p1.daemon=True #主进程执行完毕所有子进程不论执行到哪里都关闭 p1.start() # p1.join(0.5) #加参数主进程最长等待子进程0.5秒就向下执行,但最终都会等子进程执行完毕在关闭 print(1234) p2 = multiprocessing.Process(target=task, args=(2,)) p2.name = 'pp2' #设置进程名称 p2.daemon=True p2.start() # p2.join(0.5) print(2345)
(二)进程的数据共享问题
进程之间的数据默认不共享,但通过Queue和Manager可以使进程之间的数据共享
import multiprocessing data_list = [] def task(arg): data_list.append(arg) print(data_list) def run(): for i in range(4): p = multiprocessing.Process(target=task,args=(i,)) p.start() if __name__ == '__main__': run() #结果 # [2] # [0] # [1] # [3]
*******进程间的数据共享:multiprocessing.Queue
q = multiprocessing.Queue() def task(arg,q): q.put(arg) def run(): for i in range(10): p = multiprocessing.Process(target=task, args=(i, q,)) p.start() while True: v = q.get() print(v) if __name__ == '__main__': run()
**********进程间的数据共享:Manager
def task(arg,dic): time.sleep(2) dic[arg] = 100 if __name__ == '__main__': m = multiprocessing.Manager() dic = m.dict() process_list = [] for i in range(10): p = multiprocessing.Process(target=task, args=(i,dic,)) p.start() process_list.append(p) #处理主进程和子进程数据共享问题,避免主进程结束之后共享的数据消失 while True: count = 0 for p in process_list: if not p.is_alive(): count += 1 if count == len(process_list): break print(dic)
(三)进程锁
参考线程锁,其使用方法及其类似
*******为什么会有进程锁?
当多个进程共享某个数据时,为了保证数据安全,不出现脏数据,会给进程加锁.
(四)进程池
import time from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor def task(arg): time.sleep(2) print(arg) if __name__ == '__main__': pool = ProcessPoolExecutor(5) #进程池里面设置了5个进程 for i in range(10): pool.submit(task,i)
六、简单爬虫实例--------使用线程池
import requests from bs4 import BeautifulSoup from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor # 模拟浏览器发送请求 # 内部创建 sk = socket.socket() # 和抽屉进行socket连接 sk.connect(...) # sk.sendall('...') # sk.recv(...) def task(url): r1 = requests.get( url=url, headers={ 'User-Agent':'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.92 Safari/537.36' } ) # 查看下载下来的文本信息 soup = BeautifulSoup(r1.text,'html.parser') content_list = soup.find('div',attrs={'id':'content-list'}) for item in content_list.find_all('div',attrs={'class':'item'}): title = item.find('a').text.strip() target_url = item.find('a').get('href') print(title,target_url) def run(): pool = ThreadPoolExecutor(5) for i in range(1,50): pool.submit(task,'https://dig.chouti.com/all/hot/recent/%s' %i) if __name__ == '__main__': run()
七、协程
协程的实现主要应用greenlet模块,可使线程在代码块之间来回切换执行,见代码:
import greenlet def f1(): print(11) gr2.switch() print(22) gr2.switch() def f2(): print(33) gr1.switch() print(44) # 协程 gr1 gr1 = greenlet.greenlet(f1) # 协程 gr2 gr2 = greenlet.greenlet(f2) gr1.switch()
除了以上方法之外,也可以手动实现协程,上代码:
def f1(): print(11) yield print(22) yield print(33) def f2(): print(55) yield print(66) yield print(77) v1 = f1() v2 = f2() next(v1) next(v2) next(v1) next(v2) next(v1) next(v2)
注意:单纯的协程无用,并不能实现并发操作,但是协程遇到IO就切换就牛逼起来了,可大大缩短执行任务的等待时间,提高代码执行的速度,上代码:
from gevent import monkey monkey.patch_all() # 以后代码中遇到IO都会自动执行greenlet的switch进行切换 import requests import gevent def get_page1(url): ret = requests.get(url) print(url,ret.content) def get_page2(url): ret = requests.get(url) print(url,ret.content) def get_page3(url): ret = requests.get(url) print(url,ret.content) gevent.joinall([ gevent.spawn(get_page1, 'https://www.python.org/'), # 协程1 gevent.spawn(get_page2, 'https://www.yahoo.com/'), # 协程2 gevent.spawn(get_page3, 'https://github.com/'), # 协程3 ])
八、基于IO多路复用+socket实现并发请求,一个线程100个请求
(一)IO多路复用
作用:可以监听所有的IO请求的状态。
- socket
I,input
o,output
操作系统检测socket是否发生变化,有三种模式:
select:最多1024个socket;循环去检测。
poll:不限制监听socket个数;循环去检测(水平触发)。
epoll:不限制监听socket个数;回调方式(边缘触发)。
Python模块:
select.select
select.epoll
注意:windows系统只支持select。
(二)基于IO多路复用+socket实现单线程的并发,上代码:
import socket import select client1 = socket.socket() client1.setblocking(False) # 百度创建连接: 非阻塞 try: client1.connect(('www.baidu.com',80)) except BlockingIOError as e: pass client2 = socket.socket() client2.setblocking(False) # 百度搜狗连接: 非阻塞 try: client2.connect(('www.sogou.com',80)) except BlockingIOError as e: pass client3 = socket.socket() client3.setblocking(False) # 创建老男孩连接: 非阻塞 try: client3.connect(('www.oldboyedu.com',80)) except BlockingIOError as e: pass socket_list = [client1,client2,client3] conn_list = [client1,client2,client3] while True: rlist,wlist,elist = select.select(socket_list,conn_list,[],0.005) # wlist中表示已经连接成功的socket对象 for sk in wlist: if sk == client1: sk.sendall(b'GET /s?wd=alex HTTP/1.0 host:www.baidu.com ') elif sk==client2: sk.sendall(b'GET /web?query=fdf HTTP/1.0 host:www.sogou.com ') else: sk.sendall(b'GET /s?wd=alex HTTP/1.0 host:www.oldboyedu.com ') conn_list.remove(sk) for sk in rlist: chunk_list = [] while True: try: chunk = sk.recv(8096) if not chunk: break chunk_list.append(chunk) except BlockingIOError as e: break body = b''.join(chunk_list) # print(body.decode('utf-8')) print('------------>',body) sk.close() socket_list.remove(sk) if not socket_list: break
import socket import select class Req(object): def __init__(self,sk,func): self.sock = sk self.func = func def fileno(self): return self.sock.fileno() class Nb(object): def __init__(self): self.conn_list = [] self.socket_list = [] def add(self,url,func): client = socket.socket() client.setblocking(False) # 非阻塞 try: client.connect((url, 80)) except BlockingIOError as e: pass obj = Req(client,func) self.conn_list.append(obj) self.socket_list.append(obj) def run(self): while True: rlist,wlist,elist = select.select(self.socket_list,self.conn_list,[],0.005) # wlist中表示已经连接成功的req对象 for sk in wlist: # 发生变换的req对象 sk.sock.sendall(b'GET /s?wd=alex HTTP/1.0 host:www.baidu.com ') self.conn_list.remove(sk) for sk in rlist: chunk_list = [] while True: try: chunk = sk.sock.recv(8096) if not chunk: break chunk_list.append(chunk) except BlockingIOError as e: break body = b''.join(chunk_list) # print(body.decode('utf-8')) sk.func(body) sk.sock.close() self.socket_list.remove(sk) if not self.socket_list: break def baidu_repsonse(body): print('百度下载结果:',body) def sogou_repsonse(body): print('搜狗下载结果:', body) def oldboyedu_repsonse(body): print('老男孩下载结果:', body) t1 = Nb() t1.add('www.baidu.com',baidu_repsonse) t1.add('www.sogou.com',sogou_repsonse) t1.add('www.oldboyedu.com',oldboyedu_repsonse) t1.run()
(三)总结
1、socket默认是阻塞的,阻塞体现在connect和recv两个阶段
2、如何让socket编程非阻塞?
设置setblocking(False)
3、提高并发方案:
- 多进程
- 多线程
- 异步非阻塞模块(Twisted) scrapy框架(单线程完成并发)
4、 什么是异步非阻塞?
- 非阻塞,不等待。
比如创建socket对某个地址进行connect、获取接收数据recv时默认都会等待(连接成功或接收到数据),才执行后续操作。
如果设置setblocking(False),以上两个过程就不再等待,但是会报BlockingIOError的错误,只要捕获即可。
- 异步,通知,执行完成之后自动执行回调函数或自动执行某些操作(通知)。
比如做爬虫中向某个地址baidu.com发送请求,当请求执行完成之后自执行回调函数。
5、什么是同步阻塞?
- 阻塞:等
- 同步:按照顺序逐步执行
6.什么是并行和并发?
并发 不能利用多核,同一时间段内,有多个任务在一个CPU上
并行 能利用多核,同一时刻有多个任务在CPU上同时执行轮流被执行