start.py
"""
软件启动入口
"""
import os
import sys
sys.path.append(os.path.dirname(__file__))
from core import src
if __name__ == "__main__":
src.run()
conf - - settings.py
"""
配置文件
"""
import os
# 获取根目录的路径
B_PATH = os.path.dirname(os.path.dirname(__file__))
# print(B_PATH)
# 获取 db 目录下的 user_date 文件
USER_DATE_PATH = os.path.join(B_PATH, "db", "user_date")
# print(USER_DATE_PATH)
"""
logging配置
"""
# 定义三种日志输出格式 开始
standard_format = '[%(asctime)s][%(threadName)s:%(thread)d][task_id:%(name)s][%(filename)s:%(lineno)d]'
'[%(levelname)s][%(message)s]' # 其中name为getlogger指定的名字
simple_format = '[%(levelname)s][%(asctime)s][%(filename)s:%(lineno)d]%(message)s'
id_simple_format = '[%(levelname)s][%(asctime)s] %(message)s'
# 定义日志输出格式 结束
# ****************注意1: log文件的目录
BASE_PATH = os.path.dirname(os.path.dirname(__file__))
logfile_dir = os.path.join(BASE_PATH, 'log')
# print(logfile_dir)
# ****************注意2: log文件名
logfile_name = 'atm.log'
# 如果不存在定义的日志目录就创建一个
if not os.path.isdir(logfile_dir):
os.mkdir(logfile_dir)
# log文件的全路径
logfile_path = os.path.join(logfile_dir, logfile_name)
LOGGING_DIC = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'standard': {
'format': standard_format
},
'simple': {
'format': simple_format
},
},
'filters': {},
'handlers': {
# 打印到终端的日志
'console': {
'level': 'DEBUG',
'class': 'logging.StreamHandler', # 打印到屏幕
'formatter': 'simple'
},
# 打印到文件的日志,收集info及以上的日志
'default': {
'level': 'DEBUG',
'class': 'logging.handlers.RotatingFileHandler', # 保存到文件
'formatter': 'standard',
'filename': logfile_path, # 日志文件
'maxBytes': 1024 * 1024 * 5, # 日志大小 5M
'backupCount': 5,
'encoding': 'utf-8', # 日志文件的编码,再也不用担心中文log乱码了
},
},
'loggers': {
# logging.getLogger(__name__)拿到的logger配置
'': {
'handlers': ['default', 'console'], # 这里把上面定义的两个handler都加上,即log数据既写入文件又打印到屏幕
'level': 'DEBUG',
'propagate': True, # 向上(更高level的logger)传递
},
},
}
core -- admin.py
"""
管理员功能
"""
from core import src
from interface import admin_interface
# 添加功能
def add_user():
src.register()
# 修改额度
def change_balance():
while True:
# 输入要修改的用户名
change_user = input("change_user>>>:").strip()
# 输入要修改的用户额度
money = input("money>>>:").strip()
if not money.isdigit():
print("请输入数字")
continue
flag,msg = admin_interface.chdnge_balance_interface(change_user,money)
if flag:
print(msg)
break
else:
print(msg)
# 冻结账户
def lock_user():
while True:
# 输入要冻结的用户名
lock_user = input("lock_user>>>:").strip()
flag,msg = admin_interface.lock_user_interface(lock_user)
if flag:
print(msg)
break
else:
print(msg)
func_dic = {
"1": add_user,
"2": change_balance,
"3": lock_user,
}
def admin_run():
while True:
print("""
1,添加账户
2,修改额度
3,冻结账户
""")
choice = input("choice>>>:").strip()
if choice not in func_dic:
print("超出功能范围")
continue
func_dic.get(choice)()
core -- src.py
"""
用户视图层
"""
from lib import common
from interface import user_interface
from interface import bank_interface
from interface import shop_interface
login_user = None
# 注册功能
def register():
while True:
# 接收用户名
username = input("name>>>:").strip()
# 接收两次密码
password = input("password>>>:").strip()
re_password = input("re_password>>>:").strip()
# 判断两次密码是否一致
if password == re_password:
# 调用注册接口
flaw, msg = user_interface.register_interface(username, password, balance=1500)
if flaw:
print(msg)
break
else:
print(msg)
# 登录功能
def login():
while True:
# 输入用户名
username = input("name>>>:").strip()
# 输入密码
password = input("password>>>:").strip()
# 调用登录接口
flaw, msg = user_interface.login_interface(username, password)
if flaw:
print(msg)
# 记录用户登录
global login_user
login_user = username
break
else:
print(msg)
# 查询余额功能
@common.login_auth
def check_balance():
# 调用查询余额接口
balance = user_interface.check_bal_interface(login_user)
# 打印 用户 与 金额
print(f'用户{login_user} 账户余额:{balance} ')
# 提现功能
@common.login_auth
def withdraw():
while True:
# 输入要提现的金额
input_money = input("input_money>>>:").strip()
# 判断输入的是否数字
if not input_money.isdigit():
print("请输入数字")
continue
# 调用提现接口
flag, msg = bank_interface.withdraw_interface(login_user, input_money)
if flag:
print(msg)
break
else:
print(msg)
# 还款功能
@common.login_auth
def repay():
while True:
# 输入还款金额
out_money = input("input_money>>>:").strip()
# 判断输入的是否是数字
if not out_money.isdigit():
print("请输入数字")
continue
out_money = int(out_money)
# 判断还款的金额是否大于0
if out_money > 0:
# 调用还款接口
flag, msg = bank_interface.repay_interface(login_user, out_money)
if flag:
print(msg)
break
else:
print(msg)
# 转账功能
@common.login_auth
def transfer():
while True:
# 转账给目标用户
to_user = input("input_name>>>:").strip()
to_money = input("input_money>>>:").strip()
if not to_money.isdigit():
print("请输入数字")
continue
to_money = int(to_money)
if to_money > 0:
flag, msg = bank_interface.transfer_interface(login_user, to_user, to_money)
if flag:
print(msg)
break
else:
print(msg)
else:
print("请输入数字")
# 查看流水账
@common.login_auth
def check_flow():
# 调用查看流水账接口
flow_list = bank_interface.check_interface(login_user)
if flow_list:
# 循环打印流水账列表
for flow in flow_list:
print(flow)
else:
print("当前用户没有流水账")
# 购物功能
@common.login_auth
def shopping():
# 商品列表
shop_list = [
["苹果", 5],
["雪梨", 6],
["泡椒凤爪", 10],
["Mac", 1000],
]
# 初始化购物车
shopping_car = {}
while True:
# 打印商品信息
for index, shop in enumerate(shop_list):
print(f'商品编号[{index}] '
f'商品名称[{shop}] '
f'商品价格[{shop[1]}]')
# 接收用户输入的编号
choice = input("choice(结账(y/Y) or 添加购物车(n/N))>>>:").strip()
# 判断是否结账
if choice == "y" or choice == "Y":
if not shopping_car:
print("当前购物车是空的,不能支付")
continue
# 调用支付接口
flag, msg = shop_interface.shopping_interface(login_user, shopping_car)
if flag:
print(msg)
break
else:
print(msg)
elif choice == "n" or choice == "N":
if not shopping_car:
print("当前购物车是空的")
continue
# 调用购物车接口
flag, msg = shop_interface.add_shop_car_interface(login_user, shopping_car)
if flag:
print(msg)
break
# 判断输入的是否是数字
if not choice.isdigit():
print("请输入数字")
choice = int(choice)
# 判断输入的商品是否在列表中
if choice not in range(len(shop_list)):
print("没有这个商品")
continue
# 获取 商品名称,单价
shop_name, shop_peice = shop_list[choice]
# 判断是否商品重复
if shop_name in shopping_car:
# 添加商品数量
shopping_car[shop_name][1] += 1
print(shop_list)
else:
# 数量默认为 1
shopping_car[shop_name] = [shop_peice, 1]
# 查看购物车
@common.login_auth
def check_shop_car():
shop_car = shop_interface.check_car_interface(login_user)
print(shop_car)
# 管理员
@common.login_auth
def admin():
from core import admin
admin.admin_run()
# 函数字典
func_dic = {
"0": None,
"1": login,
"2": register,
"3": check_balance,
"4": withdraw,
"5": repay,
"6": transfer,
"7": check_flow,
"8": shopping,
"9": check_shop_car,
"10": admin,
}
# 主程序
def run():
while True:
print("""
------ 功能选择 ------
0 退出
1 登录
2 注册
3 查询余额
4 提现
5 还款
6 转账
7 查看流水账
8 购物商品
9 查看购物车
10 管理员
--------------------------
""")
choice = input("choice>>>:").strip()
if choice == "0":
print("拜拜!")
break
if choice not in func_dic:
print("输入的功能编码超出范围")
continue
func_dic.get(choice)()
# run()
db -- db_hanlder.py
"""
存放文件数据
"""
import os
import json
from conf import settings
# 查看数据
def select(username):
# 创建 用户.json
user_path = os.path.join(settings.USER_DATE_PATH, f'{username}.json')
# 判断文件是否存在
if os.path.exists(user_path):
# 读取文件数据
with open(user_path, "rt", encoding="utf-8") as f:
user_dic = json.load(f)
return user_dic
# 保存数据
def save(user_dic):
# 获取 username
username = user_dic.get("username")
user_path = os.path.join(settings.USER_DATE_PATH, f'{username}.json')
# 把数据写入文件
with open(user_path, "wt", encoding="utf-8") as f:
json.dump(user_dic, f, ensure_ascii=False)
interface -- admin_interface.py
"""
管理员功能
"""
from db import db_hanlder
# 修改额度接口
def chdnge_balance_interface(username,money):
# 获取用户字典
user_dic = db_hanlder.select(username)
# 判断用户名是否存在用户字典里
if user_dic:
# 存在直接修改额度
user_dic["balance"] = int(money)
# 保存数据
db_hanlder.save(user_dic)
return True,'额度修改成功'
return False,'修改用户不存在'
# 冻结账户接口
def lock_user_interface(username):
# 获取用户字典
user_dic = db_hanlder.select(username)
# 判断用户是否存在
if user_dic:
# 存在直接修改冻结的值
user_dic["locked"] = True
# 保存数据
db_hanlder.save(user_dic)
return True,f'用户{username}修改成功'
return False,f'用户{username}不存在'
interface -- bank_interfacek.py
"""
银行接口
"""
from db import db_hanlder
# 提现接口
def withdraw_interface(username, money):
# 获取用户字典
user_dic = db_hanlder.select(username)
# 校验用户金额
balance = int(user_dic.get("balance"))
# 提款扣除的手续费
money1 = int(money) * 1.05
# 判断用户金额是否足够
if balance >= money1:
# 修改金额
balance -= money1
user_dic["balance"] = balance
# 记录流水账
flow = f'用户:[{username}] 提现[{money}元]成功 手续费[{money1 - float(money)}元]'
user_dic["flow"].append(flow)
# 保存数据
db_hanlder.save(user_dic)
return True, flow
return False, '提现余额不足请重新输入'
# 还款接口
def repay_interface(username, money):
# 获取用户字典
user_dic = db_hanlder.select(username)
# 加钱操作
user_dic["balance"] += money
flow = f'用户[{username}] 还款[{money}元]成功 剩余金额[{user_dic["balance"]}元]'
user_dic["flow"].append(flow)
# 保存数据
db_hanlder.save(user_dic)
return True, flow
# 转账接口
def transfer_interface(login_user, to_user, money):
# 获取当前用户字典
login_user_dic = db_hanlder.select(login_user)
# 获取目标用户字典
to_user_dic = db_hanlder.select(to_user)
# 判断目标用户是否存在
if not to_user_dic:
return False, '目标用户不存在'
# 判断转账金额是否足够
if login_user_dic["balance"] >= money:
# 当前用户扣钱
login_user_dic["balance"] -= money
# 目标用户加钱
to_user_dic["balance"] += money
# 流水账记录
login_user_flow = f'用户[{login_user}] 给用户[{to_user}]转账[{money}元]成功'
login_user_dic["flow"].append(login_user_flow)
to_user_flow = f'用户[{to_user}] 接收用户[{login_user}]成功转入[{money}元]'
to_user_dic["flow"].append(to_user_flow)
# 保存数据
db_hanlder.save(login_user_dic)
db_hanlder.save(to_user_dic)
return True, login_user_flow
return False, "转账金额不足"
# 查看流水账接口
def check_interface(login_user):
user_dic = db_hanlder.select(login_user)
return user_dic.get("flow")
# 支付接口
def pay_interface(login_user,count):
user_dic = db_hanlder.select(login_user)
# 判断用户金额是否足够
if user_dic.get("balance") >= count:
user_dic["balance"] -= count
# 记录流水账
flow = f'用户消费金额为[{count}元]'
# 消费记录添加到流水账中
user_dic["flow"].append(flow)
# 保存数据
db_hanlder.save(user_dic)
return True
return False
interface -- shop_interfacek.py
"""
购物接口
"""
from interface import bank_interface
from db import db_hanlder
# 结算接口
def shopping_interface(login_user, shopping_car):
# 计算消费总额
count = 0
for value in shopping_car.values():
price, number = value
count += (price * number)
# 调用银行接口
flag = bank_interface.pay_interface(login_user, count)
if flag:
return True, "支付成功"
return False, "金额不足"
# 添加购物车接口
def add_shop_car_interface(login_user, shopping_car):
user_dic = db_hanlder.select(login_user)
for shop_name, number in shopping_car.items():
if shop_name in user_dic.get("shop_car"):
user_dic["shop_car"][shop_name] += number[1]
else:
user_dic["shop_car"].update({shop_name: number})
db_hanlder.save(user_dic)
return True, '购物车添加成功'
# 查看购物车接口
def check_car_interface(username):
user_dic = db_hanlder.select(username)
return user_dic.get("shop_car")
interface -- user_interfacek.py
"""
用户信息接口
"""
from lib import common
from db import db_hanlder
user_logger = common.get_logger("user")
# 注册接口
def register_interface(username, password, balance):
# 查看用户名是否存在
user_dic = db_hanlder.select(username)
# 判断用户名是否存在
if user_dic:
return False, "用户已存在"
# 加密
password = common.get_md5(password)
# 创建 user_dic 字典
user_dic = {
"username": username,
"password": password,
"balance": balance,
"flow": [],
"shop_car": {},
"locked": False
}
# 将字典保存到文件里
db_hanlder.save(user_dic)
msg = f'{username}注册成功'
# 记录日志
user_logger.info(msg)
return True,msg
# 登录接口
def login_interface(username,password):
# 获取用户字典
user_dic = db_hanlder.select(username)
# 判断是否用户是否被冻结
if user_dic.get("locked"):
return False, "当前用户已被锁定"
if user_dic:
password = common.get_md5(password)
if password == user_dic.get("password"):
msg = f'用户{username}登录成功'
user_logger.info(msg)
return True, msg
else:
msg = "密码错误"
user_logger.info(msg)
return True, msg
msg = '用户不存在,请重新输入'
user_logger.info(msg)
return True, msg
# 查看余额
def check_bal_interface(username):
user_dic = db_hanlder.select(username)
return user_dic["balance"]
lib -- common.py
"""
公共方法
"""
import hashlib
import logging.config
from conf import settings
# 加密
def get_md5(password):
m = hashlib.md5()
m.update(password.encode("utf-8"))
slat = "猪猪侠,童话里做英雄"
m.update(slat.encode("utf-8"))
res = m.hexdigest()
return res
# 登录装饰器
def login_auth(func):
from core import src
def inner(*args,**kwargs):
if src.login_user:
return func(*args,**kwargs)
else:
print("未登录无法使用其他功能")
src.login()
return inner
# 添加日志功能
def get_logger(log_type):
# 添加日志信息
logging.config.dictConfig(settings.LOGGING_DIC)
# 获取字典对象
logger = logging.getLogger(log_type)
return logger
log -- atm.log
[2020-04-04 22:24:05,491][MainThread:9792][task_id:user][user_interface.py:49][INFO][用户eav登录成功]
[2020-04-04 22:25:45,641][MainThread:9696][task_id:user][user_interface.py:49][INFO][用户eav登录成功]
[2020-04-04 22:27:33,245][MainThread:9900][task_id:user][user_interface.py:49][INFO][用户eav登录成功]
[2020-04-04 22:29:35,004][MainThread:9568][task_id:user][user_interface.py:49][INFO][用户eav登录成功]
[2020-04-04 22:31:39,511][MainThread:9620][task_id:user][user_interface.py:49][INFO][用户eav登录成功]
[2020-04-04 22:32:42,167][MainThread:5252][task_id:user][user_interface.py:49][INFO][用户eav登录成功]