zoukankan      html  css  js  c++  java
  • Tornado之自定义异步非阻塞的服务器和客户端

    一、自定义的异步非阻塞的客户端

    #!/usr/bin/env python
    # -*- coding: utf8 -*-
    
    
    import select
    import socket
    import pprint
    
    """
    自定义了异步IO模块
    利用非阻塞的socket,不等待连接是否成功,不等待请求的响应
    select模块,去监听创建的套接字,是否有准备写,准备读的
    
    """
    
    
    class HttpResponse:
        def __init__(self, response_data):
            self.raw_data = response_data
            self.data = str(response_data, encoding='utf-8')
            self.Header = {}
            self.GET = {}
            self.BODY = {}
            self.response_info = ''
            self.initialize()
    
        def initialize(self):
            header_data, body_data = self.data.split('
    
    ', 1)
            self.BODY = body_data
    
            header_list = header_data.split('
    ')
            # print(header_list)
            for header_item in header_list:
                header_byte = header_item.split(':', 1)
                if len(header_byte) == 2:
                    self.Header[header_byte[0]] = header_byte[1]
    
                else:
                    self.response_info = header_byte
    
        def __str__(self):
            return self.response_info
    
    
    class HttpRequest:
        """
        对HttpRequest的简单封装,将socket,host和callback封装成一个对象
        """
    
        def __init__(self, sk, host, callback):
            """
            收到socket,host, callback回调函数,将其封装成HttRequest对象
            :param sk:一个socket对象
            :param host:主机名或者域名
            :param callback:socket结束后的回调函数
            """
            self.socket = sk
            self.host = host
            self.callback = callback
    
        def fileno(self):
            """
            select可以直接监听
            定义一个fileno()并返回一个,可以将HttpRequest()对象直接放在select直接去轮训,监听
            :return:fileno()文件描述符
            """
            return self.socket.fileno()
    
    
    class AsyncHttpClient:
        """
        异步Http客户端
        """
    
        def __init__(self):
            self.conn = []
            self.connection = []
    
        def add_request(self, host, callback):
            """
            传过来一个host和callback,自动将其封装HttpRequest对象,然后将其加入到self.conn和self.connection中
            :param host: 传过来一个host,一个回调函数callback
            :param callback:
            :return:
            """
            sk = socket.socket()
            sk.setblocking(0)  # 创建一个非阻塞的socket
            try:
                sk.connect((host, 80))  # 这个socket去连指定的主机的80端口
            except BlockingIOError as e:
                pass
            # 将socket,host,callback封装成一个对象
            http_request = HttpRequest(sk, host, callback)
    
            # 将对象追加到conn和connection
            self.conn.append(http_request)
            self.connection.append(http_request)
    
        def run(self):
            # 任务调用的接口
            while True:
                # 监听套接字的信号
                # select(rlist,wlist,xlist)
                # 三个列表,select分别监听 三个列表,rlist是否有读信号,wlist是否写信号,xlist是否有异常信号,最后是timeout
                # 一旦有某个对象相应的信号,就将当前的当前的对象返回给对应的位置
                rlist, wlist, xlist = select.select(self.conn, self.connection, self.conn, 0.05)
                # 一旦有写信号
                for w in wlist:
                    # 首次就是服务器,告知连接成功,你可以往里面写数据啦
                    # print(w.host,'连接成功,赶紧写数据吧!')
                    # 在套接字赶紧写上我要获取你的首页
                    tpl = "GET / HTTP/1.0
    Host:%s
    
    " % (w.host,)
                    w.socket.send(bytes(tpl, encoding='utf-8'))
                    # select就不用监听这个套接字是否有写入信号,直接移除掉
                    self.connection.remove(w)
    
                # 一旦有读信号
                for r in rlist:
                    # print(r.host,'开始收到数据啦~~~')
    
                    rec_data = bytes()
                    while True:
                        # 开始收数据,直到接收不到数据
                        try:
                            chunk = r.socket.recv(8096)  # 一次接受8096个字节的数据
                            rec_data += chunk
                        except BlockingIOError as e:
                            break
                    # print(r.host,rec_data)
    
                    # 执行打包中的callback方法,将接收到数据传过去
                    r.callback(rec_data)
                    r.socket.close()
                    # 获取到相应的相应数据,就不在监听其是否有读数据
                    self.conn.remove(r)
                if len(self.conn) == 0:
                    # 如果没有要监听是否有读信号的套接字,就可以跳出循环任务结束了
                    break
    
    
    def f1(rec_data):
        response = HttpResponse(rec_data)
        print(response.BODY)
    
    
    def f2(data):
        print('输出到屏幕')
    
    
    # 定义一个字典列表,字典列表包含主机名和回调函数
    url_list = [
        {'host': 'www.baidu.com', 'callback': f1},
        {'host': 'cn.bing.com', 'callback': f1},
        {'host': 'www.cnblogs.com', 'callback': f1},
    ]
    # 声明一个异步Async
    
    requestclient = AsyncHttpClient()
    
    for item in url_list:
        requestclient.add_request(item['host'], item['callback'])
    
    requestclient.run()

    二、自定义的异步非阻塞的服务端

    #!/usr/bin/env python
    # -*- coding: utf8 -*-
    
    
    import socket, select
    
    EOL1 = b'/r/n'
    EOL2 = b'/r/n/r/n'
    
    # 拼接成的response
    response = b'HTTP/1.0 200 OK/r/nDate: Mon, 1 Jan 1996 01:01:01 GMT/r/n'
    response += b'Content-Type: text/plain/r/nContent-Length: 13/r/n/r/n'
    response += b'Hello, world!'
    
    # 创建一个服务端的socket,来监听是否有请求过来
    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    serversocket.bind(('0.0.0.0', 8080))  # 绑定
    serversocket.listen(1)  # 监听
    serversocket.setblocking(0)  # 设置时非阻塞
    print(serversocket.fileno())
    epoll = select.epoll()  # 准备设计一个IO多路复用
    epoll.register(serversocket.fileno(), select.EPOLLIN)  # 把上面的serversocket的fileno() 和 监听准备读信号 注册到epoll中去
    
    try:
        # 设置三个空字典
        connections = {}
        requests = {}
        responses = {}
        while True:
            # 查看epoll是否有信号有的话,就放在events中
            events = epoll.poll(1)
    
            # 循环events,分别拿到 文件描述号 和对应的事件
            for fileno, event in events:
                # 如果当前的文件描述号是serversocket,那么说明有新的连接
                if fileno == serversocket.fileno():
                    # 所以就得接受,创建了 连接,拿到了对方的IP地址
                    connection, address = serversocket.accept()
                    # connection就是客户端连接过来建立的socket,设置为非阻塞
                    connection.setblocking(0)
                    # 客户端建立的socket也注册到select模块的IO多路复用中去
                    epoll.register(connection.fileno(), select.EPOLLIN)
    
                    # 以Connection的文件描述号 作为键 socket作为值保存在connections中
                    connections[connection.fileno()] = connection
    
                    # 同时在requests和responses字典中,
                    # requests中 以connection.fileno() 作为键 以请求的内容作为值
                    # responses中 以connection.fileno() 作为键 以相应的内容作为值,这个我们返回的是固定的,仅仅返回hello world
    
                    requests[connection.fileno()] = b''
                    responses[connection.fileno()] = response
    
                # 如果请求的数据不是socketserver,那肯定是客户端的,判断是否是准备读的信号
                elif event & select.EPOLLIN:
                    # 立马来开始读取数据,加到requests对象套接字的内容中去
                    requests[fileno] += connections[fileno].recv(1024)
    
                    # 判断换行 和 两个换行是否在接收过来的数据中
                    if EOL1 in requests[fileno] or EOL2 in requests[fileno]:
                        # 如果是的话,就将这个套接字的监听更新为准备写
                        epoll.modify(fileno, select.EPOLLOUT)
    
                        # 打印40个-,然后换行,加上请求的内容
                        print('-' * 40 + '/n' + requests[fileno].decode()[:-2])
                # 如果请求的数据不是socketserver,那肯定是客户端的,判断是否是准备写的信号
                elif event & select.EPOLLOUT:
                    # 立马来开始读取数据,就将response的对应的套接字号对应的值拿出来,其实就是hello world,O(∩_∩)O哈哈~
                    # 并统计发送了多少个字节
                    byteswritten = connections[fileno].send(responses[fileno])
                    # 更新response对应套接字内容为剩下的内容
                    responses[fileno] = responses[fileno][byteswritten:]
                    # 如果内容发完了,剩下的长度就是0,如果长度是0
                    if len(responses[fileno]) == 0:
                        # 就修改select,不监听该套接字的内容,其就变成了EPOLLHUP
                        epoll.modify(fileno, 0)
                        # 然后关闭该socket的管道
                        connections[fileno].shutdown(socket.SHUT_RDWR)
                elif event & select.EPOLLHUP:
                    # 如果不监听该套接字的内容,就将其注销掉
                    epoll.unregister(fileno)
                    # 关闭该套接字
                    connections[fileno].close()
                    # 从连接中删除该文件描述符
                    del connections[fileno]
    finally:
        # 最后关闭serversocket服务器套接字
        epoll.unregister(serversocket.fileno())
        # 关闭epoll
        epoll.close()
        # 套接字关闭
        serversocket.close()
  • 相关阅读:
    SharePoint 2013 商务智能报表发布
    sharepoint designer web 服务器似乎没有安装microsoft sharepoint foundation
    SharePoint 2013 Designer系列之数据视图
    SharePoint 2013 Designer系列之数据视图筛选
    SharePoint 2013 Designer系列之自定义列表表单
    SharePoint 2013 入门教程之创建及修改母版页
    SharePoint 2013 入门教程之创建页面布局及页面
    SharePoint 2010 级联下拉列表 (Cascading DropDownList)
    使用SharePoint Designer定制开发专家库系统实例!
    PL/SQL Developer 建立远程连接数据库的配置 和安装包+汉化包+注册机
  • 原文地址:https://www.cnblogs.com/nulige/p/6897694.html
Copyright © 2011-2022 走看看