1.基本搭建
python3.5 flask框架使用mysql配置(flask学习) falsk使用MySQL的方法试了几个终于找到没问题的了,以下是代码: #coding=utf-8 from flask import Flask from flask_sqlalchemy import SQLAlchemy app=Flask(__name__) app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:root@127.0.0.1/blackwhite' app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True db = SQLAlchemy(app) class User(db.Model): __tablename__='bw_user' id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(55)) password = db.Column(db.String(255)) createTime = db.Column(db.Integer) email = db.Column(db.String(55))#, unique=True def __init__(self, username, password): self.username = username self.password = password #__repr__方法告诉Python如何打印class对象,方便我们调试使用。 def __repr__(self): return '<User %r>' % self.username def checkUser(self): if User.query.filter_by(username=self.username,password=self.password).first()!=None: return True else: return False def save(self): db.session.add(self) db.session.commit() 引入model: from models import User 视图方法: @app.route('/login', methods=['GET', 'POST']) def login(): form=loginForm() if form.validate_on_submit(): user=User(form.username.data,form.password.data) if user.checkUser(): session['user']=form.username.data return redirect(url_for('index')) else: flash('用户名或者密码错误') return render_template('login.html',form=form,title='登录') 这就是个简单的是通过MySQL数据库实现用户登录的实例
2.代码参考文件 api_moniter.py
# -*- coding: utf-8 -*- import gevent from gevent import monkey; monkey.patch_all() from gevent.pywsgi import WSGIServer from gevent.pool import Pool from flask import Flask, make_response, request, jsonify import logging, logging.handlers, sys, json, platform, hashlib, requests, datetime, base64, hmac, time, os from pymongo import MongoClient from bson import ObjectId from functools import wraps from email.mime.multipart import MIMEMultipart from email.mime.base import MIMEBase from email.mime.text import MIMEText from email.utils import formatdate from email import encoders import smtplib if platform.system() != 'Windows': host_mongo, port_mongo = '192.168.1.25', 10000 handler = logging.handlers.RotatingFileHandler("/home/guozx/log/api_moniter.log", maxBytes=1024 * 1024 * 5, backupCount=5) else: host_mongo, port_mongo = '127.0.0.1', 27017 handler = logging.StreamHandler() app = Flask(__name__) route = app.route app.debug = False logging.basicConfig( level=logging.DEBUG, format=u"%(asctime)s - %(name)s - %(levelname)s - %(funcName)s - %(message)s", ) formatter = logging.Formatter(u"%(asctime)s - %(name)s - %(levelname)s - %(funcName)s - %(message)s") handler.setFormatter(formatter) mylog = logging.getLogger('x') mylog.propagate = 0 mylog.addHandler(handler) conn = MongoClient(host=host_mongo, port=port_mongo) db = conn['api_moniter'] db_p = db['project'] db_p.create_index([('name', 1), ], unique=True) db_pc = db['project_config'] db_pc.create_index([('project_id', 1), ], unique=True) db_api = db['api'] db_api.create_index([('url', 1), ('param', 1)], unique=True) db_arl = db['api_run_log'] def allow_jsonp(fun): #装饰器允许跨域 @wraps(fun) def wrapper_fun(*args, **kwargs): mylog.info(u"%s%s" % (request, request.values.items() if request.method != 'GET' else '')) res = fun(*args, **kwargs) res = "%s(%s)" % (request.values.get('callback'), res) if request.values.get('callback', None) else res rst = make_response(res) rst.headers['Access-Control-Allow-Origin'] = '*' rst.headers['Access-Control-Allow-Headers'] = "accept, origin, content-type, content-length" rst.headers['Access-Control-Allow-Methods'] = 'GET,PUT,POST,DELETE,OPTIONS' rst.headers['Content-Type'] = 'application/json; charset=UTF-8' return rst return wrapper_fun def send_mail(server, consignee, title, text, files=[], timeout=180, subtype='plain'): msg = MIMEMultipart() msg['From'] = server['user'] msg['Subject'] = title msg['To'] = ",".join(consignee) msg['Date'] = formatdate(localtime=True) msg.attach(MIMEText(text, _subtype=subtype, _charset='utf-8')) coding = 'utf-8' if "Linux" == platform.system() else 'gbk' for file in files: part = MIMEBase('application', 'octet-stream') part.set_payload(open(file,'rb').read()) encoders.encode_base64(part) part.add_header('Content-Disposition', 'attachment',filename="=?utf-8?b?%s?="%base64.b64encode(os.path.basename(file).decode(coding).encode('utf-8'))) msg.attach(part) smtp = smtplib.SMTP(server['host'],server['port'] if server.get('port') else 0, timeout=timeout) smtp.login(server['user'], server['passwd']) smtp.sendmail(server['user'],consignee, msg.as_string()) smtp.close() send_mail({'host': "smtp.163.com", 'user': "url_giiso@163.com", 'passwd': "giiso123", "port":465}, ['guozx@giiso.com'], "API监控异常报告", 'text', subtype='html') xxx def emaillog(text, consignee=["76040100@qq.com"]): try: text = u""" <textarea style="font-size: 12px !important; resize: both; 100%; height:100%;" spellcheck="false" rows="{0}"> {1} </textarea> """.format(text.count(' ') + 5, text) send_mail({'host': "smtp.qq.com", 'user': "guozx@giiso.com", 'passwd': "RiUt4Hed6MNegkj3"}, consignee, "API监控异常报告", text, subtype='html') except Exception as e: mylog.exception(e) def timed_task_thread(interval=600): while 1: try: data = list(db_api.find()) except Exception as e: mylog.exception(e) emaillog(u"get mongodb data error: " % e) else: idDict = {} for d in data: id = str(d['_id']) x = {'name': d['name'], 'url': d['url'], 'param': d['param'], 'id': id} timed_task = d.get('timed_task') if timed_task: if timed_task.get('auto_run', 1): idDict[id] = x else: idDict[id] = x mylog.info("start run task len: %s" % len(idDict)) errorDict = {} for id, v in idDict.items(): rData, consignee = _api_run(id, run_mode=1) consignee = ','.join(sorted(consignee.split(','))) if consignee else consignee if rData['code'] != 0: rData.update(v) if consignee in errorDict.keys(): errorDict[consignee]['error'].append(rData) errorDict[consignee]['name'].append(v['name']) else: errorDict[consignee] = {'error': [rData], 'name': [v['name']]} for k, v in errorDict.iteritems(): if k: emaillog(json.dumps(v, indent=4, ensure_ascii=False, sort_keys=True), k.split(',')) mylog.info(u"end run task ids: %s" % json.dumps(dict([(k, v['name']) for k, v in errorDict.iteritems()]), ensure_ascii=False)) time.sleep(interval) gevent.spawn(timed_task_thread) def _data_page(tb, page, limit, find, sort): rDict = {'code': 1} try: findDict = _mongo_find(find) sort = _mongo_sort(sort) sum = tb.find(findDict).count() q = tb.find(findDict) if limit: q = q.skip((page - 1) * limit).limit(limit) if sort: q = q.sort(sort) rDict['data'] = _id_to_str(q) rDict['page'] = page rDict['limit'] = limit rDict['total'] = sum rDict['code'] = 0 except Exception as e: mylog.exception(e) rDict['error'] = '%s' % e return rDict def _id_to_str(d): def _x(x): x['_id'] = str(x['_id']) return x return map(_x, d) def _mongo_find(find): findDict = {} if find: findDict = json.loads(find) return findDict def _mongo_sort(sort): sl = [] if sort: for s in sort.split(","): x = s.split(':') sl.append((x[0], int(x[1]) if len(x) == 2 and x[1] == '-1' else 1)) return sl def api_run_log_add(logDict): # run_mode 运行模式 1系统调用 2接口调用 logDict2 = {'api_id': None, 'api_name': None, 'run_mode': 1, 'isread': 0, 'code': 1, 'request_time': None, 'data': None, 'error': None, 'status_code': 0, 'create_time': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")} logDict2.update(logDict) db_arl.insert(logDict2) def _api_run(id, run_mode=2): rDict = {'code': 1} consignee = None try: api = db_api.find_one(ObjectId(id)) pmc = db_pc.find_one({'project_id': api['project_id']}) if api else None except Exception as e: mylog.exception(e) rDict['error'] = u'%s' % e else: logDict = {} #记录log if api: logDict['api_id'] = id logDict['api_name'] = api['name'] logDict['run_mode'] = run_mode if pmc: secretId = pmc['secret_id'] secretKey = pmc['secret_key'] consignee = pmc['consignee'] fDate = (datetime.datetime.now() - datetime.timedelta(hours=8)).strftime('%a, %d %b %Y %H:%M:%S GMT') secretData = "date: " + fDate try: sign = base64.b64encode(hmac.new(secretKey.encode('utf-8'), secretData, hashlib.sha1).digest()) authorization = "hmac username="" + secretId + "", algorithm="hmac-sha1", headers="date", signature="" + sign + """; headers = { 'contentType': 'UTF-8', 'Accept-Charset': 'UTF-8', 'date': fDate, 'Authorization': authorization, } req = requests.request(api['method'], api['url'], timeout=100, params=api['param'].encode('utf-8'), headers=headers) data = req.content.strip() if data.startswith("{") and data.endswith("}"): data = json.loads(data) except Exception as e: mylog.exception(e) rDict['error'] = u'%s' % e else: if req.status_code == 200: rDict['code'] = 0 else: rDict['code'] = 1 rDict['status_code'] = req.status_code rDict['data'] = data rDict['request_time'] = req.elapsed.microseconds / 1000000.0 else: rDict['error'] = 'not secret' else: rDict['error'] = 'not id' logDict.update(rDict) gevent.spawn(api_run_log_add, logDict) # 开启协程记录调用记录 return rDict, consignee @route('/') @allow_jsonp def index(): return '''hello word''' @route('/project/add', methods=['POST', 'GET']) @allow_jsonp def project_add(): rDict = {'code': 1} d = { 'parent_id': request.values.get('parent_id'), 'name': request.values['name'], 'update_time': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'description': request.values.get('description'), } try: d['_id'] = str(db_p.insert(d)) rDict['code'] = 0 rDict['data'] = d except Exception as e: mylog.exception(e) rDict['error'] = u'%s' % e return jsonify(rDict) @route('/project/update', methods=['POST', 'GET']) @allow_jsonp def project_update(): rDict = {'code': 1} id = request.values['id'] d = { 'parent_id': request.values.get('parent_id'), 'name': request.values.get('name'), 'update_time': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'description': request.values.get('description'), } [d.pop(k) for k in d.values() if k == None] try: rd = db_p.update({'_id': ObjectId(id)}, {"$set": d}) if rd['n'] > 0: d['_id'] = id rDict['code'] = 0 rDict['data'] = d else: rDict['error'] = u'id不存在' except Exception as e: mylog.exception(e) rDict['error'] = u'%s' % e return jsonify(rDict) @route('/project/delete', methods=['POST', 'GET']) @allow_jsonp def project_delete(): rDict = {'code': 1} id = request.values['id'] try: if db_p.find({'parent_id': id}).count() + db_pc.find({'project_id': id}).count() + db_api.find({'project_id': id}).count() > 0: rDict['error'] = u'关联表未删除' else: rd = db_p.remove({'_id': ObjectId(id)}) if rd['n'] > 0: rDict['code'] = 0 else: rDict['error'] = u'id不存在' except Exception as e: mylog.exception(e) rDict['error'] = u'%s' % e return jsonify(rDict) @route('/project/find', methods=['POST', 'GET']) @allow_jsonp def project_find(): rDict = {'code': 0} isconfig = request.values.get('isconfig') isapi = request.values.get('isapi') find = request.values.get('find') parentDict = {} pmDict = {} pmList = [] try: findDict = _mongo_find(find) if find else {} for d in db_p.find(findDict): id = str(d['_id']) if not d.get('parent_id'): parentDict[id] = d else: pmDict[id] = d if parentDict: ids = parentDict.keys() + pmDict.keys() if isconfig: for d in db_pc.find({'project_id': {'$in': ids}}): pm = parentDict.get(d['project_id']) pm = pm if pm else pmDict[d['project_id']] pm['config'] = d if isapi: for d in db_api.find({'project_id': {'$in': ids}}): pm = parentDict.get(d['project_id']) pm = pm if pm else pmDict[d['project_id']] apis = pm.get('apis') if apis: apis.append(d) else: pm['apis'] = [d] for k, v in pmDict.items(): p = parentDict.get(v['parent_id']) if p: items = p.get('items') if items: items.append(v) else: p['items'] = [v] pmDict.pop(k) else: pmList.append(v) pmList += parentDict.itervalues() rDict['code'] = 0 rDict['data'] = pmList except Exception as e: mylog.exception(e) rDict['error'] = u'%s' % e return json.dumps(rDict, default=lambda x: str(x) if type(x) == ObjectId else x) @route('/project_config/add', methods=['POST', 'GET']) @allow_jsonp def project_config_add(): rDict = {'code': 1} d = { 'project_id': request.values['project_id'], 'secret_id': request.values.get('secret_id'), 'secret_key': request.values.get('secret_key'), 'consignee': request.values.get('consignee'), 'update_time': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'description': request.values.get('description'), } try: d['_id'] = str(db_pc.insert(d)) rDict['code'] = 0 rDict['data'] = d except Exception as e: mylog.exception(e) rDict['error'] = u'%s' % e return jsonify(rDict) @route('/project_config/update', methods=['POST', 'GET']) @allow_jsonp def project_config_update(): rDict = {'code': 1} id = request.values['id'] d = { 'project_id': request.values.get('project_id'), 'secret_id': request.values.get('secret_id'), 'secret_key': request.values.get('secret_key'), 'consignee': request.values.get('consignee'), 'update_time': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'description': request.values.get('description'), } [d.pop(k) for k in d.values() if k == None] try: rd = db_pc.update({'_id': ObjectId(id)}, {"$set": d}) if rd['n'] > 0: d['_id'] = id rDict['code'] = 0 rDict['data'] = d else: rDict['error'] = u'id不存在' except Exception as e: mylog.exception(e) rDict['error'] = u'%s' % e return jsonify(rDict) @route('/project_config/delete', methods=['POST', 'GET']) @allow_jsonp def project_config_delete(): rDict = {'code': 1} id = request.values['id'] rd = db_pc.remove({'_id': ObjectId(id)}) if rd['n'] > 0: rDict['code'] = 0 else: rDict['error'] = u'id不存在' return jsonify(rDict) @route('/project_config/find', methods=['POST', 'GET']) @allow_jsonp def get_project_menu_config(): page = int(request.values.get('page', 1)) limit = int(request.values.get('limit', 0)) find = request.values.get('find') sort = request.values.get('sort') rDict = _data_page(db_pc, page, limit, find, sort) return jsonify(rDict) @route('/api/add', methods=['POST', 'GET']) @allow_jsonp def api_add(): rDict = {'code': 1} timed_task = request.values.get('timed_task') error_check = request.values.get('error_check') other = request.values.get('other') d = { 'project_id': request.values['project_id'], 'name': request.values['name'], 'url': request.values['url'], 'param': request.values.get('param'), 'method': request.values.get('method', 'get').lower(), 'timed_task': timed_task, 'other': other, } try: if timed_task: d['timed_task'] = json.loads(timed_task) if error_check: d['error_check'] = json.loads(error_check) if other: d['other'] = json.loads(other) d['_id'] = str(db_api.insert(d)) rDict['code'] = 0 rDict['data'] = d except Exception as e: mylog.exception(e) rDict['error'] = u'%s' % e return jsonify(rDict) @route('/api/update', methods=['POST', 'GET']) @allow_jsonp def api_update(): rDict = {'code': 1} id = request.values['id'] timed_task = request.values.get('timed_task') error_check = request.values.get('error_check') other = request.values.get('other') d = { 'project_id': request.values.get('project_id'), 'name': request.values.get('name'), 'url': request.values.get('url'), 'param': request.values.get('param'), 'method': request.values.get('method'), 'timed_task': timed_task, 'other': other, } [d.pop(k) for k in d.values() if k == None] try: if timed_task: d['timed_task'] = json.loads(timed_task) if error_check: d['error_check'] = json.loads(error_check) if other: d['other'] = json.loads(other) rd = db_api.update({'_id': ObjectId(id)}, {"$set": d}) if rd['n'] > 0: d['_id'] = id rDict['code'] = 0 rDict['data'] = d else: rDict['error'] = u'id不存在' except Exception as e: mylog.exception(e) rDict['error'] = u'%s' % e return jsonify(rDict) @route('/api/delete', methods=['POST', 'GET']) @allow_jsonp def api_delete(): rDict = {'code': 1} id = request.values['id'] rd = db_api.remove({'_id': ObjectId(id)}) if rd['n'] > 0: rDict['code'] = 0 else: rDict['error'] = u'id不存在' return jsonify(rDict) @route('/api/find', methods=['POST', 'GET']) @allow_jsonp def get_api(): page = int(request.values.get('page', 1)) limit = int(request.values.get('limit', 20)) find = request.values.get('find') sort = request.values.get('sort') rDict = _data_page(db_api, page, limit, find, sort) return jsonify(rDict) @route('/api/run', methods=['POST', 'GET']) @allow_jsonp def api_run(): id = request.values['id'] rDict, _ = _api_run(id, 2) return jsonify(rDict) @route('/api/run_log_find', methods=['POST', 'GET']) @allow_jsonp def api_run_log_find(): page = int(request.values.get('page', 1)) limit = int(request.values.get('limit', 20)) find = request.values.get('find') sort = request.values.get('sort') rDict = _data_page(db['run_api_log'], page, limit, find, sort) return jsonify(rDict) @route('/api/run_log_update', methods=['POST', 'GET']) @allow_jsonp def api_run_log_update(): rDict = {'code': 1} ids = request.values['ids'] try: rd = db_arl.update({'_id': {'$in': [ObjectId(id) for id in ids.split(',')]}}, {"$set": {'isread': 1}}) except Exception as e: mylog.exception(e) else: if rd['n'] > 0: rDict['code'] = 0 rDict['data'] = rd['n'] else: rDict['error'] = u'id不存在' return jsonify(rDict) if __name__ == "__main__": host, port = '0.0.0.0', int(sys.argv[1]) if len(sys.argv) == 2 else 8000 mylog.info("%s:%s service start..." % (host, port)) WSGIServer((host, port), app, spawn=Pool(1000)).serve_forever()