Python 代码
#!/usr/bin/python
import os
import re
import pymysql
import time
from dateutil.parser import parse
from dateutil import tz
import threading
def listdir(path, list_name):
    """Recursively gather every non-directory path below *path*.

    Each path is appended to *list_name* in place (existing items are kept).
    Sub-directories are descended depth-first, in the order ``os.listdir``
    reports their entries. Nothing is returned.
    """
    for entry in os.listdir(path):
        full_path = os.path.join(path, entry)
        if not os.path.isdir(full_path):
            # Plain file (or anything that is not a directory): record it.
            list_name.append(full_path)
            continue
        # Directory: descend and keep accumulating into the same list.
        listdir(full_path, list_name)
# Number of parsed rows sent to MySQL per executemany()/commit round-trip.
BATCH_SIZE = 10000

# Combined-log-format access-log line, e.g.
#   1.2.3.4 - user [10/Oct/2000:13:55:36 -0700] "GET / HTTP/1.0" 200 2326 "ref" "UA"
# The brackets around the date must be escaped (unescaped "[...]" is a
# character class, so no `date` group would exist), status/size need "\d"
# (bare "d+" matches the letter d), and the size field may be "-" when no
# body was sent.
LOG_PATTERN = re.compile(
    r'(?P<ip>\S+) - (?P<user>.*?) \[(?P<date>[^\]]*)\] "(?P<request>.*)" '
    r'(?P<rcode>\d+) (?P<rsize>\d+|-) "(?P<referer>.*)" "(?P<user_agent>.*)".*')


def _create_table_sql(table_name):
    """Return the ``CREATE TABLE IF NOT EXISTS`` statement for *table_name*.

    The fragments are enclosed in parentheses so Python joins the adjacent
    string literals into a single statement. ``IF NOT EXISTS`` is evaluated
    against the connection's current database, unlike a lookup in
    information_schema that does not filter on table_schema.
    """
    return (
        "CREATE TABLE IF NOT EXISTS `" + table_name + "` ("
        # 45 chars: longest textual IPv6 address (IPv4-mapped form).
        "`ip` varchar(45) NOT NULL COMMENT 'ip',"
        "`user` varchar(20) NOT NULL COMMENT '用户',"
        "`date` datetime NOT NULL COMMENT '最后登录时间',"
        "`request` varchar(800) NOT NULL COMMENT '请求',"
        "`referer` varchar(800) NOT NULL COMMENT '来源',"
        "`user_agent` varchar(800) NOT NULL,"
        "`status` int(11) unsigned NOT NULL COMMENT '响应码',"
        "`response_size` varchar(15) NOT NULL COMMENT '响应大小'"
        ") ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='nwbiotec站点日志'"
    )


def _build_row(match, local_tz):
    """Convert a ``LOG_PATTERN`` match into an 8-tuple for the log table.

    Returns None when the timestamp cannot be parsed, so the caller can skip
    the line the same way it skips lines the pattern does not match.
    """
    # Turn "10/Oct/2000:13:55:36 -0700" into "10/Oct/2000 13:55:36 -0700",
    # presumably so dateutil treats date and time as separate tokens.
    fmt_date = match.group('date').replace(':', ' ', 1)
    try:
        local_date = parse(fmt_date).astimezone(local_tz)
    except (ValueError, OverflowError):
        return None
    return (
        match.group('ip'),
        match.group('user'),
        # DATETIME stores no zone: keep the local wall-clock time, naive.
        local_date.replace(tzinfo=None),
        match.group('request')[:800],
        match.group('referer')[:800],
        match.group('user_agent')[:800],
        int(match.group('rcode')),
        match.group('rsize'),
    )


def _insert_rows(connection, cursor, table_name, rows):
    """Bulk-insert *rows* into *table_name* and commit; do nothing if empty."""
    if not rows:
        return
    cursor.executemany(
        "INSERT INTO `" + table_name + "` VALUES (%s,%s,%s,%s,%s,%s,%s,%s)", rows)
    connection.commit()


if __name__ == "__main__":
    # 1. Basic parameters
    tableName = 'log04'
    logPath = 'G:/root-log/'
    dbHost = '127.0.0.1'
    dbUser = 'root'
    dbPassword = 'root'
    dbDatabase = 'monitorlog'
    dbPort = 3306

    startTime = time.time()
    # Keyword arguments: recent PyMySQL releases reject positional ones.
    dbConnection = pymysql.connect(host=dbHost, user=dbUser, password=dbPassword,
                                   database=dbDatabase, port=dbPort)
    inserted = 0
    try:
        with dbConnection.cursor() as cursorDb:
            # 2. Create the table in the current database unless it exists.
            cursorDb.execute(_create_table_sql(tableName))

            # 3. Collect every log file under logPath.
            fileList = []
            listdir(logPath, fileList)
            TZ_LOCAL = tz.tzlocal()

            # 4. Parse each file and insert in batches of BATCH_SIZE rows.
            for fileName in fileList:
                print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + " 读取文件【" + fileName + "】的数据")
                dataInsert = []
                # errors='replace': a stray non-UTF-8 byte in a user agent
                # must not abort the whole import.
                with open(fileName, 'r', encoding='utf-8', errors='replace') as fileOs:
                    for line in fileOs:
                        match = LOG_PATTERN.match(line)
                        if not match:
                            continue
                        row = _build_row(match, TZ_LOCAL)
                        if row is None:
                            continue
                        dataInsert.append(row)
                        if len(dataInsert) >= BATCH_SIZE:
                            _insert_rows(dbConnection, cursorDb, tableName, dataInsert)
                            inserted += len(dataInsert)
                            print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + " 导入第 " + str(inserted) + " 行数据")
                            dataInsert = []
                # Flush the final partial batch of this file (previously lost).
                _insert_rows(dbConnection, cursorDb, tableName, dataInsert)
                inserted += len(dataInsert)
    finally:
        # Close the database connection even if parsing or inserting failed.
        dbConnection.close()
    print("共导入" + str(inserted) + "条数据,耗时" + str(time.time() - startTime) + "s")