zoukankan      html  css  js  c++  java
  • Python之旅.第九章.并发编程.

    socketserver = 多进程 + 多线程 +  IO模型

    一、上节课复习及作业讲解

    aspawn用法复习

    from gevent import monkey,spawn;monkey.patch_all()

    import time

     

    def f1():

        print('from f1 1')

        time.sleep(3)  # 直接调用gevent模块实现遇到IO切换+保持状态

        print('from f1 2')

     

    def f2():

        print('from f2 1')

        time.sleep(2)

        print('from f2 2')

     

    def f3():

        print('from f3 1')

        time.sleep(5)

        print('from f3 2')

     

    g1=spawn(f1)

    g2=spawn(f2)

    g3=spawn(f3)

    # time.sleep(10) # spawn 默认为异步调用,如果不加time.sleep g.join()spawn提交后不在原地等待执行, 程序直接结束

    g1.join()

    g2.join()

    g3.join()

     

    b、作业讲解  

    服务端:

    from gevent import monkey,spawn;monkey.patch_all()

    from threading import Thread

    from socket import *

     

    def talk(conn):

        while True:

            try:

                data=conn.recv(1024)

                if not data:break

                conn.send(data.upper())

            except ConnectionResetError:

                break

        conn.close()

     

    def server(ip,port,backlog=5):

        s = socket()

        s.bind((ip,port))

        s.listen(backlog)

     

        while True:

            conn, addr = s.accept()

            print(addr)

            # 通信

            g=spawn(talk,conn)

     

        s.close()

     

    if __name__ == '__main__':

        spawn(server,'127.0.0.1',8080).join() # spawn 默认为异步调用,如果不加time.sleep g.join()spawn提交后不在原地等待执行, 程序直接结束

        # server('127.0.0.1',8080)  # 效果等用于spawn(server,'127.0.0.1',8080).join()

     

    客户端:

    from threading import Thread,current_thread

    from socket import *

     

    import os

     

    def client():

        client = socket()

        client.connect(('127.0.0.1', 8080))

     

        while True:

            data = '%s hello' % current_thread().name

            client.send(data.encode('utf-8'))

            res = client.recv(1024)

            print(res.decode('utf-8'))

     

    if __name__ == '__main__':

        for i in range(1000):  #开启1000个线程,以提高效率

            t=Thread(target=client)

            t.start()

     

    二、网络IO操作之wait datacopy data

    网路IO的两个阶段(copy data阶段 + wait data阶段),换言之,所有IO都围绕这两个阶段

     

    服务端:

    from socket import *

     

    s = socket()  # 等同于s=socket(AF_INET, SOCK_STREAM) 默认AF_INET SOCK_STREAM

    s.bind(('127.0.0.1',8080))

    s.listen(5)

     

    while True:

        conn, addr = s.accept()  # wait data (wait的时间取决于客户端和网络两方面因素)+ copy data(app <=> kernel)accept可感觉到明显的阻塞

        print(addr)

        while True:

            try:

                data = conn.recv(1024)  # wait data + copy data(app <=> kernel) recv可感觉到明显的阻塞

                if not data: break      # for linux, do not pop ConnectionResetError

                print('from client msg: ',data)

            except ConnectionResetError:  # for windows 

                break

        conn.close()

     

    客户端:

    from socket import *

     

    client = socket()  # s=socket(AF_INET, SOCK_STREAM) 默认AF_INET SOCK_STREAM

    client.connect(('127.0.0.1', 8080))

     

    while True:

     

        data = input('>>: ').strip()

        if not data:continue   # if just enter, ask to re-input

    client.send(data.encode('utf-8')) # copy data(app <=> kernel) only

    #send(只有copy data阶段)是IO操作,但有时可能感觉不到明显的阻塞,一是因为传输的数据量少,二是因为是本地copy操作不经历网络过程;但如果send的数据量特别大,是有可能感受到阻塞的。

    print('has send')

     

    IO模型的目的:自己实现gevent模块,解决单线程下的IO问题(网络IO,不含time.sleep),从而得到高性能。(之前讲的多进程和多线程并没有解决IO

     

    三、阻塞IO模型

     

    wait datacopy data阶段一个都不能少,完完整整的等下来即为阻塞IO模型

    之前所接触的多进程、多线程、进程池、线程池(除了gevent模块以外)都是阻塞IO模型。

     

    四、非阻塞IO模型(更好的利用wait data阶段)

     

    非阻塞IO只能监测网络IO,不监测time.sleep()这种IO

    非阻塞IO有可能大规模占用CPU做无用操作,所以不推荐使用非阻塞IO

     

    a 非阻塞IO模型(基础bug版)

    服务端:

    from socket import *

    import time

     

    s = socket()

    s.bind(('127.0.0.1',8080))

    s.listen(5)

    s.setblocking(False)  # 不设置默认是True;将其设置成False,即将所有阻塞编程非阻塞(遇到等不到数据的情况,不阻塞,会抛出信息:BlockingIOError

    #gevent模块中 monkey.patch_all() s.setblocking(False)

    r_list=[]

    while True:

        try:

            conn, addr = s.accept()

            r_list.append(conn)

     

        except BlockingIOError:

            # time.sleep(3)  # 非阻塞IO即完全没有阻塞,不应该人为加入time.sleep()

            print('可以去干其他的活了')

            print('rlist: ',len(r_list))

            for conn in r_list:

                try:

                    data=conn.recv(1024)

                    conn.send(data.upper())

                except BlockingIOError:  #如果等不到数据,报出的错误为BlockingIOError

                    continue

     

    客户端:

    from socket import *

    import os

     

    client = socket()

    client.connect(('127.0.0.1', 8080))

     

    while True:

        data='%s say hello' %os.getpid()

        client.send(data.encode('utf-8'))

        res=client.recv(1024)

        print(res.decode('utf-8'))

     

    b 非阻塞IO模型(修正)

    服务端: 

    from socket import *

    import time

     

    s = socket()

    s.bind(('127.0.0.1',8080))

    s.listen(5)

    s.setblocking(False)

     

    r_list=[]

    w_list=[]

    while True:

        try:

            conn, addr = s.accept()

            r_list.append(conn)

     

        except BlockingIOError:

            # time.sleep(0.05)  #非阻塞模型不应该加time.sleep(); 加上time.sleep(0.05) 即把非阻塞IO模型变成IO多路复用模型

            print('可以去干其他的活了')

            print('rlist: ',len(r_list))

     

            # 收消息

            del_rlist=[]

            for conn in r_list:

                try:

                    data=conn.recv(1024)

                    if not data:   # for linux,不抛出ConnectionResetError,监测是否收到数据

                        conn.close()

                        del_rlist.append(conn)

                        continue

                    # conn.send(data.upper())  # 错误做法: send亦有可能阻塞,所以不推荐放在这个位置,宜分到下面发消息模块独立完成

                    w_list.append((conn,data.upper())) # 正确做法: 收集待send数据信息; 以小元组的形式写入列表

                except BlockingIOError:

                    continue

                except ConnectionResetError:

                    conn.close()

                    # r_list.remove(conn)   # 错误做法: 在循环期间不推荐改变所循环对象(listdict等)的结构

                    del_rlist.append(conn)  # 正确做法: 在循环期间不推荐改变所循环对象(listdict等)的结构

     

            # 发消息

            del_wlist=[]

            for item in w_list:

                try:

                    conn=item[0] # 将小元组中数据依次取出

                    res=item[1]  # 将小元组中数据依次取出

                    conn.send(res)

                    del_wlist.append(item)

                except BlockingIOError:

                    continue

                except ConnectionResetError:

                    conn.close()

                    del_wlist.append(item)

     

            # 回收无用连接

            for conn in del_rlist:

                r_list.remove(conn) # 正确做法: 在循环期间不推荐改变所循环对象(listdict等)的结构

     

            for item in del_wlist:

                w_list.remove(item)

     

    客户端: 

    from socket import *

    import os

     

    client = socket()

    client.connect(('127.0.0.1', 8080))

     

    while True:

        data='%s say hello' %os.getpid()

        client.send(data.encode('utf-8'))

        res=client.recv(1024)

        print(res.decode('utf-8'))

     

    五、IO多路复用

     

    IO多路复用可同时监测多个套接字,循环询问操作系统是否已准备好数据。在之前修正版的非阻塞IO模型中加入time.sleep() 即将非阻塞IO模型转化成IO多路复用模型

    当只监测一个套接字时,多路复用比阻塞IO的效率还要低。

    一般会使用select模块帮忙完成IO多路复用模型。(注意: select不能监测到ConnectionResetError,只能监测到BlockingIOError

     

    服务端: 

    from socket import *

    import select

     

    s = socket()

    s.bind(('127.0.0.1',8080))

    s.listen(5)

    s.setblocking(False)

    # print(s)

     

    r_list=[s,]

    w_list=[]

    w_data={}

    while True:

        print('被检测r_list ',len(r_list))

        print('被检测w_list ',len(w_list))

        rl,wl,xl=select.select(r_list,w_list,[],) #r_list=[server,conn] rl等存放等到数据的对象

     

        # print('rl: ',len(rl)) #rl=[conn,]

        # print('wl: ',len(wl))

     

        # 收消息

        for r in rl: #r=conn

            if r == s:   #r l为已经有等到信息的对象,可能为s,亦可为conn;当为s时,执行accept,当为conn时,执行recv

                conn,addr=r.accept()

                r_list.append(conn) # 建立好连接后,将连接丢入r_list中监测

            else:

                try:

                    data=r.recv(1024)

                    if not data:  # select模块不帮忙捕捉ConnectionResetError,此操作针对linux系统

                        r.close()

                        r_list.remove(r)

                        continue

                    # r.send(data.upper())

                    w_list.append(r)

                    w_data[r]=data.upper()

                except ConnectionResetError: #select模块不帮忙捕捉ConnectionResetError,此操作针对windows系统

                    r.close()

                    r_list.remove(r)

                    continue

     

        # 发消息

        for w in wl:

            w.send(w_data[w])

            w_list.remove(w)

            w_data.pop(w)

     

    客户端: 

    from socket import *

    import os

     

    client = socket()

    client.connect(('127.0.0.1', 8080))

     

    while True:

        data='%s say hello' %os.getpid()

        client.send(data.encode('utf-8'))

        res=client.recv(1024)

        print(res.decode('utf-8'))

     

    六、异步IO模型

     

    异步IO模型的效率最高

    之前设计到的异步调用+回调即用到了异步IO模型。具体的实现操作会在爬虫中详细介绍 

    from concurrent.futures import ThreadPoolExecutor

    from threading import current_thread

    import time

    import os

     

    def task(n):

        print('%s is running' %current_thread().name)

        time.sleep(2)

        return n**2

     

    def parse(obj):

        res=obj.result()

        print(res)

     

    if __name__ == '__main__':

        t=ThreadPoolExecutor(4)

     

        future1=t.submit(task,1)

        future1.add_done_callback(parse) #parse函数会在future1对应的任务执行完毕后自动执行,会把future1自动传给parse

     

        future2=t.submit(task,2)

        future2.add_done_callback(parse)

     

        future3=t.submit(task,3)

        future3.add_done_callback(parse)

     

        future4=t.submit(task,4)

    future4.add_done_callback(parse) 

     

    七、重点知识归纳(网络编程+ 并发编程)

    网络编程

                           目标:编写一个C/SB/S架构的基于网络通信的软件

                                                                         

                           1C/S,B/S*****

                                                  server<===============>client

                                                  服务端特点:

                                                                         1、不间断地提供服务

                                                                         2、服务端要支持并发+高性能

     

                           2、互联网

                                                  互联网=物理连接介质+互联网协议(OSI七层***

                                                  

                                                  tcp三次握手,四次挥手 *****

                                                  tcp可靠,但不如udp效率高 *****

                                                  udp不可靠,但效率高 *****

                           

                           3socket*****

                                                  socket抽象层位于传输层与应用层之间

                           

                           4、基于tcp协议的套接字通信(*****

                                                  加上连接循环

                                                  加上通信循环

                           

                           5、粘包问题:(*****

                                                  tcp流式协议独有的粘包问题

                                                                         解决方法:自定义报头

                           

                                                  udp数据报协议没有粘包问题

                                                                         

                           6、远程执行命令的小程序/上传下载文件的程序(*****

                           

                           7、基于udp协议的套接字通信(***

                           

                           

    并发编程

                           目标:让服务端能够支持高并发+高性能

     

                           1               操作系统发展史

                                                  多道技术(*****

                                                                         产生背景

                                                                         多道技术的核心:

                                                                                                1、空间上的复用

                                                                                                2、时间上的复用

                                                                                                

                                                  *****

                                                  并发:看起来同时运行

                                                  并行:真正意义上的同时运行,一个cpu同一时刻只能做一件事

                                                                         只有多核才能同时做多件事,即并行的效果

                                                                         

                                                  

                           2、进程

                                                  1、进程理论(*****

                                                  2、开启进程的两种方式(*****

                                                  3、守护进程(**

                                                  4、互斥锁与信号量(**

                                                  5IPC机制:队列,管道(*

                                                  6、进程queue=管道+ ***

                                                  7、生产者消费者模型(*****

                                                  

                           3、线程

                                                  1、线程理论(*****

                                                  2、开启线程的两种方式(*****

                                                  3、守护线程(**

                                                  4、互斥锁与信号量(**

                                                  5GIL vs 互斥锁(*****

                                                  6Cpython的解释器下(*****

                                                                         多个任务是IO密集型:多线程

                                                                         多个任务是计算密集型:多进程

                                                  7、死锁现象与递归锁(**

                                                  8、线程queue***

                                                  9Event事件(**

                           

                           4、池(*****

                                                  为何要用池:

                                                                         操作系统无法无限开启进程或线程

                                                                         池作用是将进程或线程控制操作系统可承受的范围内

                                                  什么时候用池:

                                                                         当并发的任务数要远超过操作系统所能承受的进程数或

                                                                         线程数的情况应该使用池对进程数或线程数加以限制

                                                  

                                                  如何用池?

                                                                         池内装的东西有两种:

                                                                                                装进程:进程池

                                                                                                装线程:线程池

                                                                                                

                                                                         进程线程池的使用

                           

                                                  

                                                  提交的两种方式:

                                                                         同步调用

                                                                         异步调用+回调机制

                                                                         

                                                  任务执行的三种状态:

                                                                         阻塞

                                                                                                阻塞

                                                                         非阻塞:

                                                                                                就绪

                                                                                                运行

                                                                         

                                                  

                           

                           5、单线程下实现并发(****

                                                  协程:在应用程序级别实现多个任务之间切换+保存状态

                                                  

                                                  高性能:

                                                                         单纯地切换,或者说么有遇到io操作也切换,反而会降低效率

                                                                         检测单线程下的IO行为,实现遇到IO立即切换到其他任务执行

                                                                         

                                                  gevent

                                                                         

                           6IO模型(主要掌握理论****

                                                  阻塞IO

                                                  非阻塞IO

                                                  IO多路复用

                                                  异步IO

  • 相关阅读:
    Spring Cloud (八):服务调用追踪 sleuth & zipkin
    Spring Cloud (七):API 网关
    Spring Cloud (六):声明式 REST 请求 Feign
    maven 下载 jar 包到本地
    K8S 设置 Pod 使用 host 网络、配置 DNS
    Spring Cloud (五):容错处理 Hystrix
    Spring Cloud (四):客户端实现负载均衡
    [数仓]数据仓库设计方案
    [知识图谱]Neo4j知识图谱构建(neo4j-python-pandas-py2neo-v3)
    [Pandas]利用Pandas处理excel数据
  • 原文地址:https://www.cnblogs.com/yangli0504/p/8982567.html
Copyright © 2011-2022 走看看