zoukankan      html  css  js  c++  java
  • 网络IO

    1、IO介绍

    对于一个network IO (这里我们以read举例),它会涉及到两个系统对象,一个是调用这个IO的process (or thread),另一个就是系统内核(kernel)。当一个read操作发生时,该操作会经历两个阶段:

    #1)等待数据准备 (Waiting for the data to be ready)
    #2)将数据从内核拷贝到进程中(Copying the data from the kernel to the process)

    服务端:

    from socket import *
    
    s = socket()
    s.bind(('127.0.0.1',8080))
    s.listen(5)
    
    while True:
        conn, addr = s.accept()
        print(addr)
        while True:
            try:
                data = conn.recv(1024)
                if not data: break
                print('from client msg: ',data)
            except ConnectionResetError:
                break
        conn.close()
    
    
    #data = conn.recv这里会有一个明显的阻塞效果recv本质是从网卡收消息,但应用程序不能操作硬件,它会给操作系统发请求,找操作系统拿数据,操作系统缓存里有没有数据取决于网卡,也就是客户端有没有向网卡发数据,等到操作系统缓存里有数据时,会把数据copy给应用程序的缓存
    
    所以recv经历了两个阶段,wait data 与 copy dataaccept和recv操作一样

    客户端:

    from socket import *
    
    client = socket()
    client.connect(('127.0.0.1', 8080))
    
    while True:
    
        data = input('>>: ').strip()
        if not data:continue
        client.send(data.encode('utf-8'))
        print('has send')
    
    #client.send里面的数据此时储存在应用程序中,需要调网卡把数据发送出去,但应用程序不能操作网卡,所以应用程序先把数据copy给自己的操作系统(此时send操作已经完成了),操作系统调网卡把数据发到对方去。这里我们不会感觉send操作会阻塞程序,因为send只要把数据copy给操作系统就完成任务了,如果此时的数据又很小,就不会感觉到阻塞,但send本身是一个IO行为,如果copy的数据量大到占满操作系统的缓冲时,这时候就不能向操作系统里写数据了,这就会感觉到阻塞
    
    #所以send 经历了一个copy data

     2、非阻塞IO:

    #服务端
    
    from socket import *
    import time
    
    s = socket()
    s.bind(('127.0.0.1',8080))
    s.listen(5)
    s.setblocking(False)   #非阻塞,默认情况下是阻塞的(True)
    
    r_list=[]
    w_list=[]
    while True:
        try:
            conn, addr = s.accept()  #捕捉连接,如果没连接来,干其它活
            r_list.append(conn)        #有连接来的话把连接存下来,再进入下来次循环检测连接
    
        except BlockingIOError:
            # time.sleep(0.05)
            print('可以去干其他的活了')
            print('rlist: ',len(r_list))
    
            # 收消息
            del_rlist=[]
            for conn in r_list:
                try:
                    data=conn.recv(1024)
                    if not data:  #如果是linux系统客户端单方面断连接recv会收空
                        conn.close()  #关闭连接
                        del_rlist.append(conn)  #把关闭掉的连接放到准备删除的列表里
                        continue   #直接运行下一行代码
                    # conn.send(data.upper())
                    w_list.append((conn,data.upper()))
                except BlockingIOError:  #当检测到recv没有收到数据时进行下一次循环
                    continue
                except ConnectionResetError:  #捕捉客户端单方面终止连接(window系统)
                    conn.close()   #回收连接
                    # r_list.remove(conn)
                    del_rlist.append(conn)  #把终止掉的连接放到准备删除的列表里
    
            # 发消息
            del_wlist=[]
            for item in w_list:
                try:
                    conn=item[0]   #拿连接
                    res=item[1]    #拿数据
                    conn.send(res)  #发送数据
                    del_wlist.append(item)   #发送成功后就不需要了
                except BlockingIOError:  #检测是否存在发送阻塞
                    continue
                except ConnectionResetError:  #检测发消息时客户端有没有单方面断连接
                    conn.close()  #关掉连接
                    del_wlist.append(item)  #加入要删除的列表里,等待被删除
    
            # 回收无用套接字,无需再监听它们的IO操作
            for conn in del_rlist:
                r_list.remove(conn)  #
    
            for item in del_wlist:    #
                w_list.remove(item)
    
    
    #客户端
    
    from socket import *
    import os
    
    client = socket()
    client.connect(('127.0.0.1', 8080))
    
    while True:
        data='%s say hello' %os.getpid()
        client.send(data.encode('utf-8'))
        res=client.recv(1024)
        print(res.decode('utf-8'))
    非阻塞IO例子

    但是非阻塞IO模型不被推荐,虽然它能够在等待任务完成的时间里干其他活了,但是它循环调用recv()将大幅度推高CPU占用率

    3、多路复用IO(IO multiplexing):

    它的好处就在于单个process就可以同时处理多个网络连接的IO,基本原理就是不断的轮询所负责的所有socket,当某个socket

    有数据到达了,就通知用户进程。

    4、异步IO:

    from concurrent.futures import ThreadPoolExecutor
    from threading import current_thread
    import time
    import os
    
    def task(n):
        print('%s is running' %current_thread().name)
        time.sleep(2)    #模拟IO
        return n**2
    
    def parse(obj):
        res=obj.result()  #拿到task的返回值的结果
        print(res)
    
    if __name__ == '__main__':
        t=ThreadPoolExecutor(4)
    
        future1=t.submit(task,1)
        future1.add_done_callback(parse) #parse函数会在future1对应的任务执行完毕后自动执行,会把future1自动传给parse
    
        future2=t.submit(task,2)
        future2.add_done_callback(parse)
    
        future3=t.submit(task,3)
        future3.add_done_callback(parse)
    
        future4=t.submit(task,4)
        future4.add_done_callback(parse)
    
    
    #打印结果
    ThreadPoolExecutor-0_0 is running
    ThreadPoolExecutor-0_1 is running
    ThreadPoolExecutor-0_2 is running
    ThreadPoolExecutor-0_3 is running
    4
    1
    16
    9
    异步IO

      五种IO模型:https://www.cnblogs.com/linhaifeng/articles/7454717.html

  • 相关阅读:
    Tomcat 性能监控与调优
    04 使用 BTrace 进行拦截调试
    03 JVisualVM(本地和远程可视化监控)
    02 jmap+MAT(内存溢出)、jstack(线程、死循环、死锁)
    01 JVM的参数类型、jinfo & jps(参数和进程查看)、jstat(类加载、垃圾收集、JIT 编译)
    69_缓存预热解决方案:基于storm实时热点统计的分布式并行缓存预热
    66_讲给Java工程师的史上最通俗易懂Storm教程:纯手工集群部署
    57_分布式缓存重建并发冲突问题以及zookeeper分布式锁解决方案
    54_基于nginx+lua+java完成多级缓存架构的核心业务逻辑
    53_部署分发层nginx以及基于lua完成基于商品id的定向流量分发策略
  • 原文地址:https://www.cnblogs.com/zh-xiaoyuan/p/11784190.html
Copyright © 2011-2022 走看看