zoukankan      html  css  js  c++  java
  • 进程、线程、协程

    进程与线程

     

    进程与线程的关系

    进程:
    优点:同时利用多个cpu,能够同时进行多个操作
    缺点:耗费资源(重新开辟内存空间)

    线程:
    优点:共享内存,IO操作的时候,创造并发操作
    缺点:抢占资源

    进程不是越多越好,cpu个数 = 进程个数

    线程也不是越多越好,具体案例具体分析,请求上下文切换耗时

    计算机中执行任务的最小单元:线程

    IO操作利用cpu

    GIL 全局解释锁

    IO密集型(不用cpu):
    多线程

    计算密集型(用cpu):
    多进程

    进程和线程的目的:提高执行效率
    1,截止目前写的程序:单进程单线程,主进程、主线程
    2,自定义线程:
        主进程
        子进程

     线程

    创建线程

    import time
    import threading
    
    def f0():
        pass
    def f1(a1,a2):
        time.sleep(10)
        f0()
    
    t = threading.Thread(target=f1,args=(123,111,))
    t.setDaemon(True)  #默认是False表示等待,True后就表示不等待
    t.start()
    t = threading.Thread(target=f1,args=(123,111,))
    t.setDaemon(True)
    t.start()
    t = threading.Thread(target=f1,args=(123,111,))
    t.setDaemon(True)
    t.start()

    #
    t.setDaemon(True) 表示不等待
    #t.setDaemon(False) 表示等待
    
    

     

    threading模块

     threading 模块通过对 thread 进行二次封装,提供了更方便的 api 来处理线程。

    import threading
    import time
    
    def worker(num):
        time.sleep(1)
        print("Thread %d" % num)
        return
    
    for i in range(20):
        t = threading.Thread(target=worker, args=(i,), name="t.%d" % i)  #括号内部的:函数名,参数,线程起名
        t.start()

    thread方法
    1)t.start() :   激活线程,

    2)t.getName() :  获取线程的名称

    3)t.setName() : 设置线程的名称

    4)t.name :   获取或设置线程的名称

    5)t.is_alive() : 判断线程是否为激活状态

    6)t.isAlive() :  判断线程是否为激活状态

    7)t.setDaemon():   设置为后台线程或前台线程(默认:False);通过一个布尔值设置线程是否为守护线程,必须在执行start()方法之后才可以使用。
    如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止;
    如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止

    8)t.isDaemon() : 判断是否为守护线程

    9)t.ident :  获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法之后该属性才有效,否则它只返回None。

    10)t.join():  等待执行   逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义

    11)t.run():   线程被cpu调度后自动执行线程对象的run方法

    线程锁

    线程锁 threading.RLock

    import threading
    import time
    
    globals_num = 0
    
    lock = threading.RLock()
    
    def Func():
        lock.acquire()  #获得锁
        global globals_num  #声明全局变量
        globals_num += 1
        time.sleep(1)
        print(globals_num)
        lock.release()  #释放锁
    
    for i in range(10):
        t = threading.Thread(target=Func)
        t.start()
    
    threading.RLock和threading.Lock 的区别

    threading.Lock 

    import threading
    lock = threading.Lock()    #Lock对象
    lock.acquire()
    lock.acquire()  #产生了死琐。
    lock.release()
    lock.release()

    threading.RLock

    import threading
    rLock = threading.RLock()  #RLock对象
    rLock.acquire()
    rLock.acquire()    #在同一线程内,程序不会堵塞。
    rLock.release()
    rLock.release()
    # 使用RLock,那么acquire和release必须成对出现,
    # 即调用了n次acquire,必须调用n次的release才能真正释放所占用的琐。
    # threading.Lock与threading.RLock差别

    Event模块

    threading.Event 线程间通信的机制
    Event.wait([timeout]) : 堵塞线程,直到Event对象内部标识位被设为True或超时(如果提供了参数timeout)。
    Event.set() :将标识位设为Ture
    Event.clear() : 将标识位设为False。
    Event.isSet() :判断标识位是否为Ture。

    import threading
    
    def do(event):
        print('start')
        event.wait()
        print('execute')
    
    event_obj = threading.Event()
    for i in range(10):
        t = threading.Thread(target=do, args=(event_obj,))
        t.start()
    
    event_obj.clear()
    inp = input('input:')
    if inp == 'true':
        event_obj.set()

    queue模块

    Queue 就是队列,它是线程安全的

     队列的特性:先进先出

    import queue
    
    q = queue.Queue(maxsize=0)  #构造一个先进先出的队列,maxsize指定队列长度,为0时,表示队列长度无限。
    
    q.join()  #等到队列为None的时候,再执行别的操作
    q.qsize()  #返回队列的大小(不可靠)
    q.empty()  #当队列为空的时候,返回True 否则返回False(不可靠)
    q.full()  #当队列满的时候,返回True,否则返回False(不可靠)
    
    
    q.put(item, block=True, timeout=None)  #将item放入Queue尾部,item必须存在,可以参数block默认为True,表示当队列满时,会等待队列给出可用位置
                                           #为Flase时为非阻塞,此时如果队列已经满,会引发queue.Full异常。可以选参数timeout,表示会阻塞的时间,
                                           #如果队列无法给出放入item的位置,则引发queue.Full异常。
    q.get(block=True, timeout=None)  #等  移除并返回队列头部的一个值,可选参数block默认为True,表示获取值的时候,如果队列为空,则阻塞,为False时,不阻塞,
                                       #若此时队列为空,则引发 queue.Empty异常。可选参数timeout,表示会阻塞设置的时候,过后,如果队列为空,则引发Empty异常。
    q.put_nowait(item)  #等效put(item,block=False)
    q.get_nowait()  #不等   等效于 get(item,block=False)
    生产者,消费者

    生产者将数据依次存入队列,消费者依次从队列中取出数据。

    实例

    import queue
    import threading
    
    message = queue.Queue(10)
    
    def producer(i):
        while True:
            message.put(i)
    
    def consumer(i):
        while True:
            msg = message.get()
            print(msg)
    
    for i in range(12):
        t = threading.Thread(target=producer, args=(i,))
        t.start()
    
    for i in range(10):
        t = threading.Thread(target=consumer, args=(i,))
        t.start()

    进程 

    创建进程

    from multiprocessing import Process
    import time
    
    li = []
    
    def foo(i):
        li.append(i)
        print('say hi ', li)
    
    if __name__ == "__main__":
        for i in range(10):
            p = Process(target=foo, args=(i,))
            p.start()

    multiprocessing模块

    multiprocessing是python的多进程管理包,和threading.Thread类似。直接从侧面用subprocesses替换线程使用GIL的方式,由于这一点,multiprocessing模块可

    以让程序员在给定的机器上充分的利用CPU。

    在multiprocessing中,通过创建Process对象生成进程,然后调用它的start()方法,

    from multiprocessing import Process
    
    def f(name):
        print('hello', name)
    
    if __name__ == '__main__':
        p = Process(target=f, args=('bob',))  #括号里的两个值,调用函数  给函数传一个参数
        p.start()  # 激活线程
        p.join()   #相当于‘wait’等待

     进程间的数据共享

    进程各自持有一份数据,默认无法共享数据

    from multiprocessing import Process
    from multiprocessing import Manager
    
    import time
    
    li = []
    
    def foo(i):
        li.append(i)
        print('say hi',li)
    
    if __name__ == '__main__':
        for i in range(10):
            p = Process(target=foo,args=(i,))
            p.start()
    
        print('ending',li)

    两种共享方法:

    方法一:Array()
    from multiprocessing import Process, Array
    temp = Array('i', [11, 22, 33, 44])
    
    def Foo(i):
        temp[i] = 100+i
        for item in temp:
            print(i, '----->', item)
    if __name__ == '__main__':
        for i in range(2):
            p = Process(target=Foo, args=(i,))
            p.start()

    方法二:manage.dict()

    from multiprocessing import Process,Manager
    
    def Foo(i,dic):
        dic [i] = 100+i
        print(len(dic))
    
    if __name__ == '__main__':
        manage = Manager()
        dic = manage.dict()  #定义字典
    
        for i in range(2):
            p = Process(target=Foo,args=(i,dic,))
            p.start()
            p.join()
    # 输出:
    # 1
    # 2
    


    #上面代码实现进程数据共享,与上面对比。 from multiprocessing import Process,Manager def Foo(i,dic): dic [i] = 100+i print(len(dic)) if __name__ == '__main__': manage = Manager() dic = {} #普通的字典 for i in range(2): p = Process(target=Foo,args=(i,dic,)) p.start() p.join() # 输出: # 1 # 1

    &

    'c': ctypes.c_char,  'u': ctypes.c_wchar,
        'b': ctypes.c_byte,  'B': ctypes.c_ubyte,
        'h': ctypes.c_short, 'H': ctypes.c_ushort,
        'i': ctypes.c_int,   'I': ctypes.c_uint,
        'l': ctypes.c_long,  'L': ctypes.c_ulong,
        'f': ctypes.c_float, 'd': ctypes.c_double
    
    类型对应表
    类型对应表

    在使用并发设计的时候最好尽可能的避免共享数据,尤其是在使用多进程的时候。 如果你真有需要 要共享数据,

    multiprocessing提供了两种方式:

      1)Shared memory

    from multiprocessing import Process, Value, Array
    
    def f(n, a):
        n.value = 3.1415927
        for i in range(len(a)):
            a[i] = -a[i]
    
    if __name__ == '__main__':
        num = Value('d', 0.0)
        arr = Array('i', range(10))
    
        p = Process(target=f, args=(num, arr))
        p.start()
        p.join()
    
        print(num.value)
        print(arr[:])
        print(arr[:2])

    #创建num和arr时,“d”和“i”参数由Array模块使用的typecodes创建: 

    #“d”表示一个双精度的浮点数,

    #“i”表示一个有符号的整数,这些共享对象将被线程安全处理。

    2)Server process

    from multiprocessing import Process, Manager
    
    def f(d, l):
        d[1] = '1'
        d['2'] = 2
        d[0.25] = None
        l.reverse()  #反转
    
    if __name__ == '__main__':
        with Manager() as manager:  #上下文管理
            d = manager.dict()
            l = manager.list(range(10))
    
            p = Process(target=f, args=(d, l))
            p.start()
            p.join()
    
            print(d)
            print(l)

    进程池

    Pool类描述一个工作进程池

    p = Pool(5)  #一次最多执行5个进程

    p.apply  每一个任务是排队进行的;进程.join()

    p.apply_async  每一个任务都并发进行,可以设置回调函数;进程.无join();进程daemon = True

    1)p.apply 每一个任务是排队进行的 进程,join()

    from multiprocessing import Pool
    import time
    
    def f1(a):
        time.sleep(1)
        print(a)
        return 1000
    def f2(arg):
        print(arg)
    
    if __name__ == '__main__':
        pool = Pool(5)
        for i in range(10):
            pool.apply(func=f1, args=(i,))  #apply  每一个任务是排队进行的
            print('8888')
        pool.close()
        pool.join()

    2)p.apply_async 每一个任务都并发进行,可以设置回调函数; 进程.无join();进程daemon =True

    from multiprocessing import Pool
    import time
    
    def f1(a):
        time.sleep(1)
        print(a)
        return 1000
    def f2(arg):
        print(arg)
    
    if __name__ == '__main__':
        pool = Pool(5)
        for i in range(10):
            pool.apply_async(func=f1, args=(i,), callback=f2)  #apply_async 每一个任务都并发进行
            print('8888')
        pool.close()
        pool.join()

     协程

    线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。

    协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。

    协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程

    greenlet

    from greenlet import greenlet
     
     
    def test1():
        print 12
        gr2.switch()
        print 34
        gr2.switch()
     
     
    def test2():
        print 56
        gr1.switch()
        print 78
     
    gr1 = greenlet(test1)
    gr2 = greenlet(test2)
    gr1.switch()

    gevent

    import gevent
    
    def foo():
        print('Running in foo')
        gevent.sleep(0)
        print('Explicit context switch to foo again')
    
    def bar():
        print('Explicit context to bar')
        gevent.sleep(0)
        print('Impliciit context switch back to bar')
    
    gevent.joinall([
        gevent.spawn(foo),
        gevent.spawn(bar),
    ])

    详情参考:http://www.cnblogs.com/wupeiqi/articles/5040827.html

         http://www.cnblogs.com/resn/p/5591419.html

         http://www.cnblogs.com/wupeiqi/articles/4839959.html

  • 相关阅读:
    MapReduce job在JobTracker初始化源码级分析
    Flume-NG源码阅读之FileChannel
    linux下gzip压缩同样内容大小不一样
    mapreduce job提交流程源码级分析(三)
    JobTracker启动流程源码级分析
    机器学习经典算法之K-Means
    机器学习经典算法之KNN
    机器学习经典算法之SVM
    机器学习经典算法之朴素贝叶斯分类
    机器学习经典算法之决策树
  • 原文地址:https://www.cnblogs.com/kongqi816-boke/p/5597161.html
Copyright © 2011-2022 走看看