把aws云redis 5.x(不支持慢日志输出到cloudatch)慢日志采集到mysql库,方便查询:
table ddl:
create table redis_log ( session_id int null, host_ip varchar(100) null, start_time datetime null, duration decimal null, cmd varchar(3000) null, insert_time timestamp(3) null, remark varchar(100) null, constraint redis_slog_pk unique (session_id) );
lambda_function.py:
## ================================================================================== ## 让读书成为一种生活方式。就像吃喝拉撒每天必须要干的事, ## 终有一天你的举止、言谈、气质会不一样。 ## —- async ## ## Created Date: Sunday, 2021-05-23, 9:48:04 am ## copyright (c): WHHL Tech. LTD. ## Engineer: async ## Module Name: ## Revision: v0.01 ## Description: ## ## Revision History : ## Revision editor date Description ## v0.01 async 2021-05-23 File Created ## ================================================================================== import redis import datetime,time import mysql.connector import pytz pystime = time.time() tz = pytz.timezone('Asia/Shanghai') redis_ips=['redis1','redis2','redisn' ] def slow_redis(): for ip in redis_ips: try: Redis = redis.StrictRedis(host=ip,port=6379,db=0,socket_timeout=1) results = Redis.slowlog_get(200) #每次获取200条 session_id=results[0]['id'] i_start_time = datetime.datetime.fromtimestamp(int(results[0]['start_time'])).strftime('%Y-%m-%d %H:%M:%S') #时间格式转换 duration = round(int(results[0]['duration'])/1000,2) #毫秒 command = results[0]['command'] if duration>=30: # 开发定义超过10ms的慢日志会留意 i_host=ip.split('.',1)[0] s_results={ "session_id":session_id,"host_ip":i_host,"start_time":i_start_time,"duration":duration,"cmd":command,'insert_time':datetime.datetime.now(tz) } # print(s_results) conn = mysql.connector.connect(host='10.10.x.x', port=3306, user='xxx', passwd='xxxx',db='xxxx', charset='utf8mb4') cur = conn.cursor(buffered=True) try: sql_d = "delete from devops.redis_log where insert_time <= DATE_SUB(CURDATE(), INTERVAL 7 DAY) " # 删除7天以前的数据 cur.execute(sql_d) conn.commit() except: conn.rollback() print('adevops.redis_log table delete failed. Transaction rolled back') raise try: sql_i = ( f"insert ignore into devops.redis_log ({', '.join(s_results.keys())}) " f"values (%({')s, %('.join(s_results.keys())})s)") cur.execute(sql_i,s_results) conn.commit() # inserted = cur.rowcount except: conn.rollback() print('adevops.redis_log table ops failed. Transaction rolled back') raise cur.close() conn.close() # print('Inserted', inserted, 'row(s) into devops.redis_log table') except: logging.error('%s Timeout reading from socket!' %ip) raise # if __name__ == '__main__': def lambda_handler(event, context): slow_redis() pyetime = time.time() print ('当前脚本运行耗时为:',round(float(pyetime - pystime),3),'s')
然后添加到crontab,5min执行一次。