目录:
基本概念
线程
进程
协程
Python操作缓存
一. 基本概念
现在的操作系统,如Unix、Linux、Windows、Mac OS X等,都是支持“多任务”的操作系统。
什么叫”多任务“呢?简单理解,就是我们可以一般上网浏览某车之家的网页,看看喜欢的车型信息;一边打开某易云音乐听听好歌;一边打开某软件股市行情图,不安的盯着曲线图...卧槽,又尼玛跌了! 这就是多任务喽。
多核心的CPU已经很普及了,但是,就是在过去的单核心CPU上,也可以执行多任务。
PS: CPU是分时间片的,假设任务1执行0.01秒,切换到任务2,任务2执行0.01秒,再切换到任务3, 执行0.01秒....在这个切换的过程中,需要不断的保存程序的执行状态,和恢复执行状态,简单的说,这个过程就叫做”上下文切换", 注意,过多的上下文切换是会带来很多额外的开销的!
对操作系统来说, 一个任务(程序)就是一个进程(Process), 比如打开浏览器访问某车之家,就是一个浏览器的进程。打开一个某易云音乐,就是一个某易云音乐进程(Process)。
这时候我忽然又想打开邮箱查收下邮件,然后在浏览器的标签栏里又打开了Gmail邮箱;然后我又想发条微博说下,卧槽,我在写博客!而后又打开浏览器的一个标签栏,打开了weibo.com... 也就是说在浏览器进程内部,我要同时做好些事情,这些子任务我们称为" 线程 ” (Thread).
每个进程至少要做一件事,所以,一个进程至少有一个线程,当然也可以有多个。多个线程可以同时运行,多线程的执行方式和多进程是一样的,也是由操作系统内核在多个线程(进程)之间快速切换,让每个线程(进程)都交替的运行,看起来就像是同时执行一样。当然,真正同时执行多线程需要多核心CPU才可能实现。
Python既支持多进程,又支持多线程。
小结:
- 线程是最小的执行单元,进程由至少一个线程组成;
- 进程和线程的调度,完全有操作系统决定,程序不能决定什么时候执行和执行多久。
- 多进程和多线程会会有资源同步和数据共享的问题。后面再续...
多线程、多进程
- 1. 一个应用程序可以有多进程、多线程
- 2. 默认是单进程、单线程
- 3. 单进程,多线程,在Python中不会性能提升,在Java和C#中可以提升。
提高并发:
- 多线程: IO操作,一般不会用到CPU,效率提升是可以的
- 多进程:计算型操作, 需要占用CPU,因此性能不会有提升
线程和进程是一个很重要的概念
一. 线程
1. 基础:
是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务
概念太吓人了,先来看一下进程,这个相对于线程来说还是稍微好理解一点的。进程,是程序运行的实体,这句话的意思是,程序是存放在硬盘中的,当这个程序运行时,就会产生若干个进程,并且这个进程是可见的
那么什么是 线程呢,线程是一个任务流,它被包含在进程之中。
(1)线程的状态
线程有五种状态,状态转换如下图:
(2) 线程同步(锁)
多线程的优势在于可以同时运行多个任务,而多个线程之间用到的数据可以共享进程的,效率很高(但在Python中多线程尽量应用在IO密集型的程序中)。正因为这样,所以存在数据不同步的问题.
为了避免资源争夺,所以引入了锁的概念。锁有两种状态:锁定和未锁定(这不废话么?)。每当一个线程要访问共享数据时,必须先加把锁,而后在处理,处理完成后,解锁,让其他线程再来处理。
线程与锁:
(3) 线程通信(条件变量)
条件变量允许线程在条件不满足的时候等待,等到条件满足的时候的时候发出一个通知,而后继续运行。
(4) 线程运行和阻塞的状态转换
阻塞状态的三种情况:
- 同步阻塞:运行竞争锁定的状态,线程请求锁定时将进入这个状态,一旦成功获得锁定又恢复到运行状态;
- 等待阻塞:等待其他线程通知的状态,线程获得条件锁定后,调用“等待(wait())”将进入,一旦其他线程发出通知,线程将进入同步阻塞状态,再次竞争条件锁定;
- 其他阻塞:运行的线程执行sleep()或者join()方法,或者发出I/O请求,会进入阻塞状态。当sleep()状态超时、join()等待线程终止或者超时、或者I/O处理完毕时,线程重新转入就绪状态。
2. 基本使用:
Python通过threading模块来提供对线程的支持。threading基于Java的线程设计模型。锁(Lock)和条件变量(condition)在Java中是对象的基本行为(每一个对象都自带了锁和条件变量),而Python中则是独立的对象。
Python Thread提供了Java Thread的行为的子集;
没有优先级、线程组,线程也不能被停止、暂停、恢复、中断。Java Thread中的部分被Python实现了的静态方法在Threading中以模块方法的形式提供。
threading常用方法:
threading.current_thread() 返回当前的线程变量。
threading.enumerate() 返回一个包含正在运行的线程的列表,正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
threading.active_count() 返回正在运行的线程数量,与len(threading.enumerate())一样。
Thread
Thread是线程类,与Java类似,有两种使用方法,直接传入要运行的方法或从Thread继承并覆盖 run()。
PS: Thread中的run()方法,就是CPU来调度的时候执行的.
import threading #创建线程第一种方法: 将要执行的方法做为参数传给方法 def f1(args): print(args) t = threading.Thread(Target=f1, args=(123,)) t.start()
#第二种, 自定义一个类,而后继承threading.Thread class MyThread(threading.Thread): #自己写一个类,继承threading.Thread def __init__(self,func,args): self.func = func self.args = args super(MyThread, self).__init__() #执行父类的构造方法 def run(self): #而后重写run方法 self.func(self.args) def f2(args): print(args) res = MyThread(f2,456) #传参数进去 res.start() #开始线程
Thread构造方法(__init__):
group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None
- group: 线程组
- target: 要执行的方法
- name: 线程名
- args/kwargs: 要传入的参数
常用方法:
- is_alive(): 返回线程是否在运行(启动前、终止前) 。
- getName(): 获取当前线程名
- setName(): 设置线程名
- isDaemon(): 获取线程是否是守护线程。 setDaemon(): 设置是否是守护进程
- 如果是后台线程,主线程执行过程中,后台线程也在运行,主线程执行完毕后,后台线程不论成功与否,全部都停止。
- 如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程夜之星完成后,程序停止
- start() : 启动线程
- join([timeout]): 阻塞当前上下文,直到调用此方法的线程终止或到达指定的timeout值
import threading import time def f1(args): time.sleep(5) print(args) for i in range(10): t1 = threading.Thread(target=f1,args=(123,)) t1.setDaemon(True) #默认值为False,也就是主线程等待子线程,值为True之后,会不等子线程,直接往下解释执行,该退出就退出 t1.start() t1.join(2) #主线程阻塞,等待子线程等待完成, 参数的意思是最多等待的秒数. 如果不join的话,程序输出直接就是一个end,也就是子线程根本不会运行 print('End')
3. 线程锁
Lock
由于线程之间是进行随机调度的,每个线程可能只执行n条之后,当多个线程同时修改同一条数据时可能会出现脏数据,所以线程锁就应用而生了。
Lock(指令锁) 是可用的最低级的同步指令。Lock处于锁定状态时,不被特定的线程拥有。
Lock有两种状态:
- 锁定
- 非锁定
构造方法:
Lock()
以及两个基本的方法。
实例方法:
- acquire([timeout]) 使线程进入同步阻塞状态,尝试获得锁定。
- release() 释放锁,使用前线程必须已经获得锁定,否则抛出异常。
#未使用锁的实例 import threading NUM = 10 def f1(args): global NUM NUM -= 1 print(NUM) for i in range(10): t = threading.Thread(target=f1,args=(i,)) t.start()
#使用Lock实例 import threading,time NUM = 10 lock = threading.Lock() def f1(args): global NUM lock.acquire() #加锁 NUM -= 1 time.sleep(1) print(NUM) lock.release() #释放锁 for i in range(10): t = threading.Thread(target=f1,args=(i,)) t.start()
Rlock
RLock() 是一个可以被同一个线程请求多次的同步指令。Rlock使用了“拥有的线程" 和 "递归等级"的概念,处于锁定状态时,RLock被某个线程拥有。拥有RLock的线程可以再次调用acquire() , 释放锁时需要调用release()相同次数。
可以认为RLock包含一个锁定池和一个初始值为0的计数器,每次成功调用acquire()和release(),计数器将+1/-1,为0时处于未锁定状态。
构造方法:
RLock()
实例方法:
acquire()
release()
import threading,time NUM = 10 lock = threading.RLock() def f1(args): global NUM lock.acquire() #加锁 NUM -= 1 lock.acquire() time.sleep(1) lock.release() #应用了几次acquire就需要几次release() lock.release() print(NUM) for i in range(10): t = threading.Thread(target=f1,args=(i,)) t.start()
Semaphore(信号量)
了解了上面的Lock和Rlock(互斥锁)之后,发现同时只能允许一个线程来更改数据,这和单线程就每个毛区别了,执行顺序不还是串行的么?而Semaphore是同时允许一定数量的线程来更改数据。可以理解成互斥锁的加强版,一个锁可以控制多个thread的访问,Lock的话,只能让一个线程来访问,Semaphore可以控制数量。
import threading,time NUM = 10 semaphore = threading.BoundedSemaphore(5) #最多允许5个线程同时运行,剩余的线程也只能阻塞等待了。。。 def f1(args): global NUM semaphore.acquire() NUM -= 1 time.sleep(1) semaphore.release() print(NUM) for i in range(10): t = threading.Thread(target=f1,args=(i,)) t.start()
Semaphore内部管理着一个内置的计数器,当调用acquire()时计数器-1.调用release()时内置计数器+1。计数器不能小于0,当计数器为0时,acquire()将阻塞线程知道其他线程调用release()。
Event(事件)
event线程间通信的方式,一个线程可以发送信号,其他的线程接受到信号后执行操作。
事件提供了三个主要方法:
- set 将Flag设置为True
- wait 阻塞当前线程,直到event的内部标志位被设置为True或者Timeout超时。如果内部标志位为True则wait()函数理解返回。
- clear 将Flag重新设置为False
- is_set 返回标志位当前状态
事件处理机制: 全局定义了一个”Flag“, 如果值为False,那么当程序执行event.wait方法时就会阻塞,如果”Flag“值为True,那么event.wait方法时便不再阻塞。
import threading,time class MyThread(threading.Thread): def __init__(self,signal): self.signal = signal super(MyThread,self).__init__() def run(self): #重写run方法 print('我是线程: %s, 因为我下面是wait(), 我要阻塞啦...'%self.name) self.signal.wait() print('我是线程: %s ,我被唤醒了,因为下面触发了set()操作,将我的标志位置为True了...'%self.name) signal = threading.Event() for i in range(10): t = MyThread(signal) t.start() print('主线程休眠3秒...') time.sleep(3) signal.set() #代码执行结果: 我是线程: Thread-1, 因为我下面是wait(), 我要阻塞啦... 我是线程: Thread-2, 因为我下面是wait(), 我要阻塞啦... 我是线程: Thread-3, 因为我下面是wait(), 我要阻塞啦... 我是线程: Thread-4, 因为我下面是wait(), 我要阻塞啦... 我是线程: Thread-5, 因为我下面是wait(), 我要阻塞啦... 我是线程: Thread-6, 因为我下面是wait(), 我要阻塞啦... 我是线程: Thread-7, 因为我下面是wait(), 我要阻塞啦... 我是线程: Thread-8, 因为我下面是wait(), 我要阻塞啦... 我是线程: Thread-9, 因为我下面是wait(), 我要阻塞啦... 我是线程: Thread-10, 因为我下面是wait(), 我要阻塞啦... 主线程休眠3秒... 我是线程: Thread-1 ,我被唤醒了,因为下面触发了set()操作,将我的标志位置为True了... 我是线程: Thread-4 ,我被唤醒了,因为下面触发了set()操作,将我的标志位置为True了... 我是线程: Thread-6 ,我被唤醒了,因为下面触发了set()操作,将我的标志位置为True了... 我是线程: Thread-8 ,我被唤醒了,因为下面触发了set()操作,将我的标志位置为True了... 我是线程: Thread-10 ,我被唤醒了,因为下面触发了set()操作,将我的标志位置为True了... 我是线程: Thread-2 ,我被唤醒了,因为下面触发了set()操作,将我的标志位置为True了... 我是线程: Thread-7 ,我被唤醒了,因为下面触发了set()操作,将我的标志位置为True了... 我是线程: Thread-3 ,我被唤醒了,因为下面触发了set()操作,将我的标志位置为True了... 我是线程: Thread-9 ,我被唤醒了,因为下面触发了set()操作,将我的标志位置为True了... 我是线程: Thread-5 ,我被唤醒了,因为下面触发了set()操作,将我的标志位置为True了...
Condition(条件)
可以把Condition理解为一把高级的锁,它提供了比Lock,RLock更高级的功能,允许控制复杂的线程同步问题。
threading.Condition在内部维护了一个锁对象(默认是RLock),可以在创建Condition对象的时候把锁对象作为参数传入。Condition也提供了acquire和release方法,
含义与锁的一样。提示它只是简单的调用内部锁对象对应的方法而已。
Condition还提供了如下方法(PS:这些方法只有在占用锁acquire之后才能调用,否则会抛异常RuntimeError)
- lock=threading.Condition()
- lock.wait() wait方法释放内部所占用的锁,同时线程被挂起,直到接受到通知被唤醒或者超时(如果提供了timeout值的话)。当线程被唤醒并重新占有锁的时候,程序才会继续执行下去。
- lock.notify() 唤起一个挂起的线程(如果存在挂起的线程)。PS:notify不会释放所占用的锁。
- lock.notify_all() lock.notifyAll() 唤起所有挂起的线程。PS: 也不会释放占用的锁。
import threading def run(num): con.acquire() #锁 con.wait() #释放内部占用的锁,挂起线程,等待通知唤醒或者被超时 print('Run the thread: %s'%num) con.release() con = threading.Condition() for i in range(10): t = threading.Thread(target=run,args=(i,)) t.start() while True: inp = input('>>>').strip() if inp == 'q': #如果输入的是q,则推出 break con.acquire() #加锁 con.notify(int(inp)) #发送通知,并将用户输入的数字转换为整形,发送唤醒的进程数量 con.release() #释放锁
import threading def condition_func(): ret = False inp = input('>>>').strip() if inp == '1': #如果用户输入1,唤醒一个线程 ret = True return ret def run(num): con.acquire() con.wait_for(condition_func) #接受一个函数 print(condition_func) print('Run the thread: %s'%num ) con.release() con = threading.Condition() for i in range(10): t = threading.Thread(target=run,args=(i,)) t.start()
Timer
定时器,指定n秒之后执行某操作。
from threading import Timer def f1(): print('Hello Tom') t = Timer(2,f1) #两秒之后执行f1 t.start()
4. 生产者消费者模型
二. 进程
1. 概念:
是计算机中已运行程序的实体。进程为曾经是分时系统的基本运作单位。在面向进程设计的系统(如早期的UNIX,Linux 2.4及更早的版本)中,进程是程序的基本执行实体;在面向线程设计的系统(如当代多数操作系统、Linux 2.6及更新的版本)中,进程本身不是基本运行单位,而是线程的容器。程序本身只是指令、数据及其组织形式的描述,进程才是程序(那些指令和数据)的真正运行实例。
2. 基本操作
Unix/Linux操作系统提供了一个fork()的系统调用,它非常特殊。普通的函数调用一次,返回一次。但是fork(),调用一次返回两次,操作系统会自动把当前进程(父进程)复制了一份(子进程),而后,分别在父进程和子进程内返回。
子进程永远返回 0, 而父进程返回子进程的ID,这样做的理由是,一个父进程可以fork出很多的子进程。所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()即可拿到父进程的ID。
Python中的 os模块封装了常见的系统调用,包括fork,可以轻松创建子进程:
import os,time print("Process (%s) start..." %os.getpid()) pid = os.fork() if pid == 0: print('I am child process (%s) and my parent is (%s)'%(os.getpid(),os.getppid())) else: print('I (%s) just created a child process(%s)'%(os.getpid(),pid)) time.sleep(10) #执行代码结果: Process (66554) start... I (66554) just created a child process(66555) I am child process (66555) and my parent is (66554)
PS: Windows中没有fork调用。
有了fork调用,一个进程在接到新任务时就可以fork一个子进程来处理新任务,常见的Apache服务器(Prefork模型)就是由父进程监听端口,每当有新的请求,就fork出子进程来处理新的请求。
multiprocessing
multiprocessing 模块是跨平台的多进程模块。multiprocessing模块提供了一个Process类来代表一个进程对象:
from multiprocessing import Process def f1(i): print('hello',i) for i in range(10): p = Process(target=f1,args=(i,)) p.start() #执行结果: hello 0 hello 1 hello 2 hello 3 hello 4 hello 5 hello 6 hello 7 hello 8 hello 9
创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动,这样创建进程比fork()要简单的多。
Pool(进程池)
如果要启动大量的子进程,可以使用进程池的方式批量创建子进程。进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用的进程位置。
进程池中的两个方法:
- apply 以串行的方式执行(阻塞的)
- apply_async 以异步的方式执行(非阻塞的)
- close() 关闭pool,使其不再接受新的任务
- terminate() 结束工作进程,不再处理未完成的任务
- join() 主进程阻塞,等待子进程退出,join方法要在close或者terminate之后使用。
from multiprocessing import Pool import os,time,random def long_time_task(name): print('Running task %s(%s)...'%(name,os.getpid() )) start = time.time() time.sleep(random.random()*2) end = time.time() print('Task %s runs %0.2f seconds.'%(name,(end - start))) if __name__ == '__main__': print('Parent process %s.'%os.getpid()) p = Pool(5) #创建一个进程池,并设定执行数量为5 for i in range(10): p.apply_async(func=long_time_task,args=(i,)) #维持执行的进程总数为processes, 当一个进程执行完毕后会添加新的进程进去 print('Waiting for all subprocess done...') p.close() #使用join之前,先close,否则会报错,执行完close之后,不会有新的进程加入到pool p.join() #进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。 print('All subprocess done.') #全部执行完后,打印所有进程执行完成
执行结果:
Parent process 66995. Waiting for all subprocess done... Running task 0(66996)... Running task 1(66997)... Running task 2(66998)... Running task 3(66999)... Running task 4(67000)... Task 2 runs 0.07 seconds. Running task 5(66998)... Task 5 runs 0.48 seconds. Running task 6(66998)... Task 3 runs 0.65 seconds. Running task 7(66999)... Task 4 runs 0.66 seconds. Running task 8(67000)... Task 6 runs 0.15 seconds. Running task 9(66998)... Task 9 runs 0.25 seconds. Task 1 runs 0.98 seconds. Task 0 runs 1.18 seconds. Task 8 runs 0.99 seconds. Task 7 runs 1.28 seconds. All subprocess done.

