zoukankan      html  css  js  c++  java
  • 每天迁移MySQL历史数据到历史库Python脚本

    #!/usr/bin/env python
    # coding:utf-8
    #__author__ = 'Logan'
      
     
    import MySQLdb
    import sys
    import datetime
    import time
      
    class ClassMigrate(object):
        """Move one calendar day of rows from a source MySQL table to an archive table.

        The day is ``--time_interval`` days before today, matched on ``--date_col``.
        Rows are copied in primary-key ranges of ``step_size``; the copy is then
        verified by comparing count/min/max of the primary key on both sides, and
        only after a successful verification is the source data removed (batched
        ``delete`` or ``drop`` of the partition holding that day).

        The syntax used here is valid on both Python 2.7 and Python 3.
        """

        # Attributes that must exist once the command line has been parsed.
        _REQUIRED_ATTRS = ('source_host', 'dest_host', 'deleteStrategy',
                           'pk', 'date_col', 'interval')

        def __init__(self):
            """Parse the command line, open both connections and reset the run state."""
            self._get_argv()
            # Both connections run in autocommit mode, so every batch is durable
            # as soon as its statement returns.
            self.sourcedb_conn_str = MySQLdb.connect(
                host=self.source_host, port=self.source_port,
                user=self.source_user, passwd=self.source_password,
                db=self.source_db, charset='utf8')
            self.sourcedb_conn_str.autocommit(True)
            self.destdb_conn_str = MySQLdb.connect(
                host=self.dest_host, port=self.dest_port,
                user=self.dest_user, passwd=self.dest_password,
                db=self.dest_db, charset='utf8')
            self.destdb_conn_str.autocommit(True)

            # Table the archive table is cloned from (looked up on the destination).
            self.template_tab = self.source_tab + '_template'
            # Width of the primary-key range handled per batch.
            self.step_size = 20000

            self._migCompleteState = False
            self._deleteCompleteState = False

            self.source_cnt = ''
            self.source_min_id = ''
            self.source_max_id = ''
            self.source_checksum = ''
            self.dest_cnt = ''

            self.today = time.strftime("%Y-%m-%d")

        # ------------------------------------------------------------------
        # command line
        # ------------------------------------------------------------------
        def _exit_with_usage(self, code):
            """Print the usage text and terminate with exit status ``code``."""
            print(self.usage)
            sys.exit(code)

        @staticmethod
        def _parse_endpoint(spec):
            """Split ``host:port/db:table/user/password`` into its six parts.

            Only the first three ``/`` separate fields, so the password may itself
            contain ``/``. Raises ``ValueError`` when the spec is malformed.
            """
            address, schema, user, password = spec.split('/', 3)
            host, port = address.rsplit(':', 1)
            db_name, table = schema.split(':', 1)
            return host, int(port), db_name, table, user, password

        def _get_argv(self):
            """Read the migration settings from ``sys.argv`` into attributes.

            Sets ``source_*`` / ``dest_*`` (host, port, db, tab, user, password),
            ``deleteStrategy`` ('delete' or 'drop'), ``pk``, ``date_col`` and
            ``interval`` (non-negative int). Prints the usage text and exits with
            status 1 on an unknown, malformed or missing option, and with status 0
            for ``--help`` / ``-h``.
            """
            self.usage = """
          usage():
          python daily_migration.py --source=192.168.1.4:3306/db_name:tab_name/proxy/password \
                        --dest=192.168.1.150:13301/db_name_archive:tab_name_201601/proxy/password \
                        --delete_strategy=delete --primary_key=auto_id --date_col=ut --time_interval=180
          """
            args = sys.argv[1:]
            if not args:
                self._exit_with_usage(1)
            if args[0] in ('--help', '-h'):
                self._exit_with_usage(0)

            for arg in args:
                # Split on the first '=' only: values (passwords) may contain '='.
                name, _sep, value = arg.partition('=')
                try:
                    if name == '--source':
                        (self.source_host, self.source_port, self.source_db,
                         self.source_tab, self.source_user,
                         self.source_password) = self._parse_endpoint(value)
                    elif name == '--dest':
                        (self.dest_host, self.dest_port, self.dest_db,
                         self.dest_tab, self.dest_user,
                         self.dest_password) = self._parse_endpoint(value)
                    elif name == '--delete_strategy':
                        if value not in ('delete', 'drop'):
                            raise ValueError(value)
                        self.deleteStrategy = value
                    elif name == '--primary_key':
                        if not value:
                            raise ValueError(name)
                        self.pk = value
                    elif name == '--date_col':
                        if not value:
                            raise ValueError(name)
                        self.date_col = value
                    elif name == '--time_interval':
                        # The interval is interpolated into SQL text, so it must
                        # be a plain non-negative integer.
                        interval = int(value)
                        if interval < 0:
                            raise ValueError(value)
                        self.interval = interval
                    else:
                        raise ValueError(name)
                except ValueError:
                    self._exit_with_usage(1)

            # Fail before touching any database rather than half-way through a
            # run (e.g. after the copy but before the purge strategy is read).
            for attr in self._REQUIRED_ATTRS:
                if not hasattr(self, attr):
                    self._exit_with_usage(1)

        # ------------------------------------------------------------------
        # query helpers
        # ------------------------------------------------------------------
        def sourcedb_query(self, sql, sql_type):
            """Run ``sql`` on the source connection.

            Returns the fetched rows for ``sql_type == 'select'``, the affected
            row count for ``'dml'`` and ``True`` for anything else. On any error
            the message is printed and ``False`` is returned.
            """
            cr = None
            try:
                cr = self.sourcedb_conn_str.cursor()
                cr.execute(sql)
                if sql_type == 'select':
                    return cr.fetchall()
                if sql_type == 'dml':
                    return self.sourcedb_conn_str.affected_rows()
                return True
            except Exception as e:
                print(str(e) + "<br>")
                return False
            finally:
                # cursor() itself may have failed, in which case there is nothing to close.
                if cr is not None:
                    cr.close()

        def destdb_query(self, sql, sql_type, values=''):
            """Run ``sql`` on the destination connection.

            Returns the fetched rows for ``'select'``, the affected row count for
            ``'insertmany'`` (``sql`` executed once per row of ``values``) and
            ``True`` for anything else. On any error the message is printed and
            ``False`` is returned.
            """
            cr = None
            try:
                cr = self.destdb_conn_str.cursor()
                if sql_type == 'select':
                    cr.execute(sql)
                    return cr.fetchall()
                if sql_type == 'insertmany':
                    cr.executemany(sql, values)
                    return self.destdb_conn_str.affected_rows()
                cr.execute(sql)
                return True
            except Exception as e:
                print(str(e) + "<br>")
                return False
            finally:
                if cr is not None:
                    cr.close()

        def _day_window_sql(self):
            """Return the WHERE fragment matching the whole target day on ``date_col``.

            The target day is ``interval`` days before ``today``, from 00:00:00 to
            23:59:59 inclusive.
            """
            day_expr = ("DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d')"
                        % (self.today, self.interval))
            return ("%s >= CONCAT(%s, ' 00:00:00') and %s <= CONCAT(%s, ' 23:59:59')"
                    % (self.date_col, day_expr, self.date_col, day_expr))

        def _summary_sql(self, table, where=None):
            """Return a query for count(*), min(pk) and max(pk) of ``table``.

            min/max come back as -1 when no row matches.
            """
            sql = ("select count(*),IFNULL(min(%s),-1),IFNULL(max(%s),-1) from %s"
                   % (self.pk, self.pk, table))
            if where:
                sql += " where " + where
            return sql

        @staticmethod
        def _print_elapsed(starttime):
            """Print the wall-clock time elapsed since ``starttime``."""
            elapsed = datetime.datetime.now() - starttime
            # total_seconds() keeps the fractional part correctly zero-padded.
            print("Elapsed :%.6f seconds <br>" % elapsed.total_seconds())

        # ------------------------------------------------------------------
        # destination table creation
        # ------------------------------------------------------------------
        def create_table_from_source(self):
            """Create the destination table from the source table's own DDL.

            Not used by ``do()``: the archived data goes into an ARCHIVE-engine
            table, so copying the source definition is unsuitable. Kept for other
            uses. Returns ``True`` on success, ``False`` otherwise.
            """
            try:
                sql = "show create table %s;" % self.source_tab
                ddl_rows = self.sourcedb_query(sql, 'select')
                if not ddl_rows:
                    return False
                create_str = ddl_rows[0][1]
                create_str = create_str.replace('CREATE TABLE', 'CREATE TABLE IF NOT EXISTS')
                return bool(self.destdb_query(create_str, 'ddl'))
            except Exception as e:
                print(str(e) + "<br>")
                return False

        def create_table_from_template(self):
            """Create the destination table ``LIKE`` the template table if it is missing.

            Returns ``True`` on success, ``False`` otherwise.
            """
            try:
                sql = 'CREATE TABLE IF NOT EXISTS %s like %s;' % (self.dest_tab, self.template_tab)
                return bool(self.destdb_query(sql, 'ddl'))
            except Exception as e:
                print(str(e) + "<br>")
                return False

        # ------------------------------------------------------------------
        # migration steps
        # ------------------------------------------------------------------
        def get_min_max(self):
            """Record count, min id and max id of the source rows for the target day.

            Stores them in ``source_cnt`` / ``source_min_id`` / ``source_max_id``
            and builds ``source_checksum`` as ``"<count>_<min>_<max>"``. Returns
            ``False`` when the query fails or no row matches, ``True`` otherwise.
            """
            try:
                print(" Starting Migrate at -- %s <br>" % datetime.datetime.now())
                sql = self._summary_sql(self.source_tab, self._day_window_sql())
                q = self.sourcedb_query(sql, 'select')
                if not q:
                    # The query helper has already printed the database error.
                    return False
                self.source_cnt = q[0][0]
                self.source_min_id = q[0][1]
                self.source_max_id = q[0][2]
                self.source_checksum = (str(self.source_cnt) + '_' + str(self.source_min_id)
                                        + '_' + str(self.source_max_id))
                if self.source_cnt == 0 or self.source_min_id == -1 or self.source_max_id == -1:
                    print("There is 0 record in source table been matched! <br>")
                    return False
                return True
            except Exception as e:
                print(str(e) + "<br>")
                return False

        def migrate_2_destdb(self):
            """Copy the target day's rows to the destination in primary-key batches.

            Returns ``False`` when there is nothing to migrate or setup fails and
            ``True`` once every batch has been attempted. A failed batch is
            reported but does not stop the loop; ``verify_total_cnt`` is what
            decides whether the copy is complete.
            """
            try:
                if not self.get_min_max():
                    return False
                cols = self.sourcedb_query("desc %s;" % self.source_tab, 'select')
                if not cols:
                    return False
                # One %s placeholder per source column for the executemany insert.
                placeholders = ",".join(["%s"] * len(cols))
                insert_sql = "insert into " + self.dest_tab + " values (%s)" % placeholders
                window = self._day_window_sql()

                k = self.source_min_id
                while k <= self.source_max_id:
                    sql = ("select * from %s where %s >= %d and %s < %d and %s"
                           % (self.source_tab, self.pk, k, self.pk, k + self.step_size, window))
                    print(" %s <br>" % sql)
                    starttime = datetime.datetime.now()
                    results = self.sourcedb_query(sql, 'select')
                    if results is False:
                        print("Select failed!! <br>")
                    elif not results:
                        # Gap in the id range: nothing to insert for this batch.
                        print("Inserted %s rows. <br>" % 0)
                    else:
                        rows = self.destdb_query(insert_sql, 'insertmany', results)
                        # "is False", not "== False": 0 affected rows is not a failure.
                        if rows is False:
                            print("Insert failed!! <br>")
                        else:
                            print("Inserted %s rows. <br>" % rows)
                    self._print_elapsed(starttime)
                    k += self.step_size
                print(" Insert complete at -- %s <br>" % datetime.datetime.now())
                return True
            except Exception as e:
                print(str(e) + "<br>")
                return False

        def verify_total_cnt(self):
            """Compare count/min/max of the target day between source and destination.

            Sets ``_migCompleteState`` on a match. Exits the process with status
            77 when the checksums differ or the destination cannot be queried.
            """
            try:
                sql = self._summary_sql(self.dest_tab, self._day_window_sql())
                dest_result = self.destdb_query(sql, 'select')
                if not dest_result:
                    # Without a destination summary the copy cannot be trusted.
                    print("Verify failed !!<br>")
                    sys.exit(77)
                self.dest_cnt = dest_result[0][0]
                dest_checksum = (str(self.dest_cnt) + '_' + str(dest_result[0][1])
                                 + '_' + str(dest_result[0][2]))
                print("source_checksum: %s, dest_checksum: %s <br>"
                      % (self.source_checksum, dest_checksum))
                if (self.source_cnt == self.dest_cnt and self.dest_cnt != 0
                        and self.source_checksum == dest_checksum):
                    self._migCompleteState = True
                    print("Verify successfully !!<br>")
                else:
                    print("Verify failed !!<br>")
                    sys.exit(77)
            except Exception as e:
                print(str(e) + "<br>")

        # ------------------------------------------------------------------
        # purge strategies
        # ------------------------------------------------------------------
        def drop_daily_partition(self):
            """Drop the source partition that holds the migrated day.

            Does nothing unless the migration was verified. The partition is
            taken from ``explain partitions`` for the day's query and is dropped
            only when its own count/min/max equals ``source_checksum``; a mismatch
            exits the process with status 77. Sets ``_deleteCompleteState`` on
            success.
            """
            if not self._migCompleteState:
                return
            try:
                explain_sql = ("explain partitions select * from %s where %s"
                               % (self.source_tab, self._day_window_sql()))
                plan = self.sourcedb_query(explain_sql, 'select')
                # Column 3 of the explain output is the list of matched partitions.
                partition_name = plan[0][3] if plan else None
                if not partition_name:
                    print("No source partition matched the migrated day. <br>")
                    return

                sql = self._summary_sql("%s partition (%s)" % (self.source_tab, partition_name))
                q = self.sourcedb_query(sql, 'select')
                if not q:
                    return
                source_cnt = q[0][0]
                source_min_id = q[0][1]
                source_max_id = q[0][2]
                checksum = str(source_cnt) + '_' + str(source_min_id) + '_' + str(source_max_id)
                if source_cnt == 0 or source_min_id == -1 or source_max_id == -1:
                    print("There is 0 record in source PARTITION been matched! <br>")
                    return
                if checksum != self.source_checksum:
                    # The partition holds something other than exactly the migrated rows.
                    print("The partition %s checksum failed !! Drop failed !!" % partition_name)
                    sys.exit(77)

                drop_par_sql = "alter table %s drop partition %s;" % (self.source_tab, partition_name)
                droped = self.sourcedb_query(drop_par_sql, 'ddl')
                print(drop_par_sql + " <br>")
                if droped:
                    print(" Drop partition complete at -- %s <br>" % datetime.datetime.now())
                    self._deleteCompleteState = True
                else:
                    print("Drop partition failed.. <br>")
            except Exception as e:
                print(str(e) + "<br>")

        def delete_data(self):
            """Delete the migrated day's rows from the source in primary-key batches.

            Does nothing unless the migration was verified. Sleeps one second
            between batches and sets ``_deleteCompleteState`` once the loop has
            finished.
            """
            if not self._migCompleteState:
                return
            try:
                window = self._day_window_sql()
                k = self.source_min_id
                while k <= self.source_max_id:
                    sql = ("delete from %s where %s >= %d and %s < %d and %s"
                           % (self.source_tab, self.pk, k, self.pk, k + self.step_size, window))
                    print(" %s <br>" % sql)
                    starttime = datetime.datetime.now()
                    rows = self.sourcedb_query(sql, 'dml')
                    # "is False", not "== False": a batch that deletes 0 rows is not a failure.
                    if rows is False:
                        print("Delete failed!! <br>")
                    else:
                        print("Deleted %s rows. <br>" % rows)
                    self._print_elapsed(starttime)
                    time.sleep(1)
                    k += self.step_size
                print(" Delete complete at -- %s <br>" % datetime.datetime.now())
                self._deleteCompleteState = True
            except Exception as e:
                print(str(e) + "<br>")

        # ------------------------------------------------------------------
        # entry point
        # ------------------------------------------------------------------
        def do(self):
            """Run the whole job: create table, copy, verify, purge, report.

            Exit statuses: 88 success, 254 purge failed, 255 destination table
            could not be created, 77 verification failed (raised from the verify /
            drop steps). Returns normally when there was nothing to migrate.
            """
            if not self.create_table_from_template():
                print("Create table failed ! Exiting. . .")
                sys.exit(255)
            if not self.migrate_2_destdb():
                return
            self.verify_total_cnt()
            if not self._migCompleteState:
                return

            if self.deleteStrategy == 'drop':
                self.drop_daily_partition()
            else:
                self.delete_data()
            print(" <br>")
            print("=====" * 5 + '<br>')
            print("source_total_cnt: %s <br>" % self.source_cnt)
            print("dest_total_cnt: %s <br>" % self.dest_cnt)
            print("=====" * 5 + '<br>')
            if self._deleteCompleteState:
                print(" Final result: Successfully !! <br>")
                sys.exit(88)
            else:
                print(" Final result: Failed !! <br>")
                sys.exit(254)
       
    # Run the migration only when executed as a script, so importing this
    # module does not open database connections or move data.
    if __name__ == '__main__':
        f = ClassMigrate()
        f.do()
  • 相关阅读:
    C++中的static关键字的总结
    2017上海C++面试
    Vim 跳到上次光标位置
    Windows XP Professional产品序列号
    Centos7 安装sz,rz命令
    Xshell里连接VirtualBox里的Centos7
    什么是位、字节、字、KB、MB
    Centos7 tmux1.6 安装
    Centos7 在 Xshell里 vim的配置
    对JDBC的轻量级封装,Hibernate框架
  • 原文地址:https://www.cnblogs.com/drizzle-xu/p/10143405.html
Copyright © 2011-2022 走看看