zoukankan      html  css  js  c++  java
  • Python实战网站开发:Day5搭建Web框架

      搭建Web框架

      由于aiohttp作为一个Web框架比较底层,我们还需要基于aiohttp编写一个更方便处理URL的Web框架。

      在www目录新建coroweb.py

    import asyncio, os, inspect, logging, functools
    
    from urllib import parse
    
    from aiohttp import web
    
    ## apis是处理分页的模块,代码在本章页面末尾,请将apis.py放在www下以防报错
    ## APIError 是指API调用时发生逻辑错误
    from apis import APIError
    
    ## 编写装饰函数 @get()
    def get(path):
        ## Define decorator @get('/path')
        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kw):
                return func(*args, **kw)
            wrapper.__method__ = 'GET'
            wrapper.__route__ = path
            return wrapper
        return decorator
    
    ## 编写装饰函数 @post()
    def post(path):
        ## Define decorator @post('/path')
        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kw):
                return func(*args, **kw)
            wrapper.__method__ = 'POST'
            wrapper.__route__ = path
            return wrapper
        return decorator
    
    ## 以下是RequestHandler需要定义的一些函数
    def get_required_kw_args(fn):
        args = []
        params = inspect.signature(fn).parameters
        for name, param in params.items():
            if param.kind == inspect.Parameter.KEYWORD_ONLY and param.default == inspect.Parameter.empty:
                args.append(name)
        return tuple(args)
    
    def get_named_kw_args(fn):
        args = []
        params = inspect.signature(fn).parameters
        for name, param in params.items():
            if param.kind == inspect.Parameter.KEYWORD_ONLY:
                args.append(name)
        return tuple(args)
    
    def has_named_kw_args(fn):
        params = inspect.signature(fn).parameters
        for name, param in params.items():
            if param.kind == inspect.Parameter.KEYWORD_ONLY:
                return True
    
    def has_var_kw_arg(fn):
        params = inspect.signature(fn).parameters
        for name, param in params.items():
            if param.kind == inspect.Parameter.VAR_KEYWORD:
                return True
    
    def has_request_arg(fn):
        sig = inspect.signature(fn)
        params = sig.parameters
        found = False
        for name, param in params.items():
            if name == 'request':
                found = True
                continue
            if found and (param.kind != inspect.Parameter.VAR_POSITIONAL and param.kind != inspect.Parameter.KEYWORD_ONLY and param.kind != inspect.Parameter.VAR_KEYWORD):
                raise ValueError('request parameter must be the last named parameter in function: %s%s' % (fn.__name__, str(sig)))
        return found
    
    ## 定义RequestHandler从URL函数中分析其需要接受的参数
    class RequestHandler(object):
    
        def __init__(self, app, fn):
            self._app = app
            self._func = fn
            self._has_request_arg = has_request_arg(fn)
            self._has_var_kw_arg = has_var_kw_arg(fn)
            self._has_named_kw_args = has_named_kw_args(fn)
            self._named_kw_args = get_named_kw_args(fn)
            self._required_kw_args = get_required_kw_args(fn)
    
        async def __call__(self, request):
            kw = None
            if self._has_var_kw_arg or self._has_named_kw_args or self._required_kw_args:
                if request.method == 'POST':
                    if not request.content_type:
                        return web.HTTPBadRequest(text='Missing Content-Type.')
                    ct = request.content_type.lower()
                    if ct.startswith('application/json'):
                        params = await request.json()
                        if not isinstance(params, dict):
                            return web.HTTPBadRequest(text='JSON body must be object.')
                        kw = params
                    elif ct.startswith('application/x-www-form-urlencoded') or ct.startswith('multipart/form-data'):
                        params = await request.post()
                        kw = dict(**params)
                    else:
                        return web.HTTPBadRequest(text='Unsupported Content-Type: %s' % request.content_type)
                if request.method == 'GET':
                    qs = request.query_string
                    if qs:
                        kw = dict()
                        for k, v in parse.parse_qs(qs, True).items():
                            kw[k] = v[0]
            if kw is None:
                kw = dict(**request.match_info)
            else:
                if not self._has_var_kw_arg and self._named_kw_args:
                    # remove all unamed kw:
                    copy = dict()
                    for name in self._named_kw_args:
                        if name in kw:
                            copy[name] = kw[name]
                    kw = copy
                # check named arg:
                for k, v in request.match_info.items():
                    if k in kw:
                        logging.warning('Duplicate arg name in named arg and kw args: %s' % k)
                    kw[k] = v
            if self._has_request_arg:
                kw['request'] = request
            # check required kw:
            if self._required_kw_args:
                for name in self._required_kw_args:
                    if not name in kw:
                        return web.HTTPBadRequest(text='Missing argument: %s' % name)
            logging.info('call with args: %s' % str(kw))
            try:
                r = await self._func(**kw)
                return r
            except APIError as e:
                return dict(error=e.error, data=e.data, message=e.message)
    ## 定义add_static函数,来注册static文件夹下的文件
    def add_static(app):
        path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static')
        app.router.add_static('/static/', path)
        logging.info('add static %s => %s' % ('/static/', path))
    
    ## 定义add_route函数,来注册一个URL处理函数
    def add_route(app, fn):
        method = getattr(fn, '__method__', None)
        path = getattr(fn, '__route__', None)
        if path is None or method is None:
            raise ValueError('@get or @post not defined in %s.' % str(fn))
        if not asyncio.iscoroutinefunction(fn) and not inspect.isgeneratorfunction(fn):
            fn = asyncio.coroutine(fn)
        logging.info('add route %s %s => %s(%s)' % (method, path, fn.__name__, ', '.join(inspect.signature(fn).parameters.keys())))
        app.router.add_route(method, path, RequestHandler(app, fn))
    
    ## 定义add_routes函数,自动把handler模块的所有符合条件的URL函数注册了
    def add_routes(app, module_name):
        n = module_name.rfind('.')
        if n == (-1):
            mod = __import__(module_name, globals(), locals())
        else:
            name = module_name[n+1:]
            mod = getattr(__import__(module_name[:n], globals(), locals(), [name]), name)
        for attr in dir(mod):
            if attr.startswith('_'):
                continue
            fn = getattr(mod, attr)
            if callable(fn):
                method = getattr(fn, '__method__', None)
                path = getattr(fn, '__route__', None)
                if method and path:
                    add_route(app, fn)
    

      最后,在app.py中加入middleware,jinjia2模板和自注册的支持。app.py代码修改如下

      原app.py代码如下

    import logging; logging.basicConfig(level=logging.INFO)
    import asyncio
    from aiohttp import web
    
    ## 定义服务器响应请求的的返回为 "Awesome Website"
    async def index(request):
        return web.Response(body=b'<h1>Awesome Website</h1>', content_type='text/html')
    
    ## 建立服务器应用,持续监听本地9000端口的http请求,对首页"/"进行响应
    def init():
        app = web.Application()
        app.router.add_get('/', index)
        web.run_app(app, host='127.0.0.1', port=9000)
    
    if __name__ == "__main__":
        init()
    

      修改后如下

    import logging; logging.basicConfig(level=logging.INFO)
    import asyncio, os, json, time
    from datetime import datetime
    from aiohttp import web
    from jinja2 import Environment, FileSystemLoader
    
    ## config 配置代码在后面会创建添加, 可先从'https://github.com/yzyly1992/2019_Python_Web_Dev'下载或下一章中复制`config.py`和`config_default.py`到`www`下,以防报错
    from config import configs
    
    import orm
    from coroweb import add_routes, add_static
    
    ## handlers 是url处理模块, 当handlers.py在API章节里完全编辑完再将下一行代码的双井号去掉
    ## from handlers import cookie2user, COOKIE_NAME
    
    ## 初始化jinja2的函数
    def init_jinja2(app, **kw):
        logging.info('init jinja2...')
        options = dict(
            autoescape = kw.get('autoescape', True),
            block_start_string = kw.get('block_start_string', '{%'),
            block_end_string = kw.get('block_end_string', '%}'),
            variable_start_string = kw.get('variable_start_string', '{{'),
            variable_end_string = kw.get('variable_end_string', '}}'),
            auto_reload = kw.get('auto_reload', True)
        )
        path = kw.get('path', None)
        if path is None:
            path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'templates')
        logging.info('set jinja2 template path: %s' % path)
        env = Environment(loader=FileSystemLoader(path), **options)
        filters = kw.get('filters', None)
        if filters is not None:
            for name, f in filters.items():
                env.filters[name] = f
        app['__templating__'] = env
    
    ## 以下是middleware,可以把通用的功能从每个URL处理函数中拿出来集中放到一个地方
    ## URL处理日志工厂
    async def logger_factory(app, handler):
        async def logger(request):
            logging.info('fn is "logger_factory" Request: %s %s' % (request.method, request.path))
            return (await handler(request))
        return logger
    
    ## 认证处理工厂--把当前用户绑定到request上,并对URL/manage/进行拦截,检查当前用户是否是管理员身份
    ## 需要handlers.py的支持, 当handlers.py在API章节里完全编辑完再将下面代码的双井号去掉
    ##async def auth_factory(app, handler):
    ##    async def auth(request):
    ##        logging.info('check user: %s %s' % (request.method, request.path))
    ##        request.__user__ = None
    ##        cookie_str = request.cookies.get(COOKIE_NAME)
    ##        if cookie_str:
    ##            user = await cookie2user(cookie_str)
    ##            if user:
    ##                logging.info('set current user: %s' % user.email)
    ##                request.__user__ = user
    ##        if request.path.startswith('/manage/') and (request.__user__ is None or not request.__user__.admin):
    ##            return web.HTTPFound('/signin')
    ##        return (await handler(request))
    ##    return auth
    
    ## 数据处理工厂
    async def data_factory(app, handler):
        async def parse_data(request):
            if request.method == 'POST':
                if request.content_type.startswith('application/json'):
                    request.__data__ = await request.json()
                    logging.info('request json: %s' % str(request.__data__))
                elif request.content_type.startswith('application/x-www-form-urlencoded'):
                    request.__data__ = await request.post()
                    logging.info('request form: %s' % str(request.__data__))
            return (await handler(request))
        return parse_data
    
    ## 响应返回处理工厂
    async def response_factory(app, handler):
        async def response(request):
            logging.info('fn is "response_factory" Response handler... handler is %s' %handler)
            r = await handler(request)
            if isinstance(r, web.StreamResponse):
                return r
            if isinstance(r, bytes):
                resp = web.Response(body=r)
                resp.content_type = 'application/octet-stream'
                return resp
            if isinstance(r, str):
                if r.startswith('redirect:'):
                    return web.HTTPFound(r[9:])
                resp = web.Response(body=r.encode('utf-8'))
                resp.content_type = 'text/html;charset=utf-8'
                return resp
            if isinstance(r, dict):
                template = r.get('__template__')
                if template is None:
                    resp = web.Response(body=json.dumps(r, ensure_ascii=False, default=lambda o: o.__dict__).encode('utf-8'))
                    resp.content_type = 'application/json;charset=utf-8'
                    return resp
                else:
                    ## 在handlers.py完全完成后,去掉下一行的双井号
                    ##r['__user__'] = request.__user__
                    resp = web.Response(body=app['__templating__'].get_template(template).render(**r).encode('utf-8'))
                    resp.content_type = 'text/html;charset=utf-8'
                    return resp
            if isinstance(r, int) and r >= 100 and r < 600:
                return web.Response(r)
            if isinstance(r, tuple) and len(r) == 2:
                t, m = r
                if isinstance(t, int) and t >= 100 and t < 600:
                    return web.Response(t, str(m))
            # default:
            resp = web.Response(body=str(r).encode('utf-8'))
            resp.content_type = 'text/plain;charset=utf-8'
            return resp
        return response
    
    ## 时间转换
    def datetime_filter(t):
        delta = int(time.time() - t)
        if delta < 60:
            return u'1分钟前'
        if delta < 3600:
            return u'%s分钟前' % (delta // 60)
        if delta < 86400:
            return u'%s小时前' % (delta // 3600)
        if delta < 604800:
            return u'%s天前' % (delta // 86400)
        dt = datetime.fromtimestamp(t)
        return u'%s年%s月%s日' % (dt.year, dt.month, dt.day)
    # print(configs.db)
    async def init(loop):
        await orm.create_pool(loop=loop, **configs.db)
        ## 在handlers.py完全完成后,在下面middlewares的list中加入auth_factory
        app = web.Application(middlewares=[
            logger_factory, response_factory
        ])
        init_jinja2(app, filters=dict(datetime=datetime_filter))
        add_routes(app, 'handlers')
        add_static(app)
        runner = web.AppRunner(app)
        await runner.setup()
        srv = web.TCPSite(runner, '0.0.0.0', 9000)
        logging.info('fn is "init" server started at http://127.0.0.1:9000...')
        await srv.start()
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        loop.run_until_complete(init(loop))
        loop.run_forever()
    

      以下是处理分页和API错误的代码apis.py,请将之放到www下:

    import json, logging, inspect, functools
    
    ## 建立Page类来处理分页,可以在page_size更改每页项目的个数
    class Page(object):
    
        def __init__(self, item_count, page_index=1, page_size=8):
            self.item_count = item_count
            self.page_size = page_size
            self.page_count = item_count // page_size + (1 if item_count % page_size > 0 else 0)
            if (item_count == 0) or (page_index > self.page_count):
                self.offset = 0
                self.limit = 0
                self.page_index = 1
            else:
                self.page_index = page_index
                self.offset = self.page_size * (page_index - 1)
                self.limit = self.page_size
            self.has_next = self.page_index < self.page_count
            self.has_previous = self.page_index > 1
    
        def __str__(self):
            return 'item_count: %s, page_count: %s, page_index: %s, page_size: %s, offset: %s, limit: %s' % (self.item_count, self.page_count, self.page_index, self.page_size, self.offset, self.limit)
    
        __repr__ = __str__
    
    ## 以下为API的几类错误代码
    class APIError(Exception):
        def __init__(self, error, data='', message=''):
            super(APIError, self).__init__(message)
            self.error = error
            self.data = data
            self.message = message
    
    class APIValueError(APIError):
        def __init__(self, field, message=''):
            super(APIValueError, self).__init__('value:invalid', field, message)
    
    class APIResourceNotFoundError(APIError):
        def __init__(self, field, message=''):
            super(APIResourceNotFoundError, self).__init__('value:notfound', field, message)
    
    class APIPermissionError(APIError):
        def __init__(self, message=''):
            super(APIPermissionError, self).__init__('permission:forbidden', 'permission', message)
    
    if __name__=='__main__':
        import doctest
        doctest.testmod()
    

      有了Web框架,接下来就可以添加需要的URL到handlers模块来处理了。

  • 相关阅读:
    wordpress 的主题
    yapi api协作管理平台
    美团外卖券小程序路径过长导致插入文本消息失败的问题解决办法
    mp://XzDiXafjfvLnjvp
    supervisor 命令
    YII beego gin 框架对比
    芝麻微客-企业微信公域到私域流量运营助手
    H5跳转小程序
    PowerBI开发 第十九篇:基于Page创建Tooltip
    PowerBI开发 第十八篇:行级安全(RLS)
  • 原文地址:https://www.cnblogs.com/minseo/p/15612319.html
Copyright © 2011-2022 走看看