zoukankan      html  css  js  c++  java
  • Python自动化运维之16、线程、进程、协程、queue队列

    一、线程

    1、什么是线程

      线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。

    一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务

    2、基本使用

    (1)创建线程的两种方式

    直接调用(常用)

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import threading
    import time
    
    def f1(arg):   # 定义每个线程要执行的函数
        time.sleep(0.1)
        print(arg,threading.current_thread())    # threading.current_thread()详细的线程信息
    
    for i in range(10):    # 创建10个线程并发执行函数
        t = threading.Thread(target=f1,args=('python',))   # args是函数的参数,元组最后一个必须要逗号,
        t.start()   # 启动线程
    
    print(t.getName())  # 可以获取主线程的名字
    

    继承调用

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import threading
    import time
    
    class MyThread(threading.Thread):   # 继承threading.Thread类
        def __init__(self,func,args):
            self.func = func
            self.args = args
            super(MyThread,self).__init__()  # 执行父类的构造方法
    
        def run(self):   # run()方法,是cpu调度线程会使用的方法,必须是run()方法
            self.func(self.args)
    
    def f2(arg):
        time.sleep(0.1)
        print(arg,threading.current_thread())
    
    for i in range(10):   # 创建10个线程
        obj = MyThread(f2,123)
        obj.start()
    

    (2)更多方法  

    自己还可以为线程自定义名字,通过 t = threading.Thread(target=f1, args=(i,), name='mythread{}'.format(i)) 中的name参数,除此之外,Thread还有一下一些方法

    t.join(n)       表示主线程等待子线程多少时间,n表示主线程等待子线程的超时时间,如果在n时间内子线程未完成,主线程不在等待,执行后面的代码
    t.run()         线程被cpu调度后自动执行线程对象的run方法(一般我们无需设置,除非自己定义类调用)
    t.start()       线程准备就绪,等待CPU调度
    t.getName()     获取线程的名称
    t.setName()     设置线程的名称 
    t.name          获取或设置线程的名称
    t.is_alive()    判断线程是否为激活状态
    t.isAlive()     判断线程是否为激活状态
    t.isDaemon()    判断是否为守护线程
    t.setDaemon     设置True或False(默认)
                       True表示主线程不等待子线程全部完成就执行后面的代码
                       False默认值,标识主线程等待子线程全部执行完后继续执行后面的代码
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import threading
    import time
    
    def f1(arg):
        time.sleep(5)
        print(arg)
    
    t = threading.Thread(target=f1,args=('python',))
    t.setDaemon(True) # 默认是False,设置为true表示主线程不等子线程
    t.start()  
    t.join(2)  # 表示主线程到此,等待子线程执行完毕,2表示主线程最多等待2秒
    
    print('end') # 默认主线程在等待子线程结束
    print('end')
    print('end')
    

    3、线程锁(Lock、RLock)  

      由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,当多个线程同时修改同一条数据时可能会出现脏数据,所以,出现了线程锁 - 同一时刻允许一个线程执行操作。这里使用Rlock,而不使用Lock,因为Lock如果多次获取锁的时候会出错,而RLock允许在同一线程中被多次acquire,但是需要用n次的release才能真正释放所占用的琐,一个线程获取了锁在释放之前,其他线程只有等待。

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import threading
    import time
    
    NUM = 10
    
    def func(l):
        global NUM
        # 上锁
        l.acquire()
        NUM -=1
        time.sleep(0.1)
        print(NUM,threading.current_thread())
        # 开锁
        l.release()
    
    # lock = threading.Lock()
    lock = threading.RLock()  # 递归锁
    
    for j in range(10):
        t = threading.Thread(target=func,args=(lock,))
        t.start()
    

    4、信号量(Semaphore)

      互斥锁同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import threading
    import time
    
    NUM = 30
    
    def func(i,l):
        global NUM
        # 上锁
        l.acquire()
        NUM -=1
        time.sleep(1)
        print(NUM,i,threading.current_thread())
        # 开锁
        l.release()
    
    lock = threading.BoundedSemaphore(5)  # 设置信号量5,表示同时5个线程同时执行
    
    for i in range(30):
        t = threading.Thread(target=func,args=(i,lock,))
        t.start()
    

    5、事件(event)

      python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。

    事件处理的机制:全局定义了一个"Flag",如果"Flag"值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果"Flag"值为True,那么event.wait 方法时便不再阻塞。

    • clear:将"Flag"设置为False
    • set:  将"Flag"设置为True
    • wait: 检测当前"Flag",如果"Flag"值为 False,那么当线程执行 event.wait 方法时就会阻塞,如果"Flag"值为True,那么event.wait 方法时便不再阻塞。

    下面是一个红绿灯的例子,主线程做交通指挥灯,生成几个线程做车辆,车辆行驶按红灯停,绿灯行的规则。

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import threading
    
    def func(i,e):
        print(i)
        e.wait()  # 检测是什么灯,如果是True红灯,停;绿灯False行,默认是红灯
        print(i+100)
    
    
    event = threading.Event()
    
    for i in range(10):
        t = threading.Thread(target=func,args=(i,event,))
        t.start()
    
    event.clear()  # 主动设置成红灯,默认是红灯,此句可以不写
    inp = input('>>>')
    if inp == '1':
        event.set()  # 设置成绿灯,就会执行func()函数中print(i+100)语句
    

    6、条件(Condition)

    使得线程等待,只有满足某条件时,才释放n个线程

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import threading
    
    def func(i,con):
        print(i)
        con.acquire()  # 固定写法acquire,wait
        con.wait()
        print(i+100)
        con.release()
    
    c = threading.Condition()  # 设置条件
    
    for i in range(10):
        t = threading.Thread(target=func,args=(i,c,))
        t.start()
    
    
    while True:
        inp = input('>>>')
        if inp == 'q':
            break
        c.acquire() # 这里是固定写法,acquire,notify,release
        c.notify(int(inp))
        c.release()
    

    第二种

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import threading
    
    def condition():
        ret = False
        r = input('>>>')
        if r == 'true':
            ret = True
        else:
            ret = False
        return ret
    
    def func(i,con):
        print(i)
        con.acquire()
        con.wait_for(condition)  # 和上一个例子的差别在这里
        print(i+100)
        con.release()
    
    c = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=func,args=(i,c,))
        t.start()
    

    6、Timer

    定时器,指定n秒后执行某操作

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    from threading import Timer
    
    def hello():
        print("hello python")
    
    t = Timer(1,hello)
    t.start() 

    7、线程池,点击这里  

    二、进程  

      线程的上一级就是进程,进程可包含很多线程,进程和线程的区别是进程间的数据不共享,多进程也可以用来处理多任务,不过多进程很消耗资源,计算型的任务最好交给多进程来处理,IO密集型最好交给多线程来处理,此外进程的数量应该和cpu的核心数保持一致。  

    1、线程与进程的区别

    1、线程共享创建它的进程的地址空间,进程有自己的地址空间。
    2、线程是直接可以访问线程之间的数据;进程需要复制父进程的数据才能访问。
    3、线程可以直接与其他线程的通信过程,进程必须使用进程间通信和同胞交流过程。
    4、新创建一个线程很容易;新创建一个进程需要复制父进程。
    5、主线程可以控制相当大的线程在同一进程中;进程只能控制子进程。
    6、主线程变更(取消、优先级变化等)可能会影响进程的其他线程的行为;父进程的变化不会影响子进程。

    2、基本使用

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    from multiprocessing import Process
    
    def foo(i):
        print('say hi',i)
    
    for i in range(10):
        p = Process(target=foo,args=(i,))
        #p.daemon = True  # 和线程t.setdaemon是一样的
        p.start()
        #p.join()
    

    注意:由于进程之间的数据需要各自持有一份,所以创建进程需要的非常大的开销。其他使用方法和线程threading.Thread是一样的  

    3、进程数据共享

    进程各自持有一份数据,默认无法共享数据;queues,Array,Manager.dict,pipe这些方法都能实现数据共享

    (1)特殊队列queues()

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    from multiprocessing import Process
    from multiprocessing import queues
    import multiprocessing
    
    def foo(i,arg):
        arg.put(i)
        print('say hi',i,arg.qsize())
    
    li = queues.Queue(20,ctx=multiprocessing)
    
    for i in range(10):
        p = Process(target=foo,args=(i,li,))
        p.start()
    

    (2)数组Array()

    数组和列表很像,但是数组中的元素在内存中的地址是一段连续的空间地址,而列表中的元素则不是一段连续的的地址,是通过链表的形式找到下一个元素

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    from multiprocessing import Process
    from multiprocessing import Array
    
    def foo(i,arg):
        arg[i] = i+100
        for item in arg:
            print(item)
    
    li = Array('i',10)  # 指定数组时需要指定类型
    for i in range(10):
        p = Process(target=foo,args=(i,li,))
        p.start()
    
        'c': ctypes.c_char,  'u': ctypes.c_wchar,
        'b': ctypes.c_byte,  'B': ctypes.c_ubyte,
        'h': ctypes.c_short, 'H': ctypes.c_ushort,
        'i': ctypes.c_int,   'I': ctypes.c_uint,
        'l': ctypes.c_long,  'L': ctypes.c_ulong,
        'f': ctypes.c_float, 'd': ctypes.c_double
    类型对应表

    (3)Manager.dict()

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    from multiprocessing import Process
    from multiprocessing import Manager
    
    def foo(i,arg):
        arg[i] = i +100
        print(arg.values())
    
    obj = Manager()
    li = obj.dict()
    for i in range(10):
        p = Process(target=foo,args=(i,li,))
        p.start()
        p.join()
    

    (4)pipe()

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    from multiprocessing import Process, Pipe
    
    def f(conn):
        conn.send([42, None, 'hello'])
        conn.close()
    
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # 父进程可以收到子进程的共享信息prints "[42, None, 'hello']" 
    p.join()

    4、进程池

    进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

    进程池中有两个方法:

    • apply
    • apply_async
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    from multiprocessing import Pool
    import time
    
    def f1(arg):
        time.sleep(1)
        print(arg)
    
    pool = Pool(5)
    
    for i in range(30):  # 定义30个任务
        #pool.apply(func=f1,args=(i,))  # 所有进程串行执行没有多大意义
        pool.apply_async(func=f1,args=(i,)) # 异步并行执行
    
    pool.close() #等待所有的任务执行完毕
    #time.sleep(1)
    #pool.terminate()  # 立即终止子进程的任务,主进程继续执行
    pool.join() # 执行pool.join时必须先执行pool.close或者pool.terminate
                # 进程池中进程执行完毕后在关闭,如果注释,那么程序直接关闭close,terminate也无效
    print('end')
    

    三、协程

      线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。

    协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。

    协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程;由greenlet,gevent实现,gevent是调用greenlet进行封装;需要安装pip install greenlet;pip install gevent;

    greenlet  

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
     
     
    from greenlet import greenlet
     
     
    def test1():
        print 12
        gr2.switch()
        print 34
        gr2.switch()
     
     
    def test2():
        print 56
        gr1.switch()
        print 78
     
    gr1 = greenlet(test1)
    gr2 = greenlet(test2)
    gr1.switch()
    

    gevent

    import gevent
     
    def foo():
        print('Running in foo')
        gevent.sleep(0)
        print('Explicit context switch to foo again')
     
    def bar():
        print('Explicit context to bar')
        gevent.sleep(0)
        print('Implicit context switch back to bar')
     
    gevent.joinall([
        gevent.spawn(foo),
        gevent.spawn(bar),
    ])
    

    遇到IO操作自动切换:此操作在python2.x中执行的,urllib2不支持python3.x

    from gevent import monkey; monkey.patch_all()
    import gevent
    import urllib2
    
    def f(url):
        print('GET: %s' % url)
        resp = urllib2.urlopen(url)
        data = resp.read()
        print('%d bytes received from %s.' % (len(data), url))
    
    gevent.joinall([
            gevent.spawn(f, 'https://www.python.org/'),
            gevent.spawn(f, 'https://www.yahoo.com/'),
            gevent.spawn(f, 'https://github.com/'),
    ])
    

    四、queue队列  

    queue有哪些队列? 

    • queue.Queue(maxsize) 先进先出队列
    • queue.LifoQueue(maxsize) 后进先出
    • queue.PriorityQueue(maxsize) 优先级队列
    • queue.deque(maxsize) 双向队列

    先进先出

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import queue
    
    q = queue.Queue(5)  # 默认maxsize=0无限接收,最大支持的个数
    print(q.empty())    # 查看队列是否为空
    q.put(11)           # put防数据,是否阻塞默认是阻塞block=True,timeout超时时间
    q.put(22)
    q.put(33,block=False,timeout=2)
    print(q.full())     # 查看队列是否已经放满
    print(q.qsize())    # 队列中多少元素
    print(q.maxsize)    # 队列最大支持的个数
    print(q.get(block=False,timeout=2))  # get取数据,是否阻塞默认是阻塞block=True,timeout超时时间
    print("*" * 10)
    
    print(q.get())
    q.task_done()       # join配合task_done,队列中有任务就会阻塞进程,当队列中的任务执行完毕之后,不在阻塞
    print(q.get())
    q.task_done()
    q.join()            # 队列中还有元素的话,程序就不会结束程序,只有元素被取完配合task_done执行,程序才会结束
    

    后进先出

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import queue
    
    q = queue.LifoQueue()
    q.put(123)
    q.put(456)
    print(q.get())
    

    优先级队列 

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import queue
    
    q = queue.PriorityQueue()
    q.put((1,'python1'))
    q.put((5,'python'))
    q.put((3,'python3'))
    print(q.get())
    

    双向队列  

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import queue
    
    q = queue.deque()
    q.append(123)
    q.append(333)
    q.appendleft(456)
    print(q.pop())
    print(q.popleft())

    更多请查看官方文档

    生产者消费者模型

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import queue
    import threading
    import time
    q = queue.Queue()
    
    def productor(arg):
        while True:
            q.put(str(arg))
            print('%s 号窗口有票' %str(arg))
            time.sleep(1)
    
    for i in range(3):
        t = threading.Thread(target=productor,args=(i,))
        t.start()
    
    
    def consumer(arg):
        while True:
            print('第 %s 人取 %s 号窗口票' %(str(arg),q.get()))
            time.sleep(1)
    
    for j in range(300):
        t = threading.Thread(target=consumer,args=(j,))
        t.start()
    

      

  • 相关阅读:
    更改默认alert框体
    自定义垂直拖动的seekbar进度条
    android适配pad和部分手机底部虚拟按键+沉浸式状态栏
    解决studio的URI is not registered (Setting|Language&Frameworks|Schemas and DTDs)
    王者荣耀是怎样炼成的(二)《王者荣耀》unity安装及使用的小白零基础入门
    王者荣耀是怎样炼成的(一)《王者荣耀》用什么开发,游戏入门,unity3D介绍
    使用python(command line)出现的ImportError: No module named 'xxx'问题
    Android Studio生成keystore签名文件步骤讲解
    greendao数据库初次使用的配置及多表关联的初始化
    android视频双向实时通讯的横竖屏切换
  • 原文地址:https://www.cnblogs.com/xiaozhiqi/p/5796967.html
Copyright © 2011-2022 走看看