zoukankan      html  css  js  c++  java
  • python网络篇【第十一篇】线程、进程、协程

    一、线程 

    上一篇已经大致介绍了什么是线程以及线程的作用、多线程和进程的关系大致的总结如下:

    • 线程是最小的执行单元,进程由至少一个线程组成;
    • 进程和线程的调度,完全有操作系统决定,程序不能决定什么时候执行和执行多久
    •  一个应用程序可以有多进程、多线程
    •  默认是单进程、单线程
    •  单进程,多线程,在Python中不会性能提升,在Java和C#中可以提升
    • 多线程: IO操作密集,一般不会用到CPU,效率提升是可以的
    • 多进程:计算型操作密集, 需要占用CPU,因此性能不会有提升

    概念说了很多下面来点实际,代码!

    创建线程:

    import threading
    def f1(arg):
        print(arg)
    t=threading.Thread(target=f1,args=(123,))
    t.start()   

    简单吧这就是创建一个线程,也可以继承线程,自己创建

    class MyThread(threading.Thread):   #自己创建一个类,继承threading.Thread)
        def __init__(self,func,args):   
            self.func=func
            self.args=args
            super(MyThread,self).__init__()   #执行Mythread父类的构造函数
        
        def run(self):   #然后自己定义一个run方法
            self.func(self.args)    #执行f2函数
    
    def f2(arg):
        print(arg)
    
    obj=MyThread(f2,123)   #传参数进去
    obj.start()           #开始线程

    threading常用方法:

    threading.current_thread()   返回当前的线程变量。
    threading.enumerate()         返回一个包含正在运行的线程的列表,正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
    threading.active_count()      返回正在运行的线程数量,与len(threading.enumerate())一样。

    Thread

    Thread是线程类,与Java类似,有两种使用方法,直接传入要运行的方法或从Thread继承并覆盖 run()。

    PS: Thread中的run()方法,就是CPU来调度的时候执行的自动代用run方法

    Thread构造方法(__init__):

      group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None

    • group: 线程组
    • target: 要执行的方法
    • name: 线程名
    • args/kwargs: 要传入的参数

    常用方法:

    • is_alive():  返回线程是否在运行(启动前、终止前) 。
    • getName(): 获取当前线程名
    • setName():   设置线程名
    • isDaemon():  获取线程是否是守护线程。
    • setDaemon(): 设置是否是守护进程
      • 如果是后台线程,主线程执行过程中,后台线程也在运行,主线程执行完毕后,后台线程不论成功与否,全部都停止。
      • 如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程夜之星完成后,程序停止
    • start()  :  启动线程
    • join([timeout]):   阻塞当前上下文,直到调用此方法的线程终止或到达指定的timeout值
    import threading
    import time
    def f1(arg):
        time.sleep(5)    
        print(arg)
    t = threading.Thread(target=f1,args=(123,))
    t.setDaemon(True)   #True 表示主线程不等此子线程,默认值为False,也就是主线程等待子线程
    t.start()  #不代表当前线程会被立即执行
    t.join(6)  #表示主线程到此,等待。。
                # 参数6,表示主线程在此最多等待6秒
    
    print(33)

    线程锁

    lock

    多线程的优势在于可以同时运行多个任务,而多个线程之间用到的数据可以共享进程的,效率很高(但在Python中多线程尽量应用在IO密集型的程序中)。正因为这样,所以存在数据不同步的问题.

    为了避免资源争夺,所以引入了锁的概念。每当一个线程要访问共享数据时,必须先加把锁,而后在处理,处理完成后,解锁,让其他线程再来处理。

    由于线程之间是进行随机调度的,每个线程可能只执行n条之后,当多个线程同时修改同一条数据时可能会出现脏数据,所以线程锁就应用而生了。

    实例方法:

    • acquire([timeout])  上锁, 使线程进入同步阻塞状态,尝试获得锁定。
    • release()    释放锁,使用前线程必须已经获得锁定,否则抛出异常。
    NUM = 10
    def f1(args):
        global NUM
        NUM -= 1
        print(NUM)
    
    for i in range(10):
        t = threading.Thread(target=f1,args=(i,))
        t.start()
    
    #显示结果有可能会是 10个 0   其实我们想要的结果是 9 8 7......0   这就是产生了垃圾数据

    那么我们来看看 上了锁之后会是神马效果:

    NUM=10
    def f1(l):
        global NUM
        l.acquire()    #上锁
        NUM-=1
        time.sleep(1)
        print(NUM)
        l.release()    #解锁
    
    lock=threading.Lock()
    for i in range(10):
        t=threading.Thread(target=f1,args=(lock,))
        t.start()
    
    #这样就能我们想要的结果了

    RLOCK

    RLock() 是一个可以被同一个线程请求多次的同步指令。Rlock使用了“拥有的线程" 和 "递归等级"的概念,处于锁定状态时,RLock被某个线程拥有。拥有RLock的线程可以再次调用acquire() , 释放锁时需要调用release()相同次数。说白了就是LOCK只能锁住一次,RLOCK可多次锁定递归

    可以认为RLock包含一个锁定池和一个初始值为0的计数器,每次成功调用acquire()和release(),计数器将+1/-1,为0时处于未锁定状态。

    NUM=10
    
    def func(l):
    
        global NUM
        l.acquire()  #上锁
        NUM-=1
        l.acquire()  #上锁
        time.sleep(1)
        l.release()  #解锁
        print(NUM)
    
        l.release()  #解锁
    # lock=threading.Lock() #只能锁一次
    
    lock=threading.RLock() #锁多次
    for i in range(10):
        t=threading.Thread(target=func,args=(lock,))
        t.start()

    Semaphore(信号量)

    Lock和Rlock(互斥锁),发现同时只能允许一个线程来更改数据,执行顺序不还是串行的么,所谓的多线程就没有神马卵用了。

    而Semaphore是同时允许一定数量的线程来更改数据。可以理解成互斥锁的加强版,一个锁可以控制多个thread的访问,Lock的话,只能让一个线程来访问,Semaphore可以控制数量。

    #指定放行几个
    NUM=10
    
    def func(l,i):
    
        global NUM
        l.acquire()  #上锁
        NUM-=1
        time.sleep(2)
        print(NUM,i)
        l.release()  #解锁
    
    
    lock=threading.BoundedSemaphore(5)  #锁5个线程
    for i in range(15):
        t=threading.Thread(target=func,args=(lock,i))
        t.start()
    
    #显示的结果
    5 0
    5 3
    5 2
    5 1
    1 4
    0 5
    0 8
    0 6
    0 7
    -4 9
    -5 13
    -5 10
    -5 11
    -5 12
    -5 14

     

    Event(事件)

    event线程间通信的方式,一个线程可以发送信号,其他的线程接受到信号后执行操作。相当于设置一个红绿灯

    主要方法:

    • set   将Flag设置为True
    • wait  阻塞当前线程,直到event的内部标志位被设置为True或者Timeout超时。如果内部标志位为True则wait()函数理解返回。
    • clear  将Flag重新设置为False
    • is_set  返回标志位当前状态

    事件处理机制: 全局定义了一个”Flag“, 如果值为False,那么当程序执行event.wait方法时就会阻塞,如果”Flag“值为True,那么event.wait方法时便不再阻塞,类似于信号灯,False相当于红灯,True相当于绿灯,绿灯时等红灯一次性通过。

    def func(i,e):
        print(i)
        e.wait()   #检测是什么灯,如果是红灯 默认红灯停; 绿灯,执行下面的程序
        print(i+100)
    even=threading.Event() 
    for i in range(10):
        t=threading.Thread(target=func,args=(i,even))
        t.start()
    even.clear()   #设置红灯(默认就是红灯)
    inp=input(">>>")
    if inp =="1":
        even.set()    #设置成绿锁

    Condition(条件)

      使得线程等待,只有满足某条件时,才释放n个线程,hreading.Condition在内部维护了一个锁对象(默认是RLock),可以在创建Condition对象的时候把锁对象作为参数传入,Condition也提供了acquire和release方法。

    import threading
    
    def func(i,con):
        print(i)
        con.acquire()     #程序运行到这里就会卡主
        con.wait()      
        print(i+100)
        con.release()
    
    c=threading.Condition()
    for i in range(10):
        t=threading.Thread(target=func,args=(i,c,))
        t.start()
    while True:
        inp=input(">>>")
        if inp =="q":break
        c.acquire()        
        c.notify(int(inp))     #输入数字多少,上面func函数就放行几个线程
        c.release()

    第二种方法,根据你输入的代码做判断:

    def condition():
        ret=False
        r =input(">>>>:")
        if r == "1":     
            ret=True
        else:
            ret=False
        return ret
    
    def func(c,i):
        print(i)
        c.acquire()
        c.wait_for(condition)     #如果condition返回结果为真,就执行以下程序 
        print(i+200)
        c.release()
    
    c=threading.Condition()
    for i in range(10):
        t=threading.Thread(target=func,args=(c,i,))
        t.start()
    
    
    #显示结果
    ....
    6
    7
    8
    9
    >>>>:1
    200
    >>>>:1
    201
    >>>>:1
    202

    Timer(定时器)

    from threading import Timer
    def f1():
        print("Hello Tom")
    t=Timer(3,f1)   #过三秒之后执行f1方法 
    t.start()

    二、进程

    在python中multiprocess模块提供了Process类,实现进程相关的功能。但是,由于它是基于fork机制的,因此不被windows平台支持。想要在windows中运行,必须使用if __name__ == '__main__:的方式,显然这只能用于调试和学习,不能用于实际环境。

    def f1(l):
        print(l)
    if __name__=="__main__":
        for i in range(10):
            p=Process(target=f1,args=(i,))
            p.start()
    
    #显示结果
    5
    0
    4
    7
    2
    3
    1
    6
    9
    8

    进程池:

    如果要启动大量的子进程,可以使用进程池的方式批量创建子进程。进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用的进程位置。

    进程池中的两个方法:

    • apply             以串行的方式执行(阻塞的)
    • apply_async   以异步的方式执行(非阻塞的)
    • close()           关闭pool,使其不再接受新的任务
    • terminate()    结束工作进程,不再处理未完成的任务
    • join()            主进程阻塞,等待子进程退出,join方法要在close或者terminate之后使用。
    from multiprocessing  import Pool
    from multiprocessing import Process
    import time
    def f1(arg):
        time.sleep(1)
        print(arg)
    
    if __name__=="__main__":
    
        poll=Pool(5)   #一次放5个进程进行工作
        for i in range(30):
            # poll.apply(func=f1,args=(i,))  #串行执行一个一个
            poll.apply_async(func=f1,args=(i,))   #当一个进程执行完毕后会添加新的进程进去
    
    
        poll.close() #使用join之前,先close,否则会报错,执行完close之后,不会有新的进程加入到pool
        # poll.terminate() #上面任务立即结束
        poll.join() #等待子进程执行完毕退出
        print("Hello")

    进程共享:

    默认情况下进程之间数据是不允许共享的,假想一下:如果进程之间默认是共享的,你在电脑上安装某宝的支付插件,又安装了某度的管家。那岂不是你所有的支付密码,里面的金额某度都知道了。。。

    所以进程之间默认数据是不共享的,但是如果个人开发需要进程之间要数据共享也可以做到,如下:

    #queues 进程共享队列方式
    from multiprocessing import Process
    from multiprocessing import queues
    import multiprocessing
    def foo(i,arg):
        arg.put(i)
        print("say hi",i,arg.qsize())   #打印队列里面的内容,你也可以使用arg.get获取
    
    if __name__=="__main__":   #在Windows中多进程必须加上这一句
        li=queues.Queue(20,ctx=multiprocessing)   #数据共享
        for i in range(10):
            p=Process(target=foo,args=(i,li,))
            p.start()
    
    #显示结果
    say hi 2 3
    say hi 3 5
    say hi 0 5
    say hi 5 6
    say hi 1 6
    say hi 4 6
    say hi 6 9
    say hi 9 9
    say hi 7 9
    say hi 8 10

    上面呢使用队列的方式共享,下面还有两种方式:

    Array共享方式:

    不常用, 也可以实现进程间的数据共享,和Python的列表(链表) 特别相似。Java、C#中的数组。

    数组只要定义好了,就必须:

    • 类型必须是一致的;
    • 个数也必须是一定的;
    from multiprocessing import Process
    from multiprocessing import Array
    def foo(i,arg):
        arg[i]=i+100
        for itme in arg:
            print(itme)
        print("----------")
    
    if __name__=="__main__":   #在Windows中多进程必须加上这一句
        li=Array("i",10)   #数据共享   ,i:必须制定类型,i是int类型,10分配的内存块
        for i in range(10):
            p=Process(target=foo,args=(i,li,))
            p.start()
    
    
    #显示结果
    0
    0
    0
    0
    104
    0
    0
    0
    0
    0
    ----------
    0
    0
    102
    0
    104
    0
    0
    0
    0
    0
    ----------
    0
    101
    102
    0
    104
    0
    0
    0
    0
    0
    ----------
    100
    101
    102
    0
    104
    0
    0
    0
    0
    0
    ----------
    100
    101
    102
    0
    104
    0
    0
    0
    0
    109
    ----------
    100
    101
    102
    0
    104
    0
    106
    0
    0
    109
    ----------
    100
    101
    102
    0
    104
    0
    106
    0
    108
    109
    ----------
    100
    101
    102
    0
    104
    105
    106
    0
    108
    109
    ----------
    100
    101
    102
    0
    104
    105
    106
    107
    108
    109
    ----------
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    ----------
    1     'c': ctypes.c_char,  'u': ctypes.c_wchar,
    2     'b': ctypes.c_byte,  'B': ctypes.c_ubyte,
    3     'h': ctypes.c_short, 'H': ctypes.c_ushort,
    4     'i': ctypes.c_int,   'I': ctypes.c_uint,
    5     'l': ctypes.c_long,  'L': ctypes.c_ulong,
    6     'f': ctypes.c_float, 'd': ctypes.c_double
    Array 类型对应表

    Manager共享方式:

    from multiprocessing import Process
    from  multiprocessing import Manager
    def foo(i,arg):
        arg[i]=i+100
        print(arg.values())
    
    if __name__=="__main__":   #在Windows中多进程必须加上这一句
    
        obj=Manager() #数据共享
        li=obj.dict()   #必须要加这一块
        for i in range(10):
            p=Process(target=foo,args=(i,li,))
            p.start()
            p.join()
    
    #显示结果
    [100]
    [100, 101]
    [100, 101, 102]
    [100, 101, 102, 103]
    [100, 101, 102, 103, 104]
    [100, 101, 102, 103, 104, 105]
    [100, 101, 102, 103, 104, 105, 106]
    [100, 101, 102, 103, 104, 105, 106, 107]
    [100, 101, 102, 103, 104, 105, 106, 107, 108]
    [100, 101, 102, 103, 104, 105, 106, 107, 108, 109]

    三、协程

    线程和进程的操作都是由程序触发操作系统接口,最后执行的是系统,协程的操作则是程序员本身。

    协程(Coroutine),又称为微线程,纤程. 

    协程存在的意义:

    • 对于多线程应用,CPU通过切片的方式来切换进程间的执行,线程切换时需要耗时(保存状态、恢复状态下次继续,即,上下文切换)。因此没有线程切换的开销,线程数量越多,协程的性能优势就越是明显。
    • 协程,则只使用一个线程,在一个线程中规定某个代码块的执行顺序。
    • 不需要多线程的锁机制,因为只要只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。

    协程适用的场景:

      当程序中存在大量不需要CPU的操作时(如I/O操作), 适用于协程。计算型就不能协程来工作了!

    子程序调用总是一个入口,一次返回,调用顺序是明确的。

    greenlet方式

    import greenlet
    def test1():
        print(12)
        gr2.switch()   #调到test2中执行  print(56)
        print(34)
        gr2.switch()   #调到test2中执行 print(78)
    
    def test2():
        print(56)
        gr1.switch()   #跳到test1中执行print(34)
        print(78)
    
    gr1=greenlet(test1)
    gr2=greenlet(test2)
    gr1.switch()
    
    #显示结果
    12
    56
    34
    78

    gevent方式(常用)

    gevent是一个基于libev的并发库。它为各种并发和网络相关的任务提供了整洁的API.

    当一个greenlet遇到I/O操作时,比如访问网络,就自动切换到其他的greenlet,等到I/O操作完成,再在适当的时候切换回来继续执行。由于I/O操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换线程,就可以保证greenlet在运行,而不是等待I/O。

    from gevent import monkey; monkey.patch_all()    #把socket请求做了一个封装,完成后才具有这种功能
    import gevent
    import requests
    
    
    def f(url):
        print('GET: %s' %url)
        resp = requests.get(url)
        data = resp.text
    
        print('%d bytes received from %s. '%(len(data),url))
    
    gevent.joinall([
        gevent.spawn(f,'https://www.python.org/'),
        gevent.spawn(f,'https://www.yahoo.com/'),
        gevent.spawn(f,'https://github.com/'),
        gevent.spawn(f,'https://www.dbq168.com/'),
    ])
    
    
    #代码执行结果:
    GET: https://www.python.org/
    GET: https://www.yahoo.com/
    GET: https://github.com/
    GET: https://www.dbq168.com/
    464535 bytes received from https://www.yahoo.com/. 
    47394 bytes received from https://www.python.org/. 
    76292 bytes received from https://www.dbq168.com/. 
    25533 bytes received from https://github.com/. 
  • 相关阅读:
    springcloud-Ribbon之手写轮询算法
    springcloud-Ribbon负载均衡规则的替换
    git本地库和远程库的连接和断开
    本地项目第一次提交到码云或github
    python基础语法练习
    Xss-labs-level11-15
    Vulnhub-靶机-ESCALATE_LINUX: 1
    Xss-labs-level7-10
    Vulnhub-靶机-DC: 6
    Jenkins入门之执行Powershell脚本
  • 原文地址:https://www.cnblogs.com/tianjie0522/p/5690081.html
Copyright © 2011-2022 走看看