1,通过docker部署sanic项目
通过 Docker 和 Docker Compose 部署 Sanic 应用程序非常容易。下面的示例演示如何部署 simple_server.py:先是 Dockerfile,然后是 docker-compose.yml。
# Image that installs Sanic and runs simple_server.py from /code.
FROM python:3.5
# MAINTAINER is deprecated; a LABEL carries the same metadata.
LABEL maintainer="Channel Cat <channelcat@gmail.com>"
# COPY is the recommended instruction for copying plain local files.
COPY . /code
RUN pip3 install git+https://github.com/channelcat/sanic
# Documents the container port the server is expected to listen on.
EXPOSE 8000
WORKDIR /code
CMD ["python", "simple_server.py"]
# Builds the image from the Dockerfile in this directory and publishes
# container port 8000 on host port 8000.
version: '2'
services:
  sanic:
    build: .
    ports:
      - "8000:8000"
2,监控和错误处理
Sanic 通过 sanic.handlers.ErrorHandler 提供了一个可扩展的、最基本的全局异常处理程序实现。下面的示例演示如何扩展它以实现自定义行为。
"""
使用 Sanic 的错误处理程序框架拦截未捕获异常的示例。
这对希望使用 Sentry、Airbrake 等服务,
或者用自定义系统来记录和监视生产环境中意外错误的开发者可能有用。
首先,我们创建自己的类,继承自 sanic.handlers 中的 ErrorHandler,
并在创建 Sanic 实例时把它的一个实例交给应用。在这个
类的默认处理程序(default)中,我们可以执行任何操作,包括将异常发送到
外部服务。
"""
from sanic.handlers import ErrorHandler
from sanic.exceptions import SanicException
"""
与CustomHandler类相关的导入和代码
(通常情况下,这将在单独的文件中)
"""
class CustomHandler(ErrorHandler):
    """Error handler that prints unexpected exceptions before responding."""

    def default(self, request, exception):
        """Print non-Sanic exceptions, then defer to the base handler.

        Args:
            request: The request being handled when the exception occurred.
            exception: The exception that was raised.

        Returns:
            Whatever the parent class's ``default`` handler returns.
        """
        # The exception object is available here, so it can be logged,
        # forwarded to an external service, and so on.
        is_sanic_error = isinstance(exception, SanicException)
        if not is_sanic_error:
            print(exception)
        # Handling must end by returning a response for the client; the
        # parent class's default handler provides it.
        response = super().default(request, exception)
        return response
"""
这是一个普通的 Sanic 服务器,只是我们把
服务器的错误处理程序设置为 CustomHandler 的一个实例。
"""
from sanic import Sanic
# Create the application, then install a CustomHandler instance as its
# error handler.
app = Sanic(__name__)
handler = CustomHandler()
app.error_handler = handler
@app.route("/")
async def test(request):
    """Raise a SanicException for every request to "/"."""
    # Simulate something going wrong inside a handler; the exception is
    # then passed to the application's error handler.
    error = SanicException("You Broke It!")
    raise error
if __name__ == '__main__':
    # Use an explicit port number: a boolean such as True is treated as
    # the integer 1, i.e. port 1.
    app.run(host="0.0.0.0", port=8000)
