zoukankan      html  css  js  c++  java
  • 线程基础、线程池

    一、线程的概念

    在传统操作系统中,每个进程有一个地址空间,而且默认有一个控制进程,该控制线程可以执行代码从而创建新的线程,该控制线程的执行周期就代表该进程的执行周期。

    进程只是一个资源单位,不是执行单位。在进程中真正的执行单位是线程;一个进程中默认有一个控制线程,线程没有主线程、辅线程之分(进程有主进程);一个进程中有多个线程,所有的线程可以共享还进程的地址空间。

    进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是cpu上的执行单位。

    多线程(即多个控制线程)的概念是,在一个进程中存在多个控制线程,多个控制线程共享该进程的地址空间,相当于一个车间内有多条流水线,都共用一个车间资源。

    二、线程的创建开销小

    这是因为在创建进程的时候需要调用操作系统去分配一个内存空间之后才会开启一个进程,而一个进程中的线程是共享同一个进程的内存空间的,在创建线程的时候不需要调用操作系统来开辟新的空间,所以创建线程的开销小。

    三、进程与线程之间的区别

    1、线程的创建开销小于进程,创建速度快

    2、同一进程下的多个线程共享该进程的地址空间

    四、使用多线程的原因

    多线程指的是在一个进程中开启多个线程,简单的讲:如果多个任务共用一块地址空间,那必须在一个进程内开启多个线程。详细的讲分为4点:

    1、多线程共享一个进程的地址空间

    2、线程比进程更轻量级,线程比进程更容易创建,可撤销;在许多操作系统中,创建一个线程比创建一个进程要快10~100倍,在有大量线程需要动态和快速修改时,这一特性很有用。

    3、若多个线程都是cpu密集型的,那么并不能获得性能上的增强,但是如果存在大量的计算和大量的I/O处理,拥有多个线程允许这些活动彼此重叠运行,从而会加快程序执行的速度。

    4、在多cpu系统中,为了最大限度的利用多核,可以开启多个线程,这比开进程开销要小的多。(这条并不适用于python)

    五、开启线程的两种方式

    # 方法一:
    from threading import Thread
    import time,os
    def task():
        print('%s is runing'%os.getpid())
        time.sleep(2)
        print('%s is done'%os.getpid())
    if __name__ == '__main__':
        t = Thread(target=task,)
        t.start()
        print('')
    # 方法二:
    from threading import Thread
    import time,os
    class Mythread(Thread):
        def __init__(self,name):
            super().__init__()
            self.name = name
        def run(self):
            print('%s is runing'%os.getpid())
            time.sleep(1)
            print('%s is done'%os.getpid())
    if __name__ == '__main__':
        t = Mythread('alex')
        t.start()
        print('')

    六、线程与进程的关系

    1、一个进程内不开进程也不开其他线程(这是因为在进程开启之后会有一个默认的控制进程)

      主线程结束,该进程就结束了

    2、当一个进程内开启子进程,不开启其他线程的时候:

      当主线程结束时,主进程要等,等所有的子进程运行完毕,将子进程的空间进行回收

    3、当一个进程中没有开启子进程,但是开启了多个线程:

      主线程结束并不意味着进程的结束,进程的结束指的是该进程内所有的线程都运行完毕,才应该回收进程。

    七、在一个进程下开启多个线程与在一个进程下开启多个子进程的区别

    # 查看线程中的ppid , pid
    from threading import Thread
    import time,os
    def task():
        print('parents: %s self: %s '%(os.getppid(),os.getpid()))
        time.sleep(2)
    if __name__ == '__main__':
        t = Thread(target=task,)
        t.start()
        print('',os.getppid(),os.getpid())
    # parents: 7588 self: 7748 
    # 主 7588 7748
    # 进程的ppid , pid
    from multiprocessing import Process
    import time,os
    def task():
        print('parents: %s self: %s '%(os.getppid(),os.getpid()))
        time.sleep(2)
    if __name__ == '__main__':
        t = Process(target=task,)
        t.start()
        print('',os.getppid(),os.getpid())
    # 主 7588 11020
    # parents: 11020 self: 4228 
    # 线程之间内存空间共享
    from threading import Thread
    n=100
    def task():
        global n
        n=0
    if __name__ == '__main__':
        t = Thread(target=task,)
        t.start()
        t.join()
        print('',n)
    # 主 0
    # 进程直接内存空间隔离
    from multiprocessing import Process
    n=100
    def task():
        global n
        n=0
    if __name__ == '__main__':
        t = Process(target=task,)
        t.start()
        t.join()
        print('',n)
    # 主 100

    八、线程对象的其他方法和属性

    # name
    # current_thread().getName()
    from threading import Thread,current_thread
    import time
    def task():
        print('%s is runing'%current_thread().getName())  # 其中current_thread()代表当前线程
        time.sleep(2)
        print('%s is done'%current_thread().getName())
    if __name__ == '__main__':
        t = Thread(target=task)
        t.start()
        print(t.name)
        print('',current_thread().getName())
    # 结果
    # Thread-1 is runing
    # Thread-1
    # 主 MainThread
    # Thread-1 is done
    from threading import Thread,current_thread,enumerate,active_count
    import time
    def task():
        print('%s is runing'%current_thread().getName())  # 其中current_thread()代表当前线程
        time.sleep(2)
        print('%s is done'%current_thread().getName())
    if __name__ == '__main__':
        t = Thread(target=task)
        t.start()
        print(enumerate())  # 查看当前活着的线程
        print(enumerate()[0].getName())  
        print(active_count())  # 查看当前或者的线程数
        print('',current_thread().getName())
    # 结果
    # Thread-1 is runing
    # [<_MainThread(MainThread, started 10708)>, <Thread(Thread-1, started 2404)>]
    # MainThread
    # 2
    # 主 MainThread
    # Thread-1 is done

    九、线程池

    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    from threading import current_thread
    import time,random
    def task(n):
        print('%s is running'%current_thread().getName())
        time.sleep(random.randint(1,3))
        print('%s is done'%current_thread().getName())
        return n**2
    if __name__ == '__main__':
        # t = ProcessPoolExecutor()  # 不指定的话默认为cpu的核数
        import os
        print(os.cpu_count())  # 查看cpu的核数
        t = ThreadPoolExecutor(2)  # 不指定的话默认为cpu的核数*5
        objs = []
        for i in range(10):
            obj = t.submit(task,i)
            objs.append(obj)
        t.shutdown(wait=True)
        for obj in objs:
            print(obj.result())
        print('',current_thread().getName())
    # 结果:
    # 4
    # <concurrent.futures.thread.ThreadPoolExecutor object at 0x00000226E09816A0>_0 is running
    # <concurrent.futures.thread.ThreadPoolExecutor object at 0x00000226E09816A0>_1 is running
    # 
    # <concurrent.futures.thread.ThreadPoolExecutor object at 0x00000226E09816A0>_1 is done
    # <concurrent.futures.thread.ThreadPoolExecutor object at 0x00000226E09816A0>_1 is running
    # 
    # <concurrent.futures.thread.ThreadPoolExecutor object at 0x00000226E09816A0>_1 is done
    # <concurrent.futures.thread.ThreadPoolExecutor object at 0x00000226E09816A0>_1 is running
    # <concurrent.futures.thread.ThreadPoolExecutor object at 0x00000226E09816A0>_0 is done
    # <concurrent.futures.thread.ThreadPoolExecutor object at 0x00000226E09816A0>_0 is running
    # 
    # <concurrent.futures.thread.ThreadPoolExecutor object at 0x00000226E09816A0>_1 is done
    # <concurrent.futures.thread.ThreadPoolExecutor object at 0x00000226E09816A0>_1 is running
    # 
    # <concurrent.futures.thread.ThreadPoolExecutor object at 0x00000226E09816A0>_0 is done
    # <concurrent.futures.thread.ThreadPoolExecutor object at 0x00000226E09816A0>_0 is running
    # 
    # <concurrent.futures.thread.ThreadPoolExecutor object at 0x00000226E09816A0>_0 is done
    # <concurrent.futures.thread.ThreadPoolExecutor object at 0x00000226E09816A0>_0 is running
    # <concurrent.futures.thread.ThreadPoolExecutor object at 0x00000226E09816A0>_1 is done
    # <concurrent.futures.thread.ThreadPoolExecutor object at 0x00000226E09816A0>_1 is running
    # 
    # <concurrent.futures.thread.ThreadPoolExecutor object at 0x00000226E09816A0>_1 is done
    # <concurrent.futures.thread.ThreadPoolExecutor object at 0x00000226E09816A0>_1 is running
    # 
    # <concurrent.futures.thread.ThreadPoolExecutor object at 0x00000226E09816A0>_0 is done
    # <concurrent.futures.thread.ThreadPoolExecutor object at 0x00000226E09816A0>_1 is done
    # 0
    # 1
    # 4
    # 9
    # 16
    # 25
    # 36
    # 49
    # 64
    # 81
    # 主 MainThread

    当问题的规模比较大,大到无法直接将进程和线程一次性全部开启的时候,使用池的概念作为限制保证机器在一个合理的范围内将任务做完。

    十、回调机制

    # 用线程做
    import requests,time
    from concurrent.futures import ThreadPoolExecutor
    from threading import current_thread
    def get(url):
        print('%s GET %s' % (current_thread().getName(), url))
        re = requests.get(url)
        time.sleep(2)
        if re.status_code == 200:
            return {'url': url, 'text': re.text}
    def parse(obj):
        res = obj.result()
        print('[%s] <%s> (%s)' % (current_thread().getName(), res['url'], len(res['text'])))
    if __name__ == '__main__':
        urls = ['http://www.baidu.com',
                'http://www.jd.com',
                'http://www.tmall.com']
        t = ThreadPoolExecutor(2)
        for url in urls:
            t.submit(get,url).add_done_callback(parse)
        t.shutdown(wait=True)
        print('')
    # <concurrent.futures.thread.ThreadPoolExecutor object at 0x000001E657CC1908>_0 GET http://www.baidu.com
    # <concurrent.futures.thread.ThreadPoolExecutor object at 0x000001E657CC1908>_1 GET http://www.jd.com
    # [<concurrent.futures.thread.ThreadPoolExecutor object at 0x000001E657CC1908>_0] <http://www.baidu.com> (2381)
    # <concurrent.futures.thread.ThreadPoolExecutor object at 0x000001E657CC1908>_0 GET http://www.tmall.com
    # [<concurrent.futures.thread.ThreadPoolExecutor object at 0x000001E657CC1908>_1] <http://www.jd.com> (124541)
    # [<concurrent.futures.thread.ThreadPoolExecutor object at 0x000001E657CC1908>_0] <http://www.tmall.com> (212080)
    #
    # 用进程做
    import requests,time,os
    from concurrent.futures import ProcessPoolExecutor
    def get(url):
        print('%s GET %s' % (os.getpid(), url))
        re = requests.get(url)
        time.sleep(2)
        if re.status_code == 200:
            return {'url': url, 'text': re.text}
    def parse(obj):
        res = obj.result()
        print('[%s] <%s> (%s)' % (os.getpid(), res['url'], len(res['text'])))
    if __name__ == '__main__':
        urls = ['http://www.baidu.com',
                'http://www.jd.com',
                'http://www.tmall.com']
        t = ProcessPoolExecutor(2)
        for url in urls:
            t.submit(get,url).add_done_callback(parse)
        t.shutdown(wait=True)
        print('')
    # 9876 GET http://www.baidu.com
    # 2396 GET http://www.jd.com
    # 9876 GET http://www.tmall.com
    # [7576] <http://www.baidu.com> (2381)
    # [7576] <http://www.jd.com> (124541)
    # [7576] <http://www.tmall.com> (212080)
    #

     十一、守护进程:无论是进程还是线程,都遵循:守护xxx会等待主xxx运行完毕之后被销毁

    from multiprocessing import Process
    import os,time,random
    def task():
        print('%s is running'%os.getpid())
        time.sleep(random.randint(1,4))
        print('%s is done'%os.getpid())
    if __name__ == '__main__':
        p = Process(target = task)
        p.daemon = True
        p.start()
        p.join()
        print('主‘,os.getpid())
    # 9260 is running
    # 9260 is done
    # 主 6884

    其中的守护进程是p.daemon  = True,但是这句必须要放在p.start()之前,且守护进程不能开启子进程。

    举例说明守护进程的应用场景:

    假设有两个任务要做,且需要实现并发的效果,使用进程的话就可以让主进程执行一个任务,然后开启一个子进程执行一个任务

    如果这两个任务毫无关系,那么就用一般的方式实现并发就好;如果这两个任务有关系,主进程的任务在执行完成之后,子进程的任务没有存在的意义了,那么该子进程就应该在开启之前就被设定称为守护进程。

    但是当有多个进程,且只有一个守护进程时:

    from multiprocessing import Process
    import os,time,random
    def task():
        print('%s is running'%os.getpid())
        time.sleep(random.randint(1,3))
        print('%s is done'%os.getpid())
    if __name__ == '__main__':
        p = Process(target=task)
        p1 = Process(target=task)
        p2 = Process(target=task)
        p.daemon = True  # 1、必须在p.start()之前2、守护进程不能开启子进程
        p.start()
        p1.start()
        p2.start()
        p.join()
        print('',os.getpid())
    # 11128 is running
    # 9872 is running
    # 5968 is running
    # 11128 is done
    # 主 5632
    # 5968 is done
    # 9872 is done

    此时的主进程在执行完自己的代码之后还不会’死亡‘,它需要等待其他的非守护进程运行完毕之后回收它们的资源,而守护进程在运行完毕之后还是会释放自己的资源的,此时就出现了以上的运行结果,守护进程先于主进程释放自己的资源。

    需要强调的是:运行完毕并非终止运行

    # 1、对主进程来说,运行完毕指的是主进程代码的运行完毕
    # 2、对主进程来说,运行完毕指的是主线程所在的进程内所有的非守护线程全部运行完毕,主线程才算运行完毕

    详细解释:

    # 1、主进程在其代码结束之后就已经算运行完毕了(守护进程在此时就会被回收),然后主进程会一直等待非守护的子进程都运行完毕后回收子进程的资源(否则就会产生僵尸进程),才会结束。
    # 2、主线程在其他非守护线程运行完毕之后才算运行完毕(守护线程在此时就会被回收)。这是因为主线程的结束意味着进程的结束,进程整体的资源将全部被回收,而进程必须要保证非守护线程全部运行完毕之后才能够结束。

    迷惑人的例子:

    from multiprocessing import Process
    import time
    def foo():
        print(123)
        time.sleep(1)
        print('end123')
    def bar():
        print(456)
        time.sleep(3)
        print('end456')
    if __name__ == '__main__':
        p1 = Process(target=foo)
        p2 = Process(target=bar)
        p1.daemon = True
        p1.start()
        p2.start()
        print('main——————')
    
    # main——————
    # 456
    # end456
    from threading import Thread
    import time
    def foo():
        print(123)
        time.sleep(1)
        print('end123')
    def bar():
        print(456)
        time.sleep(3)
        print('end456')
    if __name__ == '__main__':
        p1 = Thread(target=foo)
        p2 = Thread(target=bar)
        p1.daemon = True
        p1.start()
        p2.start()
        print('main——————')
    
    # 123
    # 456
    # main——————
    # end123
    # end456

    十二、互斥锁

    互斥锁的本质都是将并发运行变成串行,一次来控制同一时间内共享数据只能被一个任务修改,进而保证数据安全。且可以肯定的一点是:保护不同的数据安全就应该加上不同的锁。

    互斥锁——进程

    from multiprocessing import Process,Lock
    import os,time,random
    def task(mutex):
        mutex.acquire()
        print('%s print 1'%os.getpid())
        time.sleep(random.randint(1,3))
        print('%s print 2'%os.getpid())
        time.sleep(random.randint(1,3))
        print('%s print 3'%os.getpid())
        mutex.release()
    if __name__ == '__main__':
        mutex = Lock()
        p1 = Process(target=task,args=(mutex,))
        p2 = Process(target=task,args=(mutex,))
        p3 = Process(target=task,args=(mutex,))
        p1.start()
        p2.start()
        p3.start()
    
    # 520 print 1
    # 520 print 2
    # 520 print 3
    # 11164 print 1
    # 11164 print 2
    # 11164 print 3
    # 5044 print 1
    # 5044 print 2
    # 5044 print 3

    以上代码的运行效果看上去像是串行,但是在本质上与串行不一样,这是因为完全的串行是某一个进程没有运行完的情况下其他的所有进程都是没有办法开启的;但是使用互斥锁的话在运行到互斥锁的时候其他的所有进程都还是在开启的,只是各个进程在互斥锁这一块的时候需要对资源进行竞争,在互斥锁之前的所有资源都是可以同时被各个进程使用的。

    互斥锁——线程

    from threading import Thread,Lock
    import time
    n = 100
    def task():
        global n
        with mutex:
            temp = n
            time.sleep(0.1)
            n = temp-1
    if __name__ == '__main__':
        mutex = Lock()
        t_l = []
        for i in range(100):
            t = Thread(target=task)
            t_l.append(t)
            t.start()
        for j in t_l:
            j.join()
        print(n)
    
    # 0

    有时候会将互斥锁与进程池搞混,觉得它们是一样的,只要记住一个最主要的区别就好:

    模拟抢票:

    from multiprocessing import Process,Lock
    import json,os,time,random
    def search():
        with open('db.txt',encoding='utf-8') as f:
            dic = json.load(f)
            print('%s 剩余票数 %s'%(os.getpid(),dic['count']))
    def get():
        with open('db.txt',encoding='utf-8') as read_f:
            dic = json.load(read_f)
        if dic['count'] > 0:
            dic['count']-=1
            time.sleep(random.randint(1,3))
            with open('db.txt','w',encoding='utf-8') as write_f:
                json.dump(dic,write_f)
                print('%s 抢票成功'%os.getpid())
    def task(mutex):
        search()
        with mutex:
            get()
    if __name__ == '__main__':
        mutex = Lock()
        for i in range(10):
            p = Process(target=task,args=(mutex,))
            p.start()
            p.join()
    
    # 7960 剩余票数 2
    # 7960 抢票成功
    # 2984 剩余票数 1
    # 2984 抢票成功
    # 10236 剩余票数 0
    # 6608 剩余票数 0
    # 8908 剩余票数 0
    # 6992 剩余票数 0
    # 4524 剩余票数 0
    # 8700 剩余票数 0
    # 10956 剩余票数 0
    # 172 剩余票数 0

    十三、信号量

    同进程的一样

    Semaphore管理一个内置的计数器,

    每当调用acquire()时内置函数-1;

    调用release()时内置计数器+1;计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。

    实例:(同时只有3个线程可以获得semaphore,即可以限制最大连接数为3)

    from multiprocessing import Process,Semaphore
    import time,random,os
    def task(sm):
        with sm:
            print('%s 上厕所'%os.getpid())
            time.sleep(random.randint(1,3))
            print('%s 上完厕所'%os.getpid())
    if __name__ == '__main__':
        sm = Semaphore(3)
        for i in range(10):
            p = Process(target=task,args=(sm,))
            p.start()
    
    # 9376 上厕所
    # 10848 上厕所
    # 5784 上厕所
    # 10848 上完厕所
    # 8296 上厕所
    # 5784 上完厕所
    # 172 上厕所
    # 9376 上完厕所
    # 10236 上厕所
    # 8296 上完厕所
    # 9860 上厕所
    # 10236 上完厕所
    # 10792 上厕所
    # 9860 上完厕所
    # 1136 上厕所
    # 172 上完厕所
    # 6140 上厕所
    # 10792 上完厕所
    # 1136 上完厕所
    # 6140 上完厕所

    互斥锁:在开启进程的时候其他所有的进程都是并行的,且是有几个进程就开启几个进程,运行结果看起来是串行的。

    进程池:假设进程池中选择同时开启4个进程,那么每次只有4个进程在执行,其他的进程只能等待,运行的结果是多个进程结果穿插打印出来的。

    十四、进程Queue与线程queue

    进程队列:

    # 进程队列
    from multiprocessing import Queue
    q = Queue(3)
    q.put({'a':1})
    q.put('xxxx')
    q.put(4)
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    # {'a': 1}
    # xxxx
    # 4
    # 此时的代码没有运行完毕,
    # 这是因为这时用put放入的数据个数小于从队列中get出来的个数,
    # 运行完第三个get之后,就停在这里等待put数据进入队列

    线程队列:

    import queue
    q = queue.Queue(3)
    q.put({'a':1})
    q.put('xxxx')
    q.put(4)
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    # {'a': 1}
    # xxxx
    # 4
    # 此时的代码没有运行完毕,
    # 这是因为这时用put放入的数据个数小于从队列中get出来的个数,
    # 运行完第三个get之后,就停在这里等待put数据进入队列

    优先级队列

    import queue
    q = queue.PriorityQueue(3)
    q.put((10,{'a':1}))
    q.put((-1,{'b':'xxx'}))
    # q.put((1, {'b': 3}))
    q.put((-2, {'c': 'xxxx'}))
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    # (-2, {'c': 'xxxx'})
    # (-1, {'b': 'xxx'})
    # (10, {'a': 1})
    # 此时的代码没有运行完毕,
    # 这是因为这使用put放入的数据个数小于从队列中get出来的个数,
    # 运行完第三个get之后,就停在这里等待put数据进入队列

     堆栈

    import queue
    q = queue.LifoQueue(3)
    q.put({'a': 1})
    q.put({'b': 2})
    q.put({'c': 3})
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    
    # {'c': 3}
    # {'b': 2}
    # {'a': 1}
    # 先进后出

    十五、生产者消费者模型

    from multiprocessing import Process,Queue
    import time,random,os
    def procducer(q):
        for i in range(10):
            res = '包子 %s'%i
            time.sleep(0.6)
            q.put(res)
            print('%s 生产了 %s'%(os.getpid(),res))
    def consumer(q):
        while True:
            res = q.get()
            if res is None:
                break
            print('%s 吃了 %s'%(os.getpid(),res))
            time.sleep(random.randint(1,3))
    if __name__ == '__main__':
        q = Queue()
        p = Process(target=procducer,args=(q,))
        c = Process(target=consumer,args=(q,))
        p.start()
        c.start()
        p.join()
        q.put(None)
        print('')
    # 9944 生产了 包子 0
    # 10944 吃了 包子 0
    # 9944 生产了 包子 1
    # 10944 吃了 包子 1
    # 9944 生产了 包子 2
    # 9944 生产了 包子 3
    # 10944 吃了 包子 2
    # 9944 生产了 包子 4
    # 9944 生产了 包子 5
    # 9944 生产了 包子 6
    # 9944 生产了 包子 7
    # 9944 生产了 包子 8
    # 10944 吃了 包子 3
    # 9944 生产了 包子 9
    #
    # 10944 吃了 包子 4
    # 10944 吃了 包子 5
    # 10944 吃了 包子 6
    # 10944 吃了 包子 7
    # 10944 吃了 包子 8
    # 10944 吃了 包子 9

     生产者消费者模型升级版

    from multiprocessing import Process, JoinableQueue
    import time, os, random
    def producer(food,q):
        for i in range(3):
            res = '%s %s' % (food, i)
            time.sleep(0.9)
            q.put(res)
            print('%s 生产了 %s' % (os.getpid(), res))
        q.join()
    def consumer(q):
        while True:
            res = q.get()
            print('%s 吃了 %s' % (os.getpid(), res))
            time.sleep(random.randint(1, 3))
            q.task_done()
    if __name__ == '__main__':
        q = JoinableQueue()
        p1 = Process(target=producer,args=('包子', q))
        p2 = Process(target=producer,args=('狗粮', q))
        c1 = Process(target=consumer,args=(q,))
        c2 = Process(target=consumer,args=(q,))
        c1.daemon = True
        c2.daemon = True
        p1.start()
        p2.start()
        c1.start()
        c2.start()
        p1.join()
        p2.join()
        print('')
    # 10008 生产了 包子 0
    # 8220 吃了 包子 0
    # 10972 生产了 狗粮 0
    # 11192 吃了 狗粮 0
    # 10008 生产了 包子 1
    # 10972 生产了 狗粮 1
    # 10008 生产了 包子 2
    # 10972 生产了 狗粮 2
    # 8220 吃了 包子 1
    # 11192 吃了 狗粮 1
    # 8220 吃了 包子 2
    # 11192 吃了 狗粮 2
    #

    十六、GIL解释器锁

    三个需要注意的点:

    1、线程抢的是GIL锁,GIL锁相当于执行权限,拿到执行权限之后才能拿到互斥锁Lock,其他线程也可以得到GIL,但是如果发现Lock仍然没有被释放则阻塞,即便是拿到执行权限GIL也要立即交出来

    2、join是等待所有,即整体串行,而锁只是锁住修改共享数据的部分,即部分串行,要想保证数据安全的根本原理在于让并发变成串行,join与互斥锁都可以实现,毫无疑问,互斥锁的部分串行效率要更高

    GIL和Lock是两把锁,保护的数据不一样。GIL是解释器级别的(保护的是解释器级别的数据,比如垃圾回收的数据);Lock是保护用户自己开发的应用程序的数据,而GIL不负责这件事,只能用户自定义加锁处理。

    过程分析:所有线程抢的是GIL锁,或者说所有线程抢的是执行权限

      线程1抢到GIL锁,拿到执行权限,开始执行,然后加了一把Lock,还没有执行完毕,即线程1还未释放Lock,有可能线程2抢到GIL锁,开始执行,执行过程中发现Lock还没有被线程1释放,于是线程2进入阻塞,被夺走执行权限,有可能线程1拿到GIL,然后正常执行到释放Lock...........这就导致了串行的运行效果

    既然是串行,那可以执行

      t1.start()

      t1.join()

      t2.start()

      t2.join()

      这些也是串行执行,那为什么还要加Lock?这是因为join是等待t1或t2的所有代码全部执行完毕,相当于锁住了t1的所有代码,而Lock只是锁住一部分操作,共享数据的代码。

    # 因为Python解释器帮你自动定期进行内存回收,可以理解为Python解释器里有一个独立的线程,
    美国一段时间它起wake up做一次全局轮询看看哪些内存数据是可以被清空的,此时你自己程序里的
    线程和py解释器自己的线程是并发运行的,假设你的线程删除了一个变量,py解释器的垃圾回收线程
    中清空这个变量的过程中的clearing时刻,可能一个其它线程正好有重新给这个还没来得及清空的
    内存空间赋值了,结果就有可能新赋值的数据被删除了,为了解决类似的问题,Python解释器简单
    粗暴的加了锁,即当一个线程运行时,其它人都不能动,这样就解决了上述的问题,这可以说是Python早期版本遗留的问题。
    from threading import Thread
    import os,time
    def work():
        global n
        temp = n
        time.sleep(0.3)
        n = temp-1
    if __name__ == '__main__':
        n = 100
        l = []
        for i in range(100):
            p = Thread(target=work)
            l.append(p)
            p.start()
        for p in l:
            p.join()
        print(n)  # 结果99

    锁通常被用来实现对共享资源的同步访问。为每个共享资源创建一个Lock对象,当你需要访问该资源时,调用acquire方法来获取锁对象(如果其他线程已经获得了该锁,则当前线程需等待其被释放),待资源访问完后,再调用release方法来释放锁:

    import threading
    R = threading.Lock()
    R.acquire()
    '''
    对公共数据的操作
    '''
    R.release()
    from threading import Thread,Lock
    import time
    n = 100
    def task():
        global n
        mutext.acquire()
        temp = n
        time.sleep(0.2)
        n = temp-1
        mutext.release()
    if __name__ == '__main__':
        t_l = []
        mutext = Lock()
        start = time.time()
        for i in range(100):
            t = Thread(target=task)
            t_l.append(t)
            t.start()
        for t in t_l:
            t.join()
        print(time.time()-start)  # 0.6020324230194092
        print(n)  # 97 由原来的并发执行变成串行,牺牲了执行效率保证了数据安全
    分析:
    # 1、100个线程去抢GIL锁,即抢执行权限
    # 2、肯定有一个线程先抢到GIL(暂且称为线程1),然后开始执行,一旦执行就会拿到lock.acquire()
    # 3、极有可能线程1 还未运行完毕,就有另外一个线程2抢到GIL,然后开始运行,但线程2发现互斥锁lock还未被线程1释放,于是阻塞,被迫交出执行权限,即释放GIL
    # 4、直到线程1重新抢到GIL,开始从上次暂停的位置继续执行,直到正常释放互斥锁lock,然后其他的线程再重复2 3 4的过程

    GIL锁与互斥锁综合分析(重点!!!!)

    # 不加锁:并发执行,速度快,数据不安全
    from threading import current_thread,Thread,Lock
    import os,time
    def task():
        global n
        print('%s is running'%current_thread().getName())
        temp = n
        time.sleep(0.5)
        n = temp-1
    if __name__ == '__main__':
        n = 100
        lock = Lock()
        threads = []
        start_time = time.time()
        for i in range(100):
            t = Thread(target=task)
            threads.append(t)
            t.start()
        for t in threads:
            t.join()
        stop_time = time.time()
        print('主: %s n: %s' %(stop_time-start_time,n))
    '''
    结果:
    Thread-1 is running
    Thread-2 is running
    …………
    Thread-98 is running
    Thread-99 is running
    Thread-100 is running
    主: 0.5232374668121338 n: 99
    '''
    # 不加锁:未加锁部分并发执行,加锁部分串行执行,速度慢,数据安全
    from threading import current_thread,Thread,Lock
    import os,time
    def task():
        # 未加锁的代码并发运行
        time.sleep(3)
        print('%s atsrt to run' %current_thread().getName())
        global n
        # 加锁的代码串行运行
        lock.acquire()
        temp = n
        time.sleep(0.5)
        n = temp-1
        lock.release()
    if __name__ == '__main__':
        n = 100
        lock = Lock()
        threads = []
        start_time = time.time()
        for i in range(10):
            t = Thread(target=task)
            threads.append(t)
            t.start()
        for t in threads:
            t.join()
        stop_time = time.time()
        print('主 :%s n: %s'% (stop_time-start_time,n))
    
    # Thread-1 atsrt to run
    # Thread-2 atsrt to run
    # Thread-6 atsrt to run
    # Thread-5 atsrt to run
    # Thread-4 atsrt to run
    # Thread-3 atsrt to run
    # Thread-11 atsrt to run
    # Thread-10 atsrt to run
    # Thread-8 atsrt to run
    # Thread-7 atsrt to run
    # Thread-9 atsrt to run
    # Thread-17 atsrt to run
    # Thread-15 atsrt to run
    # Thread-13 atsrt to run
    # Thread-14 atsrt to run
    # Thread-12 atsrt to run
    # Thread-16 atsrt to run
    # Thread-24 atsrt to run
    # Thread-23 atsrt to run
    # Thread-22 atsrt to run
    # Thread-21 atsrt to run
    # Thread-19 atsrt to run
    # Thread-18 atsrt to run
    # Thread-20 atsrt to run
    # Thread-29 atsrt to run
    # Thread-28 atsrt to run
    # Thread-26 atsrt to run
    # Thread-27 atsrt to run
    # Thread-30 atsrt to run
    # Thread-25 atsrt to run
    # Thread-31 atsrt to run
    # Thread-33 atsrt to run
    # Thread-32 atsrt to run
    # Thread-37 atsrt to run
    # Thread-36 atsrt to run
    # Thread-34 atsrt to run
    # Thread-35 atsrt to run
    # Thread-44 atsrt to run
    # Thread-43 atsrt to run
    # Thread-41 atsrt to run
    # Thread-40 atsrt to run
    # Thread-39 atsrt to run
    # Thread-42 atsrt to run
    # Thread-38 atsrt to run
    # Thread-49 atsrt to run
    # Thread-48 atsrt to run
    # Thread-50 atsrt to run
    # Thread-47 atsrt to run
    # Thread-45 atsrt to run
    # Thread-46 atsrt to run
    # Thread-53 atsrt to run
    # Thread-52 atsrt to run
    # Thread-51 atsrt to run
    # Thread-59 atsrt to run
    # Thread-58 atsrt to run
    # Thread-57 atsrt to run
    # Thread-56 atsrt to run
    # Thread-55 atsrt to run
    # Thread-54 atsrt to run
    # Thread-64 atsrt to run
    # Thread-63 atsrt to run
    # Thread-62 atsrt to run
    # Thread-61 atsrt to run
    # Thread-60 atsrt to run
    # Thread-66 atsrt to run
    # Thread-65 atsrt to run
    # Thread-70 atsrt to run
    # Thread-67 atsrt to run
    # Thread-69 atsrt to run
    # Thread-68 atsrt to run
    # Thread-71 atsrt to run
    # Thread-72 atsrt to run
    # Thread-75 atsrt to run
    # Thread-74 atsrt to run
    # Thread-73 atsrt to run
    # Thread-77 atsrt to run
    # Thread-76 atsrt to run
    # Thread-78 atsrt to run
    # Thread-79 atsrt to run
    # Thread-82 atsrt to run
    # Thread-80 atsrt to run
    # Thread-81 atsrt to run
    # Thread-86 atsrt to run
    # Thread-85 atsrt to run
    # Thread-84 atsrt to run
    # Thread-83 atsrt to run
    # Thread-91 atsrt to run
    # Thread-88 atsrt to run
    # Thread-90 atsrt to run
    # Thread-89 atsrt to run
    # Thread-87 atsrt to run
    # Thread-95 atsrt to run
    # Thread-96 atsrt to run
    # Thread-94 atsrt to run
    # Thread-93 atsrt to run
    # Thread-92 atsrt to run
    # Thread-100 atsrt to run
    # Thread-99 atsrt to run
    # Thread-98 atsrt to run
    # Thread-97 atsrt to run
    # 主 :53.08581185340881 n: 0  #耗时很久很久

    十七、死锁现象与递归锁

    所谓死锁:是指两个或两个以上的进程或线程中执行过程中,因争夺资源而造成的一种互相等待的现象

  • 相关阅读:
    Oracle-学习笔记(==》集合函数与分组四)
    Mysql--学习笔记(==》简单查询三)
    Mysql-学习笔记(==》插入修改数据二)
    Mysql-学习笔记(==》建表修改一)
    EasyUI的DataGrid 打印导出
    SQL 中ROLLUP 用法
    easyui commbox嵌入一个checkbox的实现
    Easyui Layout Center 全屏方法扩展
    Datagrid扩展方法InitEditGrid{支持单元格编辑}
    Datagrid扩展方法onClickCell{easyui-datagrid-扩充-支持单元格编辑}
  • 原文地址:https://www.cnblogs.com/hzhcdhm/p/7943252.html
Copyright © 2011-2022 走看看