作业需求:85% 1. 用户额度 15000或自定义 2. 实现购物商城,买东西加入 购物车,调用信用卡功能进行结账 3. 可以提现,手续费5% 4. 支持多账户登录 5. 支持账户间转账(用户A转账给用户B,A账户减钱、B账户加钱) 6. 记录每月日常消费流水 7. 提供还款功能 8. ATM记录操作日志(使用logging模块记录日志) 9. 提供管理功能,包括添加账户、用户额度,冻结账户等。。。 10. 用户认证用装饰器 示例代码 https://github.com/triaquae/py3_training/tree/master/atm 简易流程图:https://www.processon.com/view/link/589eb841e4b0999184934329 编码规范需求:15% 1. 代码规范遵守pep8 (https://python.org/dev/peps/pep-0008/) 2. 函数有相应的注释 3. 程序有文档说明文件(README.md参考:https://github.com/csrftoken/vueDrfDemo) 4. 程序的说明文档必须包含的内容:程序的实现的功能、程序的启动方式、登录用户信息、程序的运行效果 5. 程序设计的流程图: (可以参考:https://www.processon.com/view/link/589eb841e4b0999184934329)
分管理员和普通用户登陆,登陆以后界面不一样,不互通.
管理员用户为2222
其他都是普通用户
起始文件是atm_server,购物车在普通用户面板里面,
目录结构为
首先是启动文件
if __name__ == "__main__":
from atm import main
main.controller() # 入口
然后是日志保存文件
transaction# 保存交易信息的模块
access# 保存各种操作的模块
用户保存文件
普通用户格式
管理员用户格式
settings模块,获取目录还有一些格式:
模块代码
import logging
import os, time
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # 获取到当前程序在的目录
DB_PATH = "%s/db/account" % BASE_DIR # 在当前程序目录上面加上文件保存的目录
DB_PATH_1 = "%s/db/shopping/goods.txt" % BASE_DIR # 设置购物车商品列表的目录
LOG_LEVEL = logging.INFO # 默认级别是INFO
LOG_TYPES = { # 判断写进那个日志
'transaction': 'transactions.log',
'access': 'access.log',
}
LOG_PATH = os.path.join(BASE_DIR, 'logs') # 设置路径
LOG_FORMAT = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') # 日志格式模板
TRANSACTION_TYPE = {
'repay': {'action': 'plus', 'interest': 0},
'withdraw': {'action': 'minus', 'interest': 0.05},
'transfer': {'action': 'minus', 'interest': 0},
'consume': {'action': 'minus', 'interest': 0},
'collect': {'action': 'plus', 'interest': 0},
'shopping': {'action': 'minus', 'interest': 0}
}
下面是正餐:
这个是主程序main:
from .auth import authenticate
from .logger import logger
from atm import logics
from atm import admin_logics
transaction_logger = logger("transaction") # 保存交易信息的模块
access_logger = logger("access") # 保存各种操作的模块
features = [ # 功能列表
('账号信息', logics.view_account_info),
('取现', logics.with_draw),
('还款', logics.pay_back),
('转账', logics.transfer),
('购物车', logics.shopping_cart)
]
the_admin = [ # 管理员功能列表
('修改用户', admin_logics.modify_the_account),
('添加用户', admin_logics.create_user),
('冻结用户', admin_logics.freeze),
]
def admin(user_obj):
"""管理员功能分发"""
while True:
for index, feature in enumerate(the_admin): # 循环打印
print(index, feature[0])
choice = input("请选择》》》(退出exit):").strip()
if not choice:
continue # 判断值是否等于空
if choice.isdigit():
choice = int(choice)
if 0 <= choice < len(features): # 判断输入值是否正确
the_admin[choice][1](user_obj, transaction_logger=transaction_logger, access_logger=access_logger)
"""传输数据及日志格式"""
if choice == 'exit':
exit("拜拜")
def login(func):
def entrance():
"""ATM登陆入口"""
user_obj = {
'is_authenticated': False,
'data': None
}
"""保存登陆信息和用户信息"""
retry_count = 0
while user_obj['is_authenticated'] is not True: # 判断用户是否已登陆
account = input("\033[32;1m请输入姓名:\033[0m").strip()
password = input("\033[32;1m请输入密码:\033[0m").strip()
auth_data = authenticate(account, password) # 验证函数,并把值返回给auth_data
if auth_data:
if auth_data['status'] == 1:
print("该用户以冻结!!")
continue
user_obj['is_authenticated'] = True # 当用户成功登陆,把登陆状态保存下来
user_obj['data'] = auth_data # 把返回的数据保存在data里面
access_logger.info("user %s just logged in" % user_obj["data"]["id"])
""" 调用函数,把过程写进日志"""
if user_obj['data']['id'] == 2222:
print('您以登陆,身份:管理员!')
admin(user_obj)
else:
print("欢迎登陆")
func(user_obj) # 调用函数
else:
print("登陆失败")
retry_count += 1
if retry_count == 3: # 判断是否错误超过三次
msg = "user %s tried wrong password reached 3 times" % account
print(msg)
access_logger.info(msg) # 写进日志
break
if user_obj['is_authenticated']:
func()
return entrance
@login
def controller(user_obj): # 功能分发器
while True:
for index, feature in enumerate(features): # 循环打印
print(index, feature[0])
choice = input("请选择》》》(退出exit):").strip()
if not choice:
continue # 判断值是否等于空
if choice.isdigit():
choice = int(choice)
if 0 <= choice < len(features): # 判断输入值是否正确
features[choice][1](user_obj, transaction_logger=transaction_logger, access_logger=access_logger)
"""传输数据及日志格式"""
if choice == 'exit':
exit("拜拜")
admin_logics模块,是管理员功能模块:
#! /usr/bin/env python
# _*_ coding: utf-8 _*_
from .auth import authenticate
import time, os, json
from conf import settings
def create_user(*args, **kwargs):
"""添加账号"""
logger = kwargs.get("access_logger")
while True:
name = input("请输入用户名(q退出):").strip()
if name == "q":
return
elif not name:
print("请重新输入")
continue
pwd = input("请输入密码:").strip()
if not pwd:
print("请重新输入!")
continue
if pwd != input("请确定密码:").strip():
print("两次密码不一致!")
continue
user = authenticate(name, pwd)
if user:
print("用户已存在")
continue
new_user = {"balance": 15000, # 设置一个模版
"expire_date": "%s-%s" % (2+int(time.strftime("%Y")), time.strftime("%m-%d")),
"enroll_date": "%s" % time.strftime("%Y-%m-%d"),
"credit": 15000,
"id": name,
"status": 0,
"pay_day": 22,
"password": pwd}
"""将用户输入的信息打包 存到列表中"""
account_file = os.path.join(settings.DB_PATH,"%s.json" % name) # 按照输入的名称创建文件夹
f = open("%s" % account_file, "w") # 写入内容
json.dump(new_user, f)
f.close()
logger.info("TO create a user %s,password is %s" % (name, pwd))
print("创建成功")
def modify_the_account(*args, **kwargs):
"""修改账号"""
logger = kwargs.get("access_logger")
while True:
name = input("请输入要修改的名称(q退出):").strip()
if name == "q":
return
elif not name:
print("请重新输入")
continue
account_file = os.path.join(settings.DB_PATH,"%s.json" % name)
if os.path.isfile(account_file): # 判断这个文件是否存在
f = open(account_file)
data = json.load(f)
f.close()
print("名称".center(50, '-'))
for k, v in data.items():
print("%15s. %s" % (k, v))
print("END".center(50, '-'))
else:
print("账号不存在,请重新选择")
continue
account = input("请输入要修改的值(q退出):").strip()
if not account:
continue # 判断值是否等于空
if account in data:
print("%s - %s" % (account, data[account]))
data_1 = input("请输入值:").strip()
data[account] = data_1
account_file_1 = os.path.join(settings.DB_PATH,"%s.json"% data["id"])
f = open("%s.new" % account_file_1, "w")
json.dump(data, f)
f.close()
os.remove(account_file)
os.rename("%s.new" % account_file_1, account_file_1)
logger.info("%s modified %s act as %s" % (name, account, data_1))
print("以修改")
elif account == "q":
return
else:
print("输入错误,请重新输入!")
continue
def freeze(*args, **kwargs):
logger = kwargs.get("access_logger")
while True:
name = input("请输入要冻结的用户(q退出):")
if name == "q":
return
elif not name:
print("请重新输入!")
continue
account_file = os.path.join(settings.DB_PATH,"%s.json"% name)
if os.path.isfile(account_file): # 判断这个文件是否存在
f = open(account_file)
data = json.load(f)
f.close()
data["status"] = 1
account_file_1 = os.path.join(settings.DB_PATH,"%s.json" % data["id"])
f = open("%s.new" % account_file_1, "w")
json.dump(data, f)
f.close()
os.remove(account_file)
os.rename("%s.new" % account_file_1, account_file_1)
logger.info(" %s ln order to freeze " % name)
print("以冻结!")
else:
print("账号不存在,请重新选择")
logics是普通用户功能模块,里面有购物车程序:
from .transaction import make_transaction
from .db_handler import load_account_data
def view_account_info(account_data, *args, **kwargs):
trans_logger = kwargs.get("transaction_logger") # 拿取logger
print("账号额度".center(50, '-'))
for k, v in account_data['data'].items():
if k not in ('password',): # 当数值不是密码时打印出来
print("%25s: %s" % (k, v))
print("END".center(50, '-'))
def with_draw(account_data, *args, **kwargs):
"""取款模块"""
trans_logger = kwargs.get("transaction_logger")
current_balance = '''----------可用余额----------
Credit : %s
Balance : %s
''' % (account_data['data']['credit'], account_data['data']['balance'])
print(current_balance)
back_flag = False # 记录当前状态,输入exit退出
while not back_flag:
withdraw_amount = input("\033[33;1m请输入金额:\033[0m").strip()
if len(withdraw_amount) > 0 and withdraw_amount.isdigit(): # 判断输入数值
withdraw_amount = int(withdraw_amount) # 转换输入数值
if (account_data['data']['balance'] / 2) >= withdraw_amount: # 判断是否大于余额的一半
transaction_result = make_transaction(trans_logger, account_data, 'withdraw', withdraw_amount)
if transaction_result['status'] == 0:
print('''\033[42;1mNew Balance:%s\033[0m''' % (account_data['data']['balance']))
else:
print(transaction_result)
else:
print("余额不足,可提现余额为%s" % (int(account_data['data']['balance']/2)))
if withdraw_amount == "exit":
back_flag = True
def pay_back(account_data, *args, **kwargs):
"""还款模块"""
trans_logger = kwargs.get("transaction_logger") # 日志
current_balabce = '''-------------- BALANCE INFO--------------
Credit : %s
Balance : %s
''' % (account_data['data']['credit'], account_data['data']['balance'])
print(current_balabce) # 打印额度
pay_amount = input("\033[33;1m请输入金额:\033[0m").strip() # 格式输入值
if len(pay_amount) > 0 and pay_amount.isdigit():
pay_amount = int(pay_amount)
transaction_result = make_transaction(trans_logger, account_data, 'repay', pay_amount)
if transaction_result['status'] == 0:
print('''\033[42;1mNew Balance:%s\033[0m''' % (account_data['data']['balance']))
else:
print(transaction_result)
def transfer(account_data, *args, **kwargs):
"""转账模块"""
account_1 = input("\033[32;1m请输入转账用户姓名:\033[0m").strip()
account_data_1 = load_account_data(account_1) # 调用函数,返回文件里面的值
if account_data_1['status'] == 0: # 判断是否返回了值
trans_logger = kwargs.get("transaction_logger")
current_balance = '''----------可用余额----------
Credit : %s
Balance : %s
''' % (account_data['data']['credit'], account_data['data']['balance'])
print(current_balance)
back_flag = False # 记录当前状态,输入exit退出
while not back_flag:
withdraw_amount = input("\033[33;1m请输入金额:\033[0m").strip()
if len(withdraw_amount) > 0 and withdraw_amount.isdigit(): # 判断输入数值
withdraw_amount = int(withdraw_amount) # 转换输入数值
transaction_result = make_transaction(trans_logger, account_data, 'transfer', withdraw_amount)
"""调用模块进行扣钱"""
if transaction_result['status'] == 0: # 判断是否成功
make_transaction(trans_logger, account_data_1, 'collect', withdraw_amount)
"""调用模块进行加钱"""
print('''\033[42;1m转账成功,余额还有:%s\033[0m''' % (account_data['data']['balance']))
else:
print(transaction_result)
elif withdraw_amount == "exit":
back_flag = True
else:
print("用户错误")
def shopping_cart(account_data, *args, **kwargs):
trans_logger = kwargs.get("transaction_logger")
goods = [{"name": "电脑", "price": 1999},
{"name": "鼠标", "price": 10},
{"name": "游艇", "price": 20},
{"name": "美女", "price": 998}, ]
list_1 = []
list_2 = []
list_3 = []
for list_0 in goods:
list_1.append([list_0["name"], list_0["price"]])
dd = account_data['data']['balance']
while True:
wage = sum(list_3)
print('商品列表'.center(50, '-'))
for index, n in enumerate(list_1):
print(' %s . %s---%s' % (index, n[0], n[1]))
print('可用余额'.center(50, '-'))
current_balance = '''
Credit : %s
Balance : %s
''' % (account_data['data']['credit'], account_data['data']['balance'])
print(current_balance)
print('-' * 50)
choice = input("请输入商品编号,按 q 结算:")
if choice == "q":
print("已购商品列表".center(50, "-"))
for index_1, n_1 in enumerate(list_2):
print("\033[0;33m%s. %s %s\033[0m" % (index_1, n_1[0], n_1[1])) # 打印已购商品列表
transaction_result = make_transaction(trans_logger, account_data, 'shopping', wage)
if transaction_result['status'] == 0: # 判断是否成功
print('''\033[42;1m购买成功,余额还有:%s\033[0m''' % (account_data['data']['balance']))
exit()
else:
print(transaction_result)
elif choice == "exit":
exit()
print("拜拜!")
else:
if choice.isdigit(): # 判断是否是数字
choice = int(choice)
if choice <= 3:
price = list_1[choice][1]
if dd >= wage:
list_3.append(price)
list_2.append(list_1[choice])
wage = sum(list_3)
print("\033[0;33m以加进购物车!\033[0m")
print("\033[0;33m 总价%s\033[0m" % wage)
else:
print("余额不足!")
else:
print("没有这个选项,请重新输入!")
else:
print("没有这个选项,请重新输入!")
auth模块是判断用户登陆:
from .db_handler import load_account_data
def authenticate(account, password): # 登陆验证模块
account_data = load_account_data(account) # 调用函数,返回文件里面的值
if account_data['status'] == 0: # 判断是否返回了值
account_data = account_data['data'] # 提取用户判断
if password == account_data['password']: # 判断密码是否正确
return account_data # 返回从文件提出的数据
else:
return None # 不成功就返回None
else:
return None
db_handler是读写模块:
import json, time, os
from conf import settings
def load_account_data(account): # 提取用户文件模块
account_file = os.path.join(settings.DB_PATH,"%s.json"% account) # 调用os模块打开目录文件,目录路径由模块提供
if os.path.isfile(account_file): # 判断文件是否存在
f = open(account_file) # 打开文件
data = json.load(f) # 序列话后保存在data里面
f.close()
return {'status': 0, 'data': data} # 返回值
else:
return {'status': -1, 'error': "账号不存在"}
def save_db(account_data):
"""写入模块"""
account_file = os.path.join(settings.DB_PATH,"%s.json" % account_data['id']) # 根据名称设置路径
if os.path.isfile(account_file): # 判断这个文件是否存在
f = open("%s.new" % account_file, "w") # 创建一个同名称的新文件
data = json.dump(account_data, f) # 把修改好的值写进创建的文件
f.close() # 关闭文件
os.remove(account_file) # 删除旧文件
os.rename("%s.new" % account_file, account_file) # 重命名新文件
return {"status": 0, 'data': data} # 返回
logger是日志模块:
import logging
from conf import settings
import os
def logger(log_type):
logger = logging.getLogger(log_type)
logger.setLevel(settings.LOG_LEVEL)
log_file = os.path.join(settings.LOG_PATH, settings.LOG_TYPES[log_type]) # 判断是那个文件
fh = logging.FileHandler(log_file)
fh.setLevel(settings.LOG_LEVEL) # 设置日志级别
formatter = settings.LOG_FORMAT # 设置日志格式
fh.setFormatter(formatter)
logger.addHandler(fh) # 加上格式
return logger # 返回
transaction模块是交易模块:
from conf import settings
from .db_handler import save_db
def make_transaction(logger, user_obj, tran_type, amount, **kwargs):
"""交易模块"""
amount = float(amount)
if tran_type in settings.TRANSACTION_TYPE: # 判断交易类型是否支持
interest = amount * settings.TRANSACTION_TYPE[tran_type]['interest'] # 计算利息
old_balabce = user_obj['data']['balance'] # 拿出旧余额
if settings.TRANSACTION_TYPE[tran_type]['action'] == 'plus': # 判断交易类型,是加或者减
new_balance = old_balabce + amount + interest
elif settings.TRANSACTION_TYPE[tran_type]['action'] == 'minus':
new_balance = old_balabce - amount - interest
if new_balance < 0: # 判断余额是否小于0
print('''\033[31;1mYour credit [%s] is not enough for this transaction [-%s],your current[%s]'''
% (user_obj['data']['credit'], (amount + interest), old_balabce))
return {'status': 1, 'error': '交易失败,余额不足!'} # 返回值,提示交易失败
user_obj['data']['balance'] = new_balance # 把新余额保存在内存里面
save_db(user_obj['data']) # 用函数保存在账号文件里面
logger.info("account:%s action:%s amount:%s interest:%s balabce:%s" %
(user_obj['data']['id'], tran_type, amount, interest, new_balance))
return {'status': 0, 'msg': '交易成功!'}
else:
print("\033[31;1mTransaction type [%s] is not exist!\033[0m" % tran_type)
return {'status': 1, 'error': '交易失败,Transaction type [%s] is not exist!' % tran_type}
谢谢欣赏,有问题或者建议请联系我邮箱861257034@qq.com