客户端
项目目录结构
client目录
tcpClient.py---》客户端连接
conf目录
setting.py:存放配置信息
core目录
admin.py:管理员视图相关功能函数
src.py :主视图
user.py :用户视图相关功能函数
download_movie目录
存放下载完的电影
upload_movie目录
存放要上传的电影
lib目录
common.py:存放公共方法
start.py:启动文件
各文件功能代码
tcpClient.py
import socket

from conf import setting


def client_conn():
    """Open a TCP connection to the server and return the connected socket.

    The target address is read from ``setting.server_address``.

    Returns:
        socket.socket: a connected stream socket. The caller owns it and is
        responsible for closing it.

    Raises:
        OSError: if the connection cannot be established. The socket is
        closed before the error propagates so the descriptor is not leaked.
    """
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        client.connect(setting.server_address)
    except OSError:
        client.close()
        raise
    return client
setting.py
import os

# Directory that holds this settings module.
_conf_dir = os.path.dirname(__file__)

# Project root: one level above the settings directory.
BASE_DIR = os.path.dirname(_conf_dir)

# Local folders for movies waiting to be uploaded / already downloaded.
BASE_MOVIE_UP = os.path.join(BASE_DIR, 'upload_movie')
BASE_MOVIE_DOWN = os.path.join(BASE_DIR, 'download_movie')

# (host, port) of the server the client connects to.
server_address = ('127.0.0.1', 8087)
admin.py
from client import tcpClient
from lib import common
from conf import setting
import os

# Client-side login state of the admin view. 'session' holds the token the
# server returned on a successful login (None while logged out).
admin_data = {
    'session': None,
}

# Legacy request template. It is kept only so that code importing this name
# keeps working; requests are built fresh per call by _build_request() so
# that keys from an earlier request never leak into a later one.
send_dic = {'type': None, 'user_type': 'admin', 'session': None}


def _build_request(req_type, **fields):
    """Return a fresh request header for the current admin session."""
    request = {
        'type': req_type,
        'user_type': 'admin',
        'session': admin_data['session'],
    }
    request.update(fields)
    return request


def admin_register(client):
    """Interactively register an admin account; entering 'q' as name quits."""
    print('管理员注册')
    while True:
        name = input('请输入手机号>>:').strip()
        if name == 'q':
            break
        password = input('请输入密码>>').strip()
        conf_password = input('请确认密码>>:').strip()
        if password != conf_password:
            print('两次密码不一致')
            continue
        request = {'type': 'register', 'user_type': 'admin', 'name': name,
                   'password': common.make_md5(password)}
        back_dic = common.send_data(client, request, None)
        print(back_dic['msg'])
        if back_dic['flag']:
            break


def admin_login(client):
    """Interactively log in as admin and remember the returned session."""
    print('管理员登录')
    while True:
        name = input('用户名>>').strip()
        if name == 'q':
            break
        password = input('密码>>').strip()
        request = {'type': 'login', 'user_type': 'admin', 'name': name,
                   'password': common.make_md5(password)}
        back_dic = common.send_data(client, request, None)
        if back_dic['flag']:
            admin_data['session'] = back_dic['session']
        print(back_dic['msg'])
        if back_dic['flag']:
            break


def upload_movie(client):
    """Let the admin pick a file from the upload folder and send it.

    The file's sampled MD5 is first sent in a 'check_movie' request; the
    upload itself only happens when the server answers with flag=True.
    """
    if not admin_data['session']:
        print('请先登录')
        return
    print('上传视频')
    while True:
        up_list = common.get_allfile_by_path(setting.BASE_MOVIE_UP)
        if not up_list:
            print('暂无能上传的影片')
            break
        for i, m in enumerate(up_list):
            print('%s : %s' % (i, m))
        choose = input('请选择要上传的影片').strip()
        if choose == 'q':
            break
        if not choose.isdigit():
            print('请输入数字')
            continue
        choose = int(choose)
        if choose >= len(up_list):
            print('请输入范围内的数字')
            continue

        movie_path = os.path.join(setting.BASE_MOVIE_UP, up_list[choose])
        # Computed once and reused for both the check and the upload request.
        file_md5 = common.get_bigfile_md5(movie_path)
        back_dic = common.send_data(
            client, _build_request('check_movie', file_md5=file_md5), None)
        if not back_dic['flag']:
            print(back_dic['msg'])
            continue

        answer = input('是否免费(y/n)>>:').strip()
        is_free = 1 if answer == 'y' else 0
        request = _build_request(
            'upload',
            file_name=up_list[choose],
            file_size=os.path.getsize(movie_path),
            file_md5=file_md5,
            is_free=is_free,
        )
        back_dic = common.send_data(client, request, movie_path)
        print(back_dic['msg'])
        if back_dic['flag']:
            break


def delete_movie(client):
    """Show the movie list and ask the server to delete the chosen entry.

    1. fetch and print the movie list
    2. read the index of the movie to delete
    3. print whether the deletion succeeded
    """
    if not admin_data['session']:
        print('请先登录')
        return
    print('删除视频')
    while True:
        back_dic = common.send_data(
            client, _build_request('get_movie_list', movie_type='all'), None)
        if not back_dic['flag']:
            print(back_dic['msg'])
            break
        movie_list = back_dic['movie_list']
        for i, mo in enumerate(movie_list):
            print('%s : %s--%s' % (i, mo[0], mo[1]))
        choose = input('请输入要删除的电影(数字):').strip()
        if choose == 'q':
            break
        if not choose.isdigit():
            print('请输入数字')
            continue
        choose = int(choose)
        if choose >= len(movie_list):
            print('请输入范围内的数字')
            continue
        # Each list entry is [name, free/charge label, movie id].
        request = _build_request('delete_movie', movie_id=movie_list[choose][2])
        back_dic = common.send_data(client, request, None)
        print(back_dic['msg'])
        if back_dic['flag']:
            break


def release_notice(client):
    """Read a notice title and content and publish it; title 'q' quits."""
    if not admin_data['session']:
        print('请先登录')
        return
    print('发布公告')
    while True:
        notice_name = input('请输入公告标题:').strip()
        # Honour 'q' before asking for content the user does not want to type.
        if notice_name == 'q':
            break
        notice_content = input('请输入公告内容:').strip()
        request = _build_request('release_notice', notice_name=notice_name,
                                 notice_content=notice_content)
        back_dic = common.send_data(client, request, None)
        print(back_dic['msg'])
        if back_dic['flag']:
            break


