进程和线程
- 进程:内存独立,线程共享同一进程的内存,一个进程就像一个应用程序,例如qq和word,这属于两个进程,
- 进程是资源的集合,线程是执行单位
- 进程之间不能直接互相访问,同一进程中的程可以互相通信
- 创建新进程消耗系统资源,线程非常轻量,只保存线程运行时的必要数据,如上下文、程序堆栈信息
- 同一进程里的线程可以相互控制,父进程可以控制子进程
1 import threading 2 import time 3 4 def sayhi(num): 5 print('num:',num) 6 time.sleep(3) 7 8 a = threading.Thread(target=sayhi,args=(1,)) 9 b = threading.Thread(target=sayhi,args=(2,)) 10 now1 = time.time() 11 print(now1) 12 a.start() 13 b.start() 14 now2 = time.time() 15 print(now2) 16 print(threading.active_count()) 17 # 包含主线程,总共3个 18 print(a.getName()) 19 print(b.getName()) 20 21 class MyThread(threading.Thread): 22 def __init__(self,n): 23 threading.Thread.__init__(self) 24 self.n = n 25 26 def run(self): 27 print('running on thread $s'%self.n) 28 time.sleep(3) 29 now3 = time.time() 30 print(now3) 31 t1 = MyThread(1) 32 t2 = MyThread(2) 33 t1.start() 34 t2.start() 35 print(t1.getName()) 36 print(t2.getName()) 37 now4 = time.time() 38 print(now4) 39 40 thread_list = [] 41 for i in range(10): 42 s1 = threading.Thread(target=sayhi,args=(i,)) 43 s1.start() 44 thread_list.append(s1) 45 now5 = time.time() 46 print(now5) 47 for r in thread_list: 48 r.join() # s1.wait() 49 print('--work done--') 50 now6 = time.time() 51 print(now6) 52 print('primary'.center(20,'-')) 53 54 55 for ii in range(10): 56 s2 = threading.Thread(target=sayhi) 57 s2.setDaemon(s2) 58 s2.start()
一、线程的基本使用 threading模块
threading 模块建立在 _thread 模块之上。
thread 模块以低级、原始的方式来处理和控制线程,
而 threading 模块通过对 thread 进行二次封装,提供了更方便的 api 来处理线程。
1 import threading 2 import time 3 4 def task(arg): 5 time.sleep(arg) 6 print(arg) 7 8 for i in range(30): 9 # 创建线程,args必须是可迭代的对象 10 t = threading.Thread(target=task,args=[i,]) 11 12 # 主线程终止,不等待子线程 13 # t.setDaemon(True) 14 # 等待子线程 15 # t.setDaemon(False) 16 17 # 开始工作,等待cpu使用 18 t.start() 19 20 # 变成串行 21 # t.join() #一直等 22 # t.join(1) # 超时,等待最大时间 23 print('end')
使用线程中的run方法
1 import threading 2 import time 3 class MyThread(threading.Thread): #继承threading方法 4 def __init__(self,func,*args,**kwargs): 5 super(MyThread,self).__init__(*args,**kwargs) 6 self.func = func 7 8 def run(self): # 线程中的run方法 9 self.func() 10 11 def task(): 12 time.sleep(1) 13 print('is a test') 14 15 obj = MyThread(func=task) 16 obj.start()
Thread方法说明
t.start() : 激活线程,
t.getName() : 获取线程的名称
t.setName() : 设置线程的名称
t.name : 获取或设置线程的名称
t.is_alive() : 判断线程是否为激活状态
t.isAlive() :判断线程是否为激活状态
t.setDaemon() 设置为后台线程或前台线程(默认:False);通过一个布尔值设置线程是否为守护线程,必须在执行start()方法之后才可以使用。如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止;如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
t.isDaemon() : 判断是否为守护线程
t.ident :获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法之后该属性才有效,否则它只返回None。
t.join() :逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
t.run() :线程被cpu调度后自动执行线程对象的run方法
二、线程锁threading.RLock和threading.Lock
由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,CPU接着执行其他线程。为了保证数据的准确性,引入了锁的概念
假如现在有10个人要上厕所,钥匙在我手里,这个时候A进来,他把门关上了,别人就进不去,只有等A完事后其他人才能进来,这个门就是控制线程的那把锁
锁的基本使用:
1 lock=threading.Lock() 2 lock.acquire() 3 lock.release()
Rlock和Lock的区别:
RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。 如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的琐
1 # 只能一个人使用锁 2 # 创建锁 3 lock = threading.Lock() 4 # 递归锁,多把锁 5 # lock = threading.RLock() 6 7 def task(arg): 8 time.sleep(2) 9 print('arg:',arg) 10 # 申请使用锁,其他人等 11 lock.acquire() 12 # lock.acquire() # Rlock可使用多把锁 13 14 global v 15 v -= 1 16 print(v) 17 18 # 释放锁 19 lock.release() 20 # lock.release() # Rlock可解锁 21 22 for i in range(2): 23 t = threading.Thread(target=task,args=(i,)) 24 t.start()
如果厕所有3个坑,同时允许3个人上,使用 BoundedSemaphore
1 # 多个人同时使用锁 2 # 信号链,定义3个人同时使用 3 lock = threading.BoundedSemaphore(3) 4 def task(arg): 5 lock.acquire() 6 time.sleep(1) 7 global v 8 v -= 1 9 print(v) 10 lock.release() 11 for i in range(10): 12 t = threading.Thread(target=task,args=(i,)) 13 t.start()
三、事件Event
python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。
事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
- clear:将“Flag”设置为False
- set:将“Flag”设置为True
- Event.isSet() :判断标识位是否为Ture
1 # 事件 event 2 lock = threading.Event() 3 def task(arg): 4 time.sleep(1) 5 # 锁住所有的线程 6 lock.wait() 7 print(arg) 8 for i in range(10): 9 t = threading.Thread(target=task,args=(i,)) 10 t.start() 11 while 1: 12 value = input('>>:').strip() 13 if value == '1': 14 lock.set() # 打开锁,执行上面的print 15 # lock.clear() # 再锁上
四、条件Condition
condition变量服从上下文管理协议:with语句块封闭之前可以获取与锁的联系。
acquire() 和 release() 会调用与锁相关联的相应的方法。
其他和锁关联的方法必须被调用,wait()方法会释放锁,当另外一个线程使用 notify() or notify_all()唤醒它之前会一直阻塞。一旦被唤醒,wait()会重新获得锁并返回,
- wait(timeout=None) : 等待通知,或者等到设定的超时时间。当调用这wait()方法时,如果调用它的线程没有得到锁,那么会抛出一个RuntimeError 异常。 wati()释放锁以后,在被调用相同条件的另一个进程用notify() or notify_all() 叫醒之前 会一直阻塞。wait() 还可以指定一个超时时间。
如果有等待的线程,notify()方法会唤醒一个在等待conditon变量的线程。notify_all() 则会唤醒所有在等待conditon变量的线程。
1 lock = threading.Condition() 2 3 def task(arg): 4 time.sleep(1) 5 # 锁住所有线程 6 lock.acquire() 7 lock.wait() 8 print('线程:',arg) 9 lock.release() 10 for i in range(10): 11 t = threading.Thread(target=task,args=(i,)) 12 t.start() 13 while 1: 14 value = input('>>>:').strip() 15 lock.acquire() 16 lock.notify(int(value)) 17 lock.release() 18 19 >>>:1 20 线程: 3 21 >>>:2 22 线程: 4 23 线程: 2 24 >>>:3 25 线程: 0 26 线程: 7 27 线程: 1 28 >>>:4 29 线程: 8 30 线程: 9 31 线程: 6 32 线程: 5 33 >>>:5
五、queue模块
Queue 就是对队列,它是线程安全的
举例来说,我们去麦当劳吃饭。饭店里面有厨师职位,前台负责把厨房做好的饭卖给顾客,顾客则去前台领取做好的饭。这里的前台就相当于我们的队列。形成管道样,厨师做好饭通过前台传送给顾客,所谓单向队列
1 import queue 2 3 q = queue.Queue(maxsize=0) # 构造一个先进显出队列,maxsize指定队列长度,为0 时,表示队列长度无限制。 4 5 q.join() # 等到队列为kong的时候,在执行别的操作 6 q.qsize() # 返回队列的大小 (不可靠) 7 q.empty() # 当队列为空的时候,返回True 否则返回False (不可靠) 8 q.full() # 当队列满的时候,返回True,否则返回False (不可靠) 9 q.put(item, block=True, timeout=None) # 将item放入Queue尾部,item必须存在,可以参数block默认为True,表示当队列满时,会等待队列给出可用位置, 10 为False时为非阻塞,此时如果队列已满,会引发queue.Full 异常。 可选参数timeout,表示 会阻塞设置的时间,过后, 11 如果队列无法给出放入item的位置,则引发 queue.Full 异常 12 q.get(block=True, timeout=None) # 移除并返回队列头部的一个值,可选参数block默认为True,表示获取值的时候,如果队列为空,则阻塞,为False时,不阻塞, 13 若此时队列为空,则引发 queue.Empty异常。 可选参数timeout,表示会阻塞设置的时候,过后,如果队列为空,则引发Empty异常。 14 q.put_nowait(item) # 等效于 put(item,block=False) 15 q.get_nowait() # 等效于 get(item,block=False)
六、线程池
1 from concurrent.futures import ThreadPoolExecutor as TPE 2 3 def task(arg): 4 time.sleep(0.5) 5 print('Thread:',arg) 6 7 pool = TPE(5) # 线程池里放5个线程 8 9 for i in range(100): 10 # 去连接池中获取连接 11 pool.submit(task,i)
1 import requests 2 def task(url): 3 response = requests.get(url) 4 print('得到结果:',url,len(response.content)) 5 6 pool = TPE(2) 7 url_list = [ 8 'http://www.oldboyedu.com', 9 'http://www.baidu.com', 10 'http://www.sohu.com', 11 ] 12 13 for url in url_list: 14 print('开始请求',url) 15 # 去连接池获取连接 16 pool.submit(task,url)
七、回调函数
线程池定义为一个变量,使用变量.add_done_callback(函数名称)进行函数回调
1 import requests 2 3 def txt(future): 4 download_response = future.result() 5 print('得到结果:', url, len(download_response.content)) 6 7 def download(url): 8 response = requests.get(url) 9 # print('得到结果:',url,len(response.content)) 10 return response 11 pool = TPE(2) 12 url_list = [ 13 'http://www.oldboyedu.com', 14 'http://www.baidu.com', 15 'http://www.sohu.com', 16 ] 17 18 for url in url_list: 19 print('开始请求',url) 20 # 去连接池获取连接 21 future = pool.submit(download,url) 22 # 一旦download函数return,开始执行txt函数 23 future.add_done_callback(txt)
八、进程
multiprocessing是python的多进程管理包,和threading.Thread类似。
1 from multiprocessing import Process 2 def task(arg): 3 time.sleep(arg) 4 print(arg) 5 6 if __name__ == '__main__': 7 for i in range(10): 8 p = Process(target=task,args=(i,)) 9 # 守护true,不执行子进程 10 # p.daemon = True 11 # false执行子进程,默认 12 # p.daemon = False 13 p.start() 14 # p.join() 15 p.join(1) 16 print('主进程中的主线程...')
九、multiprocessing,Array,Value
数据可以用Value或Array存储在一个共享内存地图里,如下:
1 # 进程数据共享 2 from multiprocessing import Process,Array 3 def task(num,li): 4 li[num] = num 5 print(list(li)) 6 7 if __name__ == '__main__': 8 L = Array('i',10) # 数据类型,长度 9 for i in range(10): 10 p = Process(target=task,args=(i,L)) 11 p.start()
from multiprocessing import Array, Value, Process def func(a, b): a.value = 6.66666 for i in range(len(b)): b[i] = -b[i] if __name__ == "__main__": num = Value('d', 0.0) arr = Array('i', range(11)) c = Process(target=func, args=(num, arr)) d = Process(target=func, args=(num, arr)) c.start() d.start() c.join() d.join() print(num.value) for i in arr: print(i)
十、进程池
和线程池差不多
1 from concurrent.futures import ProcessPoolExecutor as PPE 2 3 4 #基本用法 5 def task(arg): 6 time.sleep(1) 7 print(arg) 8 9 pool = PPE(5) 10 for i in range(10): 11 pool.submit(task,i) 12 13 14 # 进程池回调 15 def call(arg): 16 data = arg.result() 17 print(data) 18 19 def task(arg): 20 print(arg) 21 return arg+100 22 23 pool = PPE(5) 24 for i in range(10): 25 obj = pool.submit(task,i) 26 obj.add_done_callback(call)