zoukankan      html  css  js  c++  java
  • python 利用selectors实现异步I/O

    它的功能与linux的epoll,还是select模块,poll等类似;实现高效的I/O multiplexing,  常用于非阻塞的socket的编程中; 简单介绍一下这个模块,更多内容查看 python文档:https://docs.python.org/3/library/selectors.html

    1. 模块定义了一个 BaseSelector的抽象基类, 以及它的子类,包括:SelectSelector, PollSelector, EpollSelector, DevpollSelector, KqueueSelector.    

    另外还有一个DefaultSelector类,它其实是以上其中一个子类的别名而已,它自动选择为当前环境中最有效的Selector,所以平时用 DefaultSelector类就可以了,其它用不着。

    2. 模块定义了两个常量,用于描述 event Mask

    EVENT_READ :      表示可读的; 它的值其实是1;

    EVENT_WRITE:      表示可写的; 它的值其实是2;

    3. 模块定义了一个 SelectorKey类, 一般用这个类的实例 来描述一个已经注册的文件对象的状态, 这个类的几个属性常用到:

    fileobj:   表示已经注册的文件对象;

    fd:          表示文件对象的描述符,是一个整数,它是文件对象的 fileno()方法的返回值;

    events:    表示注册一个文件对象时,我们等待的events, 即上面的event Mask, 是可读呢还是可写呢!!

    data:       表示注册一个文件对象是邦定的data;

    4. 最后说说抽象基类中的方法;

    register(fileobj, events, data=None) 

    作用:注册一个文件对象。

    参数: fileobj——即可以是fd 也可以是一个拥有fileno()方法的对象;

    events——上面的event Mask 常量; data

    返回值: 一个SelectorKey类的实例;

    unregister(fileobj) 

    作用: 注销一个已经注册过的文件对象;

    返回值:一个SelectorKey类的实例;

    modify(fileobj, events, data=None)

    作用:用于修改一个注册过的文件对象,比如从监听可读变为监听可写;它其实就是register() 后再跟unregister(), 但是使用 modify( ) 更高效;

    返回值:一个SelectorKey类的实例;

    select(timeout=None)

    作用: 用于选择满足我们监听的event的文件对象;

    返回值: 是一个(key, events)的元组, 其中key是一个SelectorKey类的实例, 而events 就是 event Mask(EVENT_READ或EVENT_WRITE,或者二者的组合)

    close() 

    作用:关闭 selector。 最后一定要记得调用它, 要确保所有的资源被释放;

    get_key(fileobj)  

    作用: 返回注册文件对象的 key;

    返回值 :一个SelectorKey类的实例;

     服务端

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    from socket import *
    import selectors
    
    sel=selectors.DefaultSelector()
    
    def accept(server_fileobj,mask):
        coon,addr = server_fileobj.accept()
        print(coon,addr,mask)
        sel.register(coon,selectors.EVENT_READ,read)
    
    def read(conn,mask):
        try:
            data = conn.recv(1024)
            if not data:
                print('closing',conn)
                sel.unregister(conn)
                conn.close()
                return
            conn.send(b'hello')
        except Exception:
            print('closing', conn)
            sel.unregister(conn)
            conn.close()
    
    
    
    server_fileobj = socket(AF_INET,SOCK_STREAM)
    server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    server_fileobj.bind(('127.0.0.1',8088))
    server_fileobj.listen(5)
    server_fileobj.setblocking(False)#设置socket的接口为非阻塞
    
    #相当于往select的读列表里append了一个文件句柄server_fileobj,并且绑定了一个回调函数accept
    sel.register(server_fileobj,selectors.EVENT_READ,accept)
    
    
    
    while True:
        #检测所有的fileobj,是否有完成wait data的
        events = sel.select()
        # SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data'])
        for sel_obj,mask in events:
            callable = sel_obj.data
            callable(sel_obj.fileobj,mask)

     客户端

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    from socket import *
    c=socket(AF_INET,SOCK_STREAM)
    c.connect(('127.0.0.1',8088))
    
    while True:
        msg=input('>>: ')
        if not msg:continue
        c.send(msg.encode('utf-8'))
        data=c.recv(1024)
        print(data.decode('utf-8'))

    模拟请求

     1 #1. epoll并不代表一定比select好
     2 # 在并发高的情况下,连接活跃度不是很高, epoll比select
     3 # 并发性不高,同时连接很活跃, select比epoll好
     4 
     5 #通过非阻塞io实现http请求
     6 # select + 回调 + 事件循环
     7 #  并发性高
     8 # 使用单线程
     9 
    10 import socket
    11 from urllib.parse import urlparse
    12 from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
    13 
    14 
    15 selector = DefaultSelector()
    16 #使用select完成http请求
    17 urls = []
    18 stop = False
    19 
    20 
    21 class Fetcher:
    22     def connected(self, key):
    23         selector.unregister(key.fd)
    24         self.client.send("GET {} HTTP/1.1
    Host:{}
    Connection:close
    
    ".format(self.path, self.host).encode("utf8"))
    25         selector.register(self.client.fileno(), EVENT_READ, self.readable)
    26 
    27     def readable(self, key):
    28         d = self.client.recv(1024)
    29         if d:
    30             self.data += d
    31         else:
    32             selector.unregister(key.fd)
    33             data = self.data.decode("utf8")
    34             html_data = data.split("
    
    ")[1]
    35             print(html_data)
    36             self.client.close()
    37             urls.remove(self.spider_url)
    38             if not urls:
    39                 global stop
    40                 stop = True
    41 
    42     def get_url(self, url):
    43         self.spider_url = url
    44         url = urlparse(url)
    45         self.host = url.netloc
    46         self.path = url.path
    47         self.data = b""
    48         if self.path == "":
    49             self.path = "/"
    50 
    51         # 建立socket连接
    52         self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    53         self.client.setblocking(False)
    54 
    55         try:
    56             self.client.connect((self.host, 80))  # 阻塞不会消耗cpu
    57         except BlockingIOError as e:
    58             pass
    59 
    60         #注册
    61         selector.register(self.client.fileno(), EVENT_WRITE, self.connected)
    62 
    63 
    64 #心脏, 心脏在不停的跳动,跳动就会知道调用什么代码,执行什么方法
    65 #不停的向操作系统询问哪些socket已经准备好了,然后执行回调方法
    66 def loop():
    67     #事件循环,不停的请求socket的状态并调用对应的回调函数
    68     #1. select本身是不支持register模式
    69     #2. socket状态变化以后的回调是由程序员完成的
    70     while not stop:
    71         ready = selector.select()
    72         for key, mask in ready:
    73             call_back = key.data
    74             call_back(key)
    75     #回调+事件循环+select(pollepoll)
    76 
    77 if __name__ == "__main__":
    78     fetcher = Fetcher()
    79     import time
    80     start_time = time.time()
    81     for url in range(20):
    82         url = "http://shop.projectsedu.com/goods/{}/".format(url)
    83         urls.append(url)
    84         fetcher = Fetcher()
    85         fetcher.get_url(url)
    86     loop()
    87     print(time.time()-start_time)
  • 相关阅读:
    Zookeeper 3.4 官方文档翻译
    西门子PLC学习笔记六-(Step7指令简单介绍)
    Leetcode[129]-Sum Root to Leaf Numbers
    一个Exchange 2010 的password不定期弹框的问题处理,希望对大家可以有所帮助。
    数据库分表和分库的原理及基于thinkPHP的实现方法
    电力企业信息化建设方案之调度信息报送系统
    自己定义控件三部曲视图篇(一)——測量与布局
    VS自己定义project向导开发(Custom Wizard)
    Android ROM开发(二)——ROM架构以及Updater-Script脚本分析,常见的Status错误解决的方法
    ALM11需求和测试覆盖率图解1
  • 原文地址:https://www.cnblogs.com/sunlong88/p/9490581.html
Copyright © 2011-2022 走看看