一、前言
在系统设计中,我们希望很多可变的内容能够配置化。比如有个多选按钮,用来显示某个值的类型(float、int、string、dict),如果有个地方能够统一配置这些选项,需要修改时就不用更新前端了。字典管理很好地实现了这一功能:通过网页配置,只要添加或修改了某个值,所有引用它的组件内容都会随之变化。
二、功能实现
1.实体类ORM
from models.BaseModel import BaseModel
from db import db


class Dict_Data(BaseModel):
    """Dictionary data table (ORM mapping for ``t_dict_data``).

    Each row is one entry of a dictionary. The ``comment=`` strings are the
    column comments passed to the database and are kept in the original
    language on purpose.
    """

    __tablename__ = "t_dict_data"

    id = db.Column(db.Integer, primary_key=True, autoincrement=True, comment="id")
    # Id of the owning dictionary type; mandatory, but declared as a plain
    # integer (no ForeignKey constraint is defined here).
    dict_id = db.Column(db.Integer, nullable=False, comment="dict_id")
    # Sort position of the entry.
    dict_sort = db.Column(db.Integer, comment="字典排序")
    # Display label of the entry.
    dict_label = db.Column(db.String(100), comment="字典标签")
    # Integer value of the entry.
    dict_number = db.Column(db.Integer, comment="字典值")
    # String value of the entry; defaults to the empty string.
    dict_value = db.Column(db.String(100), default="", comment="字典键值")
    # Type key of the dictionary this entry belongs to.
    dict_type = db.Column(db.String(100), comment="字典类型")
    # Integer marker for the kind of value stored.
    # NOTE(review): the meaning of the individual codes is not visible here.
    dict_value_type = db.Column(db.Integer, comment="标识")
    # Extra CSS class for front-end styling.
    css_class = db.Column(db.String(100), comment="样式属性(其他样式扩展)")
    # Style used when the value is echoed in tables.
    list_class = db.Column(db.String(100), comment="表格回显样式")
    # 1 = default entry, 0 = not default.
    is_default = db.Column(db.Integer, default=1, comment="是否默认(1是 0否)")
    # 1 = enabled, 2 = disabled. Uses db.Integer like every other column.
    status = db.Column(db.Integer, default=1, comment="状态(1正常 2停用)")
class Dict_Type(BaseModel):
    """Dictionary type table (ORM mapping for ``t_dict_type``).

    The ``comment=`` strings are the column comments passed to the database
    and are reproduced unchanged.
    """

    __tablename__ = "t_dict_type"

    # Auto-incrementing primary key.
    id = db.Column(
        db.Integer,
        primary_key=True,
        autoincrement=True,
        comment="ID",
    )
    # Human-readable name of the dictionary.
    dict_name = db.Column(db.String(100), comment="字典名称")
    # Type key of the dictionary; indexed.
    dict_type = db.Column(db.String(100), index=True, comment="字典类型")
    # Integer marker for the kind of value stored.
    # NOTE(review): the meaning of the individual codes is not visible here.
    dict_value_type = db.Column(db.Integer, comment="标识")
    # 1 = enabled, 2 = disabled.
    status = db.Column(db.Integer, default=1, comment="状态(1正常 2停用)")
2.permission下新建dict.py和dict_data.py,并在app.py注册蓝图
from permission import *

# Blueprint that carries the dictionary-type routes.
# NOTE: the name shadows the builtin ``dict`` inside this module, but app.py
# refers to it as ``dict.dict``, so it cannot be renamed here.
dict = Blueprint('dict', __name__)
from permission import *

# Blueprint that carries the dictionary-data routes.
dictData = Blueprint('dictData', __name__)
# Mount the dictionary-type and dictionary-data blueprints under their URL prefixes.
app.register_blueprint(dict.dict, url_prefix='/api/dict')
app.register_blueprint(dict_data.dictData, url_prefix='/api/dictData')
3.字典数据的增删改查实现
# !/usr/bin/python3
# -*- coding: utf-8 -*-
"""
@Author : Huguodong
@Version :
------------------------------------
@File : dict_route.py
@Description : Dictionary data
@CreateTime : 2020/3/14 16:10
------------------------------------
@ModifyTime :
"""
from permission import *

dictData = Blueprint('dictData', __name__)


def _positive_int(value, default):
    """Return ``value`` as a positive int, or ``default`` when it is missing or invalid."""
    try:
        number = int(value)
    except (TypeError, ValueError):
        return default
    return number if number > 0 else default