from multiprocessing import Pool import os,time,random def long_time_task(name): print('Running task %s(%s)...'%(name,os.getpid() )) start = time.time() time.sleep(random.random()*2) end = time.time() print('Task %s runs %0.2f seconds.'%(name,(end - start))) if __name__ == '__main__': print('Parent process %s.'%os.getpid()) p = Pool(5) #创建一个进程池,并设定执行数量为5 for i in range(10): p.apply(func=long_time_task,args=(i,)) #维持执行的进程总数为processes, 当一个进程执行完毕后会添加新的进程进去 print('Waiting for all subprocess done...') p.close() #使用join之前,先close,否则会报错,执行完close之后,不会有新的进程加入到pool p.join() #进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。 print('All subprocess done.') #全部执行完后,打印所有进程执行完成 #执行代码结果: Parent process 67049. Running task 0(67050)... Task 0 runs 0.05 seconds. Running task 1(67051)... Task 1 runs 1.47 seconds. Running task 2(67052)... Task 2 runs 0.68 seconds. Running task 3(67053)... Task 3 runs 0.91 seconds. Running task 4(67054)... Task 4 runs 0.51 seconds. Running task 5(67050)... Task 5 runs 0.86 seconds. Running task 6(67051)... Task 6 runs 0.93 seconds. Running task 7(67052)... Task 7 runs 1.02 seconds. Running task 8(67053)... Task 8 runs 0.42 seconds. Running task 9(67054)... Task 9 runs 0.41 seconds. Waiting for all subprocess done... All subprocess done.
PS: Pool的默认大小是CPU的核心数,如果你有8核心CPU,你要提交至少9个子进程才能看到上面代码的效果。
进程间通信:
进程之间是需要通信的,操作系统提供了很多机制来实现进程间的通信。
在默认情况下进程间的数据是不共享的, 想象一下,你电脑上安装了某60的安全卫士,安装了某行的网银,这两个进程之间的数据共享、进程之间通信的话?嘿嘿...
so,默认下,进程间的数据是不共享的。
但是,如果是我们
from multiprocessing import Process from multiprocessing import Manager import time,os l1 = [] def f1(i): l1.append(i) #往l1中追加值 print('Process: %s, l1: %s' %(os.getpid(),l1)) for i in range(10): p = Process(target=f1,args=(i,)) #循环用10个进程,分别追加10个值,如果数据可以共享的话,l1中的值应该最后是0-9 p.start() time.sleep(1) print('Process: %s l1: %s End!'%(os.getpid(),l1)) #最后打印进程值和l1的值 #执行代码结果: Process: 67327, l1: [0] Process: 67328, l1: [1] Process: 67329, l1: [2] Process: 67330, l1: [3] Process: 67331, l1: [4] Process: 67332, l1: [5] Process: 67333, l1: [6] Process: 67334, l1: [7] Process: 67335, l1: [8] Process: 67336, l1: [9] Process: 67326 l1: [] End!
方法一:Array
PS: 不常用, 也可以实现进程间的数据共享,和Python的列表(链表) 特别相似。Java、C#中的数组。
数组只要定义好了,就必须:
- 类型必须是一致的;
- 个数也必须是一定的;
#方法一,Array from multiprocessing import Process,Array temp = Array('i', [1,2,3,4]) def Foo(i): temp[i] = 100+i for item in temp: print(i,'----->',item) for i in range(4): p = Process(target=Foo,args=(i,)) p.start() #代码执行结果: 0 -----> 100 0 -----> 2 0 -----> 3 0 -----> 4 1 -----> 100 1 -----> 101 1 -----> 3 1 -----> 4 2 -----> 100 2 -----> 101 2 -----> 102 2 -----> 4 3 -----> 100 3 -----> 101 3 -----> 102 3 -----> 103

