手撸web框架
简单的请求响应实现
要实现最简单的web框架,首先要对网络熟悉,首先HTTP协议是应用层的协议,只要我们给数据加上HTTP格式的响应报头,我们的数据就能基于socket进行实现了
import socket
sever = socket.socket()
sever.bind(('127.0.0.1',10000))
sever.listen(5)
while True:
conn,addr = sever.accept()
data = conn.recv(1024)
print(data)
#响应行
conn.send(b'HTTP/1.1 200 OK
')
#响应头
conn.send(b'name:zx
')
conn.send(b'age:23
')
conn.send(b'sex:man
')
#空行!区分头和响应体
conn.send(b'
')
#响应体
conn.send(b'<h1>Hello world!</h1>')
#关闭连接
conn.close()
web框架的特点
我们熟悉的web框架其实都很类似,基本上也就三大块
路由选择-业务处理-ORM
路由选择
根据客户端的请求,跳转到响应的业务处理
业务处理
业务处理
数据库操作
网页模板渲染
ORM
数据库关系映射
代码实现
路由选择-urls.py
from views import *
urls = [
('/index',index),
('/login',login),
('/xxx',xxx),
('/get_time',get_time),
('/get_db',get_db)
]
业务处理-views.py
from orm import Teacher
def index(env):
return 'index'
def login(env):
return 'login'
def error(env):
return '404 error'
def xxx(env):
return 'xxx'
from datetime import datetime
def get_time(env):
current_time = datetime.now().strftime('%Y-%m-%d %X')
with open(r'C:UsersAdministratorDesktop 1pythonwebzx_web ime.html','r',encoding='utf-8') as f:
data = f.read()
#模板HTML渲染
data = data.replace('$$time$$',current_time)
return data
def get_db(env):
#ORM数据库操作
ret = Teacher.select(tid=1)[0]
print(ret)
return str(ret)
ORM
orm.py
from MySQL import MySQL
# 定义字段类
class Field(object):
def __init__(self, name, column_type, primary_key, default):
self.name = name
self.column_type = column_type
self.primary_key = primary_key
self.default = default
class StringField(Field):
def __init__(self,name,
column_type='varchar=(255)',
primary_key=False,
default=None):
super().__init__(name,column_type,primary_key,default)
class IntegerField(Field):
def __init__(self,
name,
column_type='int',
primary_key=False,
default=None):
super().__init__(name, column_type, primary_key, default)
class ModelMetaClass(type):
print("ModelMetaClass")
def __new__(cls,class_name,class_base,class_attrs):
print("ModelMetaClass_new")
#实例化对象的时候也会执行,我们要把这一次拦截掉
if class_name == 'Models':
#为了能让实例化顺利完成,返回一个空对象就行
return type.__new__(cls,class_name,class_base,class_attrs)
#获取表名
table_name = class_attrs.get('table_name',class_name)
#定义一个存主键的的变量
primary_key = None
#定义一个字典存储字段信息
mapping = {}
#name='tid',primary_key=True
#for来找到主键字段
for k,v in class_attrs.items():
#判断信息是否是字段
if isinstance(v,Field):
mapping[k] = v
#寻找主键
if v.primary_key:
if primary_key:
raise TypeError("主键只有一个")
primary_key=v.name
#将重复的键值对删除,因为已经放入了mapping
for k in mapping.keys():
class_attrs.pop(k)
if not primary_key:
raise TypeError("表必须要有一个主键")
class_attrs['table_name']=table_name
class_attrs['primary_key']=primary_key
class_attrs['mapping']=mapping
return type.__new__(cls,class_name,class_base,class_attrs)
class Models(dict,metaclass=ModelMetaClass):
print("Models")
def __init__(self,**kwargs):
print(f'Models_init')
super().__init__(self,**kwargs)
def __getattr__(self, item):
return self.get(item,"没有该值")
def __setattr__(self, key, value):
self[key]=value
#查找
@classmethod
def select(cls,**kwargs):
ms=MySQL()
#如果没有参数默认是查询全部的
if not kwargs:
sql='select * from %s'%cls.table_name
res=ms.select(sql)
else:
k = list(kwargs.keys())[0]
v = kwargs.get(k)
sql='select * from %s where %s=?'%(cls.table_name,k)
#防sql注入
sql=sql.replace('?','%s')
res=ms.select(sql,v)
if res:
return [cls(**i) for i in res]
#新增
def save(self):
ms=MySQL()
#存字段名
fields=[]
#存值
values=[]
args=[]
for k,v in self.mapping.items():
#主键自增,不用给他赋值
if not v.primary_key:
fields.append(v.name)
args.append("?")
values.append(getattr(self,v.name))
sql = "insert into %s(%s) values(%s)"%(self.table_name,",".join(fields),",".join((args)))
sql = sql.replace('?','%s')
ms.execute(sql,values)
def update(self):
ms = MySQL()
fields = []
valuse = []
pr = None
for k,v in self.mapping.items():
#获取主键值
if v.primary_key:
pr = getattr(self,v.name,v.default)
else:
fields.append(v.name+'=?')
valuse.append(getattr(self,v.name,v.default))
print(fields,valuse)
sql = 'update %s set %s where %s = %s'%(self.table_name,','.join(fields),self.primary_key,pr)
sql = sql.replace('?',"%s")
ms.execute(sql,valuse)
class Teacher(Models):
print("teacher")
table_name='teacher'
tid = IntegerField(name='tid',primary_key=True)
tname = StringField(name='tname')
if __name__ == '__main__':
# tea=Teacher(tname="haha")
tea2=Teacher(tname="haha",tid=5)
# print(Teacher.select(tid=1))
# Teacher.save(tea)
Teacher.update(tea2)
MYSQL.py
import pymysql
class MySQL:
#单例模式
__instance = None
def __new__(cls, *args, **kwargs):
if not cls.__instance:
cls.__instance = object.__new__(cls)
return cls.__instance
def __init__(self):
self.mysql = pymysql.connect(
host='127.0.0.1',
port=3306,
user='root',
database='orm_demo',
password='root',
charset='utf8',
autocommit=True
)
#获取游标
self.cursor = self.mysql.cursor(
pymysql.cursors.DictCursor
)
#查看
def select(self,sql,args=None):
print(sql,args)
#提交sql语句
self.cursor.execute(sql,args)
#获取查询的结果
res = self.cursor.fetchall()
return res
#提交
def execute(self,sql,args):
#提交语句可能会发生异常
print(sql,args)
try:
self.cursor.execute(sql,args)
except Exception as e:
print(e)
def close(self):
self.cursor.close()
self.mysql.close()
socket层
import socket
from urls import urls
from views import *
sever = socket.socket()
sever.bind(('127.0.0.1',10000))
sever.listen(5)
while True:
conn,addr = sever.accept()
#获取HTTP请求信息
data = conn.recv(1024)
data = data.decode('utf8')
print(data)
#用户请求的路由
choice = data.split(' ')[1]
#找到路由
func = None
for url in urls:
if choice == url[0]:
func = url[1]
break
if func:
res = func(data)
else:
res = '<h1>404 error</h1>'
#响应行
conn.send(b'HTTP/1.1 200 OK
')
#响应头
conn.send(b'name:zx
')
conn.send(b'age:23
')
conn.send(b'sex:man')
#空行!区分头和响应体
conn.send(b'
')
#响应体
conn.send(res.encode('utf8'))
#关闭连接
conn.close()
总结
其实并不是所有内容都要自己写,Python有很多的模块可以帮我们实现许多的功能
wsgiref模块:封装的一个socket服务,只需要关注数据发送和接收,不需要太多的关注HTTP协议的部分
from wsgiref.simple_server import make_server
from urls import urls
from views import *
def run(env,response):
"""
:param env: 请求相关的所有数据
:param response: 响应相关的所有数据
:return:
"""
response('200 OK',[])
# print(env)
current_path = env.get('PATH_INFO')
# 先定义一个变量名 用来存储后续匹配到的函数名
func = None
# for循环 匹配后缀
for url in urls:
if current_path == url[0]:
func = url[1] # 一旦匹配成功 就将匹配到的函数名赋值给func变量
break # 主动结束匹配
# 判断func是否有值
if func:
res = func(env)
else:
res = error(env)
return [res.encode('utf-8')]
if __name__ == '__main__':
server = make_server('127.0.0.1',8080,run)
# 实时监听该地址 只要有客户端来连接 统一交给run函数去处理
server.serve_forever() # 启动服务端
jinja2模块:模板渲染功能
模板语法(极其贴近python后端语法)
<p>{{ user }}</p>
<p>{{ user.name }}</p>
<p>{{ user['pwd'] }}</p>
<p>{{ user.get('hobby') }}</p>
{% for user_dict in user_list %}
<tr>
<td>{{ user_dict.id }}</td>
<td>{{ user_dict.name }}</td>
<td>{{ user_dict.pwd }}</td>
</tr>
{% endfor %}