zoukankan      html  css  js  c++  java
  • python 微服务方案

    介绍

    使用python做web开发面临的一个最大的问题就是性能,在解决C10K问题上显的有点吃力。有些异步框架Tornado、Twisted、Gevent 等就是为了解决性能问题。这些框架在性能上有些提升,但是也出现了各种古怪的问题难以解决。

    在python3.6中,官方的异步协程库asyncio正式成为标准。在保留便捷性的同时对性能有了很大的提升,已经出现许多的异步框架使用asyncio。

    使用较早的异步框架是aiohttp,它提供了server端和client端,对asyncio做了很好的封装。但是开发方式和最流行的微框架flask不同,flask开发简单,轻量,高效。正是结合这些优点, 以Sanic为基础,集成多个流行的库来搭建微服务。 Sanic框架是和Flask相似的异步协程框架,简单轻量,并且性能很高。本项目就是以Sanic为基础搭建的python微服务框架。(思想适用于其他语言)

    微服务设计原则个人总结:

    X 轴 :指的是水平复制,很好理解,就是讲单体系统多运行几个实例,做个集群加负载均衡的模式。
    Z 轴 :是基于类似的数据分区,比如一个互联网打车应用突然或了,用户量激增,集群模式撑不住了,那就按照用户请求的地区进行数据分区,北京、上海、四川等多建几个集群。简单理解数据库拆分,比如分库分表
    Y 轴 :就是我们所说的微服务的拆分模式,就是基于不同的业务拆分。

    微服务总体架构:

        

    特点

    • 使用sanic异步框架,简单,轻量,高效。
    • 使用uvloop为核心引擎,使sanic在很多情况下单机并发甚至不亚于Golang。
    • 使用asyncpg为数据库驱动,进行数据库连接,执行sql语句执行。
    • 使用aiohttp为Client,对其他微服务进行访问。
    • 使用peewee为ORM,但是只是用来做模型设计和migration。
    • 使用opentracing为分布式追踪系统。
    • 使用unittest做单元测试,并且使用mock来避免访问其他微服务。
    • 使用swagger做API标准,能自动生成API文档。

    服务端

    使用sanic异步框架,有较高的性能,但是使用不当会造成blocking, 对于有IO请求的都要选用异步库。添加库要慎重。
    sanic使用uvloop异步驱动,uvloop基于libuv使用Cython编写,性能比nodejs还要高。

    功能说明:

    启动前

    1.  
      @app.listener('before_server_start')
    2.  
      async def before_srver_start(app, loop):
    3.  
      queue = asyncio.Queue()
    4.  
      app.queue = queue
    5.  
      loop.create_task(consume(queue, app.config.ZIPKIN_SERVER))
    6.  
      reporter = AioReporter(queue=queue)
    7.  
      tracer = BasicTracer(recorder=reporter)
    8.  
      tracer.register_required_propagators()
    9.  
      opentracing.tracer = tracer
    10.  
      app.db = await ConnectionPool(loop=loop).init(DB_CONFIG)
    • 创建DB连接池
    • 创建Client连接
    • 创建queue, 消耗span,用于日志追踪
    • 创建opentracing.tracer进行日志追踪

    中间件

    1.  
      @app.middleware('request')
    2.  
      async def cros(request):
    3.  
      if request.method == 'POST' or request.method == 'PUT':
    4.  
      request['data'] = request.json
    5.  
      span = before_request(request)
    6.  
      request['span'] = span
    7.  
       
    8.  
       
    9.  
      @app.middleware('response')
    10.  
      async def cors_res(request, response):
    11.  
      span = request['span'] if 'span' in request else None
    12.  
      if response is None:
    13.  
      return response
    14.  
      result = {'code': 0}
    15.  
      if not isinstance(response, HTTPResponse):
    16.  
      if isinstance(response, tuple) and len(response) == 2:
    17.  
      result.update({
    18.  
      'data': response[0],
    19.  
      'pagination': response[1]
    20.  
      })
    21.  
      else:
    22.  
      result.update({'data': response})
    23.  
      response = json(result)
    24.  
      if span:
    25.  
      span.set_tag('http.status_code', "200")
    26.  
      if span:
    27.  
      span.set_tag('component', request.app.name)
    28.  
      span.finish()
    29.  
      return response
    • 创建span, 用于日志追踪
    • 对response进行封装,统一格式

    异常处理

    对抛出的异常进行处理,返回统一格式

    任务

    创建task消费queue中对span,用于日志追踪

    异步处理

    由于使用的是异步框架,可以将一些IO请求并行处理

    Example:

    1.  
      async def async_request(datas):
    2.  
      # async handler request
    3.  
      results = await asyncio.gather(*[data[2] for data in datas])
    4.  
      for index, obj in enumerate(results):
    5.  
      data = datas[index]
    6.  
      data[0][data[1]] = results[index]
    7.  
       
    8.  
      @user_bp.get('/<id:int>')
    9.  
      @doc.summary("get user info")
    10.  
      @doc.description("get user info by id")
    11.  
      @doc.produces(Users)
    12.  
      async def get_users_list(request, id):
    13.  
      async with request.app.db.acquire(request) as cur:
    14.  
      record = await cur.fetch(
    15.  
      """ SELECT * FROM users WHERE id = $1 """, id)
    16.  
      datas = [
    17.  
      [record, 'city_id', get_city_by_id(request, record['city_id'])]
    18.  
      [record, 'role_id', get_role_by_id(request, record['role_id'])]
    19.  
      ]
    20.  
      await async_request(datas)
    21.  
      return record

    get_city_by_id, get_role_by_id是并行处理。

    相关连接

    sanic

    模型设计 & ORM

    Peewee is a simple and small ORM. It has few (but expressive) concepts, making it easy to learn and intuitive to use。

    ORM使用peewee, 只是用来做模型设计和migration, 数据库操作使用asyncpg。

    Example:

    1.  
      # models.py
    2.  
       
    3.  
      class Users(Model):
    4.  
      id = PrimaryKeyField()
    5.  
      create_time = DateTimeField(verbose_name='create time',
    6.  
      default=datetime.datetime.utcnow)
    7.  
      name = CharField(max_length=128, verbose_name="user's name")
    8.  
      age = IntegerField(null=False, verbose_name="user's age")
    9.  
      sex = CharField(max_length=32, verbose_name="user's sex")
    10.  
      city_id = IntegerField(verbose_name='city for user', help_text=CityApi)
    11.  
      role_id = IntegerField(verbose_name='role for user', help_text=RoleApi)
    12.  
       
    13.  
      class Meta:
    14.  
      db_table = 'users'
    15.  
       
    16.  
       
    17.  
      # migrations.py
    18.  
       
    19.  
      from sanic_ms.migrations import MigrationModel, info, db
    20.  
       
    21.  
      class UserMigration(MigrationModel):
    22.  
      _model = Users
    23.  
       
    24.  
      # @info(version="v1")
    25.  
      # def migrate_v1(self):
    26.  
      # migrate(self.add_column('sex'))
    27.  
       
    28.  
      def migrations():
    29.  
      try:
    30.  
      um = UserMigration()
    31.  
      with db.transaction():
    32.  
      um.auto_migrate()
    33.  
      print("Success Migration")
    34.  
      except Exception as e:
    35.  
      raise e
    36.  
       
    37.  
      if __name__ == '__main__':
    38.  
      migrations()
    • 运行命令 python migrations.py
    • migrate_v1函数添加字段sex, 在BaseModel中要先添加name字段
    • info装饰器会创建表migrate_record来记录migrate,version每个model中必须唯一,使用version来记录是否执行过,还可以记录author,datetime
    • migrate函数必须以migrate_开头

    相关连接

    peewee

    数据库操作

    asyncpg is the fastest driver among common Python, NodeJS and Go implementations

    使用asyncpg为数据库驱动, 对数据库连接进行封装, 执行数据库操作。

    不使用ORM做数据库操作,一个原因是性能,ORM会有性能的损耗,并且无法使用asyncpg高性能库。另一个是单个微服务是很简单的,表结构不会很复杂,简单的SQL语句就可以处理来,没必要引入ORM。使用peewee只是做模型设计

    Example:

    1.  
      sql = "SELECT * FROM users WHERE name=$1"
    2.  
      name = "test"
    3.  
      async with request.app.db.acquire(request) as cur:
    4.  
      data = await cur.fetchrow(sql, name)
    5.  
       
    6.  
      async with request.app.db.transaction(request) as cur:
    7.  
      data = await cur.fetchrow(sql, name)
    • acquire() 函数为非事务, 对于只涉及到查询的使用非事务,可以提高查询效率
    • tansaction() 函数为事务操作,对于增删改必须使用事务操作
    • 传入request参数是为了获取到span,用于日志追踪
    • TODO 数据库读写分离

    相关连接

    asyncpg
    benchmarks

    客户端

    使用aiohttp中的client,对客户端进行了简单的封装,用于微服务之间访问。

    Don’t create a session per request. Most likely you need a session per application which performs all requests altogether.
    A session contains a connection pool inside, connection reusage and keep-alives (both are on by default) may speed up total performance.

    Example:

    1.  
      @app.listener('before_server_start')
    2.  
      async def before_srver_start(app, loop):
    3.  
      app.client = Client(loop, url='http://host:port')
    4.  
       
    5.  
      async def get_role_by_id(request, id):
    6.  
      cli = request.app.client.cli(request)
    7.  
      async with cli.get('/cities/{}'.format(id)) as res:
    8.  
      return await res.json()
    9.  
       
    10.  
      @app.listener('before_server_stop')
    11.  
      async def before_server_stop(app, loop):
    12.  
      app.client.close()
    13.  
       

    对于访问不同的微服务可以创建多个不同的client,这样每个client都会keep-alives

    日志 & 分布式追踪系统

    装饰器logger

    1.  
      @logger(type='method', category='test', detail='detail', description="des", tracing=True, level=logging.INFO)
    2.  
      async def get_city_by_id(request, id):
    3.  
      cli = request.app.client.cli(request)
    • type: 日志类型,如 method, route
    • category: 日志类别,默认为app的name
    • detail: 日志详细信息
    • description: 日志描述,默认为函数的注释
    • tracing: 日志追踪,默认为True
    • level: 日志级别,默认为INFO

    分布式追踪系统

    • OpenTracing是以Dapper,Zipkin等分布式追踪系统为依据, 建立了统一的标准。
    • Opentracing跟踪每一个请求,记录请求所经过的每一个微服务,以链条的方式串联起来,对分析微服务的性能瓶颈至关重要。
    • 使用opentracing框架,但是在输出时转换成zipkin格式。 因为大多数分布式追踪系统考虑到性能问题,都是使用的thrift进行通信的,本着简单,Restful风格的精神,没有使用RPC通信。以日志的方式输出, 可以使用fluentd, logstash等日志收集再输入到Zipkin。Zipkin是支持HTTP输入的。
    • 生成的span先无阻塞的放入queue中,在task中消费队列的span。后期可以添加上采样频率。
    • 对于DB,Client都加上了tracing

    相关连接

    opentracing
    zipkin
    jaeger

    API接口

    api文档使用swagger标准。

    Example:

    1.  
      from sanic_ms import doc
    2.  
       
    3.  
      @user_bp.post('/')
    4.  
      @doc.summary('create user')
    5.  
      @doc.description('create user info')
    6.  
      @doc.consumes(Users)
    7.  
      @doc.produces({'id': int})
    8.  
      async def create_user(request):
    9.  
      data = request['data']
    10.  
      async with request.app.db.transaction(request) as cur:
    11.  
      record = await cur.fetchrow(
    12.  
      """ INSERT INTO users(name, age, city_id, role_id)
    13.  
      VALUES($1, $2, $3, $4, $5)
    14.  
      RETURNING id
    15.  
      """, data['name'], data['age'], data['city_id'], data['role_id']
    16.  
      )
    17.  
      return {'id': record['id']}
    • summary: api概要
    • description: 详细描述
    • consumes: request的body数据
    • produces: response的返回数据
    • tag: API标签
    • 在consumes和produces中传入的参数可以是peewee的model,会解析model生成API数据, 在field字段的help_text参数来表示引用对象
    • http://host:ip/openapi/spec.json 获取生成的json数据

    相关连接

    swagger

    Response 数据

    在返回时,不要返回sanic的response,直接返回原始数据,会在Middleware中对返回的数据进行处理,返回统一的格式,具体的格式可以[查看]

    单元测试

    单元测试使用unittest。 mock是自己创建了MockClient,因为unittest还没有asyncio的mock,并且sanic的测试接口也是发送request请求,所以比较麻烦. 后期可以使用pytest。

    Example:

    1.  
      from sanic_ms.tests import APITestCase
    2.  
      from server import app
    3.  
       
    4.  
      class TestCase(APITestCase):
    5.  
      _app = app
    6.  
      _blueprint = 'visit'
    7.  
       
    8.  
      def setUp(self):
    9.  
      super(TestCase, self).setUp()
    10.  
      self._mock.get('/cities/1',
    11.  
      payload={'id': 1, 'name': 'shanghai'})
    12.  
      self._mock.get('/roles/1',
    13.  
      payload={'id': 1, 'name': 'shanghai'})
    14.  
       
    15.  
      def test_create_user(self):
    16.  
      data = {
    17.  
      'name': 'test',
    18.  
      'age': 2,
    19.  
      'city_id': 1,
    20.  
      'role_id': 1,
    21.  
      }
    22.  
      res = self.client.create_user(data=data)
    23.  
      body = ujson.loads(res.text)
    24.  
      self.assertEqual(res.status, 200)
    • 其中_blueprint为blueprint名称
    • 在setUp函数中,使用_mock来注册mock信息, 这样就不会访问真实的服务器, payload为返回的body信息
    • 使用client变量调用各个函数, data为body信息,params为路径的参数信息,其他参数是route的参数

    代码覆盖

    1.  
      coverage erase
    2.  
      coverage run --source . -m sanic_ms tests
    3.  
      coverage xml -o reports/coverage.xml
    4.  
      coverage2clover -i reports/coverage.xml -o reports/clover.xml
    5.  
      coverage html -d reports
    • coverage2colver 是将coverage.xml 转换成 clover.xml,bamboo需要的格式是clover的。

    相关连接

    unittest
    coverage

    异常处理

    使用 app.error_handler = CustomHander() 对抛出的异常进行处理

    Example:

    1.  
      from sanic_ms.exception import ServerError
    2.  
       
    3.  
      @visit_bp.delete('/users/<id:int>')
    4.  
      async def del_user(request, id):
    5.  
      raise ServerError(error='内部错误',code=10500, message="msg")
      • code: 错误码,无异常时为0,其余值都为异常
      • message: 状态码信息
      • error: 自定义错误信息
      • status_code: http状态码,使用标准的http状态码
  • 相关阅读:
    spring boot +mybatis 操作sqlite数据库
    katalon studio教程之通过录制/回放创建测试用例
    #katalon studio# 安装和设置(Installation and Setup)
    NET Core 基于Aspect Injector 实现面向AOP编程
    NET Core 3.1使用AutoMapper实现对象映射
    给NET core 智能感知提示安装中文汉化包
    代码注释规范
    软件升级版本号迭代规范-Semantic Versioning
    使用阿里云的Helm私有仓库
    Helm操作指南
  • 原文地址:https://www.cnblogs.com/ExMan/p/11134966.html
Copyright © 2011-2022 走看看