zoukankan      html  css  js  c++  java
  • python flask框架搭建以及大佬代码参考

    1.基本搭建

    python3.5 flask框架使用mysql配置(flask学习)
    falsk使用MySQL的方法试了几个终于找到没问题的了,以下是代码:
    #coding=utf-8
    
    from flask import Flask
    from flask_sqlalchemy import SQLAlchemy
    app=Flask(__name__)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:root@127.0.0.1/blackwhite'
    app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
    db = SQLAlchemy(app)
    
    class User(db.Model):
        __tablename__='bw_user'
        id = db.Column(db.Integer, primary_key=True)
        username = db.Column(db.String(55))
        password = db.Column(db.String(255))
        createTime = db.Column(db.Integer)
        email = db.Column(db.String(55))#, unique=True
    
        def __init__(self, username, password):
            self.username = username
            self.password = password
    
        #__repr__方法告诉Python如何打印class对象,方便我们调试使用。
        def __repr__(self):
            return '<User %r>' % self.username
    
        def checkUser(self):
            if User.query.filter_by(username=self.username,password=self.password).first()!=None:
                return True
            else:
                return False
    
        def save(self):
            db.session.add(self)
            db.session.commit()
    
    引入model:
    
    from models import User
    视图方法:
    @app.route('/login', methods=['GET', 'POST'])
    def login():
        form=loginForm()
        if form.validate_on_submit():
            user=User(form.username.data,form.password.data)
            if user.checkUser():
                session['user']=form.username.data
                return redirect(url_for('index'))
            else:
                flash('用户名或者密码错误')
        return render_template('login.html',form=form,title='登录')
    这就是个简单的是通过MySQL数据库实现用户登录的实例

    2.代码参考文件 api_moniter.py

    # -*- coding: utf-8 -*-
    
    import gevent
    from gevent import monkey; monkey.patch_all()
    from gevent.pywsgi import WSGIServer
    from gevent.pool import Pool
    from flask import Flask, make_response, request, jsonify
    import logging, logging.handlers, sys, json, platform, hashlib, requests, datetime, base64, hmac, time, os
    from pymongo import MongoClient
    from bson import ObjectId
    from functools import wraps
    from email.mime.multipart import MIMEMultipart
    from email.mime.base import MIMEBase
    from email.mime.text import MIMEText
    from email.utils import formatdate
    from email import encoders
    import smtplib
    
    if platform.system() != 'Windows':
        host_mongo, port_mongo = '192.168.1.25', 10000
        handler = logging.handlers.RotatingFileHandler("/home/guozx/log/api_moniter.log", maxBytes=1024 * 1024 * 5, backupCount=5)
    else:
        host_mongo, port_mongo = '127.0.0.1', 27017
        handler = logging.StreamHandler()
    
    
    app = Flask(__name__)
    route = app.route
    app.debug = False
    
    logging.basicConfig(
        level=logging.DEBUG,
        format=u"%(asctime)s - %(name)s - %(levelname)s - %(funcName)s - %(message)s",
    )
    formatter = logging.Formatter(u"%(asctime)s - %(name)s - %(levelname)s - %(funcName)s - %(message)s")
    handler.setFormatter(formatter)
    mylog = logging.getLogger('x')
    mylog.propagate = 0
    mylog.addHandler(handler)
    
    conn = MongoClient(host=host_mongo, port=port_mongo)
    db = conn['api_moniter']
    
    db_p = db['project']
    db_p.create_index([('name', 1), ], unique=True)
    
    db_pc = db['project_config']
    db_pc.create_index([('project_id', 1), ], unique=True)
    
    db_api = db['api']
    db_api.create_index([('url', 1), ('param', 1)], unique=True)
    
    db_arl = db['api_run_log']
    
    def allow_jsonp(fun):   #装饰器允许跨域
        @wraps(fun)
        def wrapper_fun(*args, **kwargs):
            mylog.info(u"%s%s" % (request, request.values.items() if request.method != 'GET' else ''))
            res = fun(*args, **kwargs)
            res = "%s(%s)" % (request.values.get('callback'), res) if request.values.get('callback', None) else res
            rst = make_response(res)
            rst.headers['Access-Control-Allow-Origin'] = '*'
            rst.headers['Access-Control-Allow-Headers'] = "accept, origin, content-type, content-length"
            rst.headers['Access-Control-Allow-Methods'] = 'GET,PUT,POST,DELETE,OPTIONS'
            rst.headers['Content-Type'] = 'application/json; charset=UTF-8'
            return rst
        return wrapper_fun
    
    def send_mail(server, consignee, title, text, files=[], timeout=180, subtype='plain'):
        msg = MIMEMultipart()
        msg['From'] = server['user']
        msg['Subject'] = title
        msg['To'] = ",".join(consignee)
        msg['Date'] = formatdate(localtime=True)
        msg.attach(MIMEText(text, _subtype=subtype, _charset='utf-8'))
    
        coding = 'utf-8' if "Linux" == platform.system() else 'gbk'
        for file in files:
            part = MIMEBase('application', 'octet-stream')
            part.set_payload(open(file,'rb').read())
            encoders.encode_base64(part)
            part.add_header('Content-Disposition', 'attachment',filename="=?utf-8?b?%s?="%base64.b64encode(os.path.basename(file).decode(coding).encode('utf-8')))
            msg.attach(part)
    
        smtp = smtplib.SMTP(server['host'],server['port'] if server.get('port') else 0, timeout=timeout)
        smtp.login(server['user'], server['passwd'])
        smtp.sendmail(server['user'],consignee, msg.as_string())
        smtp.close()
    
    send_mail({'host': "smtp.163.com", 'user': "url_giiso@163.com", 'passwd': "giiso123", "port":465}, ['guozx@giiso.com'], "API监控异常报告", 'text', subtype='html')
    xxx
    def emaillog(text, consignee=["76040100@qq.com"]):
        try:
            text = u"""
            <textarea style="font-size: 12px !important; resize: both;  100%; height:100%;" spellcheck="false" rows="{0}">
            {1}
            </textarea>
            """.format(text.count('
    ') + 5, text)
            send_mail({'host': "smtp.qq.com", 'user': "guozx@giiso.com", 'passwd': "RiUt4Hed6MNegkj3"}, consignee, "API监控异常报告", text, subtype='html')
        except Exception as e:
            mylog.exception(e)
    
    def timed_task_thread(interval=600):
        while 1:
            try:
                data = list(db_api.find())
            except Exception as e:
                mylog.exception(e)
                emaillog(u"get mongodb data error: " % e)
            else:
                idDict = {}
                for d in data:
                    id = str(d['_id'])
                    x = {'name': d['name'], 'url': d['url'], 'param': d['param'], 'id': id}
                    timed_task = d.get('timed_task')
                    if timed_task:
                        if timed_task.get('auto_run', 1):
                            idDict[id] = x
                    else:
                        idDict[id] = x
                mylog.info("start run task len: %s" % len(idDict))
                errorDict = {}
                for id, v in idDict.items():
                    rData, consignee = _api_run(id, run_mode=1)
                    consignee = ','.join(sorted(consignee.split(','))) if consignee else consignee
                    if rData['code'] != 0:
                        rData.update(v)
                        if consignee in errorDict.keys():
                            errorDict[consignee]['error'].append(rData)
                            errorDict[consignee]['name'].append(v['name'])
                        else:
                            errorDict[consignee] = {'error': [rData], 'name': [v['name']]}
                for k, v in errorDict.iteritems():
                    if k:
                        emaillog(json.dumps(v, indent=4, ensure_ascii=False, sort_keys=True), k.split(','))
                mylog.info(u"end run task ids: %s" % json.dumps(dict([(k, v['name']) for k, v in errorDict.iteritems()]), ensure_ascii=False))
            time.sleep(interval)
    gevent.spawn(timed_task_thread)
    
    
    def _data_page(tb, page, limit, find, sort):
        rDict = {'code': 1}
        try:
            findDict = _mongo_find(find)
            sort = _mongo_sort(sort)
            sum = tb.find(findDict).count()
            q = tb.find(findDict)
            if limit:
                q = q.skip((page - 1) * limit).limit(limit)
            if sort:
                q = q.sort(sort)
            rDict['data'] = _id_to_str(q)
            rDict['page'] = page
            rDict['limit'] = limit
            rDict['total'] = sum
            rDict['code'] = 0
        except Exception as e:
            mylog.exception(e)
            rDict['error'] = '%s' % e
        return rDict
    def _id_to_str(d):
        def _x(x):
            x['_id'] = str(x['_id'])
            return x
        return map(_x, d)
    def _mongo_find(find):
        findDict = {}
        if find:
            findDict = json.loads(find)
        return findDict
    def _mongo_sort(sort):
        sl = []
        if sort:
            for s in sort.split(","):
                x = s.split(':')
                sl.append((x[0], int(x[1]) if len(x) == 2 and x[1] == '-1' else 1))
        return sl
    
    def api_run_log_add(logDict):
        # run_mode  运行模式 1系统调用  2接口调用
        logDict2 = {'api_id': None, 'api_name': None, 'run_mode': 1, 'isread': 0, 'code': 1, 'request_time': None,
                   'data': None, 'error': None, 'status_code': 0, 'create_time': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
        logDict2.update(logDict)
        db_arl.insert(logDict2)
    
    def _api_run(id, run_mode=2):
        rDict = {'code': 1}
        consignee = None
        try:
            api = db_api.find_one(ObjectId(id))
            pmc = db_pc.find_one({'project_id': api['project_id']}) if api else None
        except Exception as e:
            mylog.exception(e)
            rDict['error'] = u'%s' % e
        else:
            logDict = {}    #记录log
            if api:
                logDict['api_id'] = id
                logDict['api_name'] = api['name']
                logDict['run_mode'] = run_mode
                if pmc:
                    secretId = pmc['secret_id']
                    secretKey = pmc['secret_key']
                    consignee = pmc['consignee']
                    fDate = (datetime.datetime.now() - datetime.timedelta(hours=8)).strftime('%a, %d %b %Y %H:%M:%S GMT')
    
                    secretData = "date: " + fDate
                    try:
                        sign = base64.b64encode(hmac.new(secretKey.encode('utf-8'), secretData, hashlib.sha1).digest())
                        authorization = "hmac username="" + secretId + "", algorithm="hmac-sha1", headers="date", signature="" + sign + """;
                        headers = {
                            'contentType': 'UTF-8',
                            'Accept-Charset': 'UTF-8',
                            'date': fDate,
                            'Authorization': authorization,
                        }
                        req = requests.request(api['method'], api['url'], timeout=100, params=api['param'].encode('utf-8'), headers=headers)
                        data = req.content.strip()
                        if data.startswith("{") and data.endswith("}"):
                            data = json.loads(data)
                    except Exception as e:
                        mylog.exception(e)
                        rDict['error'] = u'%s' % e
                    else:
                        if req.status_code == 200:
                            rDict['code'] = 0
                        else:
                            rDict['code'] = 1
                            rDict['status_code'] = req.status_code
                        rDict['data'] = data
                        rDict['request_time'] = req.elapsed.microseconds / 1000000.0
                else:
                    rDict['error'] = 'not secret'
            else:
                rDict['error'] = 'not id'
        logDict.update(rDict)
        gevent.spawn(api_run_log_add, logDict)  # 开启协程记录调用记录
        return rDict, consignee
    
    
    @route('/')
    @allow_jsonp
    def index():
        return '''hello word'''
    
    @route('/project/add', methods=['POST', 'GET'])
    @allow_jsonp
    def project_add():
        rDict = {'code': 1}
        d = {
            'parent_id': request.values.get('parent_id'),
            'name': request.values['name'],
            'update_time': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            'description': request.values.get('description'),
        }
        try:
            d['_id'] = str(db_p.insert(d))
            rDict['code'] = 0
            rDict['data'] = d
        except Exception as e:
            mylog.exception(e)
            rDict['error'] = u'%s' % e
        return jsonify(rDict)
    
    @route('/project/update', methods=['POST', 'GET'])
    @allow_jsonp
    def project_update():
        rDict = {'code': 1}
        id = request.values['id']
        d = {
            'parent_id': request.values.get('parent_id'),
            'name': request.values.get('name'),
            'update_time': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            'description': request.values.get('description'),
        }
        [d.pop(k) for k in d.values() if k == None]
        try:
            rd = db_p.update({'_id': ObjectId(id)}, {"$set": d})
            if rd['n'] > 0:
                d['_id'] = id
                rDict['code'] = 0
                rDict['data'] = d
            else:
                rDict['error'] = u'id不存在'
        except Exception as e:
            mylog.exception(e)
            rDict['error'] = u'%s' % e
        return jsonify(rDict)
    
    @route('/project/delete', methods=['POST', 'GET'])
    @allow_jsonp
    def project_delete():
        rDict = {'code': 1}
        id = request.values['id']
        try:
            if db_p.find({'parent_id': id}).count() + db_pc.find({'project_id': id}).count() + db_api.find({'project_id': id}).count() > 0:
                rDict['error'] = u'关联表未删除'
            else:
                rd = db_p.remove({'_id': ObjectId(id)})
                if rd['n'] > 0:
                    rDict['code'] = 0
                else:
                    rDict['error'] = u'id不存在'
        except Exception as e:
            mylog.exception(e)
            rDict['error'] = u'%s' % e
        return jsonify(rDict)
    
    @route('/project/find', methods=['POST', 'GET'])
    @allow_jsonp
    def project_find():
        rDict = {'code': 0}
        isconfig = request.values.get('isconfig')
        isapi = request.values.get('isapi')
        find = request.values.get('find')
        parentDict = {}
        pmDict = {}
        pmList = []
        try:
            findDict = _mongo_find(find) if find else {}
            for d in db_p.find(findDict):
                id = str(d['_id'])
                if not d.get('parent_id'):
                    parentDict[id] = d
                else:
                    pmDict[id] = d
            if parentDict:
                ids = parentDict.keys() + pmDict.keys()
                if isconfig:
                    for d in db_pc.find({'project_id': {'$in': ids}}):
                        pm = parentDict.get(d['project_id'])
                        pm = pm if pm else pmDict[d['project_id']]
                        pm['config'] = d
                if isapi:
                    for d in db_api.find({'project_id': {'$in': ids}}):
                        pm = parentDict.get(d['project_id'])
                        pm = pm if pm else pmDict[d['project_id']]
                        apis = pm.get('apis')
                        if apis:
                            apis.append(d)
                        else:
                            pm['apis'] = [d]
                for k, v in pmDict.items():
                    p = parentDict.get(v['parent_id'])
                    if p:
                        items = p.get('items')
                        if items:
                            items.append(v)
                        else:
                            p['items'] = [v]
                        pmDict.pop(k)
                    else:
                        pmList.append(v)
                pmList += parentDict.itervalues()
            rDict['code'] = 0
            rDict['data'] = pmList
        except Exception as e:
            mylog.exception(e)
            rDict['error'] = u'%s' % e
        return json.dumps(rDict, default=lambda x: str(x) if type(x) == ObjectId else x)
    
    @route('/project_config/add', methods=['POST', 'GET'])
    @allow_jsonp
    def project_config_add():
        rDict = {'code': 1}
        d = {
            'project_id': request.values['project_id'],
            'secret_id': request.values.get('secret_id'),
            'secret_key': request.values.get('secret_key'),
            'consignee': request.values.get('consignee'),
            'update_time':  datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            'description': request.values.get('description'),
        }
        try:
            d['_id'] = str(db_pc.insert(d))
            rDict['code'] = 0
            rDict['data'] = d
        except Exception as e:
            mylog.exception(e)
            rDict['error'] = u'%s' % e
        return jsonify(rDict)
    
    @route('/project_config/update', methods=['POST', 'GET'])
    @allow_jsonp
    def project_config_update():
        rDict = {'code': 1}
        id = request.values['id']
        d = {
            'project_id': request.values.get('project_id'),
            'secret_id': request.values.get('secret_id'),
            'secret_key': request.values.get('secret_key'),
            'consignee': request.values.get('consignee'),
            'update_time': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            'description': request.values.get('description'),
        }
        [d.pop(k) for k in d.values() if k == None]
        try:
            rd = db_pc.update({'_id': ObjectId(id)}, {"$set": d})
            if rd['n'] > 0:
                d['_id'] = id
                rDict['code'] = 0
                rDict['data'] = d
            else:
                rDict['error'] = u'id不存在'
        except Exception as e:
            mylog.exception(e)
            rDict['error'] = u'%s' % e
        return jsonify(rDict)
    
    @route('/project_config/delete', methods=['POST', 'GET'])
    @allow_jsonp
    def project_config_delete():
        rDict = {'code': 1}
        id = request.values['id']
        rd = db_pc.remove({'_id': ObjectId(id)})
        if rd['n'] > 0:
            rDict['code'] = 0
        else:
            rDict['error'] = u'id不存在'
        return jsonify(rDict)
    
    @route('/project_config/find', methods=['POST', 'GET'])
    @allow_jsonp
    def get_project_menu_config():
        page = int(request.values.get('page', 1))
        limit = int(request.values.get('limit', 0))
        find = request.values.get('find')
        sort = request.values.get('sort')
        rDict = _data_page(db_pc, page, limit, find, sort)
        return jsonify(rDict)
    
    
    @route('/api/add', methods=['POST', 'GET'])
    @allow_jsonp
    def api_add():
        rDict = {'code': 1}
        timed_task = request.values.get('timed_task')
        error_check = request.values.get('error_check')
        other = request.values.get('other')
        d = {
            'project_id': request.values['project_id'],
            'name': request.values['name'],
            'url': request.values['url'],
            'param': request.values.get('param'),
            'method': request.values.get('method', 'get').lower(),
            'timed_task': timed_task,
            'other': other,
        }
        try:
            if timed_task:
                d['timed_task'] = json.loads(timed_task)
            if error_check:
                d['error_check'] = json.loads(error_check)
            if other:
                d['other'] = json.loads(other)
            d['_id'] = str(db_api.insert(d))
            rDict['code'] = 0
            rDict['data'] = d
        except Exception as e:
            mylog.exception(e)
            rDict['error'] = u'%s' % e
        return jsonify(rDict)
    
    @route('/api/update', methods=['POST', 'GET'])
    @allow_jsonp
    def api_update():
        rDict = {'code': 1}
        id = request.values['id']
        timed_task = request.values.get('timed_task')
        error_check = request.values.get('error_check')
        other = request.values.get('other')
        d = {
            'project_id': request.values.get('project_id'),
            'name': request.values.get('name'),
            'url': request.values.get('url'),
            'param': request.values.get('param'),
            'method': request.values.get('method'),
            'timed_task': timed_task,
            'other': other,
        }
        [d.pop(k) for k in d.values() if k == None]
        try:
            if timed_task:
                d['timed_task'] = json.loads(timed_task)
            if error_check:
                d['error_check'] = json.loads(error_check)
            if other:
                d['other'] = json.loads(other)
            rd = db_api.update({'_id': ObjectId(id)}, {"$set": d})
            if rd['n'] > 0:
                d['_id'] = id
                rDict['code'] = 0
                rDict['data'] = d
            else:
                rDict['error'] = u'id不存在'
        except Exception as e:
            mylog.exception(e)
            rDict['error'] = u'%s' % e
        return jsonify(rDict)
    
    @route('/api/delete', methods=['POST', 'GET'])
    @allow_jsonp
    def api_delete():
        rDict = {'code': 1}
        id = request.values['id']
        rd = db_api.remove({'_id': ObjectId(id)})
        if rd['n'] > 0:
            rDict['code'] = 0
        else:
            rDict['error'] = u'id不存在'
        return jsonify(rDict)
    
    @route('/api/find', methods=['POST', 'GET'])
    @allow_jsonp
    def get_api():
        page = int(request.values.get('page', 1))
        limit = int(request.values.get('limit', 20))
        find = request.values.get('find')
        sort = request.values.get('sort')
        rDict = _data_page(db_api, page, limit, find, sort)
        return jsonify(rDict)
    
    @route('/api/run', methods=['POST', 'GET'])
    @allow_jsonp
    def api_run():
        id = request.values['id']
        rDict, _ = _api_run(id, 2)
        return jsonify(rDict)
    
    @route('/api/run_log_find', methods=['POST', 'GET'])
    @allow_jsonp
    def api_run_log_find():
        page = int(request.values.get('page', 1))
        limit = int(request.values.get('limit', 20))
        find = request.values.get('find')
        sort = request.values.get('sort')
        rDict = _data_page(db['run_api_log'], page, limit, find, sort)
        return jsonify(rDict)
    
    @route('/api/run_log_update', methods=['POST', 'GET'])
    @allow_jsonp
    def api_run_log_update():
        rDict = {'code': 1}
        ids = request.values['ids']
        try:
            rd = db_arl.update({'_id': {'$in': [ObjectId(id) for id in ids.split(',')]}}, {"$set": {'isread': 1}})
        except Exception as e:
            mylog.exception(e)
        else:
            if rd['n'] > 0:
                rDict['code'] = 0
                rDict['data'] = rd['n']
            else:
                rDict['error'] = u'id不存在'
        return jsonify(rDict)
    
    
    if __name__ == "__main__":
        host, port = '0.0.0.0', int(sys.argv[1]) if len(sys.argv) == 2 else 8000
        mylog.info("%s:%s service start..." % (host, port))
        WSGIServer((host, port), app, spawn=Pool(1000)).serve_forever()
  • 相关阅读:
    02.CentOS Linux 7.7 Nginx安装部署文档
    rpm操作
    mysql命令行备份方法
    nginx reload的原理
    Linux操作笔记
    mysql账户授权
    centos系统内核升级
    docker随笔
    linux系统查看当前正在运行的服务
    数据库锁表问题
  • 原文地址:https://www.cnblogs.com/xmyfsj/p/15231956.html
Copyright © 2011-2022 走看看