zoukankan      html  css  js  c++  java
  • 网络编程基础--多线程---concurrent.futures 模块---事件Event---信号量Semaphore---定时器Timer---死锁现象 递归锁----线程队列queue

    1 concurrent.futures 模块:

    # from abc import abstractmethod,ABCMeta
    #
    # class A(metaclass=ABCMeta):
    #     def mai(self):
    #         pass
    # @classmethod
    # class B(A):
    #     def mai(self):
    #         pass
    
    # 抽象类----定义子类的一些接口标准 @abstractmethod
    
    
    
    
    ===================     进程池 与 线程池   ===================
    
    引入池 的 概念是为了 控制个数
    
     concurrent.futures   =======>>> 异步调用
    
    
    
    # 1  ====== shutdown(wait=True)====>> close()+join()     shutdown(wait=False)=======>> close()
    # 2 ====== submit ===>> apply_async()
    # 3 ======  map=====
    # 4 ======add_done_callback(fn)
    
    
    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    import time,random,os
    
    
    ===============================计算多的  进程池==================================================
    
    # def work(n):
    #     print('%s is running'%os.getpid())
    #     time.sleep(random.randint(1,3))
    #     return n*n
    # #               1
    # if __name__ == '__main__':
    #     executor_p=ProcessPoolExecutor(4)
    #     futures=[]
    #     for i in range(10):
    #         future=executor_p.submit(work,i)    # 得到异步提交的future对象结果
    #         futures.append(future)
    #     executor_p.shutdown(wait=True)           #  executor_p ==== shutdown
    #     print('main')
    #     for obj in futures:
    #         print(obj.result())   #  结果
    #
    # #             2  =====  with as
    # if __name__ == '__main__':
    #     with ProcessPoolExecutor(4) as e:
    #         futures=[]
    #         for i in range(6):
    #             future=e.submit(work,i)
    #             futures.append(future )
    #     print('Main')
    #     for obj in futures:
    #         print(obj.result())
    
    #     ================================  简写  ===========================================
    
    # from concurrent.futures import ThreadPoolExecutor
    # from threading import current_thread,enumerate,active_count
    #
    # def work(n):
    #     print('%s is running'%current_thread())
    #     return n**2
    #
    # if __name__ == '__main__':
    #     with ThreadPoolExecutor() as executor:
    #         futures=[executor.submit(work,i) for i in range(40)]
    #
    #     for i in futures:
    #         print(i.result())
    
    =======================================  IO多的  线程池 ===========================
    #
    # def work(n):
    #     print('%s is running'%os.getpid())
    #     time.sleep(random.randint(1,3))
    #     return n*n
    # #               1
    # if __name__ == '__main__':
    #     executor_p=ThreadPoolExecutor(30)
    #     futures=[]
    #     for i in range(10):
    #         future=executor_p.submit(work,i)    # 得到异步提交的future对象结果
    #         futures.append(future)
    #     executor_p.shutdown(wait=True)           #  executor_p ==== shutdown
    #     print('main')
    #     for obj in futures:
    #         print(obj.result())   #  结果
    #
    # #             2  =====  with as
    # if __name__ == '__main__':
    #     with ThreadPoolExecutor(40) as e:
    #         futures=[]
    #         for i in range(6):
    #             future=e.submit(work,i)
    #             futures.append(future )
    #     print('Main')
    #     for obj in futures:
    #         print(obj.result())
    
    =========================== map ========循环提交  多次结果====================================
    
    #
    from concurrent.futures import ThreadPoolExecutor # from threading import current_thread,enumerate,active_count # # def work(n): # print('%s is running'%current_thread()) # return n**2 # # if __name__ == '__main__': # with ThreadPoolExecutor() as executor: # futures=[executor.map(work,range(40))] # [1,2,3,4,5] # executor.map(work,range(6)) 把后边的可迭代对象当做 参数传给work # ===============================future.add_done_callback(fn)========回调函数============================================= # # from concurrent.futures import ProcessPoolExecutor # # def work(n): # return n**n # # def work2(m): # m=m.result() # print( m/2) # # if __name__ == '__main__': # with ProcessPoolExecutor() as e: # for i in range(6): # e.submit(work,i).add_done_callback(work2) #======================future.exeption(4)========等待时间--等待超时异常=============================================

    2.事件Event:

       两个进程之间的协同工作
    
    
    from threading import Event,current_thread,Thread
    import time
    e
    =Event() def check(): print('%s 正在检测'%current_thread().getName()) time.sleep(3) e.set() # 确认设置 def conn(): count=1 while not e.is_set(): if count >3: raise TimeoutError('连接超时') print('%s 正在等待连接'%current_thread().getName()) e.wait(timeout=0.1) # 超时时间限制 尝试次数 count+=1 print('%s 正在连接'%current_thread().getName()) if __name__ == '__main__': t1=Thread(target=check) t2=Thread(target=conn) t3=Thread(target=conn) t4=Thread(target=conn) t1.start() t2.start() t3.start() t4.start()

    3.信号量Semaphore:

    from threading import Semaphore
    
    信号量 Semaphore ---本质是一把锁======控制同时运行的进程数量(后台可以开启更多的进程)
    
    
    进程池  ----同时运行的进程数量()

    4.-定时器Timer:

    # from threading import Timer
    #
    # def hello():
    #     print('hello')
    #
    # t=Timer(4,hello)  # Timer--是Thread的子类  4秒后启动
    # t.start()

    5.死锁现象 递归锁:

    ============================== 死锁现象 =================================
    
      递归锁 RLock---可以aquire多次        每aquire一次 计数 加一,只要计数不为一 就不能被其他线程抢到
    
      互斥锁 ----只能aquire一次
    
    from threading import Thread,Lock,current_thread
    import time
    mutexA=Lock()
    mutexB=Lock()
    
    class Mythread(Thread):
        def run(self):
            self.f1()
            self.f2()
    
        def f1(self):
            with mutexA:
                print('%s 抢到了A'%current_thread().getName())
            with mutexB:
                print('抢到了B'%current_thread().getName())
        def f2(self):
            with mutexB:
                print('抢到了B'%current_thread().getName())
            time.sleep(0.1)
            with mutexA:
                print('抢到了A'%current_thread().getName())
    
    if __name__ == '__main__':
        for i in range(6):
            t = Mythread()
            t.start()

    6.线程队列queue:

    import queue
    
    q=queue.Queue(4)
    
    q.put(2)
    q.put(2)
    q.put(2)
    q.put(2)
    
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    
    #  1  优先级队列
    q=queue.PriorityQueue(3)
    q.put((10,'tsd'))
    q.put((4,'ard'))    #    相同的优先级 比较 后面的数据
    q.put((10,'asd'))   #     数字越小 优先级越高
    
    print(q.get())
    print(q.get())
    print(q.get())
    
    #  2 先进后出 ---堆栈 队列
    
    q=queue.LifoQueue(3)
    读书使人心眼明亮
  • 相关阅读:
    【Linux基础】linux下修改ls显示的时间格式
    【Teradata】gtwglobal查看
    【Teradata】tdlocaledef修改默认日期配置
    【Linux基础】文件处理实例
    【Linux基础】awk命令
    【teradata】强制解锁
    第1节:保存文档
    Centos7安装MySQL数据库
    MyBatis框架之异常处理
    spring事务源码分析
  • 原文地址:https://www.cnblogs.com/big-handsome-guy/p/7676749.html
Copyright © 2011-2022 走看看