zoukankan      html  css  js  c++  java
  • Python之线程 3

    一 信号量

    同进程一样,semaphore管理一个内置的计数器,每当调用acquire()时内置计数器-1,调用release()时内置计数器+1。计数器不能小于0,当计数器为0时,acquire()将阻塞线程直到其他线程调用release()

    实例:(同时只有5个线程可以获得semaphore,即可以限制最大连接数为5)

    from threading import Thread,Semaphore
    import threading
    import time
    
    # def func():
    #     if sm.acquire():
    #         print (threading.currentThread().getName() + ' get semaphore')
    #         time.sleep(2)
    #         sm.release()
    def func():
        sm.acquire()
        print('%s get sm' %threading.current_thread().getName())
        time.sleep(3)
        sm.release()
    
        if __name__ == '__main__':
        sm=Semaphore(5)
        for i in range(23):
            t=Thread(target=func)
            t.start()
    

    还记得信号量和进程池的区别吗,线程也有线程池,和信号量也是那些区别

    互斥锁与信号量推荐博客

    二 事件

    ​ 同进程的一样

      线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行

    事件的基本方法:

    event.isSet():返回event的状态值;
    
    event.wait():如果 event.isSet()==False将阻塞线程;
    
    event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
    
    event.clear():恢复event的状态值为False。
    

    ​ 还记得我们进程那里的事件用的什么例子吗,是不是红绿灯啊,这次我们不讲红绿灯的例子了,换个新的!

    ​ 例如,有多个工作线程尝试链接MySQL,我们想要在链接前确保MySQL服务正常才让那些工作线程去连接MySQL服务器,如果连接不成功,都会去尝试重新连接。那么我们就可以采用threading.Event机制来协调各个工作线程的连接操作

    我们先模拟一个场景:

    首先起两个线程:

    ​ 第一个线程的用处:连接数据库,那么我这个线程需要等待一个信号,告诉我我们之间的网络是可以连通的。

    ​ 第二个线程的用处:检测与数据库之间的网络是否联通,并发送一个可联通或者不可联通的信号。

    from threading import Thread,Event
    import threading
    import time,random
    def conn_mysql():
        count=1
        while not event.is_set():
            if count > 3:
                raise TimeoutError('链接超时') #自己发起错误
            print('<%s>第%s次尝试链接' % (threading.current_thread().getName(), count))
            event.wait(0.5) #
            count+=1
        print('<%s>链接成功' %threading.current_thread().getName())
    
    
    def check_mysql():
        print('33[45m[%s]正在检查mysql33[0m' % threading.current_thread().getName())
        t1 = random.randint(0,3)
        print('>>>>',t1)
        time.sleep(t1)
        event.set()
    if __name__ == '__main__':
        event=Event()
        check = Thread(target=check_mysql)
        conn1=Thread(target=conn_mysql)
        conn2=Thread(target=conn_mysql)
    
        check.start()
        conn1.start()
        conn2.start()
    

    三 条件Condition

    使得线程等待,只有满足某条件时,才释放n个线程

    import time
    from threading import Thread,RLock,Condition,current_thread
    
    def func1(c):
        c.acquire(False) #固定格式
        # print(1111)
    
        c.wait()  #等待通知,
        time.sleep(3)  #通知完成后大家是串行执行的,这也看出了锁的机制了
        print('%s执行了'%(current_thread().getName()))
    
        c.release()
    
    if __name__ == '__main__':
        c = Condition()
        for i in range(5):
            t = Thread(target=func1,args=(c,))
            t.start()
    
        while True:
            num = int(input('请输入你要通知的线程个数:'))
            c.acquire() #固定格式
            c.notify(num)  #通知num个线程别等待了,去执行吧
            c.release()
    
    #结果分析: 
    # 请输入你要通知的线程个数:3
    # 请输入你要通知的线程个数:Thread-1执行了 #有时候你会发现的你结果打印在了你要输入内容的地方,这是打印的问题,没关系,不影响
    # Thread-3执行了
    # Thread-2执行了
    

    四 定时器(了解)

    定时器,指定n秒后执行某个操作,这个做定时任务的时候可能会用到。

    import time
    from threading import Timer,current_thread #这里就不需要再引入Timer
    import threading
    
    def hello():
        print(current_thread().getName())
        print("hello, world")
        # time.sleep(3) #如果你的子线程的程序执行时间比较长,那么这个定时任务也会乱,当然了,主要还是看业务需求
    t = Timer(10, hello)  #创建一个子线程去执行后面的函数
    t.start()  # after 1 seconds, "hello, world" will be printed
    # for i in range(5):
    #     t = Timer(2, hello)
    #     t.start()  
    #     time.sleep(3) #这个是创建一个t用的时间是2秒,创建出来第二个的时候,第一个已经过了两秒了,所以你的5个t的执行结果基本上就是2秒中,这个延迟操作。
    
    print(threading.active_count())
    print('主进程',current_thread().getName())
    

    五 线程队列

    线程之间的通信我们用列表行不行呢,当然行,那么队列和列表有什么区别呢?

      queue队列 :使用import queue,用法与进程Queue一样

    queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
    

    1.class queue.Queue(maxsize=0) - 先进先出

    import queue #不需要通过threading模块里面导入,直接import queue就可以了,这是python自带的
    #用法基本和我们进程multiprocess中的queue是一样的
    q=queue.Queue()
    q.put('first')
    q.put('second')
    q.put('third')
    # q.put_nowait() #没有数据就报错,可以通过try来搞
    print(q.get())
    print(q.get())
    print(q.get())
    # q.get_nowait() #没有数据就报错,可以通过try来搞
    '''
    结果(先进先出):
    first
    second
    third
    '''
    

    2.class queue.LifoQueue(maxsize=0) - 后进先出

    import queue
    
    q=queue.LifoQueue() #队列,类似于栈,栈我们提过吗,是不是先进后出的顺序啊
    q.put('first')
    q.put('second')
    q.put('third')
    # q.put_nowait()
    
    print(q.get())
    print(q.get())
    print(q.get())
    # q.get_nowait()
    '''
    结果(后进先出):
    third
    second
    first
    '''
    

    3.class queue.PriorityQueue(maxsize=0) - 存储数据时可设置优先级的队列

    import queue
    
    q=queue.PriorityQueue()
    #put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
    q.put((-10,'a'))
    q.put((-5,'a'))  #负数也可以
    # q.put((20,'ws'))  #如果两个值的优先级一样,那么按照后面的值的acsii码顺序来排序,如果字符串第一个数元素相同,比较第二个元素的acsii码顺序
    # q.put((20,'wd'))
    # q.put((20,{'a':11})) #TypeError: unorderable types: dict() < dict() 不能是字典
    # q.put((20,('w',1)))  #优先级相同的两个数据,他们后面的值必须是相同的数据类型才能比较,可以是元祖,也是通过元素的ascii码顺序来排序
    
    q.put((20,'b'))
    q.put((20,'a'))
    q.put((0,'b'))
    q.put((30,'c'))
    
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    '''
    结果(数字越小优先级越高,优先级高的优先出队):
    '''
    

    这三种队列都是线程安全的,不会出现多个线程抢占同一个资源或数据的情况。

    六 标准模块—concurrent.futures

    ​ 早期的时候我们没有线程池,现在python提供了一个新的标准或者说内置的模块,这个模块里面提供了新的线程池和进程池,之前我们说的进程池是在multiprocessing里面的,现在这个在这个新的模块里面,他俩用法上是一样的。

    为什么要将进程池和线程池放到一起呢,是为了统一使用方式,使用threadPollExecutorProcessPollExecutor的方式一样,而且只要通过这个concurrent.futures导入就可以直接用他们两个了

    concurrent.futures模块提供了高度封装的异步调用接口
    
    ThreadPoolExecutor:线程池,提供异步调用
    ProcessPoolExecutor: 进程池,提供异步调用
    
    Both implement the same interface, which is defined by the abstract Executor class.
    

    基本方法

    # submit(fn, *args, **kwargs)
    异步提交任务
    
    # map(func, *iterables, timeout=None, chunksize=1) 
    取代for循环submit的操作
    
    # shutdown(wait=True) 
    相当于进程池的pool.close()+pool.join()操作
    wait=True,等待池内所有任务执行完毕回收完资源后才继续
    wait=False,立即返回,并不会等待池内的任务执行完毕
    但不管wait参数为何值,整个程序都会等到所有任务执行完毕
    submit和map必须在shutdown之前
    
    # result(timeout=None)
    取得结果
    
    # add_done_callback(fn)
    回调函数
    

    ThreadPoolExecutor的简单使用

    # ThreadPoolExecutor的简单使用
    import time
    import os
    import threading
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    
    def func(n):
        time.sleep(2)
        print('%s打印的:' % (threading.get_ident()), n)
        return n * n
    
    tpool = ThreadPoolExecutor(max_workers=5)  # 默认一般起线程的数据不超过CPU个数*5
    # tpool = ProcessPoolExecutor(max_workers=5) #进程池的使用只需要将上面的ThreadPoolExecutor改为ProcessPoolExecutor就行了,其他都不用改
    # 异步执行
    t_lst = []
    for i in range(5):
        t = tpool.submit(func, i)  # 提交执行函数,返回一个结果对象,i作为任务函数的参数 def submit(self, fn, *args, **kwargs):  可以传任意形式的参数
        t_lst.append(t)  #
        # print(t.result())
        # 这个返回的结果对象t,不能直接去拿结果,不然又变成串行了,可以理解为拿到一个号码,等所有线程的结果都出来之后,我们再去通过结果对象t获取结果
    tpool.shutdown()  # 起到原来的close阻止新任务进来 + join的作用,等待所有的线程执行完毕
    print('主线程')
    for ti in t_lst:
        print('>>>>', ti.result())
    
    # 我们还可以不用shutdown(),用下面这种方式
    # while 1:
    #     for n,ti in enumerate(t_lst):
    #         print('>>>>', ti.result(),n)
    #     time.sleep(2) #每个两秒去去一次结果,哪个有结果了,就可以取出哪一个,想表达的意思就是说不用等到所有的结果都出来再去取,可以轮询着去取结果,因为你的任务需要执行的时间很长,那么你需要等很久才能拿到结果,通过这样的方式可以将快速出来的结果先拿出来。如果有的结果对象里面还没有执行结果,那么你什么也取不到,这一点要注意,不是空的,是什么也取不到,那怎么判断我已经取出了哪一个的结果,可以通过枚举enumerate来搞,记录你是哪一个位置的结果对象的结果已经被取过了,取过的就不再取了
    
    # 结果分析: 打印的结果是没有顺序的,因为到了func函数中的sleep的时候线程会切换,谁先打印就没准儿了,但是最后的我们通过结果对象取结果的时候拿到的是有序的,因为我们主线程进行for循环的时候,我们是按顺序将结果对象添加到列表中的。
    # 37220打印的: 0
    # 32292打印的: 4
    # 33444打印的: 1
    # 30068打印的: 2
    # 29884打印的: 3
    # 主线程
    # >>>> 0
    # >>>> 1
    # >>>> 4
    # >>>> 9
    # >>>> 16
    
    

    ProcessPoolExecutor的使用

    只需要将这一行代码改为下面这一行就可以了,其他的代码都不用变
    tpool = ThreadPoolExecutor(max_workers=5) #默认一般起线程的数据不超过CPU个数*5
    # tpool = ProcessPoolExecutor(max_workers=5)
    
    你就会发现为什么将线程池和进程池都放到这一个模块里面了,用法一样
    

    map的使用

    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    import threading
    import os,time,random
    def task(n):
        print('%s is runing' %threading.get_ident())
        time.sleep(random.randint(1,3))
        return n**2
    
    if __name__ == '__main__':
    
        executor=ThreadPoolExecutor(max_workers=3)
    
        # for i in range(11):
        #     future=executor.submit(task,i)
    
        s = executor.map(task,range(1,5)) #map取代了for+submit
        print([i for i in s])
    

    回调函数的应用

    回调函数的简单应用:

    import time
    import os
    import threading
    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    
    def func(n):
        time.sleep(2)
        return n*n
    
    def call_back(m):
        print('结果为:%s'%(m.result()))
    
    tpool = ThreadPoolExecutor(max_workers=5)
    t_lst = []
    for i in range(5):
        t = tpool.submit(func,i).add_done_callback(call_back)
    

    爬虫:

    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    from multiprocessing import Pool
    import requests
    import json
    import os
    
    def get_page(url):
        print('<进程%s> get %s' %(os.getpid(),url))
        respone=requests.get(url)
        if respone.status_code == 200:
            return {'url':url,'text':respone.text}
    
    def parse_page(res):
        res=res.result()
        print('<进程%s> parse %s' %(os.getpid(),res['url']))
        parse_res='url:<%s> size:[%s]
    ' %(res['url'],len(res['text']))
        with open('db.txt','a') as f:
            f.write(parse_res)
    
    
    if __name__ == '__main__':
        urls=[
            'https://www.baidu.com',
            'https://www.python.org',
            'https://www.openstack.org',
            'https://help.github.com/',
            'http://www.sina.com.cn/'
        ]
    
        # p=Pool(3)
        # for url in urls:
        #     p.apply_async(get_page,args=(url,),callback=pasrse_page)
        # p.close()
        # p.join()
    
        p=ProcessPoolExecutor(3)
        for url in urls:
            p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一个future对象obj,需要用obj.result()拿到结果
    
  • 相关阅读:
    操作标签的属性和属性值 table表格
    dom基本获取 标签文本操作
    延时器 清除延时器
    倒计时
    电子时钟
    时间戳
    设定时间的方法
    内置对象Date
    对象的基本特点
    终于有人把云计算、大数据和 AI 讲明白了【深度好文】
  • 原文地址:https://www.cnblogs.com/russellyoung/p/python-zhi-xian-cheng-3--xin-hao-liang-shi-jian-xi.html
Copyright © 2011-2022 走看看