zoukankan      html  css  js  c++  java
  • Python操作文件模拟SQL语句功能

    #_*_coding:utf-8_*_
    
    import os
    # Path of the comma-separated text file that stores the staff records.
    STAFF_DB = "staff.db"
    # Column names, in the order their values appear on each line of STAFF_DB.
    COLUMN_ORDERS = ['id','name','age','phone','dept','enrolled_date']
    
    
    def load_db():
        """Read STAFF_DB into a column-oriented dict.

        Every non-blank line of the file must hold one comma-separated value
        per entry of COLUMN_ORDERS, in that order.

        Returns:
            dict: maps each column name to a list of string values; the i-th
            item of every list belongs to the i-th record of the file.

        Raises:
            OSError: STAFF_DB cannot be opened.
            ValueError: a non-blank line has the wrong number of fields.
        """
        staff_data = {col: [] for col in COLUMN_ORDERS}

        # `with` closes the handle even when a malformed line raises below.
        with open(STAFF_DB, 'r', encoding='utf-8') as f:
            for line_no, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    # Tolerate blank lines instead of failing on the unpack.
                    continue
                fields = line.split(',')
                if len(fields) != len(COLUMN_ORDERS):
                    raise ValueError(
                        "%s line %d: expected %d fields, got %d"
                        % (STAFF_DB, line_no, len(COLUMN_ORDERS), len(fields))
                    )
                for col, value in zip(COLUMN_ORDERS, fields):
                    staff_data[col].append(value)

        return staff_data
    
    
    def save_db():
        """Write STAFF_DATA back to STAFF_DB, one comma-separated record per line.

        The records are written to "<STAFF_DB>_tmp" first and that file is then
        moved over STAFF_DB with os.replace, so the existing file is only
        replaced once the new content has been written and closed.
        """
        tmp_path = "%s_tmp" % STAFF_DB

        # `with` closes (and flushes) the temp file before it is renamed,
        # even if building a row raises.
        with open(tmp_path, "w", encoding="utf-8") as f:
            for index, first_val in enumerate(STAFF_DATA[COLUMN_ORDERS[0]]):
                row = [str(first_val)]
                row.extend(str(STAFF_DATA[col][index]) for col in COLUMN_ORDERS[1:])
                f.write(",".join(row) + "\n")

        os.replace(tmp_path, STAFF_DB)
    
    def print_log(msg,msg_type='info'):
        """Print msg to stdout wrapped in an ANSI colour escape sequence.

        Args:
            msg: the text to show.
            msg_type: 'error' prints with SGR code 31;1 and an " Error:" prefix;
                any other value prints with SGR code 32;1 and an "Info:" prefix.
        """
        # "\033[" starts the escape sequence; "\033[0m" resets the attributes.
        # (msg,) keeps %-formatting correct even if msg itself is a tuple.
        if msg_type == 'error':
            print("\033[31;1m Error:%s\033[0m" % (msg,))
        else:
            print("\033[32;1mInfo:%s\033[0m" % (msg,))
    
    def op_gt(q_name,q_condition):
        """Select the records whose q_name value is numerically greater than q_condition.

        Args:
            q_name: a key of STAFF_DATA naming the column to compare.
            q_condition: the threshold; anything float() accepts.

        Returns:
            dict: same keys as STAFF_DATA, holding only the matching records.
            False if the threshold or any value in the column is not numeric
            (an error is printed through print_log in that case).
        """
        try:
            # Convert the threshold once instead of on every row.
            threshold = float(q_condition)
            hit_rows = [
                index for index, cell in enumerate(STAFF_DATA[q_name])
                if float(cell) > threshold
            ]
        except ValueError:
            # A non-numeric operand used to raise out of the command loop.
            print_log("字段'%s'的值或条件'%s'不是数字,无法比较" % (q_name, q_condition), 'error')
            return False

        return {
            col: [values[index] for index in hit_rows]
            for col, values in STAFF_DATA.items()
        }
    
    
    
    def op_lt(q_name,q_condition):
        """Select the records whose q_name value is numerically less than q_condition.

        Args:
            q_name: a key of STAFF_DATA naming the column to compare.
            q_condition: the threshold; anything float() accepts.

        Returns:
            dict: same keys as STAFF_DATA, holding only the matching records.
            False if the threshold or any value in the column is not numeric
            (an error is printed through print_log in that case).
        """
        try:
            # Convert the threshold once instead of on every row.
            threshold = float(q_condition)
            hit_rows = [
                index for index, cell in enumerate(STAFF_DATA[q_name])
                if float(cell) < threshold
            ]
        except ValueError:
            # A non-numeric operand used to raise out of the command loop.
            print_log("字段'%s'的值或条件'%s'不是数字,无法比较" % (q_name, q_condition), 'error')
            return False

        return {
            col: [values[index] for index in hit_rows]
            for col, values in STAFF_DATA.items()
        }
    
    def op_eq(q_name,q_condition):
        """Select the records whose q_name value equals q_condition.

        Values are compared as numbers when both sides parse with float()
        (so "22" matches "22.0"); otherwise they are compared as text, which
        makes conditions on text columns such as dept or name work.

        Args:
            q_name: a key of STAFF_DATA naming the column to compare.
            q_condition: the value to look for, as a string.

        Returns:
            dict: same keys as STAFF_DATA, holding only the matching records.
        """
        try:
            wanted_number = float(q_condition)
        except ValueError:
            wanted_number = None  # condition is text; compare as strings only

        def _matches(cell):
            # Numeric comparison first, string comparison as the fallback.
            if wanted_number is not None:
                try:
                    return float(cell) == wanted_number
                except ValueError:
                    pass
            return str(cell) == q_condition

        hit_rows = [
            index for index, cell in enumerate(STAFF_DATA[q_name])
            if _matches(cell)
        ]
        return {
            col: [values[index] for index in hit_rows]
            for col, values in STAFF_DATA.items()
        }
    
    
    def op_like(q_name,q_condition):
        """Select the records whose q_name value contains q_condition as a substring.

        Args:
            q_name: a key of STAFF_DATA naming the column to search.
            q_condition: the text to look for.

        Returns:
            dict: same keys as STAFF_DATA, holding only the matching records.
        """
        # str() guards against non-string cells: syntax_add appends the new id
        # as an int, and `"x" in 5` would raise TypeError.
        hit_rows = [
            index for index, cell in enumerate(STAFF_DATA[q_name])
            if q_condition in str(cell)
        ]
        return {
            col: [values[index] for index in hit_rows]
            for col, values in STAFF_DATA.items()
        }
    
    
    def syntax_find(query_clause,mached_data=None):
        """Print the requested columns of the matched records.

        Args:
            query_clause: the command text up to (not including) the where
                clause, e.g. "find name,age from staff.db".
            mached_data: column-oriented dict of the records to show. When
                omitted, every record in STAFF_DATA is shown, so a find
                without a where clause works.

        Returns:
            False when the column list is invalid (an error is printed);
            None after the rows have been printed as a list of lists.
        """
        if mached_data is None:
            mached_data = STAFF_DATA

        filter_keys = query_clause.split('find', 1)[1].split('from', 1)[0]
        columns = [i.strip() for i in filter_keys.split(',')]

        if "*" in columns:
            if len(columns) != 1:
                print_log("*不能同时与其它字段出现","error")
                return False
            columns = COLUMN_ORDERS
        if len(columns) == 1 and not columns[0]:
            print_log("语法错误,find和from之间必须跟字段名或*", "error")
            return False

        # Reject unknown column names instead of failing with KeyError.
        unknown = [col for col in columns if col not in mached_data]
        if unknown:
            print_log("字段'%s'不存在" % ",".join(unknown), 'error')
            return False

        # Turn the selected columns into one list per record.
        filter_data = [
            list(record)
            for record in zip(*(mached_data[col] for col in columns))
        ]
        print(filter_data)
    
    
    def syntax_add(query_clause):
        """Append one record to STAFF_DATA and save it to disk.

        Expected form:
            add staff.db values Alex Li,25,134435344,IT,2015-10-29

        The values after "values" must supply every column except id, in
        COLUMN_ORDERS order. The id is generated as the largest existing id
        plus one (1 for an empty table).

        Args:
            query_clause: the full command text.
        """
        parts = query_clause.split("values", 1)
        if len(parts) != 2:
            # Without this check the [1] lookup raised IndexError.
            print_log("语法错误,add语句缺少values关键字", 'error')
            return

        column_vals = [col.strip() for col in parts[1].split(",")]
        if len(column_vals) != len(COLUMN_ORDERS[1:]):
            print_log("提供的字段数据不足,必须字段%s" % COLUMN_ORDERS[1:], 'error')
            return

        # The leading 0 reproduces the original starting value for the search.
        next_id = max([0] + [int(i) for i in STAFF_DATA['id']]) + 1
        # Stored as str so it has the same type as the ids read by load_db.
        STAFF_DATA['id'].append(str(next_id))
        for col, val in zip(COLUMN_ORDERS[1:], column_vals):
            STAFF_DATA[col].append(val)

        save_db()
        print_log("成功添加1条纪录到staff.table表")
    
    
    def syntax_update(query_clause, matched_data):
        """Set one column to a new value on every matched record, then save.

        Expected form of query_clause (the text before the where clause):
            update staff.db set dept = Market

        Args:
            query_clause: command text containing "set <column> = <value>".
            matched_data: column-oriented dict of the records selected by the
                where clause; its 'id' list identifies the records to change.
        """
        parts = query_clause.split("set", 1)
        assignment = parts[1].split('=', 1) if len(parts) == 2 else []
        if len(assignment) != 2:
            # Previously a missing "set" or "=" raised IndexError/ValueError.
            print_log("语法错误,update语句格式应为: set 字段=值", 'error')
            return
        column_val, value = [col.strip() for col in assignment]

        if column_val not in COLUMN_ORDERS:
            print_log("字段%s不存在" %column_val,'error')
            return

        # Locate rows by id, not by the old column value: looking up the old
        # value hits the first row that merely shares it, which may not be a
        # matched record. NOTE(review): assumes ids are unique.
        # Indices are resolved before any write so that updating the id column
        # itself cannot disturb later lookups.
        row_indexes = [STAFF_DATA['id'].index(staff_id) for staff_id in matched_data['id']]
        for matched_index in row_indexes:
            STAFF_DATA[column_val][matched_index] = value
        count = len(row_indexes)

        print(STAFF_DATA)
        print_log("已更新%s条数据"%count,"info")
        save_db()
    
    def syntax_delete(query_clause, matched_data):
        """Remove every matched record from STAFF_DATA, then save.

        Args:
            query_clause: the command text before the where clause (unused).
            matched_data: column-oriented dict of the records selected by the
                where clause; its 'id' list identifies the records to remove.
        """
        count = 0
        # Iterate over a snapshot so the loop stays correct even if
        # matched_data shares its lists with STAFF_DATA.
        for k in list(matched_data["id"]):
            matched_index = STAFF_DATA["id"].index(k)
            for col in COLUMN_ORDERS:
                del STAFF_DATA[col][matched_index]
            # Counted per record; it used to sit outside the loop and always
            # reported 1.
            count += 1
        print(STAFF_DATA )
        print_log("已删除%s条记录"%count,"info")
        save_db()
    
    def syntax_where(clause):
        """Evaluate a where clause of the form "<column> <operator> <value>".

        Supported operators are '>', '<', '=' and 'like'; they are tried in
        that order and the first one found in the clause is used.

        Args:
            clause: the text that follows "where" in the command.

        Returns:
            Whatever the matching op_* function returns (a column-oriented
            dict of the selected records), or False after printing an error
            when the column is unknown or no operator is present.
        """
        # A tuple of pairs keeps the lookup order fixed on every Python version.
        operators = (
            ('>', op_gt),
            ('<', op_lt),
            ('=', op_eq),
            ('like', op_like),
        )
        for op_key, op_func in operators:
            if op_key not in clause:
                continue
            # maxsplit=1: a value that itself contains the operator no longer
            # breaks the two-way unpack.
            q_name, q_condition = clause.split(op_key, 1)
            q_name = q_name.strip()
            if q_name in STAFF_DATA:
                return op_func(q_name, q_condition.strip())
            print_log("字段'%s'不存在" % q_name, 'error')
            return False

        # No operator at all used to fall through silently.
        print_log("语法错误,where条件必须包含 >, <, = 或 like", 'error')
        return False
    
    
    def syntax_parser(cmd):
        """Validate a command line and dispatch it to the matching syntax_* handler.

        A command must start with find, add, update or delete and mention
        the table file name. Dispatch rules:
          * add    - handled by syntax_add with the whole command.
          * find   - with a where clause the matched records are shown,
                     without one all records are shown.
          * update / delete - require a where clause.

        Args:
            cmd: the command text typed by the user.
        """
        syntax_list = {
            'find': syntax_find,
            'add': syntax_add,
            'update': syntax_update,
            'delete': syntax_delete
        }
        tokens = cmd.split()
        action_name = tokens[0] if tokens else ''

        if action_name not in syntax_list or STAFF_DB not in cmd:
            print_log("语法错误!\nsample:[find/add/update/delete] name,age from [staff_table] where [id][>/</=/like][2]",
                      'error')
            return

        if action_name == 'add':
            # add never takes a where clause, and its values may legitimately
            # contain the text "where", so it is dispatched first.
            syntax_add(cmd)
            return

        if 'where' in cmd:
            # maxsplit=1 keeps a second "where" inside the condition value.
            query_cmd, where_clause = cmd.split('where', 1)
            matched_data = syntax_where(where_clause.strip())
            if matched_data:
                syntax_list[action_name](query_cmd, matched_data)
        elif action_name == 'find':
            # No filter: show every record. A copy is passed so the handler
            # never works on the live table lists.
            all_rows = {col: list(values) for col, values in STAFF_DATA.items()}
            syntax_find(cmd, all_rows)
        else:
            # update/delete take two arguments; calling them without matched
            # records used to raise TypeError.
            print_log("语法错误,%s语句必须包含where条件" % action_name, 'error')
    
    
    def main():
        """Run the interactive prompt: read a command, hand it to syntax_parser, repeat.

        Blank input is ignored. The loop ends, without a traceback, when the
        input stream is closed (EOF) or the user presses Ctrl-C.
        """
        while True:
            try:
                cmd = input("SQL>").strip()
            except (EOFError, KeyboardInterrupt):
                print()  # finish the prompt line before leaving
                return
            if cmd:
                syntax_parser(cmd)
    
    
    # Load the table once at start-up. STAFF_DATA is the module-level dict that
    # the op_* and syntax_* functions read and modify.
    STAFF_DATA = load_db()
    
    # Start the interactive command loop.
    main()
    

      

  • 相关阅读:
    volatile 和 mutable 关键字
    字符串第一个只出现一次的字符
    学一下HDFS,很不错(大数据技术原理及应用)
    把数组排成最小的数
    求第K大的数字
    数组中超过一半的数字
    打印字符的任意排列
    实时推荐系统架构
    带有任意指向指针的链表进行复制
    surfaceflinger中各个layer的排序
  • 原文地址:https://www.cnblogs.com/kumunotes/p/10274191.html
Copyright © 2011-2022 走看看