一、线程
线程是程序工作的最小单元,由进程生成,生成的线程间会共享内存空间。Python中创建线程比较简单,导入threading模块,创建线程实例。下面这段代码是一个简单的多线程例子
1 import threading 2 import time 3 def func(i): 4 time.sleep(1) 5 print (i) 6 7 8 for i in range(10): 9 t = threading.Thread(target=func,args=(i,)) 10 t.start() 11 12 >>> 13 14 1 15 0 16 5 17 4 18 3 19 2 20 7 21 8 22 6 23 9
该段代码创建了10个线程,打印出每个线程的编号,可以看到线程是并发执行的,并且打印数字的顺序也不一样,这是因为CPU调度线程的顺序不一样。
threading方法:
- t.start() 激活线程
- t.getName() 获取线程的名称
- t.setName() 设置线程名称
- t.name() 获取或设置线程的名称
- t.is_alive() 判断线程是否为激活状态
- t.isAlive() 判断线程是否为激活状态
- t.setDaemon 将线程设置为前台或后台线程。后台线程是指:当主线程执行完毕后,不管子线程是否执行完,程序就此停止。如果是前台线程,主线程执行完后会等待子线程执行完毕,程序才会停止。
- t.isDaemon() 判断是否为后台进程
- t.join() 逐个执行每个线程,执行完毕后继续往下执行
二、线程锁
锁可以避免同一时间线程间同时操作一个资源时造成严重后果(两个线程同时操作同一资源,得出的结果却不是想要的)。
''' Created on 2016年7月27日 @author: baitutu ''' import threading import time global_var = 0 lock = threading.RLock()#获得锁的实例 def func(): lock.acquire()#获得锁 global global_var global_var += 1 time.sleep(1) print(global_var) lock.release()#释放锁 for i in range(10): t = threading.Thread(target=func) t.start()
三、Event
Event用于线程间通信,比如主线程控制其他线程执行。原理是主线程发送信号,其他线程等待信号。
- Event.wait(timeout) 阻塞线程,直到Event对象标识位设置为True或超时。
- Event.set() 设置标识位为True
- Event.clear() 设置标识位为False
- Event.isSet() 判断标识位是否为True
import threading def do(event): print('start') event.wait() #阻塞线程执行,直到主线程输入true print('execute') event_obj = threading.Event()#创建Event对象 for i in range(10):#生成10个子线程 t = threading.Thread(target=do, args=(event_obj,)) t.start() event_obj.clear()#标识位置为False inp = input('input:') if inp == 'true': event_obj.set()#设置标识位为True
四、队列
简单理解为一种数据结构,可以用来安全的传递多线程信息。主要有四种,先进先出(queue.Queue()),先进后出(queue.LifoQueue()),优先级队列(queue.PriorityQueue()),双向队列(queue.deque())
queue的主要方法:
q
=
queue.Queue(maxsize
=
0
)
# 构造一个先进显出队列,maxsize指定队列长度,为0时,表示队列长度无限制。
q.join()
# 等到队列为kong的时候,在执行别的操作
q.qsize()
# 返回队列的大小 (不可靠)
q.empty()
# 当队列为空的时候,返回True 否则返回False (不可靠)
q.full()
# 当队列满的时候,返回True,否则返回False (不可靠)
q.put(item, block
=
True
, timeout
=
None
)
# 将item放入Queue尾部,item必须存在,参数block默认为True,表示当队列满时,会等待
# 为False时为非阻塞,此时如果队列已满,会引发queue.Full 异常。 可选参数timeout,表示会阻塞设置的时间,
# 如果在阻塞时间里 队列还是无法放入,则引发 queue.Full 异常
q.get(block
=
True
, timeout
=
None
)
# 移除并返回队列头部的一个值,可选参数block默认为True,表示获取值的时候,如果队列为空,则阻塞
# 阻塞的话若此时队列为空,则引发queue.Empty异常。 可选参数timeout,表示会阻塞设置的时间,
q.get_nowait()
# 等效于 get(item,block=False)
生产者消费者模型
import queue import threading que = queue.Queue(10) def p(i): que.put(i) #往队列中放数据 def g(i): g = que.get(i) #从队列中取数据 print("get:", g) for i in range(10): t = threading.Thread(target=p, args=(i,)) t.start() for i in range(10): t = threading.Thread(target=g, args=(i,)) t.start()
五、进程
进程间的内存空间都是独占的,因此会比较消耗内存
from multiprocessing import Process def Foo(a, dic): dic[a] = 100 + a print(len(dic)) i = [] if __name__ == "__main__": for i in range(10): m = Process(target=Foo, args=(i, dic,)) m.start() m.join()
六、进程池
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会阻塞,直到进程池中有可用进程为止。
-
apply(func[, args[, kwds]]) :使用arg和kwds参数调用func函数,结果返回前会一直阻塞,由于这个原因,apply_async()更适合并发执行,另外,func函数仅被pool中的一个进程运行。
-
apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : apply()方法的一个变体,会返回一个结果对象。如果callback被指定,那么callback可以接收一个参数然后被调用,当结果准备好回调时会调用callback,调用失败时,则用error_callback替换callback。 Callbacks应被立即完成,否则处理结果的线程会被阻塞。
-
close() : 阻止更多的任务提交到pool,待任务完成后,工作进程会退出。
-
terminate() : 不管任务是否完成,立即停止工作进程。在对pool对象进程垃圾回收的时候,会立即调用terminate()。
-
join() : wait工作线程的退出,在调用join()前,必须调用close() or terminate()。这样是因为被终止的进程需要被父进程调用wait(join等价与wait),否则进程会成为僵尸进程
from multiprocessing import Pool import time def f1(a): time.sleep(1) print(a+1) return 1000 def f2(a): print(a) if __name__ == "__main__": pool = Pool(5) for i in range(10): #apply执行任务是串行的,因为结果是一个一个输出 ,前台线程 # pool.apply(func=f1, args=(i,)) #循环申请10个进程,但进程池最多只有5个,因为会一次性申请5个进程,然后再次申请 (并发),callback设置回调函数,后台线程 pool.apply_async(func=f1, args=(i,), callback=f2) pool.close() pool.join()#进程池中的进程执行完才关闭,如果注释,apply_async是不等待子进程执行完毕的。 # #