def _order_clause(model, order_column_name, order_type):
    """Build an ORDER BY clause from client input without interpolating it into SQL.

    :param model: mapped class whose columns may be sorted on
    :param order_column_name: column name supplied by the client
    :param order_type: ``"asc"`` or ``"desc"`` (case-insensitive)
    :return: a column ordering expression, or ``None`` when no (valid) sort
        direction was requested
    :raises ValueError: when a sort was requested on a name that is not a
        mapped column of ``model``
    """
    if not order_column_name or not isinstance(order_type, str):
        return None
    direction = order_type.lower()
    if direction not in ('asc', 'desc'):
        return None
    column = None
    if isinstance(order_column_name, str):
        column = model.__table__.columns.get(order_column_name)
    # Column objects do not support truth testing, so compare against None.
    if column is None:
        raise ValueError(f"unknown order column: {order_column_name!r}")
    return column.desc() if direction == 'desc' else column.asc()


@dictData.route('/index', methods=["POST"])
def index():
    '''
    Return one page of dictionary data.

    Without ``dict_id`` the entries of ``dict_type`` are returned ordered by
    ``dict_sort``. With ``dict_id`` the optional filters (label, number, type,
    status) and the optional sort are applied; ``dict_id == 0`` means "do not
    filter by dictionary".
    :return:
    '''
    res_dir = request.get_json()
    if res_dir is None:
        return NO_PARAMETER()
    dict_type = res_dir.get("dict_type")
    dict_id = res_dir.get("dict_id")
    dict_label = res_dir.get("dict_label")
    dict_number = res_dir.get("dict_number")
    status = res_dir.get("status")
    order_column_name = res_dir.get("order_column_name")
    order_type = res_dir.get("order_type")
    # Coerce instead of comparing raw JSON values, which raised TypeError for strings.
    page = _positive_int(res_dir.get("page"), 1)
    page_size = _positive_int(res_dir.get("page_size"), 10)

    if dict_id is None:
        if not dict_type:
            return PARAMETER_ERR()
        dict_data = (
            Dict_Data.query
            .filter(Dict_Data.dict_type == dict_type)
            .order_by(Dict_Data.dict_sort)
            .paginate(page=page, per_page=page_size, error_out=False)
        )
        data = construct_page_data(dict_data)  # format the page for the response
        return jsonify(code=Code.SUCCESS.value, msg="ok", data=data)

    query = Dict_Data.query
    if dict_id != 0:
        query = query.filter(Dict_Data.dict_id == dict_id)
    if dict_label:
        query = query.filter(Dict_Data.dict_label.like("%" + dict_label + "%"))
    if dict_number:
        try:
            number = int(dict_number)
        except (TypeError, ValueError):
            return PARAMETER_ERR()
        query = query.filter(Dict_Data.dict_number == number)
    if dict_type:
        query = query.filter(Dict_Data.dict_type.like("%" + dict_type + "%"))
    if status is not None:
        # status 0 means "both enabled (1) and disabled (2)".
        if status == 0:
            query = query.filter(Dict_Data.status.in_((1, 2)))
        else:
            query = query.filter(Dict_Data.status == status)
    try:
        ordering = _order_clause(Dict_Data, order_column_name, order_type)
    except ValueError:
        return PARAMETER_ERR()
    if ordering is not None:
        query = query.order_by(ordering)
    result = query.paginate(page=page, per_page=page_size, error_out=False)
    data = construct_page_data(result)  # format the page for the response
    return SUCCESS(data=data)


@dictData.route('/create', methods=["PUT"])
def create():
    '''
    Create a dictionary data entry.

    Requires ``dict_id`` plus a non-empty ``dict_label`` (or, when no label is
    sent at all, a non-empty ``dict_value``).
    :return:
    '''
    res_dir = request.get_json()
    if res_dir is None:
        return NO_PARAMETER()
    dict_id = res_dir.get("dict_id")
    dict_label = res_dir.get("dict_label")
    dict_number = res_dir.get("dict_number")
    css_class = res_dir.get("css_class")
    list_class = res_dir.get("list_class")
    is_default = res_dir.get("is_default")
    dict_sort = res_dir.get("dict_sort")
    dict_value = res_dir.get("dict_value")
    status = res_dir.get("status")
    remark = res_dir.get("remark")

    # The original ``a and b if c else d`` parsed as ``(a and b) if c else d``,
    # so dict_id was not checked when dict_label was missing.
    label_or_value = dict_label if dict_label is not None else dict_value
    if not (dict_id and label_or_value):
        return NO_PARAMETER()
    try:
        token = request.headers["Authorization"]
        user = verify_token(token)
        parent = Dict_Type.query.get(dict_id)
        if parent is None:
            # Previously surfaced as an AttributeError swallowed by the handler below.
            return ID_NOT_FOUND()
        model = Dict_Data()
        model.dict_id = dict_id
        model.dict_type = parent.dict_type
        model.dict_label = dict_label
        model.dict_number = dict_number
        model.css_class = css_class
        model.list_class = list_class
        model.is_default = is_default
        model.dict_sort = dict_sort
        model.dict_value = dict_value
        model.remark = remark
        model.status = status
        model.create_by = user['name']
        model.save()
        return SUCCESS()
    except Exception as e:
        app.logger.error(f"创建字典数据失败:{e}")
        return CREATE_ERROR()


