1,示例
这部的文档是简单的示例集合,它能够帮助你快速的启动应用大部分的应用,这些应用大多事分类的,并且提供给ini工作的连接代码:
1.1,基础示例
这部分示例集成了提供简单sanic简单的代码
单一APP
一个简单的sanic应用with一个简单的异步方法通过text和json类型的响应.
from sanic import Sanic
from sanic import response as res
# 实例化Sanic对象
app = Sanic(__name__)
# 服务器访问的路由
@app.route("/")
async def test(req):
return res.text("I am a teapot", status=418) # 返回的是字符串格式的数据
# 开启服务
if __name__ == '__main__':
app.run(host="0.0.0.0", port=8000, debug=True)
from sanic import Sanic
from sanic import response
# 实例化Sanic对象
app = Sanic(__name__)
# 服务器的访问路由
@app.route("/")
async def test(request):
return response.json({"test": True}) # 返回的是json数据格式
# 启动服务
if __name__ == '__main__':
app.run(host="0.0.0.0", port=8000)
1.2,使用sanic.view来创建app应用
展示了使用sanic.viewes.httpmethodview的简单机制,以及将其扩展为为为视图提供自定义异步行为的方法
from sanic import Sanic
from sanic.views import HTTPMethodView
from sanic.response import text
# 实例化Sanic对象
app = Sanic("some_name")
class SimpleView(HTTPMethodView):
def get(self, request):
return text("I am get method")
def post(self, request):
return text("I am post method")
def put(self, request):
return text("I am put method")
def patch(self, requets):
return text("I am patch method")
def delete(self, request):
return text("I am delete method")
class SimpleAsyncView(HTTPMethodView):
async def get(self, request):
return text("I am async get method")
async def post(self, request):
return text("I am async post method")
async def put(self, request):
return text("I am async put method")
app.add_route(SimpleView.as_view(), "/") # 注册视图类(后边加访问的路由)
app.add_route(SimpleAsyncView.as_view(), "/async") # 注册视图类(后边加访问的路由)
if __name__ == '__main__':
app.run(host="0.0.0.0", port=8000, debug=True)
1.3,URL跳转
from sanic import Sanic
from sanic import response
app = Sanic(__name__)
# 同步函数的跳转
@app.route("/")
def handle_response(request):
return response.redirect("/redirect")
# 异步处理函数到的跳转
@app.route("/redirect")
async def test(request):
return response.json({"Redirected": True})
if __name__ == '__main__':
app.run(host="0.0.0.0", port=8000)
1.4,redirection URL
Sanic提供了一种跳转页面的简单方法叫作url_for,它将会独一无二的url名字作为参数并且返回给你分配实际的路由地址.,有了url_for这个方法将帮助我们简化在不同应用跳转中请求的影响,
from sanic import Sanic
from sanic import response
app = Sanic(__name__)
@app.route("/")
async def index(request):
# 用post_handler生成url端
url = app.url_for("post_handler", post_id=5)
# 提升成的url最后跳转的页面变成了"/post/5"
return response.redirect(url)
@app.route("/posts/<post_id>")
async def post_handler(request, post_id):
return response.text("Post- {}".format(post_id))
if __name__ == '__main__':
app.run(host="0.0.0.0", port=8000, debug=True)
1.5,蓝图
Sanic提供了一个惊人的特性,可以将您的API和路由分组到一个逻辑集合中,这个集合可以很容易地导入并插入到您的任何Sanic应用程序中,它被称为蓝图。
from sanic import Blueprint, Sanic
from sanic.response import file,json
# 实例化Sanic对象
app = Sanic(__name__)
blueprint = Blueprint("name", url_prefix="/my_blueprint")
blueprint2 = Blueprint("name2", url_prefix="/my_blueprint2")
blueprint3 = Blueprint("name3", url_prefix="/my_blueprint3")
# blueprint蓝图的路由
@blueprint.route("/foo")
async def foo(request):
return json({"msg": "hi from blueprint"})
# blueprint2蓝图的路由
@blueprint2.route("/foo")
async def foo2(request):
return json({"msg": "hi from blueprint2"})
# blueprint3蓝图的路由
@blueprint3.route("/foo")
async def index(request):
return await file("websocket.html")
# websocket的路由
@app.websocket("/foo")
async def foo3(request, ws):
while True:
data = "hello!"
print("Sending:" + data)
await ws.send(data)
data = await ws.recv()
print("Received:" + data)
# 注册蓝图
app.blueprint(blueprint)
app.blueprint(blueprint2)
app.blueprint(blueprint3)
app.run(host="0.0.0.0", port=8000)
1.6,日志功能增强
from sanic import Sanic
from sanic import response
import logging
logging_format = "[%(asctime)s] % (process)d-%(Levelname)s"
logging_format += "%(module)s::%(funcName)s():1%(Lineno)d:"
logging_format += "%(message)s"
logging.basicConfig(
format=logging_format,
level=logging.DEBUG
)
log = logging.getLogger()
# 设置日志去重写默认的配置
sanic = Sanic()
@sanic.route("/")
def test(request):
log.info("received request; responding with 'hey'")
return response.text("hey")
sanic.run(host="0.0.0.0", port=8000)
下面的示例代码演示了sanic.app.sanic.middleware()的用法,以便提供一种机制为每个传入请求分配一个唯一的请求ID,并通过aiotask上下文记录它们
"""
基于https://github.com/skyscanner/aiotask-context中的示例
和“examples/override logging,run.py”。
需要https://github.com/skyscanner/aiotask-context/tree/52efbc21e2def2d52abb9a8e951f3ce5e6f690或更高版本
$pip安装git+https://github.com/skyscanner/aiotask-context.git
"""
import asyncio
import uuid
import logging
from signal import signal, SIGINT
from sanic import Sanic
from sanic import response
import uvloop # windows下不支持uvloop
import aiotask_content as context
log = logging.getLogger(__name__)
class RequestIdFilter(logging.Filter):
def filter(self, record):
record.request_id = context.get("X-Request-ID")
return True
LOG_SETTINGS = {
'version': 1,
'disable_existing_loggers': False,
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'level': 'DEBUG',
'formatter': 'default',
'filters': ['requestid'],
},
},
'filters': {
'requestid': {
'()': RequestIdFilter,
},
},
'formatters': {
'default': {
'format': '%(asctime)s %(levelname)s %(name)s:%(lineno)d %(request_id)s | %(message)s',
},
},
'loggers': {
'': {
'level': 'DEBUG',
'handlers': ['console'],
'propagate': True
},
}
}
app = Sanic(__name__, log_config=LOG_SETTINGS)
@app.route("/")
async def test(request):
log.debug("X-Request-ID: %s", context.get("X-Request-ID"))
log.info("Hello from test!")
return response.json({"test": True})
if __name__ == '__main__':
asyncio.set_event_loop(uvloop.new_event_loop())
server = app.create_server(host="0.0.0.0", port=8000)
loop = asyncio.get_event_loop()
loop.set_task_factory(context.task_factory)
task = asyncio.ensure_future(server)
try:
loop.run_forever()
except:
loop.stop()
1.7,sanic流支持
Sanic框架内置了对大文件流的支持,下面的代码解释了使用流支持设置SANIC应用程序的过程。
from sanic import Sanic
from sanic.views import CompositionView
from sanic.views import HTTPMethodView
from sanic.views import stream as stream_decorator
from sanic.blueprints import Blueprint
from sanic.response import stream, text
bp = Blueprint("blueprint_request_stream")
app = Sanic("request_stream")
class SimpleView(HTTPMethodView):
@stream_decorator
async def post(self, request):
result = ""
while 1:
body = await request.stream.get()
if not body:
break
result += body.decode("utf-8")
return text(result)
@app.post("/stream", stream=True)
async def handler(request):
async def streaming(response):
while 1:
body = await request.stream.get()
if not body:
break
body = body.decode("utf-8").replace("1", "A")
await response.write(body)
return stream(streaming)
@bp.put("/bp_stream", stream=True)
async def bp_handler(request):
result = ""
while 1:
body = await request.stream.get()
if not body:
break
result += body.decode("utf-8").replace("1", "A")
return text(result)
async def post_handler(request):
result = ""
while 1:
body = await request.stream.get()
if not body:
break
result += body.decode("utf-8")
return text(result)
app.blueprint(bp)
app.add_route(SimpleView.as_view(), "/method_view")
view = CompositionView()
view.add(["POST"], post_handler, stream=True)
app.add_route(view, "/composition_view")
if __name__ == '__main__':
app.run(host="0.0.0.0", port=8000)
简单的客户端展示使用客户端代码流应用(测试一下代码)
import requests
# 警告:这是一个繁重的进程
data = ""
for i in range(1, 250000):
data += str(i)
r = requests.post("http://0.0.0.0:8000/stream", data=data)
print(r.text)
1.8,sanic的并发支持
SANIC支持通过多个工作人员支持启动应用程序。但是,为了确保高效的执行,能够限制每个进程/循环的并发性是很重要的。下面的代码部分提供了一个简单的示例,说明如何在asyncio.semaphore的帮助下限制并发性
from sanic import Sanic
from sanic.response import json
import asyncio
import aiohttp
app = Sanic(__name__)
sem = None
@app.listener("before_server_start")
def init(sanic, loop):
global sem
concurrency_per_work = 4
sem = asyncio.Semaphore(concurrency_per_work,loop=loop)
async def bounded_fetch(session, url):
# 使用session对象执行url的get请求
async with sem, session.get(url) as response:
return await response.json()
@app.route("/")
async def test(request):
# 下载并且提供json例子的服务
url = "http://api.github.com/repos/channelcat/sanic"
async with aiohttp.ClientSession() as session:
response = await bounded_fetch(session, url)
return json(response)
app.run(host="0.0.0.0", port=8000, workers=2)