zoukankan      html  css  js  c++  java
  • Tornado之自定义异步非阻塞的服务器和客户端

    一、自定义的异步非阻塞的客户端

    #!/usr/bin/env python
    # -*- coding: utf8 -*-
    
    
    import select
    import socket
    import pprint
    
    """
    自定义了异步IO模块
    利用非阻塞的socket,不等待连接是否成功,不等待请求的响应
    select模块,去监听创建的套接字,是否有准备写,准备读的
    
    """
    
    
    class HttpResponse:
        def __init__(self, response_data):
            self.raw_data = response_data
            self.data = str(response_data, encoding='utf-8')
            self.Header = {}
            self.GET = {}
            self.BODY = {}
            self.response_info = ''
            self.initialize()
    
        def initialize(self):
            header_data, body_data = self.data.split('
    
    ', 1)
            self.BODY = body_data
    
            header_list = header_data.split('
    ')
            # print(header_list)
            for header_item in header_list:
                header_byte = header_item.split(':', 1)
                if len(header_byte) == 2:
                    self.Header[header_byte[0]] = header_byte[1]
    
                else:
                    self.response_info = header_byte
    
        def __str__(self):
            return self.response_info
    
    
    class HttpRequest:
        """
        对HttpRequest的简单封装,将socket,host和callback封装成一个对象
        """
    
        def __init__(self, sk, host, callback):
            """
            收到socket,host, callback回调函数,将其封装成HttRequest对象
            :param sk:一个socket对象
            :param host:主机名或者域名
            :param callback:socket结束后的回调函数
            """
            self.socket = sk
            self.host = host
            self.callback = callback
    
        def fileno(self):
            """
            select可以直接监听
            定义一个fileno()并返回一个,可以将HttpRequest()对象直接放在select直接去轮训,监听
            :return:fileno()文件描述符
            """
            return self.socket.fileno()
    
    
    class AsyncHttpClient:
        """
        异步Http客户端
        """
    
        def __init__(self):
            self.conn = []
            self.connection = []
    
        def add_request(self, host, callback):
            """
            传过来一个host和callback,自动将其封装HttpRequest对象,然后将其加入到self.conn和self.connection中
            :param host: 传过来一个host,一个回调函数callback
            :param callback:
            :return:
            """
            sk = socket.socket()
            sk.setblocking(0)  # 创建一个非阻塞的socket
            try:
                sk.connect((host, 80))  # 这个socket去连指定的主机的80端口
            except BlockingIOError as e:
                pass
            # 将socket,host,callback封装成一个对象
            http_request = HttpRequest(sk, host, callback)
    
            # 将对象追加到conn和connection
            self.conn.append(http_request)
            self.connection.append(http_request)
    
        def run(self):
            # 任务调用的接口
            while True:
                # 监听套接字的信号
                # select(rlist,wlist,xlist)
                # 三个列表,select分别监听 三个列表,rlist是否有读信号,wlist是否写信号,xlist是否有异常信号,最后是timeout
                # 一旦有某个对象相应的信号,就将当前的当前的对象返回给对应的位置
                rlist, wlist, xlist = select.select(self.conn, self.connection, self.conn, 0.05)
                # 一旦有写信号
                for w in wlist:
                    # 首次就是服务器,告知连接成功,你可以往里面写数据啦
                    # print(w.host,'连接成功,赶紧写数据吧!')
                    # 在套接字赶紧写上我要获取你的首页
                    tpl = "GET / HTTP/1.0
    Host:%s
    
    " % (w.host,)
                    w.socket.send(bytes(tpl, encoding='utf-8'))
                    # select就不用监听这个套接字是否有写入信号,直接移除掉
                    self.connection.remove(w)
    
                # 一旦有读信号
                for r in rlist:
                    # print(r.host,'开始收到数据啦~~~')
    
                    rec_data = bytes()
                    while True:
                        # 开始收数据,直到接收不到数据
                        try:
                            chunk = r.socket.recv(8096)  # 一次接受8096个字节的数据
                            rec_data += chunk
                        except BlockingIOError as e:
                            break
                    # print(r.host,rec_data)
    
                    # 执行打包中的callback方法,将接收到数据传过去
                    r.callback(rec_data)
                    r.socket.close()
                    # 获取到相应的相应数据,就不在监听其是否有读数据
                    self.conn.remove(r)
                if len(self.conn) == 0:
                    # 如果没有要监听是否有读信号的套接字,就可以跳出循环任务结束了
                    break
    
    
    def f1(rec_data):
        response = HttpResponse(rec_data)
        print(response.BODY)
    
    
    def f2(data):
        print('输出到屏幕')
    
    
    # 定义一个字典列表,字典列表包含主机名和回调函数
    url_list = [
        {'host': 'www.baidu.com', 'callback': f1},
        {'host': 'cn.bing.com', 'callback': f1},
        {'host': 'www.cnblogs.com', 'callback': f1},
    ]
    # 声明一个异步Async
    
    requestclient = AsyncHttpClient()
    
    for item in url_list:
        requestclient.add_request(item['host'], item['callback'])
    
    requestclient.run()

    二、自定义的异步非阻塞的服务端

    #!/usr/bin/env python
    # -*- coding: utf8 -*-
    
    
    import socket, select
    
    EOL1 = b'/r/n'
    EOL2 = b'/r/n/r/n'
    
    # 拼接成的response
    response = b'HTTP/1.0 200 OK/r/nDate: Mon, 1 Jan 1996 01:01:01 GMT/r/n'
    response += b'Content-Type: text/plain/r/nContent-Length: 13/r/n/r/n'
    response += b'Hello, world!'
    
    # 创建一个服务端的socket,来监听是否有请求过来
    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    serversocket.bind(('0.0.0.0', 8080))  # 绑定
    serversocket.listen(1)  # 监听
    serversocket.setblocking(0)  # 设置时非阻塞
    print(serversocket.fileno())
    epoll = select.epoll()  # 准备设计一个IO多路复用
    epoll.register(serversocket.fileno(), select.EPOLLIN)  # 把上面的serversocket的fileno() 和 监听准备读信号 注册到epoll中去
    
    try:
        # 设置三个空字典
        connections = {}
        requests = {}
        responses = {}
        while True:
            # 查看epoll是否有信号有的话,就放在events中
            events = epoll.poll(1)
    
            # 循环events,分别拿到 文件描述号 和对应的事件
            for fileno, event in events:
                # 如果当前的文件描述号是serversocket,那么说明有新的连接
                if fileno == serversocket.fileno():
                    # 所以就得接受,创建了 连接,拿到了对方的IP地址
                    connection, address = serversocket.accept()
                    # connection就是客户端连接过来建立的socket,设置为非阻塞
                    connection.setblocking(0)
                    # 客户端建立的socket也注册到select模块的IO多路复用中去
                    epoll.register(connection.fileno(), select.EPOLLIN)
    
                    # 以Connection的文件描述号 作为键 socket作为值保存在connections中
                    connections[connection.fileno()] = connection
    
                    # 同时在requests和responses字典中,
                    # requests中 以connection.fileno() 作为键 以请求的内容作为值
                    # responses中 以connection.fileno() 作为键 以相应的内容作为值,这个我们返回的是固定的,仅仅返回hello world
    
                    requests[connection.fileno()] = b''
                    responses[connection.fileno()] = response
    
                # 如果请求的数据不是socketserver,那肯定是客户端的,判断是否是准备读的信号
                elif event & select.EPOLLIN:
                    # 立马来开始读取数据,加到requests对象套接字的内容中去
                    requests[fileno] += connections[fileno].recv(1024)
    
                    # 判断换行 和 两个换行是否在接收过来的数据中
                    if EOL1 in requests[fileno] or EOL2 in requests[fileno]:
                        # 如果是的话,就将这个套接字的监听更新为准备写
                        epoll.modify(fileno, select.EPOLLOUT)
    
                        # 打印40个-,然后换行,加上请求的内容
                        print('-' * 40 + '/n' + requests[fileno].decode()[:-2])
                # 如果请求的数据不是socketserver,那肯定是客户端的,判断是否是准备写的信号
                elif event & select.EPOLLOUT:
                    # 立马来开始读取数据,就将response的对应的套接字号对应的值拿出来,其实就是hello world,O(∩_∩)O哈哈~
                    # 并统计发送了多少个字节
                    byteswritten = connections[fileno].send(responses[fileno])
                    # 更新response对应套接字内容为剩下的内容
                    responses[fileno] = responses[fileno][byteswritten:]
                    # 如果内容发完了,剩下的长度就是0,如果长度是0
                    if len(responses[fileno]) == 0:
                        # 就修改select,不监听该套接字的内容,其就变成了EPOLLHUP
                        epoll.modify(fileno, 0)
                        # 然后关闭该socket的管道
                        connections[fileno].shutdown(socket.SHUT_RDWR)
                elif event & select.EPOLLHUP:
                    # 如果不监听该套接字的内容,就将其注销掉
                    epoll.unregister(fileno)
                    # 关闭该套接字
                    connections[fileno].close()
                    # 从连接中删除该文件描述符
                    del connections[fileno]
    finally:
        # 最后关闭serversocket服务器套接字
        epoll.unregister(serversocket.fileno())
        # 关闭epoll
        epoll.close()
        # 套接字关闭
        serversocket.close()
  • 相关阅读:
    JDBC异常总结
    MySQL查看最大连接数和修改最大连接数
    标量子查询 子查询执行次数计算公式
    left join 关联条件位置
    MySQL查看最大连接数和修改最大连接数
    MySQL查看最大连接数和修改最大连接数
    MySQL查看最大连接数和修改最大连接数
    MySQL查看最大连接数和修改最大连接数
    MySQL查看最大连接数和修改最大连接数
    MySQL查看最大连接数和修改最大连接数
  • 原文地址:https://www.cnblogs.com/nulige/p/6897694.html
Copyright © 2011-2022 走看看