@dictData.route('/update', methods=["POST", "PUT"])
def update():
    '''
    Update a dictionary data entry.

    POST returns the entry with the given id.
    PUT updates it.
    :return:
    '''
    res_dir = request.get_json()
    if res_dir is None:
        return NO_PARAMETER()
    data_id = res_dir.get("id")

    if request.method == "POST":
        if not data_id:
            # The original called PARAMETER_ERR() without returning it.
            return PARAMETER_ERR()
        model = Dict_Data.query.get(data_id)
        if not model:
            return ID_NOT_FOUND()
        return SUCCESS(model_to_dict(model))

    dict_id = res_dir.get("dict_id")
    dict_label = res_dir.get("dict_label")
    dict_number = res_dir.get("dict_number")
    css_class = res_dir.get("css_class")
    list_class = res_dir.get("list_class")
    is_default = res_dir.get("is_default")
    dict_sort = res_dir.get("dict_sort")
    status = res_dir.get("status")
    remark = res_dir.get("remark")
    # ``dict_number`` may legitimately be 0, so test presence rather than truthiness.
    if not (data_id and dict_id and dict_label and dict_number is not None):
        return NO_PARAMETER()
    model = Dict_Data.query.get(data_id)
    if not model:
        return ID_NOT_FOUND()
    token = request.headers["Authorization"]
    user = verify_token(token)
    # NOTE(review): dict_type is not re-derived when dict_id changes, and
    # dict_value is never updated here - confirm whether that is intended.
    model.dict_id = dict_id
    model.dict_label = dict_label
    model.dict_number = dict_number
    model.css_class = css_class
    model.list_class = list_class
    model.is_default = is_default
    model.dict_sort = dict_sort
    model.remark = remark
    model.status = status
    model.update_by = user['name']
    try:
        model.update()
        return SUCCESS()
    except Exception as e:
        app.logger.error(f"更新字典数据失败:{e}")
        return UPDATE_ERROR()


@dictData.route('/delete', methods=["DELETE"])
def delete():
    '''
    Delete the dictionary data entry with the given id.
    :return:
    '''
    res_dir = request.get_json()
    if res_dir is None:
        return NO_PARAMETER()
    data_id = res_dir.get("id")
    if not data_id:
        return PARAMETER_ERR()
    try:
        model = Dict_Data.query.get(data_id)
        if not model:
            return ID_NOT_FOUND()
        model.delete()
        return SUCCESS()
    except Exception as e:
        app.logger.error(f"删除失败:{e}")
        return DELETE_ERROR()
4.字典管理的增删改查实现
# !/usr/bin/python3
# -*- coding: utf-8 -*-
"""
@Author : Huguodong
@Version :
------------------------------------
@File : dict_route.py.py
@Description :
@CreateTime : 2020/3/16 21:45
------------------------------------
@ModifyTime :
"""
from permission import *

# NOTE: this name shadows the builtin ``dict`` inside this module; app.py
# refers to it as ``dict.dict``, so it is kept.
dict = Blueprint('dict', __name__)


def _positive_int(value, default):
    """Return ``value`` as a positive int, or ``default`` when it is missing or invalid."""
    try:
        number = int(value)
    except (TypeError, ValueError):
        return default
    return number if number > 0 else default


def _order_clause(model, order_column_name, order_type):
    """Build an ORDER BY clause from client input without interpolating it into SQL.

    :param model: mapped class whose columns may be sorted on
    :param order_column_name: column name supplied by the client
    :param order_type: ``"asc"`` or ``"desc"`` (case-insensitive)
    :return: a column ordering expression, or ``None`` when no (valid) sort
        direction was requested
    :raises ValueError: when a sort was requested on a name that is not a
        mapped column of ``model``
    """
    if not order_column_name or not isinstance(order_type, str):
        return None
    direction = order_type.lower()
    if direction not in ('asc', 'desc'):
        return None
    column = None
    if isinstance(order_column_name, str):
        column = model.__table__.columns.get(order_column_name)
    # Column objects do not support truth testing, so compare against None.
    if column is None:
        raise ValueError(f"unknown order column: {order_column_name!r}")
    return column.desc() if direction == 'desc' else column.asc()