3,使用外部服务提供监控
import logging
import socket
from os import getenv
from platform import node
from uuid import getnode as get_mac
from logdna import LogDNAHandler
from sanic import Sanic
from sanic.response import json
from sanic.request import Request
# Logger named "logdna", with its level set to INFO.
log = logging.getLogger("logdna")
log.setLevel(logging.INFO)
def get_my_ip_address(remote_server="google.com"):
    """Return the local IP address used to reach ``remote_server``.

    A UDP socket is connected to port 80 of ``remote_server`` and the
    address the socket is bound to locally is returned.

    Args:
        remote_server: Host name or IP address to connect towards.

    Returns:
        The local address of the connected socket, as a string.

    Raises:
        OSError: If the host cannot be resolved or the connect fails.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.connect((remote_server, 80))
        # getsockname() returns (address, port); only the address is needed.
        return s.getsockname()[0]
def get_mac_address():
    """Return the value of ``get_mac()`` as six colon-separated hex pairs."""
    # Zero-pad the node number to 12 hex digits, then group them in twos.
    digits = "{:012x}".format(get_mac())
    pairs = [digits[pos:pos + 2] for pos in range(0, len(digits), 2)]
    return ":".join(pairs)
# Options passed to the LogDNA handler. The host name, IP address and MAC
# address are looked up once, when this module is loaded.
logdna_options = {
    "app": __name__,
    "index_meta": True,
    "hostname": node(),
    "ip": get_my_ip_address(),
    "mac": get_mac_address()
}
# The API key is read from the LOGDNA_API_KEY environment variable.
logdna_handler = LogDNAHandler(getenv("LOGDNA_API_KEY"), options=logdna_options)
# Module logger at INFO level with the LogDNA handler attached.
logdna = logging.getLogger(__name__)
logdna.setLevel(logging.INFO)
logdna.addHandler(logdna_handler)
app = Sanic(__name__)
@app.middleware
def log_request(request: Request):
    """Log the URL of each incoming request through the ``logdna`` logger."""
    message = "I was Here with a new Request to URL: {}".format(request.url)
    logdna.info(message)
@app.route("/")
def default(request):
    """Answer requests to "/" with a fixed JSON payload."""
    payload = {"response": "I was here"}
    return json(payload)
if __name__ == "__main__":
    # getenv() returns a string when PORT is set, so convert the value to
    # an integer port number before starting the server.
    app.run(
        host="0.0.0.0",
        port=int(getenv("PORT", 8080))
    )
4,RayGun
from os import getenv
from raygun4py.raygunprovider import RaygunSender
from sanic import Sanic
from sanic.exceptions import SanicException
from sanic.handlers import ErrorHandler
class RaygunExceptionReporter(ErrorHandler):
    """Error handler that sends every handled exception to Raygun."""

    def __init__(self, raygun_api_key=None):
        """Create the Raygun sender.

        Args:
            raygun_api_key: Raygun API key. When None, the value of the
                RAYGUN_API_KEY environment variable is used instead.
        """
        super().__init__()
        api_key = (
            getenv("RAYGUN_API_KEY") if raygun_api_key is None
            else raygun_api_key
        )
        self.sender = RaygunSender(api_key)

    def default(self, request, exception):
        """Send ``exception`` to Raygun, then defer to the base handler."""
        self.sender.send_exception(exception=exception)
        response = super().default(request, exception)
        return response
# Build the reporter (no key is passed, so it falls back to the
# RAYGUN_API_KEY environment variable) and hand it to the app as its
# error handler.
raygun_error_reporter = RaygunExceptionReporter()
app = Sanic(__name__, error_handler=raygun_error_reporter)
@app.route("/raise")
async def test(request):
    """Raise a SanicException for every request to "/raise"."""
    error = SanicException('You Broke It!')
    raise error
if __name__ == '__main__':
    # getenv() returns a string when PORT is set, so convert the value to
    # an integer port number before starting the server.
    app.run(
        host="0.0.0.0",
        port=int(getenv("PORT", 8080))
    )
5,Rollbar
import rollbar
from sanic.handlers import ErrorHandler
from sanic import Sanic
from sanic.exceptions import SanicException
from os import getenv
# Initialise Rollbar with the value of the ROLLBAR_API_KEY environment
# variable.
rollbar.init(getenv("ROLLBAR_API_KEY"))
class RollbarExceptionHandler(ErrorHandler):
    """Error handler that reports each exception's message to Rollbar."""

    def default(self, request, exception):
        """Report ``str(exception)`` to Rollbar, then defer to the base handler."""
        message = str(exception)
        rollbar.report_message(message)
        response = super().default(request, exception)
        return response
# Create the app with a RollbarExceptionHandler instance as its error handler.
app = Sanic(__name__, error_handler=RollbarExceptionHandler())
@app.route("/raise")
def create_error(request):
    """Raise a SanicException for every request to "/raise"."""
    error = SanicException("I was here and I don't like where I am")
    raise error
if __name__ == "__main__":
    # getenv() returns a string when PORT is set, so convert the value to
    # an integer port number before starting the server.
    app.run(
        host="0.0.0.0",
        port=int(getenv("PORT", 8080))
    )
