zoukankan      html  css  js  c++  java
  • ATM项目

    项目

    #一个项目是怎么从无到有的
    
    #1.需求分析
    	额度15000或者自定义		--> 注册功能
        实现购物商城,买东西自动加入购物车,调用信用卡接口结账		--> 购物车、支付功能
        可以提现,支付手续费5%	--> 提现
        支持多账户登录		--> 登录
        支持账户间转账		--> 转账,记录流水
        提供还款接口		--> 还款
        ATM记录操作日志	--> 日志功能
        提供管理接口,包括添加账户、用户额度、冻结账户...		--> 管理员功能
        用户认证功能		--> 登录认证,使用装饰器
    
    #2.程序的架构设计
    	用户功能层,接收用户输入,输出执行结果,还可以做小的逻辑判断
        接口层,业务逻辑的处理、日志、流水
        数据处理层,数据的处理(增删改查)
        
        #三层架构的好处
        	1.代码的结构更加清晰
            2.可扩展性强
            3.易于维护、管理
    
    #3.分任务开发
    	- CTO
        	- 技术总监
            	- 架构师
                	- 项目经理
                    	- 普通开发
                        - UI:用户界面设计师
                        - 前端:网页的开发
                        - 后端:写业务逻辑、写接口
                        - 测试:测试软件
                        - 运维:部署项目
                        
    #4.测试
    	- 1.手动测试
        	传统人工去手动测试
        - 2.自动化测试
        	通过脚本模拟人的行为,自动化执行测试
        - 3.黑盒测试
        	对用户界面进行测试 
        - 4.白盒测试
        	对软件的性能进行测试,例如每分钟能接收多少并发量
            
    #5.上线运行
    

    实例

    参考网站

    启动文件

    #创建项目目录ATM
    #ATM/bin/start.py
    
    import sys
    import os
    BASE_dir = os.path.dirname(os.path.dirname(__file__))
    sys.path.append(BASE_dir)
    from core import src
    if __name__ == '__main__':
        src.run()
    

    配置文件

    #ATM/conf/settings.py
    
    import os
    base_path = os.path.dirname(os.path.dirname(__file__))
    db_path = os.path.join(base_path, 'db')
    #不要使用%s拼接路径,要使用os模块
    LOG_path = os.path.join(base_path,'log')
    
    """
    logging配置
    """
    
    # 定义三种日志输出格式 开始
    standard_format = '[%(asctime)s][%(threadName)s:%(thread)d][task_id:%(name)s][%(filename)s:%(lineno)d]' 
                      '[%(levelname)s][%(message)s]'  # 其中name为getlogger指定的名字
    simple_format = '[%(levelname)s][%(asctime)s][%(filename)s:%(lineno)d]%(message)s'
    id_simple_format = '[%(levelname)s][%(asctime)s] %(message)s'
    
    # 定义日志输出格式 结束
    logfile_dir = os.path.join(base_path, 'log')  # log文件的目录
    logfile_name = 'atm_shop_log.log'  # log文件名
    
    # 如果不存在定义的日志目录就创建一个
    if not os.path.isdir(logfile_dir):
        os.mkdir(logfile_dir)
    
    # log文件的全路径
    logfile_path = os.path.join(logfile_dir, logfile_name)
    
    # log配置字典
    LOGGING_DIC = {
        'version': 1,
        'disable_existing_loggers': False,
        'formatters': {
            'standard': {
                'format': standard_format
            },
            'simple': {
                'format': simple_format
            },
        },
        'filters': {},
        'handlers': {
            # 打印到终端的日志
            'console': {
                'level': 'DEBUG',
                'class': 'logging.StreamHandler',  # 打印到屏幕
                'formatter': 'simple'
            },
            # 打印到文件的日志,收集info及以上的日志
            'default': {
                'level': 'DEBUG',
                'class': 'logging.handlers.RotatingFileHandler',  # 保存到文件
                'formatter': 'standard',
                'filename': logfile_path,  # 日志文件
                'maxBytes': 1024 * 1024 * 5,  # 日志大小5M
                'backupCount': 5,
                'encoding': 'utf-8',  # 日志文件的编码,再也不用担心中文log乱码了
            },
        },
        'loggers': {
            # logging.getLogger(__name__)拿到的logger配置
            '': {
                'handlers': ['default', 'console'],  # 这里把上面定义的两个handler都加上,即log数据既写入文件又打印到屏幕
                'level': 'DEBUG',
                'propagate': True,  # 向上(更高level的logger)传递
            },
        },
    }
    

    核心文件

    #ATM/core/src.py
    
    import time
    from ATM.interface import user_interface
    from ATM.lib import comman
    from ATM.interface import bank_interface
    from ATM.interface import shop_interface
    from ATM.interface import admin_interface
    
    user_info = {
        'user':None,
    }
    #注册
    def register():
        while True:
            user = input('请输入用户名>>>: ').strip()
            # 调用接口,判断用户是否存在
            flag = user_interface.check_user_interface(user)
            if flag:
                print('用户已存在,请重新输入!')
                continue
            pwd = input('请输入密码>>>: ').strip()
            re_pwd = input('请确认密码>>>: ').strip()
            if pwd == re_pwd:
                #调用接口,保存用户信息
                msg = user_interface.register_interface(user,pwd)
                if msg:
                    print(msg)
                    break
                else:
                    print('注册失败')
            else:
                print('两次输入密码不一致')
    #登录
    def login():
        while True:
            user = input('请输入用户名>>>: ').strip()
            flag = user_interface.check_user_interface(user)
            if not flag:
                print('用户不存在,请重新输入!')
                continue
            pwd = input('请输入用户密码>>>: ').strip()
            flag,msg = user_interface.login_interface(user,pwd)
            if flag:
                print(msg)
                #登录成功后,做一个记录
                user_info['user'] = user
                break
            else:
                print(msg)
                break
    #查看余额
    @comman.login_auth
    def check_balance():
        balabce = user_interface.check_balance_interface(user_info['user'])
        msg = f"{user_info['user']}余额为: "
        print(msg,balabce)
    #提现
    @comman.login_auth
    def withdraw():
        while True:
            money = input('请输入提现金额>>>: ').strip()
            if money.isdigit():
                money = int(money)
                flag,msg = bank_interface.withdraw_interface(user_info['user'],money)
                if flag:
                    print(msg)
                    break
                else:
                    print(msg)
            else:
                print('请输入数字')
    #还款
    @comman.login_auth
    def pay_bak():
        while True:
            money = input('请输入还款金额>>>: ').strip()
            if not money.isdigit():
                print('请输入数字!')
                continue
            money = int(money)
            flag,msg = bank_interface.repay_interface(user_info['user'],money)
            if flag:
                print(msg)
                break
    #转账
    @comman.login_auth
    def transfer():
        while True:
            to_user = input('请输入转账用户>>>: ').strip()
            flag = user_interface.check_user_interface(to_user)
            if not flag:
                print('目标用户不存在!')
                continue
            #输入转账金额
            money = input('请输入转账金额>>>: ').strip()
            if not money.isdigit():
                print('请输入数字')
                continue
            money = int(money)
            flag,msg = bank_interface.transfer_interface(to_user,user_info['user'],money)
            if flag:
                print(msg)
                break
            else:
                print(msg)
    #查看流水
    @comman.login_auth
    def check_record():
        flow_list = bank_interface.check_flow_interface(user_info['user'])
        for flow in flow_list:
            print(flow)
    #购物车功能
    @comman.login_auth
    def shopping_cart():
        good_list = [
            ['衬衫', 500],
            ['裤子', 300],
            ['鞋子', 150],
            ['外套', 1000]
        ]
        #用户余额
        user_balance = user_interface.check_balance_interface(user_info['user'])
        #购物车
        shop_cart = {}
        #清空购物车,用户需要支付的金额
        cost = 0
        while True:
            # 打印商品信息
            for index, good_price in enumerate(good_list, 1):
                print(index, good_price)
            # 选择商品编号或者输入q退出
            choice = input('请输入商品编号或输入q退出>>>: ').strip()
            if choice.isdigit():
                choice = int(choice)
                if choice >= 0 and choice <= len(good_list):
                    good_name, good_price = good_list[choice - 1]
                    if user_balance >= good_price:
                        if good_name in shop_cart:
                            shop_cart[good_name] += 1
                        else:
                            shop_cart[good_name] = 1
                        cost += good_price
                    else:
                        print('余额不足,无法添加购物车!')
                else:
                    print('商品编号不存在!')
            elif choice == 'q':
                commit = input('是否确认结账,请输入(y/n)').strip()
                if commit == 'y':
                    # 调用商城支付功能,并调用银行支付接口
                    flag, msg = shop_interface.buy_shop_interface(user_info['user'], cost)
                    if flag:
                        print(msg)
                        break
                    else:
                        print(msg)
                elif commit == 'n':
                    # 调用添加购物车接口
                    flag, msg = shop_interface.add_shop_cart_interface(user_info['user'], shop_cart)
                    if flag:
                        print(msg)
                        break
                    else:
                        print(msg)
            else:
                print('请输入商品编号或输入q')
    #查看购物车
    @comman.login_auth
    def check_shop_cart():
        shop_cart = shop_interface.check_shop_cart_interface(user_info['user'])
        if not shop_cart:
            print('购物车为空')
        print(shop_cart)
    #注销
    @comman.login_auth
    def logout():
        while True:
            flag,msg = user_interface.logout_interface(user_info['user'])
            if flag:
                print(msg)
                break
    #管理员功能
    def lock_user():
        while True:
            user = input('请输入需要冻结的账户>>>: ').strip()
            flag = user_interface.check_user_interface(user)
            if flag:
                msg = admin_interface.lock_interface(user)
                print(msg)
                break
            else:
                print('用户不存在,请重新输入!')
    def unlock_user():
        while True:
            user = input('请输入需要解冻的账户>>>: ').strip()
            flag = user_interface.check_user_interface(user)
            if flag:
                msg = admin_interface.unlock_interface(user)
                print(msg)
                break
            else:
                print('用户不存在,请重新输入!')
    def change_limit():
        while True:
            user = input('请输入需要修改额度的账户>>>: ').strip()
            flag = user_interface.check_user_interface(user)
            limit = input('请输入修改额度>>>: ').strip()
            if flag:
                if limit.isdigit():
                    limit = input(limit)
                    #调用修改额度接口
                    msg = admin_interface.change_limit_interface(user,limit)
                    print(msg)
                    break
                else:
                    print('输入额度无效,请重新输入')
                    continue
            else:
                print('用户不存在,请重新输入!')
    admin_func_dic = {
        '1':lock_user,
        '2':unlock_user,
        '3':change_limit,
    }
    @comman.login_auth
    def admin_sys():
        while True:
            print('''
                1.冻结账户
                2.解冻账户
                3.用户额度
            ''')
            choice = input('请选择管理员功能>>>: ').strip()
            if choice.isdigit():
                if choice in admin_func_dic:
                    admin_func_dic[choice]()
                    break
                else:
                    print('相关功能还在开发中...')
                    continue
            else:
                print('输入无效,请重新输入!')
    my_dict = {
        '1':register,
        '2':login,
        '3':check_balance,
        '4':withdraw,
        '5':pay_bak,
        '6':transfer,
        '7':check_record,
        '8':shopping_cart,
        '9':check_shop_cart,
        '10':logout,
        '11':admin_sys
    }
    def run():
        while True:
            print("""
            1  register(注册)
            2  login(登录)
            3  check_balance(查看余额)
            4  withdraw(提现)
            5  pay_bak(还款)
            6  transfer(转账)
            7  check_record(查看流水)
            8  shopping_cart(购物车功能)
            9  check_shop_cart(查看购物车)
            10 logout(注销)
            11 admin_sys(管理员功能)
            """)
            choice = input('please choice func to operate>>>: ').strip()
            if choice == 'q':
                break
            if choice not in my_dict:
                print('相关功能还在开发中')
                time.sleep(1)
                continue
            my_dict.get(choice)()
    

    文件数据操作

    # ATM/db/db_handler.py 
    
    import json
    import os
    from ATM.conf import settings
    #保存用户数据
    def save(user_dic):
        user_path = f'{settings.db_path}/{user_dic["user"]}.json'
        with open(user_path, 'w', encoding='utf-8') as f:
            #中文不转码
            json.dump(user_dic, f,ensure_ascii=False)
            f.flush()
    #查看用户数据
    def select(user):
        user_path = f'{settings.db_path}/{user}.json'
        #判断用户文件是否存在
        if os.path.exists(user_path):
            with open(user_path,'r',encoding='utf-8') as f:
                user_dic = json.load(f)
                return user_dic
    

    接口文件

    用户相关接口

    # ATM/interface/user_interface
    
    from ATM.db import db_handler
    from ATM.lib import comman
    import os
    
    user_logger = comman.get_logger('user')
    #注册接口
    def register_interface(user,pwd,balance=15000):
        pwd = comman.get_md5(pwd)
        user_dic = {
            'user': user,
            'pwd': pwd,
            'balance': balance,
            'flow':[],
            'shop_cart':{},
            'lock':False
        }
        db_handler.save(user_dic)
        user_logger.info(f'{user_dic["user"]}用户注册成功!')
        return f'{user_dic["user"]}用户注册成功!'
    #查看用户接口
    def check_user_interface(user):
        #调用数据处理层的select函数,查看用户信息
        user_dic = db_handler.select(user)
        if user_dic:
            return True
    #登录接口
    def login_interface(user,pwd):
        pwd = comman.get_md5(pwd)
        user_dic = db_handler.select(user)
        if user_dic['lock']:
            user_logger.info(f'用户{user}已经被冻结!')
            return False,f'用户{user}已经被冻结!'
        if user_dic['pwd'] == pwd:
            user_logger.info(f'用户{user}登录成功')
            return True,f'用户{user}登录成功'
        user_logger.info(f'用户{user}登录失败,密码错误')
        return False,f'用户{user}登录失败,密码错误'
    #查看余额接口
    def check_balance_interface(user):
        user_dic = db_handler.select(user)
        return user_dic['balance']
    #注销接口
    def logout_interface(user):
        from ATM.core import src
        src.user_info['user'] = None
        user_logger.info(f'用户{user}注销成功!欢迎下次使用')
        return True,f'用户{user}注销成功!欢迎下次使用'
    

    商店相关接口

    from ATM.db import db_handler
    from ATM.interface import bank_interface
    from ATM.lib import comman
    
    user_logger = comman.get_logger('shop')
    #商城结账接口
    def buy_shop_interface(user,cost):
        flag,msg = bank_interface.pay_interface(user,cost)
        if flag:
            user_logger.info(f'用户{user}购物成功,余额为{msg}')
            return True,f'购物成功,余额为{msg}'
        else:
            user_logger.info(f'用户{user}余额为{msg},余额不足,购物失败!')
            return False,f'余额为{msg},余额不足,购物失败!'
    #添加购物车接口
    def add_shop_cart_interface(user,shop_cart):
        user_dic = db_handler.select(user)
        old_cart = user_dic['shop_cart']
        #循环当前购物车
        for shop in shop_cart:
            if shop in old_cart:
                #把回购的商品信息添加到原来的购物车
                old_cart[shop] += shop_cart[shop]
            else:
                #把新商品添加到购物车
                old_cart[shop] = shop_cart[shop]
        user_dic['shop_cart'].update(old_cart)
        db_handler.save(user_dic)
        user_logger.info(f'用户{user}添加购物车成功')
        return True,'添加购物车成功'
    #查看购物车接口
    def check_shop_cart_interface(user):
        user_dic = db_handler.select(user)
        user_logger.info(f'用户{user}查看购物车')
        return user_dic['shop_cart']
    

    银行相关接口

    from ATM.db import db_handler
    from ATM.lib import comman
    
    user_logger = comman.get_logger('bank')
    #提现接口
    def withdraw_interface(user,money):
        user_dic = db_handler.select(user)
        if user_dic['balance'] >= money*1.05:
            user_dic['balance']-=money*1.05
            msg = f'用户{user}提现[{money}]元成功!'
            user_dic['flow'].append(msg)
            db_handler.save(user_dic)
            user_logger.info(f'用户{user}提现[{money}]元成功!')
            return True,msg
        user_logger.info(f'用户{user}提现失败,余额不足...')
        return False,'余额不足...'
    #还款接口
    def repay_interface(user,money):
        user_dic = db_handler.select(user)
        user_dic['balance'] +=money
        msg = f'用户{user},还款{money}成功'
        user_dic['flow'].append(msg)
        db_handler.save(user_dic)
        user_logger.info(f'用户{user},还款{money}成功')
        return True,msg
    #转账接口
    def transfer_interface(to_user,from_user,money):
        to_user_dic = db_handler.select(to_user)
        from_user_dic = db_handler.select(from_user)
        if from_user_dic['balance'] >=money:
            #金额操作
            from_user_dic['balance'] -=money
            to_user_dic['balance'] +=money
            #拼接流水
            to_user_flow = f'{to_user}用户接受{from_user}用户转账{money}元'
            from_user_flow = f'{from_user}用户给{to_user}用户转账{money}元'
            #记录流水
            from_user_dic['flow'].append(from_user_flow)
            to_user_dic['flow'].append(to_user_flow)
            #保存用户余额信息
            db_handler.save(from_user_dic)
            db_handler.save(to_user_dic)
            return True,from_user_flow
        user_logger.info(f'用户{from_user}转账失败,余额不足,请充值!')
        return False,'余额不足,请充值!'
    #查看流水接口
    def check_flow_interface(user):
        user_dic = db_handler.select(user)
        user_logger.info(f'用户{user}查看了银行流水')
        return user_dic['flow']
    #银行支付接口
    def pay_interface(user,cost):
        user_dic = db_handler.select(user)
        if user_dic['balance'] >= cost:
            user_dic['balance'] -=cost
            user_dic['flow'].append(f'{user}用户支付{cost}成功')
            db_handler.save(user_dic)
            user_logger.info(f'用户{user}支付{cost}成功')
            return True,user_dic['balance']
        else:
            user_logger.info(f'用户{user}支付{cost}失败')
            return False,user_dic['balance']
    if __name__ == '__main__':
        res = pay_interface('syy',1)
        print(res)
    

    管理员相关接口

    from ATM.db import db_handler
    from ATM.lib import comman
    
    user_logger = comman.get_logger('admin')
    #冻结账户接口
    def lock_interface(user):
        user_dic = db_handler.select(user)
        user_dic['lock'] = True
        db_handler.save(user_dic)
        user_logger.info(f'用户{user}冻结成功')
        return f'用户{user}冻结成功'
    #解冻账户接口
    def unlock_interface(user):
        user_dic = db_handler.select(user)
        user_dic['lock'] = False
        db_handler.save(user_dic)
        user_logger.info(f'用户{user}解冻成功')
        return f'用户{user}解冻成功'
    #修改余额接口
    def change_limit_interface(user,limit):
        user_dic = db_handler.select(user)
        user_dic['balance'] = limit
        db_handler.save(user_dic)
        user_logger.info(f'用户{user}修改余额成功')
        return f'用户{user}修改额度成功'
    

    通用文件

    #ATM/lib/comman.py
    
    import hashlib
    import logging.config
    from ATM.conf import settings
    
    #登录认证
    def login_auth(func):
        from core import src
        def inner(*args,**kwargs):
            #判断用户是否登录,是则执行被装饰函数,否则要求登录
            if src.user_info['user']:
                res = func(*args,**kwargs)
                return res
            else:
                print('请先登录...')
                src.login()
        return inner
    
    #md5加密
    def get_md5(pwd):
        val = '加盐'
        md5 = hashlib.md5()
        md5.update(val.encode('utf-8'))
        md5.update(pwd.encode('utf-8'))
        res = md5.hexdigest()
        return res
    
    #日志功能
    def get_logger(type_name):
        logging.config.dictConfig(settings.LOGGING_DIC)
        logger = logging.getLogger(type_name)
        return logger
    
    if __name__ == '__main__':
        logger = get_logger('syy')
        logger.info('哈哈哈哈哈哈哈')
    
  • 相关阅读:
    [LeetCode] 240
    [LeetCode] 169
    [LeetCode] 28
    [LeetCode] 27
    [LeetCode] 14
    [LeetCode] 9
    [LeetCode] 7
    [LeetCode] 2
    数据库开发规范
    Mysql优化
  • 原文地址:https://www.cnblogs.com/syy1757528181/p/14059494.html
Copyright © 2011-2022 走看看