@dict.route('/index', methods=["POST"])
def index():
    '''
    Return one page of dictionary types, optionally filtered by name, type
    and status and sorted by a client-chosen column.
    :return:
    '''
    res_dir = request.get_json()
    if res_dir is None:
        return NO_PARAMETER()
    dict_name = res_dir.get("dict_name")
    dict_type = res_dir.get("dict_type")
    status = res_dir.get("status")
    order_column_name = res_dir.get("order_column_name")
    order_type = res_dir.get("order_type")
    page = _positive_int(res_dir.get("page"), 1)
    page_size = _positive_int(res_dir.get("page_size"), 10)

    try:
        ordering = _order_clause(Dict_Type, order_column_name, order_type)
    except ValueError:
        return PARAMETER_ERR()

    try:
        query = Dict_Type.query
        if dict_name:
            query = query.filter(Dict_Type.dict_name.like("%" + dict_name + "%"))
        if dict_type:
            query = query.filter(Dict_Type.dict_type.like("%" + dict_type + "%"))
        if status is not None:
            # status 0 means "both enabled (1) and disabled (2)".
            if status == 0:
                query = query.filter(Dict_Type.status.in_((1, 2)))
            else:
                query = query.filter(Dict_Type.status == status)
        if ordering is not None:
            query = query.order_by(ordering)
        result = query.paginate(page=page, per_page=page_size, error_out=False)
        data = construct_page_data(result)
        return SUCCESS(data=data)
    except Exception as e:
        app.logger.error(f"获取字典失败:{e}")
        return REQUEST_ERROR()


@dict.route('/update', methods=["POST", "PUT"])
def update():
    '''
    Update a dictionary type.

    POST returns the dictionary type with the given id.
    PUT updates it.
    :return:
    '''
    res_dir = request.get_json()
    if res_dir is None:
        return NO_PARAMETER()
    type_id = res_dir.get("id")

    if request.method == "POST":
        if not type_id:
            # The original called PARAMETER_ERR() without returning it.
            return PARAMETER_ERR()
        model = Dict_Type.query.get(type_id)
        if not model:
            return ID_NOT_FOUND()
        return SUCCESS(model_to_dict(model))

    dict_value_type = res_dir.get("dict_value_type")
    dict_name = res_dir.get("dict_name")
    dict_type = res_dir.get("dict_type")
    status = res_dir.get("status")
    if not (type_id and dict_type and dict_name and dict_value_type):
        return NO_PARAMETER()
    model = Dict_Type.query.get(type_id)
    if not model:
        return ID_NOT_FOUND()
    token = request.headers["Authorization"]
    user = verify_token(token)
    # NOTE(review): unlike create(), a renamed dict_type is not checked for
    # duplicates here - confirm whether that is intended.
    model.dict_name = dict_name
    model.dict_type = dict_type
    model.dict_value_type = dict_value_type
    model.status = status
    # The original also assigned ``model.dict_id``; Dict_Type declares no such
    # column, so that assignment was never persisted and has been dropped.
    model.update_by = user['name']
    try:
        model.update()
        return SUCCESS()
    except Exception as e:
        app.logger.error(f"更新字典失败:{e}")
        return UPDATE_ERROR()


@dict.route('/create', methods=["PUT"])
def create():
    '''
    Create a dictionary type; rejects a ``dict_type`` that already exists.
    :return:
    '''
    res_dir = request.get_json()
    if res_dir is None:
        return NO_PARAMETER()
    dict_name = res_dir.get("dict_name")
    dict_type = res_dir.get("dict_type")
    dict_value_type = res_dir.get("dict_value_type")
    remark = res_dir.get("remark")
    status = res_dir.get("status")
    if not (dict_name and dict_type and dict_value_type):
        return NO_PARAMETER()
    try:
        is_exist = Dict_Type.query.filter(Dict_Type.dict_type == dict_type).first()
        if is_exist:
            return CREATE_ERROR(msg="字典类型已存在")
        token = request.headers["Authorization"]
        user = verify_token(token)
        model = Dict_Type()
        model.dict_name = dict_name
        model.dict_type = dict_type
        model.dict_value_type = dict_value_type
        model.remark = remark
        model.status = status
        model.create_by = user['name']
        model.save()
        return SUCCESS()
    except Exception as e:
        app.logger.error(f"新建字典失败:{e}")
        return CREATE_ERROR()


@dict.route('/delete', methods=["DELETE"])
def delete():
    '''
    Delete the dictionary type with the given id; refused while dictionary
    data rows still reference it.
    :return:
    '''
    res_dir = request.get_json()
    if res_dir is None:
        return NO_PARAMETER()
    dict_id = res_dir.get("id")
    if not dict_id:
        return PARAMETER_ERR()
    try:
        is_exist = Dict_Data.query.filter(Dict_Data.dict_id == dict_id).first()
        if is_exist:
            return DELETE_ERROR(msg="该字典存在数据,无法删除!")
        model = Dict_Type.query.get(dict_id)
        if not model:
            return ID_NOT_FOUND()
        model.delete()
        return SUCCESS()
    except Exception as e:
        app.logger.error(f"删除字典失败:{e}")
        return DELETE_ERROR()