# Menu choice -> handler; every handler takes the connected socket.
fun_dic = {
    '1': admin_register,
    '2': admin_login,
    '3': upload_movie,
    '4': delete_movie,
    '5': release_notice,
}


def admin_view():
    """Run the admin menu loop on a dedicated connection until 'q'."""
    client = tcpClient.client_conn()
    try:
        while True:
            print(''' 1 注册 2 登录 3 上传视频 4 删除视频 5 发布公告 ''')
            choose = input('please choose>>:').strip()
            if 'q' == choose:
                break
            if choose not in fun_dic:
                continue
            fun_dic[choose](client)
    finally:
        # Close the socket even when a handler raises, and forget the
        # session: the server drops it when this connection goes away.
        client.close()
        admin_data['session'] = None
src.py
from core import admin, user

# Menu choice -> entry point of the matching view.
fun_dic = {
    '1': admin.admin_view,
    '2': user.user_view
}


def run():
    """Show the top-level menu until the user enters 'q'.

    A choice found in ``fun_dic`` starts that view; any other input simply
    shows the menu again.
    """
    while True:
        print(''' 1 管理员视图 2 用户视图 ''')
        selection = input('please choose>>:').strip()
        if selection == 'q':
            return
        view = fun_dic.get(selection)
        if view is not None:
            view()
user.py
from client import tcpClient
from lib import common
from conf import setting
import os
import time

# Client-side login state of the user view, filled in by user_login().
user_data = {
    'session': None,
    'is_vip': None
}

# Legacy request template. It is kept only so that code importing this name
# keeps working; requests are built fresh per call by _build_request() so
# that keys from an earlier request never leak into a later one.
send_dic = {'type': None, 'user_type': 'user', 'session': None}


def _build_request(req_type, **fields):
    """Return a fresh request header for the current user session."""
    request = {
        'type': req_type,
        'user_type': 'user',
        'session': user_data['session'],
    }
    request.update(fields)
    return request


def _receive_file(client, filename, filesize):
    """Read exactly ``filesize`` bytes from ``client`` into the download dir.

    Returns True when the whole file arrived, False when the connection was
    closed early.
    """
    print('----->', filename)
    # Only the last path component of the server-supplied name is used, so
    # the file always lands inside the download folder.
    path = os.path.join(setting.BASE_MOVIE_DOWN, os.path.basename(filename))
    recv_size = 0
    with open(path, 'wb') as f:
        while recv_size < filesize:
            # Never ask for more than what is left of the file.
            recv_data = client.recv(min(1024, filesize - recv_size))
            if not recv_data:
                # recv() returning b'' means the peer closed the connection.
                print('连接已断开,下载未完成')
                return False
            f.write(recv_data)
            recv_size += len(recv_data)
            print('recvsize:%s filesize:%s' % (recv_size, filesize))
    print('%s :下载成功' % filename)
    return True


def _download_selected_movie(client, movie_type):
    """List movies of ``movie_type``, let the user pick one, download it."""
    back_dic = common.send_data(
        client, _build_request('get_movie_list', movie_type=movie_type), None)
    if not back_dic['flag']:
        print(back_dic['msg'])
        return
    movie_list = back_dic['movie_list']
    for i, mo in enumerate(movie_list):
        print('%s : %s-->%s' % (i, mo[0], mo[1]))
    choose = input('请输入要下载的电影(数字):').strip()
    if not choose.isdigit():
        print('请输入数字')
        return
    choose = int(choose)
    if choose >= len(movie_list):
        print('请输入范围内的数字')
        return
    # Each list entry is [name, free/charge label, movie id].
    request = _build_request('download_movie',
                             movie_id=movie_list[choose][2],
                             movie_type=movie_type)
    back_dic = common.send_data(client, request, None)
    if not back_dic['flag']:
        print(back_dic['msg'])
        return
    # The server only sends 'wait_time' for free downloads.
    wait_time = back_dic.get('wait_time', 0)
    if wait_time > 0:
        print('请等待 %s 秒' % wait_time)
        time.sleep(wait_time)
    _receive_file(client, back_dic['filename'], back_dic['filesize'])


def user_register(client):
    """Interactively register a user account; entering 'q' as name quits."""
    print('用户注册')
    while True:
        name = input('请输入手机号>>:').strip()
        if name == 'q':
            break
        password = input('请输入密码>>').strip()
        conf_password = input('请确认密码>>:').strip()
        if password != conf_password:
            print('两次密码不一致')
            continue
        request = {'type': 'register', 'user_type': 'user', 'name': name,
                   'password': common.make_md5(password)}
        back_dic = common.send_data(client, request, None)
        print(back_dic['msg'])
        if back_dic['flag']:
            break


def user_login(client):
    """Interactively log in and store the session and VIP flag."""
    print('用户登录')
    while True:
        name = input('用户名>>').strip()
        if name == 'q':
            break
        password = input('密码>>').strip()
        request = {'type': 'login', 'user_type': 'user', 'name': name,
                   'password': common.make_md5(password)}
        back_dic = common.send_data(client, request, None)
        if back_dic['flag']:
            user_data['session'] = back_dic['session']
            user_data['is_vip'] = back_dic['is_vip']
            print(back_dic['msg'])
            print(back_dic['last_notice'])
            break
        print(back_dic['msg'])


def buy_member(client):
    """Ask for confirmation and request a membership upgrade."""
    print('购买会员')
    if not user_data['session']:
        print('请先登录')
        return
    if user_data['is_vip'] == 1:
        print('您已经是会员了')
        return
    while True:
        buy = input('是否购买会员(y/n)q 退出').strip()
        if 'y' == buy:
            back_dic = common.send_data(client, _build_request('buy_member'), None)
            print(back_dic['msg'])
            if back_dic['flag']:
                user_data['is_vip'] = 1
                break
        elif 'q' == buy:
            break
        else:
            print('您没有购买')


def get_movie_list(client):
    """Print every movie the server reports."""
    if not user_data['session']:
        print('请先登录')
        return
    print('查看视频列表')
    back_dic = common.send_data(
        client, _build_request('get_movie_list', movie_type='all'), None)
    if back_dic['flag']:
        for i, mo in enumerate(back_dic['movie_list']):
            print('%s : %s-->%s' % (i, mo[0], mo[1]))
    else:
        print(back_dic['msg'])


def down_free_movie(client):
    """List the free movies and download the one the user selects."""
    if not user_data['session']:
        print('请先登录')
        return
    print('下载免费视频')
    _download_selected_movie(client, 'free')