6,Sentry
from os import getenv
from sentry_sdk import init as sentry_init
from sentry_sdk.integrations.sanic import SanicIntegration
from sanic import Sanic
from sanic.response import json
# Initialise the Sentry SDK with the DSN from the SENTRY_DSN environment
# variable and the Sanic integration, then create the app.
sentry_init(
    dsn=getenv("SENTRY_DSN"),
    integrations=[SanicIntegration()],
)
app = Sanic(__name__)
# noinspection PyUnusedLocal
@app.route("/working")
async def working_path(request):
    """Answer requests to "/working" with a fixed JSON payload."""
    payload = {"response": "Working API Response"}
    return json(payload)
# noinspection PyUnusedLocal
@app.route("/raise-error")
async def raise_error(request):
    """Raise a plain Exception for every request to "/raise-error"."""
    error = Exception("Testing Sentry Integration")
    raise error
if __name__ == '__main__':
    # getenv() returns a string when PORT is set, so convert the value to
    # an integer port number before starting the server.
    app.run(
        host="0.0.0.0",
        port=int(getenv("PORT", 8080))
    )
7,安全
下面的示例代码展示了一个简单的、基于认证和授权机制的装饰器,可用来保护你的 Sanic API 端点。
# -*- coding: utf-8 -*-
from sanic import Sanic
from functools import wraps
from sanic.response import json
# Application instance that the route below is registered on.
app = Sanic()
def check_request_for_authorization_status(request):
    """Return whether ``request`` is authorized.

    This is a placeholder that always returns True and does not inspect
    ``request``.
    """
    # NOTE: define the real check here, e.g. based on cookies or a session.
    return True
def authorized():
    """Build a decorator that only runs a handler for authorized requests.

    The decorated handler is awaited when
    ``check_request_for_authorization_status(request)`` is truthy; otherwise
    a JSON body ``{'status': 'not_authorized'}`` with status 403 is returned.
    """
    def decorator(f):
        @wraps(f)
        async def decorated_function(request, *args, **kwargs):
            # Check the client's authorization status first.
            if not check_request_for_authorization_status(request):
                # Not authorized: answer without running the handler.
                return json({'status': 'not_authorized'}, 403)
            # Authorized: run the handler and pass its response on.
            return await f(request, *args, **kwargs)
        return decorated_function
    return decorator
@app.route("/")
@authorized()
async def test(request):
    """Answer authorized requests to "/" with a fixed JSON payload."""
    payload = {'status': 'authorized'}
    return json(payload)
if __name__ == "__main__":
    # Listen on every interface, port 8000.
    app.run(
        host="0.0.0.0",
        port=8000
    )
8,sanic的Websocket(提供了一种增加路由的方法)
<!DOCTYPE html>
<html>
    <head>
        <title>WebSocket demo</title>
    </head>
    <body>
        <script>
            // Connect to the /feed WebSocket endpoint on the host that served this page.
            var ws = new WebSocket('ws://' + document.domain + ':' + location.port + '/feed'),
                messages = document.createElement('ul');

            // Append one line of text to the message list.
            function appendMessage(text) {
                var message = document.createElement('li'),
                    content = document.createTextNode(text);
                message.appendChild(content);
                messages.appendChild(message);
            }

            ws.onmessage = function (event) {
                appendMessage('Received: ' + event.data);
            };
            document.body.appendChild(messages);

            // Send a message once per second.
            window.setInterval(function() {
                // Declared with var so it does not become an implicit global.
                var data = 'bye!';
                // send() throws while the socket is still connecting, so
                // skip this tick unless the connection is open.
                if (ws.readyState !== WebSocket.OPEN) {
                    return;
                }
                ws.send(data);
                appendMessage('Sent: ' + data);
            }, 1000);
        </script>
    </body>
</html>
from sanic import Sanic
from sanic.response import file
# Application instance that the HTTP and WebSocket routes below are
# registered on.
app = Sanic(__name__)
@app.route('/')
async def index(request):
    """Serve the file 'websocket.html' for requests to "/"."""
    page = await file('websocket.html')
    return page
@app.websocket('/feed')
async def feed(request, ws):
    """Send 'hello!' and then wait for one incoming message, in an endless loop.

    Both the outgoing and the incoming message are printed.
    """
    greeting = 'hello!'
    while True:
        print('Sending: ' + greeting)
        await ws.send(greeting)
        reply = await ws.recv()
        print('Received: ' + reply)
if __name__ == '__main__':
    # Listen on every interface, port 8000, with debug mode enabled.
    app.run(
        host="0.0.0.0",
        port=8000,
        debug=True
    )
