zoukankan      html  css  js  c++  java
  • 同一个进程内的队列(多线程) 线程池

    一、同一个进程内的队列(多线程)

      import queue

      queue.Queue()   先进先出

      queue.LifoQueue()  后进先出

      queue.PriorityQueue()   优先级队列

        优先级队列   q = queue.PriorityQueue() 

          q.put((pri , data))   接收的是一个元祖

          元祖中第一个参数是:表示当前数据的优先级

          元祖中第二个参数是:需要存放到队列中的数据

        优先级的比较(首先保证整个队列中,所有表示优先级的东西类型必须一致)

          如果都是int,比数值的大小

          如果都是str,比较字符串的大小(从第一个字符的ASCII码开始比较),当ASCII码相同时会按照先进先出的原则

    from multiprocessing import Queue
    import queue
    
    q = queue.LifoQueue() # 后进先出的队列
    q.put(1)
    q.put(2)
    q.put(3)
    for i in range(q.qsize()):
        print(q.get())
    
    q = queue.Queue() # 先进先出队列
    q.put(1)
    q.put(2)
    q.put(3)
    for i in range(q.qsize()):
        print(q.get())
    
    q = queue.PriorityQueue() # 优先级队列
    q.put((1,3))
    q.put((2,2))
    q.put((3,1))
    for i in range(q.qsize()):
        print(q.get())

    二、线程池

      1、在一个池子里放固定数量的线程,这些线程处于等待状态,一旦有任务来,就有线程自发的去执行任务。

      concurrent.futures   这个模块是异步调用的机制(提交任务都是用submit)

      for + submit 多个任务的提交

      shutdown  是等效于Pool中的close  +  join ,是指不允许再继续向池中增加任务,然后让父进程(线程)等待池中所有进程(线程)执行完所有任务。

    from concurrent.futures import ThreadPoolExecutor
    import time
    
    
    def func(num):
        sum = 0
        for i in range(num):
            sum += i ** 2
        return sum
    
    
    t = ThreadPoolExecutor(20) # 实例化20个线程
    start = time.time()
    res = t.map(func,range(1000)) # 提交多个任务给池中,等效于for + submit
    t.shutdown()
    for i in range(1000):
        print(res.__next__()) # res是一个生成器
    print(time.time() - start)

      2、如何把多个任务扔进池中?

        要么使用for + submit  的方法去提交多个任务

        要么直接使用map(func,iterable) 方式去提交多个任务

      不同的方式提交多个任务(for + submit  或者map),拥有不同的得到结果的方式。

        如果是for + submit 的方式提交任务,想得到结果用result方法

        如果是map的方式提交任务,结果是一个生成器,采用__next__()  的方式得到结果。

      关于回调函数,不管是Pool进程池的方式,还是ProcessPoolExecutor的方式开启进程池,

              回调函数都是由父进程调用

    from concurrent.futures import ThreadPoolExecutor
    import time
    
    
    def func(num):
        sum = 0
        for i in range(num):
            sum += i ** 2
        return sum
    
    
    def call_back(res):
        num = res.result()
        print(num)
    
    
    t = ThreadPoolExecutor(20) # 实例化20个线程
    start = time.time()
    # t.map(func,range(1000)) # 提交多个任务给池中,等效于for + submit
    for i in range(1000):
        t.submit(func,i).add_done_callback(call_back)
    t.shutdown()
    print(time.time() - start)

      3、锁

        第一种情况,在同一个线程内,递归锁可以无止尽的acquire,但是互斥锁不行。

        第二种情况,在不同的线程内,递归锁是保证只能被一个线程拿到钥匙,然后无止尽的acquire,其他线程等待。

    from threading import RLock,Thread
    from threading import Semaphore
    import time
    
    
    def func(i,l):
        l.acquire()
        l.acquire()
        l.acquire()
        l.acquire()
        print(i)
        l.release()
        l.release()
        l.release()
        l.release()
    
    l = RLock()
    for i in range(10):
        Thread(target=func,args=(i,l)).start()
  • 相关阅读:
    bzoj 4008 亚瑟王 期望概率dp
    t[..., 1, tf.newaxis]
    Keras learning_phase()和learning_phase_scope()
    Keras Sequential模型和add()
    Keras克隆层
    Keras搭建一个Wide & Deep 神经网络
    1 链表的数据结构
    海康威视2017软件精英挑战赛初赛题目
    2016年倒计时两天
    可自定义片头的腾讯视频无广告可全屏调用代码
  • 原文地址:https://www.cnblogs.com/wjs521/p/9544051.html
Copyright © 2011-2022 走看看