线程:
线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。
threading模块:
threading模块主要用于线程相关的操作。
线程方法:
start 线程准备就绪,等待CPU调度
setName 为线程设置名称
getName 获取线程名称
setDaemon 设置为后台线程或前台线程(默认), 如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止。如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止。
join 逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
run 线程被cpu调度后自动执行线程对象的run方法
调用线程的两种方式:
1、直接调用
import threading from time import sleep def mythread(num):#定义每个线程要执行的操作(线程要做的事情) print 'the thread num is %s' % num sleep(3) if __name__ == '__main__': t1 = threading.Thread(target=mythread,args=(1,))#实例化一个线程,执行mythread函数,传递参数1 t2 = threading.Thread(target=mythread,args=(2,)) t1.start()#启动线程1 t2.start() print t1.getName()#获取线程1的名字 print t2.getName() 执行结果: the thread num is 1 the thread num is 2 Thread-1 Thread-2
线程1和线程2同时执行完毕,然后sleep(3)。并不是执行完线程1后sleep(3),然后再执行线程2。因此两个线程是并行执行。
2、继承调用
import threading from time import sleep class mythread(threading.Thread): def __init__(self,num): threading.Thread.__init__(self)#先继承父类中的构造方法(__init__) self.num = num def run(self):#必须有一个run方法 print 'the thread num is %s ' % self.num sleep(3) if __name__ == '__main__': t1 = mythread(1) t2 = mythread(2) t1.start() t2.start() print t1.getName() print t2.getName()
执行结果同上。
主线程等待子线程:
以上两个例子中,主线程启动了两个子线程,并和两个子线程同时执行。如果让主线程等待子线程执行完毕后再执行,可以如下操作:
import threading from time import sleep def mythread(num): print 'the thread num is %s' % num sleep(3) if __name__ == '__main__': t1 = threading.Thread(target=mythread,args=(1,)) t2 = threading.Thread(target=mythread,args=(2,)) t1.start() t2.start() print t1.getName() print t2.getName() t1.join()#主线程等待t1执行完毕 t2.join()#主线程等待t2执行完毕 print '---main---'#两个子线程执行完,等待3秒,主线程执行。
同时启动10个线程,主线程与子线程同时执行:
import threading from time import sleep def mythread(num): print 'the thread num is %s' % num sleep(3) if __name__ == '__main__': for i in range(10): t = threading.Thread(target=mythread,args=(i,)) t.start() print '---main---'
同时启动10个线程,主线程等待子线程执行完后再执行:
import threading from time import sleep def mythread(num): print 'the thread num is %s' % num sleep(3) t_list = [] if __name__ == '__main__': for i in range(10): t = threading.Thread(target=mythread,args=(i,)) t.start() t_list.append(t)#将子线程加到列表中 for i in t_list: t.join()#等待列表中的全部子线程执行完毕,主线程再执行 print '---main---'
守护进程:
setDaemon实例1:
import time import threading def run(n): print '%s running,,,' % n time.sleep(2) print 'done...' def main(): for i in range(5): t = threading.Thread(target=run,args=(i,)) t.start() t.join(1) print 'start %s' % t.getName() m = threading.Thread(target=main,args=()) m.setDaemon(True)#将主线程设置为Daemon(守护)线程,它退出时,其它子线程会同时退出,不管是否执行完任务 m.start() m.join(2) print 'main thread done...' #执行结果: 0 running,,, start Thread-2 1 running,,, main thread done...
setDaemon实例2:
import time import threading def run(n): print '%s running,,,' % n time.sleep(2) print 'done...' def main(): for i in range(5): t = threading.Thread(target=run,args=(i,)) t.start() t.join(1) print 'start %s' % t.getName() m = threading.Thread(target=main,args=()) m.setDaemon(True) m.start() #m.join(2) print 'main thread done...' #执行结果: main thread done...
GIL(Glogbal Interpreter Lock):
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
在 CPython中,某一时刻只有一个线程在运行。
首先需要明确的一点是GIL
并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL
归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL。
线程锁/互斥锁:
一个进程下可以启动多个线程,多个线程共享父进程的内存空间,也就意味着每个线程可以访问同一份数据。此时,如果多个线程同时要修改同一份数据,那么将会导致线程运算结果不准确。
eg:
import time import threading def add(): global num time.sleep(1)#打乱每个线程的顺序 num -=1 print num num = 10 for i in range(10): t = threading.Thread(target=add,args=()) t.start() #执行结果: 9
8 6 7 5
4 3 2
1 0
由结果可以看出,多个线程同时操作一份共享数据,导致将结果不可预期,也称“线程不安全”。
要使结果准确,需要确保某一时刻只有一个线程可以对数据进行操作———线程锁。
eg:
import time import threading def add(): global num time.sleep(1)#打乱每个线程的顺序 lock.acquire()#添加一个进程锁 num -=1 print num lock.release()#解除进程锁 lock = threading.Lock()#实例化一个lock num = 10 for i in range(10): t = threading.Thread(target=add,args=()) t.start() #运行结果: 9 8 7 6 5 4 3 2 1 0
递归锁:
在大锁中还要包含小锁:
def run1(): print("grab the first part data") lock.acquire() global num num +=1 lock.release() return num def run2(): print("grab the second part data") lock.acquire() global num2 num2+=1 lock.release() return num2 def run3(): lock.acquire()#在运行run1和run2之前加锁,确保执行完run1后接着执行run2,并且在这过程中没有其他的线程来执行run1和run2 res = run1() print('--------between run1 and run2-----') res2 = run2() lock.release() print(res,res2) if __name__ == '__main__': num,num2 = 0,0 lock = threading.RLock() for i in range(10): t = threading.Thread(target=run3) t.start() while threading.active_count() != 1: print(threading.active_count()) else: print('----all threads done---') print(num,num2)
GIL(全局锁)与线程锁:
既然有GIL了,为什么还要线程锁呢???
GIL是防止底层多个C原生线程(CPython)同一时间修改同一数据,就是说python解释器不能并行的执行代码。线程锁是为了防止多个线程同时修改同一份数据。
多进程:
p2.start() print '===main=== #执行结果: ===main=== hello ahaii hello tom
主进程等待子进程执行完毕,再执行:
import multiprocessing from time import sleep def say(name): print 'hello %s' % name sleep(2) if __name__ == '__main__': p1 = multiprocessing.Process(target=say,args=('ahaii',)) p2 = multiprocessing.Process(target=say,args=('tom',)) p1.start() p2.start() p1.join() p2.join() print '===main===' #执行结果: hello ahaii hello tom#执行完后,等待2s,执行主进程。 ===main===
进程之间通信
线程之间可以相互访问,而不同进程之间内存不是共享的,不能相互访问。因此,要实现通信,需要通过第三方方法(管道)。
Queue队列
python中,队列是进程之间进行数据交换的主要形式。Queue模块提供了队列的操作。无论多少个线程向队列中放数据或者取数据,Queue同一时刻只允许一个线程在操作,即它自带锁。
Queue.put():向队列中放数据
Queue.get():从队列中取数据
Queue.qsize():查看队列中剩余数据个数
常用方法:
import Queue #q = Queue.Queue(maxsize=3)#最大长度 #print q.get() #print q.get(timeout=2) q = Queue.PriorityQueue(maxsize=3)#优先级队列 q.put((3,[1,2,3]))#第一组数据为优先级,第二组为数据 q.put((1,22)) q.put((2,33)) #q.put(5)#队列设置长度为3,第四次put时,由于队列已满,会发生阻塞。可以设置超时时间避免阻塞,如:q.put(5,timeout=2) # print q.qsize() # print q.full()#判断队列是否已满,False or True # print q.empty()#判断队列是否已空,False or True print q.get() print q.get() print q.get() #执行结果: (1, 22) (2, 33) (3, [1, 2, 3])
默认Queue队列遵循先进先出的原则,如有特殊要求,可以使用优先级队列。在优先级队列中,优先级最小,权重越大,越现被取到。
eg:
from multiprocessing import Process,Queue def func(q): q.put([123,'hello']) if __name__ == '__main__': q = Queue() p1 = Process(target=func,args=(q,))#子进程执行func函数,向队列中put数据 p1.start() print q.get()#父进程向队列中get数据 p1.join() #执行结果: [123, 'hello']
Queue队列在取数据时,时按照数据存放的先后顺序,每次只能存、取一个数据,顺序遵循“先进先出”的原则。
多次存、取数据:
eg:
from multiprocessing import Process,Queue def func(q): q.put([123,'hello']) if __name__ == '__main__': q = Queue() p1 = Process(target=func,args=(q,))#子进程执行func函数,向队列中put数据 p2 = Process(target=func,args=(q,))#子进程执行func函数,向队列中put数据 p1.start() p2.start() print q.get()#父进程向队列中get数据 print q.get() p1.join() p2.join() #执行结果: [123, 'hello'] [123, 'hello']
Managers:
from multiprocessing import Manager,Process def f(d,l):#传入一个字典和一个列表 d[1] = '1'#修改字典中的值 d['2'] = 2 d[0.25] = None l.append(1)#列表追加1 print l if __name__ == '__main__': with Manager() as manager:#这种写法好处是,manager执行完后自动销毁。 #manager = Manager() d = manager.dict()#生成一个字典 l = manager.list(range(5))#生成一个列表 p_list = [] for i in range(10):#生成10个进程 p = Process(target=f,args=(d,l)) p.start() p_list.append(p) for res in p_list: res.join()#等待10个线程执行完毕 print d#主进程执行 print l #执行结果: [0, 1, 2, 3, 4, 1] [0, 1, 2, 3, 4, 1, 1] [0, 1, 2, 3, 4, 1, 1, 1] [0, 1, 2, 3, 4, 1, 1, 1, 1] [0, 1, 2, 3, 4, 1, 1, 1, 1, 1] [0, 1, 2, 3, 4, 1, 1, 1, 1, 1, 1] [0, 1, 2, 3, 4, 1, 1, 1, 1, 1, 1, 1] [0, 1, 2, 3, 4, 1, 1, 1, 1, 1, 1, 1, 1] [0, 1, 2, 3, 4, 1, 1, 1, 1, 1, 1, 1, 1, 1] [0, 1, 2, 3, 4, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1] {0.25: None, 1: '1', '2': 2} [0, 1, 2, 3, 4, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
由结果可以看出,每个子进程都向同一个列表中追加了1。
进程池:
由于进程比较消耗资源,因此当系统中运行多个进程时,需要设定一个限制。进程池就是设定系统中同一时刻,最多有多少个进程执行。
eg:
from multiprocessing import Process,Pool import time def Foo(i): time.sleep(2) # print 'hello' return i+100 def Bar(arg): print('-->exec done:',arg) pool = Pool(3)#限制最多执行5个进程 for i in range(10): pool.apply_async(func=Foo, args=(i,),callback=Bar)#异步,执行完Foo后,接着执行Bar(回调), # pool.apply(func=Foo, args=(i,))#同步,每个进程串行 print('end') pool.close() pool.join()#进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