0 - python 用自带的 wgsi 也可以创建 web 服务
1)建立 hello.py 内容如下
# hello.py
def application(environ, start_response):
start_response('200 OK', [('Content-Type', 'text/html')])
return '<h1>Hello, web!</h1>'
2)建立 server.py
# -*- coding:utf-8 -*-
# 从wsgiref模块导入:
from wsgiref.simple_server import make_server
# 导入我们自己编写的application函数:
from hello import application
# 创建一个服务器,IP地址为空,端口是8000,处理函数是application:
httpd = make_server('', 8000, application)
print "Serving HTTP on port 8000..."
# 开始监听HTTP请求:
httpd.serve_forever()
缺点:貌似只能创建一个访问地址,不同 url 路由不好控制
1 - flask 建立一个 web server
1、pip install Flask
2、建立一个 test_flask.py 的文件,加入代码如下:
# -*- coding:utf-8 –*-
from flask import Flask, request, make_response, current_app
from functools import wraps
import json
import sys
import datetime
# 一个 app 是一个 Flask 文件
app = Flask(__name__)
def jsonp(func):
"""Wraps JSONified output for JSONP requests."""
@wraps(func)
def decorated_function(*args, **kwargs):
callback = request.args.get('callback', False)
if callback:
data = str(func(*args, **kwargs).data)
content = str(callback) + '(' + data + ')'
mimetype = 'application/javascript'
return current_app.response_class(content, mimetype=mimetype)
else:
return func(*args, **kwargs)
return decorated_function
# 有route
@app.route('/test', methods=['GET'])
def test():
resp = make_response(
(
json.dumps({'key':'value'}),
200,
{'Content-Type': 'application/json'}
)
)
return resp
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8899)
2 - 安装mysql
1、使用 brew 安装 mysql 服务
brew install mysql
2、启动 mysql 服务
mysql.server start
3、使用 root 用户免密登录
mysql -uroot
4、修改 root 密码
SET PASSWORD FOR 'root'@'localhost' = PASSWORD('newpass');
5、登录 mysql 数据库 建立测试表格
create table tb_dept(
id int primary key auto_increment,#部门编号 整形 主键 自增长
name varchar(18),#部门名称
description varchar(100)#描述
);
3 - 安装 MySQL-python
pip install MySQL-python
4 - 手写数据库连接代码
# -*- coding:utf-8 –*-
from flask import Flask, request, make_response, current_app
from functools import wraps
import MySQLdb as mysql
import json
import sys
import datetime
app = Flask(__name__)
def MysqlInit():
#connect to mysql
hostaddress = 'localhost'
username = 'root'
password = '*****'
database = 'test'
try:
conn = mysql.connect(
host=hostaddress,
user=username,
passwd=password,
db=database,
charset="utf8")
print 'connect to mysql db success'
return conn
except Exception, e:
print e
sys.exit('init mysql failed!')
def CheckConnect(conn):
try:
conn.ping()
return conn
except:
print 'mysql coonect has been closed now reconnect'
cur_conn = MysqlInit()
return cur_conn
def QueryClusterInfo(conn):
print "
coming QueryClusterInfo!!!"
try:
conn = CheckConnect(conn)
cursor = conn.cursor()
except:
conn = CheckConnect(conn)
cursor = conn.cursor()
response_dict = {'ret_code':0,'ret_msg':'ok'}
sql_cmd = 'select * from tb_dept'
try:
cursor.execute(sql_cmd)
cluster_result = cursor.fetchall()
except:
cursor.close()
response_dict['ret_code'] = 1
response_dict['ret_msg'] = 'sql_select_error_in_queryClusterInfo'
return json.dumps(response_dict)
response_dict['res'] = []
if cluster_result is not None:
for items in cluster_result:
res = {}
res['id'] = items[0]
res['name'] = items[1]
res['description'] = items[2]
response_dict['res'].append(res)
cursor.close()
return json.dumps(response_dict)
def jsonp(func):
"""Wraps JSONified output for JSONP requests."""
@wraps(func)
def decorated_function(*args, **kwargs):
callback = request.args.get('callback', False)
if callback:
data = str(func(*args, **kwargs).data)
content = str(callback) + '(' + data + ')'
mimetype = 'application/javascript'
return current_app.response_class(content, mimetype=mimetype)
else:
return func(*args, **kwargs)
return decorated_function
@app.route('/test_mysql', methods=['GET'])
def test():
conn = MysqlInit()
resp = make_response(
(
QueryClusterInfo(conn),
200,
{'Content-Type': 'application/json'}
)
)
return resp
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8989)