''' 存放配置文件 ''' import os #获取项目根目录 BASE_PATH=os.path.dirname(os.path.dirname(__file__)) #获取用户目录 USER_DATE_PATH=os.path.join(BASE_PATH,'interface','user_date') """ logging配置 """ # 定义三种日志输出格式 开始 standard_format = '[%(asctime)s][%(threadName)s:%(thread)d][task_id:%(name)s][%(filename)s:%(lineno)d]' '[%(levelname)s][%(message)s]' # 其中name为getlogger指定的名字 simple_format = '[%(levelname)s][%(asctime)s][%(filename)s:%(lineno)d]%(message)s' id_simple_format = '[%(levelname)s][%(asctime)s] %(message)s' # 定义日志输出格式 结束 # ****************注意1: log文件的目录 BASE_PATH = os.path.dirname(os.path.dirname(__file__)) logfile_dir = os.path.join(BASE_PATH, 'log') # print(logfile_dir) # ****************注意2: log文件名 logfile_name = 'atm.log' # 如果不存在定义的日志目录就创建一个 if not os.path.isdir(logfile_dir): os.mkdir(logfile_dir) # log文件的全路径 logfile_path = os.path.join(logfile_dir, logfile_name) LOGGING_DIC = { 'version': 1, 'disable_existing_loggers': False, 'formatters': { 'standard': { 'format': standard_format }, 'simple': { 'format': simple_format }, }, 'filters': {}, 'handlers': { # 打印到终端的日志 'console': { 'level': 'DEBUG', 'class': 'logging.StreamHandler', # 打印到屏幕 'formatter': 'simple' }, # 打印到文件的日志,收集info及以上的日志 'default': { 'level': 'DEBUG', 'class': 'logging.handlers.RotatingFileHandler', # 保存到文件 'formatter': 'standard', 'filename': logfile_path, # 日志文件 'maxBytes': 1024 * 1024 * 5, # 日志大小 5M 'backupCount': 5, 'encoding': 'utf-8', # 日志文件的编码,再也不用担心中文log乱码了 }, }, 'loggers': { # logging.getLogger(__name__)拿到的logger配置 '': { 'handlers': ['default', 'console'], # 这里把上面定义的两个handler都加上,即log数据既写入文件又打印到屏幕 'level': 'DEBUG', 'propagate': True, # 向上(更高level的logger)传递 }, }, }
''' 管理员视图层 ''' from interface import admin_interface # - 冻结账户 def lock_user(): while True: lock_user=input('请输入需要冻结的用户:').strip() flag,msg=admin_interface.lock_user_interface(lock_user) if flag: print(msg) break else: print(msg) # - 添加账户 def add_user(): from core import src src.register() # - 修改额度 def alter_balance(): while True: to_user=input('请输入需要修改的用户').strip() money=input('请输入修改后的额度:').strip() if not money.isdigit(): print('请输入整数') continue money=int(money) flag,msg=admin_interface.alter_balance(to_user,money) if flag: print(msg) break else: print(msg) admin_dic={ '1':lock_user, '2':add_user, '3':alter_balance, } def run(): while True: print(''' 1 冻结账户 2 添加账户 3 修改额度 ''') choice=input('请输入功能编号:').strip() if choice not in admin_dic: print('没有该编号') continue admin_dic.get(choice)() break
''' 用户视图层 ''' from interface import user_interface from interface import bank_interface from interface import shop_interface from interface import admin_interface from lib import common #记录管理状态 login_user=None # 1、注册(5分) def register(): while True: username=input('请输入用户名:').strip() pwd=input('请输入密码:').strip() re_pwd=input('请确认密码:').strip() if pwd == re_pwd: flag,msg=user_interface.register_interface(username,pwd) if flag: print(msg) break else: print(msg) continue else: print('两次密码不一致,请重新输入!') # 2、登录(5分) def login(): while True: username=input('请输入用户名:').strip() pwd=input('请输入密码:').strip() flag,msg=user_interface.login_interface(username,pwd) if flag: global login_user login_user=username print(msg) break else: print(msg) # 3、查看余额(2分) @common.login_auth def check_balance(): balance=bank_interface.check_balance(login_user) print(f'当前用户:{login_user} 余额为:{balance}$') # 4、提现(5分) @common.login_auth def withdraw(): while True: money=input('请输入提现金额:').strip() if not money.isdigit(): print('请输入整数') continue money=int(money) if money > 0: flag,msg=bank_interface.withdraw_interface(login_user,money) if flag: print(msg) break else: print(msg) else: print('请输入大于0的整数') # 5、还款(5分) @common.login_auth def re_pay(): while True: money=input('请输入还款金额:').strip() if not money.isdigit(): print('请输入整数') continue money=int(money) if money > 0: flag,msg=bank_interface.re_pay_interface(login_user,money) print(msg) break # 6、转账(10分) @common.login_auth def transfer(): while True: to_user=input('请输入收款用户:').strip() money=input('请输入转账金额:').strip() if not money.isdigit(): print('请输入整数') continue money=int(money) if money > 0: flag,msg=bank_interface.transfer_interface(login_user,to_user,money) if flag: print(msg) break else: print(msg) # 7、查看流水(2分) @common.login_auth def check_flow(): flag,msg=bank_interface.check_flow_interface(login_user) if flag: for flow in msg: print(flow) else: print(msg) # 8、购物功能(15分) @common.login_auth def shopping(): shop_car = {} shop_list = [ ['macbook', 10000], ['iphone', 6999], ['雪梨PHONE', 999], ] while True: for index,shop in enumerate(shop_list): shop_name,shop_price=shop print(f'商品编号:{index},商品名称:{shop_name},单价:{shop_price}$') choice=input('请输入商品编号(y结账n添加购物车):').strip() if choice == 'y': flag,msg=shop_interface.pay_interface(login_user,shop_car) if flag: print(msg) break else: print(msg) continue elif choice == 'n': if shop_car: flag,msg=shop_interface.add_shop_car_interface(login_user,shop_car) print(msg) break else: print('当前购物车为空') continue if not choice.isdigit(): print('没有这个编号') continue choice=int(choice) if choice not in range(len(shop_list)): print('没有这个编号') continue #获取商品名称,单价 shop_name,price=shop_list[choice] if shop_name in shop_car: shop_car[shop_name][1] += 1 else: shop_car[shop_name]=[price,1] print(shop_car) # 9、查看购物车功能(2分) @common.login_auth def check_shop_car(): while True: shop_car = shop_interface.check_shop_car_interface(login_user) print(shop_car) choice=input('输入y结算购物车,输入q清空购物车:').strip() if choice == 'y': flag,msg=shop_interface.pay_interface(login_user,shop_car) if flag: print(msg) break else: print(msg) elif choice == 'q': msg=shop_interface.qingkong_shop_car_interface(login_user) print(msg) break else: print('没有该编号') # 10、管理员功能 @common.login_auth def admin(): from core import admin admin.run() func_dic={ '1':register, '2':login, '3':check_balance, '4':withdraw, '5':re_pay, '6':transfer, '7':check_flow, '8':shopping, '9':check_shop_car, '10':admin, } def run(): while True: print(''' =======欢迎来到ATM +购物车 APP======= 1、注册功能 2、登录功能 3、查看余额功能 4、提现功能 5、还款功能 6、转账功能 7、查看流水功能 8、购物功能 9、查看购物车功能 10、管理员功能 ============ END ============ ''') choice=input('请输入功能编号:').strip() if choice not in func_dic: print('没有该编号') continue func_dic[choice]()
''' 数据处理层 ''' from conf import settings import os import json #查看数据 def select(username): user_path=os.path.join(settings.USER_DATE_PATH,f'{username}.json') if os.path.exists(user_path): with open(user_path,'r',encoding='utf-8') as f: user_dic=json.load(f) return user_dic #保存数据 def save(user_dic): username=user_dic['username'] user_path=os.path.join(settings.USER_DATE_PATH,f'{username}.json') with open(user_path,'w',encoding='utf-8') as f: json.dump(user_dic,f,ensure_ascii=False) return True
{"username": "lqb", "password": "b51c4142d51ce2f5651fbf5baa8861bb", "balance": 960007, "flow": ["用户:lqb,还款:1000000$成功"], "shop_car": {}, "locked": false}
''' 管理员接口 ''' from db import db_handler from lib import common admin_logger=common.get_logger('admin') #冻结用户 def lock_user_interface(lock_user): user_dic=db_handler.select(lock_user) if user_dic: user_dic['locked']=True db_handler.save(user_dic) msg=f'冻结用户{lock_user}成功' admin_logger.info(msg) return True,msg else: return False,f'用户{lock_user}不存在,无法冻结' #修改用户额度 def alter_balance(to_user,money): to_user_dic=db_handler.select(to_user) #用户存在则修改额度 if to_user_dic: to_user_dic['balance']=money db_handler.save(to_user_dic) msg=f'用户{to_user}额度修改为{money}$成功' admin_logger.info(msg) return True,msg else: return False,f'被修改用户不存在,重新输入'
''' 银行逻辑接口 ''' from db import db_handler from lib import common bank_logger=common.get_logger('bank') #查看余额 def check_balance(login_user): user_dic=db_handler.select(login_user) return user_dic['balance'] #提现接口 def withdraw_interface(login_user,money): user_dic=db_handler.select(login_user) money2=money*1.05 if user_dic['balance'] >= money2: user_dic['balance'] -= money2 # 记录流水 flow = f'用户:{login_user} 提现金额:{money} $成功,手续费:{money2-money}$' bank_logger.info(flow) user_dic['flow'].append(flow) #保存数据 db_handler.save(user_dic) return True,flow else: msg=f'用户:{login_user} 金额不足,无法提现' bank_logger.warn(msg) return False,msg #还款接口 def re_pay_interface(login_user,money): user_dic=db_handler.select(login_user) user_dic['balance'] += money # 记录流水 flow=f'用户:{login_user},还款:{money}$成功' bank_logger.info(flow) user_dic['flow'].append(flow) db_handler.save(user_dic) return True,flow #转账接口 def transfer_interface(login_user,to_user,money): login_user_dic=db_handler.select(login_user) to_user_dic = db_handler.select(to_user) if login_user_dic['balance'] >= money: #判断收款用户是否存在 if to_user_dic: #存在则当前用户减钱,收款用户加钱 login_user_dic['balance'] -= money to_user_dic['balance'] += money #记录流水 login_user_flow = f'用户:{login_user} 向用户:{to_user}转账{money}$成功' login_user_dic['flow'].append(login_user_flow) to_user_flow=f'用户:{to_user} 收到用户:{login_user}转账{money}$' to_user_dic['flow'].append(to_user_flow) # 保存数据 db_handler.save(login_user_dic) db_handler.save(to_user_dic) return True,login_user_flow return False,f'用户:{to_user}不存在' return False,f'当前用户:{login_user} 金额不足无法转账,请重新输入金额' #查看流水 def check_flow_interface(login_user): user_dic=db_handler.select(login_user) flow=user_dic['flow'] if flow: return True,flow else: return False,f'用户{login_user}还没有流水'
''' 商场逻辑接口 ''' from db import db_handler from lib import common shop_logger=common.get_logger('shop') #查看购物车 def check_shop_car_interface(login_user): user_dic=db_handler.select(login_user) shop_car=user_dic['shop_car'] if shop_car: return shop_car else: return f'用户{login_user}购物车没有商品' #商品结账接口 def pay_interface(login_user,shop_car): cost=0 if shop_car: for price_number in shop_car.values(): price,number=price_number cost+=(price*number) login_user_dic=db_handler.select(login_user) if login_user_dic['balance'] >= cost: #扣钱 login_user_dic['balance'] -= cost #扣钱之后清空购物车 login_user_dic['shop_car']={} #保存数据 db_handler.save(login_user_dic) msg = f'用户{login_user}结账成功,消费{cost}$' shop_logger.info(msg) return True,msg else: msg = f'用户{login_user}余额不足,无法结账' shop_logger.warn(msg) return False,msg else: msg = f'用户{login_user}购物车为空,不需要支付' shop_logger.warn(msg) return False, msg #添加购物车接口 def add_shop_car_interface(login_user,shopping_car): user_dic = db_handler.select(login_user) shop_car = user_dic['shop_car'] for shop_name, price_number in shopping_car.items(): number = price_number[1] if shop_name in shop_car: user_dic['shop_car'][shop_name][1] += 1 else: user_dic['shop_car'].update( {shop_name: price_number} ) db_handler.save(user_dic) msg = '购物车添加成功' shop_logger.info(msg) return True, msg #清空购物车接口 def qingkong_shop_car_interface(login_user): user_dic=db_handler.select(login_user) user_dic['shop_car']=[] db_handler.save(user_dic) msg = '清空购物车成功' shop_logger.info(msg) return msg
''' 用户逻辑接口 ''' from db import db_handler from lib import common user_logger=common.get_logger('user') #注册接口 def register_interface(username,pwd): # 判断用户是否存在 user_dic=db_handler.select(username) if user_dic: msg=f'用户名{username}已存在,请重新输入!' user_logger.warn(msg) return False,msg pwd=common.get_pwd_md5(pwd) user_dic = { 'username': username, 'password': pwd, 'balance': 15000, 'flow': [], 'shop_car': {}, 'locked': False, } msg = f'用户{username}注册成功!' user_logger.info(msg) db_handler.save(user_dic) return True,msg #登录接口 def login_interface(username,pwd): user_dic=db_handler.select(username) if user_dic: if user_dic['locked']: msg = f'用户{username}已被锁定,无法登录' user_logger.warn(msg) return False,msg pwd = common.get_pwd_md5(pwd) if pwd == user_dic['password']: msg = f'用户{username}登录成功' user_logger.info(msg) return True,msg else: msg = f'用户{username}登录失败,密码错误' user_logger.warn(msg) return False,msg else: msg = f'用户{username}不存在,请重新输入' user_logger.warn(msg) return False,msg
''' 存放公共方法 ''' import hashlib import logging.config from conf import settings # 10、记录日志(5分) # def (): # pass # 11、登录认证装饰器(2分) def login_auth(func): from core import src def inner(*args,**kwargs): if src.login_user: res=func(*args,**kwargs) return res else: print('未登录不能操作,请登录') src.login() func() return inner # 12、密码加密(2分) def get_pwd_md5(pwd): m=hashlib.md5() salt='这是yan' m.update(salt.encode('utf-8')) m.update(pwd.encode('utf-8')) m.update(salt.encode('utf-8')) return m.hexdigest() #添加日志功能 def get_logger(log_type): logging.config.dictConfig(settings.LOGGING_DIC) logger=logging.getLogger(log_type) return logger
3机试题(60分) - ATM + 购物车 1、注册(5分) 2、登录(5分) 3、查看余额(2分) 4、提现(5分) 5、还款(5分) 6、转账(10分) 7、查看流水(2分) 8、购物功能(15分) 9、查看购物车功能(2分) 10、记录日志(5分) 11、登录认证装饰器(2分) 12、密码加密(2分) 4拔高题(5分) - 管理员功能 - 冻结账户 - 添加账户 - 修改额度
#项目的入口 #添加解释器环境变量 import os import sys sys.path.append(os.path.dirname(__file__)) from core import src if __name__ == '__main__': src.run()