####
####
简单使用,代码示例
from flask import Flask, request from flask_restful import Resource, Api from flask_sqlalchemy import SQLAlchemy import json app = Flask(__name__) # api 和 app 要关联在一起 api = Api(app) # 连接mysql数据库 app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:Ji10201749@localhost:3306/flask_test?charset=utf8' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True db = SQLAlchemy(app) class TestCase(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(80), unique=False, nullable=False) desc = db.Column(db.String(120), unique=False, nullable=False) steps = db.Column(db.String(120), unique=False, nullable=False) def __repr__(self): return '<TestCase %r>' % self.name def model_to_dict(self): return {c.name: getattr(self, c.name) for c in self.__table__.columns} # 类要继承resource class TestCaseSearch(Resource): def get(self): print(request.args) id = request.args.get("id") if id: return TestCase.query.filter_by(id=id).first().model_to_dict() return [item.model_to_dict() for item in TestCase.query.all()] def post(self): print(request.json) data_dict = request.json data_dict["steps"] = json.dumps(data_dict["steps"]) print(data_dict) testcase = TestCase(**data_dict) db.session.add(testcase) db.session.commit() return { "msg": "success" } # 注册到api里面去 api.add_resource(TestCaseSearch, '/testcase') if __name__ == '__main__': db.create_all() app.run(debug=True, use_reloader=True)
###
1,flask,使用flask-sqlalchemy连接mysql数据库
2,使用flask-sqlalchemy创建表,
3,使用flask-sqlalchemy查询数据,单条和多条
4,使用flask-sqlalchemy查询数据,新增数据
5,把查询的数据json序列化,
###
###
用来发起post请求的代码:
import requests url = "http://127.0.0.1:5000/testcase" data = {"name": "test", "desc": "1234", "steps": ['1', '2', '3']} result = requests.post(url, json=data) print(result.text)
####
后续的新建表,一对多关联,多对多关联,等等后面在学
还有复杂的查询,都是后面在学,
现在先开一个头,
####