zoukankan      html  css  js  c++  java
  • Python全栈之路系列----之-----线程池/死锁/信号量/事件/定时器/线程q(队列)

    线程池

    线程池各种方法的使用

    #1 介绍
    concurrent.futures模块提供了高度封装的异步调用接口
    ThreadPoolExecutor():线程池,提供异步调用
    ProcessPoolExecutor(): 进程池,提供异步调用
    Both implement the same interface, which is defined by the abstract Executor class.
    
    #2 基本方法
    #submit(fn, *args, **kwargs)
    异步提交任务
    
    #map(func, *iterables, timeout=None, chunksize=1) 
    取代for循环submit的操作
    
    #shutdown(wait=True) 
    相当于进程池的pool.close()+pool.join()操作
    wait=True,等待池内所有任务执行完毕回收完资源后才继续
    wait=False,立即返回,并不会等待池内的任务执行完毕
    但不管wait参数为何值,整个程序都会等到所有任务执行完毕
    submit和map必须在shutdown之前
    
    #result(timeout=None)
    取得结果
    
    #add_done_callback(fn)
    回调函数各
    各种方法
    #用法
    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    
    import os,time,random
    #定义一个函数
    def task(n):
        print('%s is runing' %os.getpid())
        time.sleep(random.randint(1,3))
        return n**2
    
    if __name__ == '__main__':
      #配置线程池里的线程,不写默认cpu*5,默认20
        executor=ProcessPoolExecutor(max_workers=3)
    
        futures=[]
        for i in range(11):
            #往线程池里面提交任务,就是函数名和参数
            future=executor.submit(task,i)
            futures.append(future)
    #主进程等待进程池里的进程运行结束后再结束
        executor.shutdown(True)
        print('+++>')
        for future in futures:
            print(future.result())    
    ProcessPoolExecutor/进程池
    # from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    # import time,random,os
    # import threading
    #
    # def work(n):
    #     print('%s is running' %threading.current_thread().getName())
    #     time.sleep(random.randint(1,3))
    #     return n**2
    # if __name__ == '__main__':
    #     executor=ThreadPoolExecutor()
    #     futrues=[]
    #     for i in range(30):
    #         future=executor.submit(work,i)
    #         futrues.append(future)
    #     executor.shutdown(wait=True)
    #     print('主')
    #     for obj in futrues:
    #         print(obj.result())
    ThreadPoolExecutor/线程池
    #map方法的使用
    # from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    # import time,random,os
    #
    # def work(n):
    #     print('%s is running' %os.getpid())
    #     time.sleep(random.randint(1,3))
    #     return n**2
    # if __name__ == '__main__':
    #     executor=ProcessPoolExecutor()
    #     # futures=[]
    #     # for i in range(10):
    #     #     future=executor.submit(work,i)
    #     #     futures.append(future)
    #
    #     executor.map(work,range(10))
    #     executor.shutdown(wait=True)
    #     print('主')
    map的用法
    #回调函数
    # from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    # import requests
    # import os
    # import time,random
    # def get(url):
    #     print('%s GET %s' %(os.getpid(),url))
    #     response=requests.get(url)
    #     time.sleep(random.randint(1,3))
    #     if response.status_code == 200:
    #         print('%s DONE %s' % (os.getpid(), url))
    #         return {'url':url,'text':response.text}
    #
    # def parse(future):
    #     dic=future.result()
    #     print('%s PARSE %s' %(os.getpid(),dic['url']))
    #     time.sleep(1)
    #     res='%s:%s
    ' %(dic['url'],len(dic['text']))
    #     with open('db.txt','a') as f:
    #         f.write(res)
    #
    # if __name__ == '__main__':
    #     urls=[
    #         'https://www.baidu.com',
    #         'https://www.python.org',
    #         'https://www.openstack.org',
    #         'https://help.github.com/',
    #         'http://www.sina.com.cn/'
    #     ]
    #     p=ProcessPoolExecutor()
    #     start_time=time.time()
    #     objs=[]
    #     for url in urls:
    #         # obj=p.apply_async(get,args=(url,),callback=parse) #主进程负责干回调函数的活
    #         p.submit(get,url).add_done_callback(parse)
    #        parse_page拿到的是一个future对象obj,需要用obj.result()拿到结果
    #
    #     # p.close()
    #     # p.join()
    #
    #     print('主',(time.time()-start_time))
        
    回调函数

    死锁/递归锁

      通过引用来进行多次计数,只要计数不为0(不释放锁),其他线程不能抢锁和运行

    #递归锁的特点:
    #1:可以acquire多次
    #2:每acquire一次,计数加1,只要计数不为0,就不能被其他线程抢到
    #互斥锁的特点:
    #1:只能acquire一次
    
    mutexA=mutexB=RLock()
    # mutexA=mutexB=Lock()
    class MyThread(Thread):
        def run(self):
            self.f1()
            self.f2()
    
        def f1(self):
            mutexA.acquire()
            print('%s 抢 A锁' %self.name)
            mutexB.acquire()
            print('%s 抢 B锁' %self.name)
            mutexB.release()
            mutexA.release()
    
        def f2(self):
            mutexB.acquire()
            print('%s 抢 B锁' %self.name)
    
            time.sleep(0.1)
            mutexA.acquire()
            print('%s 抢 A锁' %self.name)
            mutexA.release()
    
            mutexB.release()
    
    
    if __name__ == '__main__':
        for i in range(10):
            t=MyThread()
            t.start()
    死锁/递归锁

    信号量

      限制最大链接数与线程池/进程池不一样/    信号量是有一堆线程/进程去抢定义好的信号量

    而池是规定好只能有多少进程或者线程,   通俗点就是一个定义有几个茅坑,一个定义同是只能有几个人排队而且不能有新人加入

    from threading import Semaphore,Thread,current_thread
    import time,random
    def task():
        with sm:
            print('%s 正在上厕所' %current_thread().getName())
            time.sleep(random.randint(1,3))
    
    if __name__ == '__main__':
        sm=Semaphore(5)
        for i in range(11):
            t=Thread(target=task)
            t.start()
    信号量

    事件Event

      线程之间的协同工作,一个检测,合格之后运行,共享数据间的运行,当一个线程运行到需要另一个线程开始执行时就需要用到事件,无非就是发送一个信号量,FALSE或者TURE

      多个线程间干一个活才能用到,独立的线程间用不到

    from threading import Event,current_thread,Thread
    import time
    
    e=Event()
    
    def check():
        print('%s 正在检测' %current_thread().getName())
        time.sleep(5)
        e.set()
    
    def conn():
        count=1
        while not e.is_set():
            if count > 3:
                raise TimeoutError('连接超时')
            print('%s 正在等待%s连接' % (current_thread().getName(),count))
            e.wait(timeout=1)
            count+=1
        print('%s 开始连接' % current_thread().getName())
    
    if __name__ == '__main__':
        t1=Thread(target=check)
        t2=Thread(target=conn)
        t3=Thread(target=conn)
        t4=Thread(target=conn)
    
        t1.start()
        t2.start()
        t3.start()
        t4.start()
    View Code

    定时器Timer

      定时器,指定n秒后执行某操作

    from threading import Timer
     
     
    def hello():
        print("hello, world")
     
    t = Timer(1, hello)
    t.start()  # after 1 seconds, "hello, world" will be printed
    定时器

    线程queue

      队列:先进先出    堆栈:后进先出    

    class queue.Queue(maxsize=0) #先进先出
    import queue
    
    q=queue.Queue()
    q.put('first')
    q.put('second')
    q.put('third')
    
    print(q.get())
    print(q.get())
    print(q.get())
    '''
    结果(先进先出):
    first
    second
    third
    '''
    View Code

    class queue.LifoQueue(maxsize=0) #last in fisrt out

    import queue
    
    q=queue.LifoQueue()
    q.put('first')
    q.put('second')
    q.put('third')
    
    print(q.get())
    print(q.get())
    print(q.get())
    '''
    结果(后进先出):
    third
    second
    first
    '''
    View Code

    class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列

    #VIP用户设置优先级链接

    import queue
    
    q=queue.PriorityQueue()
    #put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
    q.put((20,'a'))
    q.put((10,'b'))
    q.put((30,'c'))
    
    print(q.get())
    print(q.get())
    print(q.get())
    '''
    结果(数字越小优先级越高,优先级高的优先出队):
    (10, 'b')
    (20, 'a')
    (30, 'c')
    '''
    View Code
  • 相关阅读:
    Myeclipse快捷键
    Resharper 的快捷键
    jQuery模块自由组合方案
    CTE
    Nhibernate3以上单元测试
    第三方控件下载篇
    10 个 Visual Studio 原生开发的调试技巧
    Nhibernate为hbm.xml配置文件添加智能提示(VS2010)
    Win7下nginx默认80端口被System占用
    Nginx本地配置
  • 原文地址:https://www.cnblogs.com/zgd1234/p/7678661.html
Copyright © 2011-2022 走看看