zoukankan      html  css  js  c++  java
  • python 修炼11 ----------线程进程

    知识点补充:

    #并发&并行
    
    并发:指的是系统具有处理多个任务的能力
    并行:系统同时处理多个任务能力
    
    并行是并发的一个子集
    
    #同步&异步
    同步:当一个进程执行到一个IO操作(等待外部数据)的时候------等:同步 (打电话 )
    
    异步: 当一个程序执行到一个IO操作,不等待, ()
    以为有gal 同一时刻只能有一个线运行

      进  程  

    定义:进程就是在1个程序在数据集上的一次动态执行过程。进程一般由程序、数据集、进程控制模块三部分组成。
    
      数据集:程序在执行过程中需要使用的资源,
    
      进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系
      统感知进程存在的唯一标志。

    问题1:什么进程?

    (有时称为重量级进程)简单来说:进程是一个程序执行的过程,比较抽象。

    程序知识存在硬盘上的可执行的2进制(或其它)文件。只有把程序加载到内存中并被系统调用,才拥有生命期。进程是一个执行中的程序。每个进程都有自己的地址空间、内存、数据栈以及其它的用于跟踪执行的辅助数据。

    问题2:为什么需要进程

    简单来说:进程是为了实现多任务并发,它是最小资源单位。

    <多进程模块 multiprocessing>

    由于GIL的存在,python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。
    
    multiprocessing包是Python中的多进程管理包。与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。该进程可以运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象可以像多线程那样,通过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。所以,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。

    一 进程的调用

    1.调用方式

    from multiprocessing import Process    #
    import time
    def foo(name):
        time.sleep(1)
        print('hello',name,time.ctime())
    
    if __name__ == '__main__':
        L=[]
        for i in range(3):
            p=Process(target=foo,args=('alex',))
            L.append(p)
            p.start()
        
        for i in L:
            p.join()
        print('end')
    执行结果
    from multiprocessing import Process
    import time
    
    class Myprocess(Process):
        def __init__(self):
            super(Myprocess,self).__init__()
            # self.name= name
        def run(self):
            time.sleep(1)
            print('hello',self.name,time.ctime())
    if __name__ == '__main__':
        L=[]
        for i in range(3):
            p=Myprocess()
            p.start()
        for p in L:
            p.join()
    执行结果
    end
    hello Myprocess-1 Mon Jan 16 23:32:09 2017
    hello Myprocess-2 Mon Jan 16 23:32:09 2017
    hello Myprocess-3 Mon Jan 16 23:32:09 2017

    获取id号
    from
    multiprocessing import Process import os,time def info(title): print('title',title) print('parent process',os.getppid()) #获取父级进程ID号 print('process id',os.getppid()) #或得当前进程id def f(name): info('function f') print('hello',name) if __name__ == '__main__': info('main process line') time.sleep(1) print('------') p=Process(target=info,args=('yuan',)) p.start() p.join()
    title main process line
    parent process 3704
    process id 3704
    ------
    title yuan
    parent process 848
    process id 848
    执行结果

    2. Process类 

    构造方法:

    Process([group [, target [, name [, args [, kwargs]]]]])

      group: 线程组,目前还没有实现,库引用中提示必须是None; 
      target: 要执行的方法; 
      name: 进程名; 
      args/kwargs: 要传入方法的参数。

    实例方法:

      is_alive():返回进程是否在运行。

      join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。

      start():进程准备就绪,等待CPU调度

      run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。

      terminate():不管任务是否完成,立即停止工作进程

    属性:

      daemon:和线程的setDeamon功能一样

      name:进程名字。

      

     pid:进程号。

    #linux 调试 可以成功
    import
    time from multiprocessing import Process def foo(i): time.sleep(1) print (p.is_alive(),i,p.pid) time.sleep(1) if __name__ == '__main__': p_list=[] for i in range(10): p = Process(target=foo, args=(i,)) #p.daemon=True p_list.append(p) for p in p_list: p.start() # for p in p_list: # p.join() print('main process end')
    import time
    from  multiprocessing import Process
    class Myprocess(Process):
        def __init__(self):
            super(Myprocess,self).__init__()
            self.num=enumerate
        def run(self):
            time.sleep(1)
            print(self.pid)
            print(self.is_alive())
            print(self.num)
            time.sleep(1)
    
    if __name__ == '__main__':
        p_list=[]
        p=Myprocess()
        # p.daemon=True
        p_list.append(p)
        for p in p_list:
            p.start()
        for p in p_list:
            p.join()
        print('main process end')
    6964
    True
    <class 'enumerate'>
    main process end
    运行结果

      进程间通讯  

     实现方案1: 进程对列Queue

    
    
    #1队列通信 queue
    from multiprocessing import Process ,Queue
    import queue

    def f(q,n):
    q.put(n*n+1)
    print('sub',id(q))

    if __name__ == '__main__':
    q=Queue() #进程P线程的区别q=queue.Queue()
    print('main process',id(q))
    for i in range(3):
    p=Process(target=f,args=(q,i))

    p.start()
    for i in range(3):
    print(q.get())
     
    main process 4535792360
    sub 683720072776
    1
    sub 1063852297912
    2
    sub 484540747560
    5
    运行结果

    实现通信方案2:管道

    #管道
    from multiprocessing import Process,Pipe
    def f(conn):
        conn.send(['Hi boy'])
        res_parent=conn.recv()
        print(res_parent)
        conn.close()
        print('id child',id(conn))
    
    if __name__ == '__main__':
        parent_conn,child_coon=Pipe()   #Pipe 双向管路
        print('id child',id(child_coon),'id parent',id(parent_conn))
        p=Process(target=f,args=(child_coon,))
        p.start()
        print(parent_conn.recv())
        parent_conn.send('hello oldboy')
        p.join()
    id child 156755866456 id parent 156755866512
    ['Hi boy']
    hello oldboy
    id child 37905770928
    执行结果

     实现方案3 Managers  实现资源共享

     Queue和pipe只是实现了数据交互,并没实现数据共享,即一个进程去更改另一个进程的数据。

    A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

    A manager returned by Manager() will support types listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValue and Array. For example:

    #资源共享
    from multiprocessing import Process,Manager
    def f(d,l,n):
        d[n]='1'
        l.append(n)
    if __name__ == '__main__':
        with Manager() as manager:
            d=manager.dict()
            l=manager.list()
            print('main process',id(d),id(l))
    
            p_list=[]
            for i in range(10):
                p=Process(target=f,args=(d,l,i))
                p.start()
                p_list.append(p)
            for res in p_list:
                res.join()
            print(id(p))
            print(l)
    main process 321547575024 321547591632
    321547916568
    [4, 0, 9, 2, 1, 5, 8, 6, 3, 7]
    运行结果

    进程同步

     # 利用锁实现同步
    from multiprocessing import Process,Lock
    def f(lock,num):
        with lock:  #=====lock.acquire()
            print('%s person hello world'%num)
        pass
    if __name__ == '__main__':
        lock=Lock()
        for num in range(10):
            q=Process(target=f,args=(lock,num))
            q.start()
    3 person hello world
    0 person hello world
    4 person hello world
    2 person hello world
    6 person hello world
    7 person hello world
    5 person hello world
    8 person hello world
    1 person hello world
    9 person hello world
    运行结果

    进程池

    进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

    apply    同步执行

    apply_async  异步执行

    from multiprocessing import Process,Pool
    import time,os
    
    def Foo(i):
        time.sleep(1)
        print(i)
        print('son',os.getpid())
        return "hello %s"%i
    def Bar(arg): #必须有个参数接收回调过来的值
        print(arg)
        print('Bar id',os.getppid())
    
    if __name__ == '__main__':
        pool=Pool(5)
        print('main pid',os.getppid())
        for i in range(100):
            # pool.apply(func=Foo,args=(i,))   #同步执行
            # pool.apply_async(func=Foo,args=(i,))
            #回调函数callback=:就是某个动作或函数之星成功后在执行的函数
            pool.apply_async(func=Foo,args=(i,),callback=Bar)
    
        pool.close()
        pool.join()   #join与close调用顺序是固定的必须放到close之后
        print('end')
    main pid 6992
    0
    son 412
    hello 0
    Bar id 6992
    1
    son 7304
    hello 1
    Bar id 6992
    2
    son 2852
    hello 2
    Bar id 6992
    4
    3
    son 9052
    son 4220
    hello 4
    Bar id 6992
    hello 3
    Bar id 6992
    5
    son 412
    hello 5
    Bar id 6992
    6
    son 7304
    hello 6
    Bar id 6992
    7
    son 2852
    hello 7
    Bar id 6992
    8
    son 9052
    9
    son 4220
    hello 8
    Bar id 6992
    hello 9
    Bar id 6992
    10
    son 412
    hello 10
    Bar id 6992
    11
    son 7304
    hello 11
    Bar id 6992
    12
    son 2852
    hello 12
    Bar id 6992
    13
    son 9052
    hello 13
    Bar id 6992
    14
    son 4220
    hello 14
    Bar id 6992
    15
    son 412
    hello 15
    Bar id 6992
    16
    son 7304
    hello 16
    Bar id 6992
    17
    son 2852
    hello 17
    Bar id 6992
    18
    son 9052
    hello 18
    Bar id 6992
    19
    son 4220
    hello 19
    Bar id 6992
    20
    son 412
    hello 20
    Bar id 6992
    21
    son 7304
    hello 21
    Bar id 6992
    22
    son 2852
    hello 22
    Bar id 6992
    23
    son 9052
    hello 23
    Bar id 6992
    24
    son 4220
    hello 24
    Bar id 6992
    25
    son 412
    hello 25
    Bar id 6992
    26
    son 7304
    hello 26
    Bar id 6992
    27
    son 2852
    hello 27
    Bar id 6992
    28
    son 9052
    hello 28
    Bar id 6992
    29
    son 4220
    hello 29
    Bar id 6992
    30
    son 412
    hello 30
    Bar id 6992
    31
    son 7304
    hello 31
    Bar id 6992
    32
    son 2852
    hello 32
    Bar id 6992
    33
    son 9052
    hello 33
    Bar id 6992
    34
    son 4220
    hello 34
    Bar id 6992
    35
    son 412
    hello 35
    Bar id 6992
    36
    son 7304
    hello 36
    Bar id 6992
    37
    son 2852
    hello 37
    Bar id 6992
    38
    son 9052
    hello 38
    Bar id 6992
    39
    son 4220
    hello 39
    Bar id 6992
    40
    son 412
    hello 40
    Bar id 6992
    41
    son 7304
    hello 41
    42
    son 2852
    Bar id 6992
    hello 42
    Bar id 6992
    43
    son 9052
    hello 43
    Bar id 6992
    44
    son 4220
    hello 44
    Bar id 6992
    45
    son 412
    hello 45
    Bar id 6992
    46
    son 7304
    hello 46
    47
    son 2852
    Bar id 6992
    hello 47
    Bar id 6992
    48
    son 9052
    hello 48
    Bar id 6992
    49
    son 4220
    hello 49
    Bar id 6992
    50
    son 412
    hello 50
    Bar id 6992
    51
    son 7304
    hello 51
    Bar id 6992
    52
    son 2852
    hello 52
    Bar id 6992
    53
    son 9052
    hello 53
    Bar id 6992
    54
    son 4220
    hello 54
    Bar id 6992
    55
    son 412
    hello 55
    Bar id 6992
    56
    son 7304
    hello 56
    Bar id 6992
    57
    son 2852
    hello 57
    Bar id 6992
    58
    son 9052
    hello 58
    Bar id 6992
    59
    son 4220
    hello 59
    Bar id 6992
    60
    son 412
    hello 60
    Bar id 6992
    61
    son 7304
    hello 61
    Bar id 6992
    62
    son 2852
    hello 62
    Bar id 6992
    63
    son 9052
    hello 63
    Bar id 6992
    64
    son 4220
    hello 64
    Bar id 6992
    65
    son 412
    hello 65
    Bar id 6992
    66
    son 7304
    hello 66
    67
    son 2852
    Bar id 6992
    hello 67
    Bar id 6992
    68
    son 9052
    hello 68
    Bar id 6992
    69
    son 4220
    hello 69
    Bar id 6992
    70
    son 412
    hello 70
    Bar id 6992
    71
    son 7304
    hello 71
    Bar id 6992
    72
    son 2852
    hello 72
    Bar id 6992
    73
    son 9052
    hello 73
    Bar id 6992
    74
    son 4220
    hello 74
    Bar id 6992
    75
    son 412
    hello 75
    Bar id 6992
    76
    son 7304
    hello 76
    77
    son 2852
    Bar id 6992
    hello 77
    Bar id 6992
    78
    son 9052
    hello 78
    Bar id 6992
    79
    son 4220
    hello 79
    Bar id 6992
    80
    son 412
    hello 80
    Bar id 6992
    81
    son 7304
    hello 81
    82
    son 2852
    Bar id 6992
    hello 82
    Bar id 6992
    83
    son 9052
    hello 83
    Bar id 6992
    84
    son 4220
    hello 84
    Bar id 6992
    85
    son 412
    hello 85
    Bar id 6992
    86
    son 7304
    hello 86
    87
    son 2852
    Bar id 6992
    hello 87
    Bar id 6992
    88
    son 9052
    hello 88
    Bar id 6992
    89
    son 4220
    hello 89
    Bar id 6992
    90
    son 412
    hello 90
    Bar id 6992
    91
    son 7304
    hello 91
    92
    son 2852
    Bar id 6992
    hello 92
    Bar id 6992
    93
    son 9052
    hello 93
    Bar id 6992
    94
    son 4220
    hello 94
    Bar id 6992
    95
    son 412
    hello 95
    Bar id 6992
    96
    son 7304
    hello 96
    97
    son 2852
    Bar id 6992
    hello 97
    Bar id 6992
    98
    son 9052
    hello 98
    Bar id 6992
    99
    son 4220
    hello 99
    Bar id 6992
    end
    运行结果

    Python中的上下文管理器(contextlib模块)>

    上下文管理器的任务是:代码块执行前准备,代码块执行后收拾

    1 如何使用上下文管理器:

    如何打开一个文件,并写入"hello world"

    1
    2
    3
    4
    5
    filename="my.txt"
    mode="w"
    f=open(filename,mode)
    f.write("hello world")
    f.close()

    当发生异常时(如磁盘写满),就没有机会执行第5行。当然,我们可以采用try-finally语句块进行包装:

    1
    2
    3
    4
    5
    writer=open(filename,mode)
    try:
        writer.write("hello world")
    finally:
        writer.close()

    当我们进行复杂的操作时,try-finally语句就会变得丑陋,采用with语句重写:

    1
    2
    with open(filename,mode) as writer:
        writer.write("hello world")

    as指代了从open()函数返回的内容,并把它赋给了新值。with完成了try-finally的任务。

    2 自定义上下文管理器  

    with语句的作用类似于try-finally,提供一种上下文机制。要应用with语句的类,其内部必须提供两个内置函数__enter__和__exit__。前者在主体代码执行前执行,后者在主体代码执行后执行。as后面的变量,是在__enter__函数中返回的。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    class echo():
        def output(self):
            print "hello world"
        def __enter__(self):
            print "enter"
            return self  #可以返回任何希望返回的东西
        def__exit__(self,exception_type,value,trackback):
            print "exit"
            ifexception_type==ValueError:
                return True
            else:
                return Flase
      
    >>>with echo as e:
        e.output()
         
    输出:
    enter
    hello world
    exit

    完备的__exit__函数如下:

    1
    def__exit__(self,exc_type,exc_value,exc_tb)

    其中,exc_type:异常类型;exc_value:异常值;exc_tb:异常追踪信息

    当__exit__返回True时,异常不传播

    3 contextlib模块  

    contextlib模块的作用是提供更易用的上下文管理器,它是通过Generator实现的。contextlib中的contextmanager作为装饰器来提供一种针对函数级别的上下文管理机制,常用框架如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    from contextlib import contextmanager
    @contextmanager
    def make_context():
        print 'enter'
        try:
            yield "ok"
        except RuntimeError,err:
            print 'error',err
        finally:
            print 'exit'
             
    >>>with make_context() as value:
        print value
         
    输出为:
        enter
        ok
        exit

    其中,yield写入try-finally中是为了保证异常安全(能处理异常)as后的变量的值是由yield返回。yield前面的语句可看作代码块执行前操作,yield之后的操作可以看作在__exit__函数中的操作。

    以线程锁为例:

    复制代码
    @contextlib.contextmanager
    def loudLock():
        print 'Locking'
        lock.acquire()
        yield
        print 'Releasing'
        lock.release()
     
    with loudLock():
        print 'Lock is locked: %s' % lock.locked()
        print 'Doing something that needs locking'
     
    #Output:
    #Locking
    #Lock is locked: True
    #Doing something that needs locking
    #Releasing
    复制代码

    4 contextlib.nested:减少嵌套

    对于:

    1
    2
    3
    with open(filename,mode) as reader:
        with open(filename1,mode1) as writer:
            writer.write(reader.read())

    可以通过contextlib.nested进行简化:

    1
    2
    with contextlib.nested(open(filename,mode),open(filename1,mode1)) as (reader,writer):
        writer.write(reader.read())

    在python 2.7及以后,被一种新的语法取代:

    1
    2
    with open(filename,mode) as reader,open(filename1,mode1) as writer:
        writer.write(reader.read())

    5 contextlib.closing() 

    file类直接支持上下文管理器API,但有些表示打开句柄的对象并不支持,如urllib.urlopen()返回的对象。还有些遗留类,使用close()方法而不支持上下文管理器API。为了确保关闭句柄,需要使用closing()为它创建一个上下文管理器(调用类的close方法)。

    View Code

    协程

    协程,又称微线程,纤程。英文名Coroutine。

    优点1: 协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。

    优点2: 不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。

    因为协程是一个线程执行,那怎么利用多核CPU呢?最简单的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。

    yield的简单实现

    复制代码
    import time
    import queue
    
    def consumer(name):
        print("--->ready to eat baozi...")
        while True:
            new_baozi = yield
            print("[%s] is eating baozi %s" % (name,new_baozi))
            #time.sleep(1)
    
    def producer():
    
        r = con.__next__()
        r = con2.__next__()
        n = 0
        while 1:
            time.sleep(1)
            print("33[32;1m[producer]33[0m is making baozi %s and %s" %(n,n+1) )
            con.send(n)
            con2.send(n+1)
    
            n +=2
    
    
    if __name__ == '__main__':
        con = consumer("c1")
        con2 = consumer("c2")
        p = producer()
    复制代码

    Greenlet

    greenlet是一个用C实现的协程模块,相比与python自带的yield,它可以使你在任意函数之间随意切换,而不需把这个函数先声明为generator

    复制代码
    from greenlet import greenlet
     
     
    def test1():
        print(12)
        gr2.switch()
        print(34)
        gr2.switch()
     
     
    def test2():
        print(56)
        gr1.switch()
        print(78)
     
     
    gr1 = greenlet(test1)
    gr2 = greenlet(test2)
    gr1.switch()
    复制代码

    Gevent

    复制代码
    import gevent
    
    import requests,time
    
    
    start=time.time()
    
    def f(url):
        print('GET: %s' % url)
        resp =requests.get(url)
        data = resp.text
        print('%d bytes received from %s.' % (len(data), url))
    
    gevent.joinall([
    
            gevent.spawn(f, 'https://www.python.org/'),
            gevent.spawn(f, 'https://www.yahoo.com/'),
            gevent.spawn(f, 'https://www.baidu.com/'),
            gevent.spawn(f, 'https://www.sina.com.cn/'),
    
    ])
    
    # f('https://www.python.org/')
    #
    # f('https://www.yahoo.com/')
    #
    # f('https://baidu.com/')
    #
    # f('https://www.sina.com.cn/')
    
    print("cost time:",time.time()-start)

      **线  程** 

    python的GIL

    In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)

    上面的核心意思就是,无论你启多少个线程,你有多少个cpu, Python在执行的时候会淡定的在同一时刻只允许一个线程运行

    线程的降低进程上下文切换的消耗,提高系统的并发性,并突破一个进程只能干一样事的缺陷,
    使到进程内并发成为可能。
    线程也叫轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程中的最小单元,由线程ID、程序
    计数器、寄存器集合和堆栈共同组成。线程的引入减小了程序并发执行时的开销,提高了操作系统的并发
    性能。线程没有自己的系统资源。

    问题1:什么是线程?

    线程:(有时称为轻量级进程)线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务

    总结:

    线程进程的关系区别

    1 一个程序至少有一个进程,一个进程至少有一个线程.(进程可以理解成线程的容器)

    
    

    2 进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率。

    
    

    3 线程在执行过程中与进程还是有区别的。每个独立的线程有一个程序运行的入口、顺序执行序列和
    程序的出口。但是线程不能够独立执行,必须依存在应用程序中,由应用程序提供多个线程执行控制。

    
    

    4 进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进行资源分配和调
    度的一个独立单位.
    线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位.线程
    自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈)但是
    它可与同属一个进程的其他的线程共享进程所拥有的全部资源.
    一个线程可以创建

    
    

    一、threading.thread

    if __name__ == '__main__':
        l = []
        t1=threading.Thread(target=Listen_music,args=('alex',))        #target 跟函数名  args跟参数必须(,)
        t2=threading.Thread(target=Play_blog,args=('xiaoming',))
        l.append(t1)
        l.append(t2)
        for t in l:
            t.start()
    
    
    #解释:开了两个线程,实现并发。
    先执行两个函数中的一个
    执行第一个线程print()

    遇到到阻塞,马上切到另一个线程(函数),时间特别短暂,感觉同时执行
    执行另一个线程print
    遇到下个sleep()等你
    切换线程到了2s后
    执行Liseten_music print() 4s后执行Play_blog(title) 的第二个print()
    bengin listen to alex  1484618756.364966
    beginn play bay xiaoming 1484618756.3659663
    end listening ...1484618758.3663201
    end play......1484618760.3667886
    执行结果

    1.join&Deamon方法

    join():在子线程完成运行之前,这个子线程的父线程将一直被阻塞。
    
    setDaemon(True):
    
             将线程声明为守护线程,必须在start() 方法调用之前设置, 如果不设置为守护线程程序会被无限挂起。这个方法基本和join是相反的。
    
             当我们 在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程 就分兵两路,分别运行,那么当主线程完成
    
             想退出时,会检验子线程是否完成。如 果子线程未完成,则主线程会等待子线程完成后再退出。但是有时候我们需要的是 只要主线程
    
             完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以 用setDaemon方法啦
    setDaemon()谁和设置成守护线程,主线程就不等待哪个线程,只等待没有设置守护的线程执行完毕后,主线结束,只要主线程结束设置守护线程额线程,不管执行是否完成,都强制结束。。。。

    import
    threading import time def Listen_music(name): print("bengin listen to %s %s"%(name,time.time())) time.sleep(2) print('end listening ...%s'%time.time()) def Play_blog(title): print('beginn play bay %s %s'%(title,time.time())) time.sleep(4) print('end play......%s'%time.time()) if __name__ == '__main__': l = [] t1=threading.Thread(target=Listen_music,args=('alex',)) #target 跟函数名 args跟参数必须(,) t2=threading.Thread(target=Play_blog,args=('xiaoming',)) l.append(t1) l.append(t2) t1.setDaemon(True) #必须跟一个参数布尔值 for t in l: # t.setDaemon(True) #设置守护线程 必须在start之前设置 此处设置没有意义 t.start() t2.join() #join 子线程运行完毕直前,该进程的父进程在此处进行阻塞 print("主线程执行完毕 %s"%time.time())
    bengin listen to alex  1484619465.6224053
    beginn play bay xiaoming 1484619465.6224053
    end listening ...1484619467.623588
    end play......1484619469.6231313
    主线程执行完毕 1484619469.6231313
    执行结果

     2.其他的方法

    thread 模块提供的其他方法:
    # threading.currentThread(): 返回当前的线程变量。
    # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
    # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
    # 除了使用方法外,线程模块同样提供了Thread类来处理线程,Thread类提供了以下方法:
    # run(): 用以表示线程活动的方法。
    # start():启动线程活动。
    # join([time]): 等待至线程中止。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。
    # isAlive(): 返回线程是否活动的。
    # getName(): 返回线程名。
    # setName(): 设置线程名
    #继承式调用
    import threading,time
    class Mythread(threading.Thread):
        def __init__(self,num):
            threading.Thread.__init__(self)
            self.num=num
    
        def run(self): #继承调用需要从新定义run()方法  线程被cpu调度后自动执行线程对象的run方法
            print('---> %s 返回线程名字%s 判断线程是否是活动线程%s'%(self.num,self.getName(),self.is_alive(),))
            time.sleep(2)
    if __name__ == '__main__':
        t1=Mythread(1)
        t2=Mythread(4)
        t1.start()
        t2.start()
    ---> 1 返回线程名字Thread-1 判断线程是否是活动线程True
    ---> 4 返回线程名字Thread-2 判断线程是否是活动线程True
    执行结果

       **同步锁(Lock)**  

    我们先看一个例子

    我们启动100个线程  计算100 递减1  我们理解就是最后结果为0

    import time,threading
    def Add_num():
        global num    #在每个线程中都获取这个全局变量
        # num -=1
        temp=num
        print('now--->num',num)
        time.sleep(0.08)
        num =temp-1         #对此公共变量进行-1操作
    
    num=100
    L=[]
    for i in range(100):
        t=threading.Thread(target=Add_num)
        t.start()
        L.append(t)
    for t in L:
        t.join() #等待所有线程执行完毕
    
    print("%s
    ending..."%num)

    你猜这是多少 ?下面是结果:

    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    now--->num 100
    99
    ending...
    View Code

    从结果可以看出,由于遇到io阻塞,切换下一个进程,由于num还没有进行自减一,就把num从新给了下一个线程,知道过了0.08s后 进行自减1,但是由于切换进程太快 知道最后一个进程 num仍然还是100 减去1  为99

     

    多个线程都在同时操作同一个共享资源,所以造成了资源破坏,怎么办呢?

    有同学会想用join呗,但join会把整个线程给停住,造成了串行,失去了多线程的意义,而我们只需要把计算(涉及到操作公共数据)的时候串行执行。

    我们可以通过同步锁来解决这种问题

    import time,threading
    def Add_num():
    global num #在每个线程中都获取这个全局变量
    # num -=1
    lock.acquire() #-------------------------------------->挂起进程锁
    temp=num
    print('now--->num',num)
    time.sleep(0.08)
    num =temp-1 #对此公共变量进行-1操作
    lock.release() #------------------------------------->解除进程锁
    num=100
    L=[]
    lock=threading.Lock() #------------------------------------->先买一个锁子
    for i in range(100):
    t=threading.Thread(target=Add_num)
    t.start()
    L.append(t)
    for t in L:
    t.join() #等待所有线程执行完毕

    print("%s ending..."%num)
    now--->num 100
    now--->num 99
    now--->num 98
    now--->num 97
    now--->num 96
    now--->num 95
    now--->num 94
    now--->num 93
    now--->num 92
    now--->num 91
    now--->num 90
    now--->num 89
    now--->num 88
    now--->num 87
    now--->num 86
    now--->num 85
    now--->num 84
    now--->num 83
    now--->num 82
    now--->num 81
    now--->num 80
    now--->num 79
    now--->num 78
    now--->num 77
    now--->num 76
    now--->num 75
    now--->num 74
    now--->num 73
    now--->num 72
    now--->num 71
    now--->num 70
    now--->num 69
    now--->num 68
    now--->num 67
    now--->num 66
    now--->num 65
    now--->num 64
    now--->num 63
    now--->num 62
    now--->num 61
    now--->num 60
    now--->num 59
    now--->num 58
    now--->num 57
    now--->num 56
    now--->num 55
    now--->num 54
    now--->num 53
    now--->num 52
    now--->num 51
    now--->num 50
    now--->num 49
    now--->num 48
    now--->num 47
    now--->num 46
    now--->num 45
    now--->num 44
    now--->num 43
    now--->num 42
    now--->num 41
    now--->num 40
    now--->num 39
    now--->num 38
    now--->num 37
    now--->num 36
    now--->num 35
    now--->num 34
    now--->num 33
    now--->num 32
    now--->num 31
    now--->num 30
    now--->num 29
    now--->num 28
    now--->num 27
    now--->num 26
    now--->num 25
    now--->num 24
    now--->num 23
    now--->num 22
    now--->num 21
    now--->num 20
    now--->num 19
    now--->num 18
    now--->num 17
    now--->num 16
    now--->num 15
    now--->num 14
    now--->num 13
    now--->num 12
    now--->num 11
    now--->num 10
    now--->num 9
    now--->num 8
    now--->num 7
    now--->num 6
    now--->num 5
    now--->num 4
    now--->num 3
    now--->num 2
    now--->num 1
    0
    ending...
    执行结果

    问题解决,但

    请问:同步锁与GIL的关系?

    Python的线程在GIL的控制之下,线程之间,对整个python解释器,对python提供的C API的访问都是互斥的,这可以看作是Python内核级的互斥机制。但是这种互斥是我们不能控制的,我们还需要另外一种可控的互斥机制———用户级互斥。内核级通过互斥保护了内核的共享资源,同样,用户级互斥保护了用户程序中的共享资源。

    GIL 的作用是:对于一个解释器,只能有一个thread在执行bytecode。所以每时每刻只有一条bytecode在被执行一个thread。GIL保证了bytecode 这层面上是thread safe的。
    但是如果你有个操作比如 x += 1,这个操作需要多个bytecodes操作,在执行这个操作的多条bytecodes期间的时候可能中途就换thread了,这样就出现了data races的情况了。
     
    那我的同步锁也是保证同一时刻只有一个线程被执行,是不是没有GIL也可以?是的;那要GIL有什么鸟用?你没治;

     

        **线程死锁和递归锁 **      

    线程之间共享进程给的多个资源,如果两个线程分别占有一部分资源并且同时等待对方释放资源,自己去取,就会造成死锁。以为系统判断这部分资源正在使用,所以这两个线程吴外力作用下,一直都是请求获取,就这样一直等待下去

    
    
    import time,threading
    class Mythread(threading.Thread):
    def actionA(self):
    A.acquire()
    print(self.name,'gotlookA',time.time())
    time.sleep(2)
    B.acquire()
    print(self.name,'gotlookB',time.time())
    B.release()
    print(self.name,'removelookB',time.time())
    A.release()
    print(self.name, 'removelookA', time.time())

    def actionB(self):
    B.acquire()
    print(self.name,'gotlookB',time.time())
    time.sleep(3)
    A.acquire()
    print(self.name,'gotlookA',time.time())
    A.release()
    print(self.name, 'removelookA', time.time())
    B.release()
    print(self.name, 'removelookB', time.time())
    def run(self):
    self.actionA()
    self.actionB()
    if __name__ == '__main__':
    A=threading.Lock()
    B=threading.Lock()
    L=[]
    for i in range(3):
    L.append(Mythread())
    for t in L:
    t.start()
    for t in L:
    t.join()
    print('end.....')
     
    执行结果:

    Thread-1 gotlookA 1484624775.050157     #线程1获得A锁 其他线程进行等待获得A锁
    Thread-1 gotlookB 1484624777.05151       #线程1获得B锁
    Thread-1 removelookB 1484624777.05151    #线程1 释放B锁
    Thread-1 removelookA 1484624777.05151     #线程1 释放A锁
    Thread-2 gotlookA 1484624777.05151          #这是线程2 进入获取A锁  --------》等待获得B锁
    Thread-1 gotlookB 1484624777.05151            #线程1 继续运行获取B锁----------》等待获得A锁

    程序阻塞停止一直出去获取状态

    解决方法:使用递归锁,

    为了支持在同一线程中多次请求同一资源,python提供了“可重入锁”:threading.RLock。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次acquire。直到一个线程所有的acquire都被release,其他的线程才能获得资源。

    import time,threading
    class Mythread(threading.Thread):
        def actionA(self):
            A.acquire()
            print(self.name,'gotlookA',time.time())
            time.sleep(2)
            A.acquire()
            print(self.name,'gotlookB',time.time())
            A.release()
            print(self.name,'removelookB',time.time())
            A.release()
            print(self.name, 'removelookB', time.time())
    
        def actionB(self):
            A.acquire()
            print(self.name,'gotlookB',time.time())
            time.sleep(3)
            A.acquire()
            print(self.name,'gotlookA',time.time())
            A.release()
            print(self.name, 'removelookB', time.time())
            A.release()
            print(self.name, 'removelookB', time.time())
        def run(self):
            self.actionA()
            self.actionB()
    if __name__ == '__main__':
        A=threading.RLock()
    
        L=[]
        for i in range(3):
            L.append(Mythread())
        for t in L:
            t.start()
        for t in L:
            t.join()
        print('end.....')
    Thread-1 gotlookA 1484625486.6879797
    Thread-1 gotlookB 1484625488.6892748
    Thread-1 removelookB 1484625488.6892748
    Thread-1 removelookB 1484625488.6892748
    Thread-2 gotlookA 1484625488.6892748
    Thread-2 gotlookB 1484625490.690051
    Thread-2 removelookB 1484625490.690051
    Thread-2 removelookB 1484625490.690051
    Thread-3 gotlookA 1484625490.690051
    Thread-3 gotlookB 1484625492.6903684
    Thread-3 removelookB 1484625492.6903684
    Thread-3 removelookB 1484625492.6903684
    Thread-1 gotlookB 1484625492.6903684
    Thread-1 gotlookA 1484625495.6916673
    Thread-1 removelookB 1484625495.6916673
    Thread-1 removelookB 1484625495.6916673
    Thread-2 gotlookB 1484625495.6916673
    Thread-2 gotlookA 1484625498.692889
    Thread-2 removelookB 1484625498.692889
    Thread-2 removelookB 1484625498.692889
    Thread-3 gotlookB 1484625498.692889
    Thread-3 gotlookA 1484625501.6936538
    Thread-3 removelookB 1484625501.6936538
    Thread-3 removelookB 1484625501.6936538
    end.....
    执行结果

    附加例子:

    import time
    
    import threading
    
    class Account:
        def __init__(self, _id, balance):
            self.id = _id
            self.balance = balance
            self.lock = threading.RLock()
    
        def withdraw(self, amount):
    
            with self.lock:
                self.balance -= amount
    
        def deposit(self, amount):
            with self.lock:
                self.balance += amount
    
    
        def drawcash(self, amount):#lock.acquire中嵌套lock.acquire的场景
    
            with self.lock:
                interest=0.05
                count=amount+amount*interest
    
                self.withdraw(count)
    
    
    def transfer(_from, to, amount):
    
        #锁不可以加在这里 因为其他的其它线程执行的其它方法在不加锁的情况下数据同样是不安全的
         _from.withdraw(amount)
    
         to.deposit(amount)
    
    
    
    alex = Account('alex',1000)
    yuan = Account('yuan',1000)
    
    t1=threading.Thread(target = transfer, args = (alex,yuan, 100))
    t1.start()
    
    t2=threading.Thread(target = transfer, args = (yuan,alex, 200))
    t2.start()
    
    t1.join()
    t2.join()
    
    print('>>>',alex.balance)
    print('>>>',yuan.balance)
    View Code

        同步条件(Event) *   

    条件同步和条件变量同步差不多意思,只是少了锁功能,因为条件同步设计于不访问共享资源的条件环境。event=threading.Event():条件环境对象,初始值 为False;

    event.isSet():返回event的状态值;
    
    event.wait():如果 event.isSet()==False将阻塞线程;
    
    event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
    
    event.clear():恢复event的状态值为False。
    通过一个状态判断是否运行,实现了两个类的交互   通过event
    import
    threading,time class Boss(threading.Thread): def run(self): print("The boss say:Today to work overtime to ten point!!!!") event.isSet() or event.set() #激活event.wait 线程 time.sleep(3) print('the boss say:Hard work! Everyone Go off work!') event.isSet() or event.set() class Worker(threading.Thread): def run(self): event.wait() #等待线程被设置 阻塞 print("Helpless pain worker say:Hard luck!") time.sleep(0.2) event.clear() #清除event 状态 event.wait() #阻塞进行激活 print('Workers:OhYeah!') if __name__ == '__main__': event=threading.Event() #创建条件环境变量,初始值为Flase L=[] for i in range(5): L.append(Worker()) #产生5个工人对象 L.append(Boss()) for t in L: t.start() for i in L: i.join()
    The boss say:Today to work overtime to ten point!!!!
    Helpless pain worker say:Hard luck!
    Helpless pain worker say:Hard luck!
    Helpless pain worker say:Hard luck!
    Helpless pain worker say:Hard luck!
    Helpless pain worker say:Hard luck!
    the boss say:Hard work! Everyone Go off work!
    Workers:OhYeah!
    Workers:OhYeah!
    Workers:OhYeah!
    Workers:OhYeah!
    Workers:OhYeah!
    执行结果

    实现红路灯模拟通车

    import threading,time,random
    def light():
        if not event.isSet():   #初始红绿灯
            event.set()
        count=0
        while True:
            if count<30:
                print('33[42;1m---green light on--- ')
            elif count<25:
                print('33[41;1m---yellow light on ---')
            elif count<5:
                if event.isSet:
                    event.clear()
                    print('33[43;1m---red light on ---')
            else:
                count=0
                event.set()
            time.sleep(4)
            count +=1
    def car(n):
        while 1:
            time.sleep(random.randrange(3))
            if event.isSet():
                print("car [%s] is running.."%n)
            else:
                print('car %s is waiting for the red light.....')
    
    
    if __name__ == '__main__':
        event=threading.Event()
        l=threading.Thread(target=light)
        l.start()
        for i in range(3):
            t=threading.Thread(target=car,args=(i,))
            t.start()

    信号量(Semaphore)

    信号量用来控制线程并发数的,BoundedSemaphore或Semaphore管理一个内置的计数 器,每当调用acquire()时-1,调用release()时+1。

          计数器不能小于0,当计数器为 0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()。(类似于停车位的概念)

          BoundedSemaphore与Semaphore的唯一区别在于前者将在调用release()时检查计数 器的值是否超过了计数器的初始值,如果超过了将抛出一个异常。

    import threading,time
    class Mythread(threading.Thread):
        def run(self):
            if semaphore.acquire():  #锁定
                print(self.name)
                time.sleep(1)
                print(self.name,"我又来了")
                semaphore.release()
    
    if __name__ == '__main__':
        semaphore=threading.Semaphore()   #创建一个semaphore 对象
        l=[]
        for i in range(100):
            l.append(Mythread())
        for t in l:
            t.start()

    ***多线程利器(queue)****

    创建一个“队列”对象
    import Queue
    q = Queue.Queue(maxsize = 10)
    Queue.Queue类即是一个队列的同步实现。队列长度可为无限或者有限。可通过Queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。
    
    将一个值放入队列中
    q.put(10)
    调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为
    1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常。
    
    将一个值从队列中取出
    q.get()
    调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。
    
    Python Queue模块有三种队列及构造函数:
    1、Python Queue模块的FIFO队列先进先出。  class queue.Queue(maxsize)
    2、LIFO类似于堆,即先进后出。             class queue.LifoQueue(maxsize)
    3、还有一种是优先级队列级别越低越先出来。   class queue.PriorityQueue(maxsize)
    
    此包中的常用方法(q = Queue.Queue()):
    q.qsize() 返回队列的大小
    q.empty() 如果队列为空,返回True,反之False
    q.full() 如果队列满了,返回True,反之False
    q.full 与 maxsize 大小对应
    q.get([block[, timeout]]) 获取队列,timeout等待时间
    q.get_nowait() 相当q.get(False)
    非阻塞 q.put(item) 写入队列,timeout等待时间
    q.put_nowait(item) 相当q.put(item, False)
    q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
    q.join() 实际上意味着等到队列为空,再执行别的操作

    列表线程是不安全的数据结构

    import threading,time
    li=[1,2,3,4,5]
    def pri():
        while li:
            a=li[-1]
            print(a)
            time.sleep(2)
            try:
                li.remove(a)
            except Exception as e:
                print('---',a,e)
    
    t1=threading.Thread(target=pri)
    t1.start()
    t2=threading.Thread(target=pri)
    t2.start()
    执行中发现  会引发一场找不该数据
    5
    5
    4
    --- 5 list.remove(x): x not in list
    4
    3
    --- 4 list.remove(x): x not in list
    3
    2
    --- 3 list.remove(x): x not in list
    2
    1
    --- 2 list.remove(x): x not in list
    1
    --- 1 list.remove(x): x not in list
    执行结果

    queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

    import queue
    q=queue.LifoQueue()
    q.put(1)
    q.put(2)
    q.put(3)
    q.put(4)
    q.put(5)
    while 1:
        data=q.get()
        print(data)
    一棵树的例子
    #实现一个线程不断生成一个随机数到一个队列中(考虑使用Queue这个模块)
    # 实现一个线程从上面的队列里面不断的取出奇数
    # 实现另外一个线程从上面的队列里面不断取出偶数
    
    import random,threading,time
    from queue import Queue
    #Producer thread
    class Producer(threading.Thread):
      def __init__(self, t_name, queue):
        threading.Thread.__init__(self,name=t_name)
        self.data=queue
      def run(self):
        for i in range(10):  #随机产生10个数字 ,可以修改为任意大小
          randomnum=random.randint(1,99)
          print ("%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), randomnum))
          self.data.put(randomnum) #将数据依次存入队列
          time.sleep(1)
        print ("%s: %s finished!" %(time.ctime(), self.getName()))
    
    #Consumer thread
    class Consumer_even(threading.Thread):
      def __init__(self,t_name,queue):
        threading.Thread.__init__(self,name=t_name)
        self.data=queue
      def run(self):
        while 1:
          try:
            val_even = self.data.get(1,5) #get(self, block=True, timeout=None) ,1就是阻塞等待,5是超时5秒
            if val_even%2==0:
              print ("%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(),self.getName(),val_even))
              time.sleep(2)
            else:
              self.data.put(val_even)
              time.sleep(2)
          except:   #等待输入,超过5秒 就报异常
            print ("%s: %s finished!" %(time.ctime(),self.getName()))
            break
    class Consumer_odd(threading.Thread):
      def __init__(self,t_name,queue):
        threading.Thread.__init__(self, name=t_name)
        self.data=queue
      def run(self):
        while 1:
          try:
            val_odd = self.data.get(1,5)
            if val_odd%2!=0:
              print ("%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(), self.getName(), val_odd))
              time.sleep(2)
            else:
              self.data.put(val_odd)
              time.sleep(2)
          except:
            print ("%s: %s finished!" % (time.ctime(), self.getName()))
            break
    #Main thread
    def main():
      queue = Queue()
      producer = Producer('Pro.', queue)
      consumer_even = Consumer_even('Con_even.', queue)
      consumer_odd = Consumer_odd('Con_odd.',queue)
      producer.start()
      consumer_even.start()
      consumer_odd.start()
      producer.join()
      consumer_even.join()
      consumer_odd.join()
      print ('All threads terminate!')
    
    if __name__ == '__main__':
      main()

    生产者消费者模型:

    为什么要使用生产者和消费者模式

    在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

    什么是生产者消费者模式

    生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

    这就像,在餐厅,厨师做好菜,不需要直接和客户交流,而是交给前台,而客户去饭菜也不需要不找厨师,直接去前台领取即可,这也是一个结耦的过程。

    第一版

    import threading,queue
    from time import sleep
    from random import randint
    class Production(threading.Thread):
        def run(self):
            while True:
                r=randint(0,100)
                q.put(r)
                print('包子来了 第%s个'%r)
                sleep(1)
    
    class Process(threading.Thread):
        def run(self):
            while True:
                re=q.get()
                print('来个包子,第%s个包子被只掉了'%re)
    if __name__ == '__main__':
        q=queue.LifoQueue()
        l=[Process(),Production(),Production(),Production()]
        for t in l:
            t.start()
            

    第二版 再创

    import threading,queue,random,time
    class Producer(threading.Thread):
        def run(self):
            count=1
            while count<10:
                print("making 包子。。。。。")
                time.sleep(random.randrange(3))
                q.put(count)
                print('包子来了 第%s个'%(count))
                count +=1
                q.task_done()   #-----<<<<<<<<
                q.join()
    
    class Consumer(threading.Thread):
        def run(self):
            count=0
            while count<10:
                time.sleep(random.randrange(4))
                if not q.empty():
                    data=q.get()
                    print('33[32;1m来个包子,第%s个包子被吃掉了'%data)
                else:
                    print('waiting  for making.....')
                count +=1
    if __name__ == '__main__':
        q=queue.LifoQueue()
    
        l=[Producer(),Consumer(),Consumer(),Consumer()]
        for t in l:
            t.start()

    今天学习应用

    
    
    import threading,queue,random,time
    class Producer(threading.Thread):
    def run(self):
    count=1
    while count<10:
    print("making 包子。。。。。")
    time.sleep(random.randrange(3))
    print(q.put(count))
    print(q.qsize()) #返回队列的大小布尔值
    print(q.full()) #返回队列是否满了 返回值布尔值
    print(q.get('block'))


    print('包子来了 第%s个'%(count))
    count +=1
    q.task_done() #-----<<<<<<<<
    #q.join()

    class Consumer(threading.Thread):
    def run(self):
    count=0
    while count<10:
    time.sleep(random.randrange(4))
    q.join() #线程阻塞等task_done()信号
    data=q.get()
    print('33[32;1m来个包子,第%s个包子被吃掉了'%data)
    count +=1
    if __name__ == '__main__':
    q=queue.LifoQueue()

    l=[Producer(),Consumer(),Consumer(),Consumer()]
    for t in l:
    t.start()
     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

  • 相关阅读:
    AdvDataList分页 例码
    问一个关于生成静态页面的问题
    使用XMLDataSource简单实现多级下拉菜单
    简单的封装一个HTML 弹出对话框的空间
    JS 语言强大, 动态修改标准库
    Eclipse IDE 学习
    分布式程序的开发
    Http request Post pk Put
    Forward: X Forwarding with Putty on Windows
    转载: 颠覆了对于design 的认识
  • 原文地址:https://www.cnblogs.com/honglingjin/p/6288090.html
Copyright © 2011-2022 走看看