zoukankan      html  css  js  c++  java
  • 基于Python-Flask的权限管理5:字典管理

    一、前言

    在系统设计中,我们希望很多可变的内容能够可配置化,比如有个多选按钮,显示某个值的类型(float,int,string,dict),如果有个地方能够配置这些值,这样需要修改的话就不用更新前端了。字典管理就很好地实现了这一功能,通过网页配置,只要添加或修改了某个值,所有的组件内容都会变化。

    二、功能实现

    1.实体类ORM

    from models.BaseModel import BaseModel
    from db import db
    
    
    class Dict_Data(BaseModel):
        """
        Dictionary data table: one row per entry belonging to a dictionary type.
        """
        __tablename__ = "t_dict_data"
        # Auto-incrementing primary key.
        id = db.Column(db.Integer, primary_key=True, autoincrement=True, comment="id")
        # Id of the owning Dict_Type row (the create route looks it up with
        # Dict_Type.query.get); no foreign-key constraint is declared here.
        dict_id = db.Column(db.Integer, nullable=False, comment="dict_id")
        # Sort position of the entry.
        dict_sort = db.Column(db.Integer, comment="字典排序")
        # Display label of the entry.
        dict_label = db.Column(db.String(100), comment="字典标签")
        # Numeric value of the entry.
        dict_number = db.Column(db.Integer, comment="字典值")
        # String value of the entry; defaults to the empty string.
        dict_value = db.Column(db.String(100), default="", comment="字典键值")
        # Type code copied from the owning Dict_Type when the entry is created.
        dict_type = db.Column(db.String(100), comment="字典类型")
        # "Marker" flag -- presumably tells numeric from string dictionaries; TODO confirm.
        dict_value_type = db.Column(db.Integer, comment="标识")
        # Extra CSS class for style extensions.
        css_class = db.Column(db.String(100), comment="样式属性(其他样式扩展)")
        # Style used when the value is echoed back in tables.
        list_class = db.Column(db.String(100), comment="表格回显样式")
        # Whether this entry is the default one (1 = yes, 0 = no); defaults to 1.
        is_default = db.Column(db.Integer, default=1, comment="是否默认(1是 0否)")
        # Status (1 = normal, 2 = disabled); defaults to 1.
        # NOTE(review): uses db.INTEGER while every other column uses db.Integer.
        status = db.Column(db.INTEGER, default=1, comment="状态(1正常 2停用)")
    class Dict_Type(BaseModel):
        """
        Dictionary type table: one row per dictionary (a named group of entries).
        """
        __tablename__ = "t_dict_type"
        # Auto-incrementing primary key.
        id = db.Column(db.Integer, primary_key=True, autoincrement=True, comment="ID")
        # Human-readable name of the dictionary.
        dict_name = db.Column(db.String(100), comment="字典名称")
        # Type code of the dictionary; indexed, and checked for duplicates by the create route.
        dict_type = db.Column(db.String(100), index=True, comment="字典类型")
        # "Marker" flag -- presumably tells numeric from string dictionaries; TODO confirm.
        dict_value_type = db.Column(db.Integer, comment="标识")
        # Status (1 = normal, 2 = disabled); defaults to 1.
        status = db.Column(db.Integer, default=1, comment="状态(1正常 2停用)")

    2.permission下新建dict.py和dict_data.py,并在app.py注册蓝图

    # --- permission/dict.py: blueprint holding the dictionary-type routes ---
    from permission import *
    
    dict = Blueprint('dict', __name__)
    # --- permission/dict_data.py: blueprint holding the dictionary-data routes ---
    from permission import *
    
    dictData = Blueprint('dictData', __name__)
    # --- app.py: mount both blueprints under their URL prefixes ---
    app.register_blueprint(dict.dict, url_prefix='/api/dict')
    app.register_blueprint(dict_data.dictData, url_prefix='/api/dictData')

    3.字典数据的增删改查实现

    # !/usr/bin/python3
    # -*- coding: utf-8 -*-
    """
    @Author         :  Huguodong
    @Version        :  
    ------------------------------------
    @File           :  dict_route.py
    @Description    :  Dictionary data
    @CreateTime     :  2020/3/14 16:10
    ------------------------------------
    @ModifyTime     :  
    """
    # NOTE(review): the header names dict_route.py, but app.py registers this
    # blueprint as dict_data.dictData -- confirm the real file name.
    from permission import *
    
    # Blueprint that the dictionary-data routes below attach to.
    dictData = Blueprint('dictData', __name__)
    
    
    def _positive_int(value, default):
        """Return ``value`` converted to a positive int, or ``default`` when that is not possible."""
        try:
            number = int(value)
        except (TypeError, ValueError, OverflowError):
            return default
        return number if number > 0 else default
    
    
    @dictData.route('/index', methods=["POST"])
    def index():
        """
        Return one page of dictionary data selected by the JSON request body.

        Two modes are supported:

        * ``dict_type`` given and ``dict_id`` absent: all entries of exactly that
          type, ordered by ``dict_sort``.
        * ``dict_id`` given (``0`` means "every dictionary"): entries optionally
          filtered by ``dict_label`` / ``dict_type`` (substring match),
          ``dict_number`` and ``status`` (``0`` means status 1 or 2), optionally
          ordered by ``order_column_name`` / ``order_type``.

        ``page`` and ``page_size`` fall back to 1 and 10 when missing or invalid.

        :return: the paginated data, ``NO_PARAMETER()`` when there is no JSON
                 object in the body, or ``PARAMETER_ERR()`` when neither mode
                 applies or ``dict_number`` is not an integer.
        """
        res_dir = request.get_json()
        if not isinstance(res_dir, dict):
            return NO_PARAMETER()
        dict_type = res_dir.get("dict_type")
        dict_id = res_dir.get("dict_id")
        # Coerce instead of comparing raw JSON values, so "2" or null cannot raise TypeError.
        page = _positive_int(res_dir.get("page"), 1)
        page_size = _positive_int(res_dir.get("page_size"), 10)
        dict_label = res_dir.get("dict_label")
        dict_number = res_dir.get("dict_number")
        status = res_dir.get("status")
        order_column_name = res_dir.get("order_column_name")
        order_type = res_dir.get("order_type")
    
        if dict_type and dict_id is None:
            # Keyword arguments work on both Flask-SQLAlchemy 2.x and 3.x
            # (3.x made the paginate() arguments keyword-only).
            dict_data = (Dict_Data.query
                         .filter(Dict_Data.dict_type == dict_type)
                         .order_by(Dict_Data.dict_sort)
                         .paginate(page=page, per_page=page_size, error_out=False))
            data = construct_page_data(dict_data)  # format the page for the response
            return jsonify(code=Code.SUCCESS.value, msg="ok", data=data)
        if dict_id is None:
            return PARAMETER_ERR()
    
        dict_data = Dict_Data.query
        if dict_id != 0:
            dict_data = dict_data.filter(Dict_Data.dict_id == dict_id)
        if dict_label:
            dict_data = dict_data.filter(Dict_Data.dict_label.like("%" + dict_label + "%"))
        if dict_number:
            try:
                wanted_number = int(dict_number)
            except (TypeError, ValueError):
                return PARAMETER_ERR()
            dict_data = dict_data.filter(Dict_Data.dict_number == wanted_number)
        if dict_type:
            dict_data = dict_data.filter(Dict_Data.dict_type.like("%" + dict_type + "%"))
        if status is not None:
            if status == 0:
                dict_data = dict_data.filter(Dict_Data.status.in_((1, 2)))
            else:
                dict_data = dict_data.filter(Dict_Data.status == status)
    
        # The column name is interpolated into raw SQL, so it must be one of the
        # table's real column names; anything else is ignored, like a bad order_type.
        sortable_columns = {column.name for column in Dict_Data.__table__.columns}
        if (isinstance(order_column_name, str) and isinstance(order_type, str)
                and order_type.lower() in ('asc', 'desc')
                and order_column_name in sortable_columns):
            dict_data = dict_data.order_by(text(f"{order_column_name} {order_type}"))
    
        result = dict_data.paginate(page=page, per_page=page_size, error_out=False)
        data = construct_page_data(result)  # format the page for the response
        return SUCCESS(data=data)
    
    
    @dictData.route('/create', methods=["PUT"])
    def create():
        """
        Create a dictionary data entry from the JSON request body.

        ``dict_id`` is required together with ``dict_label`` or, when no label is
        sent, ``dict_value``. The entry's ``dict_type`` is copied from the
        Dict_Type row identified by ``dict_id``; ``create_by`` is taken from the
        user resolved from the ``Authorization`` header.

        :return: ``SUCCESS()`` on success, ``NO_PARAMETER()`` when required fields
                 are missing, ``ID_NOT_FOUND()`` when ``dict_id`` matches no
                 Dict_Type, ``CREATE_ERROR()`` on any other failure.
        """
        res_dir = request.get_json()
        if res_dir is None:
            return NO_PARAMETER()
        dict_id = res_dir.get("dict_id")
        dict_label = res_dir.get("dict_label")
        dict_number = res_dir.get("dict_number")
        css_class = res_dir.get("css_class")
        list_class = res_dir.get("list_class")
        is_default = res_dir.get("is_default")
        dict_sort = res_dir.get("dict_sort")
        dict_value = res_dir.get("dict_value")
        status = res_dir.get("status")
        remark = res_dir.get("remark")
    
        # The previous one-line condition parsed as
        # `(dict_id and dict_label) if dict_label is not None else dict_value`,
        # which skipped the dict_id check whenever only dict_value was supplied.
        label_or_value = dict_label if dict_label is not None else dict_value
        if not (dict_id and label_or_value):
            return NO_PARAMETER()
    
        try:
            token = request.headers["Authorization"]
            user = verify_token(token)
            parent = Dict_Type.query.get(dict_id)
            if parent is None:
                # Report the unknown dictionary explicitly rather than failing
                # with an AttributeError on None.
                return ID_NOT_FOUND()
            model = Dict_Data()
            model.dict_id = dict_id
            model.dict_type = parent.dict_type
            model.dict_label = dict_label
            model.dict_number = dict_number
            model.css_class = css_class
            model.list_class = list_class
            model.is_default = is_default
            model.dict_sort = dict_sort
            model.dict_value = dict_value
            model.remark = remark
            model.status = status
            model.create_by = user['name']
            model.save()
            return SUCCESS()
        except Exception as e:
            app.logger.error(f"创建字典数据失败:{e}")
            return CREATE_ERROR()
    
    
    @dictData.route('/update', methods=["POST", "PUT"])
    def update():
        """
        Fetch or update one dictionary data entry.

        POST returns the entry whose ``id`` is given in the JSON body.
        PUT updates that entry; ``id``, ``dict_id``, ``dict_label`` and
        ``dict_number`` must all be present and truthy.

        :return: the entry (POST) or ``SUCCESS()`` (PUT); ``NO_PARAMETER()`` /
                 ``PARAMETER_ERR()`` for missing input, ``ID_NOT_FOUND()`` for an
                 unknown id, ``UPDATE_ERROR()`` when saving fails.
        """
        res_dir = request.get_json()
        if res_dir is None:
            return NO_PARAMETER()
        data_id = res_dir.get("id")
    
        if request.method == "POST":
            if not data_id:
                # This branch used to call PARAMETER_ERR() without returning it,
                # so the view returned None and Flask answered with a 500.
                return PARAMETER_ERR()
            model = Dict_Data.query.get(data_id)
            if not model:
                return ID_NOT_FOUND()
            return SUCCESS(model_to_dict(model))
    
        if request.method == "PUT":
            dict_id = res_dir.get("dict_id")
            dict_label = res_dir.get("dict_label")
            dict_number = res_dir.get("dict_number")
            css_class = res_dir.get("css_class")
            list_class = res_dir.get("list_class")
            is_default = res_dir.get("is_default")
            dict_sort = res_dir.get("dict_sort")
            status = res_dir.get("status")
            remark = res_dir.get("remark")
            # NOTE(review): a dict_number of 0 is rejected by this truthiness check,
            # and dict_value / dict_type are never updated here -- confirm intended.
            if not (data_id and dict_id and dict_label and dict_number):
                return NO_PARAMETER()
            model = Dict_Data.query.get(data_id)
            if not model:
                return ID_NOT_FOUND()
            token = request.headers["Authorization"]
            user = verify_token(token)
            model.dict_id = dict_id
            model.dict_label = dict_label
            model.dict_number = dict_number
            model.css_class = css_class
            model.list_class = list_class
            model.is_default = is_default
            model.dict_sort = dict_sort
            model.remark = remark
            model.status = status
            model.update_by = user['name']
            try:
                model.update()
                return SUCCESS()
            except Exception as e:
                app.logger.error(f"更新字典数据失败:{e}")
                return UPDATE_ERROR()
    
    
    @dictData.route('/delete', methods=["DELETE"])
    def delete():
        """
        Delete the dictionary data entry whose ``id`` is given in the JSON body.

        :return: ``SUCCESS()`` once deleted, ``NO_PARAMETER()`` without a JSON
                 body, ``PARAMETER_ERR()`` without an id, ``ID_NOT_FOUND()`` for
                 an unknown id, ``DELETE_ERROR()`` when the deletion raises.
        """
        payload = request.get_json()
        if payload is None:
            return NO_PARAMETER()
    
        record_id = payload.get("id")
        if not record_id:
            return PARAMETER_ERR()
    
        try:
            record = Dict_Data.query.get(record_id)
            if not record:
                return ID_NOT_FOUND()
            record.delete()
            return SUCCESS()
        except Exception as err:
            app.logger.error(f"删除失败:{err}")
            return DELETE_ERROR()
    View Code

    4.字典管理的增删改查实现

    # !/usr/bin/python3
    # -*- coding: utf-8 -*-
    """
    @Author         :  Huguodong
    @Version        :  
    ------------------------------------
    @File           :  dict_route.py.py
    @Description    :  
    @CreateTime     :  2020/3/16 21:45
    ------------------------------------
    @ModifyTime     :  
    """
    # NOTE(review): the header names dict_route.py.py, but app.py registers this
    # blueprint as dict.dict -- confirm the real file name.
    from permission import *
    
    # Blueprint that the dictionary-type routes below attach to.
    # The name shadows the builtin `dict` inside this module.
    dict = Blueprint('dict', __name__)
    
    
    def _positive_int(value, default):
        """Return ``value`` converted to a positive int, or ``default`` when that is not possible."""
        try:
            number = int(value)
        except (TypeError, ValueError, OverflowError):
            return default
        return number if number > 0 else default
    
    
    @dict.route('/index', methods=["POST"])
    def index():
        """
        Return one page of dictionary types selected by the JSON request body.

        Optional filters: ``dict_name`` and ``dict_type`` (substring match) and
        ``status`` (``0`` means status 1 or 2). Optional ordering via
        ``order_column_name`` / ``order_type``. ``page`` and ``page_size`` fall
        back to 1 and 10 when missing or invalid.

        :return: the paginated data, ``NO_PARAMETER()`` without a JSON body, or
                 ``REQUEST_ERROR()`` when the query raises.
        """
        res_dir = request.get_json()
        if res_dir is None:
            return NO_PARAMETER()
        model = Dict_Type.query
        dict_name = res_dir.get("dict_name")
        dict_type = res_dir.get("dict_type")
        # Coerce instead of comparing raw JSON values, so "2" or null cannot raise TypeError.
        page = _positive_int(res_dir.get("page"), 1)
        page_size = _positive_int(res_dir.get("page_size"), 10)
        status = res_dir.get("status")
        order_column_name = res_dir.get("order_column_name")
        order_type = res_dir.get("order_type")
        try:
            if dict_name:
                model = model.filter(Dict_Type.dict_name.like("%" + dict_name + "%"))
            if dict_type:
                model = model.filter(Dict_Type.dict_type.like("%" + dict_type + "%"))
            if status is not None:
                if status == 0:
                    model = model.filter(Dict_Type.status.in_((1, 2)))
                else:
                    model = model.filter(Dict_Type.status == status)
    
            # The column name is interpolated into raw SQL, so it must be one of the
            # table's real column names; anything else is ignored, like a bad order_type.
            sortable_columns = {column.name for column in Dict_Type.__table__.columns}
            if (isinstance(order_column_name, str) and isinstance(order_type, str)
                    and order_type.lower() in ('asc', 'desc')
                    and order_column_name in sortable_columns):
                model = model.order_by(text(f"{order_column_name} {order_type}"))
    
            # Keyword arguments work on both Flask-SQLAlchemy 2.x and 3.x
            # (3.x made the paginate() arguments keyword-only).
            result = model.paginate(page=page, per_page=page_size, error_out=False)
            data = construct_page_data(result)
            return SUCCESS(data=data)
        except Exception as e:
            app.logger.error(f"获取字典失败:{e}")
            return REQUEST_ERROR()
    
    
    @dict.route('/update', methods=["POST", "PUT"])
    def update():
        """
        Fetch or update one dictionary type.

        POST returns the dictionary type whose ``id`` is given in the JSON body.
        PUT updates it; ``id``, ``dict_type``, ``dict_name`` and
        ``dict_value_type`` must all be present and truthy.

        :return: the row (POST) or ``SUCCESS()`` (PUT); ``NO_PARAMETER()`` /
                 ``PARAMETER_ERR()`` for missing input, ``ID_NOT_FOUND()`` for an
                 unknown id, ``UPDATE_ERROR()`` when saving fails.
        """
        res_dir = request.get_json()
        if res_dir is None:
            return NO_PARAMETER()
        type_id = res_dir.get("id")
    
        if request.method == "POST":
            if not type_id:
                # This branch used to call PARAMETER_ERR() without returning it,
                # so the view returned None and Flask answered with a 500.
                return PARAMETER_ERR()
            model = Dict_Type.query.get(type_id)
            if not model:
                return ID_NOT_FOUND()
            return SUCCESS(model_to_dict(model))
    
        if request.method == "PUT":
            dict_value_type = res_dir.get("dict_value_type")
            dict_name = res_dir.get("dict_name")
            dict_type = res_dir.get("dict_type")
            status = res_dir.get("status")
            dict_id = res_dir.get("dict_id")
            if not (type_id and dict_type and dict_name and dict_value_type):
                return NO_PARAMETER()
            model = Dict_Type.query.get(type_id)
            if not model:
                return ID_NOT_FOUND()
            token = request.headers["Authorization"]
            user = verify_token(token)
            model.dict_name = dict_name
            model.dict_type = dict_type
            model.dict_value_type = dict_value_type
            model.status = status
            # NOTE(review): Dict_Type declares no dict_id column in the model shown
            # with this code; kept as-is in case BaseModel relies on it -- confirm.
            model.dict_id = dict_id
            model.update_by = user['name']
            try:
                model.update()
                return SUCCESS()
            except Exception as e:
                app.logger.error(f"更新字典失败:{e}")
                return UPDATE_ERROR()
    
    
    @dict.route('/create', methods=["PUT"])
    def create():
        """
        Create a dictionary type from the JSON request body.

        ``dict_name``, ``dict_type`` and ``dict_value_type`` are required; a
        ``dict_type`` that already exists is rejected. ``create_by`` is taken from
        the user resolved from the ``Authorization`` header.

        :return: ``SUCCESS()`` on success, ``NO_PARAMETER()`` for missing input,
                 ``CREATE_ERROR()`` for a duplicate type or any other failure.
        """
        payload = request.get_json()
        if payload is None:
            return NO_PARAMETER()
    
        name = payload.get("dict_name")
        type_code = payload.get("dict_type")
        value_type = payload.get("dict_value_type")
        remark = payload.get("remark")
        status = payload.get("status")
        if not (name and type_code and value_type):
            return NO_PARAMETER()
    
        try:
            duplicate = Dict_Type.query.filter(Dict_Type.dict_type == type_code).first()
            if duplicate:
                return CREATE_ERROR(msg="字典类型已存在")
            user = verify_token(request.headers["Authorization"])
            record = Dict_Type()
            record.dict_name = name
            record.dict_type = type_code
            record.dict_value_type = value_type
            record.remark = remark
            record.status = status
            record.create_by = user['name']
            record.save()
            return SUCCESS()
        except Exception as err:
            app.logger.error(f"新建字典失败:{err}")
            return CREATE_ERROR()
    
    
    @dict.route('/delete', methods=["DELETE"])
    def delete():
        """
        Delete the dictionary type whose ``id`` is given in the JSON body.

        Deletion is refused while any Dict_Data row still references the type.

        :return: ``SUCCESS()`` once deleted, ``NO_PARAMETER()`` without a JSON
                 body, ``PARAMETER_ERR()`` without an id, ``ID_NOT_FOUND()`` for
                 an unknown id, ``DELETE_ERROR()`` when entries still exist or the
                 deletion raises.
        """
        payload = request.get_json()
        if payload is None:
            return NO_PARAMETER()
    
        type_id = payload.get("id")
        if not type_id:
            return PARAMETER_ERR()
    
        try:
            has_entries = Dict_Data.query.filter(Dict_Data.dict_id == type_id).first()
            if has_entries:
                return DELETE_ERROR(msg="该字典存在数据,无法删除!")
            record = Dict_Type.query.get(type_id)
            if not record:
                return ID_NOT_FOUND()
            record.delete()
            return SUCCESS()
        except Exception as err:
            app.logger.error(f"删除字典失败:{err}")
            return DELETE_ERROR()
    View Code
  • 相关阅读:
    【BZOJ 2324】 [ZJOI2011]营救皮卡丘
    【BZOJ 2809】 [Apio2012]dispatching
    网络流小结
    复活
    终结
    11.7模拟赛
    codevs 2173 忠诚
    P3386 【模板】二分图匹配
    Leetcode 大部分是medium难度不怎么按顺序题解(上)
    ATP的新博客!
  • 原文地址:https://www.cnblogs.com/huguodong/p/12592320.html
Copyright © 2011-2022 走看看