#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Created by Mona on 2017/7/18
import threading, time

# Method 1: start threads directly
def foo(n):
    time.sleep(n)
    print('this is a test message from foo!')

def bar(m):
    time.sleep(m)
    print('this is a test message from bar!')

t1 = threading.Thread(target=foo, args=(2,))  # instantiate a thread with threading.Thread; target is the callable to run, args are its arguments and must be a tuple
t1.start()        # start the thread
print(t1.name)    # print the thread's name

t2 = threading.Thread(target=bar, args=(3,))
t2.start()
print(t2.name)

print(threading.enumerate())      # print all currently alive threads
print(threading.active_count())   # print the number of currently alive threads

# Output (the child threads sleep, so the main thread's prints come first):
'''Thread-1
Thread-2
[<_MainThread(MainThread, started 140736054596544)>, <Thread(Thread-1, started 123145453486080)>, <Thread(Thread-2, started 123145458741248)>]
3
this is a test message from foo!
this is a test message from bar!'''

# Method 2: start threads by subclassing
class MyThread(threading.Thread):    # derive your own class from threading.Thread
    def __init__(self, n):           # call the parent's __init__ and add your own attributes
        super().__init__()
        self.n = n

    def run(self):                   # override run(); it is what start() executes
        self.foo(self.n)

    def foo(self, n):                # your own method
        time.sleep(n)
        print('this is a test message from foo!')

t = MyThread(2)
t.start()
# Output: this is a test message from foo!
Purpose of multithreading: save resources and run tasks concurrently (e.g. overlap I/O waits).
# Single-threaded
def bar(m):
    time.sleep(m)
    print('this is a test message from bar!')

s = time.time()
for i in range(10):
    bar(2)
print('cost time:', time.time() - s)
'''this is a test message from bar!
this is a test message from bar!
this is a test message from bar!
this is a test message from bar!
this is a test message from bar!
this is a test message from bar!
this is a test message from bar!
this is a test message from bar!
this is a test message from bar!
this is a test message from bar!
cost time: 20.03743004798889'''

# Multi-threaded:
def bar(m):
    time.sleep(m)
    print('this is a test message from bar!')

s = time.time()
l = []
for i in range(10):
    t = threading.Thread(target=bar, args=(2,))
    t.start()
    l.append(t)
for t in l:
    t.join()   # block the main thread until this child thread finishes, then continue
print('cost time :', time.time() - s)
'''this is a test message from bar!
this is a test message from bar!
this is a test message from bar!
this is a test message from bar!
this is a test message from bar!
this is a test message from bar!
this is a test message from bar!
this is a test message from bar!
this is a test message from bar!
this is a test message from bar!
cost time : 2.0072948932647705'''
Daemon threads:
# setDaemon(True):
'''Declares a thread as a daemon thread; it must be set before start() is called.
Normally, when the main thread creates a child thread, the two run on their separate paths,
and when the main thread is about to exit it checks whether the child threads have finished;
if not, it waits for them before exiting. Sometimes we want the program to exit as soon as
the main thread is done, regardless of whether the child threads have finished. Setting the
child thread as a daemon with setDaemon(True) (or, in current Python, t.daemon = True) does
exactly that: daemon threads are killed when no non-daemon threads remain.'''
def foo():
    time.sleep(0.1)
    print('this is a test message from foo!')

def bar():
    time.sleep(2)
    print('this is a test message from bar!')

s = time.time()
t1 = threading.Thread(target=foo)
t1.start()
t2 = threading.Thread(target=bar)
t2.setDaemon(True)   # equivalent to t2.daemon = True; must come before start()
t2.start()
t1.join()
print('cost time:', time.time() - s)
'''this is a test message from foo!
cost time: 0.10495209693908691'''
# The program exits as soon as the main thread and t1 finish; the daemon thread t2 is killed before it prints.

def foo():
    time.sleep(0.1)
    print('this is a test message from foo!')

def bar():
    time.sleep(2)
    print('this is a test message from bar!')

s = time.time()
t1 = threading.Thread(target=foo)
t1.start()
t2 = threading.Thread(target=bar)
# t2.setDaemon(True)
t2.start()
t2.join()
t1.join()
print('cost time:', time.time() - s)
'''this is a test message from foo!
this is a test message from bar!
cost time: 2.000761032104492'''
# Without the daemon flag, the program only ends after every thread has finished.
Mutex locks:
CPython uses the Global Interpreter Lock (GIL) to serialize the threads' access to the Python virtual machine. A basic requirement for a multithreading implementation is mutual exclusion between threads that access shared interpreter state, and the GIL is how CPython provides it: within one process, only one thread executes Python bytecode at any given moment.
GIL: once a thread holds the interpreter, all other threads must wait for it to release the interpreter, even when their next instructions would not interfere with each other.
Before calling any Python C API, a thread must first acquire the GIL.
GIL drawback: on a multi-core machine, CPU-bound threads effectively degrade to single-core execution. Benefit: the interpreter avoids a large amount of fine-grained locking and unlocking.
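To make that drawback concrete, here is a minimal sketch (the helper name count_down and the loop size are illustrative, not from the original notes): a CPU-bound function run in two threads takes roughly as long as running it twice in a row, because the GIL lets only one thread execute bytecode at a time.

import threading, time

def count_down(n):        # pure CPU-bound work, no I/O
    while n > 0:
        n -= 1

N = 10000000

s = time.time()
count_down(N)
count_down(N)
print('serial   :', time.time() - s)

s = time.time()
t1 = threading.Thread(target=count_down, args=(N,))
t2 = threading.Thread(target=count_down, args=(N,))
t1.start(); t2.start()
t1.join(); t2.join()
print('2 threads:', time.time() - s)   # on CPython, roughly the same as (or slower than) the serial run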
The GIL only protects the interpreter itself, not your own shared data, so Python additionally provides the following locks:
Synchronization lock (Lock): placed around the code that modifies shared data, it makes that section run serially; whether or not an I/O block happens inside, the locked code runs to completion before any other thread can enter it.
import threading
import time

num = 100

def minus():
    global num        # the result is assigned back to the global num
    tmp = num         # read the current value of num
    time.sleep(0.1)   # simulate an I/O block before writing back
    num = tmp - 1

l = []
for i in range(100):  # loop 100 times, each thread decrements num by one
    t = threading.Thread(target=minus)
    t.start()
    l.append(t)
for t in l:           # wait for all threads to finish, then print num
    t.join()
print('result:', num)   # result: 99

'''The reason for this result: the first thread runs tmp = num (= 100), hits the I/O block
and is suspended, and the CPU is handed to another thread. Since num has not been written back
yet, it is still 100, so the next thread also reads tmp = num = 100, blocks, the CPU moves on,
and so on. By the time every thread has run once, the earlier threads may still be asleep.
When they wake up and finish, each writes num = 100 - 1: the global variable is written 100
times, but every write stores 99.'''

# To fix this, introduce a synchronization lock:
import time
import threading

num = 100

def minus():
    global num
    lock.acquire()      # acquire the lock
    tmp = num           # read num
    time.sleep(0.1)     # wait 0.1 s before computing
    num = tmp - 1
    lock.release()      # release the lock
    time.sleep(2)

lock = threading.Lock()  # create the lock object

l = []
for i in range(100):
    t = threading.Thread(target=minus)
    t.start()
    l.append(t)
for t in l:
    t.join()
print('result:', num)   # result: 0
Deadlock and recursive locks:
class MyThread(threading.Thread):
    def __init__(self):
        super().__init__()

    def run(self):
        self.bar()
        self.foo()

    def bar(self):
        lockA.acquire()
        print('i am %s get A ----------%s' % (self.name, time.ctime()))
        lockB.acquire()
        print('i am %s get B ----------%s' % (self.name, time.ctime()))
        lockB.release()
        lockA.release()

    def foo(self):
        lockB.acquire()
        time.sleep(0.5)
        print('i am %s get B ----------%s' % (self.name, time.ctime()))
        lockA.acquire()
        print('i am %s get A ----------%s' % (self.name, time.ctime()))
        lockA.release()
        lockB.release()

lockA = threading.Lock()
lockB = threading.Lock()

for i in range(10):
    t = MyThread()
    t.start()

'''i am Thread-1 get A ----------Tue Jul 18 16:14:38 2017
i am Thread-1 get B ----------Tue Jul 18 16:14:38 2017
i am Thread-2 get A ----------Tue Jul 18 16:14:38 2017
i am Thread-1 get B ----------Tue Jul 18 16:14:38 2017'''
# Why this deadlocks: Thread-1 finishes bar(), enters foo(), grabs lockB and then sleeps.
# The CPU switches to Thread-2, which enters bar(), grabs lockA and then needs lockB,
# which Thread-1 still holds. When Thread-1's sleep ends it needs lockA, which Thread-2
# holds. Each thread waits for a lock the other owns, so the program hangs: a deadlock.
Recursive lock: to solve the deadlock problem, use a recursive lock. RLock internally maintains a Lock and a counter; the counter records how many times acquire() has been called, so the same thread can acquire the resource multiple times. Only after that thread has released every acquire can other threads obtain the lock. If the example above uses one RLock in place of the two Locks, the deadlock no longer occurs.
class MyThread(threading.Thread):
    def __init__(self):
        super().__init__()

    def run(self):
        self.bar()
        self.foo()

    def bar(self):
        lockR.acquire()
        print('i am %s get A ----------%s' % (self.name, time.ctime()))
        lockR.acquire()
        print('i am %s get B ----------%s' % (self.name, time.ctime()))
        lockR.release()
        lockR.release()

    def foo(self):
        lockR.acquire()
        time.sleep(0.5)
        print('i am %s get B ----------%s' % (self.name, time.ctime()))
        lockR.acquire()
        print('i am %s get A ----------%s' % (self.name, time.ctime()))
        lockR.release()
        lockR.release()

lockR = threading.RLock()

for i in range(10):
    t = MyThread()
    t.start()

'''i am Thread-1 get A ----------Tue Jul 18 16:24:37 2017
i am Thread-1 get B ----------Tue Jul 18 16:24:37 2017
i am Thread-1 get B ----------Tue Jul 18 16:24:37 2017
i am Thread-1 get A ----------Tue Jul 18 16:24:37 2017
i am Thread-2 get A ----------Tue Jul 18 16:24:37 2017
i am Thread-2 get B ----------Tue Jul 18 16:24:37 2017
i am Thread-2 get B ----------Tue Jul 18 16:24:38 2017
i am Thread-2 get A ----------Tue Jul 18 16:24:38 2017
i am Thread-4 get A ----------Tue Jul 18 16:24:38 2017
i am Thread-4 get B ----------Tue Jul 18 16:24:38 2017
i am Thread-4 get B ----------Tue Jul 18 16:24:38 2017
i am Thread-4 get A ----------Tue Jul 18 16:24:38 2017
i am Thread-6 get A ----------Tue Jul 18 16:24:38 2017
i am Thread-6 get B ----------Tue Jul 18 16:24:38 2017
i am Thread-6 get B ----------Tue Jul 18 16:24:39 2017
i am Thread-6 get A ----------Tue Jul 18 16:24:39 2017
i am Thread-8 get A ----------Tue Jul 18 16:24:39 2017
i am Thread-8 get B ----------Tue Jul 18 16:24:39 2017
i am Thread-8 get B ----------Tue Jul 18 16:24:39 2017
i am Thread-8 get A ----------Tue Jul 18 16:24:39 2017
i am Thread-10 get A ----------Tue Jul 18 16:24:39 2017
i am Thread-10 get B ----------Tue Jul 18 16:24:39 2017
i am Thread-10 get B ----------Tue Jul 18 16:24:40 2017
i am Thread-10 get A ----------Tue Jul 18 16:24:40 2017
i am Thread-5 get A ----------Tue Jul 18 16:24:40 2017
i am Thread-5 get B ----------Tue Jul 18 16:24:40 2017
i am Thread-5 get B ----------Tue Jul 18 16:24:40 2017
i am Thread-5 get A ----------Tue Jul 18 16:24:40 2017
i am Thread-9 get A ----------Tue Jul 18 16:24:40 2017
i am Thread-9 get B ----------Tue Jul 18 16:24:40 2017
i am Thread-9 get B ----------Tue Jul 18 16:24:41 2017
i am Thread-9 get A ----------Tue Jul 18 16:24:41 2017
i am Thread-7 get A ----------Tue Jul 18 16:24:41 2017
i am Thread-7 get B ----------Tue Jul 18 16:24:41 2017
i am Thread-7 get B ----------Tue Jul 18 16:24:41 2017
i am Thread-7 get A ----------Tue Jul 18 16:24:41 2017
i am Thread-3 get A ----------Tue Jul 18 16:24:41 2017
i am Thread-3 get B ----------Tue Jul 18 16:24:41 2017
i am Thread-3 get B ----------Tue Jul 18 16:24:42 2017
i am Thread-3 get A ----------Tue Jul 18 16:24:42 2017'''
Semaphore:
A Semaphore manages an internal counter: every call to acquire() decrements it, and every call to release() increments it.
The counter can never drop below zero; when it is zero, acquire() blocks the calling thread until some other thread calls release().
import threading
import time

semaphore = threading.Semaphore(5)   # at most 5 threads may hold the semaphore at a time

def func():
    if semaphore.acquire():
        print(threading.current_thread().name + ' get semaphore')
        time.sleep(2)
        semaphore.release()

for i in range(20):
    t1 = threading.Thread(target=func)
    t1.start()
Event objects:
- is_set(): returns the event's internal flag
- wait(): blocks the calling thread while the flag is False (an optional timeout can be given)
- set(): sets the flag to True and wakes all waiting threads
- clear(): resets the flag to False
import threading, time

event = threading.Event()          # create the Event object

def foo():
    while not event.is_set():      # the event's flag starts out as False
        event.wait(2)              # wait up to 2 seconds for the flag to be set
        print('waiting for connection')
    print('connected')

for i in range(5):
    t = threading.Thread(target=foo)
    t.start()

print('trying to connect')
time.sleep(10)
event.set()                        # set the flag to True, releasing all waiting threads
Queues: a thread-safe data structure
The queue module is especially useful in threaded programming when information must be exchanged safely between multiple threads.
1. FIFO: first in, first out (queue.Queue)
import queue, threading

q = queue.Queue(maxsize=90)   # maximum capacity; once 90 items are queued, further put() calls block

def put():
    q.put(11)        # store values
    q.put('hello')
    q.put(3.14)
    q.join()         # wait until the other thread has called task_done() for every item, then continue
    print('ok')

def get():
    print(q.get())   # fetch values; call task_done() after each one
    q.task_done()
    print(q.get())
    q.task_done()
    print(q.get())
    q.task_done()

t1 = threading.Thread(target=put)
t1.start()
t2 = threading.Thread(target=get)
t2.start()
2. LIFO: last in, first out (queue.LifoQueue)
import queue

q = queue.LifoQueue()
q.put(1)
q.put(2)
q.put(3)
while not q.empty():
    print(q.get())   # prints 3, 2, 1
3. Priority queue (queue.PriorityQueue)
import queue

q = queue.PriorityQueue()
q.put([1, 'hello'])   # the first element is the priority; it can be a number, a letter, etc., and smaller values come out first
q.put([2, 'mona'])
q.put([3, 3333])
while not q.empty():
    a = q.get()
    print(a, type(a))
Application: the producer-consumer model
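The notes only name the model here, so the following is a minimal sketch of a producer-consumer pair built on queue.Queue; the function names producer/consumer, the buffer size, and the item count are illustrative assumptions.

import threading, queue, time

q = queue.Queue(maxsize=10)           # bounded buffer shared by both sides

def producer():
    for i in range(5):
        q.put(i)                      # blocks if the queue is full
        print('produced', i)
        time.sleep(0.1)

def consumer():
    for _ in range(5):
        item = q.get()                # blocks if the queue is empty
        print('consumed', item)
        q.task_done()                 # mark this item as fully processed

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start(); t2.start()
q.join()                              # wait until every put item has been task_done'd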
Processes: creating a process works almost the same way as creating a thread.
Advantage of multiprocessing: it can use multiple CPU cores and achieve true parallel computation.
import multiprocessing, time

def mul(n):
    total = 1
    i = 1
    while i < n:
        total *= i
        i += 1
    print(total)

def mysum(n):
    total = 0
    i = 0
    while i < n:
        total += i
        i += 1
    print(total)

if __name__ == '__main__':
    s = time.time()
    p1 = multiprocessing.Process(target=mul, args=(100000,))
    p1.start()
    p2 = multiprocessing.Process(target=mysum, args=(1000000,))
    p2.start()
    p1.join()
    p2.join()
    print('cost time:', time.time() - s)
Inter-process communication:
Queue
import multiprocessing

def foo(q, i):
    q.put(i ** 2)

if __name__ == '__main__':
    q = multiprocessing.Queue()   # create the Queue
    q.put(99)
    for i in range(5):
        p = multiprocessing.Process(target=foo, args=(q, i))   # start child processes, passing in the queue and a value
        p.start()
    print(q.get())
    print(q.get())
    print(q.get())   # read the queued values back in the main process
    print(q.get())
    print(q.get())
'''99
0
1
4
9'''
Pipes:
import multiprocessing

def foo(sk):
    sk.send('hello world')   # the child process sends data to the main process
    print(sk.recv())         # the child process receives the message sent by the main process

if __name__ == '__main__':
    sock, conn = multiprocessing.Pipe()   # create the pipe; it returns its two connection ends
    p = multiprocessing.Process(target=foo, args=(sock,))   # pass one end of the pipe to the child process
    p.start()
    print(conn.recv())   # the main process receives the value sent by the child
    conn.send('hey')     # the main process sends a value to the child
Data sharing: Manager
import multiprocessing

def foo(d, i):
    d[i] = i ** 2
    d['name'] = 'mona'

if __name__ == '__main__':
    manager = multiprocessing.Manager()   # instantiate a Manager object
    # l = manager.list([1, 2, 3, 4])
    d = manager.dict()    # the Manager can create various shared data types; data created this way can be shared among child processes
    lp = []
    for i in range(5):
        p = multiprocessing.Process(target=foo, args=(d, i))   # each child process modifies the managed dict
        p.start()
        lp.append(p)
    for i in lp:
        i.join()   # block the main process until every child process has finished before running the code below
    print(d)   # {0: 0, 'name': 'mona', 1: 1, 2: 4, 3: 9, 4: 16}
Process pools:
import multiprocessing
import time

def foo(n):
    print(n)
    time.sleep(2)

if __name__ == '__main__':
    pool_obj = multiprocessing.Pool(5)   # create a process pool with a capacity of 5 worker processes
    for i in range(10):
        p = pool_obj.apply_async(func=foo, args=(i,))   # submit tasks asynchronously
    pool_obj.close()   # close the pool first (no new tasks may be submitted), then join
    pool_obj.join()
    print('ending')
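The pool example above ignores the workers' return values. As a small hedged addition: apply_async returns an AsyncResult whose get() method retrieves the result once the worker finishes; the square function below is only an illustrative stand-in, not part of the original notes.

import multiprocessing

def square(n):
    return n * n

if __name__ == '__main__':
    pool_obj = multiprocessing.Pool(5)
    results = [pool_obj.apply_async(func=square, args=(i,)) for i in range(10)]
    pool_obj.close()
    pool_obj.join()
    print([r.get() for r in results])   # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]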