zoukankan      html  css  js  c++  java
  • sanic官方文档解析之Example(一)

    1,示例

    这部的文档是简单的示例集合,它能够帮助你快速的启动应用大部分的应用,这些应用大多事分类的,并且提供给ini工作的连接代码:

    1.1,基础示例

    这部分示例集成了提供简单sanic简单的代码

    单一APP

    一个简单的sanic应用with一个简单的异步方法通过text和json类型的响应.

    from sanic import Sanic
    from sanic import response as res
    # 实例化Sanic对象
    app = Sanic(__name__)
    
    
    # 服务器访问的路由
    @app.route("/")
    async def test(req):
        return res.text("I am a teapot", status=418)  # 返回的是字符串格式的数据
    
    # 开启服务
    if __name__ == '__main__':
        app.run(host="0.0.0.0", port=8000, debug=True)
    from sanic import Sanic
    from sanic import response
    
    # 实例化Sanic对象
    app = Sanic(__name__)
    
    
    # 服务器的访问路由
    @app.route("/")
    async def test(request):
        return response.json({"test": True})  # 返回的是json数据格式
    
    # 启动服务
    if __name__ == '__main__':
        app.run(host="0.0.0.0", port=8000)

    1.2,使用sanic.view来创建app应用

    展示了使用sanic.viewes.httpmethodview的简单机制,以及将其扩展为为为视图提供自定义异步行为的方法

    from sanic import Sanic
    from sanic.views import HTTPMethodView
    from sanic.response import text
    
    # 实例化Sanic对象
    app = Sanic("some_name")
    
    
    class SimpleView(HTTPMethodView):
    
        def get(self, request):
            return text("I am get method")
    
        def post(self, request):
            return text("I am post method")
    
        def put(self, request):
            return text("I am put method")
    
        def patch(self, requets):
            return text("I am patch method")
    
        def delete(self, request):
            return text("I am delete method")
    
    
    class SimpleAsyncView(HTTPMethodView):
        async def get(self, request):
            return text("I am async get method")
        
        async def post(self, request):
            return text("I am async post method")
        
        async def put(self, request):
            return text("I am async put method")
        
        
    app.add_route(SimpleView.as_view(), "/")  # 注册视图类(后边加访问的路由)
    app.add_route(SimpleAsyncView.as_view(), "/async")  # 注册视图类(后边加访问的路由)
    
    if __name__ == '__main__':
        app.run(host="0.0.0.0", port=8000, debug=True)

    1.3,URL跳转

    from sanic import Sanic
    from sanic import response
    
    app = Sanic(__name__)
    
    
    # 同步函数的跳转
    @app.route("/")
    def handle_response(request):
        return response.redirect("/redirect")
    
    
    # 异步处理函数到的跳转
    @app.route("/redirect")
    async def test(request):
        return response.json({"Redirected": True})
    
    if __name__ == '__main__':
        app.run(host="0.0.0.0", port=8000)

    1.4,redirection URL

    Sanic提供了一种跳转页面的简单方法叫作url_for,它将会独一无二的url名字作为参数并且返回给你分配实际的路由地址.,有了url_for这个方法将帮助我们简化在不同应用跳转中请求的影响,

    from sanic import Sanic
    from sanic import response
    
    app = Sanic(__name__)
    
    
    @app.route("/")
    async def index(request):
        # 用post_handler生成url端
        url = app.url_for("post_handler", post_id=5)
        # 提升成的url最后跳转的页面变成了"/post/5"
        return response.redirect(url)
    
    
    @app.route("/posts/<post_id>")
    async def post_handler(request, post_id):
        return response.text("Post- {}".format(post_id))
                             
    if __name__ == '__main__':
        app.run(host="0.0.0.0", port=8000, debug=True)

    1.5,蓝图

    Sanic提供了一个惊人的特性,可以将您的API和路由分组到一个逻辑集合中,这个集合可以很容易地导入并插入到您的任何Sanic应用程序中,它被称为蓝图。

    from sanic import Blueprint, Sanic
    from sanic.response import file,json
    # 实例化Sanic对象
    app = Sanic(__name__)
    blueprint = Blueprint("name", url_prefix="/my_blueprint")
    blueprint2 = Blueprint("name2", url_prefix="/my_blueprint2")
    blueprint3 = Blueprint("name3", url_prefix="/my_blueprint3")
    
    
    # blueprint蓝图的路由
    @blueprint.route("/foo")
    async def foo(request):
        return json({"msg": "hi from blueprint"})
    
    
    # blueprint2蓝图的路由
    @blueprint2.route("/foo")
    async def foo2(request):
        return json({"msg": "hi from blueprint2"})
    
    
    # blueprint3蓝图的路由
    @blueprint3.route("/foo")
    async def index(request):
        return await file("websocket.html")
    
    
    # websocket的路由
    @app.websocket("/foo")
    async def foo3(request, ws):
        while True:
            data = "hello!"
            print("Sending:" + data)
            await ws.send(data)
            data = await ws.recv()
            print("Received:" + data)
    # 注册蓝图
    app.blueprint(blueprint)  
    app.blueprint(blueprint2)
    app.blueprint(blueprint3)
    
    app.run(host="0.0.0.0", port=8000)

    1.6,日志功能增强

    from sanic import Sanic
    from sanic import response
    import logging
    
    logging_format = "[%(asctime)s] % (process)d-%(Levelname)s"
    logging_format += "%(module)s::%(funcName)s():1%(Lineno)d:"
    logging_format += "%(message)s"
    
    logging.basicConfig(
        format=logging_format,
        level=logging.DEBUG
    )
    log = logging.getLogger()
    
    # 设置日志去重写默认的配置
    sanic = Sanic()
    
    
    @sanic.route("/")
    def test(request):
        log.info("received request; responding with 'hey'")
        return response.text("hey")
    
    
    sanic.run(host="0.0.0.0", port=8000)

    下面的示例代码演示了sanic.app.sanic.middleware()的用法,以便提供一种机制为每个传入请求分配一个唯一的请求ID,并通过aiotask上下文记录它们

    """
    基于https://github.com/skyscanner/aiotask-context中的示例
    和“examples/override logging,run.py”。
    需要https://github.com/skyscanner/aiotask-context/tree/52efbc21e2def2d52abb9a8e951f3ce5e6f690或更高版本
    $pip安装git+https://github.com/skyscanner/aiotask-context.git
    """
    import asyncio
    import uuid
    import logging
    from signal import signal, SIGINT
    from sanic import Sanic
    from sanic import response
    import uvloop  # windows下不支持uvloop
    import aiotask_content as context
    
    log = logging.getLogger(__name__)
    
    
    class RequestIdFilter(logging.Filter):
        def filter(self, record):
            record.request_id = context.get("X-Request-ID")
            return True
    
    
    LOG_SETTINGS = {
        'version': 1,
        'disable_existing_loggers': False,
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'level': 'DEBUG',
                'formatter': 'default',
                'filters': ['requestid'],
            },
        },
        'filters': {
            'requestid': {
                '()': RequestIdFilter,
            },
        },
        'formatters': {
            'default': {
                'format': '%(asctime)s %(levelname)s %(name)s:%(lineno)d %(request_id)s | %(message)s',
            },
        },
        'loggers': {
            '': {
                'level': 'DEBUG',
                'handlers': ['console'],
                'propagate': True
            },
        }
    }
    app = Sanic(__name__, log_config=LOG_SETTINGS)
    @app.route("/")
    async def test(request):
        log.debug("X-Request-ID: %s", context.get("X-Request-ID"))
        log.info("Hello from test!")
        return response.json({"test": True})
    
    if __name__ == '__main__':
        asyncio.set_event_loop(uvloop.new_event_loop())
        server = app.create_server(host="0.0.0.0", port=8000)
        loop = asyncio.get_event_loop()
        loop.set_task_factory(context.task_factory)
        task = asyncio.ensure_future(server)
        try:
            loop.run_forever()
        except:
            loop.stop()

    1.7,sanic流支持

    Sanic框架内置了对大文件流的支持,下面的代码解释了使用流支持设置SANIC应用程序的过程。

    from sanic import Sanic
    from sanic.views import CompositionView
    from sanic.views import HTTPMethodView
    from sanic.views import stream as stream_decorator
    from sanic.blueprints import Blueprint
    from sanic.response import stream, text
    
    bp = Blueprint("blueprint_request_stream")
    app = Sanic("request_stream")
    
    
    class SimpleView(HTTPMethodView):
        
        @stream_decorator
        async def post(self, request):
            result = ""
            while 1:
                body = await request.stream.get()
                if not body:
                    break
                result += body.decode("utf-8")
            return text(result)
        
        
    @app.post("/stream", stream=True)
    async def handler(request):
        async def streaming(response):
            while 1:
                body = await request.stream.get()
                if not body:
                    break
                body = body.decode("utf-8").replace("1", "A")
                await response.write(body)
        return stream(streaming)
    
    
    @bp.put("/bp_stream", stream=True)
    async def bp_handler(request):
        result = ""
        while 1:
            body = await request.stream.get()
            if not body:
                break
            result += body.decode("utf-8").replace("1", "A")
        return text(result)
    
    
    async def post_handler(request):
        result = ""
        while 1:
            body = await request.stream.get()
            if not body:
                break
            result += body.decode("utf-8")
        return text(result)
    
    
    app.blueprint(bp)
    app.add_route(SimpleView.as_view(), "/method_view")
    view = CompositionView()
    view.add(["POST"], post_handler, stream=True)
    app.add_route(view, "/composition_view")
    
    if __name__ == '__main__':
        app.run(host="0.0.0.0", port=8000)

    简单的客户端展示使用客户端代码流应用(测试一下代码)

    import requests
    # 警告:这是一个繁重的进程
    data = ""
    for i in range(1, 250000):
        data += str(i)
        
    r = requests.post("http://0.0.0.0:8000/stream", data=data)
    
    print(r.text)

    1.8,sanic的并发支持 

    SANIC支持通过多个工作人员支持启动应用程序。但是,为了确保高效的执行,能够限制每个进程/循环的并发性是很重要的。下面的代码部分提供了一个简单的示例,说明如何在asyncio.semaphore的帮助下限制并发性

    from sanic import Sanic
    from sanic.response import json
    
    import asyncio
    import aiohttp
    app = Sanic(__name__)
    
    sem = None
    
    
    @app.listener("before_server_start")
    def init(sanic, loop):
        global sem
        concurrency_per_work = 4
        sem = asyncio.Semaphore(concurrency_per_work,loop=loop)
    
    
    async def bounded_fetch(session, url):
        # 使用session对象执行url的get请求
        async with sem, session.get(url) as response:
            return await response.json()
        
        
    @app.route("/")
    async def test(request):
        # 下载并且提供json例子的服务
        url = "http://api.github.com/repos/channelcat/sanic"
        
        async with aiohttp.ClientSession() as session:
            response = await bounded_fetch(session, url)
            return json(response)
        
    app.run(host="0.0.0.0", port=8000, workers=2)
  • 相关阅读:
    《不懂说话你怎么带团队》读书笔记
    Java内存模型与指令重排
    线程的基本操作
    Spring MVC核心技术
    Spring MVC注解式开发
    Spring MVC执行流程
    大厂技术博客汇总
    内嵌tomcat启动速度慢
    Java 对IP请求进行限流.
    Java过滤XSS脚本, 可通过Appscan扫描
  • 原文地址:https://www.cnblogs.com/ljc-0923/p/10392092.html
Copyright © 2011-2022 走看看