zoukankan      html  css  js  c++  java
  • python学习之路-11 多线程、多进程、协程

    python内置队列模块 queue

    queue的四种队列

    q = queue.Queue()           # 先进先出队列
    q = queue.LifoQueue()       # 后进先出队列
    q = queue.PriorityQueue()   # 优先级队列
    q = queue.deque()           # 双向队列
    

    queue.Queue()先进先出队列

    • 基本使用方法
    import queue
    
    q = queue.Queue(maxsize=10)  # 创建一个先进先出的队列,maxsize为队列大小
    
    q.put(11)       # 向队列添加数据
    q.put(22, block=False)   # 默认block=True,表示为阻塞状态,如果队列中已经满了,就一直处于阻塞(等待)状态,直到队列中有位置为止
                                # block=Flase,表示为非阻塞状态,当队列满了的时候直接报错 queue.Full
    q.put(33, timeout=2)    # timeout=2,当队列满了的时候,等待2秒,如果2秒后还是满的则报错  queue.Full
    
    print(q.qsize())    # 返回队列中当前的个数
    print(q.empty())    # 查看队列是否为空,返回True表示为空,False表示不为空
    print(q.full())     # 查看队列是否满了,返回True表示满了,False表示没满
    q.get()     # 从队列取数据,如果队列没有数据,一直处于阻塞状态,直到取到数据为止
    q.get(timeout=2)   # 取数据,timeout参考 q.put(timeout=2)
    q.get_nowait()     # 从队列取数据,不阻塞,调用 q.get(block=False)
    
    • join和task_done的组合
    # 情景1   当队列中没有数据的时候该线程还是处于阻塞状态
    import queue
    
    q = queue.Queue(maxsize=10)  # 创建一个先进先出的队列,maxsize为队列大小
    
    q.put("sss")
    q.get()
    
    q.join()   # 任务完成之后才结束,否则一直处于阻塞状态
    
    
    # 情景2  当每次从队列取完数据,执行task_done方法后,才表示该任务结束了
    import queue
    q = queue.Queue(maxsize=10)  # 创建一个先进先出的队列,maxsize为队列大小
    
    q.put("sss")
    q.put("sss")
    q.put("sss")
    
    q.get()
    q.task_done()
    q.get()
    q.task_done()
    q.get()
    q.task_done()
    
    q.join()   # 任务完成之后才结束,否则一直处于阻塞状态()
    
    

    queue.LifoQueue() 后进先出队列

    • 基本使用方法,该队列接继承先进先出队列,其他方法参考先进先出队列
    import queue
    q = queue.LifoQueue()      # 创建一个后进先出的队列,该队列基础先进先出队列
    q.put(123)
    q.put(456)
    value = q.get()
    print(value)
    

    queue.PriorityQueue() 优先级队列

    • 基本使用方法,该队列接继承先进先出队列,其他方法参考先进先出队列
    import queue
    q = queue.PriorityQueue()
    q.put((1, "aaa"))
    q.put((2, "bbb"))
    q.put((3, "ccc"))
    q.put((-100, "0000"))
    value = q.get()
    print(value)
    

    queue.deque() 双向队列

    • 基本使用方法
    q = queue.deque()
    q.append("appendright")   # 在队列右边添加一个元素
    q.appendleft("appendleft")  # 在队列左边添加一个元素
    right_v = q.pop()  # 在队列右边取出一个元素
    left_v = q.popleft()  # 在队列左边取出一个元素
    
    print(right_v)
    print(left_v)
    
    • 双向队列剩下的一些方法
    clear()   				# 清空整个队列
    count(value)				# 查看value在队列中的次数
    extend(iterable)			# 在队列右侧添加一个可迭代的数据
    extendleft(iterable)	# 在队列左侧添加一个可迭代的数据
    remove(value)				# 删除队列中第一个value值
    reverse()					# 将队列进行反转
    rotate()
    

    多线程

    基本使用

    • 创建多线程的两种模式

      • 使用threading.Thread创建多线程
      import threading
      
      def f1(arg):
          print(arg)
      
      t = threading.Thread(target=f1, args=[123,])
      t.start()
      
      • 通过自定义类创建多线程
      import threading
      
      def f2(arg):
          print(arg)
      
      
      class MyThread(threading.Thread):   # 自定义类,并继承threading.Thread类
          def __init__(self, func, args):
              self.func = func
              self.args = args
              super(MyThread, self).__init__()
      
          def run(self):  # run方法用来执行线程里面的func函数
              self.func(self.args)
      
      obj = MyThread(func=f2, args=123)
      obj.start()
      

    线程锁 (Lock、RLock)

    • 在没有锁的情况下
    import threading
    import time
    	
    NUM = 10
    	
    def func():
        global NUM
        NUM -= 1
    	
        time.sleep(2)
        print(NUM)
    	
    for i in range(10):
        t = threading.Thread(target=func)
        t.start()
        
    # 输出
    0
    0
    0
    0
    0
    0
    0
    0
    0
    0
    
    • 单个锁 (Lock)
    import threading
    import time
    	
    NUM = 10
    	
    	
    def func():
        global NUM
        lock.acquire()  # 加锁
        NUM -= 1
    	
        time.sleep(2)
        print(NUM)
        lock.release()  # 解锁
    	
    lock = threading.Lock()     # 创建锁的对象
    for i in range(10):
        t = threading.Thread(target=func)
        t.start()
    	
    # 输出
    9
    8
    7
    6
    5
    4
    3
    2
    1
    0
    
    • 多重锁 (RLock)
    import threading
    import time
    	
    NUM = 10
    	
    	
    def func():
        global NUM
        lock.acquire()  # 加锁
        NUM -= 1
    	
        time.sleep(2)
    	
        lock.acquire()  # 加锁
        NUM += 3
        time.sleep(1)
        print(NUM)
        lock.release()  # 解锁
        lock.release()  # 解锁
    	
    	
    lock = threading.RLock()     # 创建锁的对象
    for i in range(10):
        t = threading.Thread(target=func)
        t.start()
    

    信号量(Semaphore)

    同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。

    import threading
    import time
    
    
    def func(i):
        sem.acquire()  # 加锁
        time.sleep(1)
        print(i)
        sem.release()  # 解锁
    
    
    maxThread = 5   # 定义最大线程数
    semaphore = threading.BoundedSemaphore(maxThread)     # 创建信号量对象,同时最多允许5个线程运行
    for i in range(10):
        t = threading.Thread(target=func, args=[i,])
        t.start()
    

    事件 (event)

    python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。
    事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
    • clear:将“Flag”设置为False
    • set:将“Flag”设置为True
    import threading
    
    
    def func(i):
        print(i)
        event.wait()    # 阻塞状态,等待event的flag变为True
        print(i+100)
    
    
    event = threading.Event()	# 创建event对象
    for i in range(10):
        t = threading.Thread(target=func, args=[i,])
        t.start()
    
    event.clear()   # 将event的flag变为flase
    
    inp = input(">>")
    if inp == "1":
        event.set()     # 将event的flag变为True
    

    条件 (Condition)

    使得线程等待,只有满足某条件时,才释放n个线程
    import threading
    
    
    def func(i):
        print(i)
        con.acquire()   # 加锁
        con.wait()      # 阻塞,等待notify方法执行唤醒该线程
        print(i+100)
        con.release()   # 释放锁
    
    
    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=func, args=[i,])
        t.start()
    
    while True:
        inp = input(">>")
        if inp == "q":
            break
        con.acquire()       # 加锁
        con.notify(int(inp))    # 激活int(inp)个线程执行
        con.release()       # 释放锁
    

    计时器

    定时器,指定n秒后执行某操作,主线程不阻塞
    import threading
    
    def hello():
        print("hello, world")
    
    
    t = threading.Timer(2, hello)
    t.start()
    print("xxx")
    

    自定义线程池

    import queue
    import threading
    import time
    
    
    class ThreadPool:  # 创建线程池类
    
        def __init__(self, maxsize=5):
            self.maxsize = maxsize
            self._q = queue.Queue(maxsize)
    
            # 默认添加5个类名到队列中
            for i in range(maxsize):
                self._q.put(threading.Thread)
    
        def get_thread(self):
            """
            从队列中取出一个线程名
            :return:
            """
            return self._q.get()
    
        def add_thread(self):
            """
            向线程中添加一个线程名
            :return:
            """
            self._q.put(threading.Thread)
    
    
    def task(arg):
        print(arg)
        time.sleep(1)
        pool.add_thread()  # 当该任务完成时,调用add_thread()方法向队列中添加一个线程名
    
    
    pool = ThreadPool(5)   # 实例化线程池对象
    for i in range(100):
        t = pool.get_thread()   # 调用get_thread()方法从队列中获取多线程名
        obj = t(target=task, args=[i,])   # 执行多线程,生成线程对象
        obj.start()  # 将线程运行,等待cpu调度
    

    生产者消费者模型(队列)

    多进程

    基本使用

    多进程数据共享的三种方法

    • queues.Queue
    from multiprocessing import Process
    from multiprocessing import queues
    import multiprocessing
    
    
    def foo(i):
        q.put(i)
        print("say hi", i, q.qsize())       # mac下调qsize()会报错,可以看下qsize的源码
    
    # if __name__ == '__main__':  # 在windows下运行需要加这行,否则会报错
    q = queues.Queue(20, ctx=multiprocessing)
    for i in range(10):
        p = Process(target=foo, args=(i,))
        p.start()
    
    • Array
    from multiprocessing import Process
    from multiprocessing import Array
    
    
    def foo(i, arg):
        arg[i] = i + 100
        for item in arg:
            print(item)
        print("==================")
    
    # if __name__ == '__main__':  # windows下需要加上这行,否则会报错
    li = Array('i', 10)
    for i in range(10):
        p = Process(target=foo, args=(i, li,))
        p.start()
    
    • Manager
    from multiprocessing import Process
    from multiprocessing import Manager
    import time
    
    
    def foo(i, arg):
        arg[i] = i + 100
        print(arg.values())
    
    # if __name__ == '__main__':  # windows下需要添加这行,否则会报错
    
    obj = Manager()
    li = obj.dict()
    for i in range(10):
        p = Process(target=foo, args=(i, li,))
        p.start()
        # p.join()
    
    time.sleep(0.1)  # 需要加个等待时间,否则主进程执行完毕之后就会将li关闭掉, 或者在上面加个join
    

    进程锁

    参考线程锁

    进程池

    
    from multiprocessing import Pool
    
    import time
    
    
    def f1(arg):
        time.sleep(1)
        print(arg)
    
    
    # if __name__ == '__main__':   # 在windows下执行需要加上这行,否则会报错
    
    pool = Pool(5)
    for i in range(30):
        # pool.apply(func=f1, args=(i,))  # 单进程模式
        pool.apply_async(func=f1, args=(i,))
    
    # pool.close()    # 所有任务执行完毕,在进行关闭进程池
    pool.terminate()    # 立即关闭进程池
    pool.join()
    print("end")
    

    协程

    原理:利用一个线程,分解一个线程成为多个微线程,程序级别,跟系统没关系
    • greenlet # 在gevent的基础之上进行的封装
    • gevent
    from gevent import monkey
    monkey.patch_all()
    
    import gevent
    import requests
    
    
    def f(url):
        print("GET: %s" % url)
        resp = requests.get(url)
        data = resp.text
        print("%d bytes received from %s" % (len(data), url))
    
    gevent.joinall([
        gevent.spawn(f, "http://www.python.org"),
        gevent.spawn(f, "http://www.yahoo.com"),
        gevent.spawn(f, "http://www.github.com"),
    ])
    
  • 相关阅读:
    2级联动下拉列表写法
    select选中获取索引三种写法
    判断设备-安卓|苹果|微信
    限制输入字符个数的jq插件
    面试题:1.清空字符串前后的空格;2.找出出现最多的字符
    css3玩转各种效果【资源】
    利用jquery.touchSwipe.js实现的移动滑屏效果。
    【leetcode刷题笔记】Letter Combinations of a Phone Number
    【leetcode刷题笔记】Linked List Cycle
    【leetcode刷题笔记】Length of Last Word
  • 原文地址:https://www.cnblogs.com/CongZhang/p/5697936.html
Copyright © 2011-2022 走看看