使用sqlalchemy 使mysq自动读写分离:
代码如下:
from flask import Flask from flask_sqlalchemy import SQLAlchemy, SignallingSession, get_state from sqlalchemy import orm app = Flask(__name__) app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@192.168.105.134:3306/test30' # 设置数据库连接地址 app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False # 是否追踪数据库变化(触发某些钩子函数), 开启后效率会变 app.config['SQLALCHEMY_ECHO'] = True # 开启后, 控制台会打印底层执行的SQL语句 app.config['SQLALCHEMY_BINDS'] = { # get_engine的bind参数为该配置的键 'master': 'mysql://root:mysql@192.168.105.134:3306/test30', 'slave': 'mysql://root:mysql@192.168.105.134:8306/test30' } class RoutingSession(SignallingSession): def get_bind(self, mapper=None, clause=None): """当进行数据操作时, 会调用该方法来获取进行该操作的数据库引擎(连接)""" state = get_state(self.app) if self._flushing: # 增删改操作, 使用主库 print('使用主库') return state.db.get_engine(self.app, bind='master') else: # 读操作, 使用从库 print('使用从库') return state.db.get_engine(self.app, bind='slave') class RoutingSQLAlchemy(SQLAlchemy): def create_session(self, options): return orm.sessionmaker(class_=RoutingSession, db=self, **options) # 初始化组件(建立数据库连接) db = RoutingSQLAlchemy(app) # ORM 类->表 类属性->字段 对象->记录 class User(db.Model): __tablename__ = "t_user" # 设置表名, 默认为类名的小写 id = db.Column(db.Integer, primary_key=True) # 主键 name = db.Column(db.String(20), unique=True, nullable=False) # 设置唯一&非空约束 age = db.Column(db.Integer, default=10, index=True) # 设置默认值约束&建立索引 @app.route('/') def index(): # 增加数据 进行检验是否读写分离 # 1.创建模型对象 user1 = User(name='zs', age=20) db.session.add(user1) db.session.commit() print('-' * 30) # 查询数据 print(User.query.all()) return "index" if __name__ == '__main__': db.drop_all() # 删除所有继承自db.Model的表 db.create_all() # 创建所有继承自db.Model的表 app.run(debug=True)
手动时mysql读写分离:
1,修改源码:routing_sqlalchemy.py
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
import random from flask_sqlalchemy import SQLAlchemy, BaseQuery, Model, SignallingSession, get_state from sqlalchemy import orm class Config: SQLALCHEMY_BINDS = { "bj_m1": 'mysql://root:mysql@192.168.105.134:3306/test30', "bj_m2": 'mysql://root:mysql@192.168.105.134:3306/test30', "bj_s1": 'mysql://root:mysql@192.168.105.134:8306/test30', "bj_s2": 'mysql://root:mysql@192.168.105.134:8306/test30', } SQLALCHEMY_CLUSTER = { "masters": ["bj_m1", "bj_m2"], "slaves": ['bj_s1', 'bj_s2'], "default": 'bj_m1' } SQLALCHEMY_TRACK_MODIFICATIONS = False SQLALCHEMY_ECHO = False class RoutingSession(SignallingSession): def __init__(self, db, autocommit=False, autoflush=True, **options): SignallingSession.__init__(self, db, autocommit=autocommit, autoflush=autoflush, **options) self.default_key = db.default_key self.master_keys = db.master_keys if len(db.master_keys) else self.default_key self.slave_keys = db.slave_keys if len(db.slave_keys) else self.default_key self.bind_key = None def get_bind(self, mapper=None, clause=None): """获取会话使用的数据库连接engine""" state = get_state(self.app) if self.bind_key: # 指定 print('Using DB bind: _name={}'.format(self.bind_key)) return state.db.get_engine(self.app, bind=self.bind_key) else: # 默认数据库 print('Using default DB bind: _name={}'.format(self.default_key)) return state.db.get_engine(self.app, bind=self.default_key) def set_to_write(self): """使用写数据库""" self.bind_key = random.choice(self.master_keys) def set_to_read(self): """使用读数据库""" self.bind_key = random.choice(self.slave_keys) class RoutingSQLAlchemy(SQLAlchemy): def init_app(self, app): config_binds = app.config.get("SQLALCHEMY_BINDS") if not config_binds: raise RuntimeError('Missing SQLALCHEMY_BINDS config') cluster = app.config.get("SQLALCHEMY_CLUSTER") if not cluster: raise RuntimeError('Missing SQLALCHEMY_CLUSTER config') default_key = cluster.get('default') if not default_key: raise KeyError("deafult is not in SQLALCHEMY_CLUSTER") # 生成并保存数据库引擎 self.master_keys = cluster.get("masters") or [] self.slave_keys = cluster.get("slaves") or [] self.default_key = default_key super(RoutingSQLAlchemy, self).init_app(app) def create_session(self, options): return orm.sessionmaker(class_=RoutingSession, db=self, **options)
2, __init__.py下导入routing_sqlalchemy.py 中的类创建 db
from .routing_sqlalchemy import RoutingSQLAlchemy db = RoutingSQLAlchemy()
3,定义装饰器类,实现读写分离
import functools from user_select import db def set_read_db(f): @functools.wraps(f) def wrapper(*args, **kwargs): db.session().set_to_read() return f(*args, **kwargs) return wrapper def set_write_db(f): @functools.wraps(f) def wrapper(*args, **kwargs): db.session().set_to_write() return f(*args, **kwargs) return wrapper
4,使用装饰器方式,调用读写分离,如果用装饰器,都使用主库进行读写
from flask import Flask from user_select import db from user_select.routing_sqlalchemy import Config from user_select.decorator import set_read_db, set_write_db app = Flask(__name__) app.config.from_object(Config) # 初始化数据库连接对象 db.init_app(app) # 建立映射模型 类->表 类属性->字段 对象->记录 class User(db.Model): __tablename__ = 't_user' # 设置表名 表名默认为类名小写 id = db.Column(db.Integer, primary_key=True) # 主键 默认主键自增 name = db.Column(db.String(20), unique=True) # 设置唯一 age = db.Column(db.Integer) @app.route('/') @set_write_db def index(): """增加数据""" user1 = User(name='lisi', age=20) db.session.add(user1) db.session.commit() return "index" @app.route('/demo1') # @set_read_db def demo1(): users = User.query.all() print(users) return 'demo1' if __name__ == '__main__': # db.drop_all() # 删除所有继承自db.Model的表 # db.create_all() # 创建所有继承自db.Model的表 app.run(debug=True)