线程:
线程可以理解成轻量的进程,实际在linux两者几乎没有区别,唯一的区别是线程并不产生新的地址空间和资源。
Threading用于提供线程的相关操作,线程是应用程序中工作的最小单元。
1
2
3
4
5
6
7
8
9
10
|
import threading import time def show(arg): time.sleep( 1 ) print ( 'thread' + str (arg)) for i in range ( 10 ): t = threading.Thread(target = show, args = (i,)) t.start() |
上述代码创建了10个前台线程,然后控制器就交给了cpu,CPU根据内部算法进行调度,
更多的方法:
- start 线程准备就绪,等待CPU调度
- setName 为线程设置名字
- getName 获取线程名称
- setDaemon 设置后台线程或者前台线程(默认False)
如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止
如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
- join 逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
- run 线程被cpu调度后自动执行线程对象的run方法
线程锁:
由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,CPU接着执行其他线程。所以,可能出现如下问题:
#!/usr/bin/env python # -*- coding:utf-8 -*- import threading import time gl_num = 0 def show(arg): global gl_num time.sleep(1) gl_num +=1 print gl_num for i in range(10): t = threading.Thread(target=show, args=(i,)) t.start() print 'main thread stop'
event
python线程的事件用于主线程控制其他线程的执行,线程主要提供了三个方法:set、wait、clear
事件处理的机制:全局定义了一个"flag",值为False,,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
- clear:将"flag"设置为False
- set:将"flag"设置为True
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
#!/usr/bin/env python # -*- coding:utf-8 -*- import threading def do(event): print 'start' event.wait() print 'execute' event_obj = threading.Event() for i in range ( 10 ): t = threading.Thread(target = do, args = (event_obj,)) t.start() event_obj.clear() inp = input ( 'input:' ) if inp = = 'true' : event_obj. set () |
使用线程队列
当多个线程需要共享数据的或者资源的时候,可能会使线程的使用变得复杂,线程的模块提供了许多同步原语,包括信号量、条件变量、事件和锁。当这些选项存在时,最佳实践是转而关注于使用队列,相比较而言,队列更容易处理,并且更容易处理,并且可以使线程编程更加安全,因为他们能够有效地传递单个线程资源的所有访问,并支持更加清晰的、可读性更高的设计模式。
url获取序列:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
import urllib2 import time hosts = [ "http://yahoo.com" , "http://google.com" , "http://amazon.com" , "http://ibm.com" , "http://apple.com" ] start = time.time() #grabs urls of hosts and prints first 1024 bytes of page for host in hosts: url = urllib2.urlopen(host) print url.read( 1024 ) print ( "Elapsed Time: %s" % (time.time() - start)) |
简单版本的线程池:
#!/usr/bin/env python # -*- coding:utf-8 -*- import Queue import threading class ThreadPool(object): def __init__(self, max_num=20): self.queue = queue.Queue(max_num) for i in xrange(max_num): self.queue.put(threading.Thread) def get_thread(self): return self.queue.get() def add_thread(self): self.queue.put(threading.Thread) pool = ThreadPool(10) def func(arg, p): print arg import time time.sleep(2) p.add_thread() for i in xrange(30): thread = pool.get_thread() t = thread(target=func, args=(i, pool)) t.start()
import queue import threading import contextlib StopEvent = object() class ThreadPool(object): def __init__(self, max_num): self.q = queue.Queue(max_num) self.max_num = max_num self.cancel = False self.generate_list = [] self.free_list = [] def run(self, func, args, callback=None): """ 线程池执行一个任务 :param func: 任务函数 :param args: 任务函数所需参数 :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数) :return: 如果线程池已经终止,则返回True否则None """ if self.cancel: return True if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: self.generate_thread() w = (func, args, callback,) self.q.put(w) def generate_thread(self): """ 创建一个线程 """ t = threading.Thread(target=self.call) t.start() def call(self): """ 循环去获取任务函数并执行任务函数 """ current_thread = threading.currentThread self.generate_list.append(current_thread) event = self.q.get() while event != StopEvent: func, arguments, callback = event try: result = func(*arguments) success = True except Exception, e: success = False result = None if callback is not None: try: callback(success, result) except Exception, e: pass with self.worker_state(self.free_list, current_thread): event = self.q.get() else: self.generate_list.remove(current_thread) def terminal(self): """ 终止线程池中的所有线程 """ self.cancel = True full_size = len(self.generate_list) while full_size: self.q.put(StopEvent) full_size -= 1 @contextlib.contextmanager def worker_state(self, state_list, worker_thread): """ 用于记录线程中正在等待的线程数 """ state_list.append(worker_thread) try: yield finally: state_list.remove(worker_thread)
#!/usr/bin/env python # -*- coding:utf-8 -*- import queue import threading import contextlib import time StopEvent = object() class ThreadPool(object): def __init__(self, max_num): self.q = queue.Queue() self.max_num = max_num self.terminal = False self.generate_list = [] self.free_list = [] def run(self, func, args, callback=None): """ 线程池执行一个任务 :param func: 任务函数 :param args: 任务函数所需参数 :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数) :return: 如果线程池已经终止,则返回True否则None """ if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: self.generate_thread() w = (func, args, callback,) self.q.put(w) def generate_thread(self): """ 创建一个线程 """ t = threading.Thread(target=self.call) t.start() def call(self): """ 循环去获取任务函数并执行任务函数 """ current_thread = threading.currentThread self.generate_list.append(current_thread) event = self.q.get() while event != StopEvent: func, arguments, callback = event try: result = func(*arguments) success = True except Exception as e: success = False result = None if callback is not None: try: callback(success, result) except Exception as e: pass with self.worker_state(self.free_list, current_thread): if self.terminal: event = StopEvent else: event = self.q.get() else: self.generate_list.remove(current_thread) def close(self): """ 执行完所有的任务后,所有线程停止 """ full_size = len(self.generate_list) while full_size: self.q.put(StopEvent) full_size -= 1 def terminate(self): """ 无论是否还有任务,终止线程 """ self.terminal = True while self.generate_list: self.q.put(StopEvent) self.q.empty() @contextlib.contextmanager def worker_state(self, state_list, worker_thread): """ 用于记录线程中正在等待的线程数 """ state_list.append(worker_thread) try: yield finally: state_list.remove(worker_thread) # How to use pool = ThreadPool(5) def callback(status, result): # status, execute action status # result, execute action return value pass def action(i): time.sleep(1) print(i) for i in range(30): ret = pool.run(action, (i,), callback) # pool.close() # pool.terminate()
#!/usr/bin/env python # -*- coding:utf-8 -*- import queue import threading import contextlib import time StopEvent = object() class ThreadPool(object): def __init__(self, max_num, max_task_num = None): if max_task_num: self.q = queue.Queue(max_task_num) else: self.q = queue.Queue() self.max_num = max_num self.cancel = False self.terminal = False self.generate_list = [] self.free_list = [] def run(self, func, args, callback=None): """ 线程池执行一个任务 :param func: 任务函数 :param args: 任务函数所需参数 :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数) :return: 如果线程池已经终止,则返回True否则None """ if self.cancel: return if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: self.generate_thread() w = (func, args, callback,) self.q.put(w) def generate_thread(self): """ 创建一个线程 """ t = threading.Thread(target=self.call) t.start() def call(self): """ 循环去获取任务函数并执行任务函数 """ current_thread = threading.currentThread self.generate_list.append(current_thread) event = self.q.get() while event != StopEvent: func, arguments, callback = event try: result = func(*arguments) success = True except Exception as e: success = False result = None if callback is not None: try: callback(success, result) except Exception as e: pass with self.worker_state(self.free_list, current_thread): if self.terminal: event = StopEvent else: event = self.q.get() else: self.generate_list.remove(current_thread) def close(self): """ 执行完所有的任务后,所有线程停止 """ self.cancel = True full_size = len(self.generate_list) while full_size: self.q.put(StopEvent) full_size -= 1 def terminate(self): """ 无论是否还有任务,终止线程 """ self.terminal = True while self.generate_list: self.q.put(StopEvent) self.q.empty() @contextlib.contextmanager def worker_state(self, state_list, worker_thread): """ 用于记录线程中正在等待的线程数 """ state_list.append(worker_thread) try: yield finally: state_list.remove(worker_thread) # How to use pool = ThreadPool(5) def callback(status, result): # status, execute action status # result, execute action return value pass def action(i): print(i) for i in range(30): ret = pool.run(action, (i,), callback) time.sleep(5) print(len(pool.generate_list), len(pool.free_list)) print(len(pool.generate_list), len(pool.free_list)) # pool.close() # pool.terminate()
queue模块:
queue就是对队列,它是线程安全的
举例来说,我们去肯德基吃饭。厨房是给我们做饭的地方,前台负责把厨房做好的饭卖给顾客,顾客则去前台领取做好的饭。这里的前台就相当于我们的队列。
这个模型也叫生产者-消费者模型
1
2
3
4
5
6
7
8
9
|
import queue q = queue.Queue(maxsize = 0 ) # 构造一个先进显出队列,maxsize指定队列长度,为0 时,表示队列长度无限制。 q.join() # 等到队列为kong的时候,在执行别的操作 q.qsize() # 返回队列的大小 (不可靠) q.empty() # 当队列为空的时候,返回True 否则返回False (不可靠) q.full() # 当队列满的时候,返回True,否则返回False (不可靠) q.put(item, block = True , timeout = None ) # 将item放入Queue尾部,item必须存在,可以参数block默认为True,表示当队列满时,会等待队列给出可用位置, 为False时为非阻塞,此时如果队列已满,会引发queue.Full 异常。 可选参数timeout,表示 会阻塞设置的时间,过后, 如果队列无法给出放入item的位置,则引发 queue.Full 异常q.get(block=True, timeout=None) # 移除并返回队列头部的一个值,可选参数block默认为True,表示获取值的时候,如果队列为空,则阻塞,为False时,不阻塞, 若此时队列为空,则引发 queue.Empty异常。 可选参数timeout,表示会阻塞设置的时候,过后,如果队列为空,则引发Empty异常。q.put_nowait(item) # 等效于 put(item,block=False)q.get_nowait() # 等效于 get(item,block=False) |
生产者--消费者:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
#!/usr/bin/env python import Queue import threading message = Queue.Queue( 10 ) def producer(i): while True : message.put(i) def consumer(i): while True : msg = message.get() for i in range ( 12 ): t = threading.Thread(target = producer, args = (i,)) t.start() for i in range ( 10 ): t = threading.Thread(target = consumer, args = (i,)) t.start() |
multiprocessing模块
multiprocessing是python的多进程管理包,和threading.Thread类似。直接从侧面用subprocesses替换线程使用GIL的方式,由于这一点,multiprocessing模块可以让程序员在给定的机器上充分的利用CPU。
在multiprocessing中,通过创建Process对象生成进程,然后调用它的start()方法,
1
2
3
4
5
6
7
8
9
|
from multiprocessing import Process def f(name): print ( 'hello' , name) if __name__ = = '__main__' : p = Process(target = f, args = ( 'bob' ,)) p.start() p.join() |
进程:
进程间的数据共享
在使用并发设计的时候最好尽可能的避免共享数据,尤其是在使用多进程的时候。 如果你真有需要 要共享数据, multiprocessing提供了两种方式。
Shared memory
数据可以用Value或Array存储在一个共享内存地图里,如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
from multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range ( len (a)): a[i] = - a[i] if __name__ = = '__main__' : num = Value( 'd' , 0.0 ) arr = Array( 'i' , range ( 10 )) p = Process(target = f, args = (num, arr)) p.start() p.join() print (num.value) print (arr[:]) |
输出:
1
2
|
3.1415927 [ 0 , - 1 , - 2 , - 3 , - 4 , - 5 , - 6 , - 7 , - 8 , - 9 ] |
创建num和arr时,“d”和“i”参数由Array模块使用的typecodes创建:“d”表示一个双精度的浮点数,“i”表示一个有符号的整数,这些共享对象将被线程安全的处理。Array(‘i’, range(10))中的‘i’参数:
1
2
|
‘c’: ctypes.c_char ‘u’: ctypes.c_wchar ‘b’: ctypes.c_byte ‘B’: ctypes.c_ubyte‘h’: ctypes.c_short ‘H’: ctypes.c_ushort ‘i’: ctypes.c_int ‘I’: ctypes.c_uint ‘l’: ctypes.c_long, ‘L’: ctypes.c_ulong ‘f’: ctypes.c_float ‘d’: ctypes.c_double |
Server process
由Manager()返回的manager提供list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array类型的支持
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
from multiprocessing import Process, Manager def f(d, l): d[ 1 ] = '1' d[ '2' ] = 2 d[ 0.25 ] = None l.reverse() if __name__ = = '__main__' : with Manager() as manager: d = manager. dict () l = manager. list ( range ( 10 )) p = Process(target = f, args = (d, l)) p.start() p.join() print (d) print (l) |
输出:
1
|
{ 0.25 : None , 1 : '1' , '2' : 2 }<br>[ 9 , 8 , 7 , 6 , 5 , 4 , 3 , 2 , 1 , 0 ] |
Server process manager比 shared memory 更灵活,因为它可以支持任意的对象类型。另外,一个单独的manager可以通过进程在网络上不同的计算机之间共享,不过他比shared memory要慢。
进程池:
Pool类描述了一个工作进程池,他有几种不同的方法让任务卸载工作进程。
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止我们可以用Pool类创建一个进程池, 展开提交的任务给进程池。 例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
from multiprocessing import Pool import time def myFun(i): time.sleep( 2 ) return i + 100 def end_call(arg): print ( "end_call" ,arg) p = Pool( 5 ) # print(p.map(myFun,range(10))) for i in range ( 10 ): p.apply_async(func = myFun,args = (i,),callback = end_call) print ( "end" )p.close() p.join() |
from multiprocessing import Pool, TimeoutError import time import os def f(x): return x*x if __name__ == '__main__': # 创建4个进程 with Pool(processes=4) as pool: # 打印 "[0, 1, 4,..., 81]" print(pool.map(f, range(10))) # 使用任意顺序输出相同的数字, for i in pool.imap_unordered(f, range(10)): print(i) # 异步执行"f(20)" res = pool.apply_async(f, (20,)) # 只运行一个进程 print(res.get(timeout=1)) # 输出 "400" # 异步执行 "os.getpid()" res = pool.apply_async(os.getpid, ()) # 只运行一个进程 print(res.get(timeout=1)) # 输出进程的 PID # 运行多个异步执行可能会使用多个进程 multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)] print([res.get(timeout=1) for res in multiple_results]) # 是一个进程睡10秒 res = pool.apply_async(time.sleep, (10,)) try: print(res.get(timeout=1)) except TimeoutError: print("发现一个 multiprocessing.TimeoutError异常") print("目前,池中还有其他的工作") # 退出with块中已经停止的池 print("Now the pool is closed and no longer available")
1
|
class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]]) |
- processes :使用的工作进程的数量,如果processes是None那么使用 os.cpu_count()返回的数量。
- initializer: 如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。
- maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个心的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
- context: 用在制定工作进程启动时的上下文,一般使用 multiprocessing.Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context注意:Pool对象的方法只可以被创建pool的进程所调用。进程池的方法
-
apply(func[, args[, kwds]]) :使用arg和kwds参数调用func函数,结果返回前会一直阻塞,由于这个原因,apply_async()更适合并发执行,另外,func函数仅被pool中的一个进程运行。
-
apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : apply()方法的一个变体,会返回一个结果对象。如果callback被指定,那么callback可以接收一个参数然后被调用,当结果准备好回调时会调用callback,调用失败时,则用error_callback替换callback。 Callbacks应被立即完成,否则处理结果的线程会被阻塞。
-
close() : 阻止更多的任务提交到pool,待任务完成后,工作进程会退出。
-
terminate() : 不管任务是否完成,立即停止工作进程。在对pool对象进程垃圾回收的时候,会立即调用terminate()。
-
join() : wait工作线程的退出,在调用join()前,必须调用close() or terminate()。这样是因为被终止的进程需要被父进程调用wait(join等价与wait),否则进程会成为僵尸进程。
-
map(func, iterable[, chunksize])¶
-
map_async(func, iterable[, chunksize[, callback[, error_callback]]])¶
-
imap(func, iterable[, chunksize])¶
-
imap_unordered(func, iterable[, chunksize])
-
starmap(func, iterable[, chunksize])¶
-
starmap_async(func, iterable[, chunksize[, callback[, error_back]]])
协程
协程又叫微线程,从技术的角度来说,“协程就是你可以暂停执行的函数”。如果你把它理解成“就像生成器一样”,那么你就想对了。 线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。
协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。
协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程;
Event Loop
Event Loop是一种等待程序分配时间或消息的编程架构。简单的说就是 当事件A发生的时候,我们就去执行事件B。 最简单的例子就是:当我们浏览网页的时候,我们点击页面的某个元素,这个点击事件会被 JavaScript 捕捉到,然后 JavaScript 就会检查这个事件是否绑定了onclick()回调函数来处理这个事件,只要绑定了,onclick()回调函数就会被执行。
event loop是协程执行的控制点, 如果你希望执行协程, 就需要用到它们。
event loop提供了如下的特性:
- 注册、执行、取消延时调用(异步函数)
- 创建用于通信的client和server协议(工具)
- 创建和别的程序通信的子进程和协议(工具)
- 把函数调用送入线程池中
协程示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
import asyncio async def cor1(): print ( "COR1 start" ) await cor2() print ( "COR1 end" ) async def cor2(): print ( "COR2" ) loop = asyncio.get_event_loop() loop.run_until_complete(cor1()) loop.close() |
最后三行是重点:
- asyncio.get_event_loop() : asyncio启动默认的event loop
- run_until_complete() : 这个函数是阻塞执行的,知道所有的异步函数执行完成,
- close() : 关闭event loop。
subprocess模块
通过使用subprocess模块可以创建新的进程,连接到他们的输入/输出/错误管道,并获取他们的返回值。 该模块计划替代及一个旧的模块的方法:
1
2
|
os.system os.spawn * |
使用subprocess模块
在所有用例调用subprocess时推荐使用run()方法,更高级的用例,可以直接使用subprocess.Popen接口。
run()方法
在Python3.5增加的。
1
|
subprocess.run(args, * , stdin = None , input = None , stdout = None , stderr = None , shell = False , timeout = None , check = False ) |
run()默认不会捕捉到标准输出和标准错误输出,要捕捉的话,可以为标准输出和标准错误输出指定subprocess.PIPE(一个特殊值,可被用于Popen的stdin, stdout或 stderr参数,表示一个标准流的管道应该被打开, Popen.communicate()用的最多)。
- args :args应该是一个字符串或者一个序列。
- timeout:设置超时时间,会传递给subprocess.Popen.communicate()。如果超时,子进程会被杀死并等待。子进程被终止后会报告一个 TimeoutExpired异常。
- input参数会传递给subprocess.Popen.communicate(),从而作为subprocess的标准输入。当我们使用的时候,内部的Popen对象会自动创建stdin=PIPE,stdin参数可能不会被使用。
- check:如果check参数为True,且进程退出的时候得到退出码一个非0的值,会报告一个 CalledProcessError异常
- shell:shell参数默认为False,此时arg参数应该是一个列表。
subprocess.run(["ls","-l"]) ;
当shell=True时,args可以是一个字符串。subprocess.run("ls -l",shell=True)
。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
>>> ret = subprocess.run([ "ls" , "-l" ]) # doesn't capture output CompletedProcess(args = [ 'ls' , '-l' ], returncode = 0 ) >>> print (ret.stdout) None >>> subprocess.run( "exit 1" , shell = True , check = True ) Traceback (most recent call last): ... subprocess.CalledProcessError: Command 'exit 1' returned non - zero exit status 1 >>> ret1 = subprocess.run([ "ls" , "-l" , "/dev/null" ], stdout = subprocess.PIPE) CompletedProcess(args = [ 'ls' , '-l' , '/dev/null' ], returncode = 0 , stdout = b 'crw-rw-rw- 1 root root 1, 3 Jan 23 16:23 /dev/null
' ) >>> print (ret.stdout) b 'crw-rw-rw- 1 root root 1, 3 6xe6x9cx88 8 06:50 /dev/null
' |
call方法
call()方法等价于:run(..., check=True)
和run()方法类所以,只是不支持input参数和check参数;
注意: 不要在这个方法里使用stdout=PIPE 或 stderr=PIPE,当到一个管道的输出填满系统的管道缓存时,子进程会被阻塞。
check_call方法
1
|
subprocess.check_output(args, * , stdin = None , stderr = None , shell = False , universal_newlines = False , timeout = None ) |
check_call()方法等价于: run(..., check=True, stdout=PIPE).stdout
3.1新增,3.3时增加了timeout参数,3.4时增加了对关键字参数的支持
check_output()方法
内部调用的是run()方法,但是会捕捉到stdout
1
2
3
|
>>> ret = subprocess.check_output([ "ls" , "-l" , "/dev/null" ]) >>> print (ret) b 'crw-rw-rw- 1 root root 1, 3 6xe6x9cx88 8 06:50 /dev/null
' |
Popen类
上面的四个方法本质上调用的都是subprocess中的Popen类。
Popen对象都有以下方法:
poll() : 检查子进程是否已经终止,返回一个returncode,相当于exit code。
wait() : 等待子进程终止,返回一个returncode
communicate(input=None) :和进程进行交互:发送数据到stdin;从stdout和stderr读取数据,直到读取完。等待进程终止。可选参数input会传递数据给子进程,当input=None事,没有数据传递给子进程。 communicate() 返回一个元组 (stdout, stderr). 注意: 读取的数据在内存的buffer中,所以当数据大小大于buffer或者没有限制时,不要使用这个方法。