zoukankan      html  css  js  c++  java
  • DAY37-Python入门学习-进程池与线程池、协程、gevent模块

    一、进程池与线程池

    基本使用:

      进程池和线程池操作一样

    提交任务的两种方式:

    同步调用:提交完一个任务之后,就在原地等待,等待任务完完整整地运行完毕拿到结果后,再执行下一行代码,会导致任务是串行执行的

    异步调用:提交完一个任务之后,不在原地等待,结果???,而是直接执行下一行代码,会导致任务是并发执行的

    同步调用

    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    import  time,random,os
    
    def task():
        print('%s is running'%os.getpid())
        i=random.randint(1,3)
        time.sleep(i)
        return i
    
    
    if __name__ == '__main__':
        p=ProcessPoolExecutor(4)
    
        l=[]
        for i in range(10):
            res = p.submit(task).result()#等待任务执行完毕,返回结果
            print(res)
        print('')
    View Code

    异步调用

    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    import  time,random,os
    
    def task():
        print('%s is running'%os.getpid())
        i=random.randint(1,3)
        time.sleep(i)
        return i
    
    
    if __name__ == '__main__':
        p=ProcessPoolExecutor(4)
    
        l=[]
        for i in range(10):
            future=p.submit(task)#只替提交任务
            l.append(future)
        p.shutdown(wait=True)#关闭进程池入口,并在原地等待所有进程任务执行完毕
        for i in l:
            print(i.result())
        print('')
    View Code

    异步 + 回调函数

    from concurrent.futures import ProcessPoolExecutor
    import time,os
    import requests
    
    
    def get(url):
        print('%s GET %s' %(os.getpid(),url))
        time.sleep(3)
        response=requests.get(url)
        if response.status_code == 200:
            res=response.text
        else:
            res='下载失败'
        return res
    
    def parse(future):
        time.sleep(1)
        res=future.result()
        print('%s 解析结果为%s' %(os.getpid(),len(res)))
    
    if __name__ == '__main__':
        urls=[
            'https://www.baidu.com',
            'https://www.sina.com.cn',
            'https://www.tmall.com',
            'https://www.jd.com',
            'https://www.python.org',
            'https://www.openstack.org',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
    
        ]
    
        p=ProcessPoolExecutor(9)
    
        start=time.time()
        for url in urls:
            future=p.submit(get,url)
            future.add_done_callback(parse)
            #parse会在任务运行完毕后自动触发,然后接收一个参数future对象,回调函数的执行是在主进程里,而线程中的回调函数是由空闲的线程来执行
    
        p.shutdown(wait=True)
    
        print('',time.time()-start)
        print('',os.getpid())
    View Code

    基于线程池的套接字通讯

    服务端

    from concurrent.futures import ThreadPoolExecutor
    import socket
    from threading import current_thread
    IP='127.0.0.1'
    PORT=8085
    ADDRESS=(IP,PORT)
    BUFFSIZE=1024
    t = ThreadPoolExecutor(4)
    
    
    def communicate(conn,addr):
        while True:
            try:
                data=conn.recv(BUFFSIZE)
                if not data:
                    print('%s客户端断开....'%addr)
                    break
                print('>>>>%s  端口:%s 线程:%s'%(data.decode('utf-8'),addr[1],current_thread().name))
                conn.send(data.upper())
            except ConnectionResetError:
                break
        conn.close()
    
    if __name__ == '__main__':
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server.bind(ADDRESS)
        server.listen(2)
        print(current_thread().name)
        while True:
            conn,addr=server.accept()
            t.submit(communicate, conn,addr)
    View Code

    客户端

    import socket
    IP='127.0.0.1'
    PORT=8085
    ADDRESS=(IP,PORT)
    BUFFSIZE=1024
    
    client=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    client.connect(ADDRESS)
    
    while True:
        msg=input('>>>>').strip()
        if len(msg)==0:continue
        if msg=='q':break
        client.send(msg.encode('utf-8'))
        data = client.recv(BUFFSIZE)
        print(data.decode('utf-8'))
    
    client.close()
    View Code

    二、协程

    1. 目标:

    在线程下实现并发
    并发(多个任务看起来是同时执行就是并发):切换+保存状态

    2. 协程:

    协程是单线程实现并发
    注意:协程是程序员意淫出来的东西,操作系统里只有进程和线程的概念(操作系统调度的是线程)

    在单线程下实现多个任务间遇到IO就切换就可以降低单线程的IO时间,从而最大限度地提升单线程的效率

    单纯的切换,反而更降低效率

    #串行执行
    import time
    
    def func1():
        for i in range(10000000):
            i+1
    
    def func2():
        for i in range(10000000):
            i+1
    
    start = time.time()
    func1()
    func2()
    stop = time.time()
    print(stop - start)
    
    
    
    #基于yield并发执行
    import time
    def func1():
        while True:
            yield
    
    def func2():
        g=func1()
        for i in range(10000000):
            i+1
            next(g)
    
    start=time.time()
    func2()
    stop=time.time()
    print(stop-start)#2.7103893756866455
    View Code

    三、gevent模块

    1.使用

    from gevent import monkey;monkey.patch_all()#用来识别IO阻塞,必须放到文件头
    from gevent import spawn,joinall
    import time
    
    
    def foo1(name):
        print('%s  play1'%name)
        time.sleep(2)#模拟IO操作,遇到IO切换任务
        print('%s  play2'%name)
    
    
    def foo2(name):
        print('%s  eat1'%name)
        time.sleep(3)#模拟IO操作,遇到IO切换任务
        print('%s  eat2'%name)
    
    f1=spawn(foo1,'egon')#提交任务
    f2=spawn(foo2,'egon')#提交任务
    
    joinall([f1,f2])#主线程等待任务完成
    print('')
    
    #结果:
      #egon play1
      #egon eat1
      #egon play2
      #egon eat2
      #

    2.基于gevent的套接字通信

    服务端

    from gevent import monkey;monkey.patch_all()
    from gevent import spawn
    import socket
    from threading import current_thread
    IP='127.0.0.1'
    PORT=8086
    ADDRESS=(IP,PORT)
    BUFFSIZE=1024
    
    def communicate(conn,addr):
        while True:
            try:
                data=conn.recv(BUFFSIZE)
                if not data:
                    print('%s客户端断开....'%addr)
                    break
                conn.send(data.upper())
            except ConnectionResetError:
                break
    
    
    def server():
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server.bind(ADDRESS)
        server.listen(2)
        print(current_thread().name)
        while True:
            conn,addr=server.accept()
            spawn(communicate,conn,addr)
    
    if __name__ == '__main__':
        s1=spawn(server)
        s1.join()
    View Code

    多个客户端并发

    import socket
    from threading import Thread,current_thread
    
    IP = '127.0.0.1'
    PORT = 8086
    ADDRESS = (IP, PORT)
    BUFFSIZE = 1024
    def client():
    
    
        client=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        client.connect(ADDRESS)
        n=0
        while True:
            msg='%s say hello %s' %(current_thread().name,n)
            n+=1
            client.send(msg.encode('utf-8'))
            data=client.recv(BUFFSIZE)
            print(data.decode('utf-8'))
    
    if __name__ == '__main__':
        for i in range(500):
            t=Thread(target=client)
            t.start()
    View Code
  • 相关阅读:
    Subgraph Search Over Large Graph Database
    聚类算法:K-means 算法(k均值算法)
    MongoDB 学习笔记之 TTL索引,部分索引和文本索引
    MongoDB 学习笔记之 索引选项和重建索引
    快学Scala 第二十一课 (初始化trait的抽象字段)
    MongoDB 学习笔记之 分析器和explain
    MongoDB 学习笔记之 检测存储引擎
    ELK 学习笔记之 elasticsearch基本概念和CRUD
    ELK 学习笔记之 elasticsearch head插件安装
    ELK 学习笔记之 elasticsearch环境搭建
  • 原文地址:https://www.cnblogs.com/xvchengqi/p/9621989.html
Copyright © 2011-2022 走看看