zoukankan      html  css  js  c++  java
  • youku服务端

    文件结构

    config

    import os
    
    
    # (host, port) pair the TCP server binds to.
    IP_PORT = ('127.0.0.1',8080)
    # Backlog value handed to socket.listen() by the TCP server.
    BACKLOG = 5
    
    # Directory two levels above this settings file.
    BASE_DIR = os.path.dirname(os.path.dirname(__file__))
    # Directory under BASE_DIR where uploaded movie files are written.
    BASE_MOVIE_LIST = os.path.join(BASE_DIR,'movie_list')
    setting

    db

    from orm_pool.orm import Models,StringField,IntegerField
    
    
    class User(Models):
        # Row mapping for the `userinfo` table.
        table_name = 'userinfo'
        id = IntegerField(name='id', primary_key=True)
        name = StringField(name='name')
        password = StringField(name='password')
        # Integer flags: the interfaces write 0/1 into these columns.
        is_vip = IntegerField(name='is_vip')
        locked = IntegerField(name='locked')
        # Compared against the 'user_type' sent by the client at login.
        user_type = StringField(name='user_type')
    
    
    class Notice(Models):
        # Row mapping for the `notice` table.
        table_name = 'notice'
        id = IntegerField(name='id', primary_key=True)
        name = StringField(name='name')
        content = StringField(name='content')
        # Stored as a formatted string, not a native datetime column type.
        create_time = StringField(name='create_time')
        # Id of the user who published the notice.
        user_id = IntegerField(name='user_id')
    
    
    class Movie(Models):
        # Row mapping for the `movie` table.
        table_name = 'movie'
        id = IntegerField(name='id', primary_key=True)
        name = StringField(name='name')
        # Location of the stored file on the server's disk.
        path = StringField(name='path')
        # Integer flags: truthy means "free" / "soft-deleted" respectively.
        is_free = IntegerField(name='is_free')
        is_delete = IntegerField(name='is_delete')
        create_time = StringField(name='create_time')
        user_id = IntegerField(name='user_id')
        # Content digest supplied by the uploader; used for duplicate detection.
        file_md5 = StringField(name='file_md5')
    
    
    class Download_Record(Models):
        # Row mapping for the `download_record` table: one row per download.
        table_name = 'download_record'
        id = IntegerField(name='id', primary_key=True)
        user_id = IntegerField(name='user_id')
        movie_id = IntegerField(name='movie_id')
    models

    interface

    import os
    from db import models
    from lib import common
    from conf import setting
    
    
    @common.login_auth
    def check_upload_movie_exist(msg_dic,conn):
        """Report whether a movie with the same MD5 digest is already stored.

        Reads ``msg_dic['file_md5']`` and replies on ``conn`` with ``flag``
        False when a matching row exists, True when the upload may proceed.
        """
        already_stored = bool(models.Movie.select_one(file_md5=msg_dic['file_md5']))
        reply = {
            'flag': not already_stored,
            'msg': '该影片已存在' if already_stored else '影片可上传',
        }
        common.send_back(reply,conn)
    
    
    @common.login_auth
    def upload_movie(msg_dic,conn):
        """Receive a movie file from the client and register it in the database.

        Expects ``msg_dic`` to carry ``file_name``, ``file_size``, ``is_free``
        and ``file_md5`` (``user_id`` is filled in by ``login_auth``). Exactly
        ``file_size`` bytes are read from ``conn`` and written below
        ``setting.BASE_MOVIE_LIST``.

        The Movie row is saved and a success reply sent only when the whole
        file arrived. If the peer stops sending early, the partial file is
        removed and a failure reply is sent instead.
        """
        # Drop any directory components so a crafted name cannot escape the
        # movie directory.
        file_name = os.path.basename(msg_dic['file_name'])
        movie_name = common.get_file_uuid(file_name) + file_name
        movie_path = os.path.join(setting.BASE_MOVIE_LIST,movie_name)
        file_size = msg_dic['file_size']
        recv_size = 0
        with open(movie_path,'wb') as f:
            while recv_size < file_size:
                # Never ask for more than the remainder of the file, so bytes
                # of a following request are not swallowed into the movie.
                recv_data = conn.recv(min(1024, file_size - recv_size))
                if not recv_data:
                    break
                f.write(recv_data)
                recv_size += len(recv_data)
                common.print_progress(recv_size / file_size)

        if recv_size < file_size:
            # Incomplete transfer: do not leave a truncated file behind and do
            # not register it as a movie.
            try:
                os.remove(movie_path)
            except OSError:
                # Best-effort cleanup; the failure reply below still goes out.
                pass
            back_dic = {'flag':False,'msg':'服务器收到数据为空'}
            common.send_back(back_dic, conn)
            return

        movie = models.Movie(
            name=movie_name,
            path=movie_path,
            is_free=msg_dic['is_free'],
            is_delete=0,
            create_time=str(common.get_current_time()),
            user_id=msg_dic['user_id'],
            file_md5=msg_dic['file_md5']
        )
        movie.save()
        back_dic = {'flag':True,'msg':'上传成功'}
        common.send_back(back_dic,conn)
    
    
    @common.login_auth
    def delete_movie(msg_dic,conn):
        """Soft-delete the movie identified by ``msg_dic['del_movie_id']``.

        The row is kept; ``is_delete`` is set to 1 and ``user_id`` is
        overwritten with the id of the user performing the deletion.
        """
        target = models.Movie.select_one(id=msg_dic['del_movie_id'])
        if not target:
            common.send_back({'flag':False,'msg':'无此影片'},conn)
            return

        target.is_delete = 1
        target.user_id = msg_dic['user_id']
        target.update()
        common.send_back({'flag':True,'msg':'删除成功'},conn)
    
    
    @common.login_auth
    def release_notice(msg_dic,conn):
        """Store a new notice and confirm to the client.

        Uses ``name``, ``content`` and ``user_id`` from ``msg_dic``; the
        creation time is taken from the server clock.
        """
        fields = {
            'name': msg_dic['name'],
            'content': msg_dic['content'],
            'user_id': msg_dic['user_id'],
        }
        fields['create_time'] = str(common.get_current_time())
        models.Notice(**fields).save()
        common.send_back({'flag':True,'msg':'发布公告成功'},conn)
    admin_interface
    from db import models
    from lib import common
    from tcp_server import online_user
    from interface import user_interface
    
    
    def register(msg_dic,conn):
        """Create a user account unless the name is already taken.

        New accounts start as non-VIP and unlocked; ``user_type`` is taken
        from the request as-is.
        """
        if models.User.select_one(name=msg_dic['name']):
            common.send_back({'flag':False,'msg':'用户名已存在'},conn)
            return

        new_user = models.User(
            name=msg_dic['name'],
            password=msg_dic['password'],
            is_vip=0,
            locked=0,
            user_type=msg_dic['user_type']
        )
        new_user.save()
        common.send_back({'flag':True,'msg':'用户注册成功'},conn)
    
    
    def login(msg_dic,conn):
        """Authenticate a client and open a session for it.

        Checks, in order: the user exists, the password matches, the
        requested user type matches. On success a session token is stored in
        ``online_user.alive_user`` under the connection address and returned
        to the client. Ordinary users ('user') additionally receive the
        latest notice and their VIP flag.
        """
        account = models.User.select_one(name=msg_dic['name'])
        if not account:
            reply = {'flag':False,'msg':'用户不存在'}
        elif msg_dic['password'] != account.password:
            reply = {'flag': False, 'msg': '密码不一致'}
        elif msg_dic['user_type'] != account.user_type:
            reply = {'flag': False, 'msg': '用户登录类型不一致'}
        else:
            # Successful login: remember the session server-side and hand the
            # token to the client, which echoes it on later requests.
            session = common.get_uuid(msg_dic['name']) + msg_dic['name']
            with online_user.mutex:
                online_user.alive_user[msg_dic['addr']] = [session,account.id]

            if msg_dic['user_type'] == 'user':
                latest_notice = user_interface.check_notice_by_count(1)
                reply = {
                    'flag': True,
                    'back_notice': latest_notice if latest_notice else '暂无公告内容',
                    'is_vip': account.is_vip,
                    'session': session,
                    'msg': '登录成功',
                }
            else:
                reply = {'flag':True,'session':session,'msg':'登录成功'}
        common.send_back(reply,conn)
    
    
    def check_moive_list(msg_dic,conn):
        """Send the list of non-deleted movies, filtered by ``movie_type``.

        ``msg_dic['movie_type']`` is 'all', 'free', or anything else (treated
        as "charged only"). Each entry is ``[id, name, price label]``.
        """
        all_movies = models.Movie.select_many()
        if not all_movies:
            common.send_back({'flag':False,'msg':'服务器上无视频数据或视频已被删除'},conn)
            return

        rows = []
        for movie in all_movies:
            if movie.is_delete:
                continue
            wanted = msg_dic['movie_type']
            if wanted == 'all':
                label = '免费' if movie.is_free else '收费'
            elif wanted == 'free':
                if not movie.is_free:
                    continue
                label = '免费'
            else:
                if movie.is_free:
                    continue
                label = '收费'
            rows.append([movie.id, movie.name, label])

        if rows:
            reply = {'flag':True,'msg':'查询成功','back_movie_list':rows}
        else:
            reply = {'flag': False, 'msg': '查询失败,视频数据为空'}
        common.send_back(reply,conn)
    common_interface
    import os
    from db import models
    from lib import common
    
    
    @common.login_auth
    def check_notice(msg_dic,conn):
        """Send every stored notice to the client.

        ``notice_list`` in the reply is whatever ``check_notice_by_count()``
        returns, i.e. False when there are no notices.
        """
        reply = {
            'flag': True,
            'msg': '历史公告信息如下:',
            'notice_list': check_notice_by_count(),
        }
        common.send_back(reply,conn)
    
    
    # Internal helper used by the interface functions; not a dispatch target.
    def check_notice_by_count(count=None):
        """Return stored notices as a list of ``{name: content}`` dicts.

        :param count: falsy (None or 0) returns every notice in the order the
            database yields them. A positive number returns that many of the
            most recent notices, newest first, ordered by ``create_time``.
            Values below 1 are treated as 1.
        :return: the list described above, or False when no notice exists.
        """
        notice_list = models.Notice.select_many()
        if not notice_list:
            return False

        if not count:
            return [{notice.name: notice.content} for notice in notice_list]

        # Sort ascending and then reverse, so that among notices sharing the
        # newest timestamp the one stored last still comes first.
        newest_first = sorted(notice_list,key=lambda notice:notice.create_time)[::-1]
        limit = max(count, 1)
        return [{notice.name: notice.content} for notice in newest_first[:limit]]
    
    
    @common.login_auth
    def charge_vip(msg_dic,conn):
        """Mark the logged-in user as VIP.

        ``msg_dic['user_id']`` is provided by ``login_auth``. If the user row
        no longer exists, a failure reply is sent instead of raising.
        """
        user = models.User.select_one(id=msg_dic['user_id'])
        if not user:
            # The session can outlive the database row.
            common.send_back({'flag':False,'msg':'用户不存在'},conn)
            return

        user.is_vip = 1
        user.update()
        back_dic = {'flag':True,'msg':'充值成功'}
        common.send_back(back_dic,conn)
    
    
    @common.login_auth
    def download_free_movie(msg_dic,conn):
        """Stream a movie to the client and record the download.

        Protocol: a header reply with ``wait_time`` (0 for VIP users, 5
        otherwise), ``file_size`` and ``movie_id`` is sent first, followed by
        exactly ``file_size`` raw bytes of the file.

        A failure reply is sent when the movie row or its file is missing.

        :raises OSError: if the file shrinks while it is being sent, so the
            connection is dropped instead of leaving the client waiting for
            bytes that will never arrive.

        NOTE(review): neither ``is_free`` nor ``is_delete`` is checked here;
        any movie id can be fetched through this endpoint.
        """
        movie = models.Movie.select_one(id=msg_dic['movie_id'])
        if not movie:
            common.send_back({'flag':False,'msg':'影片不存在'}, conn)
            return

        try:
            total_size = os.path.getsize(movie.path)
        except OSError:
            # The row exists but the file is gone from disk.
            common.send_back({'flag':False,'msg':'影片不存在'}, conn)
            return

        user = models.User.select_one(id=msg_dic['user_id'])
        wait_time = 0 if (user and user.is_vip) else 5

        back_dic = {'flag': True, 'wait_time': wait_time,'file_size':total_size,'movie_id':movie.id}
        common.send_back(back_dic, conn)

        send_size = 0
        with open(movie.path,'rb') as f:
            while send_size < total_size:
                # Fixed-size binary chunks, capped at the announced size.
                send_data = f.read(min(8192, total_size - send_size))
                if not send_data:
                    break
                # sendall retries until the whole chunk is written.
                conn.sendall(send_data)
                send_size += len(send_data)
                common.print_progress(send_size / total_size)

        if send_size < total_size:
            raise OSError('movie file shrank during transfer: %s' % movie.path)

        download_record = models.Download_Record(
            user_id=msg_dic['user_id'],
            movie_id=movie.id
        )
        download_record.save()
    
    
    @common.login_auth
    def download_charge_movie(msg_dic,conn):
        """Stream a paid movie to the client and record the download.

        Protocol: a header reply with ``cost_money`` (5 for VIP users, 10
        otherwise), ``file_size`` and ``movie_id`` is sent first, followed by
        exactly ``file_size`` raw bytes of the file.

        A failure reply is sent when the movie row or its file is missing.

        :raises OSError: if the file shrinks while it is being sent, so the
            connection is dropped instead of leaving the client waiting for
            bytes that will never arrive.
        """
        movie = models.Movie.select_one(id=msg_dic['movie_id'])
        if not movie:
            common.send_back({'flag':False,'msg':'影片不存在'}, conn)
            return

        try:
            total_size = os.path.getsize(movie.path)
        except OSError:
            # The row exists but the file is gone from disk.
            common.send_back({'flag':False,'msg':'影片不存在'}, conn)
            return

        user = models.User.select_one(id=msg_dic['user_id'])
        cost_money = 5 if (user and user.is_vip) else 10

        back_dic = {'flag': True, 'cost_money': cost_money,'file_size':total_size,'movie_id':movie.id}
        common.send_back(back_dic, conn)

        send_size = 0
        with open(movie.path,'rb') as f:
            while send_size < total_size:
                # Fixed-size binary chunks, capped at the announced size.
                send_data = f.read(min(8192, total_size - send_size))
                if not send_data:
                    break
                # sendall retries until the whole chunk is written.
                conn.sendall(send_data)
                send_size += len(send_data)
                common.print_progress(send_size / total_size)

        if send_size < total_size:
            raise OSError('movie file shrank during transfer: %s' % movie.path)

        download_record = models.Download_Record(
            user_id=msg_dic['user_id'],
            movie_id=movie.id
        )
        download_record.save()
    
    
    
    @common.login_auth
    def check_download_record(msg_dic,conn):
        """Send the logged-in user's download history.

        Each entry is a formatted string holding the movie's name and creation
        time. Records whose movie row no longer exists are skipped. When
        nothing remains to report, a "no records" reply is sent.
        """
        download_record_list = models.Download_Record.select_many(user_id=msg_dic['user_id'])
        back_download_record_list = []
        for record in download_record_list or []:
            movie = models.Movie.select_one(id=record.movie_id)
            if not movie:
                # The movie row was removed after the download was recorded.
                continue
            back_download_record_list.append('影片信息:%s,%s' % (movie.name,movie.create_time))

        if back_download_record_list:
            back_dic = {'flag':True,'msg':'查看下载记录成功:','back_download_record_list':back_download_record_list}
        else:
            back_dic = {'flag':False,'msg':'暂无下载记录'}
        common.send_back(back_dic,conn)
    user_interface

    lib

    import json
    import struct
    import hashlib
    import time
    from tcp_server import online_user
    
    
    def login_auth(func):
        """Decorator: run ``func(msg_dic, conn)`` only for a logged-in session.

        The wrapper looks up ``msg_dic['session']`` among the sessions stored
        in ``online_user.alive_user`` (values are ``[session, user_id]``). On a
        match it writes the server-side ``user_id`` into ``msg_dic`` and calls
        ``func``; otherwise it replies with an "unauthorised" message.

        Any ``user_id`` already present in the request is discarded first. The
        request dict comes from the client, so trusting that key would let an
        unauthenticated client impersonate any user.
        """
        import functools

        @functools.wraps(func)
        def wrapper(*args,**kwargs):
            # args = (msg_dic, conn)
            msg_dic, conn = args[0], args[1]
            msg_dic.pop('user_id', None)
            session = msg_dic.get('session')

            user_id = None
            if session:
                # Iterate over a snapshot: other threads add and remove
                # sessions while clients log in and disconnect.
                for value_list in list(online_user.alive_user.values()):
                    # value_list = [session, user.id]
                    if value_list[0] == session:
                        user_id = value_list[1]
                        break

            if user_id is None:
                back_dic = {'flag': False, 'msg': '非授权用户'}
                send_back(back_dic, conn)
                return None

            msg_dic['user_id'] = user_id
            return func(*args,**kwargs)
        return wrapper
    
    
    def send_back(back_dic,conn):
        """Send ``back_dic`` to the peer as a length-prefixed JSON message.

        Wire format: a 4-byte native-order ``struct`` 'i' header holding the
        byte length of the body, followed by the UTF-8 encoded JSON body.

        ``sendall`` is used so the whole frame is written even when the
        socket accepts only part of it per call. A short write would
        otherwise corrupt the framing for every following message.
        """
        back_dic_bytes = json.dumps(back_dic).encode('utf-8')
        back_dic_head = struct.pack('i',len(back_dic_bytes))
        conn.sendall(back_dic_head + back_dic_bytes)
    
    
    # Used to build session tokens (and available for unique file names).
    def get_uuid(name):
        """Return a 32-character hex token that is unique per call.

        The digest covers random bytes, the current time and ``name``. The
        random part matters because the result is used as a session token:
        time and user name alone can be guessed by an attacker.
        """
        import secrets

        md = hashlib.md5()
        md.update(secrets.token_bytes(16))
        md.update(str(time.time()).encode('utf-8'))
        md.update(name.encode('utf-8'))
        return md.hexdigest()
    
    
    
    def get_current_time():
        """Return the current local time formatted as '%Y-%m-%d %X'."""
        # strftime formats the current local time when no time tuple is given.
        return time.strftime('%Y-%m-%d %X')
    
    
    
    def print_progress(percent):
        """Redraw a one-line text progress bar on stdout.

        :param percent: completed fraction; values outside 0..1 are clamped.

        The line starts with a carriage return and has no trailing newline,
        so consecutive calls overwrite each other in a terminal. The bar is
        70 characters wide and filled with '#'.
        """
        width = 70
        sign = '#'
        percent = min(max(percent, 0), 1)
        # '[%%-%ds]' % width yields e.g. '[%-70s]': a left-aligned, padded bar.
        print_str = '[%%-%ds]' % width % (int(percent * width) * sign)
        print('\r%s %d%%' % (print_str,int(percent * 100)),end='',flush=True)
    
    
    
    def get_file_uuid(file_name):
        """Return the MD5 hex digest of a fixed salt followed by ``file_name``.

        The result is deterministic: the same name always yields the same
        digest.
        """
        salted = '密码加盐'.encode('utf-8') + file_name.encode('utf-8')
        return hashlib.md5(salted).hexdigest()
    
    if __name__ == '__main__':
        # Manual demo: advance the bar in 0.1 steps until the accumulated value
        # reaches 1. The float sum can overshoot 1 on the last step;
        # print_progress caps values above 1.
        percent = 0
        while percent < 1:
            percent += 0.1
            print_progress(percent)
    common

    orm_pool

    import pymysql
    from DBUtils.PooledDB import PooledDB
    
    # Module-level connection pool shared by the ORM layer; created at import
    # time. Pool-sizing arguments come first, followed by the connection
    # arguments (host .. autocommit), which are presumably forwarded to
    # pymysql when a connection is opened; see the DBUtils PooledDB docs.
    # NOTE(review): the database credentials are hard-coded here.
    POOL = PooledDB(
        creator=pymysql,
        maxconnections=6,
        mincached=2,
        maxcached=5,
        maxshared=3,
        maxusage=None,
        setsession=[],
        blocking=True,
        ping=0,
        host='127.0.0.1',
        port=3306,
        user='root',
        password='123456',
        database='youku',
        charset='utf8',
        autocommit=True
    )
    DBUtils_interface
    import pymysql
    from orm_pool import DBUtils_interface
    
    class Mysql_pool_interface:
        """Thin helper around one pooled connection and a dict-style cursor."""

        def __init__(self):
            # Borrow a connection from the shared pool; rows come back as dicts.
            self.conn = DBUtils_interface.POOL.connection()
            self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)

        def select(self,sql,args=None):
            """Run a query and return all fetched rows."""
            self.cursor.execute(sql,args)
            return self.cursor.fetchall()

        def execute(self,sql,args=None):
            """Run a data-modifying statement and return the affected row count.

            Database errors are printed and reported as 0 affected rows.
            Previously a failed statement hit an unbound ``affected`` and
            raised UnboundLocalError, hiding the real error.
            """
            affected = 0
            try:
                self.cursor.execute(sql,args)
                affected = self.cursor.rowcount
            except pymysql.MySQLError as e:
                print(e)
            return affected
    
    if __name__ == '__main__':
        # Manual smoke test: fetch the userinfo row with id 1 and print it.
        client = Mysql_pool_interface()
        rows = client.select('select * from userinfo where id = %s',1)
        print(rows)
    mysql_pool
    from orm_pool import mysql_pool
    
    class Field:
        """Description of one table column: name, SQL type, key flag, default."""

        def __init__(self, name, column_type, primary_key, default):
            self.name, self.column_type = name, column_type
            self.primary_key, self.default = primary_key, default
    
    
    class StringField(Field):
        """Column whose SQL type defaults to varchar(255)."""

        def __init__(self, name=None, column_type='varchar(255)', primary_key=False, default=None):
            super().__init__(
                name=name,
                column_type=column_type,
                primary_key=primary_key,
                default=default,
            )
    
    
    class IntegerField(Field):
        """Column whose SQL type defaults to int."""

        def __init__(self, name=None, column_type='int', primary_key=False, default=None):
            super().__init__(
                name=name,
                column_type=column_type,
                primary_key=primary_key,
                default=default,
            )
    
    
    class ModelsMetaclass(type):
        """Metaclass that turns Field attributes into table metadata.

        For every class except the one named 'Models' it collects the Field
        attributes into ``mappings``, removes them from the class namespace,
        and stores ``table_name`` (defaulting to the class name) and the
        attribute name of the single primary key.

        Raises KeyError when a class declares no primary key or more than one.
        """

        def __new__(cls, name, bases, attrs):
            # The abstract base itself has no table behind it.
            if name != 'Models':
                declared_table = attrs.get('table_name', None)
                columns = {}
                pk_attr = None
                for attr_name, attr_value in attrs.items():
                    if not isinstance(attr_value, Field):
                        continue
                    columns[attr_name] = attr_value
                    if attr_value.primary_key:
                        if pk_attr:
                            raise KeyError('重复主键:%s' % attr_name)
                        pk_attr = attr_name
                # Field objects live in `mappings` only, so instance attribute
                # access does not find them on the class.
                for attr_name in columns:
                    del attrs[attr_name]
                if not pk_attr:
                    raise KeyError('无主键')
                attrs['table_name'] = declared_table if declared_table else name
                attrs['primary_key'] = pk_attr
                attrs['mappings'] = columns
            return type.__new__(cls, name, bases, attrs)
    
    
    class Models(dict,metaclass=ModelsMetaclass):
        """Dict-backed base class for table rows.

        Column values are stored as dict items and exposed as attributes.
        Subclasses declare Field attributes; the metaclass provides
        ``table_name``, ``primary_key`` and ``mappings`` on the class.

        All values are passed to the driver as query parameters. Table and
        column names are formatted into the SQL text, so they must come from
        code and never from client input.
        """

        def __init__(self,**kwargs):
            super().__init__(**kwargs)

        def __getattr__(self, item):
            # Reached only when normal attribute lookup fails.
            try:
                return self[item]
            except KeyError:
                raise AttributeError('无此属性') from None

        def __setattr__(self, key, value):
            self[key] = value

        @classmethod
        def _select_rows(cls,**kwargs):
            """Return raw rows matching every ``column=value`` filter given."""
            ms = mysql_pool.Mysql_pool_interface()
            sql = 'select * from %s' % cls.table_name
            if not kwargs:
                return ms.select(sql)
            conditions = ' and '.join('%s = %%s' % column for column in kwargs)
            return ms.select('%s where %s' % (sql,conditions),list(kwargs.values()))

        @classmethod
        def select_one(cls,**kwargs):
            """Return the first matching row as a model instance, or None."""
            rows = cls._select_rows(**kwargs)
            if rows:
                return cls(**rows[0])
            return None

        @classmethod
        def select_many(cls,**kwargs):
            """Return all matching rows (every row when unfiltered) as a list."""
            return [cls(**row) for row in cls._select_rows(**kwargs)]

        def update(self):
            """Write the non-key columns back to the row with this primary key.

            :return: the affected row count reported by the database helper.
            :raises ValueError: if the instance has no primary-key value.
            """
            pk_field = self.mappings[self.primary_key]
            pk_value = getattr(self,pk_field.name,pk_field.default)
            if pk_value is None:
                raise ValueError('cannot update %s without a primary key value' % self.table_name)

            assignments = []
            args = []
            for field in self.mappings.values():
                if field.primary_key:
                    continue
                assignments.append('%s = %%s' % field.name)
                args.append(getattr(self,field.name,field.default))
            if not assignments:
                return 0

            # The key value is a bound parameter like every other value.
            args.append(pk_value)
            sql = 'update %s set %s where %s = %%s' % (self.table_name,','.join(assignments),pk_field.name)
            ms = mysql_pool.Mysql_pool_interface()
            return ms.execute(sql,args)

        def save(self):
            """Insert this instance as a new row.

            The primary key is left out so the database can assign it.

            :return: the affected row count reported by the database helper.
            """
            columns = []
            args = []
            for field in self.mappings.values():
                if field.primary_key:
                    continue
                columns.append(field.name)
                args.append(getattr(self,field.name,field.default))
            placeholders = ','.join(['%s'] * len(columns))
            sql = 'insert into %s (%s) values (%s)' % (self.table_name,','.join(columns),placeholders)
            ms = mysql_pool.Mysql_pool_interface()
            return ms.execute(sql,args)
    
    
    class User(Models):
        # Sample mapping for the `userinfo` table, used by the demo below.
        table_name = 'userinfo'
        id = IntegerField(name='id', primary_key=True)
        name = StringField(name='name')
        password = StringField(name='password')
        is_vip = IntegerField(name='is_vip')
        locked = IntegerField(name='locked')
        user_type = StringField(name='user_type')
    
    
    if __name__ == '__main__':
        # Manual demo against a live database: rename the user with id 1 ...
        existing = User.select_one(id=1)
        existing.name = 'lmj'
        existing.update()
        print(existing)

        # ... then insert a brand-new user.
        created = User(name='cly',password='123',is_vip=0,locked=0,user_type='user')
        created.save()
    orm

    tcp_server

    # Logged-in sessions: {addr: [session, user_id]}
    alive_user = {}
    # Lock guarding alive_user; replaced with a real Lock by the tcpserver
    # module when it is imported.
    mutex = None
    online_user
    import socket
    import struct
    import json
    from concurrent.futures import ThreadPoolExecutor
    from threading import Lock
    from tcp_server import online_user
    from conf import setting
    from lib import common
    from interface import admin_interface,common_interface,user_interface
    
    
    
    # Worker threads: one submitted task per accepted connection.
    pool = ThreadPoolExecutor()
    # Shared lock for the session table, published through online_user so the
    # interface modules can use it.
    mutex = Lock()
    online_user.mutex = mutex


    # Dispatch table: request 'func_type' -> handler(msg_dic, conn)
    func_dic = dict(
        register=common_interface.register,
        login=common_interface.login,
        check_upload_movie_exist=admin_interface.check_upload_movie_exist,
        upload_movie=admin_interface.upload_movie,
        check_moive_list=common_interface.check_moive_list,
        delete_movie=admin_interface.delete_movie,
        release_notice=admin_interface.release_notice,
        charge_vip=user_interface.charge_vip,
        download_free_movie=user_interface.download_free_movie,
        check_download_record=user_interface.check_download_record,
        check_notice=user_interface.check_notice,
        download_charge_movie=user_interface.download_charge_movie,
    )
    
    
    # Route one decoded request to its handler.
    def dispatch(msg_dic,conn):
        """Call the handler registered for ``msg_dic['func_type']``.

        Unknown function types get an "illegal operation" reply.
        """
        handler = func_dic.get(msg_dic['func_type'])
        if handler is None:
            common.send_back({'flag':False,'msg':'非法操作'},conn)
        else:
            handler(msg_dic,conn)
    
    
    def _recv_exact(conn,size):
        """Read exactly ``size`` bytes; return None if the peer closes first."""
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = conn.recv(remaining)
            if not chunk:
                return None
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)


    def _drop_client(conn,addr):
        """Forget the client's session, close its socket and log the event."""
        with online_user.mutex:
            # login() stores the session under the address tuple itself, so
            # the same key must be used to remove it.
            online_user.alive_user.pop(addr,None)
        conn.close()
        print(addr,'\033[31m 断开 \033[0m')


    # Per-connection request loop
    def communication(conn,addr):
        """Serve one client until it disconnects or a request fails.

        Each request is a 4-byte ``struct`` 'i' length header followed by a
        UTF-8 JSON body. The decoded dict gets the peer address under 'addr',
        overwriting any value sent by the client, and is handed to
        ``dispatch``.

        Any exception ends the session. This is the boundary of the worker
        thread, so the error is printed rather than re-raised.
        """
        while True:
            try:
                msg_dic_head = _recv_exact(conn,4)
                if msg_dic_head is None:
                    break
                msg_dic_len = struct.unpack('i',msg_dic_head)[0]
                # A single recv() may return only part of the body.
                msg_dic_bytes = _recv_exact(conn,msg_dic_len)
                if msg_dic_bytes is None:
                    break
                msg_dic = json.loads(msg_dic_bytes.decode('utf-8'))
                msg_dic['addr'] = addr

                # Hand the request to its handler.
                dispatch(msg_dic,conn)

            except Exception as e:
                print('错误信息:%s' % e)
                break

        _drop_client(conn,addr)
    
    
    def get_server():
        """Bind the listening socket and serve clients until interrupted.

        Every accepted connection is handed to the thread pool, which runs
        ``communication`` for it. The listening socket is closed when the
        accept loop ends for any reason, including an exception.
        """
        server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        try:
            server.bind(setting.IP_PORT)
            server.listen(setting.BACKLOG)
            print('\033[32m 服务器激活 \033[0m')

            # Accept loop
            while True:
                conn,addr = server.accept()
                print(addr,'\033[32m 连接 \033[0m')
                pool.submit(communication,conn,addr)
        finally:
            server.close()
    
    
    def run():
        """Entry point used by the start script: start the TCP server."""
        get_server()
    tcpserver

    根目录下:

    import os
    import sys
    
    
    # Put the project root (the directory of this script) on sys.path so the
    # packages next to it can be imported. abspath keeps this correct when the
    # script is launched through a relative path, where __file__ may have no
    # directory part.
    path = os.path.dirname(os.path.abspath(__file__))
    if path not in sys.path:
        sys.path.append(path)

    from tcp_server import tcpserver

    if __name__ == '__main__':
        tcpserver.run()
    start
  • 相关阅读:
    Power BI 根据用户权限动态生成导航跳转目标
    Power BI Tooltips 增强功能
    Power BI refresh error “could not load file or assembly…provided impersonation level is invalid”
    SQL 错误代码 18456
    如何使用SQL Server Integration Services从多个Excel文件读取数据
    通过表格编辑器将现有表引入Power BI数据流
    Power BI 中动态增长的柱状图
    ambari2.7.3离线安装hdp3.1.0时,ambari-hdp-1.repo中baseurl无值
    ambari 安装 cannot download file mysql-connector-java from http://8080/resource/mysql-connector-java.jar
    洛谷P4180 [BJWC2010]严格次小生成树
  • 原文地址:https://www.cnblogs.com/limengjie0104/p/9074194.html
Copyright © 2011-2022 走看看