def down_charge_movie(client):
    """Confirm the price, then list and download a paid movie."""
    if not user_data['session']:
        print('请先登录')
        return
    print('下载收费视频')
    if user_data['is_vip']:
        charge = input('您是会员,收费5元(y 确认)').strip()
    else:
        charge = input('您不是会员,收费10元(y 确认)').strip()
    if not charge == 'y':
        # Anything but 'y' counts as "did not pay".
        return
    _download_selected_movie(client, 'charge')


def check_download_record(client):
    """Print the names of the movies this user has downloaded."""
    if not user_data['session']:
        print('请先登录')
        return
    print('查看观影记录')
    back_dic = common.send_data(
        client, _build_request('check_download_record'), None)
    if back_dic['flag']:
        for re in back_dic['download_list']:
            print(re)
    else:
        print(back_dic['msg'])


def check_notice(client):
    """Print all notices returned by the server."""
    if not user_data['session']:
        print('请先登录')
        return
    print('查看公告')
    back_dic = common.send_data(client, _build_request('check_notice'), None)
    if back_dic['flag']:
        for value in back_dic['notice_list']:
            print(value)
    else:
        print(back_dic['msg'])


# Menu choice -> handler; every handler takes the connected socket.
fun_dic = {
    '1': user_register,
    '2': user_login,
    '3': buy_member,
    '4': get_movie_list,
    '5': down_free_movie,
    '6': down_charge_movie,
    '7': check_download_record,
    '8': check_notice
}


def user_view():
    """Run the user menu loop on a dedicated connection until 'q'."""
    client = tcpClient.client_conn()
    try:
        while True:
            print(''' 1 注册 2 登录 3 冲会员 4 查看视频 5 下载免费视频 6 下载收费视频 7 查看观影记录 8 查看公告 ''')
            choose = input('please choose>>:').strip()
            if 'q' == choose:
                break
            if choose not in fun_dic:
                continue
            fun_dic[choose](client)
    finally:
        # Close the socket even when a handler raises, and forget the login
        # state: the server drops the session when this connection closes.
        client.close()
        user_data['session'] = None
        user_data['is_vip'] = None
common.py
import struct
import json
import os
import hashlib


def _recv_exact(sock, size):
    """Read exactly ``size`` bytes from ``sock``.

    A single ``recv`` call may return fewer bytes than requested, so keep
    reading until the message is complete.

    Raises:
        ConnectionError: if the peer closes the connection first.
    """
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:
            raise ConnectionError('connection closed before the full message arrived')
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


def send_data(client, send_dic, file):
    """Send one request and return the server's reply header.

    Wire format in both directions: a 4-byte length packed with
    ``struct.pack('i', ...)``, followed by that many bytes of UTF-8 JSON.
    When ``file`` is given, its raw bytes are streamed right after the
    request header.

    Args:
        client: connected socket.
        send_dic: JSON-serialisable request header.
        file: path of a file to upload, or a falsy value for none.

    Returns:
        dict: the decoded reply header.

    Raises:
        ConnectionError: if the connection closes before the reply is complete.
    """
    head_json_bytes = json.dumps(send_dic).encode('utf-8')
    # sendall() keeps sending until every byte is written; send() may stop early.
    client.sendall(struct.pack('i', len(head_json_bytes)))
    client.sendall(head_json_bytes)
    if file:
        with open(file, 'rb') as f:
            # Fixed-size chunks: a binary file has no meaningful "lines".
            for chunk in iter(lambda: f.read(65536), b''):
                client.sendall(chunk)

    back_head_len = struct.unpack('i', _recv_exact(client, 4))[0]
    head_bytes = _recv_exact(client, back_head_len)
    return json.loads(head_bytes.decode('utf-8'))


def get_allfile_by_path(path):
    """Return the names of all entries in directory ``path``."""
    return os.listdir(path)


def make_md5(password):
    """Return the hex MD5 digest of ``password`` encoded as UTF-8."""
    md = hashlib.md5()
    md.update(password.encode('utf-8'))
    return md.hexdigest()


