zoukankan      html  css  js  c++  java
  • 仿优酷

    客户端

    项目目录结构

    client目录

        tcpClient.py---》客户端连接

    conf目录

        setting.py:存放配置信息

    core目录

        admin.py:管理员视图相关功能函数

        src.py    :主视图

        user.py :用户视图相关功能函数

    download_movie目录

        存放下载完的电影

    upload_movie目录

        存放要上传的电影

    lib目录

        common.py:存放公共方法

    start.py:启动文件

    各文件功能代码

    client.py

    复制代码
    import socket
    from conf import setting
    
    
    def client_conn():
        # 先建立连接
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        client.connect(setting.server_address)
        return client
    复制代码

    setting.py

    复制代码
    import os
    
    BASE_DIR = os.path.dirname(os.path.dirname(__file__))
    BASE_MOVIE_UP = os.path.join(BASE_DIR, 'upload_movie')
    BASE_MOVIE_DOWN = os.path.join(BASE_DIR, 'download_movie')
    
    server_address = ('127.0.0.1', 8087)
    复制代码

    admin.py

    复制代码
    from client import tcpClient
    from lib import common
    from conf import setting
    import os
    
    admin_data = {
        'session': None,
    }
    
    send_dic = {'type': None, 'user_type': 'admin', 'session': None}
    
    
    def admin_register(client):
        print('管理员注册')
        while True:
            name = input('请输入手机号>>:').strip()
            if name == 'q': break
            password = input('请输入密码>>').strip()
            conf_password = input('请确认密码>>:').strip()
            if password == conf_password:
                send_dic = {'type': 'register', 'user_type': 'admin', 'name': name, 'password': common.make_md5(password)}
                back_dic = common.send_data(client, send_dic, None)
                if back_dic['flag']:
                    print(back_dic['msg'])
                    break
                else:
                    print(back_dic['msg'])
    
            else:
                print('两次密码不一致')
    
    
    def admin_login(client):
        print('管理员登录')
        while True:
            name = input('用户名>>').strip()
            if name == 'q': break
            password = input('密码>>').strip()
            send_dic = {'type': 'login', 'user_type': 'admin', 'name': name, 'password': common.make_md5(password)}
    
            back_dic = common.send_data(client, send_dic, None)
            if back_dic['flag']:
                admin_data['session'] = back_dic['session']
                print(back_dic['msg'])
                break
            else:
                print(back_dic['msg'])
    
    
    def upload_movie(client):
        if not admin_data['session']:
            print('请先登录')
            return
        print('上传视频')
        while True:
            up_list = common.get_allfile_by_path(setting.BASE_MOVIE_UP)
            if not up_list:
                print('暂无能上传的影片')
                break
            for i, m in enumerate(up_list):
                print('%s : %s' % (i, m))
            choose = input('请选择要上传的影片').strip()
            if choose == 'q': break
            if choose.isdigit():
                choose = int(choose)
                # 先把md5值传上去校验一下文件是否存在,在决定要不要上传
                if choose >=len(up_list):
                    print('请输入范围内的数字')
                    continue
                movie_path = os.path.join(setting.BASE_MOVIE_UP, up_list[choose])
                file_md5 = common.get_bigfile_md5(movie_path)
                send_dic={'type':'check_movie','session':admin_data['session'],'file_md5':file_md5}
                back_dic=common.send_data(client,send_dic,None)
                if back_dic['flag']:
                    is_free = input('是否免费(y/n)>>:').strip()
                    if is_free == 'y':
                        is_free = 1
                    else:
                        is_free = 0
                    fsize = os.path.getsize(movie_path)
                    send_dic = {'type': 'upload', 'user_type': 'admin',
                                'session': admin_data['session'],
                                'file_name': up_list[choose], 'file_size': fsize, 'file_md5':common.get_bigfile_md5(movie_path),'is_free': is_free}
                    back_dic = common.send_data(client, send_dic, movie_path)
                    if back_dic['flag']:
                        print(back_dic['msg'])
                        break
                    else:
                        print(back_dic['msg'])
                else:
                    print(back_dic['msg'])
            else:
                print('请输入数字')
    
    
    def delete_movie(client):
        '''
        1 先拿到视频列表,打印
        2 根据视频前数字,选择要删除的视频
        3 删除成功/失败,打印
        '''
        if not admin_data['session']:
            print('请先登录')
            return
        print('删除视频')
        while True:
            send_dic['type'] = 'get_movie_list'
            send_dic['session'] = admin_data['session']
            send_dic['movie_type'] = 'all'
            back_dic = common.send_data(client, send_dic, None)
            if back_dic['flag']:
                for i, mo in enumerate(back_dic['movie_list']):
                    print('%s : %s--%s' % (i, mo[0], mo[1]))
                choose = input('请输入要删除的电影(数字):').strip()
                if choose == 'q': break
                if choose.isdigit():
                    choose = int(choose)
                    if choose >= len(back_dic['movie_list']):
                        print('请输入范围内的数字')
                        continue
                    send_dic['type'] = 'delete_movie'
                    # 回忆后台返回的数据是什么样的
                    send_dic['movie_id'] = back_dic['movie_list'][choose][2]
                    back_dic = common.send_data(client, send_dic, None)
                    if back_dic['flag']:
                        print(back_dic['msg'])
                        break
                    else:
                        print(back_dic['msg'])
    
                else:
                    print('请输入数字')
    
            else:
                print(back_dic['msg'])
                break
    
    
    def release_notice(client):
        if not admin_data['session']:
            print('请先登录')
            return
        print('发布公告')
        while True:
            notice_name = input('请输入公告标题:').strip()
            notice_content = input('请输入公告内容:').strip()
            if notice_name == 'q': break
            send_dic['type'] = 'release_notice'
            send_dic['session'] = admin_data['session']
            send_dic['notice_name'] = notice_name
            send_dic['notice_content'] = notice_content
            back_dic = common.send_data(client, send_dic, None)
            if back_dic['flag']:
                print(back_dic['msg'])
                break
            else:
                print(back_dic['msg'])
    
    
    fun_dic = {
        '1': admin_register,
        '2': admin_login,
        '3': upload_movie,
        '4': delete_movie,
        '5': release_notice,
    }
    
    
    def admin_view():
        client = tcpClient.client_conn()
        while True:
            print('''
            1 注册
            2 登录
            3 上传视频
            4 删除视频
            5 发布公告
            ''')
    
            choose = input('please choose>>:').strip()
            if 'q' == choose: break
            if choose not in fun_dic: continue
            fun_dic[choose](client)
        client.close()
    复制代码

    src.py

    from core import admin, user
    
    fun_dic = {
        '1': admin.admin_view,
        '2': user.user_view
    }
    
    
    def run():
        while True:
            print('''
            1 管理员视图
            2 用户视图
            ''')
            choose = input('please choose>>:').strip()
            if 'q' == choose: break
            if choose not in fun_dic: continue
            fun_dic[choose]()
    主视图

    user.py

    from client import tcpClient
    from lib import common
    from conf import setting
    import os
    import time
    
    user_data = {
        'session': None,
        'is_vip': None
    }
    
    send_dic = {'type': None, 'user_type': 'user', 'session': None}
    
    
    def user_register(client):
        print('用户注册')
        while True:
            name = input('请输入手机号>>:').strip()
            password = input('请输入密码>>').strip()
            conf_password = input('请确认密码>>:').strip()
            if password == conf_password:
                send_dic = {'type': 'register', 'user_type': 'user', 'name': name, 'password': common.make_md5(password)}
                back_dic = common.send_data(client, send_dic, None)
                if back_dic['flag']:
                    print(back_dic['msg'])
                    break
                else:
                    print(back_dic['msg'])
    
            else:
                print('两次密码不一致')
    
    
    def user_login(client):
        print('用户登录')
        while True:
            name = input('用户名>>').strip()
            if name == 'q': break
            password = input('密码>>').strip()
            send_dic = {'type': 'login', 'user_type': 'user', 'name': name, 'password': common.make_md5(password)}
    
            back_dic = common.send_data(client, send_dic, None)
            if back_dic['flag']:
                user_data['session'] = back_dic['session']
                user_data['is_vip'] = back_dic['is_vip']
                print(back_dic['msg'])
                print(back_dic['last_notice'])
                break
            else:
                print(back_dic['msg'])
    
    
    def buy_member(client):
        print('购买会员')
        if not user_data['session']:
            print('请先登录')
            return
        if user_data['is_vip'] == 1:
            print('您已经是会员了')
            return
        while True:
            buy = input('是否购买会员(y/n)q 退出').strip()
            if 'y' == buy:
                send_dic['type'] = 'buy_member'
                send_dic['session'] = user_data['session']
                back_dic = common.send_data(client, send_dic, None)
                if back_dic['flag']:
                    user_data['is_vip'] = 1
                    print(back_dic['msg'])
                    break
                else:
                    print(back_dic['msg'])
    
            elif 'q' == buy:
                break
            else:
                print('您没有购买')
    
    
    def get_movie_list(client):
        if not user_data['session']:
            print('请先登录')
            return
        print('查看视频列表')
        send_dic['type'] = 'get_movie_list'
        send_dic['movie_type'] = 'all'
        send_dic['session'] = user_data['session']
        back_dic = common.send_data(client, send_dic, None)
        if back_dic['flag']:
            for i, mo in enumerate(back_dic['movie_list']):
                print('%s : %s-->%s' % (i, mo[0], mo[1]))
        else:
            print(back_dic['msg'])
    
    
    def down_free_movie(client):
        if not user_data['session']:
            print('请先登录')
            return
        print('下载免费视频')
        '''
        先查询免费视频,打印出来,
        用户选择后,通过电影id去后台下载
        '''
        send_dic['type'] = 'get_movie_list'
        send_dic['movie_type'] = 'free'
        send_dic['session'] = user_data['session']
        back_dic = common.send_data(client, send_dic, None)
        if back_dic['flag']:
            for i, mo in enumerate(back_dic['movie_list']):
                print('%s : %s-->%s' % (i, mo[0], mo[1]))
            choose = input('请输入要下载的电影(数字):').strip()
            if choose.isdigit():
                choose = int(choose)
                # 回忆后台返回的数据是什么样的
                send_dic['type'] = 'download_movie'
                send_dic['session'] = user_data['session']
                send_dic['movie_id'] = back_dic['movie_list'][choose][2]
                send_dic['movie_type'] = 'free'
                back_dic = common.send_data(client, send_dic, None)
                if back_dic['flag']:
                    if back_dic['wait_time'] > 0:
                        print('请等待 %s 秒' % back_dic['wait_time'])
                        time.sleep(back_dic['wait_time'])
                    recv_size = 0
                    print('----->', back_dic['filename'])
                    path = os.path.join(setting.BASE_MOVIE_DOWN, back_dic['filename'])
                    with open(path, 'wb') as f:
                        while recv_size < back_dic['filesize']:
                            recv_data = client.recv(1024)
                            f.write(recv_data)
                            recv_size += len(recv_data)
                            print('recvsize:%s filesize:%s' % (recv_size, back_dic['filesize']))
                    print('%s :下载成功' % back_dic['filename'])
                else:
                    print(back_dic['msg'])
        else:
            print(back_dic['msg'])
    
    
    def down_charge_movie(client):
        if not user_data['session']:
            print('请先登录')
            return
        print('下载收费视频')
        if user_data['is_vip']:
            charge = input('您是会员,收费5元(y 确认)').strip()
        else:
            charge = input('您不是会员,收费10元(y 确认)').strip()
        if not charge == 'y':  # 不是y,相当于没付钱,直接返回
            return
        send_dic['type'] = 'get_movie_list'
        send_dic['movie_type'] = 'charge'
        send_dic['session'] = user_data['session']
        back_dic = common.send_data(client, send_dic, None)
        if back_dic['flag']:
            for i, mo in enumerate(back_dic['movie_list']):
                print('%s : %s-->%s' % (i, mo[0], mo[1]))
            choose = input('请输入要下载的电影(数字):').strip()
            if choose.isdigit():
                choose = int(choose)
                # 回忆后台返回的数据是什么样的
                send_dic['type'] = 'download_movie'
                send_dic['session'] = user_data['session']
                send_dic['movie_id'] = back_dic['movie_list'][choose][2]
                send_dic['movie_type'] = 'charge'
                back_dic = common.send_data(client, send_dic, None)
                if back_dic['flag']:
                    recv_size = 0
                    print('----->', back_dic['filename'])
                    path = os.path.join(setting.BASE_MOVIE_DOWN, back_dic['filename'])
                    with open(path, 'wb') as f:
                        while recv_size < back_dic['filesize']:
                            recv_data = client.recv(1024)
                            f.write(recv_data)
                            recv_size += len(recv_data)
                            print('recvsize:%s filesize:%s' % (recv_size, back_dic['filesize']))
                    print('%s :下载成功' % back_dic['filename'])
                else:
                    print(back_dic['msg'])
        else:
            print(back_dic['msg'])
    
    
    def check_download_record(client):
        if not user_data['session']:
            print('请先登录')
            return
        print('查看观影记录')
        send_dic['type'] = 'check_download_record'
        send_dic['session'] = user_data['session']
        back_dic = common.send_data(client, send_dic, None)
        if back_dic['flag']:
            for re in back_dic['download_list']:
                print(re)
        else:
            print(back_dic['msg'])
    
    
    def check_notice(client):
        if not user_data['session']:
            print('请先登录')
            return
        print('查看公告')
        send_dic['type'] = 'check_notice'
        send_dic['session'] = user_data['session']
        back_dic = common.send_data(client, send_dic, None)
        if back_dic['flag']:
            for value in back_dic['notice_list']:
                print(value)
        else:
            print(back_dic['msg'])
    
    
    fun_dic = {
        '1': user_register,
        '2': user_login,
        '3': buy_member,
        '4': get_movie_list,
        '5': down_free_movie,
        '6': down_charge_movie,
        '7': check_download_record,
        '8': check_notice
    }
    
    
    def user_view():
        client = tcpClient.client_conn()
        while True:
            print('''
            1 注册
            2 登录
            3 冲会员
            4 查看视频
            5 下载免费视频
            6 下载收费视频
            7 查看观影记录
            8 查看公告
            ''')
    
            choose = input('please choose>>:').strip()
            if 'q' == choose: break
            if choose not in fun_dic: continue
            fun_dic[choose](client)
        client.close()
    用户视图

     

    common.py

    import struct
    import json
    import os
    import hashlib
    
    
    def send_data(client, send_dic, file):
        # 发送部分
        head_json_bytes = json.dumps(send_dic).encode('utf-8')  # 先把报头转为bytes格式
        client.send(struct.pack('i', len(head_json_bytes)))  # 先发报头的长度
        client.send(head_json_bytes)  # 再发送报头
        if file:  # 如果存在文件,再把文件打开一行一行发送
            with open(file, 'rb') as f:
                for line in f:
                    client.send(line)
        # 接收部分
        back_len_bytes = client.recv(4)  # 先收报头4个bytes,得到报头长度的字节格式
        back_head_len = struct.unpack('i', back_len_bytes)[0]  # 提取报头的长度
        head_bytes = client.recv(back_head_len)  # 按照报头长度back_head_len,收取报头的bytes格式
        header = json.loads(head_bytes.decode('utf-8'))  # 把bytes格式的报头,转换为json格式
    
        return header
    
    
    def get_allfile_by_path(path):
        file_list = os.listdir(path)
        return file_list
    
    
    def make_md5(password):
        md = hashlib.md5()
        md.update(password.encode('utf-8'))
        return md.hexdigest()
    
    
    def get_bigfile_md5(file_path):
        if os.path.exists(file_path):
            md = hashlib.md5()
            filesize = os.path.getsize(file_path)
            file_list = [0, filesize // 3, (filesize // 3) * 2, filesize - 10]
            with open(file_path, 'rb') as f:
                for line in file_list:
                    f.seek(line)
                    md.update(f.read(10))
            return md.hexdigest()
    公共方法

     

    start.py

    import os, sys
    
    path = os.path.dirname(__file__)
    sys.path.append(path)
    from core import src
    
    if __name__ == '__main__':
        src.run()
    启动文件

        

    服务端

    项目目录结构

          

    conf目录

        setting.py:配置信息相关

    db目录

        models.py:数据库表对应程序中的类

    interface目录

        admin_interface.py:管理员相关操作的接口

        common_interface.py:公共操作的相关接口(登录,注册)

        user_interface.py:用户相关操作的接口

    lib目录

        common.py:公共方法

    orm目录:

        fuckorm.py:单例版orm框架

        mysql_singleton.py:数据库连接类

    ormpool目录:

        db_pool.py:数据库链接池

        fuckorm_pool.py:连接池版orm框架

        mysql_pool.py:连接池版数据库连接类

    server目录:

        tcpServer.py:服务端核心代码

        use_data.py:存放用户信息,和全局锁

    movie_list目录:

        存放客户端上传上来的电影

    start.py:启动文件

     各文件功能代码

    setting.py

    import os
    
    host = '127.0.0.1'
    port = 3306
    user = 'root'
    password = '123456'
    database = 'youku2'
    charset = 'utf8'
    autocommit = True
    
    BASE_DIR = os.path.dirname(os.path.dirname(__file__))
    BASE_DB = os.path.join(BASE_DIR, 'db')
    BASE_MOVIE = os.path.join(BASE_DIR, 'movie')
    BASE_MOVIE_LIST = os.path.join(BASE_DIR, 'movie_list')
    server_address = ('127.0.0.1', 8087)
    配置信息

    models.py

    # 用单例版
    # from orm.fuckorm import Model, StringField, IntegerField
    
    
    # 用池版
    from ormpool.fuckorm_pool import Model, StringField, IntegerField
    
    
    class User(Model):
        table_name = 'userinfo'
        id = IntegerField('id', primary_key=True)
        name = StringField('name')
        password = StringField('password')
        locked = IntegerField('locked', default=0)
        is_vip= IntegerField('is_vip',default=0)
        user_type = StringField('user_type')
    
    
    class Movie(Model):
        table_name = 'movie'
        id = IntegerField('id', primary_key=True)
        name = StringField('name')
        path = StringField('path')
        is_free = IntegerField('is_free',default=1)
        is_delete = IntegerField('is_delete',default=0)
        create_time = StringField('create_time')
        user_id = IntegerField('user_id')
        file_md5=StringField('file_md5')
    
    
    class Notice(Model):
        table_name = 'notice'
        id = IntegerField('id', primary_key=True)
        name = StringField('name')
        content = StringField('content')
        user_id = IntegerField('user_id')
        create_time = StringField('create_time')
    
    
    class DownloadRecord(Model):
        table_name = 'download_record'
        id = IntegerField('id', primary_key=True)
        user_id = IntegerField('user_id')
        movie_id = IntegerField('movie_id')
    程序中的类与数据表对应

    admin_interface.py

    from conf import setting
    import os
    from db import models
    from lib import common
    @common.login_auth
    def upload_movie(user_dic, conn):
        '''
        上传视频功能
        :param user_dic:
        :param conn:
        :return:
        '''
        recv_size = 0
        print('----->', user_dic['file_name'])
        file_name = common.get_uuid(user_dic['file_name']) + user_dic['file_name']
        path = os.path.join(setting.BASE_MOVIE_LIST, file_name)
        with open(path, 'wb') as f:
            while recv_size < user_dic['file_size']:
                recv_data = conn.recv(1024)
                f.write(recv_data)
                recv_size += len(recv_data)
                # print('recvsize:%s filesize:%s' % (recv_size, user_dic['file_size']))
        print('%s :上传成功' % file_name)
        movie = models.Movie(name=file_name, path=path, is_free=user_dic['is_free'],
                             user_id=user_dic['user_id'], file_md5=user_dic['file_md5'])
        movie.save()
        back_dic = {'flag': True, 'msg': '上传成功'}
        common.send_back(back_dic,conn)
    @common.login_auth
    def delete_movie(user_dic,conn):
        '''
        删除视频,不是真正的删除,在视频表中的is_delete字段设为1
        :param user_dic:
        :param conn:
        :return:
        '''
        movie = models.Movie.select_one(id=user_dic['movie_id'])
        movie.is_delete = 1
        movie.update()
        back_dic = {'flag': True, 'msg': '电影删除成功'}
        common.send_back(back_dic, conn)
    
    @common.login_auth
    def release_notice(user_dic,conn):
        '''
        发布公告功能,取出字典中的公告名字,公告内容,用户id,存入数据库
        :param user_dic:
        :param conn:
        :return:
        '''
        notice = models.Notice(name=user_dic['notice_name'], content=user_dic['notice_content'],
                               user_id=user_dic['user_id'])
        notice.save()
        back_dic = {'flag': True, 'msg': '公告发布成功'}
        common.send_back(back_dic, conn)
    
    @common.login_auth
    def check_movie(user_dic,conn):
        '''
        通过md5校验数据中是否该电影已经存在了
        :param user_dic:
        :param conn:
        :return:
        '''
        movie = models.Movie.select_one(file_md5=user_dic['file_md5'])
        if movie:
            back_dic = {'flag': False, 'msg': '该电影已经存在'}
        else:
            back_dic = {'flag': True}
        common.send_back(back_dic,conn)
    管理员接口功能

    common_interface.py

    from db import models
    from lib import common
    from interface import user_interface
    from server import use_data as da
    
    
    def login(user_dic, conn):
        '''
        登录功能,登录成功,将用户信息以{"addr":[session,user_id]}的形式,放到内存中,
        多线程操作,必须加锁,锁需要在主线程中生成
        :param user_dic:
        :param conn:
        :return:
        '''
        user = models.User.select_one(name=user_dic['name'])
        if user:  # 用户存在
            if user.user_type == user_dic['user_type']:
                if user.password == user_dic['password']:
                    session = common.get_uuid(user_dic['name'])
                    da.mutex.acquire()
                    if user_dic['addr'] in da.alive_user:
                        # 如果当前的客户端已经登录,再次登录的时候,把原来的用户踢出,再重新加入进去
                        da.alive_user.pop(user_dic['addr'])
                    da.alive_user[user_dic['addr']] = [session, user.id]
                    da.mutex.release()
                    back_dic = {'flag': True, 'session': session, 'is_vip': user.is_vip, 'msg': 'login success'}
                    if user_dic['user_type'] == 'user':
                        last_notice = user_interface.check_notice_by_count(1)
                        back_dic['last_notice'] = last_notice
                else:
                    back_dic = {'flag': False, 'msg': 'password error'}
            else:
                back_dic = {'flag': False, 'msg': '登录类型不匹配'}
        else:
            back_dic = {'flag': False, 'msg': 'user do not exisit'}
        common.send_back(back_dic, conn)
    
    
    def register(user_dic, conn):
        '''
        注册功能
        :param user_dic:
        :param conn:
        :return:
        '''
        user = models.User.select_one(name=user_dic['name'])
        if user:  # 用户存在
            back_dic = {'flag': False, 'msg': 'user is exisit'}
        else:
            user = models.User(name=user_dic['name'], password=user_dic['password'], user_type=user_dic['user_type'])
            user.save()
            back_dic = {'flag': True, 'msg': 'register success'}
    
        common.send_back(back_dic, conn)
    公共接口

    user_interface.py

    # 注册(用手机号注册,密码用md5加密)
    # 登录(登录后显示最新一条公告)
    # 冲会员
    # 查看视频(即将所有视频循环打印出来)
    # 下载普通视频(非会员下载视频需要等30s广告,会员下载无需等待)
    # 下载收费视频(非会员下载需要10元,会员下载需要5元)
    # 查看观影记录(就是查看自己下载过的视频)
    # 查看公告(包括历史公告)
    from db import models
    import os
    from lib import common
    
    
    @common.login_auth
    def buy_member(user_dic, conn):
        '''
        购买会员功能,直接将is_vip字段设为1
        :param user_dic:
        :param conn:
        :return:
        '''
        user = models.User.select_one(id=user_dic['user_id'])
        user.is_vip = 1
        user.update()
        back_dic = {'flag': True, 'msg': 'buy success'}
        common.send_back(back_dic, conn)
    
    
    @common.login_auth
    def get_movie_list(user_dic, conn):
        '''
        获取视频列表:取出全部视频,过滤掉删除的视频,根据前台传来的查询条件,把电影放到列表里
        :param user_dic:
        :param conn:
        :return:
        '''
        back_dic = {}
        movie_list = models.Movie.select_all()
        back_movie_list = []
        if movie_list:  # 不为空,继续查询,为空直接返回false
            for movie in movie_list:
                if not movie.is_delete:
                    # 拼成一个列表['电影名字','收费/免费','电影id']
                    if user_dic['movie_type'] == 'all':
                        # 全部
                        back_movie_list.append([movie.name, '免费' if movie.is_free else '收费', movie.id])
                    elif user_dic['movie_type'] == 'free':
                        # 免费电影
                        if movie.is_free:  # 免费的才往列表里放
                            back_movie_list.append([movie.name, '免费', movie.id])
                    else:
                        # 收费电影
                        if not movie.is_free:  # 收费的才往列表里放
                            back_movie_list.append([movie.name, '收费', movie.id])
    
            if back_movie_list:
                back_dic = {'flag': True, 'movie_list': back_movie_list}
            else:
                back_dic = {'flag': False, 'msg': '暂无可查看影片'}
        else:
            back_dic = {'flag': False, 'msg': '暂无影片'}
        common.send_back(back_dic, conn)
    
    
    @common.login_auth
    def download_movie(user_dic, conn):
        movie = models.Movie.select_one(id=user_dic['movie_id'])
        if not movie:  # 电影不存在,返回false
            back_dic = {'flag': False, 'msg': '该电影不存在'}
            common.send_back(back_dic, conn)
            return
        user = models.User.select_one(id=user_dic['user_id'])
        send_back_dic = {'flag': True}
        if user_dic['movie_type'] == 'free':  # 下载免费电影,非会员需要等待;下载收费电影,不需要等待了直接下
            if user.is_vip:
                send_back_dic['wait_time'] = 0
            else:
                send_back_dic['wait_time'] = 30
    
        send_back_dic['filename'] = movie.name
        send_back_dic['filesize'] = os.path.getsize(movie.path)
        # 把下载记录保存到记录表中
        down_record = models.DownloadRecord(user_id=user_dic['user_id'], movie_id=movie.id)
        down_record.save()
        common.send_back(send_back_dic, conn)
        with open(movie.path, 'rb')as f:
            for line in f:
                conn.send(line)
    
    
    @common.login_auth
    def check_notice(user_dic, conn):
        '''
        查看公告功能
        :param user_dic:
        :param conn:
        :return:
        '''
        # 直接调用通过条数查询的接口,传入None表示全查
        notice_list = check_notice_by_count(count=None)
        if notice_list:
            back_dic={'flag': True, 'notice_list': notice_list}
        else:
            back_dic={'flag': False, 'msg': '暂无公告'}
    
        common.send_back(back_dic, conn)
    
    
    def check_notice_by_count(count=None):
        '''
        查看功能的方法,供内部调用
        count 为None,查全部,为1 查一条
        :param count:
        :return:
        '''
        notice_list = models.Notice.select_all()
        back_notice_list = []
        if notice_list:  # 不为空,继续查询,为空直接返回false
            if not count:
                for notice in notice_list:
                    back_notice_list.append({notice.name: notice.content})
            else:  # 查一条
                notice_list=sorted(notice_list,key=lambda notice:notice.create_time)
                last_row=len(notice_list)-1
                back_notice_list.append({notice_list[last_row].name: notice_list[last_row].content})
            return back_notice_list
        else:
            return False
    
    
    @common.login_auth
    def check_download_record(user_dic, conn):
        '''
        查看下载记录:
        先通过user_id到DownloadRecord表中查到下载的每一条记录,
        通过每一条记录中的电影id再去电影表查询电影,取出名字,返回
        :param user_dic:
        :return:
        '''
        download_record = models.DownloadRecord.select_all(user_id=user_dic['user_id'])
        if not download_record:
            back_dic = {'flag': False, 'msg': '暂无观影记录'}
            common.send_back(back_dic, conn)
        else:
            download_list = []
            for record in download_record:
                movie = models.Movie.select_one(id=record.movie_id)
                download_list.append(movie.name)
            back_dic = {'flag': True, 'msg': 'buy success', 'download_list': download_list}
            common.send_back(back_dic, conn)
    用户接口

    common.py

    import hashlib
    import os
    import time
    import json
    import struct
    
    def login_auth(func):
        def wrapper(*args, **kwargs):
            from server import use_data as mu
            for value in mu.alive_user.values():
                if value[0] == args[0]['session']:
                    args[0]['user_id'] = value[1]
                    break
            if not args[0].get('user_id', None):
                send_back({'flag': False, 'msg': '您没有登录'}, args[1])
            else:
                return func(*args, **kwargs)
    
        return wrapper
    
    
    def get_uuid(name):
        md = hashlib.md5()
        md.update(name.encode('utf-8'))
        md.update(str(time.clock()).encode('utf-8'))
        return md.hexdigest()
    
    
    def get_time():
        now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        return now_time
    
    
    def get_colck_time():
        return str(time.clock())
    
    
    def get_bigfile_md5(file_path):
        if os.path.exists(file_path):
            md = hashlib.md5()
            filesize = os.path.getsize(file_path)
            file_list = [0, filesize // 3, (filesize // 3) * 2, filesize - 10]
            with open(file_path, 'rb') as f:
                for line in file_list:
                    f.seek(line)
                    md.update(f.read(10))
            return md.hexdigest()
    
    
    def send_back(back_dic, conn):
    
        head_json_bytes = json.dumps(back_dic).encode('utf-8')
        conn.send(struct.pack('i', len(head_json_bytes)))  # 先发报头的长度
        conn.send(head_json_bytes)
    公共方法

    funcorm.py

    from orm import mysql_singleton
    
    
    class Field(object):
        def __init__(self, name, column_type, primary_key, default):
            self.name = name  # 列名
            self.column_type = column_type  # 数据类型
            self.primary_key = primary_key  # 是否为主键
            self.default = default  # 默认值
    
    
    class StringField(Field):
        def __init__(self, name=None, column_type='varchar(100)', primary_key=False, default=None):
            super().__init__(name, column_type, primary_key, default)
    
    
    class IntegerField(Field):
        def __init__(self, name=None, primary_key=False, default=0):
            super().__init__(name, 'int', primary_key, default)
    
    
    class ModelMetaclass(type):
        def __new__(cls, name, bases, attrs):
            if name == "Model":
                return type.__new__(cls, name, bases, attrs)
            table_name = attrs.get('table_name', None)
            if not table_name:
                raise TypeError('没有表名')
            primary_key = None  # 查找primary_key字段
    
            # 保存列类型的对象
            mappings = dict()
            for k, v in attrs.items():
                # 是列名的就保存下来
                if isinstance(v, Field):
                    mappings[k] = v
                    if v.primary_key:
                        # 找到主键:
                        if primary_key:
                            raise TypeError('主键重复: %s' % k)
                        primary_key = k
    
            for k in mappings.keys():
                attrs.pop(k)
            if not primary_key:
                raise TypeError('没有主键')
    
            # 给cls增加一些字段:
            attrs['mapping'] = mappings
            attrs['primary_key'] = primary_key
            attrs['table_name'] = table_name
            return type.__new__(cls, name, bases, attrs)
    
    
    class Model(dict, metaclass=ModelMetaclass):
        def __init__(self, **kw):
            super(Model, self).__init__(**kw)
    
        def __getattr__(self, key):  # .访问属性触发
            try:
                return self[key]
            except KeyError:
                raise AttributeError('没有属性:%s' % key)
    
        def __setattr__(self, key, value):
            self[key] = value
    
        @classmethod
        def select_all(cls, **kwargs):
            ms = mysql_singleton.Mysql().singleton()
            if kwargs:  # 当有参数传入的时候
                key = list(kwargs.keys())[0]
                value = kwargs[key]
                sql = "select * from %s where %s=?" % (cls.table_name, key)
                sql = sql.replace('?', '%s')
                re = ms.select(sql, value)
            else:  # 当无参传入的时候查询所有
                sql = "select * from %s" % cls.table_name
                re = ms.select(sql)
            return [cls(**r) for r in re]
    
        @classmethod
        def select_one(cls, **kwargs):
            # 此处只支持单一条件查询
            key = list(kwargs.keys())[0]
            value = kwargs[key]
            ms = mysql_singleton.Mysql().singleton()
            sql = "select * from %s where %s=?" % (cls.table_name, key)
    
            sql = sql.replace('?', '%s')
            re = ms.select(sql, value)
            if re:
                return cls(**re[0])
            else:
                return None
    
        def save(self):
            ms = mysql_singleton.Mysql().singleton()
            fields = []
            params = []
            args = []
            for k, v in self.mapping.items():
                fields.append(v.name)
                params.append('?')
                args.append(getattr(self, k, v.default))
            sql = "insert into %s (%s) values (%s)" % (self.table_name, ','.join(fields), ','.join(params))
            sql = sql.replace('?', '%s')
            ms.execute(sql, args)
    
        def update(self):
            ms = mysql_singleton.Mysql().singleton()
            fields = []
            args = []
            pr = None
            for k, v in self.mapping.items():
                if v.primary_key:
                    pr = getattr(self, k, v.default)
                else:
                    fields.append(v.name + '=?')
                    args.append(getattr(self, k, v.default))
            sql = "update %s set %s where %s = %s" % (
                self.table_name, ', '.join(fields), self.primary_key, pr)
    
            sql = sql.replace('?', '%s')
            print(sql)
            ms.execute(sql, args)
    单例版orm

    mysql_singleton.py

    from conf import setting
    import pymysql
    
    
    class Mysql:
        __instance = None
        def __init__(self):
            self.conn = pymysql.connect(host=setting.host,
                                        user=setting.user,
                                        password=setting.password,
                                        database=setting.database,
                                        charset=setting.charset,
                                        autocommit=setting.autocommit)
            self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
    
        def close_db(self):
            self.conn.close()
    
        def select(self, sql, args=None):
            self.cursor.execute(sql, args)
            rs = self.cursor.fetchall()
            return rs
    
        def execute(self, sql, args):
            try:
                self.cursor.execute(sql, args)
                affected = self.cursor.rowcount
                # self.conn.commit()
            except BaseException as e:
                print(e)
            return affected
    
        @classmethod
        def singleton(cls):
            if not cls.__instance:
                cls.__instance = cls()
            return cls.__instance
    数据库链接类

    db_pool.py

    import pymysql
    from conf import setting
    from DBUtils.PooledDB import PooledDB
    
    POOL = PooledDB(
        creator=pymysql,  # 使用链接数据库的模块
        maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
        mincached=6,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
        maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
        maxshared=3,
        # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
        blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
        maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
        setsession=[],  # 开始会话前执行的命令列表。
        ping=0,
        # ping MySQL服务端,检查是否服务可用。
        host=setting.host,
        port=setting.port,
        user=setting.user,
        password=setting.password,
        database=setting.database,
        charset=setting.charset,
        autocommit=setting.autocommit
    )
    数据库连接池

    funcorm_pool.py

    from ormpool import mysql_pool
    
    
    class Field(object):
        def __init__(self, name, column_type, primary_key, default):
            self.name = name  # 列名
            self.column_type = column_type  # 数据类型
            self.primary_key = primary_key  # 是否为主键
            self.default = default  # 默认值
    
    
    class StringField(Field):
        def __init__(self, name=None, ddl='varchar(100)', primary_key=False, default=None):
            super().__init__(name, ddl, primary_key, default)
    
    
    class IntegerField(Field):
        def __init__(self, name=None, primary_key=False, default=0):
            super().__init__(name, 'int', primary_key, default)
    
    
    class ModelMetaclass(type):
        def __new__(cls, name, bases, attrs):
            if name == "Model":
                return type.__new__(cls, name, bases, attrs)
            table_name = attrs.get('table_name', None)
            if not table_name:
                raise TypeError('没有表名')
            primary_key = None  # 查找primary_key字段
    
            # 保存列类型的对象
            mappings = dict()
            for k, v in attrs.items():
                # 是列名的就保存下来
                if isinstance(v, Field):
                    mappings[k] = v
                    if v.primary_key:
                        # 找到主键:
                        if primary_key:
                            raise TypeError('主键重复: %s' % k)
                        primary_key = k
    
            for k in mappings.keys():
                attrs.pop(k)
            if not primary_key:
                raise TypeError('没有主键')
    
            # 给cls增加一些字段:
            attrs['mapping'] = mappings
            attrs['primary_key'] = primary_key
            attrs['table_name'] = table_name
            return type.__new__(cls, name, bases, attrs)
    
    
    class Model(dict, metaclass=ModelMetaclass):
        def __init__(self, **kw):
            super(Model, self).__init__(**kw)
    
        def __getattr__(self, key):  # .访问属性触发
            try:
                return self[key]
            except KeyError:
                raise AttributeError('没有属性:%s' % key)
    
        def __setattr__(self, key, value):
            self[key] = value
    
        @classmethod
        def select_all(cls, **kwargs):
            ms = mysql_pool.MysqlPool()
            if kwargs:  # 当有参数传入的时候
                key = list(kwargs.keys())[0]
                value = kwargs[key]
                sql = "select * from %s where %s=?" % (cls.table_name, key)
                sql = sql.replace('?', '%s')
                re = ms.select(sql, value)
            else:  # 当无参传入的时候查询所有
                sql = "select * from %s" % cls.table_name
                re = ms.select(sql)
            return [cls(**r) for r in re]
    
        @classmethod
        def select_one(cls, **kwargs):
            # 此处只支持单一条件查询
            key = list(kwargs.keys())[0]
            value = kwargs[key]
            ms = mysql_pool.MysqlPool()
            sql = "select * from %s where %s=?" % (cls.table_name, key)
    
            sql = sql.replace('?', '%s')
            re = ms.select(sql, value)
            if re:
                return cls(**re[0])
            else:
                return None
    
        def save(self):
            ms = mysql_pool.MysqlPool()
            fields = []
            params = []
            args = []
            for k, v in self.mapping.items():
                fields.append(v.name)
                params.append('?')
                args.append(getattr(self, k, v.default))
            sql = "insert into %s (%s) values (%s)" % (self.table_name, ','.join(fields), ','.join(params))
            sql = sql.replace('?', '%s')
            ms.execute(sql, args)
    
        def update(self):
            ms = mysql_pool.MysqlPool()
            fields = []
            args = []
            pr = None
            for k, v in self.mapping.items():
                if v.primary_key:
                    pr = getattr(self, k, None)
                else:
                    fields.append(v.name + '=?')
                    args.append(getattr(self, k, v.default))
            sql = "update %s set %s where %s = %s" % (
                self.table_name, ', '.join(fields), self.primary_key, pr)
    
            sql = sql.replace('?', '%s')
            print(sql)
            ms.execute(sql, args)
    连接池版orm

    mysql_pool.py

    import pymysql
    from ormpool import db_pool
    from threading import current_thread
    
    
    class MysqlPool:
        def __init__(self):
            self.conn = db_pool.POOL.connection()
            # print(db_pool.POOL)
            # print(current_thread().getName(), '拿到连接', self.conn)
            # print(current_thread().getName(), '池子里目前有', db_pool.POOL._idle_cache, '
    ')
            self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
    
        def close_db(self):
            self.cursor.close()
            self.conn.close()
    
        def select(self, sql, args=None):
            self.cursor.execute(sql, args)
            rs = self.cursor.fetchall()
            return rs
    
        def execute(self, sql, args):
    
            try:
                self.cursor.execute(sql, args)
                affected = self.cursor.rowcount
                # self.conn.commit()
            except BaseException as e:
                print(e)
            finally:
                self.close_db()
            return affected
    池版数据库链接

    tcpServer.py

    import json
    import socket
    import struct
    from concurrent.futures import ThreadPoolExecutor
    from threading import Lock
    from threading import current_thread
    
    from conf import setting
    from interface import common_interface, admin_interface, user_interface
    from server import use_data
    
    server_pool = ThreadPoolExecutor(10)
    mutex = Lock()
    use_data.mutex = mutex
    dispatch_dic = {
        'login': common_interface.login,
        'register': common_interface.register,
        'upload': admin_interface.upload_movie,
        'delete_movie': admin_interface.delete_movie,
        'download_movie': user_interface.download_movie,
        'upload': admin_interface.upload_movie,
        'release_notice': admin_interface.release_notice,
        'buy_member': user_interface.buy_member,
        'get_movie_list': user_interface.get_movie_list,
        'check_notice': user_interface.check_notice,
        'check_download_record': user_interface.check_download_record,
        'check_movie': admin_interface.check_movie
    }
    
    
    def working(conn, addr):
        print(current_thread().getName())
        while True:
            try:
                head_struct = conn.recv(4)
                if not head_struct: break
                head_len = struct.unpack('i', head_struct)[0]
                head_json = conn.recv(head_len).decode('utf-8')
                head_dic = json.loads(head_json)
                # 分发之前,先判断是不是伪造
                head_dic['addr'] = str(addr)
    
                dispatch(head_dic, conn)
    
            except Exception as e:
                print('错误信息:', e)
                conn.close()
                # 把服务器保存的用户信息清掉
                mutex.acquire()
                if str(addr) in use_data.alive_user:
                    use_data.alive_user.pop(str(addr))
                # print('***********end*************%s'%len(login_user_data.alive_user))
                mutex.release()
    
                print('客户端:%s :断开链接' % str(addr))
                break
    
    
    def dispatch(head_dic, conn):
        if head_dic['type'] not in dispatch_dic:
            back_dic = {'flag': False, 'msg': '请求不存在'}
            send_back(back_dic, conn)
        else:
            dispatch_dic[head_dic['type']](head_dic, conn)
    
    
    def send_back(back_dic, conn):
        head_json_bytes = json.dumps(back_dic).encode('utf-8')
        conn.send(struct.pack('i', len(head_json_bytes)))  # 先发报头的长度
        conn.send(head_json_bytes)
    
    
    def server_run():
        socket_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        socket_server.bind(setting.server_address)
        socket_server.listen(5)
    
        while True:
            conn, addr = socket_server.accept()
            print('客户端:%s 链接成功' % str(addr))
            server_pool.submit(working, conn, addr)
    
        socket_server.close()
    服务端核心功能

    use_data.py

    alive_user = {}
    mutex = None
    登录用户和全局锁

    start.py

    import os, sys
    
    path = os.path.dirname(__file__)
    sys.path.append(path)
    from server import tcpServer
    
    if __name__ == '__main__':
        tcpServer.server_run()
    启动文件
  • 相关阅读:
    Flume将A服务器上的日志采集到B服务器上展示
    flume1.8的安装及环境配置
    springboot实现控制层返回二维码,扫描后打开PDF文件
    Eureka添加security验证后客户端无法注册报错
    Elasticsearch系统学习(八)-partial update
    Elasticsearch系统学习(七)-ES并发控制
    Elasticsearch系统学习(六)-document元数据及基本操作
    Elasticsearch系统学习(五)-分布式架构及shard容错原理
    Elasticsearch系统学习(四)-简单查询及聚合函数
    Elasticsearch系统学习(三)-document基本操作
  • 原文地址:https://www.cnblogs.com/bubu99/p/14774475.html
Copyright © 2011-2022 走看看