zoukankan      html  css  js  c++  java
  • 线程队列 线程池 协程

    1 . 线程队列

    from multiprocessing Queue , JoinableQueue  #进程IPC队列

    from queue import Queue  #线程队列  先进先出

    from queue import LifoQueue  #后进先出的

    方法都是一样的 : put , get , put_nowait , get_nowait , full , empty , qsize

    队列 Queue : 先进先出 , 自带锁 , 数据安全

    栈 LifoQueue : 后进先出 , 自带锁 , 数据安全

    q = LifoQueue(5)

    q.put(3)

    q.put_nowait(4)

    print(q.get())  #谁最后进的,就会先取谁

    q.get_nowait()

    print(q.full())

    pirnt(q.empty())

    print(q.qsize())

    from queue import PriorityQueue  #优先级队列

    q = PriorityQueue()

    q.put((10,"aaa"))

    q.put((4,"bbb"))

    print(q.get())

    线程池

    Threading 没有线程池的

    Multiprocessing Pool

    concurrent.futures  帮助管理线程池和进程池

    import time

    from threading import currentThread,get_ident

    from concurrent.futures import ThreadPoolExecutor  #帮助启动线程池

    from concurrent.futures import ProcessPoolExecutor  #帮助启动进程池

    def func(i):

      time.sleep(1)

      print("in%s%s"%(i,currentThread()))

      return i**2

    def back(fn):

      print(fn.result(),currentThread())

    #map启动多线程任务

    t = ThreadPoolExecutor(5)

    t.map(func,range(20))      ==     for i in range(20):

                         t.submit(func,i)

    #submit异步提交任务

    t = ThreadPoolExecutor(5)

    for i in range(20):

      t.submit(func,i)

    t.shutdown()

    print("main:",currentThread())  #起多少个线程池 , 5*CPU的个数

    #获取任务结果

    t = ThreadPoolExecutor(5)

    li = []

    for i in range(20):

      ret = t.submit(func,i)

      li.append(ret)

    t.shutdown()

    for i in li:

      print(i.result())

    print("main:",currentThread())

    #回调函数

    t = ThreadPoolExecutor(5)

    for i in range(20):

      t.submit(func,i).add_done_callback(back)

    #回调函数(进程版)

    import os,time

    from concurrent.futures import ProcessPoolExecutor

    def func(i):

      print("in%s%s"%(i,os.getpid()))

      return i**2

    def back(fn):

      print(fn.result(),os.getpid())

    if __name__ == "__main__":

      p = ProcessPoolExecutor(5)

      for i in range(20):

        p.submit(func,i).add_done_callback(back)

      print("main:",os.getpid())

    multiprocessing模块自带进程池的

    threading模块是没有线程池的

    concurrent.futures 进程池和线程池 : 高度封装 , 进程池/线程池的统一的使用方式

    创建线程池/进程池 ProcessPoolExecutor  ThreadPoolExecutor

    ret .result()获取结果,如果想实现异步效果,应该使用列表

    shutdown  == close + join 同步控制

    add_done_callback 回调函数,在回调函数内接收的参数是一个对象,需要通过result来获取返回值. 进程池的回调函数仍然在主进程中执行,但是线程池的回调函数是在线程中执行.

    进程 : 资源分配的最小单位  ,  班级

    线程 : CPU调度最小单位 , 人

    Cpython线程不能利用多核的 ,多线程无法利用多核 ,一个线程能同时执行多个任务.

    协程 : 能在一条线程的基础上 ,再过个任务之间互相切换 .节省了线程开启的消耗. 

    协程是从python代码的级别调度的 , 正常的线程是CPU调度的最小单位 ; 协程的调度并不是由操作系统来完成的.

    之前学过的协程在两个任务之间相互切换的是生成器函数:yield

    def func():

      print(1)

      x = yield "aaa"

      print(x)

      yield  "bbb"

    g  = func()

    print(next(g))

    print(g.send("***"))

    在多个函数之间互相切换的功能  ---  协程

    def consumer():

      while True:

        x = yield

        print(x)

    def producer():

      g = consumer()

      next(g)    #预激

      for i in range(20):

        g.send(i)

    producer()

    yield只有程序之间的切换,没有重利用任何IO操作的时间

    模块的安装 :

    pip3 install 要安装的模块的名字

    使用协程减少IO操作带来的时间消耗

    from gevent import monkey ; monkey.patch_all()

    import gevent,time

    def eat():

      print("吃")

      time.sleep(2)

      print("吃完了")

    def play():

      print("玩")

      time.sleep(1)

      print("玩完了")

    g1 = gevent.spawn(eat)

    g2 = gevent.spawn(play)

    gevent.joinall([g1,g2])

    没有执行 , 为什么没执行???是需要开启么?

    没有开启但是切换了

    gevent帮我们做了切换,做切换是有条件的,遇到IO才切换

    gevent不认识除了gevent这个模块内以外的IO操作

    使用join可以一直阻塞直到协程任务完成

    帮助gevent来认识其他模块中的阻塞

    from gevent import monkey;monkey.patch_all() 写在其他模块导入之前.

    使用协程来实现TCP协议 :

    服务器 :

    from gevent import monkey;monkey.patch_all()

    import gevent,socket

    def func(conn):

      while 1:

        conn.send(b"hello")

        print(conn.recv(1024))

    sk = socket.socket()

    sk.bind(("127.0.0.1",9090))

    sk.listen()

    while 1:

      conn,addr = sk.accept()

      gevent.spawn(func,conn)

    客户端 :

    import socket

    from threading import Thread

    def client():

      sk = socket.socket()

      sk.connect(("127.0.0.1",9090))

      while 1:

        print(sk.recv(1024))

        sk.send(b"bye")

    for i in range(20):

      Thread(target=client).start()

    4c 并发50000  qps

    5个进程

    20个线程

    500个协程

    协程能够在单核的情况极大地提高CPU的利用率

    协程不存在数据不安全 , 也不存在线程切换/创造的时间开销 ; 切换时用户级别的,程序不会因为协程中某一个任务进入阻塞状态而使整条线程阻塞

    线程的切换 :

    时间片到了 降低了CPU的效率

    IO会切  提高了CPU的效率

  • 相关阅读:
    IE安全级别没法定制
    将应用部署到Websphere的context root根/
    SqlServer插入慢的问题解决
    SqlServer中用@@IDENTITY取最新ID不准的问题
    分享我的戒烟经验
    Telnet发邮件过程
    .net垃圾回收和CLR 4.0对垃圾回收所做的改进之三
    Please pay more attention to the character set of your database
    翻旧贴: 什么是对象?
    C# Code in ASPX Page
  • 原文地址:https://www.cnblogs.com/fengkun125/p/9403360.html
Copyright © 2011-2022 走看看