setting
1 ''' 2 存放配置信息 3 ''' 4 import os 5 6 # 获取项目根目录路径 7 BASE_PATH = os.path.dirname( 8 os.path.dirname(__file__) 9 ) 10 11 # 获取user_data文件夹目录路径 12 USER_DATA_PATH = os.path.join( 13 BASE_PATH, 'db', 'user_data' 14 ) 15 # print(USER_DATA_PATH) 16 17 # 管理员数据路径 18 ADMIN_DATA_PATH = os.path.join( 19 BASE_PATH, 'db', 'admin_data' 20 ) 21 22 # 商城目录路径 23 SHOP_DATA_PATH = os.path.join( 24 BASE_PATH,'db','shop_obj' 25 ) 26 27 """ 28 logging配置 29 """ 30 31 # 定义三种日志输出格式 开始 32 standard_format = '[%(asctime)s][%(threadName)s:%(thread)d][task_id:%(name)s][%(filename)s:%(lineno)d]' 33 '[%(levelname)s][%(message)s]' # 其中name为getlogger指定的名字 34 simple_format = '[%(levelname)s][%(asctime)s][%(filename)s:%(lineno)d]%(message)s' 35 id_simple_format = '[%(levelname)s][%(asctime)s] %(message)s' 36 37 # 定义日志输出格式 结束 38 # ****************注意1: log文件的目录 39 BASE_PATH = os.path.dirname(os.path.dirname(__file__)) 40 logfile_dir = os.path.join(BASE_PATH, 'log') 41 # print(logfile_dir) 42 43 # ****************注意2: log文件名 44 logfile_name = 'atm.log' 45 46 # 如果不存在定义的日志目录就创建一个 47 if not os.path.isdir(logfile_dir): 48 os.mkdir(logfile_dir) 49 50 # log文件的全路径 51 logfile_path = os.path.join(logfile_dir, logfile_name) 52 53 LOGGING_DIC = { 54 'version': 1, 55 'disable_existing_loggers': False, 56 'formatters': { 57 'standard': { 58 'format': standard_format 59 }, 60 'simple': { 61 'format': simple_format 62 }, 63 }, 64 'filters': {}, 65 'handlers': { 66 # 打印到终端的日志 67 'console': { 68 'level': 'DEBUG', 69 'class': 'logging.StreamHandler', # 打印到屏幕 70 'formatter': 'simple' 71 }, 72 # 打印到文件的日志,收集info及以上的日志 73 'default': { 74 'level': 'DEBUG', 75 'class': 'logging.handlers.RotatingFileHandler', # 保存到文件 76 'formatter': 'standard', 77 'filename': logfile_path, # 日志文件 78 'maxBytes': 1024 * 1024 * 5, # 日志大小 5M 79 'backupCount': 5, 80 'encoding': 'utf-8', # 日志文件的编码,再也不用担心中文log乱码了 81 }, 82 }, 83 'loggers': { 84 # logging.getLogger(__name__)拿到的logger配置 85 '': { 86 'handlers': ['default', 'console'], # 这里把上面定义的两个handler都加上,即log数据既写入文件又打印到屏幕 87 'level': 'DEBUG', 88 'propagate': True, # 向上(更高level的logger)传递 89 }, 90 }, 91 }
from core import src from interface import admin_interface def add_user(): src.register() def change_balance(): while True: change_user = input('请输入需要修改额度的用户: ').strip() money = input('请输入需要修改的用户额度: ').strip() if not money.isdigit(): continue flag, msg = admin_interface.change_balance_interface(change_user, money) if flag: print(msg) break else: print(msg) def lock_user(): while True: change_user = input('请输入需要修改额度的用户: ').strip() flag, msg = admin_interface.lock_user_interface( change_user) if flag: print(msg) break else: print(msg)
''' 用户视图层 ''' from interface import user_interface from interface import bank_interface from interface import admin_interface from interface import shop_interface from lib import common import os import json from conf import settings # 全局变量,记录用户是否已登录 login_user = None # 1、注册功能 def register(): while True: username = input('请输入注册账号:').strip() userpwd = input('请输入密码:').strip() userpwd1 = input('请确认密码:').strip() if userpwd == userpwd1: flag,msg = user_interface.register_interface(username,userpwd) if flag == True: print(msg) break else: print(msg) # 2、登录功能 def login(): while True: username = input('请输入账号:').strip() userpwd = input('请输入密码:').strip() flag,msg = user_interface.login_interface(username,userpwd) if flag == True: print(msg) global login_user login_user = username break else: print(msg) # 3、查看余额 @common.login_auth def check_balance(): balance = user_interface.user_check_balance(login_user) print(f'用户{login_user}余额为{balance}') # 4、提现功能 @common.login_auth def withdraw(): while True: withdraw_money = input('请输入提现金额:').strip() if not withdraw_money.isdigit(): print('请输入数字') continue flag,msg = bank_interface.withdraw_interface(login_user,withdraw_money) if flag == True: print(msg) break else: print(msg) # 5、还款功能 @common.login_auth def repay(): while True: repay_money = input('请输入还款金额:').strip() if not repay_money.isdigit(): print('请输入正确的金额') continue repay_money = float(repay_money) if repay_money > 0: flag,msg= bank_interface.repay_interface(login_user,repay_money) if flag: print(msg) break else: print('输入的金额必须大于0') # 6、转账功能 @common.login_auth def transfer(): while True: user1 = input('请输入您需要转账的账户:').strip() transfer_money = input('请输入您需要转账的金额:').strip() if not transfer_money.isdigit(): print('请输入正确的转账金额') continue transfer_money = float(transfer_money) flag, msg = bank_interface.transfer_interface(login_user,user1,transfer_money) if flag: print(msg) break else: print(msg) # 7、查看流水 @common.login_auth def check_flow(): flow_check = bank_interface.check_flow_interface(login_user) if flow_check: for flow in flow_check: print(flow) else: print('当前用户没有流水!') # 8、购物功能 @common.login_auth def shopping(): while True: list_kind_name = shop_interface.select_shop_interface() shopping_car = {} # {'商品名称': ['单价', '数量']]} choice1 = input('请选择购物种类(不想购物请输入q):').strip() if choice1 == 'q': break if choice1 not in list_kind_name: print('种类错误请重新输入') continue list_obj_name = shop_interface.select_shop_interface(choice1) choice2 = input('请选择商品名(不想购物请输入q):').strip() msg, shopping_dic = shop_interface.select_obj_interface(choice1, choice2) print(msg) if choice2 == 'q': break if choice2 not in shopping_dic: print('商品名错误请重新输入') continue choice3 = input('您购买几件:').strip() if not choice3.isdigit(): print('请输入数字:') continue choice4 = input('您是否添加到购物车:是/y,否/n').strip() if choice4 != 'y': continue # 添加购物车 shopping_car[choice2] = [shopping_dic[choice2][0],int(choice3)] # 购物车接口 flag, msg,shop_car = shop_interface.add_shop_car_interface( login_user, shopping_car, choice3 ) if flag: print(msg) print(f'当前购物车:{shop_car}') choice5 = input('是否继续购物?是/y,否/n').strip() if choice5 == 'n': choice6 = input('是否结账?是/y,否/n') if choice6 != 'y': break # 减库存 flag, msg = shop_interface.reduce_inventory_interface(shopping_car) if flag: flag1, msg1 = shop_interface.pay_shop_interface(login_user, shopping_car) if flag1: print(msg1) # 清空购物车 user_interface.user_empty_shopcar(login_user) break else: print(msg1) break else: print(msg) user_interface.user_empty_shopcar(login_user) # 9、查看购物车 @common.login_auth def check_shop_car(): shop_car = shop_interface.check_shop_car_interface(login_user) print(shop_car) # 10、管理员功能 @common.admin_login_auth def admin(): from core import admin admin.run() # 管理员登录功能 def admin_login(): while True: username = input('请输入账号:').strip() userpwd = input('请输入密码:').strip() flag,msg = admin_interface.admin_login_interface(username,userpwd) if flag == True: print(msg) global login_user login_user = username break else: print(msg) # 创建函数功能字典 func_dic = { '1': register, '2': login, '3': check_balance, '4': withdraw, '5': repay, '6': transfer, '7': check_flow, '8': shopping, '9': check_shop_car, '10': admin, } # 视图层主程序 def run(): while True: print(''' ===== ATM + 购物车 ===== 1、注册功能 2、登录功能 3、查看余额 4、提现功能 5、还款功能 6、转账功能 7、查看流水 8、购物功能 9、查看购物车 10、管理员功能 ========= end ========= ''') choice = input('请输入功能编号: ').strip() if choice not in func_dic: print('请输入正确的功能编号!') continue func_dic.get(choice)() # func_dic.get('1')() ---> register()
{"username": "yuding", "password": "fdf8ef1cdba3bc9f0e434ede814c5490"}
''' 数据处理层 - 专门用户处理数据的 ''' from conf import settings import os,json # 读取数据 def select(username): user_path = os.path.join( settings.USER_DATA_PATH, f'{username}.json' ) if os.path.exists(user_path): with open(user_path, mode='r', encoding='utf-8') as f: user_dic = json.load(f) return user_dic # 保存数据 def save(user_dic): username = user_dic['username'] user_path = os.path.join( settings.USER_DATA_PATH, f'{username}.json' ) with open(user_path, mode='w', encoding='utf-8') as f: json.dump(user_dic, f, ensure_ascii=False) # 读取管理员数据 def admin_select(username): user_path = os.path.join( settings.ADMIN_DATA_PATH, f'{username}.json' ) if os.path.exists(user_path): with open(user_path, mode='r', encoding='utf-8') as f: user_dic = json.load(f) return user_dic # 读取商城目录数据 def shop_select(i,path): with open(path, mode='r', encoding='utf-8') as f: shop_dic = json.load(f) l = [] for key in shop_dic: print(f'商品名:{key} 单价:{shop_dic[key][0]}元 库存:{shop_dic[key][1]}件') l.append(key) return l # 保存商品数据 def shop_save(user_dic,name): path = os.path.join(settings.SHOP_DATA_PATH,f'{name}.json') with open(path, mode='w', encoding='utf-8') as f: json.dump(user_dic,f,ensure_ascii=False) # 读取商品信息 def obj_select(kind_name,obj_name,path): with open(path, mode='r', encoding='utf-8') as f: shopping_dic = json.load(f) if obj_name in shopping_dic: msg = f'商品名:{obj_name} 单价:{shopping_dic[obj_name][0]}元 库存:{shopping_dic[obj_name][1]}件' return msg,shopping_dic return '没有此类商品',shopping_dic
#__author:"yuuding" #date: 2020/4/3 from db import db_handler from lib import common # 根据不同的接口类型传入不同的日志对象 admin_logger = common.get_logger(log_type='admin') # 管理员登录接口 def admin_login_interface(username,userpwd): user_dic = db_handler.admin_select(username) if user_dic: userpwd = common.md5_pwd(userpwd) if userpwd == user_dic['password']: msg = f'管理员{username}登录成功' admin_logger.info(msg) return True,msg else: return False, '密码错误' else: return False, '账号不存在,请重新输入' # 修改用户额度接口 def change_balance_interface(username, money): user_dic = db_handler.select(username) if user_dic: user_dic['balance'] = money db_handler.save(user_dic) msg = f'管理员修改用户: [{username}]额度修改成功!' admin_logger.info(msg) return True,msg return False,'修改额度的用户不存在' # 冻结账户接口 def lock_user_interface(username): user_dic = db_handler.select(username) if user_dic: # 将locked的默认值改为True user_dic['locked'] = True db_handler.save(user_dic) msg = f'用户{username}冻结成功!' msg = f'管理员{username}登录成功' admin_logger.info(msg) return True, msg return False, '冻结用户不存在!'
''' 银行相关业务的接口 ''' from db import db_handler import time,datetime from lib import common bank_logger = common.get_logger(log_type='bank') # 提现接口 def withdraw_interface(username,money): user_dic = db_handler.select(username) balance = float(user_dic['balance']) money1 = float(money) * 1.05 if balance >= money1: balance -= money1 user_dic['balance'] = balance db_handler.save(user_dic) msg = f'用户{username}成功提现{money1}元' bank_logger.info(msg) return True,msg return False, '余额不足,请充值' # 还款接口 def repay_interface(username,repay_money): user_dic = db_handler.select(username) user_dic['balance'] += repay_money db_handler.save(user_dic) msg = f'用户{username}还款成功,还款{repay_money}元' bank_logger.info(msg) return True,msg # 转账接口 def transfer_interface(login_user,user1,transfer_money): user_dic1 = db_handler.select(login_user) user_dic2 = db_handler.select(user1) if not user_dic2: return False,'目标账户不存在' if user_dic1['balance'] >= transfer_money: user_dic1['balance'] -= transfer_money user_dic2['balance'] += transfer_money flow_user1 = f'用户{login_user}向用户{user1}转账{transfer_money}元' user_dic1['flow'].append(f'{datetime.datetime.fromtimestamp(time.time())} {flow_user1}') flow_user2 = f'用户{user1}接受用户{login_user}转账{transfer_money}元' user_dic2['flow'].append(f'{datetime.datetime.fromtimestamp(time.time())} {flow_user2}') db_handler.save(user_dic1) db_handler.save(user_dic2) bank_logger.info(flow_user1) return True,flow_user1 return False,'当前用户余额不足' # 查看流水接口 def check_flow_interface(login_user): user_dic = db_handler.select(login_user) return user_dic.get('flow') # 商品支付结算接口 def pay_interface(login_user, cost): user_dic = db_handler.select(login_user) bal = float(user_dic['balance']) if bal >= float(cost): bal -= float(cost) user_dic['balance'] = bal flow = f'{datetime.datetime.fromtimestamp(time.time())} 用户消费金额: [{cost}]' user_dic['flow'].append(flow) db_handler.save(user_dic) return True return False
''' 购物商城接口 ''' from conf import settings from db import db_handler import os import json from lib import common shop_logger = common.get_logger(log_type='shop') # 读取商城物品目录接口 def select_shop_interface(choice=None): if not choice: l = [] print('=========欢迎来到购物商城=========') for i in os.listdir(settings.SHOP_DATA_PATH): l.append(i.split('.')[0]) path = os.path.join(settings.SHOP_DATA_PATH, i) db_handler.shop_select(i, path) print('==================================') return l else: print('=========欢迎来到购物商城=========') path = os.path.join(settings.SHOP_DATA_PATH, f'{choice}.json') l = db_handler.shop_select(choice, path) print('==================================') return l # 读取商品接口 def select_obj_interface(choice1,choice2): path = os.path.join(settings.SHOP_DATA_PATH, f'{choice1}.json') msg,shopping_dic = db_handler.obj_select(choice1, choice2,path) return msg,shopping_dic # 添加购物车接口 def add_shop_car_interface(login_user, shopping_car,shop_number): user_dic = db_handler.select(login_user) shop_car = user_dic.get('shop_car') for shop_name, price_number in shopping_car.items(): # 每个商品的数量 number = price_number[1] # 2.2) 若商品重复,则累加商品数量 if shop_name in shop_car: # [单价, 数量][1] ---> 数量 res = int(user_dic['shop_car'][shop_name][1]) res += number user_dic['shop_car'][shop_name][1] = res else: # 2.3)若不是重复的,更新到商品字典中 user_dic['shop_car'].update( {shop_name: price_number} ) # 保存用户数据 db_handler.save(user_dic) msg = '添加购物车成功!' shop_logger.info(msg) return True,msg,user_dic['shop_car'] # 商品准备支付接口 def pay_shop_interface(login_user, shopping_car): user_dic = db_handler.select(login_user) cost = 0 for name in shopping_car: cost += (shopping_car[name][0] * shopping_car[name][1]) from interface import bank_interface flag = bank_interface.pay_interface(login_user, cost) if flag: msg = f'用户:[{login_user}]支付 [{cost}] 成功, 准备发货!' # shop_logger.info(msg) return True, msg return False,'支付失败,金额不足' # 减少库存 def reduce_inventory_interface(shopping_car): for name,price_number in shopping_car.items(): price,number = price_number print(name,price,number,type(number)) l = select_shop_interface() for kind_name in l: msg,shopping_dic = select_obj_interface(kind_name,name) if name not in shopping_dic: continue if shopping_dic[name][1] >= int(number): shopping_dic[name][1] -= int(number) db_handler.shop_save(shopping_dic,kind_name) return True,'支付成功,库存已更新' else: msg1 = f'{name}存货不足,请重新选择' return False,msg1 # 查看购物车接口 def check_shop_car_interface(username): user_dic = db_handler.select(username) return user_dic.get('shop_car')
''' 逻辑接口层 用户接口 ''' from db import db_handler from lib import common # 注册接口 def register_interface(username, password, balance=15000): userdic = db_handler.select(username) if userdic: return False,'用户名已存在,请重新登陆' password = common.md5_pwd(password) userdic = { 'username': username, 'password': password, 'balance': balance, # 用于记录用户流水的列表 'flow': [], # 用于记录用户购物车 'shop_car': {}, # locked:用于记录用户是否被冻结 # False: 未冻结 True: 已被冻结 'locked': False } db_handler.save(userdic) return True, f'{username} 注册成功!' # 注册接口 def login_interface(username,userpwd): user_dic = db_handler.select(username) if user_dic: if not user_dic['locked']: userpwd = common.md5_pwd(userpwd) if userpwd == user_dic['password']: return True,f'用户{username}登录成功' else: return False,'密码错误' else: print('账号被锁定') else: return False,'账号不存在,请重新输入' # 查看接口 def user_check_balance(username): user_dic = db_handler.select(username) return user_dic['balance'] # 清空购物车接口 def user_empty_shopcar(login_user): user_dic = db_handler.select(login_user) user_dic['shop_car'] = {} db_handler.save(user_dic)
''' 存放公共方法 ''' import hashlib from conf import settings from logging import config from logging import getLogger # 加密 def md5_pwd(userpwd): md5_obj = hashlib.md5() obj1 = '天王盖地虎' md5_obj.update(obj1.encode('utf-8')) md5_obj.update(userpwd.encode('utf-8')) obj2 = '宝塔镇河妖' md5_obj.update(obj2.encode('utf-8')) return md5_obj.hexdigest() # 认证装饰器 def login_auth(func): from core import src def wrap(*args,**kwargs): if src.login_user: res = func() return res else: print('账户未登录。请登录') src.login() return wrap # 管理员认证装饰器 def admin_login_auth(func): from core import src def wrap(*args,**kwargs): while True: if src.login_user != 'admin': src.admin_login() elif src.login_user == 'admin': res = func() return res return wrap # 添加日志 def get_logger(log_type): # log_type ---> user # 1、加载日志配置信息 config.dictConfig(settings.LOGGING_DIC) # 2、获取日志对象 logger = getLogger(log_type) return logger
# 项目的说明书 ## 项目:ATM + 购物车 # 项目需求: 1.额度15000或自定义 --> 注册功能 2.实现购物商城,买东西加入购物车,调用信用卡接口结账 --> 购物功能、支付功能 3.可以提现,手续费5% --> 提现功能 4.支持多账户登录 --> 登录功能 5.支持账户间转账 --> 转账功能 6.记录日常消费 --> 记录流水功能 7.提供还款接口 --> 还款功能 8.ATM记录操作日志 --> 记录日志功能 9.提供管理接口,包括添加账户、用户额度,冻结账户等。。。 ---> 管理员功能 10.用户认证用装饰器 --> 登录认证装饰器 ## "用户视图层" 展示给用户选择的功能 1、注册功能 2、登录功能 3、查看余额 4、提现功能 5、还款功能 6、转账功能 7、查看流水 8、购物功能 9、查看购物车 10、管理员功能 # 一个项目是如何从无到有 ## 一 需求分析 1.拿到项目,会先在客户那里一起讨论需求, 商量项目的功能是否能实现,周期与价格,得到一个需求文档。 2.最后在公司内部需要开一次会议,最终得到一个开发文档, 交给不同岗位的程序员进行开发。 - Python: 后端,爬虫 - 不同的岗位: - UI界面设计: - 设计软件的布局,会分局软件的外观切成一张张图片。 - 前端: - 拿到UI交给他的图片,然后去搭建网页面。 - 设计一些页面中,哪些位置需要接收数据,需要进行数据交互。 - 后端: - 直接核心的业务逻辑,调度数据库进行数据的增删查改。 - 测试: - 会给代码进行全面测试,比如压力测试,界面测试(CF卡箱子)。 - 运维: - 部署项目。 ## 二 程序的架构设计 ### 1、程序设计的好处 1)思路清晰 2)不会出现写一半代码时推翻重写 3)方便自己或以后的同事更好维护 ### 2、三层架构设计的好处 1)把每个功能都分层三部分,逻辑清晰 2)如果用户更换不同的用户界面或不同, 的数据储存机制都不会影响接口层的核心 逻辑代码,扩展性强。 3)可以在接口层,准确的记录日志与流水。 ### 3、三层架构 #### 一 用户视图层 用于与用户交互的,可以接受用户的输入,打印接口返回的数据。 #### 二 逻辑接口层 接受 用户视图层 传递过来的参数,根据逻辑判断调用数据层加以处理, 并返回一个结果给 用户视图层。 #### 三 数据处理层 接受接口层传递过来的参数,做数据的 - 保存数据 save() - 查看数据 select() - 更新数据 - 删除数据 ## 三 分任务开发 ## 四 测试 ## 五 上线 # 统计代码 file ---> settings ---> Plugins --->
''' 程序的入口 ''' import sys import os # 添加解释器的环境变量 sys.path.append( os.path.dirname(__file__) ) from core import src # 开始执行项目函数 if __name__ == '__main__': # 1、先执行用户视图层 src.run()