zoukankan      html  css  js  c++  java
  • 045.Python线程队列


    线程队列

    1 基本语法和用法

    1. put 往线程队列里防止,超过队列长度,直接阻塞
    2. get 从队列中取值,如果获取不到,直接阻塞
    3. put_nowait: 如果放入的值超过队列长度,直接报错(linux)
    4. get_nowait: 如果获取的值已经没有了,直接报错

    (1) queue 先进先出

    复制代码
    from queue import Queue
    q = Queue()
    q.put(11)
    q.put(22)
    print(q.get())
    print(q.get_nowait())
    复制代码

    执行

    [root@node10 python]# python3 test.py
    11
    22

    直接报错

    复制代码
    from queue import Queue
    q = Queue()
    q.put(11)
    q.put(22)
    print(q.get())
    print(q.get_nowait())
    print(q.get_nowait())
    复制代码

    执行

    指定队列长度

    from queue import Queue
    q2 = Queue(2)
    q2.put(33)
    q2.put(44)
    q2.put(55)

    直接阻塞

    使用put_nowait报错

     LifoQueue 后进先出

    数据结构中,栈队列的一种储存顺序

    复制代码
    from queue import LifoQueue
    lq = LifoQueue()
    lq.put(55)
    lq.put(66)
    print(lq.get())
    print(lq.get())
    复制代码

    执行

    [root@node10 python]# python3 test.py
    66
    55

    PriorityQueue 按照优先级顺序排列

    1. 默认按照数字大小排序,然后会按照ascii编码在从小到大排序
    2. 先写先排,后写后排
    复制代码
    from queue import PriorityQueue
    pq = PriorityQueue()
    pq.put( (12,"John") )
    pq.put( (6,"Jim") )
    pq.put( (19,"Tom") )
    pq.put( (8,"Lucy") )
    
    print(pq.get())
    print(pq.get())
    print(pq.get())
    print(pq.get())
    复制代码

    执行

    [root@node10 python]# python3 test.py
    (6, 'Jim')
    (8, 'Lucy')
    (12, 'John')
    (19, 'Tom')

    当数字一样  按照ascsi值

    复制代码
    from queue import PriorityQueue
    pq = PriorityQueue()
    pq.put( (12,"John") )
    pq.put( (6,"Jim") )
    pq.put( (19,"Tom") )
    pq.put( (19,"Lucy") )
    
    
    print(pq.get())
    print(pq.get())
    print(pq.get())
    print(pq.get())
    复制代码

    执行

    [root@node10 python]# python3 test.py
    (6, 'Jim')
    (12, 'John')
    (19, 'Lucy')
    (19, 'Tom')

    单独一个元素,必须放同一种类型

    复制代码
    from queue import PriorityQueue
    pq = PriorityQueue()
    pg = PriorityQueue()
    pg.put(13)
    pg.put(18)
    pg.put(3)
    print(pg.get())
    print(pg.get())
    print(pg.get())
    复制代码

    执行

    [root@node10 python]# python3 test.py
    3
    13
    18

    如果不同类型

    复制代码
    from queue import PriorityQueue
    pq = PriorityQueue()
    pg = PriorityQueue()
    pg.put(13)
    pg.put(18)
    pg.put(3)
    pg.put("sdfsdf")
    print(pg.get())
    print(pg.get())
    print(pg.get())
    复制代码

    执行

    字符串类型

    复制代码
    from queue import PriorityQueue
    pg1 = PriorityQueue()
    pg1.put("ab")
    pg1.put("cc")
    print(pg1.get())
    print(pg1.get())
    复制代码

    执行

    [root@node10 python]# python3 test.py
    ab
    cc

    2 新版进程池,线程池

    进程池 允许cpu并行

    执行一个进程,如果使用了进程池,是要控制进程并行数量

    复制代码
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    import os,time
    def func(i):
            print ("process:",i,os.getpid())
            time.sleep(3)
            print ("process:end")
            return 6666
    # 创建进程池对象,8是代表最大8个进程,ProcessPoolExecutor 后面的参数默认是cpu的最大逻辑处理器核心数.
    p = ProcessPoolExecutor(8)
    #异步触发进程,res 接收的是对象,这个对象可以通过result()来获取返回值
    res = p.submit(func,1)
    #获取进程任务的返回值
    res2 = res.result()
    #shutdown,等待所有子进程执行完毕之后,在向下执行,类似于join
    p.shutdown()
    
    print("主进程执行完毕")
    复制代码

    执行

    [root@node10 python]# python3 test.py
    process: 1 42441
    process:end
    主进程执行完毕

    执行多个进程,如果使用了进程池,是要控制进程并行数量

    复制代码
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    import os,time
    def func(i):
            print ("process:",i,os.getpid())
            time.sleep(3)
            print ("process:end")
            return 6666
    # 创建进程池对象
    p = ProcessPoolExecutor(8)
    #异步触发进程,res 接收的是对象,这个对象可以通过result()来获取返回值
    for i in range(12):
            res = p.submit(func,i)
    #获取进程任务的返回值
    res2 = res.result()
    #shutdown,等待所有子进程执行完毕之后,在向下执行,类似于join
    p.shutdown()
    
    print("主进程执行完毕")
    复制代码

    执行

    复制代码
    [root@node10 python]# python3 test.py
    process: 0 42457
    process: 1 42458
    process: 2 42459
    process: 3 42460
    process: 4 42461
    process: 5 42462
    process: 6 42463
    process: 7 42464
    process:end
    process:end
    process:end
    process: 8 42463
    process: 9 42457
    process: 10 42459
    process:end
    process: 11 42462
    process:end
    process:end
    process:end
    process:end
    process:end
    process:end
    process:end
    process:end
    主进程执行完毕
    复制代码

    3 线程池

    复制代码
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    from threading import current_thread as cthread
    import os,time
    def func(i):
            print("thread",i,cthread().ident)
            time.sleep(3)
            print("thread %s end %s"%(i))
    
    #创建线程池。括号里面可以指定并发的线程数
    tp = ThreadPoolExecutor(4)
    for i in range(20):
            tp.submit(func,i)
    tp.shutdown()
    print("主线程执行结束。。。")
    复制代码

    执行

    复制代码
    [root@node10 python]# python3 test.py
    thread 0 140712745903872
    thread 1 140712737511168
    thread 2 140712658073344
    thread 3 140712649680640
    thread 4 140712745903872
    thread 5 140712658073344
    thread 6 140712737511168
    thread 7 140712649680640
    thread 8 140712737511168
    thread 9 140712658073344
    thread 10 140712745903872
    thread 11 140712649680640
    thread 12 140712737511168
    thread 13 140712745903872
    thread 14 140712658073344
    thread 15 140712649680640
    thread 16 140712745903872
    thread 17 140712737511168
    thread 18 140712649680640
    thread 19 140712658073344
    主线程执行结束。。。
    复制代码

    4 GIL锁

    一个进程中的多条线程同一时间只能被一个cpu执行,不能实现并行操作.
    想要解决:更换Jpython 或者 PyPy解释器
    为什么加锁:
    python是解释性语言,编译一行,就执行一行,不能提前规划系统资源,进行全局分配,根本原因是历史遗留问题.
    程序分为两大类:

    • 计算密集型程序,通过c语言改写python部分模块来实现
    • io密集型程序,类似于python_web 运维,数据分析 都可以使用

    线程池的返回值

    复制代码
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    from threading import current_thread as cthread
    import os,time
    def func(i):
            #获取当前线程号
            print("thread",i,cthread().ident)
            time.sleep(1)
            #返回线程号,获取返回值,会加阻塞,无需shutdown
            return cthread().ident
    
    #创建线程池。括号里面可以指定并发的线程数
    tp = ThreadPoolExecutor(6)
    lst = []
    setvar = set()
    for i in range(12):
            #异步出发
            res = tp.submit(func,i)
            lst.append(res)
    for i in lst:
            #获取该进程对象的返回值
            print (i.result())
        #塞到集合里面,可以去重,验证 setvar.add(i.result())
    #打印所有的线程号 print (setvar) print("主线程执行结束。。。")
    复制代码

    执行

    复制代码
    [root@node10 python]# python3 test.py
    thread 0 140423614576384
    thread 1 140423606183680
    thread 2 140423597790976
    thread 3 140423589398272
    thread 4 140423581005568
    thread 5 140423572612864
    thread <Future at 0x7fb6f7ad4b70 state=running> 140423597790976
    thread <Future at 0x7fb6f7ad4b70 state=running> 140423572612864
    thread <Future at 0x7fb6f7ad4b70 state=running> 140423589398272
    thread <Future at 0x7fb6f7ad4b70 state=running> 140423606183680
    thread <Future at 0x7fb6f7ad4b70 state=running> 140423581005568
    thread <Future at 0x7fb6f7ad4b70 state=finished returned int> 140423614576384
    140423614576384
    140423606183680
    140423597790976
    140423589398272
    140423581005568
    140423572612864
    140423597790976
    140423572612864
    140423589398272
    140423606183680
    140423581005568
    140423614576384
    {140423614576384, 140423606183680, 140423581005568, 140423597790976, 140423589398272, 140423572612864}
    主线程执行结束。。。
    复制代码

    5 map返回迭代器

    复制代码
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    import os,time
    from threading import current_thread as cthread
    def func(i):
            time.sleep(0.2)
            print("thread",i,cthread().ident)
            print("thread .. end %s" % (i))
            return "*" * i
    
    tp = ThreadPoolExecutor(5)
    it = tp.map(func,range(20))
    tp.shutdown()
    print("<===>")
    from collections import Iterator
    res = isinstance(it,Iterator)
    print(res)
    print(list(it))
    
    # "1234567"
    # it = map(int,"1234567")
    # print(list(it))
    复制代码

    执行

    复制代码
    [root@node10 python]# python3 test.py
    thread 0 140381751781120
    thread .. end 0
    thread 2 140381734995712
    thread .. end 2
    thread 1 140381743388416
    thread .. end 1
    thread 3 140381726603008
    thread .. end 3
    thread 4 140381718210304
    thread .. end 4
    thread 5 140381751781120
    thread 6 140381734995712
    thread .. end 5
    thread .. end 6
    thread 9 140381718210304
    thread 8 140381726603008
    thread .. end 8
    thread 7 140381743388416
    thread .. end 7
    thread .. end 9
    thread 14 140381718210304
    thread .. end 14
    thread 10 140381751781120
    thread .. end 10
    thread 11 140381734995712
    thread .. end 11
    thread 13 140381743388416
    thread .. end 13
    thread 12 140381726603008
    thread .. end 12
    thread 15 140381718210304
    thread .. end 15
    thread 19 140381726603008
    thread .. end 19
    thread 16 140381751781120
    thread .. end 16
    thread 18 140381743388416
    thread .. end 18
    thread 17 140381734995712
    thread .. end 17
    <===>
    True
    ['', '*', '**', '***', '****', '*****', '******', '*******', '********', '*********', '**********', '***********', '************', '*************', '**************', '***************', '****************', '*****************', '******************', '*******************']
    复制代码

    学习记录,小白一枚
  • 相关阅读:
    pyroscope 参考使用
    pyroscope 很不错的基于golang 的火焰图分析工具
    dremio 14 版本发布&&新的官方文档页面
    sijms/go-ora 1.0 发布了,使用buffer提升了系统的性能
    开发一个cockroachdb 的cube.js 驱动
    dremio 配置文件
    cratedb 将完全开源
    jfrog 关闭开放 bintray&&jcenter&&gocenter&&chartcenter 服务
    dremio tar 模式安装
    dremio 部署系统要求
  • 原文地址:https://www.cnblogs.com/wangsirde0428/p/14322807.html
Copyright © 2011-2022 走看看