zoukankan      html  css  js  c++  java
  • 线程、i/o多路复用

    一  线程基础

    1. 线程与进程的区别:

    只有cpython解释器中含有GIL;因为线程的切换速度比进程块,又因为线程存在GIL,不存在多线程并行,所以计算密集采用多进程处理,而i/o密集采用多线程处理

    线程无需if __name__ == '__main__':语句

    • 进程是资源分配的最小单位,线程是程序执行的最小单位(资源调度的最小单位)
    • 进程有自己的独立地址空间,建立数据表来维护代码段、堆栈段和数据段。而线程是共享进程中的数据的,使用相同的地址空间,因此CPU切换速度线程比进程块的多
    • 多线程共享同一进程下的全局变量、静态变量等数据。
    • 多线程程序只要有一个线程死掉,整个进程也死掉了,而一个进程死掉并不会对另外一个进程造成影响,因为进程有自己独立的地址空间。
    • 因为GIL(同一时间,只允许一个线程占用cpu,只有一个线程在运行)锁的存在,线程没有真正的并行,而多进程能够在多核下实现并行
    • 守护进程和守护线程的结束条件不一致

    二  Thread模块

    1. 模块介绍

      multiprocess模块的完全模仿了threading模块的接口,二者在使用层面,有很大的相似性。

    Thread实例对象的方法
      # isAlive(): 返回线程是否活动的。
      # getName(): 返回线程名。
      # setName(): 设置线程名。
    
    threading模块提供的一些方法:
      # threading.currentThread(): 返回当前的线程对象。
      # threading.current_thread():返回当前的线程对象,和上述方法一样。
      # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
      # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
      # threading.active_count():返回正在运行的线程数量,和上述方法一样。

    2. 互斥锁Lock、递归锁RLock

      因为GIL的存在,同一时间只能有一个线程运行,而多线程共享进程内的全局变量,修改数据时会混乱,GIL锁的是线程

    from threading import Thread
    import time,os
    def func():
        global num
        time.sleep(0.01)
        num -=1
    
    num = 100
    for i in range(100):
        th = Thread(target=func,)
        th.start()
    print(num)

    42    #数据混乱

    2.1线程锁的实质

      进程互斥锁和线程互斥锁都是锁定的语句加锁部分与未加锁部分还是并发的

    from threading import Thread,Lock
    import time,os
    def func(l):
        l.acquire()
        time.sleep(5)
        print('func')
        l.release()
    def func2(l):
        l.acquire()
        print('2222222222')
        l.release()
    l = Lock()
    th = Thread(target=func,args = (l,))
    th2 = Thread(target=func2,args=(l,))
    th.start()
    time.sleep(1)
    th2.start()

    2.2 死锁与递归锁RLock

      死锁:因争夺资源而造成的一种互相等待的现象

      递归锁:相同的锁只需一把钥匙,互容;互斥锁,相同的锁多少人来开,需要多少把钥匙,不互容

    from threading import Lock, RLock
    mutexA = Lock()
    mutexA = RLock()
    mutexA.acquire()
    mutexA.acquire()
    print(123)
    mutexA.release()
    mutexA.release()

      死锁实例:

    from threading import Thread,Lock,RLock
    import time
    def man(l_door,l_paper):
        l_door.acquire()
        print('man_door')
        time.sleep(1)
        l_paper.acquire()
        print('man_paper')
        time.sleep(1)
        l_paper.release()
        l_door.release()
    def woman(l_door,l_paper):
        l_paper.acquire()
        time.sleep(1)
        print('woman_paper')
        l_door.acquire()
        time.sleep(1)
        print('woman_door')
        l_door.release()
        l_paper.release()
    
    # l_door =l_paper = Lock()  #同一把锁,不互容
    
    # l_door = Lock()
    # l_paper = Lock()
    
    # l_door =  RLock()
    # l_paper = RLock()
    
    l_door = l_paper = RLock()  #同一把锁,互容
    
    th_man = Thread(target=man,args=(l_door,l_paper))
    th_woman = Thread(target=woman,args=(l_door,l_paper))
    th_man.start()
    th_woman.start()

    man_door
    man_paper
    woman_paper
    woman_door

    3. Semaphore信号量 

      参考进程信号量

    4. Event事件

    event.isSet():返回event的状态值;
    event.wait():如果 event.isSet()==False将阻塞线程;
    event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
    event.clear():恢复event的状态值为False。

    5. Condition 条件

    Condition被称为条件变量,除了提供与Lock类似的acquirerelease方法外,还提供了waitnotify方法。
    线程首先acquire一个条件变量,然后判断一些条件。如果条件不满足则wait;如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,
    其他处于wait状态的线程接到通知后会重新判断条件。不断的重复这一过程,从而解决复杂的同步问题。

      条件实例,条件的本质就是人为的控制锁的开启时刻

    from threading import Condition,Thread
    import time # 条件被创建之初,wait()处于堵塞状态 # notify(int) 通知,允许多把锁打开 def func(con,i): con.acquire() con.wait()  # 所有线程都阻塞在这里 print('在第%s个循环里'%i) con.release()  #并不是还钥匙,而是把钥匙扔掉 if __name__ == '__main__': con = Condition() for i in range(10): t = Thread(target=func,args=(con,i)) t.start() while 1:        #主线程会和子线程抢夺钥匙,易出错 num = int(input('>>>')) con.acquire() con.notify(num)# 允许几个线程可以执行了 con.release()
         time.sleep(2)

    6. Timer定时器

      定时器,指定n秒后执行某个操作

    from threading import Timer
    def func():
        print("开始计时收费了")
    t = Timer(1, func)
    t.start()  # after 1 seconds, "hello, world" will be printed

    7. 守护线程

      无论是进程还是线程,都遵循:守护xx会等待主xx运行完毕后被销毁。需要强调的是:运行完毕并非终止运行

    • 对主进程来说,运行完毕指的是主进程代码运行完毕
    • 对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕
    from threading import Thread
    from multiprocessing import Process
    import time
    def func():
        time.sleep(2)
        print('这里是子线程,就是守护线程,就是这么diao,还活着呢!')
    def func1():
        time.sleep(4)
        print('这里是子线程,并不是守护线程')
    if __name__ == '__main__':
        # t = Thread(target=func)
        t = Process(target=func)
        # t1 = Thread(target=func1)
        t1 = Process(target=func1)
        t.daemon=True
        t.start()
        t1.start()
        print('这里是父线程')

    进程结果:

    这里是父线程
    这里是子线程,并不是守护线程

    线程结果:

    这里是父线程
    这里是子线程,就是守护线程,就是这么diao,还活着呢!
    这里是子线程,并不是守护线程

    三  queue

      进程中一般使用from multiprocessing import Queue,该队列能实现不同进程的数据共享;而import queue主要用在线程中,用法与Queue相同

    1. Queue()

    import queue
    q=queue.Queue()
    q.put('first')
    q.put('second')
    q.put('third')
    
    print(q.get())
    print(q.get())
    print(q.get())
    

    2. LifoQueue()

    import queue
    
    q=queue.LifoQueue()
    q.put('first')
    q.put('second')
    q.put('third')
    
    print(q.get())
    print(q.get())
    print(q.get())

    3. PriorityQueue()

      存储数据时可设置优先级的队列,数据类型不一致时,按照ascii码排序,越小等级越高,等级相同时遵循先进先出的原则。

    import queue
    q=queue.PriorityQueue()
    #put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
    q.put((20,'a'))
    q.put((10,'b'))
    q.put((30,'c'))
    
    print(q.get())
    print(q.get())
    print(q.get())

    四  线程池

    1. concurrent.futures模块介绍

    #1 介绍
    concurrent.futures模块提供了高度封装的异步调用接口
    ThreadPoolExecutor:线程池,提供异步调用
    ProcessPoolExecutor: 进程池,提供异步调用
    Both implement the same interface, which is defined by the abstract Executor class.
    
    #2 基本方法
    #submit(fn, *args, **kwargs)
    异步提交任务
    
    #map(func, *iterables, timeout=None, chunksize=1) 
    取代for循环submit的操作,返回的结果是生成器,用next()取值
    
    #shutdown(wait=True) 
    相当于进程池的pool.close()+pool.join()操作
    wait=True,等待池内所有任务执行完毕回收完资源后才继续
    wait=False,立即返回,并不会等待池内的任务执行完毕
    但不管wait参数为何值,整个程序都会等到所有任务执行完毕
    submit和map必须在shutdown之前
    
    #result(timeout=None)
    取得结果,相当于进程中的get(),两者都会使异步变同步
    
    #add_done_callback(fn)
    回调函数

    2. 线程池效率对比

    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    from multiprocessing import  Pool
    import time
    def func(num):
        sum = 0
        for i in range(num):
            sum  +=i**2
        # print(sum)
    if __name__ == '__main__':
        # pool = ThreadPoolExecutor(20)
        # start = time.time()
        # for i in range(10000):
        #     pool.submit(func,i)    等于pool.apply()或pool.apply_asycn()
        # pool.shutdown()    等于Pool中的pool.close()和pool.join()
        # print(time.time()-start)
    
        # pool = ProcessPoolExecutor(5)
        # start = time.time()
        # for i in range(10000):
        #     pool.submit(func, i)
        # pool.shutdown()
        # print(time.time() - start)
    
        pool = Pool(5)
        start = time.time()
        for i in range(10000):
            pool.apply_async(func, (i,))
        pool.close()
        pool.join()
        print(time.time() - start)

    3. 回调函数

    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    import os
    def func(num):
        sum = 0
        for i in range(num):
            sum += i ** 2
        return sum
    def call_back(var):
        print(os.getpid())
    if __name__ == '__main__':
        print(os.getpid())
        pool = ThreadPoolExecutor(2)
        for i in range(10):
            pool.submit(func, i).add_done_callback(call_back)
        pool.shutdown()

       线程中的回调函数是由子线程调用的

    from threading import current_thread
    from concurrent.futures import ThreadPoolExecutor
    import time
    def func():
        time.sleep(0.5)
        print('son:',current_thread())
    def call_back(var):
        print('call_back:',current_thread())
    if __name__ == '__main__':
        th = ThreadPoolExecutor(2)
        for i in range(4):
            th.submit(func,).add_done_callback(call_back)
        th.shutdown()
        print('father',current_thread())
  • 相关阅读:
    SD_WebImage-03-多线程+下载任务放入非主线程执行
    NSOperationQueue_管理NSOperation-02-多线程
    CALayer小结-基本使用00-UI进阶
    XMPP-UI进阶-01
    XMPP总结-UI进阶-00
    UI控件总结-UI初级
    转场动画-01-day4
    暂停-开始动画-核心动画-08-day4
    核心动画-04-CALayer隐式动画
    Android开发技术周报 Issue#71
  • 原文地址:https://www.cnblogs.com/mushuiyishan/p/10537444.html
Copyright © 2011-2022 走看看