9,主机托管:
from sanic import response
from sanic import Sanic
from sanic.blueprints import Blueprint
# Usage
# curl -H "Host: example.com" localhost:8000
# curl -H "Host: sub.example.com" localhost:8000
# curl -H "Host: bp.example.com" localhost:8000/question
# curl -H "Host: bp.example.com" localhost:8000/answer
# Application instance, plus a blueprint created with
# host="bp.example.com".
app = Sanic()
bp = Blueprint("bp", host="bp.example.com")
# NOTE(review): the name `hello` is reused by the handlers that follow.
@app.route(
    '/',
    host=[
        "example.com",
        "somethingelse.com",
        "therestofyourdomains.com",
    ],
)
async def hello(request):
    """Answer "/" for the hosts listed in the route with a fixed text."""
    reply = response.text("Some defaults")
    return reply
@app.route('/', host="sub.example.com")
async def hello(request):
    """Answer "/" for host sub.example.com with the text "42"."""
    reply = response.text("42")
    return reply
@bp.route("/question")
async def hello(request):
    """Answer "/question" on the blueprint with a fixed question text."""
    reply = response.text("What is the meaning of life?")
    return reply
@bp.route("/answer")
async def hello(request):
    """Answer "/answer" on the blueprint with the text "42"."""
    reply = response.text("42")
    return reply
# Register the blueprint's routes on the app.
app.blueprint(bp)

if __name__ == '__main__':
    # Listen on every interface, port 8000.
    app.run(
        host="0.0.0.0",
        port=8000
    )
10,支持并行测试运行的单元测试
下面的示例演示如何借助 pytest-xdist 插件提供的并行测试执行支持,来运行 Sanic 应用程序的单元测试。
"""
Sanic 服务器的 pytest-xdist 示例
安装测试工具:
$ pip install pytest pytest-xdist
使用 xdist 参数运行:
$ pytest examples/pytest_xdist.py -n 8  # 8 个 worker
"""
import re
from sanic import Sanic
from sanic.response import text
from sanic.testing import PORT as PORT_BASE, SanicTestClient
import pytest
@pytest.fixture(scope="session")
def test_port(worker_id):
    """Return PORT_BASE plus the first number found in ``worker_id``."""
    match = re.search(r'[0-9]+', worker_id)
    # A worker id without digits falls back to offset 0.
    offset = int(match.group(0)) if match else 0
    return PORT_BASE + offset
@pytest.fixture(scope="session")
def app():
    """Build a Sanic app whose "/" route answers with the text 'OK'."""
    sanic_app = Sanic()

    @sanic_app.route('/')
    async def index(request):
        return text('OK')

    return sanic_app
@pytest.fixture(scope="session")
def client(app, test_port):
    """Create a SanicTestClient from the ``app`` and ``test_port`` fixtures."""
    test_client = SanicTestClient(app, test_port)
    return test_client
@pytest.mark.parametrize('run_id', range(100))
def test_index(client, run_id):
    """GET "/" and check the reply; parametrized over 100 run ids."""
    # Only the response half of the returned pair is inspected.
    _, response = client._sanic_endpoint_test('get', '/')
    assert response.status == 200
    assert response.text == 'OK'
11,修改请求对象
Sanic中的请求对象是一种dict对象,这意味着request对象可以作为常规dict对象进行操作。
from sanic import Sanic
from sanic.response import text
from random import randint
# Application instance that the middleware and routes below are
# registered on.
app = Sanic()
@app.middleware('request')
def append_request(request):
    """Store a random integer from 0 to 100 under the 'num' key of the request."""
    random_value = randint(0, 100)
    request['num'] = random_value
@app.get('/pop')
def pop_handler(request):
    """Remove the 'num' key from the request and return its value as text."""
    num = request.pop('num')
    # 'num' is stored as an int; convert it so text() receives a string.
    return text(str(num))
@app.get('/key_exist')
def key_exist_handler(request):
    """Report as text whether the 'num' key is present in the request."""
    if 'num' in request:
        message = 'num exist in request'
    else:
        message = 'num does not exist in reqeust'
    return text(message)
# Start the server when this module is executed: every interface,
# port 8000, debug mode enabled.
app.run(
    host="0.0.0.0",
    port=8000,
    debug=True
)