zoukankan      html  css  js  c++  java
  • 10. 锁

    1.锁

    # ### 锁
    from multiprocessing import Lock,Process
    import json,time
    """
    # 创建一把锁
    lock = Lock()
    # 上锁
    lock.acquire()
    print(123)
    # 解锁
    lock.release()
    """
    # 死锁 (只上锁不解锁会差生死锁) 程序添加了阻塞,代码不能往下执行;
    """如果上锁一定要解锁,上锁解锁是一对"""
    '''
    lock.acquire()
    lock.acquire()
    lock.release()
    '''
    
    # 读取票数,更新票数
    def wr_info(sign,dic=None):
    	if sign == "r":
    		with open("ticket",mode="r",encoding="utf-8") as fp:
    			dic =  json.load(fp)
    		return dic
    		
    	elif sign == "w":
    		with open("ticket",mode="w",encoding="utf-8") as fp:
    			json.dump(dic,fp)
    		
    # print(wr_info("r"),type(wr_info("r")))
    # dic = {"count":2}
    # wr_info("w",dic)
    
    # 抢票方法
    def get_ticket(person):
    	dic = wr_info("r")
    	time.sleep(0.1)
    	if dic["count"] > 0:
    		print("%s抢到票了" % (person))
    		dic["count"] -= 1
    		# 更新数据库
    		wr_info("w",dic)
    	else:
    		print("%s没有买到票" % person)
    		
    # get_ticket("李四")
    
    # 用ticket方法来进行统一调用
    def ticket(person,lock):
    	# 先读取票数
    	dic = wr_info("r")
    	# 查询余票
    	print("%s 查询余票 : %s" % (person,dic["count"]))
    	
    	lock.acquire()
    	# 在开始抢票
    	get_ticket(person)
    	lock.release()
    
    
    if __name__ == "__main__":
    	lock = Lock()
    	for i in range(10):
    		p = Process(target=ticket,args=("person%s" % (i),lock))
    		p.start()
    
    """
    创建进程的时候是异步程序,在上锁的时候是同步程序;
    """
    

    2. 信号量

    # ### 信号量 Semaphore 本质上就是锁,只不过可以控制锁的数量
    from multiprocessing import Process,Semaphore
    import os,time
    
    def ktv(person,sem):
    	sem.acquire()
    	print("%s进入ktv开始唱歌" % (person))
    	print(os.getpid())
    	time.sleep(3)
    	print("%s走出ktv离开歌房" % (person))
    	sem.release()
    	
    	
    if __name__ == "__main__":
    	sem = Semaphore(1)
    	for i in  range(10):
    		p = Process(target=ktv,args=("person%s" % (i),sem) )
    		p.start()
    

    3.Event事件

    # ### 事件
    from multiprocessing import Process,Event
    import time,random
    """
    # 阻塞事件
    	e = Event() 生成事件对象e
    	e.wait() 动态给程序加阻塞, 程序当中是否加阻塞完全取决于该对象中的is_set() (默认返回值是False)
    	
    	# 如果是True  不加阻塞
    	# 如果是False 加阻塞
    	
    # 控制属性的值
    	# set() 方法   将这个属性的值改成True
    	# clear() 方法 将这个属性的值改成False
    	# is_set() 方法 获取当前属性值是True 还是 False
    """
    # 基本语法
    # 1.
    """
    e = Event()
    print(e.is_set())
    # e.wait()
    # 最多阻塞时间为1秒
    e.wait(1)
    print(1)
    """
    
    """
    # 2
    e = Event()
    # 将内部的一个属性改成True
    e.set()
    e.wait()
    print(222)
    # 将内部的一个属性改成False
    e.clear()
    e.wait()
    print(3333)
    """
    
    # 模拟交通灯的效果
    def traffic_light(e):
    	# 默认红灯先亮
    	print("红灯亮")
    	while True:
    		if e.is_set():
    			# 当前是绿灯,等待1秒
    			time.sleep(1)
    			# 等完1秒后,变成红灯
    			print("红灯亮")
    			e.clear()
    		
    		else:
    			# 当前是红灯
    			time.sleep(1)
    			# 等完1秒之后,变成绿灯
    			print("绿灯亮")
    			e.set()
    
    # e = Event()
    # traffic_light(e)
    
    # 模拟小车遇到红灯停,绿灯行的操作
    def car(e,i):
    	# e.is_set() 默认返回是False 代表的是红灯
    	if not e.is_set():
    		print("car%s在等待" % (i))
    		e.wait()
    	print("car%s通行了" % (i))
    
    """
    if __name__ == "__main__":
    	e = Event()
    	# 模拟启动交通灯
    	p1 = Process(target=traffic_light,args=(e,))
    	p1.daemon = True
    	p1.start()
    	
    	# 模拟20辆小车
    	for i in range(20):
    		time.sleep(random.uniform(0,2))
    		p2 = Process(target=car,args=(e,i))
    		p2.start()
    
    	print("程序彻底结束")
    """
    
    # 优化版 : 等小车全都跑完之后,再让程序彻底终止
    if __name__ == "__main__":
    	lst = []
    	e = Event()
    	# 模拟启动交通灯
    	p1 = Process(target=traffic_light,args=(e,))
    	# 设置红绿灯为守护进程,灯小车跑完,也终止红绿灯;
    	p1.daemon = True
    	p1.start()
    	
    	# 模拟20辆小车
    	for i in range(20):
    		# 小车创建的速度太快,所以加一点延迟效果,生动表现出小车的行为;
    		time.sleep(random.uniform(0,2))
    		p2 = Process(target=car,args=(e,i))
    		p2.start()
    		lst.append(p2)
    	
    	# 等到小车都跑完之后,再去终止红绿灯;加一个等待;
    	for i in lst:
    		i.join()
    	
    	print("程序彻底结束")
    

    4. 进程队列

    # ### 进程队列 
    from multiprocessing import Process , Queue
    # (1) 基本语法
    """
    先进先出,后进后出
    功能: 让进程之间形成数据之间的共享
    """
    """
    q = Queue()
    # 1.把数据放到q队列中 put
    q.put(1111)
    # 2.把数据从队列中取出 get
    res = q.get()
    print(res)
    # 3.当队列里面的值都拿出来了,已经没有数据的时候,在获取就会出现阻塞
    # res = q.get()
    # 4.get_nowait 无论有没有都去拿数据,如果拿不到,直接报错
    # res = q.get_nowait()
    
    # 抑制get_nowait 报错 用try 方法实现
    try:
    	res = q.get_nowait()
    except:
    	pass
    """
    
    # (2) 可以使用queue , 指定队列长度
    """
    q = Queue(3)
    q.put(11)
    q.put(22)
    q.put(33)
    # print(q.get())
    # q.put(44) # 阻塞,如果队列已经存满了,在放值直接阻塞
    # 无论如何都往队列中存值,如果存满直接报错
    # q.put_nowait(555)
    try:
    	q.put_nowait(555)
    except:
    	pass
    """
    
    
    # (3) 进程之间的通讯,依赖Queue
    # 子进程
    def func(q):
    	# 2.子进程获取数据
    	res = q.get()
    	print(res)
    	# 3.子进程添加数据
    	q.put("cccc")
    	
    if __name__ == "__main__":
    	q = Queue()
    	p = Process(target=func,args=(q,))
    	p.start()
    	
    	# 1.主进程添加数据
    	q.put("abc")
    	p.join()
    	# 4.主进程获取数据
    	res = q.get()
    	print(res)
    	print("程序结束")
    	
    

    5. 生产者与消费者模型

    # #### 生产者与消费者模型
    """
    # 爬虫例子:
    1号进程负责爬取网页中所有的内容
    2号进程负责匹配提起网页中的关键字
    
    1号进程就可以看成一个生产者
    2号进程就可以看成一个消费者
    
    有时可能生产者比消费者快,或者慢,
    所以为了减少生产者和消费者速度上的差异化,加了一个中间的缓冲队列
    
    比较理想的模型,两者之间的速度相对平均
    生产者和消费者模型从程序上来看,就是存数据和取数据之间的过程
    """
    from multiprocessing import Process , Queue
    import time,random
    # 消费者模型(负责取值)
    def consumer(q,name):
    	while True:
    		# 拿出数据
    		food = q.get()
    		# 如果哪取的数据是None,代表已经拿到最后一个数据了,到头了,这个时候将循环结束;
    		if food is None:
    			break
    		time.sleep(random.uniform(0.1,1))
    		print("%s 吃了一个 %s" % (name,food))
    		
    # 生产者模型(负责存值)
    def producer(q,name,food):
    	for i in  range(3):
    		time.sleep(random.uniform(0.1,1))
    		print("%s 生产了 %s , %s" % (name,food,i))
    		q.put(food+str(i))
    
    if __name__ == "__main__":	
    	q = Queue()
    	# 消费者
    	c1 = Process(target=consumer,args=(q,"舒则会"))
    	c1.start()
    
    	# 生产者
    	p1 = Process(target=producer,args = (q,"郭一萌","面包"))
    	p1.start()
    	
    	# 等待生产者把所有的数据生产完毕,保证队列里面有3个数据
    	p1.join()
    	q.put(None)
    	
    

    6. 线程

    # ### 线程
    from threading  import Thread
    from multiprocessing import Process
    import os,time,random
    # (1) 一个进程可以有多个线程,这些线程共享同一份资源
    """线程是异步并发程序"""
    '''
    def func(num):
    	time.sleep(random.uniform(0.1,1))	
    	print("子线程",num,os.getpid())
    
    if __name__ == "__main__":
    	for i in range(10):
    		t = Thread(target=func,args=(i,))
    		t.start()
    '''
    # (2) 并发多线程 和 并发多进程 速度对比? 多线程更快
    '''
    def func(i):
    	print("子线程",i,os.getpid())
    	
    if __name__ == "__main__":
    	lst = []
    	# 1.计算多线程的时间
    	starttime = time.perf_counter()
    	for i in range(1000):
    		t = Thread(target=func,args=(i,))
    		t.start()
    		lst.append(t)
    	
    	for i in lst:
    		i.join()
    	
    	endtime = time.perf_counter()
    	print(endtime-starttime,"主线程执行结束 <=====>") # 0.10773659999999999
    	
    	# 2.计算多进程的时间
    	lst = []
    	starttime = time.perf_counter()
    	for i in range(1000):
    		p = Process(target=func,args=(i,))
    		p.start()
    		lst.append(p)
    		
    	for i in lst:
    		i.join()	
    		
    	endtime = time.perf_counter()
    	print(endtime-starttime,"主进程执行结束 <=====>") # 25.524820199999997
    '''
    	
    # (3) 多线程之间共享同一份进程资源
    num = 100
    lst = []
    def func(i):
    	global num
    	num -= 1
    	
    for i in range(100):
    	t = Thread(target=func,args=(i,))
    	t.start()
    	lst.append(t)
    	
    # i.join 可以保证每一个线程都执行一遍,然后在取打印num
    for i in lst:
    	i.join()
    	
    print(num)
    	
    # (4) 线程相关函数
    """
    线程.is_alive()    检测线程是否仍然存在
    线程.setName()     设置线程名字
    线程.getName()     获取线程名字
    1.currentThread().ident 查看线程id号 
    2.enumerate()        返回目前正在运行的线程列表
    3.activeCount()      返回目前正在运行的线程数量
    """
    """
    def func():
    	time.sleep(3)
    
    t = Thread(target=func)
    print(t)
    t.start()
    # is_alive
    print(t.is_alive())
    	
    # setName
    t.setName("李杰用脑过度")
    # print(t)
    print(t.getName())
    """
    
    # 1.currentThread().ident 查看线程id号 	
    from threading import currentThread
    '''
    def func():
    	print("子线程:",currentThread().ident)
    t = Thread(target=func)
    t.start()
    print("主线程:",currentThread().ident,os.getpid())	
    '''
    # 2.enumerate()        返回目前正在运行的线程列表
    from threading import enumerate
    def func():
    	print("子线程",currentThread().ident)
    	time.sleep(0.5)
    	
    for i in range(10):
    	t = Thread(target=func)
    	t.start()
    print(enumerate())
    print(len(enumerate()))
    	
    # 3.activeCount()      返回目前正在运行的线程数量
    from threading import activeCount
    def func():
    	print("子线程",currentThread().ident)
    	time.sleep(0.5)
    for i in range(10):
    	Thread(target=func).start()
    print(activeCount())
    	
    	
    
    	
    	
    	
    	
    	
    	
    	
    	
    	
    	
    	
    	
    	
    

    7. 守护线程

    # ### 守护线程:  等待所有线程执行结束之后,在自动结束,守护所有线程;
    from threading import Thread
    import time
    
    def func1():
    	while True:
    		time.sleep(0.5)
    		print("我是线程1,func1任务")
    		
    def func2():
    	print("我是线程2,start")
    	time.sleep(3)
    	print("我是线程2,end")
    
    # 启动线程1
    """线程可以选择不加if __name__ == "__main__": 因为线程共享同一份资源,当然加上更好"""
    t1 = Thread(target=func1)
    # 设置守护线程
    t1.setDaemon(True)
    t1.start()
    
    t2 = Thread(target=func2)
    t2.start()
    
    print("主线程执行结束")
    

    8. 线程数据安全

    # ### 线程的数据安全 依赖Lock
    """用上锁的方法,来保证数据安全,代价就是会牺牲一点执行的速度;"""
    from threading import Thread,Lock
    n = 0
    
    def func1(lock):
    	global n
    	for i in range(100000):
    		# 方法一
    		# 上锁
    		lock.acquire()
    		n -= 1
    		# 解锁
    		lock.release()
    
    def func2(lock):
    	global n
    	for i in range(100000):
    		"""with 语法 自动实现上锁 解锁"""
    		# 方法二		
    		with lock:
    			n+=1
    
    if __name__ == "__main__":
    	# 创建一把锁
    	lock = Lock()
    	lst = []
    	for i in range(10):
    		t1 = Thread(target=func1,args=(lock,))
    		t2 = Thread(target=func2,args=(lock,))
    		t1.start()
    		t2.start()
    		
    		lst.append(t1)
    		lst.append(t2)
    	
    	for i in lst:
    		i.join()
    	print("主线程执行结束")
    	print(n)
    

    9. 信号量(线程)

    # ### 信号量(线程)
    from threading import Semaphore,Thread
    import time,random
    def func(i,sem):
    	# 异步并发线程;
    	time.sleep(random.uniform(0.1,1))
    	with sem:
    		print(i)
    		time.sleep(2)
    		
    if __name__ == "__main__":
    	sem = Semaphore(5)
    	for i in range(20):
    		Thread(target=func,args=(i,sem)).start()
    

    10. 死锁 递归锁 互斥锁

    # ### 死锁,递归锁,互斥锁
    from threading import Thread,Lock
    import time
    lock = Lock()
    
    # ### 1.死锁现象 , 只上锁不解锁是死锁
    """
    lock.acquire()
    lock.acquire()
    print(123)
    """
    
    # 逻辑上的死锁现象;
    '''
    noodle = Lock()
    kuaizi = Lock()
    
    def eat1(name):
    	noodle.acquire()
    	print("%s 拿到面条" % (name))
    	kuaizi.acquire()
    	print("%s 拿到筷子" % (name))
    
    	print("开始吃面")
    	time.sleep(0.5)	
    	
    	kuaizi.release()
    	print("%s 放下筷子" % (name) )
    	noodle.release()
    	print("%s 放下面条" % (name) )
    
    def eat2(name):
    	kuaizi.acquire()
    	print("%s 拿到筷子" % (name))
    	noodle.acquire()
    	print("%s 拿到面条" % (name))
    	
    	
    	print("开始吃面")
    	time.sleep(0.5)
    	
    	noodle.release()
    	print("%s 放下面条" % (name) )
    	kuaizi.release()
    	print("%s 放下筷子" % (name) )
    	
    if __name__ == "__main__":
    	name_lst1 = ["李祖清","银燕"]
    	name_lst2 = ["廖萍萍","郭一萌"]
    	
    	for name in name_lst1:
    		Thread(target=eat1,args=(name,)).start()
    		
    	for name in name_lst2:
    		Thread(target=eat2,args=(name,)).start()
    '''
    
    
    # ### 2.递归锁
    '''
    上几把锁,就对应几个解锁,无论上了几把锁,只要解锁的数量相同,就可以解锁
    针对于应急情况下的解锁;
    
    	递归锁专门用于解决死锁现象
    	临时用于快速解决服务区崩溃死锁的问题
    
    '''
    from threading import Thread,RLock
    rlock = RLock()
    def func():
    	rlock.acquire()
    	rlock.acquire()
    	rlock.acquire()
    	
    	print(111)
    	
    	rlock.release()
    	rlock.release()
    	rlock.release()
    	
    	print(222)
    func()
    
    # 解决吃面条问题
    # noodle = Lock()
    # kuaizi = Lock()
    
    """
    noodle = kuaizi = RLock()
    
    def eat1(name):
    	noodle.acquire()
    	print("%s 拿到面条" % (name))
    	kuaizi.acquire()
    	print("%s 拿到筷子" % (name))
    
    	print("开始吃面")
    	time.sleep(0.5)	
    	
    	kuaizi.release()
    	print("%s 放下筷子" % (name) )
    	noodle.release()
    	print("%s 放下面条" % (name) )
    
    def eat2(name):
    	kuaizi.acquire()
    	print("%s 拿到筷子" % (name))
    	noodle.acquire()
    	print("%s 拿到面条" % (name))
    	
    	
    	print("开始吃面")
    	time.sleep(0.5)
    	
    	noodle.release()
    	print("%s 放下面条" % (name) )
    	kuaizi.release()
    	print("%s 放下筷子" % (name) )
    	
    if __name__ == "__main__":
    	name_lst1 = ["李祖清","银燕"]
    	name_lst2 = ["廖萍萍","郭一萌"]
    	
    	for name in name_lst1:
    		Thread(target=eat1,args=(name,)).start()
    		
    	for name in name_lst2:
    		Thread(target=eat2,args=(name,)).start()
    """
    # ### (4) 互斥锁
    """
    	从语法上来说,锁可以互相嵌套,但是不要使用
    	上一次锁,就对应解开一把锁,形成互斥锁
    	吃面条和拿筷子是同时的,上一把锁就够了,不需要分开上锁
    """
    print("<======================>")
    #正确逻辑
    mylock = Lock()
    def eat1(name):
    	mylock.acquire()
    	print("%s 拿到面条" % (name))
    	print("%s 拿到筷子" % (name))
    
    	print("开始吃面")
    	time.sleep(0.5)	
    
    	print("%s 放下筷子" % (name) )	
    	print("%s 放下面条" % (name) )
    	mylock.release()
    
    def eat2(name):
    	mylock.acquire()
    	print("%s 拿到筷子" % (name))
    	print("%s 拿到面条" % (name))	
    	
    	print("开始吃面")
    	time.sleep(0.5)	
    
    	print("%s 放下面条" % (name) )		
    	print("%s 放下筷子" % (name) )
    	mylock.release()
    
    
    if __name__ == "__main__":
    	name_lst1 = ["李祖清","银燕"]
    	name_lst2 = ["廖萍萍","郭一萌"]
    	
    	for name in name_lst1:
    		Thread(target=eat1,args=(name,)).start()
    		
    	for name in name_lst2:
    		Thread(target=eat2,args=(name,)).start()
    

    11. 线程队列

    # ### 线程队列
    from queue import Queue
    """
    put 往线程队列中放值
    get 从线程队列中取值
    put_nowait 如果放入的值,长度超过了队列的长度,直接报错
    get_nowait 如果获取的值,已经没有了,直接报错
    """
    
    # (1) queue 先进先出
    q = Queue()
    q.put(11)
    q.put(22)
    print(q.get())
    print(q.get())
    # q.get()  发生阻塞
    # q.get_nowait()  发生报错
    
    # 指定队列长度
    q2 = Queue(2)
    q2.put(1)
    q2.put(2)
    # q2.put(3) 发生阻塞
    # q2.put_nowait(3) 发生报错
    
    
    # (2)LifoQueue 后进先出 (数据结构中,内存栈队列的一种存储结构)
    from queue import LifoQueue
    lq = LifoQueue()
    lq.put(44)
    lq.put(55)
    print(lq.get())
    print(lq.get())
    
    # (3)PriorityQueue  按照优先级顺序排序
    from queue import PriorityQueue
    """
    默认按照数字大小排序,
    如果是字母,会按照ascii编码大小进行排序 从小到大
    先写先排
    """
    
    pq = PriorityQueue()
    pq.put( (12,"zhangsan") )
    pq.put( (6,"lisi") )
    pq.put( (18,"zhaoliu") )
    pq.put( (18,"wangwu") )
    
    print(pq.get())
    print(pq.get())
    print(pq.get())
    print(pq.get())
    
    # 单一的一个元素,只能是同一类型 数字
    pq = PriorityQueue()
    pq.put(13)
    pq.put(18)
    pq.put(3)
    print(pq.get())
    print(pq.get())
    print(pq.get())
    # 单一的一个元素,只能是同一类型 字符串
    pq = PriorityQueue()
    pq.put("acc")
    pq.put("aa")
    pq.put("z")
    
    print(pq.get())
    print(pq.get())
    print(pq.get())
    

    12. 新版线程池 进程池

    # ### 新版进程池 , 线程池
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    import os,time
    # (1) 进程池 , 可以允许cpu 并行
    """
    def func(i):
    	print("Process:",i,os.getpid())
    	time.sleep(300)
    	print("Process:end")
    	return 5488
    	
    if __name__ == "__main__":
    	# cpu_count 获取的是逻辑处理器
    	print(os.cpu_count())
    	# 1.创建进程池对象
    	'''最多默认创建cpu_count这么多个进程执行任务,只创建6个进程来执行所有任务,不会再有额外的进程创建出来了'''
    	p = ProcessPoolExecutor(6)
    	# 2.异步触发进程
    	# res = p.submit(func,1)
    	# print(res) # <Future at 0x1adf3325198 state=running> 该对象存放了函数的返回值;
    	
    	# 多个任务
    	for i in range(10):
    		res = p.submit(func,i)
    	
    	
    	# 3.获取进程任务的返回值
    	res2 = res.result()
    	print(res2)
    	# 4.shutdown 等到所有子进程执行完毕之后,在向下执行 相当于join
    	p.shutdown()
    	print("主进程执行完毕")
    """
    # (2) 线程池 , as 相当于起一个别名
    """
    from threading import current_thread as cthread
    def func(i):
    	print("thread",i,cthread().ident)
    	time.sleep(3)
    	print("thread %s end" % (i))
    
    # max_workers = (os.cpu_count() or 1) * 5 默认值是cpu逻辑核心数 * 5
    '''最多默认创建(os.cpu_count() or 1) * 5 这么多个线程执行任务,不会再有额外的线程创建出来了'''
    tp = ThreadPoolExecutor()
    for i in range(50):
    	tp.submit(func,i)
    
    tp.shutdown()
    print("主线程执行结束")
    """
    # (3) 线程池的返回值
    from threading import current_thread as cthread
    """
    def func(i):
    	# 打印线程号
    	print("thread",i,cthread().ident)
    	time.sleep(3)
    	return cthread().ident
    	
    tp = ThreadPoolExecutor(5)
    lst = []
    setvar = set()
    for i in range(10):
    	res = tp.submit(func,i)
    	# 把对象都塞到列表里面,如果直接获取值会出现阻塞,就不能异步并发了,所有都放列表中,统一处理
    	lst.append(res)
    	# print(res.result())
    	
    for i in lst:
    	# 获取该线程对象的返回值
    	print(i.result())
    	setvar.add(i.result())
    
    print(setvar)
    print("主线程执行结束")
    """
    # (4) map 返回迭代器 线程池版本的高阶map函数 升级的map版本;
    def func(i):
    	time.sleep(0.2)
    	print("thread", i,cthread().ident)
    	print("thread end %s" % (i))
    	return "*" * i
    
    tp = ThreadPoolExecutor(5)
    it = tp.map(func,range(20))
    tp.shutdown()
    print("主线程执行结束")
    from collections import Iterator,Iterable
    res = isinstance(it,Iterator)
    print(res)
    
    print(list(it))
    

      

    善战者,求之于势,不责于人,故能择人而任势
  • 相关阅读:
    ViewData,ViewBag,TempData
    http和https
    Array与ArrayList
    程序员与书和视频
    技术学习的方法研究
    文章发布声明
    面向对象JAVA多态性
    嵌入式开发总结
    CSDN博客代码显示乱码的原因
    将Windows的桌面目录设置到D盘
  • 原文地址:https://www.cnblogs.com/NGU-PX/p/11440476.html
Copyright © 2011-2022 走看看