'c': ctypes.c_char, 'u': ctypes.c_wchar, 'b': ctypes.c_byte, 'B': ctypes.c_ubyte, 'h': ctypes.c_short, 'H': ctypes.c_ushort, 'i': ctypes.c_int, 'I': ctypes.c_uint, 'l': ctypes.c_long, 'L': ctypes.c_ulong, 'f': ctypes.c_float, 'd': ctypes.c_double
方法二:Manager
from multiprocessing import Process,Manager manage = Manager() dic = manage.dict() def f1(i): dic[i] = 100+i print(dic.values()) for i in range(5): p = Process(target=f1,args=(i,)) p.start() p.join() #代码执行结果: [100] [100, 101] [100, 101, 102] [100, 101, 102, 103] [100, 101, 102, 103, 104]
方式三: Queue(队列,常用)
#方法三: queue from multiprocessing import Process from multiprocessing import Queue def f1(i,q): print(i,q.get()) if __name__ == '__main__': q = Queue() q.put(1) q.put(2) q.put(3) q.put(4) q.put(5) for i in range(5): p = Process(target=f1,args=(i,q)) p.start() #代码执行结果: 0 1 1 2 2 3 3 4 4 5
当创建进程时(非使用时), 共享数据会被拿到子进程中,当进程中的执行完毕后,再赋值给原值:
from multiprocessing import Process,Array,RLock def f1(l,t,i): l.acquire() t[0] = 100 + i #将第0个数+100 for item in temp: print(i,'---->',item) l.release() lock = RLock() temp = Array('i',[1,2,3,4]) for i in range(20): p = Process(target=f1,args=(lock,temp,i)) p.start() #代码执行结果: 0 ----> 100 0 ----> 2 0 ----> 3 0 ----> 4 1 ----> 101 1 ----> 2 1 ----> 3 1 ----> 4 2 ----> 102 2 ----> 2 2 ----> 3 2 ----> 4 3 ----> 103 3 ----> 2 3 ----> 3 3 ----> 4 4 ----> 104 4 ----> 2 4 ----> 3 4 ----> 4 5 ----> 105 5 ----> 2 5 ----> 3 5 ----> 4 6 ----> 106 6 ----> 2 6 ----> 3 6 ----> 4 7 ----> 107 7 ----> 2 7 ----> 3 7 ----> 4 8 ----> 108 8 ----> 2 8 ----> 3 8 ----> 4 9 ----> 109 9 ----> 2 9 ----> 3 9 ----> 4 10 ----> 110 10 ----> 2 10 ----> 3 10 ----> 4 11 ----> 111 11 ----> 2 11 ----> 3 11 ----> 4 12 ----> 112 12 ----> 2 12 ----> 3 12 ----> 4 13 ----> 113 13 ----> 2 13 ----> 3 13 ----> 4 14 ----> 114 14 ----> 2 14 ----> 3 14 ----> 4 15 ----> 115 15 ----> 2 15 ----> 3 15 ----> 4 16 ----> 116 16 ----> 2 16 ----> 3 16 ----> 4 17 ----> 117 17 ----> 2 17 ----> 3 17 ----> 4 18 ----> 118 18 ----> 2 18 ----> 3 18 ----> 4 19 ----> 119 19 ----> 2 19 ----> 3 19 ----> 4
三. 协程
线程和进程的操作都是由程序触发操作系统接口,最后执行的是系统,协程的操作则是程序员本身。
协程(Coroutine),又称为微线程,纤程.
协程存在的意义:
- 对于多线程应用,CPU通过切片的方式来切换进程间的执行,线程切换时需要耗时(保存状态、恢复状态下次继续,即,上下文切换)。因此没有线程切换的开销,线程数量越多,协程的性能优势就越是明显。
- 协程,则只使用一个线程,在一个线程中规定某个代码块的执行顺序。
- 不需要多线程的锁机制,因为只要只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。
协程适用的场景:
当程序中存在大量不需要CPU的操作时(如I/O操作), 适用于协程。
子程序调用总是一个入口,一次返回,调用顺序是明确的。如: 函数A调用B,B在执行过程中调用C,C执行完毕返回,B执行完毕返回,最后是A。
def A(): B() def B(): print('B') C() def C(): print('C') A()
但是,协程的调用和子程序不同。协程看上去也是子程序,但在其内部执行过程中可以中断,而后去执行其他的子程序,在适当的时候在返回再接着执行。
greenlet
from greenlet import greenlet def f1(): print(123) gr2.switch() #1, 跳到下面321 print(456) gr2.switch() #3. 跳到下面654 def f2(): print(321) gr1.switch() #2. 跳到上面456 print(654) gr1 = greenlet(f1) gr2 = greenlet(f2) gr1.switch() #代码执行结果: 123 321 456 654
gevent
gevent是一个基于libev的并发库。它为各种并发和网络相关的任务提供了整洁的API.
在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。
基本思想:
当一个greenlet遇到I/O操作时,比如访问网络,就自动切换到其他的greenlet,等到I/O操作完成,再在适当的时候切换回来继续执行。由于I/O操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换线程,就可以保证greenlet在运行,而不是等待I/O。
import gevent,os def f1(): print('Running in f1(%s)'%os.getpid()) gevent.sleep(0) print('Explicit context switch to f1 again') def f2(): print('Explicit context to f2(%s)'%os.getpid()) gevent.sleep(0) print('Implicit context switch back to f1') gevent.joinall( [ gevent.spawn(f1), gevent.spawn(f2), ] ) #代码执行结果: Running in f1(68207) Explicit context to f2(68207) Explicit context switch to f1 again Implicit context switch back to f1
from gevent import monkey; monkey.patch_all() #把socket请求做了一个封装,完成后才具有这种功能 import gevent import requests def f(url): print('GET: %s' %url) resp = requests.get(url) data = resp.text print('%d bytes received from %s. '%(len(data),url)) gevent.joinall([ gevent.spawn(f,'https://www.python.org/'), gevent.spawn(f,'https://www.yahoo.com/'), gevent.spawn(f,'https://github.com/'), gevent.spawn(f,'https://www.dbq168.com/'), ]) #代码执行结果: GET: https://www.python.org/ GET: https://www.yahoo.com/ GET: https://github.com/ GET: https://www.dbq168.com/ 464535 bytes received from https://www.yahoo.com/. 47394 bytes received from https://www.python.org/. 76292 bytes received from https://www.dbq168.com/. 25533 bytes received from https://github.com/.
从结果看,4个网络操作是并发执行的,而且结束顺序不同,但只有一个线程。
如果要让greenlet交替运行,可以通过gevent.sleep() 交出控制权。更多>>
五. Python 操作缓存
本章节包括Python操作Memcached以及Redis,详见另一篇博文.