zoukankan      html  css  js  c++  java
  • python操作mysql数据库读取一个数据库的表写入另一个数据库

    写这个肯定是工作需要了,不啰嗦,直接说事
    我现在有两台主机,一台是公司主机,一台是客户主机,要求把公司主机上的三个表同步到客户主机上的数据库
    注意需求是"同步",所以首先想到的方案是 Linux 定时任务或 MySQL 主从复制;但我没有权限在主机上配置主从复制,因此只能写一个同步脚本,再用定时任务(cron)定期执行
    涉及的三个表创建语句

    # DDL for the `schedule_building` table.
    # The string holds two SQL statements: a DROP TABLE IF EXISTS followed by
    # a CREATE TABLE, so executing it discards any existing table of that name.
    # The table has 13 columns with `id` as the auto-increment primary key.
    create_sql_schedule_building = """
        DROP table IF EXISTS schedule_building ;
        CREATE TABLE `schedule_building` (
          `id` int(11) NOT NULL AUTO_INCREMENT,
          `uuid` varchar(32) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
          `proj_id` int(11) DEFAULT NULL,
          `team_id` int(11) DEFAULT NULL,
          `Building` varchar(32) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
          `status` tinyint(4) DEFAULT '1',
          `cid` int(11) DEFAULT NULL,
          `cusrname` varchar(50) CHARACTER SET utf8 DEFAULT NULL COMMENT '建立人',
          `ctime` datetime DEFAULT NULL,
          `uid` int(11) DEFAULT NULL,
          `uusrname` varchar(50) CHARACTER SET utf8 DEFAULT NULL COMMENT '修改人',
          `utime` datetime DEFAULT NULL,
          `random_no` varchar(50) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
          PRIMARY KEY (`id`)
        ) ENGINE=InnoDB AUTO_INCREMENT=91 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
    """
    
    # DDL for the `schedule_floor` table.
    # The string holds two SQL statements: a DROP TABLE IF EXISTS followed by
    # a CREATE TABLE, so executing it discards any existing table of that name.
    # The table has 9 columns with `id` as the auto-increment primary key.
    create_sql_schedule_floor = """
        DROP table IF EXISTS schedule_floor ;
        CREATE TABLE `schedule_floor` (
          `id` int(11) NOT NULL AUTO_INCREMENT,
          `m_id` int(11) DEFAULT NULL,
          `sort` int(11) DEFAULT NULL,
          `cname` varchar(100) CHARACTER SET utf8 DEFAULT NULL,
          `ctime` datetime DEFAULT NULL,
          `cid` int(11) DEFAULT NULL,
          `utime` datetime DEFAULT NULL,
          `uid` int(11) DEFAULT NULL,
          `status` tinyint(4) DEFAULT '1',
          PRIMARY KEY (`id`)
        ) ENGINE=InnoDB AUTO_INCREMENT=3249 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci ;
    """
    
    # DDL for the `schedule_room` table.
    # The string holds two SQL statements: a DROP TABLE IF EXISTS followed by
    # a CREATE TABLE, so executing it discards any existing table of that name.
    # The table has 8 columns with `id` as the auto-increment primary key.
    create_sql_schedule_room = """
        DROP table IF EXISTS schedule_room ;
        CREATE TABLE `schedule_room` (
          `id` int(11) NOT NULL AUTO_INCREMENT,
          `m_id` int(11) DEFAULT NULL,
          `ilevel` int(11) DEFAULT NULL,
          `parent_id` int(11) DEFAULT NULL,
          `cname` varchar(50) DEFAULT NULL,
          `mark` varchar(50) DEFAULT NULL,
          `status` tinyint(4) DEFAULT '1',
          `sort` int(11) DEFAULT NULL,
          PRIMARY KEY (`id`)
        ) ENGINE=InnoDB AUTO_INCREMENT=926 DEFAULT CHARSET=utf8;
    """
    

    核心代码

    from mysql_base import DataBaseParent_local, DataBaseParent_remote, DataBaseParent_test
    import MySQLdb
    
    # Source side: read() fetches rows through db1.select().
    db1 = DataBaseParent_local()
    # Not referenced by any function in this script as shown.
    # NOTE(review): DataBaseParent_remote is imported from mysql_base, but the
    # shared base module shown in this file does not define it - confirm it
    # exists, otherwise the import above fails before anything runs.
    db2 = DataBaseParent_remote()
    # Target side: every write_*() helper calls db3.insert_many().
    db3 = DataBaseParent_test()
    
    
    def read(tb_name):
        """Fetch every row of ``tb_name`` through ``db1`` and return them as a list.

        ``tb_name`` is interpolated directly into the SELECT statement, so it
        must be a trusted table name.
        """
        query = "SELECT * FROM {0};".format(tb_name)
        fetched, _ = db1.select(query)
        # Copy into a list so callers always receive a list.
        return list(fetched)
    
    
    def write_building():
        """Copy all ``schedule_building`` rows from the source into ``db3``.

        The target table is emptied first, then refilled with the rows
        returned by ``read``, both through a single ``db3.insert_many`` call.
        """
        rows = read("schedule_building")
        clear_stmt = "delete from schedule_building ;"
        # One placeholder per column of schedule_building (13 in total).
        insert_stmt = "insert into schedule_building values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);"
        db3.insert_many(insert_stmt, clear_stmt, rows)
    
    
    def write_floor():
        """Copy all ``schedule_floor`` rows from the source into ``db3``.

        The target table is emptied first, then refilled with the rows
        returned by ``read``, both through a single ``db3.insert_many`` call.
        """
        rows = read("schedule_floor")
        clear_stmt = "delete from schedule_floor ;"
        # One placeholder per column of schedule_floor (9 in total).
        insert_stmt = "insert into schedule_floor values(%s,%s,%s,%s,%s,%s,%s,%s,%s);"
        db3.insert_many(insert_stmt, clear_stmt, rows)
    
    
    def write_room():
        """Copy all ``schedule_room`` rows from the source into ``db3``.

        The target table is emptied first, then refilled with the rows
        returned by ``read``, both through a single ``db3.insert_many`` call.
        """
        rows = read("schedule_room")
        clear_stmt = "delete from schedule_room ;"
        # One placeholder per column of schedule_room (8 in total).
        insert_stmt = "insert into schedule_room values(%s,%s,%s,%s,%s,%s,%s,%s);"
        db3.insert_many(insert_stmt, clear_stmt, rows)
    
    
    if __name__ == '__main__':
        # Script entry point: sync the three tables one after another,
        # in the order floor -> building -> room.
        for sync_table in (write_floor, write_building, write_room):
            sync_table()
    

    数据库共享基类

    #!/usr/bin/python
    # -*- coding: UTF-8 -*-
    """Shared DB helper library."""
    # Usage: instantiate a DataBaseParent_local object first, then call the
    # relevant methods on that object.
    # from django.db import connection
    
    import MySQLdb
    # Both connections are opened when this module is imported and are shared
    # by the helper classes defined below (db1 by DataBaseParent_local, db3 by
    # DataBaseParent_test).
    # NOTE(review): host, user, password and database name are hard-coded
    # here; consider loading them from configuration or the environment
    # instead of keeping credentials in source.
    db1 = MySQLdb.connect("www.shdfshajd.cn", "db_user", "qazeDC!@12", "xcx", charset='utf8')
    db3 = MySQLdb.connect("localhost", "root", "root", "apollo", charset='utf8')
    
    class DataBaseParent_local:
        """Helper methods that run SQL over the module-level ``db1`` connection.

        Every method opens its own short-lived cursor and closes it again,
        even when the statement raises, so one failed or finished call never
        leaves a later call with a closed cursor.
        """

        def __init__(self):
            # db1.cursor() either returns a cursor or raises, so the former
            # "Initial Status" sentinel check could never trigger and is gone.
            # The attribute is kept for code that reads ``self.cursor``.
            self.cursor = db1.cursor()

        def select(self, sqlstr):
            """Run ``sqlstr`` and return ``(rows, row_count)``.

            ``rows`` is whatever ``fetchall()`` returns (a tuple of tuples,
            e.g. ``(('apollo', 'male', '164.jpeg'), ('apollo', 'male', ''))``).
            The cursor description of the query is stored on
            ``self.description`` as a side effect.
            """
            cur = db1.cursor()
            try:
                cur.execute(sqlstr)
                rows = cur.fetchall()
                self.description = cur.description
            finally:
                cur.close()
            return rows, len(rows)

        def select_include_name(self, sqlstr):
            """Run ``sqlstr`` and return ``(list_of_dicts, row_count)``.

            Each row becomes a dict keyed by column name, e.g.
            ``[{'name': 'apollo', 'age': 28}, {'name': 'jack', 'age': 27}]``.
            """
            cur = db1.cursor()
            try:
                cur.execute(sqlstr)
                # description is None when the statement yields no result set.
                names = [col[0] for col in (cur.description or ())]
                rows = cur.fetchall()
            finally:
                cur.close()
            # Pair every column with its value; the previous index loop
            # stopped one short and silently dropped the last column.
            result = [dict(zip(names, row)) for row in rows]
            return result, len(rows)

        def select_for_grid(self, sqlstr, pageNo=1, select_size=5):
            """Run ``sqlstr`` and return one page of the result.

            Args:
                sqlstr: query to execute via ``select``.
                pageNo: 1-based page number.
                select_size: number of rows per page.

            Returns:
                ``(page_rows, total_rows, total_pages, pageNo, select_size)``.
                ``page_rows`` is ``[]`` when there are no rows or the page
                starts outside the result. For 10 rows and ``select_size=5``,
                page 1 yields the first five rows with ``total_pages == 2``.

            Raises:
                ZeroDivisionError: if ``select_size`` is 0.
            """
            rows, total = self.select(sqlstr)
            # Integer ceiling division: a plain "/" yields a float page count
            # on Python 3.
            total_pages, remainder = divmod(total, select_size)
            if remainder:
                total_pages += 1

            start = (pageNo - 1) * select_size
            end = min(pageNo * select_size, total)
            if total == 0 or start > total or start < 0:
                return [], total, total_pages, pageNo, select_size
            return rows[start:end], total, total_pages, pageNo, select_size

        def executesql(self, sqlstr):
            """Execute ``sqlstr``, commit on ``db1`` and return ``execute()``'s result."""
            cur = db1.cursor()
            try:
                affected = cur.execute(sqlstr)
                db1.commit()
            finally:
                cur.close()
            return affected

        def insert(self, sql, param):
            """Execute the parameterised ``sql`` with ``param`` and commit on ``db1``.

            Returns the value of ``cursor.execute``. A fresh cursor is used
            per call: the old code closed the shared ``self.cursor``, so any
            second ``insert`` on the same object failed.
            """
            cur = db1.cursor()
            try:
                affected = cur.execute(sql, param)
                db1.commit()
            finally:
                cur.close()
            return affected

        def release(self):
            """No-op placeholder; always returns 0."""
            return 0
    
    
    class DataBaseParent_test:
        """Helper methods that run SQL over the module-level ``db3`` connection.

        Every method opens its own short-lived cursor and closes it again,
        even when the statement raises, so one failed or finished call never
        leaves a later call with a closed cursor.
        """

        def __init__(self):
            # db3.cursor() either returns a cursor or raises, so the former
            # "Initial Status" sentinel check could never trigger and is gone.
            # The attribute is kept for code that reads ``self.cursor``.
            self.cursor = db3.cursor()

        def select(self, sqlstr):
            """Run ``sqlstr`` and return ``(rows, row_count)``.

            ``rows`` is whatever ``fetchall()`` returns (a tuple of tuples,
            e.g. ``(('apollo', 'male', '164.jpeg'), ('apollo', 'male', ''))``).
            The cursor description of the query is stored on
            ``self.description`` as a side effect.
            """
            cur = db3.cursor()
            try:
                cur.execute(sqlstr)
                rows = cur.fetchall()
                self.description = cur.description
            finally:
                cur.close()
            return rows, len(rows)

        def select_include_name(self, sqlstr):
            """Run ``sqlstr`` and return ``(list_of_dicts, row_count)``.

            Each row becomes a dict keyed by column name, e.g.
            ``[{'name': 'apollo', 'age': 28}, {'name': 'jack', 'age': 27}]``.
            """
            cur = db3.cursor()
            try:
                cur.execute(sqlstr)
                # description is None when the statement yields no result set.
                names = [col[0] for col in (cur.description or ())]
                rows = cur.fetchall()
            finally:
                cur.close()
            # Pair every column with its value; the previous index loop
            # stopped one short and silently dropped the last column.
            result = [dict(zip(names, row)) for row in rows]
            return result, len(rows)

        def select_for_grid(self, sqlstr, pageNo=1, select_size=5):
            """Run ``sqlstr`` and return one page of the result.

            Args:
                sqlstr: query to execute via ``select``.
                pageNo: 1-based page number.
                select_size: number of rows per page.

            Returns:
                ``(page_rows, total_rows, total_pages, pageNo, select_size)``.
                ``page_rows`` is ``[]`` when there are no rows or the page
                starts outside the result. For 10 rows and ``select_size=5``,
                page 1 yields the first five rows with ``total_pages == 2``.

            Raises:
                ZeroDivisionError: if ``select_size`` is 0.
            """
            rows, total = self.select(sqlstr)
            # Integer ceiling division: a plain "/" yields a float page count
            # on Python 3.
            total_pages, remainder = divmod(total, select_size)
            if remainder:
                total_pages += 1

            start = (pageNo - 1) * select_size
            end = min(pageNo * select_size, total)
            if total == 0 or start > total or start < 0:
                return [], total, total_pages, pageNo, select_size
            return rows[start:end], total, total_pages, pageNo, select_size

        def executesql(self, sqlstr):
            """Execute ``sqlstr``, commit on ``db3`` and return ``execute()``'s result."""
            cur = db3.cursor()
            try:
                affected = cur.execute(sqlstr)
                # Commit on the connection the statement ran on; this used to
                # commit db1, leaving the db3 change uncommitted.
                db3.commit()
            finally:
                cur.close()
            return affected

        def insert(self, sql, param):
            """Execute the parameterised ``sql`` with ``param`` and commit on ``db3``.

            Returns the value of ``cursor.execute``. A fresh cursor is used
            per call: the old code closed the shared ``self.cursor``, which
            broke any later ``insert`` or ``insert_many`` on the same object.
            """
            cur = db3.cursor()
            try:
                affected = cur.execute(sql, param)
                db3.commit()
            finally:
                cur.close()
            return affected

        def release(self):
            """No-op placeholder; always returns 0."""
            return 0

        def insert_many(self, sql, sql1, args):
            """Run ``sql1`` once, then bulk-execute ``sql`` for every row in ``args``.

            Args:
                sql: parameterised statement passed to ``executemany``
                    (callers in this file pass an INSERT).
                sql1: statement executed first, without parameters
                    (callers in this file pass a DELETE that empties the table).
                args: sequence of parameter tuples, one per row.

            Returns:
                The value returned by ``cursor.executemany``.

            Both statements are committed together. If either one raises, the
            transaction is rolled back before re-raising, so a failed bulk
            insert cannot be committed later as an emptied table.
            """
            cur = db3.cursor()
            try:
                cur.execute(sql1)
                res = cur.executemany(sql, args)
                db3.commit()
            except Exception:
                db3.rollback()
                raise
            finally:
                cur.close()
            return res
    
    
  • 相关阅读:
    MVC路由测试
    关于Dapper的使用笔记3
    关于Dapper的使用笔记2
    关于Dapper的使用笔记1
    关于WCF与Autofac的整合
    js获取页面元素距离浏览器工作区顶端的距离
    document.body.scrollTop和document.documentElement.scrollTop 以及值为0的问题
    js实现获取对象key名
    微信小程序分包跳转主包页面
    js禁止页面滚动
  • 原文地址:https://www.cnblogs.com/apollo1616/p/10450795.html
Copyright © 2011-2022 走看看