io操作不占用CPU,从内存、磁盘都读写数据是不占用CPU的;涉及计算的会占用CPU。
python多线程不适合cpu密集操作型的任务,适合io操作密集型的任务。
所以,如果任务涉及IO较多,那就适合多线程;如果涉及的计算较多,那就不适合多线程,不然cpu会一直切换上下文,反而降低效率。
所以cpu密集操作型的任务就用到了多进程。
进程之间的数据不共享,所以就不涉及锁的问题了;8核cpu,起8个以上进程,每个进程起N个线程,就利用了多核的优势。
1.多进程multiprocessing
进程的语法与线程类似;进程内可以启动线程。
1.1 创建进程
import multiprocessing import threading import time def threading_run(name): print('threading:{}'.format(name)) def run(name): time.sleep(2) t = threading.Thread(target=threading_run,args=(name,)) t.start() print('name:{}'.format(name)) for i in range(10): p = multiprocessing.Process(target=run,args=('zhang',)) #创建进程 p.start() #启动进程
1.2 下例演示进程间关系
#!/usr/bin/env python from multiprocessing import Process import os def info(title): print(title) print('module name:', __name__) print('parent process:', os.getppid()) #打印父进程PID print('process id:', os.getpid()) #打印当前进程PID print(" ") def f(name): info('function f') print('hello', name) if __name__ == '__main__': info('main process line') p = Process(target=f, args=('bob',)) p.start() p.join() #结果: main process line module name: __main__ parent process: 3910 process id: 4810 function f module name: __main__ parent process: 4810 process id: 4816 hello bob
上面结果所示,主进程同样有个父PID 3910,这是因为在linux里(在windows里貌似也一样)任何进程都是由一个父进程启动的,就好像init进程是所有进程的父进程那样。
主进程的PID是4810,所以子进程的父进程也是4810,子进程的PID是4816。
1.3 进程间通讯
不同进程间内存是不共享的,要想实现两个进程间的数据交换,有几种方式实现,但本质都是找一个“媒婆”在中间传话。
1.3.1 进程队列
普通队列(import queue;q = queue.Queue)被称为线程队列,线程可以共享数据,但是不同进程是不能共享数据的;线程queue无法直接传给进程。
进程队列(from multiprocessing import Process,Queue;q = Queue())使不同进程可以共享进程队列里的数据,如下:
from multiprocessing import Process,Queue import time def run1(q): print('run1') q.put([1,2,3]) def run2(q): print('run2') print(q.get()) #p2进程获取p1进程put的队列数据 if __name__ == '__main__': q = Queue() p1 = Process(target=run1,args=(q,)) #创建一个进程 p1.start() p2 = Process(target=run2,args=(q,)) #创建另一个进程 p2.start() #结果: run1 run2 [1, 2, 3]
进程队列看似是多个进程共享一个队列,其实,相当于是克隆,如上面所示,把q传给p1,等于是克隆了一份,把q传给p2,又克隆了一份,进程修改了数据后会利用pickle进行通信(底层的东西),实现进程的数据同步,保证数据一致性。
进程A将数据pickle传给进程B,同样进程B将数据pickle传给进程A,双方其实是通过pickle来进行的通信(而不是共享一个队列),因为他俩的内存区域并不同,只能通过数据pickle到一块共享区域才能实现共享。
1.3.2 Pipes 管道
管道就像一个socket,两端连着不同的进程。
from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.send('zsc') print('from parent:{}'.format(conn.recv())) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" print(parent_conn.recv()) #对端发几次,己端酒需要收几次。 parent_conn.send('i am a human') p.join() #结果: [42, None, 'hello'] zsc from parent:i am a human
1.3.3 Manager实现进程共享数据
from multiprocessing import Process,Manager import os def f(d,l): d[os.getpid()] = os.getpid() #字典添加pid l.append(os.getpid()) #列表添加pid print(l) if __name__ == '__main__': with Manager() as manager: d = manager.dict() #创建一个独特的字典,这个字典可以被多个进程共享 l = manager.list() #创建一个可以被多个进程共享的列表 join_list = [] for i in range(10): p = Process(target=f,args=(d,l)) p.start() join_list.append(p) for i in join_list: i.join() print(d) print(l) #结果: [7430] [7430, 7431] [7430, 7431, 7433] [7430, 7431, 7433, 7438] [7430, 7431, 7433, 7438, 7440] [7430, 7431, 7433, 7438, 7440, 7442] [7430, 7431, 7433, 7438, 7440, 7442, 7439, 7436] [7430, 7431, 7433, 7438, 7440, 7442, 7439, 7436, 7445] [7430, 7431, 7433, 7438, 7440, 7442, 7439, 7436, 7445] [7430, 7431, 7433, 7438, 7440, 7442, 7439, 7436, 7445, 7447] {7440: 7440, 7442: 7442, 7445: 7445, 7430: 7430, 7431: 7431, 7433: 7433, 7447: 7447, 7436: 7436, 7438: 7438, 7439: 7439} [7430, 7431, 7433, 7438, 7440, 7442, 7439, 7436, 7445, 7447]
1.3.4 进程锁
from multiprocessing import Process, Lock def f(l, i): l.acquire() #锁 try: print('hello world', i) finally: l.release() #解锁 if __name__ == '__main__': lock = Lock() #定义锁 for num in range(10): Process(target=f, args=(lock, num)).start() #必须把锁传给进程
进程锁的作用是防止屏幕输出错乱,虽然进程不共享数据,但是共享一个屏幕,如果进程很多,可能出现打印的数据错乱,这时候可以尝试加上进程锁。
1.4 进程池
- apply ,同步执行,串行。
- apply_async ,异步执行,并行。
from multiprocessing import Process,Pool import time def Foo(i): time.sleep(2) print(i+100) return i+100 def Bar(arg): print('-->exec done:',arg) pool = Pool(5) for i in range(10): pool.apply_async(func=Foo, args=(i,),callback=Bar) #并行执行,同时执行5个进程;callback是回调,当Foo方法执行完毕后,才会再调用Bar方法; # pool.apply(func=Foo, args=(i,)) print('end') pool.close() pool.join()#进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
2. 协程
2.1 简述
协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程。
协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:
协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。
协程的好处:
- 无需线程上下文切换的开销
- 无需原子操作锁定及同步的开销
- 方便切换控制流,简化编程模型
- 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。
缺点:
- 无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
- 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序
使用协程可以做到只要遇到IO操作就切换,这样可以使cpu一直处于工作状态,提高速度,下面的代码,每个time.sleep都比如是在进行IO操作,
def home(): time.sleep(5) #遇到了IO操作,那就执行别的方法,也就是login() print('home') def login(): time.sleep(2) #又遇到了IO操作,那就继续往下执行。 print('login') def bbs(): print(bb) #没有IO操作,可以打印。
这里有个问题,光是遇到IO就切换走了,那什么时候切换回来呢,怎么知道IO操作是不是执行完了呢?greenlet可以实现手动切换,gevent可以实现自动切换,继续往下看。
2.2 Greenlet
greenlet需要手动切换
from greenlet import greenlet def test1(): print(12) gr2.switch() #切换到gr2 print(34) gr2.switch() def test2(): print(56) gr1.switch() #切换到gr1 print(78) gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch() #结果: 12 56 34 78
2.3 Gevent
Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。
import gevent def foo(): print('1 Running in foo') gevent.sleep(2) #触发切换 print('2 Explicit context switch to foo again') def bar(): print('3 Explicit context to bar') gevent.sleep(1) #触发切换 print('4 Implicit context switch back to bar') gevent.joinall([ gevent.spawn(foo), gevent.spawn(bar), ]) #结果: 1 Running in foo 3 Explicit context to bar 4 Implicit context switch back to bar 2 Explicit context switch to foo again
gevent会自动判断是不是IO操作,遇到IO阻塞就会需要切换。
下面实例演示下载网页,需要注意的是,gevent无法认知urllib是否在进行IO操作,所以需要monkey补丁,代码见下:
from gevent import monkey monkey.patch_all() #打上这个补丁,gevent就能识别出urllib的IO操作了,当urllib进行IO操作时就切换。 import gevent from urllib import request def f(url): print('GET: %s' % url) resp = request.urlopen(url) data = resp.read() print('%d bytes received from %s.' % (len(data), url)) gevent.joinall([ gevent.spawn(f, 'https://www.python.org/'), gevent.spawn(f, 'https://www.yahoo.com/'), gevent.spawn(f, 'https://github.com/'), ])
2.4 通过gevent实现单线程下的多socket并发
server side
import sys import socket import time import gevent from gevent import socket,monkey monkey.patch_all() def server(port): s = socket.socket() s.bind(('0.0.0.0', port)) s.listen(500) while True: cli, addr = s.accept() gevent.spawn(handle_request, cli) #创建协程 def handle_request(conn): try: while True: data = conn.recv(1024) print("recv:", data) conn.send(data) if not data: break except Exception as ex: print(ex) finally: conn.close() if __name__ == '__main__': server(8001)
client side
import socket HOST = 'localhost' # The remote host PORT = 8001 # The same port as used by the server s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((HOST, PORT)) while True: msg = bytes(input(">>:"),encoding="utf8") s.sendall(msg) data = s.recv(1024) #print(data) print('Received', repr(data)) s.close()
3. 论事件驱动与异步IO
windows只支持select;linux2.6及以后支持select和epoll。
selectors模块
selectors模块是select和epoll进行了封装,它会优先使用epoll模型,不支持epoll模型(windows)的话会使用select网络模型。
server端代码:
#!/usr/bin/env python import selectors import socket sel = selectors.DefaultSelector() def accept_client(sock,mask): conn,addr = sock.accept() print('conn:{},addr:{},mask:{}'.format(conn,addr,mask)) conn.setblocking(False) sel.register(conn, selectors.EVENT_READ, read_client) def read_client(conn,mask): data = conn.recv(1024) if data: print('recv from {},the content is {}'.format(conn,data)) conn.send(data) else: print('{} closed'.format(conn)) sel.unregister(conn) conn.close() server = socket.socket() server.bind(('localhost',1236)) server.listen(1234) server.setblocking(False) sel.register(server, selectors.EVENT_READ,accept_client) #将server注册到selectors。 while True: events = sel.select() #这里虽然写的是select,但是它会优先选择epoll网络模型,没有的话再用select网络模型,这里的select应该是“选择网络模型”的意思,而不是“使用select网络模型”。默认这里是阻塞的,有活动连接就返回连接列表。 for key,mask in events: callable = key.data #这里的callback是accept_client callable(key.fileobj,mask)
client端代码:
#!/usr/bin/env python import socket client = socket.socket() client.connect(('localhost',1236)) while True: user_input = input('>') if user_input == 'q': break client.send(user_input.encode()) data = client.recv(1024) print(data) client.close()