zoukankan      html  css  js  c++  java
  • day11 队列、线程、进程、协程及Python使用缓存(redis/memcache)

    上篇博客简单介绍了多进程和多线程分别是什么,及分别使用于那种场景。

    这里再稍微聊聊线程和进程相关的东西以及协程

    一、队列

    import queue
    import threading
    
    # queue.Queue,先进先出队列
    # queue.LifoQueue,后进先出队列
    # queue.PriorityQueue,优先级队列
    # queue.deque,双向对队
    
    # queue.Queue(2) 先进先出队列
    # put放数据,是否阻塞,阻塞时的超时时间
    # get取数据(默认阻塞),是否阻塞,阻塞时的超时时间
    # qsize()真实个数
    # maxsize 最大支持的个数
    # join,task_done,阻塞进程,当队列中任务执行完毕之后,不再阻塞
    
    
    
    q = queue.Queue(2)   #先进先出队列(括号中的参数为maxsize,代表队列中最多能有几个值,默认为0,代表可以有无限个值)
    print(q.qsize())      #查看目前队列中有几个值
    q.put(11)      #put方法为给队列中放一个值
    q.put(22)
    q.put(block=False,timeout=2)     #block代表是否阻塞,默认为阻塞,为非阻塞时,当进入队列需要等待时直接报错。timeout代表最多等待时间,超过等待时间直接报错
    print(q.get())   #get方法为取一个值
    q.task_done()     #task_done方法为告诉队列我取完值了
    print(q.get())
    q.task_done()
    
    
    q.join()     #阻塞进程,当队列中任务执行完时,不再阻塞
    print(q.empty())   #若当前队列中有任务,返回False,否则返回True
    
    
    
    
    
    
    #生产者消费者模型
    #解决并发问题
    
    q = queue.Queue(20)
    
    def productor(arg):
        q.put('包子')
    
    def customer(arg):
        q.get()
    
    for i in range(10):
        t1 = threading.Thread(target=productor,args=(i,))
        t1.start()

        二、及其及其简易版线程池

    #!/usr/bin/env python
    # _*_ coding:utf-8_*_
    
    import threading
    import queue
    import time
    
    class threadpool:
    
        def __init__(self,maxsize=5):
            self.maxsize = maxsize    #默认最大线程为5
            self._q = queue.Queue(maxsize)     #创建队列,并设置队列中最多可以有5个值,既最大线程数为5
            for i in range(maxsize):
                self._q.put(threading.Thread)   #创建线程对象,将其放入到队列中
                #self._q为[threading.Thread,threading.Thread,threading.Thread,threading.Thread,threading.Thread]
        def get_thread(self):
            """
            获取线程函数
            :return:    队列中实际存放的为一个一个的线程对象,故return的一个线程对象,即threading.Thread
            """
            return self._q.get()
    
    
        def add_thread(self):
            self._q.put(threading.Thread)    #线程池中要保持有5个线程,当使用掉一个线程时,再重新put到队列中一个线程对象
    
    
    pool = threadpool()
    
    def task(arg):
        print(arg)
        time.sleep(1)
        pool.add_thread()
    
    for i in range(100):
        t = pool.get_thread()     #这里t为threading.Thread
        obj = t(target=task, args=(i,))
        obj.start()

    三、线程锁

    import threading
    import time
    
    
    #放行单个线程
    NUM = 10
    def func(l):
        l.acquire()     #上锁
        global NUM
        NUM -= 1
        time.sleep(1)
        print(NUM)
        l.release()   #解锁
    
    lock = threading.RLock()    #创建线程锁    #threading.LOCK 和 threading.RLOCK的区别:RLOCK支持多层锁,LOCK不支持,所以通常情况下使用RLOCK即可
    
    
    for i in range(10):
        t1 = threading.Thread(target=func,args=(lock,))
        t1.start()
    
    
    
    
    #放行指定个数的线程------信号量
    NUM = 20
    def func(l):
        l.acquire()     #上锁   ---  一次放行五个线程
        global NUM
        NUM -= 1
        time.sleep(1)
        print(NUM)
        l.release()   #解锁
    
    lock = threading.BoundedSemaphore(5)   #5代表依次放行几个线程
    
    for i in range(20):
        t1 = threading.Thread(target=func,args=(lock,))
        t1.start()   #t1.start()仅代表线程创建完毕,等待CPU来调度执行,但是CPU调度是随机的,so.. 输出的结果没按照递减的顺序输出
    
    
    
    #放行全部线程
    NUM = 10
    def func(e):
        global NUM
        NUM -= 1
        e.wait()    #检测是什么灯,(默认即为红灯),红灯停,绿灯行
        print(NUM)
        print(NUM+100)
    
    
    event = threading.Event()    #事务,可比喻为红绿灯,红灯全部锁住,绿灯全部放行
    
    for i in range(10):
        t1 = threading.Thread(target=func,args=(event,))
        t1.start()
    
    event.clear()    #设置为红灯(默认即为红灯)
    
    inp = input(">>: ")
    if inp == '1':
        event.set()    #设置为绿灯
    
    
    
    
    #条件锁 condition
    #当某个条件成立,放行指定个数的锁
    def func(i,conn):
        print(i)
        conn.acquire()
        conn.wait()
        print(i+100)
        conn.release()
    
    c = threading.Condition()
    
    for i in range(10):
        t = threading.Thread(target=func,args=(i,c))
        t.start()
    
    while True:
        inp = input(">>: ")
        if inp == 'q':
            break
        c.acquire()
        c.notify(int(inp))       #这三行为固定用法,notify的作用为告诉func里的函数acquire放行几个锁
        c.release()
    
    
    
    #条件锁的另外一种方式
    def condition():
        ret = False
        r = input(">>: ")
        if r == 'true':
            ret = True
        return ret
    
    
    def func(i,conn):
        print(i)
        conn.acquire()
        conn.wait_for(condition)    #condithon会return两种结果,一种为True,一种为False,返回值为True时,放行一个锁,为False时,不放行
        print(i+100)
        conn.release()
    
    c = threading.Condition()
    
    for i in range(10):
        t = threading.Thread(target=func,args=(i,c,))
        t.start()
    
    
    
    #Timer
    from threading import Timer
    
    def hello():
        print('hello world')
    
    t = Timer(1,hello)
    t.start()    #一秒后,执行hello函数

    四、进程池

    from multiprocessing import Pool
    import time
    
    def foo(arg):
        time.sleep(0.1)
        print(arg)
    
    
    if __name__ == '__main__':
        pool = Pool(5)
    
        for i in range(20):
            pool.apply_async(func=foo,args=(i,))
    
        pool.close()    #所有任务执行完毕后关闭进程池
        time.sleep(0.2)
        pool.terminate()   #立即终止正在执行的任务,关闭进程池
        pool.join()
        print('end')

    五、实现进程间数据共享

    """
    默认情况下进程之间数据是不共享的
    这里分别使用queues,Array,Manager来实现进程间数据共享
    """
    
    
    
    from multiprocessing import Process
    from multiprocessing import queues
    import multiprocessing
    
    
    def foo(i,arg):
        arg.put(i)
        print('say hi',i,arg.qsize())
    
    
    if __name__ == "__main__":
        li = queues.Queue(20,ctx=multiprocessing)   #类似queue队列
        for i in range(10):
            p = Process(target=foo,args=(i,li))
            p.start()
            # p.join()
    
    
    from multiprocessing import Process
    from  multiprocessing import Array
    #Array 数组,类似列表,但必须在创建时指定数组中存放的数据类型及数据的个数
    
    def foo(i,arg):
        arg[i] = i + 100
        for item in arg:
            print(item)
        print('===============')
    
    if __name__ == '__main__':
        li = Array('i',10)
        for i in range(10):
            p = Process(target=foo,args=(i,li,))
            p.start()
            # p.join()
    
    
    
    
    from multiprocessing import Process
    from  multiprocessing import Manager
    
    def foo(i,arg):
        arg[i] = i + 100
        print(arg.values())
    
    if __name__ == '__main__':
        obj = Manager()
        li = obj.dict()    #Manager.dict相当于Python中的字典
        for i in range(10):
            p = Process(target=foo,args=(i,li,))
            p.start()
            p.join()    #li属于主进程,p是子进程,主进程与子进程之间交互时会创建一个连接,这里不使用join时会报错,因为主进程执行完后主进程与子进程之间的连接就会断开,so,这里要让主进程等待子进程执行完毕
  • 相关阅读:
    C++ com 组件的使用
    年计算机硬件基础知识大全
    prism 的学习网站
    WPF
    C#录音从声卡
    C#实现放大镜
    HTML常用提交按钮
    HTML常用标签
    k8s 运行单实例 mysql
    aws eks ebs StorageClass PersistentVolume PersistentVolumeClaim
  • 原文地址:https://www.cnblogs.com/xuanouba/p/5693011.html
Copyright © 2011-2022 走看看