zoukankan      html  css  js  c++  java
  • youku服务端

    文件结构

    config

    import os
    
    
    IP_PORT = ('127.0.0.1',8080)
    BACKLOG = 5
    
    BASE_DIR = os.path.dirname(os.path.dirname(__file__))
    BASE_MOVIE_LIST = os.path.join(BASE_DIR,'movie_list')
    setting

    db

    from orm_pool.orm import Models,StringField,IntegerField
    
    
    class User(Models):
        table_name = 'userinfo'
        id = IntegerField('id',primary_key=True)
        name = StringField('name')
        password = StringField('password')
        is_vip = IntegerField('is_vip')
        locked = IntegerField('locked')
        user_type = StringField('user_type')
    
    
    class Notice(Models):
        table_name = 'notice'
        id = IntegerField('id',primary_key=True)
        name = StringField('name')
        content = StringField('content')
        create_time = StringField('create_time')
        user_id = IntegerField('user_id')
    
    
    class Movie(Models):
        table_name = 'movie'
        id = IntegerField('id',primary_key=True)
        name = StringField('name')
        path = StringField('path')
        is_free = IntegerField('is_free')
        is_delete = IntegerField('is_delete')
        create_time = StringField('create_time')
        user_id = IntegerField('user_id')
        file_md5 = StringField('file_md5')
    
    
    class Download_Record(Models):
        table_name = 'download_record'
        id = IntegerField('id', primary_key=True)
        user_id = IntegerField('user_id')
        movie_id = IntegerField('movie_id')
    models

    interface

    import os
    from db import models
    from lib import common
    from conf import setting
    
    
    @common.login_auth
    def check_upload_movie_exist(msg_dic,conn):
        movie = models.Movie.select_one(file_md5=msg_dic['file_md5'])
        if movie:
            back_dic = {'flag':False,'msg':'该影片已存在'}
        else:
            back_dic = {'flag':True,'msg':'影片可上传'}
        common.send_back(back_dic,conn)
    
    
    @common.login_auth
    def upload_movie(msg_dic,conn):
        movie_name = common.get_file_uuid(msg_dic['file_name']) + msg_dic['file_name']
        movie_path = os.path.join(setting.BASE_MOVIE_LIST,movie_name)
        recv_size = 0
        file_size = msg_dic['file_size']
        with open(movie_path,'wb') as f:
            while recv_size < file_size:
                recv_data = conn.recv(1024)
                if not recv_data:
                    back_dic = {'flag':False,'msg':'服务器收到数据为空'}
                    common.send_back(back_dic, conn)
                    break
                f.write(recv_data)
                recv_size += len(recv_data)
                percent = recv_size / file_size
                common.print_progress(percent)
    
        movie = models.Movie(
            name=movie_name,
            path=movie_path,
            is_free=msg_dic['is_free'],
            is_delete=0,
            create_time=str(common.get_current_time()),
            user_id=msg_dic['user_id'],
            file_md5=msg_dic['file_md5']
        )
        movie.save()
        back_dic = {'flag':True,'msg':'上传成功'}
        common.send_back(back_dic,conn)
    
    
    @common.login_auth
    def delete_movie(msg_dic,conn):
        movie = models.Movie.select_one(id=msg_dic['del_movie_id'])
        if movie:
            movie.is_delete = 1
            movie.user_id = msg_dic['user_id']
            movie.update()
            back_dic = {'flag':True,'msg':'删除成功'}
        else:
            back_dic = {'flag':False,'msg':'无此影片'}
        common.send_back(back_dic,conn)
    
    
    @common.login_auth
    def release_notice(msg_dic,conn):
        name = msg_dic['name']
        content = msg_dic['content']
        user_id = msg_dic['user_id']
        create_time = str(common.get_current_time())
        notice = models.Notice(
            name=name,
            content=content,
            user_id=user_id,
            create_time=create_time
        )
        notice.save()
        back_dic = {'flag':True,'msg':'发布公告成功'}
        common.send_back(back_dic,conn)
    admin_interface
    from db import models
    from lib import common
    from tcp_server import online_user
    from interface import user_interface
    
    
    def register(msg_dic,conn):
        user = models.User.select_one(name=msg_dic['name'])
        if user:
            back_dic = {'flag':False,'msg':'用户名已存在'}
        else:
            user = models.User(
                name=msg_dic['name'],
                password=msg_dic['password'],
                is_vip=0,
                locked=0,
                user_type=msg_dic['user_type']
            )
            user.save()
            back_dic = {'flag':True,'msg':'用户注册成功'}
        common.send_back(back_dic,conn)
    
    
    def login(msg_dic,conn):
        user = models.User.select_one(name=msg_dic['name'])
        # 用户存在
        if user:
            # 密码一致
            if msg_dic['password'] == user.password:
                # 类型一致
                if msg_dic['user_type'] == user.user_type:
                    # 登录成功,服务器记录状态,并发送session给客户端cookie赋值
                    session = common.get_uuid(msg_dic['name']) + msg_dic['name']
                    online_user.mutex.acquire()
                    online_user.alive_user[msg_dic['addr']] = [session,user.id]
                    online_user.mutex.release()
                    back_dic = {'flag':True,'session':session,'msg':'登录成功'}
    
                    # 若登录用户为普通用户,则在客户端打印最近一条公告
                    if msg_dic['user_type'] == 'user':
                        back_notice = user_interface.check_notice_by_count(1)
                        if back_notice:
                            back_dic = {'flag':True,'back_notice':back_notice,'is_vip':user.is_vip,'session':session,'msg':'登录成功'}
                        else:
                            back_dic = {'flag': True, 'back_notice': '暂无公告内容', 'is_vip': user.is_vip, 'session': session,
                                        'msg': '登录成功'}
                else:
                    back_dic = {'flag': False, 'msg': '用户登录类型不一致'}
            else:
                back_dic = {'flag': False, 'msg': '密码不一致'}
        else:
            back_dic = {'flag':False,'msg':'用户不存在'}
        common.send_back(back_dic,conn)
    
    
    def check_moive_list(msg_dic,conn):
        movie_list = models.Movie.select_many()
        back_movie_list = []
        if movie_list:
            for movie in movie_list:
                # 视频未删除
                if not movie.is_delete:
                    if msg_dic['movie_type'] == 'all':
                        back_movie_list.append([movie.id,movie.name,'免费' if movie.is_free else '收费'])
                    elif msg_dic['movie_type'] == 'free':
                        if movie.is_free:
                            back_movie_list.append([movie.id, movie.name, '免费'])
                    else:
                        if not movie.is_free:
                            back_movie_list.append([movie.id, movie.name, '收费'])
            if back_movie_list:
                back_dic = {'flag':True,'msg':'查询成功','back_movie_list':back_movie_list}
            else:
                back_dic = {'flag': False, 'msg': '查询失败,视频数据为空'}
        else:
            back_dic = {'flag':False,'msg':'服务器上无视频数据或视频已被删除'}
        common.send_back(back_dic,conn)
    common_interface
    import os
    from db import models
    from lib import common
    
    
    @common.login_auth
    def check_notice(msg_dic,conn):
        notice_list = check_notice_by_count()
        back_dic = {'flag':True,'msg':'历史公告信息如下:','notice_list':notice_list}
        common.send_back(back_dic,conn)
    
    
    # 内部借口使用功能
    def check_notice_by_count(count=None):
        notice_list = models.Notice.select_many()
        if notice_list:
            back_notice = []
            if not count:
                for notice in notice_list:
                    back_notice.append({notice.name: notice.content})
            else:
                notice_list = sorted(notice_list,key=lambda notice:notice.create_time)
                current_notice = notice_list[len(notice_list)-1]
                back_notice.append({current_notice.name:current_notice.content})
            return back_notice
        else:
            return False
    
    
    @common.login_auth
    def charge_vip(msg_dic,conn):
        user = models.User.select_one(id=msg_dic['user_id'])
        user.is_vip = 1
        user.update()
        back_dic = {'flag':True,'msg':'充值成功'}
        common.send_back(back_dic,conn)
    
    
    @common.login_auth
    def download_free_movie(msg_dic,conn):
        movie = models.Movie.select_one(id=msg_dic['movie_id'])
        if movie:
            user = models.User.select_one(id=msg_dic['user_id'])
            if user.is_vip:
                wait_time = 0
            else:
                wait_time = 5
    
            send_size = 0
            total_size = os.path.getsize(movie.path)
            back_dic = {'flag': True, 'wait_time': wait_time,'file_size':total_size,'movie_id':movie.id}
            common.send_back(back_dic, conn)
    
            with open(movie.path,'rb') as f:
                while send_size < total_size:
                    send_data = f.readline()
                    conn.send(send_data)
                    send_size += len(send_data)
                    percent = send_size / total_size
                    common.print_progress(percent)
    
            download_record = models.Download_Record(
                user_id=msg_dic['user_id'],
                movie_id=movie.id
            )
            download_record.save()
    
    
        else:
            back_dic = {'flag':False,'msg':'影片不存在'}
            common.send_back(back_dic, conn)
    
    
    @common.login_auth
    def download_charge_movie(msg_dic,conn):
        movie = models.Movie.select_one(id=msg_dic['movie_id'])
        if movie:
            user = models.User.select_one(id=msg_dic['user_id'])
            if user.is_vip:
                cost_money = 5
            else:
                cost_money = 10
    
            send_size = 0
            total_size = os.path.getsize(movie.path)
            back_dic = {'flag': True, 'cost_money': cost_money,'file_size':total_size,'movie_id':movie.id}
            common.send_back(back_dic, conn)
    
            with open(movie.path,'rb') as f:
                while send_size < total_size:
                    send_data = f.readline()
                    conn.send(send_data)
                    send_size += len(send_data)
                    percent = send_size / total_size
                    common.print_progress(percent)
    
            download_record = models.Download_Record(
                user_id=msg_dic['user_id'],
                movie_id=movie.id
            )
            download_record.save()
    
    
        else:
            back_dic = {'flag':False,'msg':'影片不存在'}
            common.send_back(back_dic, conn)
    
    
    
    @common.login_auth
    def check_download_record(msg_dic,conn):
        download_record_list = models.Download_Record.select_many(user_id=msg_dic['user_id'])
        back_download_record_list = []
        if download_record_list:
            for record in download_record_list:
                movie_id = record.movie_id
                movie = models.Movie.select_one(id=movie_id)
                back_download_record_list.append('影片信息:%s,%s' % (movie.name,movie.create_time))
            back_dic = {'flag':True,'msg':'查看下载记录成功:','back_download_record_list':back_download_record_list}
    
        else:
            back_dic = {'flag':False,'msg':'暂无下载记录'}
        common.send_back(back_dic,conn)
    user_interface

    lib

    import json
    import struct
    import hashlib
    import time
    from tcp_server import online_user
    
    
    def login_auth(func):
        def wrapper(*args,**kwargs):
            # args = (msg_dic,conn) 元祖
            for value_list in online_user.alive_user.values():
                # value_list = [session,user.id]
                if args[0]['session'] == value_list[0]:
                    user_id = value_list[1]
                    args[0]['user_id'] = user_id
                    break
            user_id = args[0].get('user_id',None)
            if user_id:
                func(*args,**kwargs)
            else:
                back_dic = {'flag': False, 'msg': '非授权用户'}
                send_back(back_dic, args[1])
        return wrapper
    
    
    def send_back(back_dic,conn):
        back_dic_json = json.dumps(back_dic)
        back_dic_bytes = back_dic_json.encode('utf-8')
        back_dic_head = struct.pack('i',len(back_dic_bytes))
        conn.send(back_dic_head)
        conn.send(back_dic_bytes)
    
    
    # session,file_name中使用
    def get_uuid(name):
        md = hashlib.md5()
        md.update(str(time.time()).encode('utf-8'))
        md.update(name.encode('utf-8'))
        return md.hexdigest()
    
    
    
    def get_current_time():
        return time.strftime('%Y-%m-%d %X',time.localtime(time.time()))
    
    
    
    def print_progress(percent):
        width = 70
        sign = ''
        if percent > 1:
            percent = 1
        print_str = '[%%-%ds]' % width % (int(percent * width) * sign)
        print('
    %s %d%%' % (print_str,int(percent * 100)),end='')
    
    
    
    def get_file_uuid(file_name):
        md = hashlib.md5()
        md.update('密码加盐'.encode('utf-8'))
        md.update(file_name.encode('utf-8'))
        return md.hexdigest()
    
    if __name__ == '__main__':
        percent = 0
        while percent < 1:
            percent += 0.1
            print_progress(percent)
    common

    orm_pool

    import pymysql
    from DBUtils.PooledDB import PooledDB
    
    POOL = PooledDB(
        creator=pymysql,
        maxconnections=6,
        mincached=2,
        maxcached=5,
        maxshared=3,
        maxusage=None,
        setsession=[],
        blocking=True,
        ping=0,
        host='127.0.0.1',
        port=3306,
        user='root',
        password='123456',
        database='youku',
        charset='utf8',
        autocommit=True
    )
    DBUtils_interface
    import pymysql
    from orm_pool import DBUtils_interface
    
    class Mysql_pool_interface:
        def __init__(self):
            self.conn = DBUtils_interface.POOL.connection()
            self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
    
        def select(self,sql,args=None):
            self.cursor.execute(sql,args)
            res = self.cursor.fetchall()
            return res
    
        def execute(self,sql,args):
            try:
                self.cursor.execute(sql,args)
                affected = self.cursor.rowcount
            except Exception as e:
                print(e)
            return affected
    
    if __name__ == '__main__':
        ms = Mysql_pool_interface()
        res = ms.select('select * from userinfo where id = %s',1)
        print(res)
    mysql_pool
    from orm_pool import mysql_pool
    
    class Field:
        def __init__(self, name, column_type, primary_key, default):
            self.name = name
            self.column_type = column_type
            self.primary_key = primary_key
            self.default = default
    
    
    class StringField(Field):
        def __init__(self, name=None, column_type='varchar(255)', primary_key=False, default=None):
            super().__init__(name, column_type, primary_key, default)
    
    
    class IntegerField(Field):
        def __init__(self, name=None, column_type='int', primary_key=False, default=None):
            super().__init__(name, column_type, primary_key, default)
    
    
    class ModelsMetaclass(type):
        def __new__(cls, name, bases, attrs):
            if name == 'Models':
                return type.__new__(cls, name, bases, attrs)
            table_name = attrs.get('table_name', None)
            if not table_name:
                table_name = name
            primary_key = None
            mappings = dict()
            for k, v in attrs.items():
                if isinstance(v, Field):
                    mappings[k] = v
                    if v.primary_key:
                        if primary_key:
                            raise KeyError('重复主键:%s' % k)
                        primary_key = k
            for k in mappings.keys():
                attrs.pop(k)
            if not primary_key:
                raise KeyError('无主键')
            attrs['table_name'] = table_name
            attrs['primary_key'] = primary_key
            attrs['mappings'] = mappings
            return type.__new__(cls,name,bases,attrs)
    
    
    class Models(dict,metaclass=ModelsMetaclass):
        def __init__(self,**kwargs):
            super().__init__(**kwargs)
    
        def __getattr__(self, item):
            try:
                return self[item]
            except KeyError:
                raise AttributeError('无此属性')
    
        def __setattr__(self, key, value):
            self[key] = value
    
        @classmethod
        def select_one(cls,**kwargs):
            ms = mysql_pool.Mysql_pool_interface()
            key = list(kwargs.keys())[0]
            value = kwargs[key]
    
            # select * from userinfo where id = ?
            sql = 'select * from %s where %s = ?' % (cls.table_name,key)
            sql = sql.replace('?','%s')
            res = ms.select(sql,value)
            if res:
                return cls(**res[0])
            else:
                return
    
        @classmethod
        def select_many(cls,**kwargs):
            ms = mysql_pool.Mysql_pool_interface()
    
            # select * from userinfo where id = ?
            # select * from userinfo
            if kwargs:
                key = list(kwargs.keys())[0]
                value = kwargs[key]
                sql = 'select * from %s where %s = ?' % (cls.table_name,key)
                sql = sql.replace('?','%s')
                res = ms.select(sql,value)
            else:
                sql = 'select * from %s' % cls.table_name
                res = ms.select(sql)
            obj_list = [cls(**r) for r in res]
            return obj_list
    
        def update(self):
            ms = mysql_pool.Mysql_pool_interface()
    
            # update userinfo set name=?,password=? where id = %s
            field_data = []
            pr = None
            args = []
            for k,v in self.mappings.items():
                if v.primary_key:
                    pr = getattr(self,v.name,v.default)
                field_data.append(v.name + '=?')
                args.append(getattr(self,v.name,v.default))
            sql = 'update %s set %s where %s = %s' % (self.table_name,','.join(field_data),self.primary_key,pr)
            sql = sql.replace('?','%s')
            ms.execute(sql,args)
    
    
        def save(self):
            ms = mysql_pool.Mysql_pool_interface()
    
            # insert into %s (name,password) values (?,?)
            field_data = []
            value_data = []
            args = []
            for k,v in self.mappings.items():
                if not v.primary_key:
                    # 避免对主键的赋值要求
                    field_data.append(v.name)
                    value_data.append('?')
                    args.append(getattr(self,v.name,v.default))
            sql = 'insert into %s (%s) values (%s)' % (self.table_name,','.join(field_data),','.join(value_data))
            sql = sql.replace('?','%s')
            ms.execute(sql,args)
    
    
    class User(Models):
        table_name = 'userinfo'
        id = IntegerField('id',primary_key=True)
        name = StringField('name')
        password = StringField('password')
        is_vip = IntegerField('is_vip')
        locked = IntegerField('locked')
        user_type = StringField('user_type')
    
    
    if __name__ == '__main__':
        user = User.select_one(id=1)
        user.name = 'lmj'
        user.update()
        print(user)
    
        user2 = User(name='cly',password='123',is_vip=0,locked=0,user_type='user')
        user2.save()
    orm

    tcp_server

    # {addr:[session,user_id}
    alive_user = {}
    mutex = None
    online_user
    import socket
    import struct
    import json
    from concurrent.futures import ThreadPoolExecutor
    from threading import Lock
    from tcp_server import online_user
    from conf import setting
    from lib import common
    from interface import admin_interface,common_interface,user_interface
    
    
    
    pool = ThreadPoolExecutor()
    mutex = Lock()
    online_user.mutex = mutex
    
    
    # 功能任务字典
    func_dic = {
        'register':common_interface.register,
        'login':common_interface.login,
        'check_upload_movie_exist':admin_interface.check_upload_movie_exist,
        'upload_movie':admin_interface.upload_movie,
        'check_moive_list':common_interface.check_moive_list,
        'delete_movie':admin_interface.delete_movie,
        'release_notice':admin_interface.release_notice,
        'charge_vip':user_interface.charge_vip,
        'download_free_movie':user_interface.download_free_movie,
        'check_download_record':user_interface.check_download_record,
        'check_notice':user_interface.check_notice,
        'download_charge_movie':user_interface.download_charge_movie
    }
    
    
    # 分发功能模块
    def dispatch(msg_dic,conn):
        if msg_dic['func_type'] in func_dic:
            func_dic[msg_dic['func_type']](msg_dic,conn)
        else:
            back_dic = {'flag':False,'msg':'非法操作'}
            common.send_back(back_dic,conn)
    
    
    # 循环通信
    def communication(conn,addr):
        while True:
            try:
                msg_dic_head = conn.recv(4)
                if not msg_dic_head:
                    online_user.mutex.acquire()
                    if str(addr) in online_user.alive_user:
                        online_user.alive_user.pop(str(addr))
                    online_user.mutex.release()
                    conn.close()
                    print(addr, '33[31m 断开 33[0m')
                    break
                msg_dic_len = struct.unpack('i',msg_dic_head)[0]
                msg_dic_bytes = conn.recv(msg_dic_len)
                msg_dic = json.loads(msg_dic_bytes.decode('utf-8'))
                msg_dic['addr'] = addr
    
                # 分发功能任务
                dispatch(msg_dic,conn)
    
            except Exception as e:
                print('错误信息:%s' % e)
                online_user.mutex.acquire()
                if str(addr) in online_user.alive_user:
                    online_user.alive_user.pop(str(addr))
                online_user.mutex.release()
                conn.close()
                print(addr,'33[31m 断开 33[0m')
                break
    
    
    def get_server():
        server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        server.bind(setting.IP_PORT)
        server.listen(setting.BACKLOG)
        print('33[32m 服务器激活 33[0m')
    
        # 循环链接
        while True:
            conn,addr = server.accept()
            print(addr,'33[32m 连接 33[0m')
            pool.submit(communication,conn,addr)
        server.close()
    
    
    def run():
        get_server()
    tcpserver

    根目录下:

    import os
    import sys
    
    
    path = os.path.dirname(__file__)
    sys.path.append(path)
    
    from tcp_server import tcpserver
    
    if __name__ == '__main__':
        tcpserver.run()
    start
  • 相关阅读:
    C# using
    Spring框架
    is
    pycharm破解197
    python安装197
    python3.7.0安装197
    centos7 minimal 安装mysql197
    centos7 minimal 安装 &网络配置197
    ruby安装卸载197
    redis安装 卸载 启动 关闭197
  • 原文地址:https://www.cnblogs.com/limengjie0104/p/9074194.html
Copyright © 2011-2022 走看看