zoukankan      html  css  js  c++  java
  • 线程

    进程是资源分配的最小单位,线程是CPU调度的最小单位.
    每一个进程中至少有一个线程。 
    线程与进程的区别可以归纳为以下4点:
      1)地址空间和其它资源(如打开文件):进程间相互独立,同一进程的各线程间共享。某进程内的线程在其它进程不可见。
      2)通信:进程间通信IPC,线程间可以直接读写进程数据段(如全局变量)来进行通信——需要进程同步和互斥手段的辅助,以保证数据的一致性。
      3)调度和切换:线程上下文切换比进程上下文切换要快得多。
      4)在多线程操作系统中,进程不是一个可执行的实体。

    线程的特点

    1)轻型实体

    2)独立调度和分派的基本单位。

    3)共享进程资源。

    4)可并发执行。

    python中的线程
    一个进程中的多个线程能够并行么? 不行
    原因:
    Cpython解释器 内部有一把全局解释器锁 GIL
    所以线程不能充分的利用多核
    同一时刻用一个进程中的线程只有一个能被CPU执行
    GIL锁 确实是限制了你的程序效率
    GIL锁 目前 是能够帮助你在线程的切换中提高效率
    
    

    threading模块

    线程和进程速度比较

    import os
    import time
    from threading import Thread
    from multiprocessing import Process
    def func(i):
        print('子 :',i,os.getpid())
    
    if __name__ == '__main__':
        start = time.time()
        t_lst = []
        for i in range(100):
            t = Thread(target=func,args=(i,))
            t.start()
            t_lst.append(t)
        for t in t_lst:t.join()
        tt = time.time()-start
    
        start = time.time()
        t_lst = []
        for i in range(100):
            t = Process(target=func, args=(i,))
            t.start()
            t_lst.append(t)
        for t in t_lst: t.join()
        pt = time.time() - start
        print(tt,pt)
    Thread实例对象的方法
      # isAlive(): 返回线程是否活动的。
      # getName(): 返回线程名。
      # setName(): 设置线程名。
    
    threading模块提供的一些方法:
      # threading.currentThread(): 返回当前的线程变量。
      # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
      # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
    守护线程
    import time
    from threading import Thread
    def func1():
        while True:
            time.sleep(0.5)
            print(123)
    
    def func2():
        print('func2 start')
        time.sleep(3)
        print('func2 end')
    
    t1 = Thread(target=func1)
    t2 = Thread(target=func2)
    t1.setDaemon(True)
    t1.start()
    t2.start()
    print('主线程的代码结束')
    守护线程 是在主线程代码结束之后,还等待了子线程执行结束才结束
    主线程结束 就意味着主进程结束
    主线程等待所有的线程结束
    主线程结束了之后 守护线程随着主进程的结束自然结束了

     互斥锁

    未加锁部分并发执行,加锁部分串行执行,速度慢,数据安全
    from threading import Thread,Lock
    n = 0
    def func(lock):
        global n
        for i in range(1500000):
            lock.acquire()
            n  -= 1
            lock.release()
    
    def func2(lock):
        global n
        for i in range(1500000):
            lock.acquire()
            n += 1
            lock.release()
    
    if __name__ == '__main__':
        t_lst = []
        lock = Lock()
        for i in range(10):
            t2 = Thread(target=func2,args=(lock,))
            t = Thread(target=func,args=(lock,))
            t.start()
            t2.start()
            t_lst.append(t)
            t_lst.append(t2)
        for t in t_lst:
            t.join()
        print('-->',n)

    死锁

    是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。

    此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁

    import time
    from threading import Thread,Lock
    
    noodle_lock = Lock()
    fork_lock = Lock()
    def eat1(name):
        noodle_lock.acquire()
        print('%s拿到面条了'%name)
        fork_lock.acquire()
        print('%s拿到叉子了'%name)
        print('%s吃面'%name)
        time.sleep(0.3)
        fork_lock.release()
        print('%s放下叉子'%name)
        noodle_lock.release()
        print('%s放下面'%name)
    
    def eat2(name):
        fork_lock.acquire()
        print('%s拿到叉子了' % name)
        noodle_lock.acquire()
        print('%s拿到面条了'%name)
        print('%s吃面'%name)
        time.sleep(0.3)
        noodle_lock.release()
        print('%s放下面'%name)
        fork_lock.release()
        print('%s放下叉子' % name)
    
    if __name__ == '__main__':
        name_list = ['alex','wusir']
        name_list2 = ['nezha','yuan']
        for name in name_list:
            Thread(target=eat1,args=(name,)).start()
        for name in name_list2:
            Thread(target=eat2,args=(name,)).start()

     递归锁

    import time
    from threading import Thread,RLock
    
    fork_lock = noodle_lock = RLock()
    def eat1(name):
        noodle_lock.acquire()
        print('%s拿到面条了'%name)
        fork_lock.acquire()
        print('%s拿到叉子了'%name)
        print('%s吃面'%name)
        time.sleep(0.3)
        fork_lock.release()
        print('%s放下叉子'%name)
        noodle_lock.release()
        print('%s放下面'%name)
    #
    def eat2(name):
        fork_lock.acquire()
        print('%s拿到叉子了' % name)
        noodle_lock.acquire()
        print('%s拿到面条了'%name)
        print('%s吃面'%name)
        time.sleep(0.3)
        noodle_lock.release()
        print('%s放下面'%name)
        fork_lock.release()
        print('%s放下叉子' % name)
    
    if __name__ == '__main__':
        name_list = ['alex','wusir']
        name_list2 = ['nezha','yuan']
        for name in name_list:
            Thread(target=eat1,args=(name,)).start()
        for name in name_list2:
            Thread(target=eat2,args=(name,)).start()

    在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。

    这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁。递归锁并不是一个好的解决方案,死锁现象的发生不是互斥锁的问题,而是程序员的逻辑有问题导致的,递归锁能够快速的解决死锁问题。

    实际生产中,应该迅速恢复服务 递归锁替换互斥锁,在接下来的时间中慢慢把递归锁替换成互斥锁,能够完善代码的逻辑,提高代码的效率。

    信号量

    同进程的一样

    Semaphore管理一个内置的计数器,
    每当调用acquire()时内置计数器-1;
    调用release() 时内置计数器+1;
    计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。

    与进程池是完全不同的概念,进程池Pool(4),最大只能产生4个进程,而且从头到尾都只是这四个进程,不会产生新的,而信号量是产生一堆线程/进程

    import time
    from threading import Semaphore,Thread
    
    def func(index,sem):
        sem.acquire()
        print(index)
        time.sleep(1)
        sem.release()
    
    if __name__ == '__main__':
        sem = Semaphore(5)
        for i in range(10):
            Thread(target=func,args=(i,sem)).start()

    事件

    同进程的一样

    线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行

    import time
    import random
    from threading import Event,Thread
    def check(e):
        print('开始检测数据库连接')
        time.sleep(random.randint(1,5))  # 检测数据库连接
        e.set()  # 成功了
    
    def connect(e):
        for i in range(3):
            e.wait(0.5)
            if e.is_set():
                print('数据库连接成功')
                break
            else:
                print('尝试连接数据库%s次失败'%(i+1))
        else:
            raise TimeoutError
    
    e = Event()
    Thread(target=connect,args=(e,)).start()
    Thread(target=check,args=(e,)).start()

    条件

    使得线程等待,只有满足某条件时,才释放n个线程。

    Python提供的Condition对象提供了对复杂线程同步问题的支持。Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法。

    线程首先acquire一个条件变量,然后判断一些条件。如果条件不满足则wait;如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,

    其他处于wait状态的线程接到通知后会重新判断条件。不断的重复这一过程,从而解决复杂的同步问题。

    from threading import Condition,Thread
    def func(con,index):
        print('%s在等待'%index)
        con.acquire()
        con.wait()
        print('%s do something'%index)
        con.release()
    
    con = Condition()
    for i in range(10):
        t = Thread(target=func,args=(con,i))
        t.start()
    count = 10
    while count > 0:
        num= int(input('>>>'))
        con.acquire()
        con.notify(num)
        count -= num
        con.release()

    线程队列

    先进先出
    import queue
    
    q=queue.Queue()
    q.put('first')
    q.put('second')
    q.put('third')
    
    print(q.get())
    print(q.get())
    print(q.get())
    '''
    结果(先进先出):
    first
    second
    third
    '''

     后进先出

    import queue
    
    q=queue.LifoQueue()
    q.put('first')
    q.put('second')
    q.put('third')
    
    print(q.get())
    print(q.get())
    print(q.get())
    '''
    结果(后进先出):
    third
    second
    first
    '''

     存储数据时可设置优先级的队列

    from queue import PriorityQueue
    pq = PriorityQueue()
    pq.put((15,'abc'))
    pq.put((5,'ghi'))
    pq.put((12,'def'))
    pq.put((12,'aaa'))
    
    print(pq.get())
    print(pq.get())
    print(pq.get())
    put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高,数字一样时,按asc码的大小排序。

    Python标准模块--concurrent.futures

    #1 介绍
    concurrent.futures模块提供了高度封装的异步调用接口
    ThreadPoolExecutor:线程池,提供异步调用
    ProcessPoolExecutor: 进程池,提供异步调用
    Both implement the same interface, which is defined by the abstract Executor class.
    
    #2 基本方法
    #submit(fn, *args, **kwargs)
    异步提交任务
    
    #map(func, *iterables, timeout=None, chunksize=1) 
    取代for循环submit的操作
    
    #shutdown(wait=True) 
    相当于进程池的pool.close()+pool.join()操作
    wait=True,等待池内所有任务执行完毕回收完资源后才继续
    wait=False,立即返回,并不会等待池内的任务执行完毕
    但不管wait参数为何值,整个程序都会等到所有任务执行完毕
    submit和map必须在shutdown之前
    
    #result(timeout=None)
    取得结果
    
    #add_done_callback(fn)
    回调函数

    获取返回值

    import time
    from concurrent.futures import ThreadPoolExecutor
    from threading import current_thread as cthread
    
    def func(i):
        print('thread',i,cthread().ident)
        time.sleep(1)
        print('thread %s end'%i)
        return i* '*'
    
    tp = ThreadPoolExecutor(5)
    ret_l = []
    for i in range(20):
        ret = tp.submit(func,i)
        ret_l.append(ret)
    for ret in ret_l:
        print(ret.result())
    print('主线程')

    map用法

    import time
    from concurrent.futures import ThreadPoolExecutor
    def func(i):
        print('thread',i)
        time.sleep(1)
        print('thread %s end'%i)
        return i* '*'
    
    tp = ThreadPoolExecutor(5)
    res = tp.map(func,range(20))
    for i in res:print(i)

    回调函数

    import os
    import time
    from concurrent.futures import ProcessPoolExecutor
    from threading import current_thread as cthread
    # # 线程池的回调函数 子线程完成的
    def func(i):
        print('thread',i,os.getpid())
        time.sleep(1)
        print('thread %s end'%i)
        return i* '*'
    
    def call_back(arg):
        print('call back : ',os.getpid())
        print('ret : ',arg.result())
    
    if __name__ == '__main__':
        tp = ProcessPoolExecutor(5)
        ret_l = []
        for i in range(20):
            tp.submit(func,i).add_done_callback(call_back)
        print('主线程',os.getpid())
    
    
     
  • 相关阅读:
    mysql的存储过程
    一份php高级工程师的面试题,我每天看一点点
    php的常用函数(持续更新)
    php 中文字符串截取
    php递归遍历文件目录
    ajax timeout 断网处理
    angular 刷新问题
    angular state中templateUrl 路径的模板
    angular请求传递不了数据
    截取字符串 substring substr slice
  • 原文地址:https://www.cnblogs.com/yidashi110/p/9688277.html
Copyright © 2011-2022 走看看