Pandas 和 SQLAlchemy 配合实现 MySQL 分页查询并获取总条数
import datetime as _dt

import numpy as np


class MyNpEncoder(json.JSONEncoder):
    """JSON encoder that also handles NumPy values and date/datetime objects.

    NumPy integers and floats become Python ``int``/``float``, arrays become
    lists, datetimes are rendered as ``YYYY-mm-dd HH:MM:SS`` and plain dates
    as ``YYYY/mm/dd``. Anything else is delegated to the base encoder, which
    raises ``TypeError`` for unsupported types.
    """

    def default(self, obj):
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        # datetime is a subclass of date, so it has to be tested first;
        # this also covers pandas Timestamp, which subclasses datetime.
        if isinstance(obj, _dt.datetime):
            return obj.strftime("%Y-%m-%d %H:%M:%S")
        if isinstance(obj, _dt.date):
            return obj.strftime("%Y/%m/%d")
        return super(MyNpEncoder, self).default(obj)


def _page_arg(value, default):
    """Return *value* as a non-negative int, or *default* if it is not integral."""
    try:
        number = int(value)
    except (TypeError, ValueError):
        return default
    return max(number, 0)


@api.route('/show', methods=["POST"])
def api_show():
    """Return one page of ``bidata.gen_adid_cost`` plus the total row count.

    Reads ``offset`` (default 0) and ``limit`` (default 10) from the JSON
    request body, fetches that page ordered by ``id`` descending, and asks
    MySQL for the un-paginated row count via ``FOUND_ROWS()``.

    Returns:
        A JSON string with the keys ``code``, ``msg``, ``data`` (list of row
        dicts, with missing values as ``null``) and ``total``.
    """
    payload = request.json or {}
    # Both values are interpolated into the SQL text below, so they are forced
    # to plain non-negative integers first; nothing else can reach the query.
    offset = _page_arg(payload.get('offset', 0), 0)
    limit = _page_arg(payload.get('limit', 10), 10)

    # NOTE(review): SQL_CALC_FOUND_ROWS / FOUND_ROWS() are deprecated in recent
    # MySQL 8.0 releases; a separate COUNT(*) query is the suggested successor.
    sql = (
        "select SQL_CALC_FOUND_ROWS * from bidata.gen_adid_cost "
        "order by id desc limit {offset},{limit}"
    ).format(offset=offset, limit=limit)

    engine = db.get_engine(current_app, 'tm_new_hfjy')
    # FOUND_ROWS() refers to the previous statement of the same session, so
    # both queries must run on the same connection.
    with engine.connect() as conn:
        page = pd.read_sql(sql, con=conn)
        counts = pd.read_sql("SELECT FOUND_ROWS() as total;", con=conn)

    total = int(counts.loc[0, "total"])

    # Replace NaN/NaT with None so they serialize as JSON null. The cast to
    # object keeps pandas from coercing None back to NaN in numeric columns.
    page = page.astype(object).where(page.notnull(), None)
    result = page.to_dict(orient="records")

    data = dict(
        code=RET.OK,
        msg="共 %s 条,本页加载 %s 条记录!" % (total, len(result)),
        data=result,
        total=total,
    )
    return json.dumps(data, cls=MyNpEncoder)