zoukankan      html  css  js  c++  java
  • Python实现简单框架

    手撸web框架

    简单的请求响应实现

    要实现最简单的web框架,首先要对网络熟悉,首先HTTP协议是应用层的协议,只要我们给数据加上HTTP格式的响应报头,我们的数据就能基于socket进行实现了

    import socket
    
    sever = socket.socket()
    
    sever.bind(('127.0.0.1',10000))
    
    sever.listen(5)
    
    while True:
        conn,addr = sever.accept()
        data = conn.recv(1024)
    
        print(data)
        #响应行
        conn.send(b'HTTP/1.1 200 OK
    
    ')
        #响应头
        conn.send(b'name:zx
    ')
        conn.send(b'age:23
    ')
        conn.send(b'sex:man
    ')
        #空行!区分头和响应体
        conn.send(b'
    
    ')
        #响应体
        conn.send(b'<h1>Hello world!</h1>')
        #关闭连接
        conn.close()
    

    web框架的特点

    我们熟悉的web框架其实都很类似,基本上也就三大块

    路由选择-业务处理-ORM

    路由选择

    根据客户端的请求,跳转到响应的业务处理

    业务处理

    业务处理

    数据库操作

    网页模板渲染

    ORM

    数据库关系映射

    代码实现

    路由选择-urls.py

    from views import *
    
    urls = [
        ('/index',index),
        ('/login',login),
        ('/xxx',xxx),
        ('/get_time',get_time),
        ('/get_db',get_db)
    ]
    

    业务处理-views.py

    from orm import Teacher
    
    def index(env):
        return 'index'
    
    def login(env):
        return 'login'
    
    def error(env):
        return '404 error'
    
    
    def xxx(env):
        return 'xxx'
    
    from datetime import datetime
    
    def get_time(env):
        current_time = datetime.now().strftime('%Y-%m-%d %X')
        with open(r'C:UsersAdministratorDesktop1pythonwebzx_web	ime.html','r',encoding='utf-8') as f:
            data = f.read()
        #模板HTML渲染
        data = data.replace('$$time$$',current_time)
        return data
    
    def get_db(env):
    	#ORM数据库操作
        ret = Teacher.select(tid=1)[0]
        print(ret)
        return str(ret)
    

    ORM

    orm.py

    from MySQL import MySQL
    
    # 定义字段类
    class Field(object):
        def __init__(self, name, column_type, primary_key, default):
            self.name = name
            self.column_type = column_type
            self.primary_key = primary_key
            self.default = default
    
    class StringField(Field):
        def __init__(self,name,
                     column_type='varchar=(255)',
                     primary_key=False,
                     default=None):
            super().__init__(name,column_type,primary_key,default)
    
    class IntegerField(Field):
        def __init__(self,
                     name,
                     column_type='int',
                     primary_key=False,
                     default=None):
            super().__init__(name, column_type, primary_key, default)
    
    class ModelMetaClass(type):
        print("ModelMetaClass")
        def __new__(cls,class_name,class_base,class_attrs):
            print("ModelMetaClass_new")
            #实例化对象的时候也会执行,我们要把这一次拦截掉
            if class_name == 'Models':
                #为了能让实例化顺利完成,返回一个空对象就行
                return type.__new__(cls,class_name,class_base,class_attrs)
            #获取表名
            table_name = class_attrs.get('table_name',class_name)
    
            #定义一个存主键的的变量
            primary_key = None
    
            #定义一个字典存储字段信息
            mapping = {}
    
            #name='tid',primary_key=True
            #for来找到主键字段
            for k,v in class_attrs.items():
                #判断信息是否是字段
                if isinstance(v,Field):
                    mapping[k] = v
                    #寻找主键
                    if v.primary_key:
                        if primary_key:
                            raise TypeError("主键只有一个")
                        primary_key=v.name
    
            #将重复的键值对删除,因为已经放入了mapping
            for k in mapping.keys():
                class_attrs.pop(k)
            if not primary_key:
                raise TypeError("表必须要有一个主键")
            class_attrs['table_name']=table_name
            class_attrs['primary_key']=primary_key
            class_attrs['mapping']=mapping
            return type.__new__(cls,class_name,class_base,class_attrs)
    
    class Models(dict,metaclass=ModelMetaClass):
        print("Models")
        def __init__(self,**kwargs):
            print(f'Models_init')
            super().__init__(self,**kwargs)
    
        def __getattr__(self, item):
            return self.get(item,"没有该值")
    
        def __setattr__(self, key, value):
            self[key]=value
    
        #查找
        @classmethod
        def select(cls,**kwargs):
            ms=MySQL()
    
            #如果没有参数默认是查询全部的
            if not kwargs:
                sql='select * from %s'%cls.table_name
                res=ms.select(sql)
            else:
                k = list(kwargs.keys())[0]
                v = kwargs.get(k)
                sql='select * from %s where %s=?'%(cls.table_name,k)
    
                #防sql注入
                sql=sql.replace('?','%s')
    
                res=ms.select(sql,v)
            if res:
                return [cls(**i) for i in res]
    
        #新增
        def save(self):
            ms=MySQL()
    
            #存字段名
            fields=[]
            #存值
            values=[]
            args=[]
    
            for k,v in self.mapping.items():
                #主键自增,不用给他赋值
                if not v.primary_key:
                    fields.append(v.name)
                    args.append("?")
                    values.append(getattr(self,v.name))
    
                sql = "insert into %s(%s) values(%s)"%(self.table_name,",".join(fields),",".join((args)))
    
                sql = sql.replace('?','%s')
    
            ms.execute(sql,values)
    
        def update(self):
            ms = MySQL()
            fields = []
            valuse = []
            pr = None
            for k,v in self.mapping.items():
                #获取主键值
                if v.primary_key:
                    pr = getattr(self,v.name,v.default)
                else:
                    fields.append(v.name+'=?')
                    valuse.append(getattr(self,v.name,v.default))
                print(fields,valuse)
            sql = 'update %s set %s where %s = %s'%(self.table_name,','.join(fields),self.primary_key,pr)
    
            sql = sql.replace('?',"%s")
    
            ms.execute(sql,valuse)
    
    
    class Teacher(Models):
        print("teacher")
        table_name='teacher'
        tid = IntegerField(name='tid',primary_key=True)
        tname = StringField(name='tname')
    
    if __name__ == '__main__':
        # tea=Teacher(tname="haha")
        tea2=Teacher(tname="haha",tid=5)
        # print(Teacher.select(tid=1))
        # Teacher.save(tea)
        Teacher.update(tea2)
    
    

    MYSQL.py

    import pymysql
    
    class MySQL:
    
        #单例模式
        __instance = None
    
        def __new__(cls, *args, **kwargs):
            if not cls.__instance:
                cls.__instance = object.__new__(cls)
            return cls.__instance
    
        def __init__(self):
            self.mysql = pymysql.connect(
                host='127.0.0.1',
                port=3306,
                user='root',
                database='orm_demo',
                password='root',
                charset='utf8',
                autocommit=True
            )
    
            #获取游标
            self.cursor = self.mysql.cursor(
                pymysql.cursors.DictCursor
            )
    
        #查看
        def select(self,sql,args=None):
            print(sql,args)
    
            #提交sql语句
            self.cursor.execute(sql,args)
    
            #获取查询的结果
            res = self.cursor.fetchall()
            return res
    
        #提交
        def execute(self,sql,args):
            #提交语句可能会发生异常
    
            print(sql,args)
            try:
                self.cursor.execute(sql,args)
            except Exception as e:
                print(e)
    
        def close(self):
            self.cursor.close()
            self.mysql.close()
    

    socket层

    import socket
    from urls import urls
    from views import *
    
    sever = socket.socket()
    
    sever.bind(('127.0.0.1',10000))
    
    sever.listen(5)
    
    while True:
        conn,addr = sever.accept()
        #获取HTTP请求信息
        data = conn.recv(1024)
        data = data.decode('utf8')
        print(data)
        #用户请求的路由
        choice = data.split(' ')[1]
        #找到路由
        func = None
        for url in urls:
            if choice == url[0]:
                func = url[1]
                break
        if func:
            res = func(data)
        else:
            res = '<h1>404 error</h1>'
    
        #响应行
        conn.send(b'HTTP/1.1 200 OK
    
    ')
        #响应头
        conn.send(b'name:zx
    ')
        conn.send(b'age:23
    ')
        conn.send(b'sex:man')
        #空行!区分头和响应体
        conn.send(b'
    
    ')
        #响应体
        conn.send(res.encode('utf8'))
        #关闭连接
        conn.close()
    

    总结

    其实并不是所有内容都要自己写,Python有很多的模块可以帮我们实现许多的功能

    wsgiref模块:封装的一个socket服务,只需要关注数据发送和接收,不需要太多的关注HTTP协议的部分

    from wsgiref.simple_server import make_server
    from urls import urls
    from views import *
    
    def run(env,response):
        """
        :param env: 请求相关的所有数据
        :param response: 响应相关的所有数据
        :return:
        """
        response('200 OK',[])
        # print(env)
        current_path = env.get('PATH_INFO')
        
        # 先定义一个变量名 用来存储后续匹配到的函数名
        func = None
        # for循环 匹配后缀
        for url in urls:
            if current_path == url[0]:
                func = url[1]  # 一旦匹配成功 就将匹配到的函数名赋值给func变量
                break  # 主动结束匹配
        # 判断func是否有值
        if func:
            res = func(env)
        else:
            res = error(env)
        return [res.encode('utf-8')]
    
    if __name__ == '__main__':
        server = make_server('127.0.0.1',8080,run)
        # 实时监听该地址  只要有客户端来连接 统一交给run函数去处理
        server.serve_forever()  # 启动服务端
    

    jinja2模块:模板渲染功能

    模板语法(极其贴近python后端语法)
    		<p>{{ user }}</p>
    		<p>{{ user.name }}</p>
    		<p>{{ user['pwd'] }}</p>
    		<p>{{ user.get('hobby') }}</p>
    		
    		
    		{% for user_dict in user_list %}
    			<tr>
    				<td>{{ user_dict.id }}</td>
    				<td>{{ user_dict.name }}</td>
    				<td>{{ user_dict.pwd }}</td>
    			</tr>
    		{% endfor %}
    
  • 相关阅读:
    linux(fedora) 下dvwa 建筑环境
    【ThinkingInC++】2、输入和输出流
    Caused by: java.lang.ClassNotFoundException: javax.transaction.TransactionManager
    SpringMVC注释启用
    XML wsdl soap xslt xsl ide
    一个解析RTSP 的URL函数
    PHP:header()函数
    jquery实现鼠标焦点十字效果
    拼出漂亮的表格
    Oracle中如何插入特殊字符:& 和 ' (多种解决方案)
  • 原文地址:https://www.cnblogs.com/zx125/p/11704617.html
Copyright © 2011-2022 走看看