Mutex Locks
Thread synchronization guarantees that multiple threads access a contended resource safely, and the simplest synchronization mechanism is the mutex lock. A mutex gives the resource a state: locked or unlocked. When a thread wants to modify shared data, it locks the resource first; while the state is "locked", no other thread can modify it. Only after the owning thread releases the resource, returning it to "unlocked", can another thread lock it again. A mutex thus guarantees that only one thread writes at a time, which keeps the data correct under multithreading.
The threading module defines a Lock class that makes locking convenient:
```python
# create a lock
lock = threading.Lock()
# acquire it (blocks by default; timeout=-1 means wait indefinitely)
lock.acquire(blocking=True, timeout=-1)
# release it
lock.release()
```
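Lock also supports the context-manager protocol, which is the idiomatic way to use it: `with lock:` acquires on entry and guarantees the release even if the body raises. A minimal sketch (the counter and thread count here are illustrative):

```python
import threading

lock = threading.Lock()
counter = 0

def increment():
    global counter
    # "with lock:" is equivalent to lock.acquire() / try: ... finally: lock.release()
    with lock:
        counter += 1

threads = [threading.Thread(target=increment) for _ in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 100
```

Because the release happens in all cases, the context-manager form avoids a common bug where an exception between acquire() and release() leaves the lock held forever.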
```python
from threading import Thread, Lock

n = 0

def func(lock):
    global n
    for i in range(1500000):
        lock.acquire()
        n -= 1
        lock.release()

def func2(lock):
    global n
    for i in range(1500000):
        lock.acquire()
        n += 1
        lock.release()

if __name__ == '__main__':
    t_list = []
    lock = Lock()
    for i in range(10):
        t = Thread(target=func, args=(lock,))
        t2 = Thread(target=func2, args=(lock,))
        t.start()
        t2.start()
        t_list.append(t)
        t_list.append(t2)
    for t in t_list:
        t.join()
    print(n)
```

Output: `0`. With the lock in place, the result is guaranteed to be correct.
Deadlock
A deadlock occurs when two or more processes or threads block each other while competing for resources: each waits for a resource another holds, and without outside intervention none of them can make progress.
The system is then said to be deadlocked, and the processes stuck waiting on each other forever are called deadlocked processes. The following code deadlocks:
```python
import time
from threading import Thread, Lock

noodle_lock = Lock()
fork_lock = Lock()

def eat1(name):
    noodle_lock.acquire()
    print('%s got the noodles' % name)
    fork_lock.acquire()
    print('%s got the fork' % name)
    print('%s is eating' % name)
    time.sleep(0.3)
    fork_lock.release()
    print('%s put down the fork' % name)
    noodle_lock.release()
    print('%s put down the noodles' % name)

def eat2(name):
    fork_lock.acquire()
    print('%s got the fork' % name)
    noodle_lock.acquire()
    print('%s got the noodles' % name)
    print('%s is eating' % name)
    time.sleep(0.3)
    noodle_lock.release()
    print('%s put down the noodles' % name)
    fork_lock.release()
    print('%s put down the fork' % name)

if __name__ == '__main__':
    name_list = ['luffy', 'zoro']
    name_list2 = ['sanji', 'chopper']
    for name in name_list:
        Thread(target=eat1, args=(name,)).start()
    for name in name_list2:
        Thread(target=eat2, args=(name,)).start()
Ingredients of this deadlock:
- there are two locks
- the program runs asynchronously
- a thread that already holds one lock still has to grab the second lock
- one thread grabs the first lock while another thread grabs the second, and each waits on the other forever
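One way to break the endless wait, not from the original text but a common pattern: acquire the second lock with a timeout, and if it cannot be obtained, release the first lock and retry so the other thread can make progress. A minimal sketch (lock names are illustrative):

```python
import threading

lock_a = threading.Lock()
lock_b = threading.Lock()

def use_both():
    while True:
        lock_a.acquire()
        # Try the second lock with a timeout instead of blocking forever.
        if lock_b.acquire(timeout=0.1):
            try:
                pass  # ... safely use both resources here ...
            finally:
                lock_b.release()
                lock_a.release()
            return
        # Could not get the second lock: back off so the other thread can finish.
        lock_a.release()

use_both()
print('done')
```

This "try-and-back-off" scheme trades a little busy-waiting for the guarantee that no thread holds one lock indefinitely while waiting on the other.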
Recursive Locks
The usual quick fix is a recursive lock: to let the same thread request the same resource multiple times, Python provides the reentrant lock RLock.
An RLock internally maintains a Lock and a counter; the counter records the number of acquire calls, so the owning thread can acquire the resource repeatedly.
Only when every acquire has been matched by a release can other threads obtain the resource. If RLock replaces Lock in the example above, the deadlock disappears. The difference between the two: a recursive lock can be acquired multiple times in a row by the same thread, while a mutex can only be acquired once.
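The difference is easy to demonstrate with `acquire(timeout=...)` (a small illustrative micro-test, not from the original text): a plain Lock cannot be re-acquired even by the thread that holds it, while an RLock simply increments its counter:

```python
from threading import Lock, RLock

lock = Lock()
lock.acquire()
# A second acquire by the same thread would block forever; with a timeout it fails.
print(lock.acquire(timeout=0.1))   # False
lock.release()

rlock = RLock()
rlock.acquire()
# The owning thread can re-acquire an RLock immediately.
print(rlock.acquire(timeout=0.1))  # True
rlock.release()
rlock.release()   # every acquire must be matched by a release
```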
```python
from threading import Thread, RLock

rlock = RLock()

def func(name):
    rlock.acquire()
    print(name, 1)
    rlock.acquire()
    print(name, 2)
    rlock.acquire()
    print(name, 3)
    rlock.release()
    rlock.release()
    rlock.release()

for i in range(10):
    Thread(target=func, args=('name%s' % i,)).start()
```
A recursive lock can resolve the deadlock that mutexes cause: instead of two mutexes fought over by many threads, there is one reentrant lock. Is the recursive lock a good solution, though? Not really. Deadlock is not the mutex's fault; it is caused by flawed program logic. A recursive lock is a quick fix: it restores service fast, and then over time the recursive lock should be replaced by properly used mutexes. That cleans up the program logic and improves efficiency: when threads need several resources, finish with one resource and release its lock before acquiring the lock for the next.
```python
import time
from threading import Thread, RLock

# one reentrant lock guards both resources
fork_lock = noodle_lock = RLock()

def eat1(name):
    noodle_lock.acquire()
    print('%s got the noodles' % name)
    fork_lock.acquire()
    print('%s got the fork' % name)
    print('%s is eating' % name)
    time.sleep(0.3)
    fork_lock.release()
    print('%s put down the fork' % name)
    noodle_lock.release()
    print('%s put down the noodles' % name)

def eat2(name):
    fork_lock.acquire()
    print('%s got the fork' % name)
    noodle_lock.acquire()
    print('%s got the noodles' % name)
    print('%s is eating' % name)
    time.sleep(0.3)
    noodle_lock.release()
    print('%s put down the noodles' % name)
    fork_lock.release()
    print('%s put down the fork' % name)

if __name__ == '__main__':
    name_list = ['luffy', 'zoro']
    name_list2 = ['sanji', 'chopper']
    for name in name_list:
        Thread(target=eat1, args=(name,)).start()
    for name in name_list2:
        Thread(target=eat2, args=(name,)).start()
```
```python
import time
from threading import Thread, Lock

# a single mutex for both resources also removes the deadlock
fork_noodle_lock = Lock()

def eat1(name):
    fork_noodle_lock.acquire()
    print('%s got the noodles' % name)
    print('%s got the fork' % name)
    print('%s is eating' % name)
    time.sleep(0.3)
    fork_noodle_lock.release()
    print('%s put down the fork' % name)
    print('%s put down the noodles' % name)

def eat2(name):
    fork_noodle_lock.acquire()
    print('%s got the fork' % name)
    print('%s got the noodles' % name)
    print('%s is eating' % name)
    time.sleep(0.3)
    fork_noodle_lock.release()
    print('%s put down the noodles' % name)
    print('%s put down the fork' % name)

if __name__ == '__main__':
    name_list = ['luffy', 'zoro']
    name_list2 = ['sanji', 'chopper']
    for name in name_list:
        Thread(target=eat1, args=(name,)).start()
    for name in name_list2:
        Thread(target=eat2, args=(name,)).start()
```
Thread Queues
queue: use `import queue`; the interface is the same as the multiprocessing Queue.
The queue module is especially useful in threaded programming when information must be exchanged safely between multiple threads.
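A typical use is a producer/consumer pair (a minimal sketch; the worker function and item processing are illustrative): the queue does all the locking internally, and `task_done()`/`join()` let the producer wait until every item has been processed:

```python
import queue
import threading

q = queue.Queue()
results = []

def worker():
    while True:
        item = q.get()
        if item is None:          # sentinel value: stop the worker
            q.task_done()
            break
        results.append(item * 2)  # "process" the item
        q.task_done()

t = threading.Thread(target=worker)
t.start()
for i in range(5):
    q.put(i)
q.put(None)   # tell the worker to stop
q.join()      # blocks until every put() item has been marked task_done()
t.join()
print(results)  # [0, 2, 4, 6, 8]
```

No explicit lock appears anywhere: the queue serializes access to the shared data by itself.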
- class queue.Queue(maxsize=0)  # first in, first out
```python
import queue

q = queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
Result (first in, first out):
first
second
third
'''
```
- class queue.LifoQueue(maxsize=0)  # last in, first out
```python
import queue

q = queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
Result (last in, first out):
third
second
first
'''
```
- class queue.PriorityQueue(maxsize=0)  # a queue whose entries are dequeued by priority
```python
import queue

q = queue.PriorityQueue()
# put() takes a tuple; the first element is the priority
# (usually a number, but any comparable values work).
# The smaller the value, the higher the priority.
q.put((20, 'a'))
q.put((10, 'b'))
q.put((30, 'c'))

print(q.get())
print(q.get())
print(q.get())
'''
Result (smaller number = higher priority, dequeued first):
(10, 'b')
(20, 'a')
(30, 'c')
'''
```
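All three queue classes share the same get/put interface, and `get` can also be made non-blocking (a small sketch; the queue contents are illustrative): instead of waiting, it raises `queue.Empty` when there is nothing to return, and `put` on a bounded queue raises `queue.Full` symmetrically:

```python
import queue

q = queue.Queue(maxsize=2)
q.put('a')
print(q.get_nowait())        # 'a'  (same as q.get(block=False))
try:
    q.get(block=False)       # queue is now empty
except queue.Empty:
    print('queue was empty')
```

The non-blocking forms are useful when a thread wants to poll a queue while also doing other work, rather than parking on `get()`.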
Thread Pools
Python: concurrent.futures
1. The concurrent.futures module is used to run tasks in parallel; it provides a higher-level interface for executing calls asynchronously.
2. The module is very convenient to use, and its interface is simple and well encapsulated.
3. It can implement both a process pool and a thread pool.
4. Importing the process pool and thread pool:
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
p = ProcessPoolExecutor(max_workers)  # if max_workers is omitted, it defaults to the number of CPUs
p = ThreadPoolExecutor(max_workers)   # if max_workers is omitted, it defaults to the number of CPUs * 5 (Python 3.8 changed this to min(32, os.cpu_count() + 4))
Basic methods
1. submit(fn, *args, **kwargs): submit a task asynchronously
2. map(func, *iterables, timeout=None, chunksize=1): replaces a for-loop of submit calls
3. shutdown(wait=True): the analogue of pool.close() + pool.join() on a multiprocessing pool
   - wait=True: block until every task in the pool has finished and its resources are reclaimed
   - wait=False: return immediately, without waiting for the pool's tasks to finish
   - regardless of the wait value, the whole program will not exit until all tasks have completed
   - submit and map must be called before shutdown
4. result(timeout=None): fetch a task's result
5. add_done_callback(fn): attach a callback
```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from threading import currentThread
import os, time, random

def task(n):
    print("%s is running " % currentThread().getName())
    time.sleep(random.randint(1, 3))
    return n * 2

if __name__ == '__main__':
    start = time.time()
    executor = ThreadPoolExecutor(4)       # thread pool with 4 workers
    res = []
    for i in range(10):                    # launch 10 tasks
        future = executor.submit(task, i)  # submit a task asynchronously
        res.append(future)
    executor.shutdown()                    # wait for all threads to finish
    print("++++>")
    for r in res:
        print(r.result())                  # print the results
    end = time.time()
    print(end - start)
```

Output:

```
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000000025B0DA0>_0 is running
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000000025B0DA0>_1 is running
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000000025B0DA0>_2 is running
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000000025B0DA0>_3 is running
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000000025B0DA0>_3 is running
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000000025B0DA0>_1 is running
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000000025B0DA0>_0 is running
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000000025B0DA0>_2 is running
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000000025B0DA0>_3 is running
<concurrent.futures.thread.ThreadPoolExecutor object at 0x00000000025B0DA0>_1 is running
++++>
0
2
4
6
8
10
12
14
16
18
5.002286195755005
```
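Besides collecting results in submit order as above, `concurrent.futures.as_completed` yields each future as soon as it finishes, which matters when tasks have very different durations. A sketch (task durations here are illustrative):

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def task(n):
    time.sleep(n * 0.1)   # slower for larger n
    return n * 2

# "with" calls executor.shutdown() automatically on exit
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(task, n) for n in (3, 1, 2)]
    # as_completed yields futures in finishing order, not submission order
    results = [f.result() for f in as_completed(futures)]

print(results)  # fastest task first: [2, 4, 6]
```

Iterating `res` in submit order makes slow early tasks delay the printing of fast later ones; `as_completed` avoids that.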
```python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import os, time, random

def task(n):
    print('%s is running' % os.getpid())
    time.sleep(random.randint(1, 3))
    return n ** 2

if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=3)
    # for i in range(11):
    #     future = executor.submit(task, i)
    executor.map(task, range(1, 12))   # map replaces for + submit
```
```python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from multiprocessing import Pool
import requests
import os

def get_page(url):
    print('<process %s> get %s' % (os.getpid(), url))
    response = requests.get(url)
    if response.status_code == 200:
        return {'url': url, 'text': response.text}

def parse_page(res):
    # the callback receives a Future object; call result() to get the return value
    res = res.result()
    print('<process %s> parse %s' % (os.getpid(), res['url']))
    parse_res = 'url:<%s> size:[%s]\n' % (res['url'], len(res['text']))
    with open('db.txt', 'a') as f:
        f.write(parse_res)

if __name__ == '__main__':
    urls = [
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/',
    ]
    # the multiprocessing.Pool equivalent:
    # p = Pool(3)
    # for url in urls:
    #     p.apply_async(get_page, args=(url,), callback=parse_page)
    # p.close()
    # p.join()
    p = ProcessPoolExecutor(3)
    for url in urls:
        p.submit(get_page, url).add_done_callback(parse_page)
```