def get_bigfile_md5(file_path):
    """Return a cheap fingerprint of a (possibly large) file.

    Only four 10-byte samples are hashed: at the start, at one third, at two
    thirds and over the last 10 bytes of the file.

    Returns:
        str | None: hex MD5 of the samples, or None if the file is missing.
    """
    if not os.path.exists(file_path):
        return None
    md = hashlib.md5()
    filesize = os.path.getsize(file_path)
    # Clamp the tail offset so files shorter than 10 bytes do not produce a
    # negative seek position.
    offsets = [0, filesize // 3, (filesize // 3) * 2, max(filesize - 10, 0)]
    with open(file_path, 'rb') as f:
        for offset in offsets:
            f.seek(offset)
            md.update(f.read(10))
    return md.hexdigest()
start.py
import os
import sys

# Put the directory of this start script on the import path before the
# project packages are imported below.
path = os.path.dirname(__file__)
sys.path.append(path)

from core import src

if __name__ == '__main__':
    src.run()
服务端
项目目录结构
conf目录
setting.py:配置信息相关
db目录
models.py:数据库表对应程序中的类
interface目录
admin_interface.py:管理员相关操作的接口
common_interface.py:公共操作的相关接口(登录,注册)
user_interface.py:用户相关操作的接口
lib目录
common.py:公共方法
orm目录:
fuckorm.py:单例版orm框架
mysql_singleton.py:数据库连接类
ormpool目录:
db_pool.py:数据库链接池
fuckorm_pool.py:连接池版orm框架
mysql_pool.py:连接池版数据库连接类
server目录:
tcpServer.py:服务端核心代码
use_data.py:存放用户信息,和全局锁
movie_list目录:
存放客户端上传上来的电影
start.py:启动文件
各文件功能代码
setting.py
import os

# MySQL connection parameters.
host = '127.0.0.1'
port = 3306
user = 'root'
password = '123456'
database = 'youku2'
charset = 'utf8'
autocommit = True

# Directory that holds this settings module.
_conf_dir = os.path.dirname(__file__)

# Project root (one level above the settings directory) and its data folders.
BASE_DIR = os.path.dirname(_conf_dir)
BASE_DB = os.path.join(BASE_DIR, 'db')
BASE_MOVIE = os.path.join(BASE_DIR, 'movie')
BASE_MOVIE_LIST = os.path.join(BASE_DIR, 'movie_list')

# (host, port) the TCP server binds to.
server_address = ('127.0.0.1', 8087)
models.py
# Singleton-connection variant:
# from orm.fuckorm import Model, StringField, IntegerField
# Connection-pool variant:
from ormpool.fuckorm_pool import Model, StringField, IntegerField


class User(Model):
    """Row of the ``userinfo`` table."""
    table_name = 'userinfo'
    id = IntegerField('id', primary_key=True)
    name = StringField('name')
    password = StringField('password')
    locked = IntegerField('locked', default=0)
    is_vip = IntegerField('is_vip', default=0)
    user_type = StringField('user_type')


class Movie(Model):
    """Row of the ``movie`` table."""
    table_name = 'movie'
    id = IntegerField('id', primary_key=True)
    name = StringField('name')
    path = StringField('path')
    is_free = IntegerField('is_free', default=1)
    is_delete = IntegerField('is_delete', default=0)
    create_time = StringField('create_time')
    user_id = IntegerField('user_id')
    file_md5 = StringField('file_md5')


class Notice(Model):
    """Row of the ``notice`` table."""
    table_name = 'notice'
    id = IntegerField('id', primary_key=True)
    name = StringField('name')
    content = StringField('content')
    user_id = IntegerField('user_id')
    create_time = StringField('create_time')


class DownloadRecord(Model):
    """Row of the ``download_record`` table."""
    table_name = 'download_record'
    id = IntegerField('id', primary_key=True)
    user_id = IntegerField('user_id')
    movie_id = IntegerField('movie_id')
admin_interface.py
from conf import setting
import os
from db import models
from lib import common


@common.login_auth
def upload_movie(user_dic, conn):
    """Receive an uploaded movie from ``conn`` and record it in the database.

    Reads exactly ``user_dic['file_size']`` bytes from the connection into a
    new file under ``setting.BASE_MOVIE_LIST``, saves a Movie row and
    replies with a success header.

    :param user_dic: request header (file_name, file_size, is_free,
        file_md5; user_id is filled in by login_auth)
    :param conn: client connection
    :raises ConnectionError: if the client disconnects before the whole
        file has arrived; the partial file is removed.
    """
    print('----->', user_dic['file_name'])
    # The name comes from the client: keep only its last path component so
    # it cannot point outside the movie directory.
    safe_name = os.path.basename(user_dic['file_name'])
    file_name = common.get_uuid(safe_name) + safe_name
    path = os.path.join(setting.BASE_MOVIE_LIST, file_name)
    file_size = user_dic['file_size']
    recv_size = 0
    try:
        with open(path, 'wb') as f:
            while recv_size < file_size:
                # Never read past the end of the file into the next request.
                recv_data = conn.recv(min(1024, file_size - recv_size))
                if not recv_data:
                    raise ConnectionError('client disconnected during upload')
                f.write(recv_data)
                recv_size += len(recv_data)
    except Exception:
        # Do not keep a truncated file around.
        if os.path.exists(path):
            os.remove(path)
        raise
    print('%s :上传成功' % file_name)
    movie = models.Movie(name=file_name, path=path,
                         is_free=user_dic['is_free'],
                         user_id=user_dic['user_id'],
                         file_md5=user_dic['file_md5'],
                         create_time=common.get_time())
    movie.save()
    back_dic = {'flag': True, 'msg': '上传成功'}
    common.send_back(back_dic, conn)


@common.login_auth
def delete_movie(user_dic, conn):
    """Soft-delete a movie by setting its ``is_delete`` column to 1.

    :param user_dic: request header carrying ``movie_id``
    :param conn: client connection
    """
    movie = models.Movie.select_one(id=user_dic['movie_id'])
    if not movie:
        common.send_back({'flag': False, 'msg': '该电影不存在'}, conn)
        return
    movie.is_delete = 1
    movie.update()
    back_dic = {'flag': True, 'msg': '电影删除成功'}
    common.send_back(back_dic, conn)


@common.login_auth
def release_notice(user_dic, conn):
    """Store a notice (title, content, author id) in the database.

    :param user_dic: request header carrying notice_name and notice_content
    :param conn: client connection
    """
    notice = models.Notice(name=user_dic['notice_name'],
                           content=user_dic['notice_content'],
                           user_id=user_dic['user_id'],
                           create_time=common.get_time())
    notice.save()
    back_dic = {'flag': True, 'msg': '公告发布成功'}
    common.send_back(back_dic, conn)


@common.login_auth
def check_movie(user_dic, conn):
    """Tell the client whether a movie with the given MD5 already exists.

    Replies flag=False when a movie with ``user_dic['file_md5']`` is found,
    flag=True otherwise.

    :param user_dic: request header carrying ``file_md5``
    :param conn: client connection
    """
    movie = models.Movie.select_one(file_md5=user_dic['file_md5'])
    if movie:
        back_dic = {'flag': False, 'msg': '该电影已经存在'}
    else:
        back_dic = {'flag': True}
    common.send_back(back_dic, conn)
common_interface.py
from db import models
from lib import common
from interface import user_interface
from server import use_data as da


def login(user_dic, conn):
    """Check the credentials and, on success, register the session.

    A successful login stores ``[session, user_id]`` in ``da.alive_user``
    under the client's address, replacing any earlier login from the same
    address. The update runs under ``da.mutex`` because handlers run in
    several threads.

    :param user_dic: request header (name, password, user_type, addr)
    :param conn: client connection
    """
    user = models.User.select_one(name=user_dic['name'])
    if not user:
        back_dic = {'flag': False, 'msg': 'user do not exisit'}
    elif user.user_type != user_dic['user_type']:
        back_dic = {'flag': False, 'msg': '登录类型不匹配'}
    elif user.password != user_dic['password']:
        back_dic = {'flag': False, 'msg': 'password error'}
    else:
        session = common.get_uuid(user_dic['name'])
        # "with" releases the lock even if the body raises.
        with da.mutex:
            da.alive_user[user_dic['addr']] = [session, user.id]
        back_dic = {'flag': True, 'session': session,
                    'is_vip': user.is_vip, 'msg': 'login success'}
        if user_dic['user_type'] == 'user':
            # Ordinary users get the newest notice along with the reply.
            back_dic['last_notice'] = user_interface.check_notice_by_count(1)
    common.send_back(back_dic, conn)


def register(user_dic, conn):
    """Create a new account unless the name is already taken.

    :param user_dic: request header (name, password, user_type)
    :param conn: client connection
    """
    user = models.User.select_one(name=user_dic['name'])
    if user:
        back_dic = {'flag': False, 'msg': 'user is exisit'}
    else:
        user = models.User(name=user_dic['name'],
                           password=user_dic['password'],
                           user_type=user_dic['user_type'])
        user.save()
        back_dic = {'flag': True, 'msg': 'register success'}
    common.send_back(back_dic, conn)
user_interface.py
# Register (by phone number; the password is MD5-hashed)
# Login (the latest notice is shown after login)
# Buy membership
# List videos (print every video)
# Download free videos (non-members wait through a 30 s ad, members do not)
# Download paid videos (10 yuan for non-members, 5 yuan for members)
# View download history (the videos this user has downloaded)
# View notices (including past ones)
from db import models
import os
from lib import common


@common.login_auth
def buy_member(user_dic, conn):
    """Upgrade the logged-in user to member by setting ``is_vip`` to 1.

    :param user_dic: request header (user_id is filled in by login_auth)
    :param conn: client connection
    """
    user = models.User.select_one(id=user_dic['user_id'])
    user.is_vip = 1
    user.update()
    back_dic = {'flag': True, 'msg': 'buy success'}
    common.send_back(back_dic, conn)


@common.login_auth
def get_movie_list(user_dic, conn):
    """Reply with the non-deleted movies matching ``user_dic['movie_type']``.

    ``movie_type`` 'all' returns every movie, 'free' only free ones and any
    other value only paid ones. Each entry is [name, free/charge label, id].

    :param user_dic: request header carrying ``movie_type``
    :param conn: client connection
    """
    movie_list = models.Movie.select_all()
    if not movie_list:
        common.send_back({'flag': False, 'msg': '暂无影片'}, conn)
        return

    movie_type = user_dic['movie_type']
    back_movie_list = []
    for movie in movie_list:
        if movie.is_delete:
            continue
        if movie_type == 'all':
            wanted = True
        elif movie_type == 'free':
            wanted = bool(movie.is_free)
        else:
            wanted = not movie.is_free
        if wanted:
            label = '免费' if movie.is_free else '收费'
            back_movie_list.append([movie.name, label, movie.id])

    if back_movie_list:
        back_dic = {'flag': True, 'movie_list': back_movie_list}
    else:
        back_dic = {'flag': False, 'msg': '暂无可查看影片'}
    common.send_back(back_dic, conn)


@common.login_auth
def download_movie(user_dic, conn):
    """Send the requested movie file to the client.

    The reply header carries filename and filesize (plus wait_time for
    free downloads: 0 for members, 30 otherwise) and is followed by the raw
    file bytes. A download record is saved before the transfer starts.

    :param user_dic: request header carrying movie_id and movie_type
    :param conn: client connection
    """
    movie = models.Movie.select_one(id=user_dic['movie_id'])
    # Soft-deleted movies are hidden from the lists, so refuse them here too.
    if not movie or movie.is_delete:
        back_dic = {'flag': False, 'msg': '该电影不存在'}
        common.send_back(back_dic, conn)
        return
    user = models.User.select_one(id=user_dic['user_id'])
    send_back_dic = {'flag': True}
    if user_dic['movie_type'] == 'free':
        # Free downloads make non-members wait; paid downloads start at once.
        send_back_dic['wait_time'] = 0 if user.is_vip else 30
    send_back_dic['filename'] = movie.name
    send_back_dic['filesize'] = os.path.getsize(movie.path)

    down_record = models.DownloadRecord(user_id=user_dic['user_id'],
                                        movie_id=movie.id)
    down_record.save()
    common.send_back(send_back_dic, conn)
    with open(movie.path, 'rb') as f:
        # Fixed-size chunks with sendall(): send() may write only part of
        # the data, and a binary file has no meaningful "lines".
        for chunk in iter(lambda: f.read(65536), b''):
            conn.sendall(chunk)


@common.login_auth
def check_notice(user_dic, conn):
    """Reply with every stored notice.

    :param user_dic: request header
    :param conn: client connection
    """
    notice_list = check_notice_by_count(count=None)
    if notice_list:
        back_dic = {'flag': True, 'notice_list': notice_list}
    else:
        back_dic = {'flag': False, 'msg': '暂无公告'}
    common.send_back(back_dic, conn)


def check_notice_by_count(count=None):
    """Return notices as a list of ``{title: content}`` dicts.

    :param count: None (or 0) returns all notices in query order; a positive
        number returns that many of the newest notices by ``create_time``,
        oldest first, so ``count=1`` yields just the latest notice.
    :return: list of dicts, or False when there are no notices
    """
    notice_list = models.Notice.select_all()
    if not notice_list:
        return False
    if not count:
        return [{notice.name: notice.content} for notice in notice_list]
    # create_time may be missing on some rows; compare as text so the sort
    # never mixes None with other values.
    ordered = sorted(notice_list,
                     key=lambda notice: str(notice.create_time or ''))
    return [{notice.name: notice.content} for notice in ordered[-count:]]


@common.login_auth
def check_download_record(user_dic, conn):
    """Reply with the names of the movies this user has downloaded.

    Looks up the user's DownloadRecord rows, then resolves each record's
    movie to its name.

    :param user_dic: request header (user_id is filled in by login_auth)
    :param conn: client connection
    """
    download_record = models.DownloadRecord.select_all(user_id=user_dic['user_id'])
    if not download_record:
        back_dic = {'flag': False, 'msg': '暂无观影记录'}
        common.send_back(back_dic, conn)
        return
    download_list = []
    for record in download_record:
        movie = models.Movie.select_one(id=record.movie_id)
        # Skip records whose movie row can no longer be found.
        if movie:
            download_list.append(movie.name)
    back_dic = {'flag': True, 'msg': 'check success',
                'download_list': download_list}
    common.send_back(back_dic, conn)
common.py
import functools
import hashlib
import os
import time
import json
import struct


def login_auth(func):
    """Decorator for interface functions that require a logged-in session.

    The wrapped function is called as ``func(user_dic, conn)``. The session
    token in ``user_dic['session']`` is looked up among the live sessions;
    on a match ``user_dic['user_id']`` is set and ``func`` runs, otherwise a
    "not logged in" reply is sent on ``conn`` and ``func`` is skipped.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        from server import use_data as mu
        request = args[0]
        # The header comes from the client: a user_id it supplied itself
        # must never count as proof of login.
        request.pop('user_id', None)
        session = request.get('session')
        if session:
            lock = mu.mutex
            # Take a snapshot so other threads logging in or out cannot
            # change the dict while it is being scanned.
            if lock is not None:
                with lock:
                    sessions = list(mu.alive_user.values())
            else:
                sessions = list(mu.alive_user.values())
            for value in sessions:
                if value[0] == session:
                    request['user_id'] = value[1]
                    break
        if not request.get('user_id', None):
            send_back({'flag': False, 'msg': '您没有登录'}, args[1])
        else:
            return func(*args, **kwargs)

    return wrapper


def get_uuid(name):
    """Return a 32-character hex token derived from ``name``.

    The token mixes in a high-resolution timer reading and random bytes so
    two calls with the same name give different results.
    """
    md = hashlib.md5()
    md.update(name.encode('utf-8'))
    # time.clock() was removed in Python 3.8; perf_counter() replaces it.
    md.update(str(time.perf_counter()).encode('utf-8'))
    md.update(os.urandom(16))
    return md.hexdigest()


def get_time():
    """Return the current local time as 'YYYY-MM-DD HH:MM:SS'."""
    now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    return now_time


def get_colck_time():
    """Return a high-resolution timer reading as a string."""
    # time.clock() was removed in Python 3.8; perf_counter() replaces it.
    return str(time.perf_counter())


def get_bigfile_md5(file_path):
    """Return a cheap fingerprint of a (possibly large) file.

    Only four 10-byte samples are hashed: at the start, at one third, at two
    thirds and over the last 10 bytes of the file.

    Returns:
        str | None: hex MD5 of the samples, or None if the file is missing.
    """
    if not os.path.exists(file_path):
        return None
    md = hashlib.md5()
    filesize = os.path.getsize(file_path)
    # Clamp the tail offset so files shorter than 10 bytes do not produce a
    # negative seek position.
    offsets = [0, filesize // 3, (filesize // 3) * 2, max(filesize - 10, 0)]
    with open(file_path, 'rb') as f:
        for offset in offsets:
            f.seek(offset)
            md.update(f.read(10))
    return md.hexdigest()


def send_back(back_dic, conn):
    """Send ``back_dic`` as a reply: 4-byte length, then UTF-8 JSON."""
    head_json_bytes = json.dumps(back_dic).encode('utf-8')
    # sendall() keeps sending until every byte is written; send() may stop early.
    conn.sendall(struct.pack('i', len(head_json_bytes)))
    conn.sendall(head_json_bytes)
fuckorm.py
from orm import mysql_singleton


class Field(object):
    """Describes one table column bound to a model attribute."""

    def __init__(self, name, column_type, primary_key, default):
        self.name = name  # column name
        self.column_type = column_type  # SQL data type
        self.primary_key = primary_key  # whether this column is the primary key
        self.default = default  # value used when the model has none


class StringField(Field):
    """Text column; defaults to ``varchar(100)``."""

    def __init__(self, name=None, column_type='varchar(100)', primary_key=False, default=None):
        super().__init__(name, column_type, primary_key, default)


class IntegerField(Field):
    """Integer column."""

    def __init__(self, name=None, primary_key=False, default=0):
        super().__init__(name, 'int', primary_key, default)


class ModelMetaclass(type):
    """Collects the Field attributes of a model class.

    For every class except ``Model`` itself the Field attributes are moved
    into ``mapping`` and the class gains ``primary_key`` and ``table_name``.

    Raises:
        TypeError: when ``table_name`` is missing, when no primary key is
            declared, or when more than one is declared.
    """

    def __new__(cls, name, bases, attrs):
        if name == "Model":
            return type.__new__(cls, name, bases, attrs)
        table_name = attrs.get('table_name', None)
        if not table_name:
            raise TypeError('没有表名')
        primary_key = None
        mappings = dict()
        for k, v in attrs.items():
            if not isinstance(v, Field):
                continue
            mappings[k] = v
            if v.primary_key:
                if primary_key:
                    raise TypeError('主键重复: %s' % k)
                primary_key = k
        # Remove the Field objects from the class so attribute access on an
        # instance falls through to Model.__getattr__ (the dict content).
        for k in mappings.keys():
            attrs.pop(k)
        if not primary_key:
            raise TypeError('没有主键')
        attrs['mapping'] = mappings
        attrs['primary_key'] = primary_key
        attrs['table_name'] = table_name
        return type.__new__(cls, name, bases, attrs)


class Model(dict, metaclass=ModelMetaclass):
    """Base class of all table models; a dict with attribute access."""

    def __init__(self, **kw):
        super(Model, self).__init__(**kw)

    def __getattr__(self, key):
        # Called only when normal attribute lookup fails.
        try:
            return self[key]
        except KeyError:
            raise AttributeError('没有属性:%s' % key)

    def __setattr__(self, key, value):
        self[key] = value

    @classmethod
    def select_all(cls, **kwargs):
        """Return all rows, or the rows matching the first keyword given."""
        # singleton() is a classmethod: calling it on the class avoids
        # opening a throwaway connection through Mysql() on every query.
        ms = mysql_singleton.Mysql.singleton()
        if kwargs:
            # Only a single column=value condition is supported.
            key = list(kwargs.keys())[0]
            value = kwargs[key]
            sql = "select * from %s where %s=%%s" % (cls.table_name, key)
            re = ms.select(sql, value)
        else:
            sql = "select * from %s" % cls.table_name
            re = ms.select(sql)
        return [cls(**r) for r in re]

    @classmethod
    def select_one(cls, **kwargs):
        """Return the first row matching the single keyword, or None."""
        key = list(kwargs.keys())[0]
        value = kwargs[key]
        ms = mysql_singleton.Mysql.singleton()
        sql = "select * from %s where %s=%%s" % (cls.table_name, key)
        re = ms.select(sql, value)
        if re:
            return cls(**re[0])
        else:
            return None

    def save(self):
        """Insert this object as a new row."""
        ms = mysql_singleton.Mysql.singleton()
        fields = []
        params = []
        args = []
        for k, v in self.mapping.items():
            # Fall back to the attribute name when no column name was given.
            fields.append(v.name or k)
            params.append('%s')
            args.append(getattr(self, k, v.default))
        sql = "insert into %s (%s) values (%s)" % (self.table_name, ','.join(fields), ','.join(params))
        ms.execute(sql, args)

    def update(self):
        """Write every non-key column back to the row with this primary key."""
        ms = mysql_singleton.Mysql.singleton()
        fields = []
        args = []
        pr = None
        pk_column = self.primary_key
        for k, v in self.mapping.items():
            if v.primary_key:
                pr = getattr(self, k, v.default)
                pk_column = v.name or k
            else:
                fields.append((v.name or k) + '=%s')
                args.append(getattr(self, k, v.default))
        # The key value is passed as a query parameter instead of being
        # pasted into the SQL text.
        sql = "update %s set %s where %s = %%s" % (
            self.table_name, ', '.join(fields), pk_column)
        args.append(pr)
        print(sql)
        ms.execute(sql, args)
mysql_singleton.py
from conf import setting
import pymysql


class Mysql:
    """Thin wrapper around one pymysql connection, shared via singleton()."""

    __instance = None

    def __init__(self):
        self.conn = pymysql.connect(host=setting.host,
                                    port=setting.port,
                                    user=setting.user,
                                    password=setting.password,
                                    database=setting.database,
                                    charset=setting.charset,
                                    autocommit=setting.autocommit)
        self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)

    def close_db(self):
        """Close the cursor and the connection."""
        self.cursor.close()
        self.conn.close()
        # Forget a closed shared instance so singleton() opens a new one.
        if type(self).__instance is self:
            type(self).__instance = None

    def select(self, sql, args=None):
        """Run a query and return all rows as a list of dicts."""
        self.cursor.execute(sql, args)
        rs = self.cursor.fetchall()
        return rs

    def execute(self, sql, args):
        """Run a data-changing statement and return the affected row count.

        Errors are printed rather than raised; 0 is returned in that case.
        """
        affected = 0
        try:
            self.cursor.execute(sql, args)
            affected = self.cursor.rowcount
        except Exception as e:
            print(e)
        return affected

    @classmethod
    def singleton(cls):
        """Return the shared instance, creating it on first use."""
        if not cls.__instance:
            cls.__instance = cls()
        return cls.__instance
db_pool.py
import pymysql
from conf import setting

try:
    # DBUtils >= 2.0 renamed the package and module.
    from dbutils.pooled_db import PooledDB
except ImportError:
    # DBUtils 1.x layout.
    from DBUtils.PooledDB import PooledDB

# Process-wide MySQL connection pool.
POOL = PooledDB(
    creator=pymysql,  # DB-API module used to open connections
    maxconnections=6,  # maximum connections allowed; 0 or None means unlimited
    # NOTE(review): mincached is larger than maxcached below; DBUtils appears
    # to raise maxcached to mincached in that case - confirm against the docs.
    mincached=6,  # idle connections opened when the pool is created; 0 means none
    maxcached=5,  # maximum idle connections kept in the pool; 0 or None means unlimited
    # Per the DBUtils documentation, 0 or None means all connections are
    # dedicated, and sharing needs a driver with threadsafety > 1.
    # NOTE(review): pymysql reports threadsafety 1, so this setting
    # presumably has no effect - confirm.
    maxshared=3,
    blocking=True,  # True: wait for a free connection; False: raise an error
    maxusage=None,  # how often one connection may be reused; None means unlimited
    setsession=[],  # SQL commands run when a session starts
    ping=0,  # when to ping the server to check the connection (0 = never)
    host=setting.host,
    port=setting.port,
    user=setting.user,
    password=setting.password,
    database=setting.database,
    charset=setting.charset,
    autocommit=setting.autocommit
)
fuckorm_pool.py
from ormpool import mysql_pool


class Field(object):
    """Describes one table column bound to a model attribute."""

    def __init__(self, name, column_type, primary_key, default):
        self.name = name  # column name
        self.column_type = column_type  # SQL data type
        self.primary_key = primary_key  # whether this column is the primary key
        self.default = default  # value used when the model has none


class StringField(Field):
    """Text column; defaults to ``varchar(100)``."""

    def __init__(self, name=None, ddl='varchar(100)', primary_key=False, default=None):
        super().__init__(name, ddl, primary_key, default)


class IntegerField(Field):
    """Integer column."""

    def __init__(self, name=None, primary_key=False, default=0):
        super().__init__(name, 'int', primary_key, default)


class ModelMetaclass(type):
    """Collects the Field attributes of a model class.

    For every class except ``Model`` itself the Field attributes are moved
    into ``mapping`` and the class gains ``primary_key`` and ``table_name``.

    Raises:
        TypeError: when ``table_name`` is missing, when no primary key is
            declared, or when more than one is declared.
    """

    def __new__(cls, name, bases, attrs):
        if name == "Model":
            return type.__new__(cls, name, bases, attrs)
        table_name = attrs.get('table_name', None)
        if not table_name:
            raise TypeError('没有表名')
        primary_key = None
        mappings = dict()
        for k, v in attrs.items():
            if not isinstance(v, Field):
                continue
            mappings[k] = v
            if v.primary_key:
                if primary_key:
                    raise TypeError('主键重复: %s' % k)
                primary_key = k
        # Remove the Field objects from the class so attribute access on an
        # instance falls through to Model.__getattr__ (the dict content).
        for k in mappings.keys():
            attrs.pop(k)
        if not primary_key:
            raise TypeError('没有主键')
        attrs['mapping'] = mappings
        attrs['primary_key'] = primary_key
        attrs['table_name'] = table_name
        return type.__new__(cls, name, bases, attrs)


class Model(dict, metaclass=ModelMetaclass):
    """Base class of all table models; a dict with attribute access."""

    def __init__(self, **kw):
        super(Model, self).__init__(**kw)

    def __getattr__(self, key):
        # Called only when normal attribute lookup fails.
        try:
            return self[key]
        except KeyError:
            raise AttributeError('没有属性:%s' % key)

    def __setattr__(self, key, value):
        self[key] = value

    @classmethod
    def select_all(cls, **kwargs):
        """Return all rows, or the rows matching the first keyword given."""
        ms = mysql_pool.MysqlPool()
        if kwargs:
            # Only a single column=value condition is supported.
            key = list(kwargs.keys())[0]
            value = kwargs[key]
            sql = "select * from %s where %s=%%s" % (cls.table_name, key)
            re = ms.select(sql, value)
        else:
            sql = "select * from %s" % cls.table_name
            re = ms.select(sql)
        return [cls(**r) for r in re]

    @classmethod
    def select_one(cls, **kwargs):
        """Return the first row matching the single keyword, or None."""
        key = list(kwargs.keys())[0]
        value = kwargs[key]
        ms = mysql_pool.MysqlPool()
        sql = "select * from %s where %s=%%s" % (cls.table_name, key)
        re = ms.select(sql, value)
        if re:
            return cls(**re[0])
        else:
            return None

    def save(self):
        """Insert this object as a new row."""
        ms = mysql_pool.MysqlPool()
        fields = []
        params = []
        args = []
        for k, v in self.mapping.items():
            # Fall back to the attribute name when no column name was given.
            fields.append(v.name or k)
            params.append('%s')
            args.append(getattr(self, k, v.default))
        sql = "insert into %s (%s) values (%s)" % (self.table_name, ','.join(fields), ','.join(params))
        ms.execute(sql, args)

    def update(self):
        """Write every non-key column back to the row with this primary key."""
        ms = mysql_pool.MysqlPool()
        fields = []
        args = []
        pr = None
        pk_column = self.primary_key
        for k, v in self.mapping.items():
            if v.primary_key:
                pr = getattr(self, k, None)
                pk_column = v.name or k
            else:
                fields.append((v.name or k) + '=%s')
                args.append(getattr(self, k, v.default))
        # The key value is passed as a query parameter instead of being
        # pasted into the SQL text.
        sql = "update %s set %s where %s = %%s" % (
            self.table_name, ', '.join(fields), pk_column)
        args.append(pr)
        print(sql)
        ms.execute(sql, args)
mysql_pool.py
import pymysql
from ormpool import db_pool
from threading import current_thread


class MysqlPool:
    """One-shot helper: takes a pooled connection, runs one statement, closes.

    Both select() and execute() close the cursor and connection when they
    finish, so create a new MysqlPool for every statement.
    """

    def __init__(self):
        self.conn = db_pool.POOL.connection()
        self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)

    def close_db(self):
        """Close the cursor and the pooled connection."""
        self.cursor.close()
        self.conn.close()

    def select(self, sql, args=None):
        """Run a query and return all rows as a list of dicts."""
        try:
            self.cursor.execute(sql, args)
            rs = self.cursor.fetchall()
        finally:
            # Close explicitly, as execute() does, instead of relying on
            # garbage collection.
            self.close_db()
        return rs

    def execute(self, sql, args):
        """Run a data-changing statement and return the affected row count.

        Errors are printed rather than raised; 0 is returned in that case.
        """
        affected = 0
        try:
            self.cursor.execute(sql, args)
            affected = self.cursor.rowcount
        except Exception as e:
            print(e)
        finally:
            self.close_db()
        return affected
tcpServer.py
import json
import socket
import struct
from concurrent.futures import ThreadPoolExecutor
from threading import Lock
from threading import current_thread
from conf import setting
from interface import common_interface, admin_interface, user_interface
from server import use_data

# Each accepted connection is served by one worker of this pool.
server_pool = ThreadPoolExecutor(10)
# The lock guarding use_data.alive_user is created here and published
# through use_data so the interface modules can use it.
mutex = Lock()
use_data.mutex = mutex

# Request type -> handler(head_dic, conn).
dispatch_dic = {
    'login': common_interface.login,
    'register': common_interface.register,
    'upload': admin_interface.upload_movie,
    'delete_movie': admin_interface.delete_movie,
    'download_movie': user_interface.download_movie,
    'release_notice': admin_interface.release_notice,
    'buy_member': user_interface.buy_member,
    'get_movie_list': user_interface.get_movie_list,
    'check_notice': user_interface.check_notice,
    'check_download_record': user_interface.check_download_record,
    'check_movie': admin_interface.check_movie
}


def _recv_exact(conn, size):
    """Read exactly ``size`` bytes from ``conn``.

    Returns b'' when the peer closed the connection before sending any byte
    of this message.

    Raises:
        ConnectionError: if the connection closes in the middle of a message.
    """
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = conn.recv(remaining)
        if not chunk:
            if remaining == size:
                return b''
            raise ConnectionError('connection closed in the middle of a message')
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


def working(conn, addr):
    """Serve one client until it disconnects or a request fails.

    Each request is a 4-byte length followed by a JSON header. The client's
    address is written into the header before dispatching. When the loop
    ends for any reason, the connection is closed and the client's entry in
    ``use_data.alive_user`` is removed.
    """
    print(current_thread().name)
    try:
        while True:
            head_struct = _recv_exact(conn, 4)
            if not head_struct:
                # Client closed the connection.
                break
            head_len = struct.unpack('i', head_struct)[0]
            head_json = _recv_exact(conn, head_len).decode('utf-8')
            head_dic = json.loads(head_json)
            # Set by the server, so handlers can trust it.
            head_dic['addr'] = str(addr)
            dispatch(head_dic, conn)
    except Exception as e:
        print('错误信息:', e)
    finally:
        # Runs for clean disconnects as well as errors, so neither the
        # socket nor the session entry is left behind.
        conn.close()
        with mutex:
            use_data.alive_user.pop(str(addr), None)
        print('客户端:%s :断开链接' % str(addr))


def dispatch(head_dic, conn):
    """Call the handler registered for ``head_dic['type']``.

    An unknown or missing type gets a flag=False reply instead.
    """
    handler = dispatch_dic.get(head_dic.get('type'))
    if handler is None:
        back_dic = {'flag': False, 'msg': '请求不存在'}
        send_back(back_dic, conn)
    else:
        handler(head_dic, conn)


def send_back(back_dic, conn):
    """Send ``back_dic`` as a reply: 4-byte length, then UTF-8 JSON."""
    head_json_bytes = json.dumps(back_dic).encode('utf-8')
    # sendall() keeps sending until every byte is written; send() may stop early.
    conn.sendall(struct.pack('i', len(head_json_bytes)))
    conn.sendall(head_json_bytes)


def server_run():
    """Accept clients forever and hand each one to the worker pool."""
    socket_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        socket_server.bind(setting.server_address)
        socket_server.listen(5)
        while True:
            conn, addr = socket_server.accept()
            print('客户端:%s 链接成功' % str(addr))
            server_pool.submit(working, conn, addr)
    finally:
        # Reached when bind/accept raises or the process is interrupted.
        socket_server.close()
use_data.py
# Logged-in sessions kept in memory: str(client address) -> [session, user_id].
alive_user = {}
# Lock guarding alive_user; tcpServer assigns it at import time (None until then).
mutex = None
start.py
import os
import sys

# Put the directory of this start script on the import path before the
# project packages are imported below.
path = os.path.dirname(__file__)
sys.path.append(path)

from server import tcpServer

if __name__ == '__main__':
    tcpServer.server_run()