zoukankan      html  css  js  c++  java
  • Python中并发、多线程等

    1、基本概念

    并发和并行的区别:

    1)并行,parallel

    同时做某些事,可以互不干扰的同一时刻做几件事。(解决并发的一种方法)

    高速公路多个车道,车辆都在跑。同一时刻。

    2)并发 concurrency

    同时做某些事,一个时段内有事情要处理。(遇到的问题)

    高并发,同一时刻内,有很多事情要处理。

    2、并发的解决

    1)队列、缓冲区

    排队就是把人排成队列,先进先出,解决了资源使用的问题。

    排成的队列,其实就是一个缓冲地带,就是缓冲区。

    Queue模块的类queue、lifoqueue、priorityqueue。

    2)争抢的

    会有一个人占据窗口,其他人会继续争抢,可以锁定窗口,窗口不在为其他人服务,这就是锁机制。(锁的概念,排他性锁,非排他性锁)。

    3)预处理

    一种提前加载用户需要的数据的思路,预处理思想,缓存常用。

    4)并行

    日常可以通过购买更多的服务器,或者开多线程,实现并行处理,来解决并发问题。

    水平扩展思想。

    如果在但CPU上处理,就不是并行了。

    但是多数服务都是多CPU的,服务的部署就是多机、分布式的,都是并行处理。

    (串行比并行快)

    5)提速

    提高单个CPU性能,或单个服务器安装更多的CPU

    这就是一种垂直扩展思想。

    6)消息中间件

    例如地跌站外的九曲回肠的走廊,缓冲人流。

    常见的消息中间件有RabbitMQ,ActiveMQ(Apache)、RocketMQ(Apache)。

    3、进程和线程

    在实现了线程的操作系统中,线程是操作系统能够进行运算调度的最小单位。他包含在进程中,是进程中的实际运作单位。一个程序执行实例就是一个进程。

    进程(process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。

    (可执行,可运行的加载到内存中。程序是有一定格式的,Python解释器加载,所有进程都是有入口的。偏移多少位。主线程达不到要求,就会启用多线程。

    多核。调度到不同的CPU上面去,虚拟的计算单元。)

    资源争抢问题:锁,排他性锁。队列,不争抢的人排队。预加载,减少数据处理速度,提前加载到内存中。一变多。

    进程和程序的关系

    程序是源代码编译后的文件,而这些文件存放在磁盘上。当程序被操作系统加载到内存

    中,就是进程,进程中存放着指令和数据(资源),也是线程的容器。

    Linux进程有父进程、子进程,Windows的进程是平等关系。

    线程,有时被称为轻量级进程,是程序执行流的最小单元,一个标准的线程由线程ID,当前指令指针(pc寄存器集合堆栈组成。每个线程有自己独立的栈。

    在许多系统中,创建一个线程比创建一个进程快10-100倍。

    进程、线程的理解

    现代操作系统提出的进程的概念,每一个进程都认为自己是独占所有的计算机硬件资源。

    进程就是独立的王国,进程间不可以随便的共享数据。

    线程就是省份,同一个进程内的线程可以共享进程的资源,每一个线程拥有自己独立的堆栈。

    4、线程状态

    状态

    含义

    就绪(ready)

    线程能够运行,但在等待被调度,可能线程刚刚创建启动,或刚刚从阻塞恢复,或者被其他线程抢占。

    运行(running)

    线程正在运行

    阻塞(Blocked)

    线程等待外部事件发生而无法运行,如I/O操作。

    终止(Terminated)

    线程完成,或退出,或被取消。

    5、Python中的线程和进程

    进程会启动一个解释器进程,线程会共享一个解释器进程。

    1)Python的线程开发

    Python的线程开发使用标准库threading

    2)Thread类

    签名:

    def __init__(self, group: None = ...,
                 target: Optional[Callable[..., None]] = ...,
                 name: Optional[str] = ...,
                 args: Iterable = ...,
                 kwargs: Mapping[str, Any] = ...,
                 *, daemon: Optional[bool] = ...) -> None: ...

    参数名

    含义

    target

    线程调用对象,就是目标函数

    name

    为线程起名字

    args

    为目标函数传递实参,元组

    Kwargs

    为目标函数关键词传参,字典

    3)线程启动

    import threading
    import time

    def worker():
        print('before')
        time.sleep(3)
        print('finished')

    t = threading.Thread(target=worker)  #线程对象
    t.start()   #启动

    通过threading.Thread创建一个线程对象,target是目标函数,name可以指定名称。

    需要调用start方法启动函数。

    线程之所以执行函数,是因为线程中就是用来执行代码的,所以还是函数调用。

    函数执行完毕后,线程也就退出了。

    如果想让一个线程一直工作,不让线程退出就要利用到while循环。

    import threading
    import time

    def worker():
        count = 0
        while True:
            count += 1
            print('before')
            time.sleep(3)
            if count >5:
                print('finished')
                break

    t = threading.Thread(target=worker)  #线程对象
    t.start()   #启动

    4)线程退出

    Python中没有提供终止线程的方法。线程在下面情况下退出。

    (1)线程函数内语句执行完毕

    (2)线程函数中抛出未处理的异常。

    import threading
    import time

    def worker():
        count = 0
        while True:
            if count >5:
                break
                #return
                #raise RuntimeError(count)
            time.sleep(3)
            print('before')
            count += 1
            print('finished')

    t = threading.Thread(target=worker)  #线程对象
    t.start()   #启动
    print('end')

    线程没有优先级,没有线程组的概念。也不能被销毁、停止、挂起,那么就是没有恢复和中断了。

    5)线程的传参

    import threading
    import time

    def add(x,y):
        print('{}+{}={}'.format(x,y,x+y))


    t1 = threading.Thread(target=add,name='1',args=(4,5))
    t1.start()
    time.sleep(2)

    t2 = threading.Thread(target=add,name = '2',args=(4,),kwargs={'y':6})
    t2.start()
    time.sleep(2)
    t3 = threading.Thread(target=add,name='3',kwargs={'x':4,'y':7})
    t3.start()

    线程中的传参,和函数传参没有什么区别,本质上就是函数传承。

    6)threading的属性和方法

    名称

    含义

    current_thread()

    返回当前主线程

    main_thread()

    返回主线程对象

    active_count()

    当前处于alive状态的线程个数

    enumerate()

    返回所有活着的线程的列表,不包括已经终止的线程和未开始的线程

    git_ident()

    返回当前线程的ID,非0整数。

    active_count、enumerate方法返回的值还包括主线程。


    import threading
    import time


    def showinfo():
        print('currentthread = {}'.format(threading.current_thread()))
        print('main thread = {}'.format(threading.main_thread()))
        print('active count = {}'.format(threading.active_count()))

    def worker():
        count = 0
        showinfo()
        while True:
            if count>5:
                break
            time.sleep(5)
            count += 1
            print('finsh')

    t = threading.Thread(target=worker,name='work')
    showinfo()
    t.start()

    print('end')

    currentthread = <_MainThread(MainThread, started 4048)>

    main thread = <_MainThread(MainThread, started 4048)>

    active count = 1

    currentthread = <Thread(work, started 9084)>

    end

    main thread = <_MainThread(MainThread, stopped 4048)>

    active count = 2

    finsh

    finsh

    finsh

    finsh

    finsh

    Finsh

    名称

    含义

    Name

    他只是一个名字,只是一个标识符,名字可以重名,getname()获取,setname()设置这个名词

    Ident

    线程id,是非0的整数,线程启动后才会有ID,否则为None,线程退出,此id依旧可以访问,此id可以重复访问。

    Is_alive()

    返回线程是否或者

    线程的name只是一个名称,可以重复;id必须唯一,但可以在线程退出后在利用。

    import threading

    import time

    def worker():

        count = 0

        while True:

            if count > 5:

                break

            time.sleep(2)

            count += 1

            print(threading.current_thread().name)

    t = threading.Thread(name='work',target=worker)

    print(t.ident)

    t.start()

    while True:

        time.sleep(1)

        if t.is_alive():

            print('{}{}alive'.format(t.name,t.ident))

        else:

            print('{}{}dead'.format(t.name,t.ident))

    名称

    含义

    Start()

    启动线程,每一个线程必须且只能执行该方法一次

    Run()

    运行线程函数

    Start()启动线程,只能执行一次。操作系统。开辟新的线程。

    Run()直接做的是主线程。函数调用。

    (1)start()
    import threading
    import time

    def worker():
        count = 0
        while True:
            if count > 5:
                break
            time.sleep(3)
            count += 1
            print('running')
    class Mythread(threading.Thread):
        def start(self):
            print('start----')
            super().start()

        def run(self):
            print('run----')
            super().run()

    t = Mythread(target=worker,name='work')

    t.start()

    start方法运行结果是start----

    run----

    Running

    按照线程进行执行。

    (2)run()
    import threading
    import time


    def worker():
        count = 0
        while True:
            if count>3:
                break
            time.sleep(2)
            count += 1
            print('runing')
    class Mythread(threading.Thread):
        def start(self):
            print('start----')
            super().start()

        def run(self):
            print('run----')
            super().run()

    t = Mythread(target=worker,name='work1')
    t.run()

    # run----
    # runing

    总结:run()执行结果就是直接是函数,调用,调用run函数。

    Start()方法会调用run()方法,而run()方法可以运行函数。

    (3)start和run的区别

    Start方法启动线程,启动了一个新的线程,名字叫做worker运行,但是run方法,并没有启动新的线程,只是在主线程内调用了一个普通的函数。

    7)多线程

    多线程,一个进程中如果有多个线程,就是多线程,是先一种并发。

    import threading
    import time


    def worker():
        count = 0
        while True:
            if count>3:
                break
            time.sleep(2)
            count += 1
            print('runing')
            print(threading.current_thread().name,threading.current_thread().ident)
    class Mythread(threading.Thread):
        def start(self):
            print('start----')
            super().start()

        def run(self):
            print('run----')
            super().run()

    t1 = Mythread(target=worker,name='work1')
    t2 = Mythread(target=worker,name='work2')

    # t1.run()
    # t2.run()
    ####runing
    # MainThread 1380
    # runing
    # MainThread 1380
    # runing
    # MainThread 1380
    t1.start()
    t2.start()
    # start----
    # run----
    # start----
    # run----
    # runing
    # work2 5048
    # runing
    # work1 9048

    Start()方法work1和work2交替执行。启动线程后,进程内多个活动的线程并行工作,就是多线程。

    Run()方法中没有开启新的线程,就是普通函数调用,所以执行完t1.run()

    ,然后执行t2.run(),run()方法就不是多线程。

    一个进程中至少有一个线程,并作为程序的入口,这个线程就是主线程,一个线程必须有一个主线程。

    其他线程成为工作线程。

    8)线程安全

    import threading

    def worker():
        for x in range(100):
            print('{}is running'.format(threading.current_thread().name))


    for x in range(1,4):
        name = 'worker{}'.format(x)
        t = threading.Thread(name=name,target=worker)
        t.start()

    利用ipython执行的结果是不是一行行的打印,而是很多字符串打印在了一起。

    这样说明了print函数被打断了,被线程切换打断了,print函数分为两步,第一步是打印字符串,第二部是换行,就在这个期间,发生了线程的切换,说明了print函数是线程不安全的。

    线程安全:线程执行一段代码,不会产生不确定的结果,那么这段代码是线程安全的。

    也是要用锁,进程的锁是管进程内的线程。独占资源。

    解决上面打印的问题:

    (1)不让print打印换行

    import threading

    def worker():

        for x in range(100):

            print('{} is running. '.format(threading.current_thread().name),end='')

    for x in range(1,5):

        name = 'worker{}'.format(x)

        t = threading.Thread(name=name,target=worker)

        t.start()

    利用字符串是不可变类型,可以作为一个整体不可分割输出,end=’’就不在print输出换行了。

    (2)使用logging

    标准库里面的logging模块,是日志处理模块,线程安全的,生产环境代码都使用logging。

    import threading
    import logging


    def worker():
        for x in range(100):
            # print('{} is running. '.format(threading.current_thread().name),end='')
            logging.warning('{}is running'.format(threading.current_thread().name))
    for x in range(1,5):
        name = 'worker{}'.format(x)
        t = threading.Thread(name=name,target=worker)
        t.start()

    9)daemon线程和non-daemon线程

    daemon不是Linux里面的守护进程。

    进程靠线程执行代码,至少有一个主线程,其他线程是工作线程。

    主线程是第一个启动的线程。

    父线程:如果A中启动了一个线程B,那么A就是B的父线程。

    子线程:B就是A的子线程。

    源码Thread的__init__ 方法中。

    If deamon is not None:

    Self._daemonic = daemon

    else:

    Self._daemonic = current_thread().daemon

    Self._ident = None

    线程daemon属性,如果设定就是用户的设置,否则,就取当前线程的daemon的值。

    主线程是non-daemon线程,即daemon = False。

    import time
    import threading


    def foo():
        time.sleep(5)
        for i in range(20):
            print(i)

    t = threading.Thread(target=foo,daemon=False)
    t.start()
    print('end')

    daemon设置False值,主线程执行完毕后,等待工作线程。

    import time
    import threading


    def foo():
        time.sleep(5)
        for i in range(20):
            print(i)

    t = threading.Thread(target=foo,daemon=True)
    t.start()
    print('end')

    Daemon值改为true,主线程执行完毕后直接退出。

    名称

    含义

    Daemon

    表示线程是否是daemon,这个值必须在start()之前设置,否则引发RuntimeError异常

    IsDaemon()

    是否是daemon线程

    SetDaemon

    设置daemon线程,必须在start方法之前设置。

    总结:线程具有一个daemon属性,可以显示设置为True或者False,也可以不设置,则取默认值None。

    如果不设置daemon,就取当前线程的daemon来设置他。

    主线程是non-daemon线程,即daemon = False。

    从主线程创建的所有线程的不设置daemon属性,则默认daemon = False,也就是non-daemon线程。

    程序在没有活着的non-daemon线程运行时推出,也是就剩下的只是daemon线程,主线程才能推出。否则主线程只能等待。

    构造线程的时候,可以设置daemon属性,这个属性必须在start方法前设置好。

    daemon=True主线程不等。工作线程

    daemon=False主线程等。只要有一个non-daemon就会等待。

    控制一个属性的。

    在start之前。

    只是有一个non-daemon就会等待,没有的话直接不等,直接结束线程。

    总结:

    线程具有daemon属性,可以设置为True或者False。

    (激活的non-daemon,主线程才会等待工作线程。)

    import time
    import threading


    def bar():
        time.sleep(10)
        print('bar')

    def foo():
        for i in range(20):
            print(i)
        t = threading.Thread(target=bar,daemon=False)
        t.start()
    t = threading.Thread(target=foo,daemon=True)
    t.start()

    print('end')

    这样不会执行bar的,因为主线程的daemon设置的值为True,改为False就好了。

    活着让主线程sleep几秒。

    import time
    import threading


    def bar():
        time.sleep(10)
        print('bar')

    def foo(n):
        for i in range(n):
            print(i)
    t1 = threading.Thread(target=foo,args=(10,),daemon=True)
    t1.start()
    t2 = threading.Thread(target=foo,args=(20,),daemon=False)
    t2.start()

    time.sleep(6)
    print('end')

    如果non-daemon线程的时候,主线程退出,也不会结束所有的daemon线程,直到所有的non-daemon线程全部结束,如果还有daemon线程,主线程需要退出,会结束所有的daemon线程,退出。

    主线程是non-daemon。其他线程靠传参。

    决定的是是否需要等待。如果有激活的non-daemon,就需要等待,没有激活的,主线程直接退出。

    10)join方法

    import time
    import threading
    def foo(n):
        for i in range(n):
            print(i)
            time.sleep(1)

    t1 = threading.Thread(target=foo,args=(10,),daemon=True)
    t1.start()
    t1.join()

     利用join,主线程被迫等待他。把当前线程阻塞住了,x.join就等待谁。保证代码的执行顺序。

    使用了join方法后,daemon线程执行完了,主线程才退出了。

    Join(timeout= None),是线程的标准方法之一。

    一个线程中调用另一个线程的join方法,调用者将被阻塞,直到被调用线程终止。

    一个线程可以被join多次。

    Timeout参数指定调用者等待多久,没有设置超时的,就会一直等待到调用的线程结束。

    调用谁的join方法,就是join谁,就要等睡。

    11)daemon线程应用场景

    简单来说,本来并没有daemon Thread,这个概念唯一的作用是,当把一个线程设置为daemon,他会随着主线程的退出而退出。

    主要应用场景为:

    (1)后台任务。发送心跳包,监控。

    (2)主线程工作才有用的线程。如主线程中维护着公共的资源,主线程已经清理了,准备退出,而工作线程使用这些资源工作没有意义了,一起退出最合适。

    (3)随时可以被终止的线程。

    如果主线程退出,想所有其他工作线程一起退出,就使用daemon=True来创建工作线程。

    import time
    import threading


    def bar():
        while True:
            time.sleep(1)
            print('bar')

    def foo():
        print('t1 daemon = {}'.format(threading.current_thread().isDaemon()))
        t2 = threading.Thread(target=bar)
        t2.start()
       
        print('t2 daemon = {}'.format(t2.isDaemon()))

    t1 = threading.Thread(target=foo,daemon=True)
    t1.start()


    time.sleep(3)
    print('Main end')

    改造成一直执行的:

    import time
    import threading


    def bar():
        while True:
            time.sleep(1)
            print('bar')

    def foo():
        print('t1 daemon = {}'.format(threading.current_thread().isDaemon()))
        t2 = threading.Thread(target=bar)
        t2.start()
        t2.join()
        print('t2 daemon = {}'.format(t2.isDaemon()))

    t1 = threading.Thread(target=foo,daemon=True)
    t1.start()
    t1.join()

    time.sleep(3)
    print('Main end')

    Daemon线程,简化了手动关闭线程的工作。

    12)threading.local 类

    局部变量的实现:

    import threading
    import time

    def worker():
        x = 0
        for i in range(10):
            time.sleep(0.01)
            x += 1
            print(threading.current_thread(),x)


    for i in  range(10):
        threading.Thread(target=worker).start()

    利用全局变量实现:


    import threading
    import time

    globals_data = threading.local()

    def worker():
        globals_data.x = 0
        for i in range(10):
            time.sleep(0.01)
            globals_data.x += 1
            print(threading.current_thread(),globals_data.x)


    for i in  range(10):
        threading.Thread(target=worker).start()
    import threading


    X = 'abc'
    ctx = threading.local()
    ctx.x = 123

    print(ctx,type(ctx),ctx.x)

    def worker():
        print(X)
        print(ctx)
        print(ctx.x)   #打印的时候出错,表示x不能跨线程
        print('working')

    worker()
    print()
    threading.Thread(target=worker).start() #另一个线程启动

    threading.local类构建了一个大字典,其元素是每一线程实例地址为key和线程对象引用线程单独的字典的映射。

    通过threading.local实例就可在不同的线程中,安全的使用线程独有的数据,做到了线程间数据隔离,如同本地变量一样安全。

    Local和线程相关的大字典,每次利用的时候利用线程的小字典来顶替local实例的大字典。

    不利用的话,全局变量的话直接就是threading.local和本地线程相关的数据。

    13)定时器timer延迟执行

    Threading.Timer继承自thread,这个类用来另一多久执行一个函数。

    Class threading.Timer(interval,function,args=None,kwargs=None)

    Start方法执行以后,Timer对象会处于等待状态,等待了interval之后,开始执行function函数的。如果在执行函数之前的等待阶段,使用了cancel方法,就会跳过执行函数结果。

    本质上就是一个Thread,只是没有提供name,daemon。

    import threading
    import logging
    import time


    def worker():
        logging.info('in worker')
        time.sleep(2)


    t = threading.Timer(5,worker)
    t.start()  #启动
    print(threading.enumerate())
    t.cancel()   #取消
    time.sleep(1)
    print(threading.enumerate())

    [<_MainThread(MainThread, started 7512)>, <Timer(Thread-1, started 6644)>]

    [<_MainThread(MainThread, started 7512)>]

    import threading
    import logging
    import time


    def worker():
        logging.info('in worker')
        time.sleep(2)


    t = threading.Timer(5,worker)
    t.cancel()   #取消
    t.start()  #启动
    print(threading.enumerate())
    time.sleep(1)
    print(threading.enumerate())

    [<_MainThread(MainThread, started 7512)>]

    [<_MainThread(MainThread, started 7512)>]

    二、线程同步

    1、概念

    线程同步,线程间协同,通过某种技术,让一个线程访问某些数据时候,其他线程不能访问这些数据,直到该线程完成对数据的操作。

    不同操作系统实现技术有所不同,有临界区、互斥量、信号量、事件Event。

    2、Event

    Event事件,是线程间通信机制中最简单的实现,使用一个内部的标记flag,通过flag的True或False的变化来进行操作。

    名称

    含义

    set()

    标记为True

    clear()

    标记为False

    is_set()

    标记是否为True

    Wait(timeout=None)

    设置等待标记为True的时长,None为无限等待,等到返回True,未等到超时了返回False。

    课堂练习:老板雇佣了一个工人,让他生产杯子,老板一直等着这个工人,直到上产了十个杯子。

    1)利用join

    import threading
    import time
    import logging


    def worker(count=10):
        cups = []
        while len(cups)<count:
            logging.info('wprking')
            time.sleep(0.01)
            cups.append(1)
            print(len(cups))
        logging.info('I am finished')
    w = threading.Thread(target=worker)
    w.start()
    w.join()

    2)利用event

    import threading
    import logging
    import time


    def boss(event:threading.Event):
        logging.info('I am boss,waiting')
        event.wait()
        logging.info('good job')

    def worker(event:threading.Event,count=10):
        logging.info('I am working for u')
        cups = []
        while True:
            logging.info('makeing')
            time.sleep(1)
            cups.append(1)
            if len(cups) >= count:
                print(len(cups))
                event.set()
                break
        logging.info('finished my job.cups={}'.format(cups))

    event = threading.Event()
    w = threading.Thread(target=worker,args=(event,))
    b = threading.Thread(target=boss,args=(event,))
    w.start()
    b.start()

    3)wait的应用

    import threading
    import logging
    logging.basicConfig(level=logging.INFO)

    def do(event:threading.Event,interval:int):
        while not event.wait(interval):  #没有置set,所以是False。   不是False的时候就不能进入循环了。
            logging.info('do sth')    #没三秒打印一次。   not False执行此语句

    e = threading.Event()
    threading.Thread(target=do,args=(e,10)).start()

    e.wait(12)  #整体停留了十秒。
    e.set()    #重置为True。
    print('end')

    4)练习,实现timer。

    总结:

    使用同一个Event用来做标记。

    Event的wait优于time.sleep,更快的切换到其他线程,提高并发效率。

    import threading
    import time


    class MyTimer:
        def __init__(self,interval,function,args,kwargs):
            self.interval = interval
            self.target = function
            self.args = args
            self.kwargs = kwargs
            self.event = threading.Event()
            self.thread = threading.Thread(target=self.target,args=self.args,kwargs=self.kwargs)

        def start(self):
            self.event.wait(self.interval)
            if not self.event.is_set():   #如果没有置False,那么就是False,not False为True,执行run语句。
                self.run()
       
        def run(self):
            self.start()
           
            self.event.set()

        def cancel(self):
            self.event.set()

    Lock锁

    1)锁,凡是存在共享资源争抢的地方都可以使用锁。从而保证只有一个使用者可以完全使用这个资源。

    lock.acquire  上锁    lock.release  解锁

    import threading
    import logging
    import time
    FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
    logging.basicConfig(format=FORMAT,level=logging.INFO)


    cups = []
    def worker(count=10):
        logging.info('i am work')
        while len(cups) < count:
            time.sleep(0.1)
            cups.append(1)
        logging.info('i am finsh.cups={}'.format(len(cups)))


    for _ in range(10):
        threading.Thread(target=worker,args=(1000,)).start()

    2018-05-26 15:38:25,913 Thread-1 32 i am work

    2018-05-26 15:38:25,913 Thread-2 4332 i am work

    2018-05-26 15:38:25,913 Thread-3 9992 i am work

    2018-05-26 15:38:25,914 Thread-4 8464 i am work

    2018-05-26 15:38:25,914 Thread-5 9968 i am work

    2018-05-26 15:38:25,915 Thread-6 8712 i am work

    2018-05-26 15:38:25,915 Thread-7 4412 i am work

    2018-05-26 15:38:25,915 Thread-8 8456 i am work

    2018-05-26 15:38:25,915 Thread-9 8316 i am work

    2018-05-26 15:38:25,915 Thread-10 9772 i am work

    2018-05-26 15:38:35,925 Thread-8 8456 i am finsh.cups=1000

    2018-05-26 15:38:36,023 Thread-7 4412 i am finsh.cups=1001

    2018-05-26 15:38:36,023 Thread-1 32 i am finsh.cups=1002

    2018-05-26 15:38:36,023 Thread-6 8712 i am finsh.cups=1003

    2018-05-26 15:38:36,024 Thread-5 9968 i am finsh.cups=1004

    2018-05-26 15:38:36,024 Thread-4 8464 i am finsh.cups=1005

    2018-05-26 15:38:36,024 Thread-10 9772 i am finsh.cups=1006

    2018-05-26 15:38:36,024 Thread-2 4332 i am finsh.cups=1007

    2018-05-26 15:38:36,025 Thread-3 9992 i am finsh.cups=1008

    2018-05-26 15:38:36,025 Thread-9 8316 i am finsh.cups=1009

    运行结果来看,多线程调度,导致了判断失误,多生产了杯子只有用到了锁。

    Lock,锁,一旦线程获得锁,其他要获得锁的线程将被阻塞。

    名称

    含义

    acquire(blocking=True,timeout=-1)

    默认阻塞,阻塞可以设置超时时间,非阻塞时,timeout禁止设置,成果获取锁,返回True,否则返回None

    Release

    释放锁,可以从任何线程调用释放,

    已上锁的锁,会被重置到unlocked未上锁的锁上调用,抛出RuntimeError异常。

    import threading
    import logging
    import time
    FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
    logging.basicConfig(format=FORMAT,level=logging.INFO)


    cups = []
    lock = threading.Lock()
    def worker(count=10):
        logging.info('i am work')
        lock.acquire()
        while len(cups) < count:
            print(threading.current_thread(),len(cups))
            time.sleep(0.000001)
            cups.append(1)
        logging.info('i am finsh.cups={}'.format(len(cups)))
        lock.release()

    for _ in range(10):
        threading.Thread(target=worker,args=(1000,)).start()

    上锁位置不对,由一个线程抢占,并独自占锁并完成任务。

    import threading
    import logging
    import time
    FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
    logging.basicConfig(format=FORMAT,level=logging.INFO)


    cups = []
    lock = threading.Lock()
    def worker(count=10):
        logging.info('i am work')
        flag= False
        while True:
            lock.acquire() #获取锁

            if len(cups) >= count:
                flag = True
            # print(threading.current_thread(),len(cups))
            time.sleep(0.000001)
            if not flag:
                cups.append(1)
                print(threading.current_thread(),len(cups))
            lock.release()   #追加后释放锁
            if flag:
                break
        logging.info('i am finsh.cups={}'.format(len(cups)))


    for _ in range(10):
        threading.Thread(target=worker,args=(1000,)).start()

    锁保证了数据完整性,但是性能下降好多。

    If flag:break是为了保证release方法被执行,否则就出现了死锁,得到锁的永远没有释放。

    计数器类,可以加可以减。

    2)加锁、解锁

     一般加锁就需要解锁,但是加锁后解锁前,还要有一些代码执行,就有可能抛出异常,一旦出现异常锁是无法释放的,但是当前线程可能因为这个就异常终止了,这就产生了死锁。

    加锁。解锁常用语句:

    (1)使用try...finally语句保证锁的释放。

    (2)With上下文管理,锁对象支持上下文管理。

    import threading
    import time


    class Counter:
        def __init__(self):
            self._val = 0
            self.__lock = threading.Lock()

        @property
        def value(self):
            return self._val

        def inc(self):
            try:
                self.__lock.acquire()
                self._val += 1
            finally:
                self.__lock.release()

        def dec(self):
            with self.__lock:
                self._val -= 1

    def run(c:Counter,count=1000):
        for _ in range(10):
            for i in range(-50,50):
                if i<0:
                    c.dec()
                else:
                    c.inc()

    c = Counter()
    c1 = 10
    c2 = 10
    for i in range(c1):
        threading.Thread(target=run,args=(c,c2)).start()

    while True:
        time.sleep(1)
        if threading.active_count() == 1:
            print(threading.enumerate())
            print(c.value)
            break
        else:
            print(threading.enumerate())

    不影响其他线程的切换,但是上锁后其他线程被阻塞了。只能等待。

    3)锁的应用场景

    适用于访问和修改同一个共享资源的时候,读写同一个资源的时候。

    全部是读取同一个共享资源需要锁吗?

    因为共享资源是不可变的,每一次读取都是一样的值,所以不用加锁。

    使用锁的注意事项:

    少用锁必要时用锁,使用了锁,多线程访问被锁的资源时候,就成了串行,要么排队执行,要么争抢执行。

    加锁时间越短越好,不需要拍就立即释放锁。

    一定要避免死锁。(死锁,打不开,解不开,A有锁,B也锁,占有这把锁的人迟迟不释放锁。没有使用上下文,持有锁的的线程异常退出了)

    不使用锁,有了效率,但是结果是错的。

    使用了锁,效率低下,但是结果是对的。

    4)非阻塞锁使用

    import threading
    import logging
    import time

    FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
    logging.basicConfig(format=FORMAT,level=logging.INFO)

    def worker(tasks):
        for task in tasks:
            time.sleep(0.01)
            if task.lock.acquire(False):
                logging.info('{}{}begin to start'.format(threading.current_thread(),task.name))
            else:
                logging.info('{}{}is working'.format(threading.current_thread(),task.name))

    class Task:
        def __init__(self,name):
            self.name = name
            self.lock = threading.Lock()


    tasks = [Task('task-{}'.format(x))for x in range(10)]

    for i in range(5):
        threading.Thread(target=worker,name='worker-{}'.format(i),args=(tasks,)).start()

    5)可重入锁RLock:

    是线程相关的锁

    线程A可重复锁,并可以多次成功获取,不会阻塞 ,最后要在线程A中做和acquire次数相同的release。

    拿到这把锁的线程可以多次使用。

    别的线程拿到的话也是被阻塞的。

    一个线程占用锁的时候,其他线程不能拿到,只能的是阻塞。直到当前线程次有的锁全部释放完,其他线程才可以获取。

    可重入锁,与线程相关,可在一个线程中获取锁,并可继续在同一线程中不阻塞获取锁,当锁未释放完,其他线程获取锁就会阻塞。直到当前持有锁的线程释放完了锁。

    四、Condition

    构造方法:condition(lock=None),可以传入一个lock对象或Rlock对象,默认是Rlock。

    名称

    含义

    Acquire(*args)

    获取锁

    Wait(self,timeout=None)

    等待超时

    Notify(n=1)

    唤醒之多指定书目个数的等待的线程,没有等待的线程就没有任何操作

    Notify_all()

    唤醒所有等待的线程。

    用于生产者、消费者模型,为了解决生产者消费者速度匹配的问题:

    import threading
    import logging
    import random

    FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
    logging.basicConfig(format=FORMAT,level=logging.INFO)

    class Dispatcher:
        def __init__(self):
            self.data = None
            self.event = threading.Event()

        def produce(self,total):
            for _ in range(total):
                data = random.randint(0,100)
                logging.info(data)
                self.data = data
                self.event.wait(1)
            self.event.set()

        def consume(self):
            while not self.event.is_set():
                data = self.data
                logging.info('recieved{}'.format(data))
                self.data = None
                self.event.wait(0.5)

    d = Dispatcher()
    p = threading.Thread(target=d.produce,args=(10,),name='producer')
    c = threading.Thread(target=d.consume,name='consume')
    c.start()
    p.start()

    消费者采用主动消费,消费者浪费了大量的时间,主动来查看有没有数据。换成通知的机制。

    import threading
    import logging
    import random

    FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
    logging.basicConfig(format=FORMAT,level=logging.INFO)

    class Dispatcher:
        def __init__(self):
            self.data = None
            self.event = threading.Event()
            self.cond = threading.Condition()

        def produce(self,total):
            for _ in range(total):
                data = random.randint(0,100)
                with self.cond:
                    logging.info(data)
                    self.data = data
                    self.cond.notify_all()
                self.event.wait(1)
            self.event.set()

        def consume(self):
            while not self.event.is_set():
                with self.cond:
                    self.cond.wait()
                    logging.info('recieved{}'.format(self.data))
                    self.data = None
                self.event.wait(0.5)

    d = Dispatcher()
    p = threading.Thread(target=d.produce,args=(10,),name='producer')
    c = threading.Thread(target=d.consume,name='consume')
    c.start()
    p.start()

    如果是一个生产者,多个消费者呢:

    import threading
    import logging
    import random

    FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
    logging.basicConfig(format=FORMAT,level=logging.INFO)

    class Dispatcher:
        def __init__(self):
            self.data = None
            self.event = threading.Event()
            self.cond = threading.Condition()

        def produce(self,total):
            for _ in range(total):
                data = random.randint(0,100)
                with self.cond:
                    logging.info(data)
                    self.data = data
                    self.cond.notify_all()
                self.event.wait(1)  #模拟生产速度
            self.event.set()

        def consume(self):
            while not self.event.is_set():
                with self.cond:
                    self.cond.wait()  #阻塞等通知
                    logging.info('recieved{}'.format(self.data))
                self.event.wait(0.5)  #模拟消费 的速度

    d = Dispatcher()
    p = threading.Thread(target=d.produce,args=(10,),name='producer')


    for i in range(5):
        c = threading.Thread(target=d.consume, name='consume{}'.format(i))
        c.start()
    p.start()

    Self.cond.notify_all()发通知:

    修改为self.cond.notify(n=2)  随机通知两个消费者。

    Condition总结:

    用于生产者消费者模型中,解决生产者,消费者速度匹配的问题。

    采用了通知机制,非常有效率。

    使用方式:

    使用condition,必须先acquire,用完了要release。因为内部实现了锁,默认使用了RLock锁。最好的方式就是使用上下文。

    消费者wait,等待通知。

    生产者生产好消息,对消费者发出通知,可以使用notify或者notify_all方法。

    操作系统中基本单位是进程,进程是独立的王国,操作系统中不可调用线程,线程是轻量级进程,有独立自己栈。资源就是独立的栈。

    加载到内存中是进程管理,子系统之一。变成为一个实例,进程ID号。

    驱动管理,

    协议,

    Tcp udp,

    http协议。

    Linux:

    Unix:b语言基础上c语言写的。

    Windows:

    数据在哪里,计算就在哪里。

    五、Barrier

    1、栅栏,屏障、为路障、道闸

    达到一定的条件,才会打开barrier。

    名称

    含义

    Barrier(parties,action=None,timeout=None)

    构建barrier对象,指定参与方数目,timeout是wait方法未指定超时的默认值。

    n_waiting

    当前在屏障中等待的线程数

    Parties

    各方数,就是需要多少个等待

    Wait(timeout=None)

    等待通过屏障,返回0到线程数-1的整数,每个线程返回不同,如果wait方法设置了超时,并超时发送,屏障将处于broken状态。

    import threading
    import logging


    FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
    logging.basicConfig(format=FORMAT,level=logging.INFO)


    def worker(barrier:threading.Barrier):
        logging.info('waiting for {}threads'.format(barrier.n_waiting))
        try:
            barrier_id = barrier.wait()
            logging.info('after barrier{}'.format(barrier_id))
        except threading.BrokenBarrierError:
            logging.info('Broken Barrier')

    barrier = threading.Barrier(3)

    for x in range(3):
        threading.Thread(target=worker,name='worker-{}'.format(x),args=(barrier,)).start()

    logging.info('started')

    2018-06-11 21:29:45,173 worker-0 8804 waiting for 0threads

    2018-06-11 21:29:45,198 worker-1 2668 waiting for 1threads

    2018-06-11 21:29:45,199 worker-2 2716 waiting for 2threads

    2018-06-11 21:29:45,199 MainThread 10160 started

    2018-06-11 21:29:45,199 worker-2 2716 after barrier2

    2018-06-11 21:29:45,199 worker-0 8804 after barrier0

    2018-06-11 21:29:45,199 worker-1 2668 after barrier1

    如果Barrier()的值设置为3,开启5个线程,前三个线程执行后,后面两个线程不够三个线程,所以一直在等待,直到凑到三个barrier才打开。

    上面的运行结果,所有线程冲到了barrier前等待,直到到达parties的数目,屏障才打开,所有线程停止等待,继续执行。

    再有线程wait,屏障就就绪等待到达参数方数目。

    例如就是赛马需要的马匹全部就位,开闸,下一批陆续来到继续等待比赛。。

    名称

    含义

    Broken

    如果屏障处于打破的状态  返回true。

    Abort()

    将屏障至于broken状态,等待中的线程或者调用等待方法的线程中都会抛出brokenbarriererror异常,直达reset方法来恢复屏障

    Reset()

    重置,恢复屏障,重新开始拦截

    import threading
    import logging


    FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
    logging.basicConfig(format=FORMAT,level=logging.INFO)


    def worker(barrier:threading.Barrier):
        logging.info('waiting for {}threads'.format(barrier.n_waiting))
        try:
            barrier_id = barrier.wait()
            logging.info('after barrier{}'.format(barrier_id))
        except threading.BrokenBarrierError:
            logging.info('Broken Barrier')

    barrier = threading.Barrier(3)

    for x in range(9):
        if x == 2:
            barrier.abort()
        elif x == 6:
            barrier.reset()
        threading.Event().wait(1)
        threading.Thread(target=worker,name='worker-{}'.format(x),args=(barrier,)).start()

    logging.info('started')

    2018-06-11 21:42:20,952 worker-0 8460 waiting for 0threads

    2018-06-11 21:42:21,953 worker-1 8484 waiting for 1threads

    2018-06-11 21:42:21,953 worker-1 8484 Broken Barrier

    2018-06-11 21:42:21,954 worker-0 8460 Broken Barrier

    2018-06-11 21:42:22,954 worker-2 1500 waiting for 0threads

    2018-06-11 21:42:22,955 worker-2 1500 Broken Barrier

    2018-06-11 21:42:23,956 worker-3 1200 waiting for 0threads

    2018-06-11 21:42:23,956 worker-3 1200 Broken Barrier

    2018-06-11 21:42:24,958 worker-4 6652 waiting for 0threads

    2018-06-11 21:42:24,958 worker-4 6652 Broken Barrier

    2018-06-11 21:42:25,959 worker-5 3212 waiting for 0threads

    2018-06-11 21:42:25,959 worker-5 3212 Broken Barrier

    2018-06-11 21:42:26,961 worker-6 6344 waiting for 0threads

    2018-06-11 21:42:27,962 worker-7 9732 waiting for 1threads

    2018-06-11 21:42:28,964 worker-8 6068 waiting for 2threads

    2018-06-11 21:42:28,964 worker-8 6068 after barrier2

    2018-06-11 21:42:28,965 worker-6 6344 after barrier0

    2018-06-11 21:42:28,965 MainThread 9768 started

    2018-06-11 21:42:28,965 worker-7 9732 after barrier1

    屏障等待了两个,屏障就被break,waiting的线程跑出了brokenbarriererror异常,新wait的线程也是抛出异常,直到屏障恢复,才继续按照parties数目要求继续拦截线程。

    非broken状态情况下才可以继续等待。

    2、wait方法超时实例

    import threading
    import logging


    FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
    logging.basicConfig(format=FORMAT,level=logging.INFO)


    def worker(barrier:threading.Barrier,i:int):
        logging.info('waiting for {}threads'.format(barrier.n_waiting))
        try:
            # barrier_id = barrier.wait()
            # logging.info('after barrier{}'.format(barrier_id))
            logging.info(barrier.broken)  #是否是broken
            if i < 3:
                barrier_id = barrier.wait(1)#超时后,屏障broken
            else:
                if i == 6:
                    barrier.reset() #恢复屏障
                barrier_id = barrier.wait()
            logging.info('after barrier{}'.format(barrier_id))
        except threading.BrokenBarrierError:
            logging.info('Broken Barrier.run.')

    barrier = threading.Barrier(3)

    for x in range(9):
        # if x == 2:
        #     barrier.abort()
        # elif x == 6:
        #     barrier.reset()
        threading.Event().wait(2)
        threading.Thread(target=worker,name='worker-{}'.format(x),args=(barrier,x)).start()

    logging.info('started')

    2018-06-11 21:53:47,966 worker-0 8656 waiting for 0threads

    2018-06-11 21:53:47,967 worker-0 8656 False

    2018-06-11 21:53:48,967 worker-0 8656 Broken Barrier.run.

    2018-06-11 21:53:49,968 worker-1 168 waiting for 0threads

    2018-06-11 21:53:49,968 worker-1 168 True

    2018-06-11 21:53:49,969 worker-1 168 Broken Barrier.run.

    2018-06-11 21:53:51,969 worker-2 6448 waiting for 0threads

    2018-06-11 21:53:51,970 worker-2 6448 True

    2018-06-11 21:53:51,970 worker-2 6448 Broken Barrier.run.

    2018-06-11 21:53:53,970 worker-3 6192 waiting for 0threads

    2018-06-11 21:53:53,970 worker-3 6192 True

    2018-06-11 21:53:53,971 worker-3 6192 Broken Barrier.run.

    2018-06-11 21:53:55,972 worker-4 6380 waiting for 0threads

    2018-06-11 21:53:55,972 worker-4 6380 True

    2018-06-11 21:53:55,973 worker-4 6380 Broken Barrier.run.

    2018-06-11 21:53:57,973 worker-5 3228 waiting for 0threads

    2018-06-11 21:53:57,973 worker-5 3228 True

    2018-06-11 21:53:57,974 worker-5 3228 Broken Barrier.run.

    2018-06-11 21:53:59,975 worker-6 3924 waiting for 0threads

    2018-06-11 21:53:59,975 worker-6 3924 True

    2018-06-11 21:54:01,975 worker-7 6636 waiting for 1threads

    2018-06-11 21:54:01,975 worker-7 6636 False

    2018-06-11 21:54:03,976 worker-8 9684 waiting for 2threads

    2018-06-11 21:54:03,976 worker-8 9684 False

    2018-06-11 21:54:03,977 worker-8 9684 after barrier2

    2018-06-11 21:54:03,977 worker-7 6636 after barrier1

    2018-06-11 21:54:03,977 MainThread 10036 started

    2018-06-11 21:54:03,978 worker-6 3924 after barrier0

    3、Barrier应用

    并发初始化:

    所有线程都必须初始化完成后,才能继续工作。运行前加载数据、检查,如果这些工作没完成,就开始运行,将不能正常工作。

    10个线程做10种准备工作,每一个线程负责一种工作,只有这10个线程都完成后,才能继续工作,先完成的要等待后完成的线程。

    启动一个程序,需要先加载磁盘文件、缓存预热、初始化连接池等工作,这些工作可以齐头并进,不过只有都满足了,程序才会继续向后执行,假设数据库连接失败,则初始化工作失败,就要abort,所有线程收到异常退出。

    4、semaphore信号量

    和lock很像,信号量对象内部维护一个倒计数器,每一次acquire都会减一,当acquire方法发现技术为0就阻塞请求的线程,直到其他线程对信号量release后,计数器大于0,恢复阻塞的线程。

    名称

    含义

    Semaphore(value=1)

    构造方法,value小于0,抛出valueerror异常

    Acquire(blocking=True,timeout=None)

    获取信号量,计数器减一,获取成功返回true

    Release()

    释放信号量,计数器加1

    计数器永远不会低于0,因为acquire的时候,发现是0,都会被阻塞。

    用with语法。

    Boundedsemaphore。

    import threading
    import logging
    import time


    FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
    logging.basicConfig(format=FORMAT,level=logging.INFO)


    def worker(s:threading.Semaphore):
        logging.info('in sub thread')
        logging.info(s.acquire())
        logging.info('sub thread over')

    s = threading.Semaphore(3)
    logging.info(s.acquire())
    print(s._value)
    logging.info(s.acquire())
    print(s._value)
    logging.info(s.acquire())
    print(s._value)

    threading.Thread(target=worker,args=(s,)).start()

    time.sleep(2)

    logging.info(s.acquire(False))
    logging.info(s.acquire(timeout=3))

    logging.info('relesed')
    s.release()

    2018-06-11 22:36:17,646 MainThread 9340 True

    2

    2018-06-11 22:36:17,647 MainThread 9340 True

    1

    2018-06-11 22:36:17,647 MainThread 9340 True

    0

    2018-06-11 22:36:17,647 Thread-1 8124 in sub thread

    2018-06-11 22:36:19,647 MainThread 9340 False

    2018-06-11 22:36:22,648 MainThread 9340 False

    2018-06-11 22:36:22,648 MainThread 9340 relesed

    2018-06-11 22:36:22,648 Thread-1 8124 True

    2018-06-11 22:36:22,649 Thread-1 8124 sub thread over

    5、连接池

    连接池,因为资源有限,且开启一个连接成本较高,所以利用连接池。

    一个简单的连接池,连接池应该有容量(总数),有一个工厂方法可以获取连接,能够把不用的连接返回。供其他调用者使用。

    class Conn:
        def __init__(self,name):
            self.name = name

    class Pool:
        def __init__(self,count:int):
            self.count = count
            #池中是连接对象的列表
            self.pool = [self._connect('conn-{}'.format(x))for x in range(self.count)]

        def _connect(self,conn_name):
            #创建连接的方法,返回一个名称
            return Conn(conn_name)

        def get_conn(self):
            #从池中拿走一个连接
            if len(self.pool) > 0:
                return self.pool.pop()

        def return_conn(self,conn:Conn):
            #向池中添加一个连接
            self.pool.append(conn)

    上面的例子只是一个简单的功能实现。Get_conn()方法在多线程的时候也会有线程安全问题。

    import threading
    import logging
    import random


    FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
    logging.basicConfig(format=FORMAT,level=logging.INFO)


    class Conn:
        def __init__(self,name):
            self.name = name

        def __repr__(self):
            return self.name

    class Pool:
        def __init__(self,count:int):
            self.count = count
            self.pool = [self._connect('conn-{}'.format(x))for x in range(count)]
            self.semaphore = threading.Semaphore(count)

        def _connect(self,conn_name):
            return Conn(conn_name)

        def get_conn(self):
            print('----')
            self.semaphore.acquire()
            print('======')
            conn = self.pool.pop()
            return conn

        def return_conn(self,conn:Conn):
            self.pool.append(conn)
            self.semaphore.release()

    pool = Pool(3)

    def worker(pool:Pool):
        conn = pool.get_conn()
        logging.info(conn)
        threading.Event().wait(random.randint(1,2))
        pool.return_conn(conn)

    for i in range(6):
        threading.Thread(target=worker,name='worker{}'.format(i),args=(pool,)).start()

    ----

    2018-06-12 20:13:21,119 worker0 8392 conn-2

    ======

    ----

    ======

    2018-06-12 20:13:21,120 worker1 8240 conn-1

    ----

    ======

    ----

    2018-06-12 20:13:21,120 worker2 3216 conn-0

    ----

    ----

    ======

    2018-06-12 20:13:22,120 worker3 5700 conn-0

    2018-06-12 20:13:23,119 worker4 7448 conn-2

    ======

    2018-06-12 20:13:23,120 worker5 2556 conn-1

    ======

    上例中,使用信号量解决资源有限的问题,如果池中有资源,请求者获取资源时候信号量减1,拿走资源,当请求超过资源数,请求者只能等待,当使用者用完归还资源后信号量加1,等待线程就可以被唤醒拿走资源。

    容器,预加载,懒加载。

    6、问题

    Self.conns.append(conn)是否需要加锁。

    1)逻辑分析处理

    还没有使用信号量,就release,。

    import logging
    import threading

    sema = threading.Semaphore(3)
    logging.warning(sema.__dict__)
    for i in range(3):
        sema.acquire()
    logging.warning('~~~~~~~')
    logging.warning(sema.__dict__)
    for i in range(4):
        sema.release()
    logging.warning(sema.__dict__)

    for i in range(3):
        sema.acquire()
    logging.warning('~~~~')
    logging.warning(sema.__dict__)
    sema.acquire()
    logging.warning('~~~~~')
    logging.warning(sema.__dict__)

    WARNING:root:{'_cond': <Condition(<unlocked _thread.lock object at 0x000000C13D6C0B98>, 0)>, '_value': 3}

    WARNING:root:~~~~~~~

    WARNING:root:{'_cond': <Condition(<unlocked _thread.lock object at 0x000000C13D6C0B98>, 0)>, '_value': 0}

    WARNING:root:{'_cond': <Condition(<unlocked _thread.lock object at 0x000000C13D6C0B98>, 0)>, '_value': 4}

    WARNING:root:~~~~

    WARNING:root:{'_cond': <Condition(<unlocked _thread.lock object at 0x000000C13D6C0B98>, 0)>, '_value': 1}

    WARNING:root:~~~~~

    WARNING:root:{'_cond': <Condition(<unlocked _thread.lock object at 0x000000C13D6C0B98>, 0)>, '_value': 0}

    计数器超过了4,超过了设置的最大值,需要解决问题:

    Boundedsemaphore类

    有界的信号量,不允许使用release超出初始值的范围,否则就会抛出valueerror异常。

    保证了多归还连接抛出异常。

    2)如果使用了信号量,还没用完。

    计数器还差一个就满了,有三个线程ABC都执行了第一句,都没有来得及release,这个时候轮到 A release,正常的release,然后轮到线程C先release,一定会出现问题,超届,会抛出异常,信号量可以保证,一定不能多归还。

    3)许多线程用完了信号量

    没有信号量的线程都被阻塞,没有线程和归还的线程争抢,当append后才release,这个时候才能等待的线程被唤醒,才能pop,没有获取信号量的不能pop,这样才是安全的。

    7、信号量和锁

    锁,只允许同一个时间一个线程独占资源,他是特殊的信号量,即信号量初始值为1.

    信号量,可以多个线程访问共享资源,但这个共享资源属相有限,锁,可以看做是特殊的信号量。

    Event lock sem 三个必须要会用。

    8、数据结构和gill

    Queue是线程安全的,里面用到了锁,还有condition,用的是lock。

    Queue是标准库模块,提供FIFO的queue,lifo的队列,有限队列。

    Queue是线程安全的,适用于线程间的安全交换数据,内部使用了lock和condition。

    原子操作,一堆操作中要么全部做完,要么全部做不完。

    Guarantee 保证。

    严格要注意的事项:

    9、gil全局解释器锁

    CPython在解释器进程中大锁,叫做gill全局解释器锁。大锁解决进程内的所有线程的问题,在CPU上只有一个线程被调度使用。

    同一个时间内同一个进程内只有一个线程在工作,在执行字节码,甚至在多核的CPU的情况下,也是如此。

    cPython中:

    IO密集型:由于线程阻塞,就会调度其他线程。

    CPU密集型:不访问网络,当前线程可能会连续的获得gill,导致其他线程几乎无法使用CPU。

    在cPython中由于gill存在,IO密集型,使用多线程较为合算,CPU密集型,使用多进程,要绕开gill。

    新版cPython正在努力优化gill的问题,但是不是移除的问题,

    Python中绝大多数内置数据结构的读、写操作都是原子操作。

    由于gill的存在,Python的内置数据类型在多线程编程的时候变成了安全的,但是实际上本身不是线程和安全类型。

    移除gill,会降低cPython单线程的执行效率。

    本身不安全,有全局解释器锁gill,都是由线程操作的,线程安全的。

  • 相关阅读:
    os
    linux常用命令
    css-基础知识
    awk命令详解
    文献综述
    微信JSAPI支付
    SNMP详解
    SNMP进阶
    SNMP协议入门
    SNMP简单网络管理协议
  • 原文地址:https://www.cnblogs.com/wangchunli-blogs/p/9949894.html
Copyright © 2011-2022 走看看