zoukankan      html  css  js  c++  java
  • 死锁,互斥锁,递归锁,线程事件Event,线程队列Queue,进程池和线程池,回调函数,协程的使用,协程的例子---day33

    1.死锁,互斥锁,递归锁

    # ###  死锁  互斥锁 递归锁
    from threading import Lock,Thread,RLock  #递归锁
    import time
    
    noddle_lock = Lock()
    kuaizi_lock = Lock()
    
    def eat1(name):
        noodle_lock.acquire()   #上锁
        print("%s拿到面条了" %(name))
        kuaizi_lock.acquire()  #上锁
        print("%s拿到筷子了"%(name))
    
        print("开始享受这碗面条...")
        time.sleep(0.5)
    
        kuaizi_lock.acquire() #解锁
        print("%s放下裤子了"%(name))
        noddle_lock.release()
        print("%s放下面条了"%(name))
    
    def eat2(name):
        kuaizi_lock.acquire()
        print("%s拿到筷子了"%(name))
        noddle_lock.acquire()
        print("%s拿到面条了"%(name))
    
        print("开始享受这碗面条了...")
        time.sleep(0.5)
    
        noddle_lock.release()
        print("%s放下面条了"%(name))
        kuaizi_lock.release()
        print("%s放下筷子了"%(name))
    
    if __name__ == '__main__':
        name_lst1 = ['王振','小白']
        name_lst2 = ['刘伟','小林']
        for name in name_lst1:
            Thread(target=eat1,args=(name,)).start()
    
        for name in name_lst2:
            Thread(target=eat2,args=(name,)).start()
    #上面的代码就是死锁
    
    #(2) 递归锁 解决线上bug  上锁和解锁要保持一对,递归锁连续上锁连续解锁也不会变成死锁,而互斥锁就不可以
    '''
    作用:
        递归锁专门用来解决死锁现象
        是临时用于快速解决项目因死锁问题不能正常运行的场景
        用来处理异常死锁的
    rlock =RLock()  #创建一个对象  上锁后再上一把锁,然后再解锁,不会变成死锁
    lock = Lock() #互斥锁没遇到release就会变成死锁,即使你上锁后又上了一把锁,然后再解锁,也是死锁
    
    def func():
        #lock上锁后下面必须跟解锁,下面的代码是阻塞状态,只上锁不解锁 死锁
        lock.acquire()  #下面两把锁是同一个对象,一个对象连续上锁就会造成死锁
        lock.acquire()
        print(333)
        lock.release()
        lock.release()
        
        #下面代码正常执行
        rlock.acquire()
        rlock.acquire()
        print(111)
        rlock.release()
        rlock.release()
        print(222)
    func()
    '''
    
    #(3) 用递归锁解决死锁
    noodle_lock = Lock()
    kuaizi_lock = Lock()
    
    noodle_lock.acquire()  #这两把锁是不同对象,所有不会造成死锁,上面代码造成死锁是同一个对象
    kuaizi_lock.acquire()
    print(1)
    noodle_lock.release()
    kuaizi_lock.release()
    print(2)
    
    #递归锁解决死锁
    noodle_lock = kuaiz_lock = RLock()  #是一把递归锁
    
    def eat1(name):
        noodle_lock.acquire()  #上锁
        print("%s拿到面条了"%(name))
        kuaizi_lock.acquire()  #上锁
        print("%s拿到筷子了"%s(name))
    
        print("开始享受这碗面条...")
        time.sleep(0.5)
    
        kuaizi_lock.release()  #解锁
        print("%s放下筷子了"%(name))
        noodle_lock.release()
        print("%s放下面条了"%(name))
    
    def eat2(name):
        kuaizi_lock.acquire()
        print("%s拿到筷子了"%(name))
        noodle_lock.acquire()  #上锁
        print("%s拿到面条了"%(name))
    
        print("开始享受这碗面条...")
        time.sleep(0.5)
    
        noodle_lock.release()
        print("%s放下面条了"%(namne))
        kuaizi_lock.release()  #解锁
        print("%s放下筷子了"%(name))
    
    if __name__ == '__main__':
        name_lst1 = ["王振","小白"]
        name_lst2 = ["刘伟","小林"]
        for name in name_lst1:
            Thread(target=eat1,args=(name,)).start()
    
        for name in name_lst2:
            Thread(target=eat2,args=(name,)).start()
    
    #(4) 互斥锁 让拿筷子和吃面条绑定成一个整体,
    #尽量使用一把锁解决问题,不要互相嵌套,否则容易死锁
    mylock =Lock()
    def eat1(name):
        mylock.acquire()  #上锁
        print("%s拿到面条了"%(name))
        print("%s拿到筷子了"%(name))
        print("开始享受这碗面条...")
        time.sleep(0.5)
        print("%s放下筷子了"%(name))
        print("%s放下面条了"%(name))
        mylock.release()  #解锁
    
    def eat2(name):
        mylock.acquire()  #上锁
        print("%s拿到筷子了"%(name))
        print("%s拿到面条了"%(name))
        print("开始享受这碗面条...")
        time.sleep(0.5)
        print("%s放下面条了"%(name))
        print("%s放下筷子了"%(name))
        mylock.release()
    
    if __name__ == '__main__':
        name_lst1 = ["王振","小白"]
        name_lst2 = ["刘伟","小林"]
        for name in name_lst1:
            Thread(target=eat1,args=(name,)).start()
    
        for name in name_lst2:
            Thread(target=eat2,args=(name,)).start()

    2.线程事件Event

    # ### 事件 Event
    from threading import Event,Thread
    '''
    e = Event()  #创建一个对象
    wait()  动态加阻塞
    clear()  将阻塞事件的值改成False
    set()  将阻塞事件的值改成True
    is_set()  获取阻塞事件的值
    '''
    
    #(1) 基本语法
    '''
    e = Event()
    print(e.is_set())  #False
    e.set()  #阻塞消失
    e.wait(3)  #放行
    print("程序在运行...")
    '''
    
    #(2) 模拟连接远程数据库
    '''连接三次数据库,如果都不成功直接抛异常'''
    import time,random
    def check(e):
        #模拟网络延迟
        time.sleep(random.randrange(1,6))
        #开始检测连接的合法性
        print("开始检测连接合法性")
        e.set()
    
    def connect(e):
        sign = False
        for i in range(1,4):
            #设置最大等待时间
            e.wait(1)
            if e.is_set():
                print("数据库连接成功...")
                sign = True
                break
            else:
                print("尝试连接数据%s次失败"%(i))
        if sign == False:
            #主动抛异常
            raise TimeoutError
    e = Event()
    
    #线程1负责执行检测任务
    Thread(target=check,args=(e,)).start()
    #线程2赋值连接任务
    Thread(target=connect,args=(e,)).start()

    3.线程队列Queue

    # ### 线程队列
    from queue import Queue  #从这个模块导入类
    '''
    put 存放
    get 取
    get_nowait()  取,没数据取不出来 报错
    put_nowait()  存,超过了队列长度 报错
    '''
    #(1) Queue
    #特点  先进先出,后进后出
    q = Queue()  #创建对象
    q.put(1)
    q.put(2)
    print(q.get())
    print(q.get())
    #print(q.get()) #取不出来  阻塞
    print(q.get_nowait()) #queue.Empty 报错
    
    q = Queue(3)  #限定长度,只能放三个值,超出则报错
    q.put(11)
    q.put(22)
    q.put(33)
    #q.put(44) #阻塞
    q.put_nowait(44) #queue.Full  报错
    
    #(2) LifoQueue  大写  先进后出  后进先出(按照栈的特点设计)
    from queue import LifoQueue
    lq = LifoQueue(3)
    lq.put(11)
    lq.put(22)
    lq.put(33)
    print(lq.get()) #33
    print(lq.get()) #22
    print(lq.get()) #11
    #先放进来的最后取出来  后放进来的先取出来
    
    #(3)PriorityQueue  按照优先级的顺序 (默认从小到大排序)
    from queue import PriorityQueue
    #如果都是数字,默认从小到大排序
    pq = PriorityQueue()
    pq.put(13)
    pq.put(3)
    pq.put(20)
    print(pq.get()) #3
    print(pq.get()) #13
    print(pq.get()) #20
    
    #如果都是字符串,按照ascii编码排序
    pq = PriorityQueue()
    pq.put('chinese')
    pq.put('american')
    pq.put('latinos')
    pq.put('blackman')
    print(pq.get()) #american
    print(pq.get()) #blackman
    print(pq.get()) #chinese
    print(pq.get()) #latinos
    
    #要么全是数字,要么全是字符串,不能混合 error
    pq2 = PriorityQueue()
    pq2.put('asd')
    pq2.put(12)
    pq2.put("中国")
    print(pq2.get())
    print(pq2.get())
    print(pq2.get())
    
    #如果是元组,会按院子第一个元素的规则进行比较
    pq3 = PriorityQueue()
    pq3.put(('wangwen',20))
    pq3.put(('liuwei',15))
    pq3.put(('xiaobai',30))
    print(pq3.get()) #('liuwei', 15)
    print(pq3.get()) #('wangwen', 20)
    print(pq3.get()) #('xiaobai', 30)

    4.进程池和线程池

    # ### 进程池和线程池
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    import os
    import time
    
    def func(i):
        print("任务执行中...start",os.getpid())
        time.sleep(10)
        print("任务结束...end",i)
        return i
    
    #(1) ProcessPoolExecutor 进程池基本使用
    #默认如果一个进程短时间内可以完成更多的任务,就不会创建额外的新的进程,以节省资源
    if __name__ == '__main__':
        lst = []
        print(os.cpu_count()) #获取cpu逻辑核心数,本地机器的
        #(1)创建进程池对象,可以传参,约束可以创建多少进程
        #进程池里面最多创建os.cpu_count()这么多个进程,所有任务全由这几个进程完成,不会再创建额外进程
        p = ProcessPoolExecutor() #如果不写 默认就是本地机器的CPU核心数,参数写多少就是多少进程,不会再多出这个进程
    
        #(2) 异步提交任务
        for i in range(10): #表示多少人(cpu核心数)干多少活(range里面的数量)
            res = p.submit(func,i) #参数1为执行的任务,后面的参数有多个可以写多个
            #print(res.result()) #来获取当前进程池返回的结果,由异步变为同步
    
        #(3) 获取当前进程池的返回值
        for i in lst:
            print(i.result())
    
        #(4)等待所有子进程结束之后再执行
        p.shutdown()   #跟join是一样的用法
    
        print("主程序执行结束...")
    
    #(2) ThreadPoolExecutor  线程池的基本使用
    from threading import current_thread as cthread
    def func(i):
        print("thread..start..",cthread().ident,i)  # 线程的id
        time.sleep(3)
        print("thread...end..",i)
        return cthread().ident
    
    #默认如果一个线程短时间内可以完成更多的任务,就不会创建额外的新的线程,以节省资源
    if __name__ == '__main__':
        lst = []
        setvar = set()
        #(1)创建一个线程池对象
        #限制线程池最多创建os.cpu_count * 5=线程数,所有任务全由这几个线程完成,不会再创建额外线程
        tp = ThreadPoolExecutor()  #写5的话就是5个线程 #我的电脑最高开20个线程
    
        #(2) 异步提交任务,用法跟进程池差不多
        for i in range(100):
            res = tp.submit(func,i)
            lst.append(res)
    
        #(3) 获取返回值
        for i in lst:
            setvar.add(i.result())
    
        #(4)等待所有线程执行结束
        tp.shutdown()
    
        print(len(setvar),setvar)
        print("主线程执行结束...'")
    
    
    #(3)  线程池map 返回迭代器 多条主线执行
    from concurrent.futures import ThreadPoolExecutor
    from threading import current_thread as cthread
    from collections import Iterator
    
    def func(i):
        print("thread start...",cthread().ident)
        print("thread end...",i)
        #return cthread().ident
        return '*' * i
    
    if __name__ == '__main__':
        lst = []
        setvar = set()
        tp = ThreadPoolExecutor(5) #5个线程使用20个数据
        it = tp.map(func,range(20)) #要执行的任务,跟可迭代类型数据 ,返回迭代器
        #print(isinstance(it,Iterator))  #True
    
        tp.shutdown()   #等待所有子线程结束
    
        for i in it:
            print(it)
    
        '''
        for i in range(100):
            res = tp.submit(func,i) #返回对象
            lst.append(res)
        
        for i in lst:
            setvar.add(i.result())
        #等待所有线程结束
        tp.shutdown()
        print(setvar)
        '''

    5.回调函数

    # ### 回调函数
    '''
    回调函数:
        把函数当成参数传递给另外一个函数
        在当前函数执行完毕之后,最后调用一下该参数(函数),这个函数就是回调函数
    功能:
    打印状态:a属性
    支付状态:b属性
    退款状态:c属性
    转账状态:d属性
    把想要的相关成员或者相关逻辑写在自定义的函数中
    支付宝接口在正常执行之后,会调用自定义的函数来执行相应的逻辑
    那么这个函数就是回调函数
    '''
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    from threading import current_thread as cthread
    import os,time
    
    def func1(i):
        print("Process start...",os.getpid())
        time.sleep(0.5)
        print("Process end ...",i)
        return '*' * i
    
    def func2(i):
        print("Thread start...",cthread().ident) #线程号
        time.sleep(0.5)
        print("Thread end .. ",i)
        return '*' * i
    
    def call_back1(obj):
        print("<==回调函数==>",os.getpid())
        print(obj.result())
    
    def call_back2(obj):
        print("<==回调函数==>",os.getpid())
        print(obj.result())
    
    #(1) 进程池的回调函数:由主进程执行调用完成
    if __name__ == '__main__':
        p = ProcessPoolExecutor(4)
        for i in range(10):
            res = p.submit(func1,i)
            #提交任务后创建的结果.add_done_callback(回调函数)
            # add_done_callback 可以把res本对象和回调函数自动传递到函数中来
            res.add_done_callback(call_back1) #add_done_callback得到res和call_back1然后并调用了一下call_back1,并把res传给obj
        p.shutdown()
        print("主进程结束了...",os.getpid()) #回调函数是主进程完成的
    
    #(2) 线程池的回调函数:由当前子线程执行调用完成
    if __name__ == '__main__':
        tp = ThreadPoolExecutor(4)
        for i in range(10):
            res = tp.submit(func2,i)
            #提交任务的结果.add_done_callback(回调函数)
            #add_done_callback 可以把res本对象和回调函数自动传递到函数中来
            res.add_done_callback(call_back2) # #add_done_callback得到res和call_back1,然后并调用了一下call_back1,并把res传给obj
        tp.shutdown()  #等待所有线程执行结束
        print("主线程结束了...",cthread().ident) #由当前子线程执行调用完成
    
    
    #回调函数的原型
    class Ceshi():
        def add_done_callback(self,func):
            print("执行操作1...")
            print("执行操作2...")
            func(self)
    
        def result(self):
            return 12345
    def call_back3(obj):
        print(obj)
        print(obj.result())
    obj = Ceshi()  #创建对象
    obj.add_done_callback(call_back3)
    #调用函数,并把call_back3函数当做参数传进去
    # func(self) 等于 call_bakc3(self)  等于call_back3(obj), obj.result就是对象本身调用result()

    6.协程的使用

     # ### 协程  就是线程 的一种实现方式
    '''
     首先需要安装协程模块 gevent
    '''
    
    #(1) 用协程来改写一下生产者和消费者模型
    import gevent
    def producer():
        for i in range(1000):
            yield i
    
    def counsumer(gen):
        for i in range(10):
            print(next(gen))
    #初始化生成器函数
    gen = producer()
    counsumer(gen)
    counsumer(gen)
    
    #(2) 协程的具体实现
    '''
    switch 遇到阻塞的时候只能手动调用该函数,进行函数切换,不能自动实现切换来规避IO阻塞
    '''
    from greenlet import grennlet
    import time
    
    def eat():
        print('eat 1') #第一步
        g2.switch() #手动切 #第二步切换   #第七步
        time.sleep(3)  #第八步
        print('eat 2') #第九步
    
    def play():
        print('play one') #第三步
        time.sleep(3) #第四步
        print('play two') #第五步
        g1.switch()  #再切换回去  #第六步 在切换回最开始阻塞的地方 g2.switch()
    
    g1 = greenlet(eat) #两个协程对象
    g2 = greenlet(play)
    g1.switch()  #相当于start执行
    
    #(3) gevent
    '''gevent可以自动切换,但是不能够自动识别time.sleep这样的阻塞'''
    import gevent
    import time
    def eat():
        print('eat 1')
        time.sleep(3)
        print('eat 2')
    
    def play():
        print('play one')
        time.sleep(3)
        print('play two')
    #利用gevent.spawn创建协程对象g1
    g3 = gevent.spawn(eat)
    g4 = gevent.spawn(play)
    #阻塞,必须g3,g4协程执行完毕为止才放行
    g3.join()
    g4.join()
    print("主线程执行完毕...")
    '''
    #执行结果
    eat 1
    eat 2
    play one
    play two
    主线程执行完毕...
    '''
    
    #(4) gevent.time  添加阻塞,让它实现自动切换
    import gevent
    def eat():
        print('eat 1') #1 先执行这个
        gevent.sleep(3) #2 遇到阻塞
        print('eat 2') #5  #这个阻塞时间过的比较快,然后就先执行这个
    
    def play():
        print('play one') #3 #执行这个
        gevent.time(3) #4 #遇到阻塞
        print('play two')#6     #最后等这个阻塞时间过去后执行这个
    
    #利用gevent.spawn 创建协程对象
    g5 = gevent.spawn(eat)
    g6 = gevent.spawn(play)
    #阻塞,必须g1协程,g2协程执行完毕为止才放行
    g5.join()
    g6.join()
    print("主线程执行完毕...")
    
    
    #(5) 终极大招 彻底解决不识别阻塞的问题
    from gevent import monkey
    monkey.patch_all()  #意思是把下面所有引入的模块中的阻塞识别一下
    import time
    import gevent
    def eat():
        print('eat 1')
        time.sleep(3)
        print('eat 2')
    
    def play():
        print('play one')
        time.sleep(3)
        print('play two')
    #利用gevent.spawn创建协程对象
    g7 = gevent.spawn(eat)
    g8 = gevent.spawn(play)
    #阻塞,必须两个协程执行完毕为止才放行
    g7.join()
    g8.join()
    print("主线程执行完毕...")

    7.协程的例子

    # ### 协程的例子
    '''
    #(1) spawn(函数,参数1,参数2...)  用来创建协程对象
    #(2) 协程.join  阻塞 直到某个协程任务执行完毕之后再放行
    #(3) joinall  等到所有的协程任务都执行完毕之后再放行  gevent.joinall([g1,g2]) #推荐
    #(4) value  获取协程任务中的返回值  g1.value  获取对应协程中的返回值
    '''
    from gevent import monkey
    monkey.patch_all()  #把下面所有引入的模块中的阻塞识别一下
    import time
    import gevent
    def eat():
        print('eat 1')
        time.sleep(3)
        print('eat 2')
        return '吃完了'
    
    def play():
        print('play one')
        time.sleep(3)
        print('play two')
        return '玩完了'
    #利用gevent.spawn创建协程对象g1
    g1 = gevent.spawn(eat)
    g2 = gevent.spawn(play)
    #等到g1,g2协程任务完毕之后再向下执行
    gevent.joinall([g1,g2])  #接收一个列表参数
    print('主线程执行结束...')
    print(g1.value) #获取返回值
    print(g2.value) #获取返回值
    
    
    #(2) 关于利用协程爬取数据
    '''requests 抓取页面数据模块'''
    import requests
    '''
    HTTP 状态码
        200  成功
        404  页面丢失
        400  bad request
    '''
    #基本语法
    res = requests.get('http://www.baidu.com')
    #获取状态码
    print(res.status_code)
    #获取网页中的字符编码
    print(res.apparent_encoding) #utf-8
    #设置编码集,防止乱码
    res.encoding=res.apparent_encoding
    #获取网页当中内容
    print(res.text)
    
    #
    url_list = [
    "http://www.baidu.com",
    "http://www.4399.com/",
    "http://www.7k7k.com/",
    "http://www.taobao.com/",
    "http://www.jingdong.com/"
    ]
    def get_url(url):
        response = requests.get(url)
        if response.status_code == 200:
            #print(response.text)
            time.sleep(0.1)
    #(1) 正常爬取
    starttime = time.time()
    for i in url_list:
        get_url(i)
    endtime = time.time()
    print("执行时间:",endtime - starttime) # 18.780214071273804
    
    '''
    import re
    strvar = '<img lz_src="http://i5.7k7kimg.cn/cms/cms10/20200609/113159_2868.jpg">'
    obj = re.search(r"<img lz_src='(.*?)'",strvar)
    print(obj.groups()[0])
    '''
    
    #(2) 用协程的方式爬取数据
    lst = []
    starttime = time.time()
    for i in url_list:
        g = gevent.spawn(geturl,i)
        lst.append(g)
    gevent.joinall(lst)
    endtime = time.time()
    print("执行时间:",endtime-starttime)#执行时间 2.3307271003723145
    
    '''
    利用好多进程,多线程,多协程可以让服务器运行速度更快
    并且也可以抗住更多用户的访问
    '''
  • 相关阅读:
    oracle 递归查询 查询当前选中节点的所有子节点
    sql 常见操作
    【转】VS2008制作打包程序将安装路径写入注册表
    HTML字符集大全
    Oracle左连接,右连接
    Ubuntu下root用户和user用户如何进行相互切换
    【转】 vs2008 用文件部署生成的exe安装包
    C# 中out 和 ref 关键字的区别
    【转】vs2008安装部署程序时如何设置程序开机启动
    【转】vs2008安装部署工程制作教程
  • 原文地址:https://www.cnblogs.com/weiweivip666/p/13128704.html
Copyright © 2011-2022 走看看