zoukankan      html  css  js  c++  java
  • 150行代码搭建异步非阻塞Web框架

    最近看Tornado源码给了我不少启发,心血来潮决定自己试着只用python标准库来实现一个异步非阻塞web框架。花了点时间感觉还可以,一百多行的代码已经可以撑起一个极简框架了。

    一、准备工作

    需要的相关知识点:

    • HTTP协议的请求和响应
    • IO多路复用
    • asyncio

    掌握上面三个点的知识就完全没有问题,不是很清楚的同学我也推荐几篇参考文章

      HTTP协议详细介绍(https://www.cnblogs.com/haiyan123/p/7777924.html

      Python篇-IO多路复用详解(https://www.jianshu.com/p/818f27379a5e

      Python异步IO之协程(一):从yield from到async的使用(https://blog.csdn.net/SL_World/article/details/86597738

    实验环境:

    python 3.7.3
    

     由于在框架中会使用到async/await关键字,所以只要确保python版本在3.5以上即可。

     二、框架功能目标

    我们的框架要实现最基本的几个功能:

    • 封装HTTP请求响应
    • 路由映射
    • 类视图和函数视图
    • 协程支持

     当然一个完善的web框架需要实现的远远不止这些,这里我们现在只需要它能跑起来就足够了。

    三、封装HTTP协议

    HTTP是基于TCP/IP通信协议来实现数据传输,与一般的C/S相比,它的特点在于当客户端(浏览器)向服务端发起HTTP请求,服务端响应数据后双方立马断开连接,服务端无法主动向客户端发送数据。HTTP协议数据传输内容分为请求头和请求体,请求头和请求体之间使用" "进行分隔。在请求头中,第一行包含了请求方式,请求路径和HTTP协议,此后每一行以key: value的形式传输数据。

    对于我们的web服务端来说,需要的就是解析http请求和处理http响应。

    我们通过写两个类,HttpRequest和HttpResponse来实现。

    3.1 HttpRequest

    HttpRequest设计目标是解析从socket接收request数据

     1 class HttpRequest(object):
     2     def __init__(self, content: bytes):
     3         self.content = content.decode('utf-8')
     4         self.headers = {}
     5         self.GET = {}
     6         self.url = ''
     7         self.full_path = ''
     8         self.body = ''
     9         try:
    10             header, self.body = self.content.split('
    
    ')
    11             temp = header.split('
    ')
    12             first_line = temp.pop(0)
    13             self.method, self.url, self.protocol = first_line.split(' ')
    14             self.full_path = self.url
    15             for t in temp:
    16                 k, v = t.split(': ', 1)
    17                 self.headers[k] = v
    18         except Exception as e:
    19             print(e)
    20         if len(self.url.split('?')) > 1: # 解析GET参数
    21             self.url = self.full_path.split('?')[0] # 把url中携带的参数去掉
    22             parms = self.full_path.split('?')[1].split('&')
    23             for p in parms: # 将GET参数添加到self.GET字典
    24                 k, v = p.split('=')
    25                 self.GET[k] = v

    在类中,我们实现解析http请求的headers、method、url和GET参数,其实还有很多事情没有做,比如使用POST传输数据时,数据是在请求体中,针对这部分内容我并没有开始写,原因在于本文主要目的还是异步非阻塞框架,目前的功能已经足以支持我们进行下一步实验了。

    3.2 HttpResponse

    HTTP响应也可以分为响应头和响应体,我们可以很简单的实现一个response:

     1 class HttpResponse(object):
     2     def __init__(self, data: str):
     3         self.status_code = 200 # 默认响应状态 200
     4         self.headers = 'HTTP/1.1 %s OK
    '
     5         self.headers += 'Server:AsyncWeb'
     6         self.headers += '
    
    '
     7         self.data = data
     8 
     9     @property
    10     def content(self):
    11         return bytes((self.headers + self.data) % self.status_code, encoding='utf8')

    HttpResponse中并没有做太多的事情,接受一个字符串,并使用content返回一个满足HTTP响应格式的bytes。

    从用户调用角度,可以使用return HttpResponse("欢迎来到AsynicWeb")来返回数据。

    我们也可以简单的定义一个404页面:

    Http404 = HttpResponse('<html><h1>404</h1></html>')
    Http404.status_code = 404

    四、路由映射

    路由映射简单理解就是从一个URL地址找到对应的逻辑函数。举个例子,我们访问http://127.0.0.1:8000这个页面,在http请求中它的url是"/",在web服务器中有一个函数index,web服务器能够由url地址"/"找到函数index,这就是一个路由映射。

    其实路由映射实现起来非常简单。我们只要定义一个映射列表,列表中的每个元素包含url和逻辑处理(视图函数)两部分,当一个http请求到达的时候,遍历映射列表,使用正则匹配每一个url,如果请求的url和映射表中的相同,我们就可以取出对应的视图函数。

    路由映射表是完全由用户来定义映射关系的,它应该使用一个我们定义的标准结构,比如:

    routers = [
        ('/$', IndexView),
        ('/home', asy)
    ]

    五、类视图和函数视图

    视图是指能够根据一个请求,执行某些逻辑运算,最终返回响应的模块。说到这里,一个web框架的运行流程就出来了:

        http请求——路由映射表——视图——执行视图获取返回值——http响应

    在我们的框架中,借鉴Django的设计,我们让它支持类视图(CBV)和函数视图(FBV)两种模式。

    对于函数视图,完全由用户自己定义,只要至少能够接受一个request参数即可

    对于类视图,我们需要做一些预处理,确保用户按我们的规则来实现类视图。

    定义一个View类:

    1 class View(object):
    2     # CBV应继承View类
    3     def dispatch(self, request):
    4         method = request.method.lower()
    5         if hasattr(self, method):
    6             return getattr(self, method)(request)
    7         else:
    8             return Http404

     在View类中,我们只写了一个dispatch方法,其实就做了一件事:反射。当我们在路由映射表中找对应的视图时,如果判断视图属于类,我们就调用dispatch方法。

    从用户角度来看,实现一个CBV只需要继承View类,然后通过定义get、post、delete等方法来实现不同的处理。

    六、socket和多路复用

    上面几个小节实现了web框架的大体执行路径,从这节开始我们实现web服务器的核心。

    通过IO多路复用可以达到单线程实现高并发的效果,一个标准的IO多路复用写法:

     1 server = socket(AF_INET, SOCK_STREAM)
     2 server.bind(("127.0.0.1", 8000))
     3 server.setblocking(False) # 设置非阻塞
     4 server.listen(128)
     5 Future_Task_Wait = {}
     6 rlist = [server, ]
     7 while True:
     8     r, w, x = select.select(rlist, [], [], 0.1)
     9     for o in r:
    10         if o == server:
    11             '''判断o是server还是conn'''
    12             conn, addr = o.accept()
    13             conn.setblocking(False) # 设置非阻塞
    14             rlist.append(conn) # 客户连接 加入轮询列表
    15         else:
    16             data = b""
    17             while True: # 接收客户传输数据
    18                 try:
    19                     chunk = o.recv(1024)
    20                     data = data + chunk
    21                 except Exception as e:
    22                     chunk = None
    23                 if not chunk:
    24                     break
    25             dosomething(o, data, routers) # 拿到数据干点啥

     通过这段代码我们可以获得所有的请求了,下一步就是处理这些请求。

    我们就定义一个dosomething函数

     1 import re
     2 import time
     3 from types import FunctionType
     4 
     5 def dosomething(o, data, routers):
     6     '''解析http请求,寻找映射函数并执行得到结果
    7 :param o: socket连接对象 8 :param data: socket接收数据 9 :return: 响应结果 10 ''' 11 request = HttpRequest(data) 12 print(time.strftime("【%Y-%m-%d %X】",time.localtime()), o.getpeername()[0], 13 request.method, request.url) 14 flag = False 15 for router in routers: 16 if re.match(router[0], request.url): 17 target = router[1] 18 flag = True 19 break 20 if flag: 21 # 判断targe是函数还是类 22 if isinstance(target, FunctionType): 23 result = target(request) 24 elif issubclass(target, View): 25 result = target().dispatch(request) 26 else: 27 result = Http404 28 else: 29 result = Http404 30 return result

    这段代码做了这么几件事。1.实例化HttpRequest;2.使用正则遍历路由映射表;3.将request传入视图函数或类视图的dispatch方法;4.拿到result结果

    我们通过result = dosomething(o, data, routers)可以拿到结果,接下来我们只需要把结果发回给客户端并断开连接就可以了

    o.sendall(result.content)  # 由于result是一个HttpResponse对象 我们使用content属性
    rlist.remove(o) # 从轮询中删除连接
    o.close() # 关闭连接

    至此,我们的web框架已经搭建好了。

    但它还是一个同步的框架,在我们的服务端中,其实一直通过while循环在监听select是否变化,假如我们在视图函数中添加IO操作,其他连接依然会阻塞等待,接下来让我们的框架实现对协程的支持。

    七、协程支持

    在实现协程之前,我们先聊聊Tornado的Future对象。可以说Tornado异步非阻塞的实现核心就是Future。

    Future对象内部维护了一个重要属性_result,这是一个标记位,一个刚实例化的Future内部的_result=None,我们可以通过其他操作来更改_result的状态。另一方面,我们可以一直监听每个Future对象的_result状态,如果发生变化就执行某些特定的操作。

    我们在第六节定义的dosomething函数中拿到了一个result,它应当是一个HttpResponse对象,那么能不能返回一个Future对象呢。

    假如result是一个Future对象,我们的服务端不立马返回结果,而是把Future放进另一个轮询列表中,当Future内的_result改变时再返回结果,就达到了异步的效果。

    我们也可以定义一个Future类,这个类维护只一个变量result:

    1 class Future(object):
    2     def __init__(self):
    3         self.result = None

     对于框架使用者来说,在视图函数要么返回一个HttpResponse对象代表立即返回,要么返回一个Future对象说你先别管我,我把事情干完了再通知你返回结果。

    既然视图函数返回的可能不只是HttpResponse对象,那么我们就需要对第六步的代码增加额外的处理:

    Future_Task_Wait = {} # 定义一个异步Future字典
    result = dosomething() # 拿到结果后执行下面判断
    if isinstance(result, Future):
        Future_Task_Wait[o] = result # Futre对象则加入字典
    else:
        o.sendall(result.content) # 非Future对象直接返回结果并断开连接
        rlist.remove(o)
        o.close()

    在while True轮询内再增加一段代码,遍历Future_Task_Wait字典:

    rm_conn = [] # 需要移除列表的conn
    for conn, future in Future_Task_Wait.items():
        if future.result:
            try:
                conn.sendall(HttpResponse(data=future.result).content) # 返回result
            finally:
                rlist.remove(conn) 
                conn.close()
                rm_conn.append(conn)
    for conn in rm_conn: # 在字典中删除conn
        del Future_Task_Wait[conn]

     这样,我们就可以返回一个Future来告诉服务器这是将来才返回的对象。

    那回归正题,我们到底该如何使用协程?这里我用的方法是创建一个子线程来执行协程事件循环,主线程永远在监听socket。

    from threading import Thread
    def start_loop(loop):
        asyncio.set_event_loop(loop)
        loop.run_forever()
    coroutine_loop = asyncio.new_event_loop()  # 创建协程事件循环
    run_loop_thread = Thread(target=start_loop, args=(coroutine_loop,))  # 新起线程运行事件循环, 防止阻塞主线程
    run_loop_thread.start()  # 运行线程,即运行协程事件循环

    当我们要把asyncdo方法添加作为协程任务时

    asyncio.run_coroutine_threadsafe(asyncdo(), coroutine_loop)

    好了,异步非阻塞的核心代码分析的差不多了,将六七节的代码整合写成一个类

      1 import re
      2 import time
      3 import select
      4 import asyncio
      5 from socket import *
      6 from threading import Thread
      7 from types import FunctionType
      8 from http.response import Http404, HttpResponse
      9 from http.request import HttpRequest
     10 from views import View
     11 from core.future import Future
     12 
     13 class App(object):
     14     # web应用程序
     15     coroutine_loop = None
     16 
     17     def __new__(cls, *args, **kwargs):
     18         # 使用单例模式
     19         if not hasattr(cls, '_instance'):
     20             App._instance = super().__new__(cls)
     21         return App._instance
     22 
     23     def listen(self, host, port, routers):
     24         # IO多路复用监听连接
     25         server = socket(AF_INET, SOCK_STREAM)
     26         server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
     27         server.bind((host, port))
     28         server.setblocking(False)
     29         server.listen(128)
     30         Future_Task_Wait = {}
     31         rlist = [server, ]
     32         while True:
     33             r, w, x = select.select(rlist, [], [], 0.01)
     34             for o in r:
     35                 if o == server:
     36                     '''判断o是server还是conn'''
     37                     conn, addr = o.accept()
     38                     conn.setblocking(False)
     39                     rlist.append(conn)
     40                 else:
     41                     data = b""
     42                     while True:
     43                         try:
     44                             chunk = o.recv(1024)
     45                             data = data + chunk
     46                         except Exception as e:
     47                             chunk = None
     48                         if not chunk:
     49                             break
     50                     try:
     51                         request = HttpRequest(data, o)
     52                         print(time.strftime("【%Y-%m-%d %X】",time.localtime()), o.getpeername()[0],
     53                               request.method, request.url)
     54                         flag = False
     55                         for router in routers:
     56                             if re.match(router[0], request.url):
     57                                 target = router[1]
     58                                 flag = True
     59                                 break
     60                         if flag:
     61                             # 判断targe是函数还是类
     62                             if isinstance(target, FunctionType):
     63                                 result = target(request)
     64                             elif issubclass(target, View):
     65                                 result = target().dispatch(request)
     66                             else:
     67                                 result = Http404
     68                         else:
     69                             result = Http404
     70                         # 判断result是不是future
     71                         if isinstance(result, Future):
     72                             Future_Task_Wait[o] = result
     73                         else:
     74                             o.sendall(result.content)
     75                             rlist.remove(o)
     76                             o.close()
     77                     except Exception as e:
     78                         print(e)
     79             rm_conn = []
     80             for conn, future in Future_Task_Wait.items():
     81                 if future.result:
     82                     try:
     83                         conn.sendall(HttpResponse(data=future.result).content)
     84                     finally:
     85                         rlist.remove(conn)
     86                         conn.close()
     87                         rm_conn.append(conn)
     88             for conn in rm_conn:
     89                 del Future_Task_Wait[conn]
     90 
     91     def run(self, host='127.0.0.1', port=8000, routers=()):
     92         # 主线程select多路复用,处理http请求和响应
     93         # 给协程单独创建一个子线程,负责处理View函数提交的协程
     94         def start_loop(loop):
     95             asyncio.set_event_loop(loop)
     96             loop.run_forever()
     97         self.coroutine_loop = asyncio.new_event_loop()  # 创建协程事件循环
     98         run_loop_thread = Thread(target=start_loop, args=(self.coroutine_loop,))  # 新起线程运行事件循环, 防止阻塞主线程
     99         run_loop_thread.start()  # 运行线程,即运行协程事件循环
    100         self.listen(host, port, routers)

    八、框架测试

    现在,可以测试我们的web框架了。

     1 import asyncio
     2 from core.server import App
     3 from views import View
     4 from http.response import *
     5 from core.future import Future
     6 
     7 
     8 class IndexView(View):
     9     def get(self, request):
    10         return HttpResponse('欢迎来到首页')
    11 
    12     def post(self, request):
    13         return HttpResponse('post')
    14 
    15 def asy(request):
    16     future = Future()
    17     print('异步调用')
    18     wait = request.url.split('/')[-1]
    19     try:
    20         wait = int(wait)
    21     except:
    22         wait = 5
    23     asyncio.run_coroutine_threadsafe(dosomething(future, wait), app.coroutine_loop)
    24     print('返回Future')
    25     return future
    26 
    27 async def dosomething(future, wait):
    28     # 异步函数
    29     await asyncio.sleep(wait)# 模拟异步操作
    30     future.result = '等待了%s秒' % wait
    31 
    32 routers = [
    33     ('/$', IndexView),
    34     ('/home', asy)
    35 ]
    36 
    37 # 从用户角度只需使用run()
    38 app = App()
    39 app.run('127.0.0.1', 8080, routers=routers)

    浏览器访问http://127.0.0.1:8080,返回没有问题,如果有同学使用Chrome可能会乱码,那是因为我们的HttpResponse没有返回指定编码,添加一个响应头即可。

    浏览器访问http://127.0.0.1:8080/home,这时候会执行协程,默认等待5s后返回结果,你可以在多个标签页访问这个地址,通过等待时间来验证我们的异步框架是否正常工作。

    九、其他

    至此,我们要实现的异步非阻塞web框架已经完成了。当然这个框架说到底还是太简陋,后续完全可以优化HttpRequest和HttpResponse、增加对 数据库、模板语言等等组件的扩展。

    完整源码已经上传至https://github.com/sswest/AsyncWeb

  • 相关阅读:
    设计模式-享元模式
    设计模式-外观模式
    设计模式-桥接模式
    设计模式-适配器模式
    设计模式-代理模式
    java设计模式中用到的UML图
    VS code 初次安装配置
    CMD部分操作、BAT、以及VS SQL部分快捷键
    网络部分
    CMD 中certutil 操作命令
  • 原文地址:https://www.cnblogs.com/qq575654643/p/11777979.html
Copyright © 2011-2022 走看看