zoukankan      html  css  js  c++  java
  • python遍历文件夹下的csv文件,读取文件内容存到数据库

    一、使用python导入的原因

    1、csv文件为从文件数据库导出的数据文件,一个一个的导入到数据库效率就比较低下;

    2、日期形式的字段会存在特殊的字符或者字段中包含了单引号就会报错。

    二、操作

      1、循环读取选定文件夹下的文件

    '''
        读取文件夹下的csv文件
    '''
    def readAllFiles(filePath):
        fileList = os.listdir(filePath)
        for file in fileList:
            path = os.path.join(filePath, file)
            if os.path.isfile(path):
                file = open(path, 'r', encoding='utf-8')
                print(path)
                #流程记录信息
                if path.find("workflow") != -1:
                    analysisWorkflowCsv(file)
                    pass
                #意见信息
                elif path.find("opinion") != -1:
                    analysisOpinionCsv(file)
                    pass
                #发文数据
                elif path.find("wd_24") != -1:
                    analysisWd24Csv(file)
                    pass
                #收文数据
                elif path.find("wd_25") != -1:
                    analysisWd25Csv(file)
                    pass
            else:
                readAllFiles(path)

    2、解析文件内容,首行为标题栏需要跳过。入库操作每满1000条commit一次主要是python频繁提交执行次数达到1000+就会报错。1000条commit一次可以避免错误并缓解内存压力。

    '''
        解析文件
    '''
    def analysisWorkflowCsv(file):
        csvFile = csv.reader(file)
        # 读取一行,下面的reader中已经没有该行了
        head_row = next(csvFile)
        # print(head_row)
        __conn = getConnect_old()
        counter = 0
        for row in csvFile:
            workflow = {}
            workflow['UUID'] = row[0]
            workflow['subject'] = row[1]
            workflow['signdate'] = row[2]
            workflow['U_UnitName'] = row[3]
            workflow['U_UnitUser'] = row[4]
            workflow['U_UnitUserTitle'] = row[5]
            workflow['U_UnitEndTime'] = row[6]
            workflow['U_UnitAction'] = row[7]
            workflow['U_UnitToTitle'] = row[8]
            if insertWorkflows(__conn, workflow):
                counter += 1
            if counter % 1000 == 0:
                __conn.commitData()
        print("已经插入工作流数据: %d 条。"%counter)
        __conn.commitData()
        __conn.closeConn()

    3、数据入库

    '''
        插入工作流程数据
    '''
    def insertWorkflows(__conn, workflow):
        __sql = '''
            INSERT INTO workflows (
                UUID, U_UnitName, U_UnitUser, U_UnitUserTitle, U_UnitEndTime, U_UnitAction, U_UnitToTitle, subject, signdate
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        '''
        __params = (
            workflow['UUID'], workflow['U_UnitName'], workflow['U_UnitUser'], workflow['U_UnitUserTitle'],
            workflow['U_UnitEndTime'], workflow['U_UnitAction'], workflow['U_UnitToTitle'], workflow['subject'],
            workflow['signdate']
            )
        # print(__sql % __params)
        return __conn.mssql_exe_sql(__sql, __params)

    4、python操作sqlserver代码

    import pymssql
    
    os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'
    
    '''
    数据库连接
    '''
    class ConnectionDatabase(object):
        # 连接mysql数据库
        def __init__(self, ip, user_name, passwd, db, char='utf8'):
            self.ip = ip
            # self.port = port
            self.username = user_name
            self.passwd = passwd
            self.mysqldb = db
            self.char = char
    
            self.MsSQL_db = pymssql.connect(
                host=self.ip,
                user=self.username,
                password=self.passwd,
                database=self.mysqldb,
                charset=self.char)
      # 查询数据(sqlserver)
        def mssql_findList(self, sql):
            cursor = self.MsSQL_db.cursor()
            MsSQL_sql = sql
            results = None
            if not cursor:
                raise (NameError,"数据库连接失败")
            try:
                # 执行SQL语句
                cursor.execute(MsSQL_sql)
                # 获取所有记录列表
                results = cursor.fetchall()
            except Exception as e:
                print(e)
                self.MsSQL_db.close()
            if results:
                return results
            else:
                return None
    
        # 数据增删改查(sqlserver)
        def mssql_exe_sql(self, sql, params):
            cursor = self.MsSQL_db.cursor()
            MsSQL_sql = sql
            result = 0
            if not cursor:
                raise (NameError,"数据库连接失败")
            try:
                # 执行SQL语句
                cursor.execute(MsSQL_sql, params)
                result = cursor.rowcount
            except Exception as e:
                print(e)
                self.MsSQL_db.rollback()
                self.MsSQL_db.close()
    
            return result>0
    
        '''
            提交数据集
        '''
        def commitData(self):
            try:
                self.MsSQL_db.commit()
            except Exception as e:
                print(e)
    
        '''
            关闭数据库连接
        '''
        def closeConn(self):
            if self.MsSQL_db:
                self.MsSQL_db.close()

    5、执行代码

    if __name__ == "__main__":
        #文件所在的文件夹父路径
        # testFilePath = "G:数据解析csvworkflowcsv"
        testFilePath = "G:数据解析csvwd25csv"
        readAllFiles(testFilePath)

    遇到的问题及解决方式:

      (1)以上代码执行时如果有时间类型的字段需要对字符串进行转换;

         re.sub('[^0-9 | - | : ]', '', timestr) 

        利用正则表达式将时间字符串中的特殊字符去掉,再转换为时间字符串,避免代码执行时类型转换错误。

      (2)数据库插入数据的sql语句最好使用的是带参数的执行方式,不要使用sql占位符拼接的方式,这样可能出现单引号“'”导致sql执行失败。

    初心回归,时光已逝!
  • 相关阅读:
    HDU 5912 Fraction (模拟)
    CodeForces 722C Destroying Array (并查集)
    CodeForces 722B Verse Pattern (水题)
    CodeForces 722A Broken Clock (水题)
    CodeForces 723D Lakes in Berland (dfs搜索)
    CodeForces 723C Polycarp at the Radio (题意题+暴力)
    CodeForces 723B Text Document Analysis (水题模拟)
    CodeForces 723A The New Year: Meeting Friends (水题)
    hdu 1258
    hdu 2266 dfs+1258
  • 原文地址:https://www.cnblogs.com/yin1361866686/p/10756228.html
Copyright © 2011-2022 走看看