zoukankan      html  css  js  c++  java
  • python中使用pyspark 读取和整理日志数据并将数据写入到es中去

    代码:

    import re
    import datetime
    from pyspark.sql import SparkSession
    from pyspark import SparkContext
    from elasticsearch import Elasticsearch
    spark=SparkSession.builder.appName("lz").getOrCreate()
    sc = SparkContext.getOrCreate()
    es = Elasticsearch()
    month_map = {'Jan': '1', 'Feb': '2', 'Mar':'3', 'Apr':'4', 'May':'5', 'Jun':'6', 'Jul':'7',
        'Aug':'8',  'Sep': '9', 'Oct':'10', 'Nov': '11', 'Dec': '12'}
    
    log_data = sc.textFile("/Desktop/data_doc/data_Log/sshlogin/03.txt") #使用spark读取本地日志文件
    
    
    for b in log_data.toLocalIterator(): 
        #以迭代的方式来把一条条数据读取出来进行正则匹配,并最终将 dict作为body写入到es中去
        # e='Ambari:Mar  2 02:14:16 ambari sshd[16716]: Accepted password for root from 172.21.202.174 port 59886 ssh2'#日志格式
        log_group=re.search('^(S+):(w{3})s+(d{1,2})s(d{2}:d{2}:d{2})s(S+)s(S+)[(d+)]:s(.+)',b)
        if log_group:
            year='2019'
            try:
                logtime = year+'-'+month_map[log_group.group(2)]+'-'+log_group.group(3)+' '+log_group.group(4) #将字段拼接成年月日的格式
                logtime = datetime.datetime.strptime(logtime,'%Y-%m-%d %H:%M:%S')
            except Exception as e:
               pass
            row = dict(_hostname=log_group.group(1), #将数据组成一个字典  k,v
                      syslog_timestamp=logtime,
                      hostname=log_group.group(5),
                      program=log_group.group(6),
                      pid=log_group.group(7),
                      msg = log_group.group(8))
            if re.match('^Accepted password for',row['msg']) or re.match('^Accepted publickey for',row['msg']) :
    
                msg_a=re.search('Acceptedsw+sfors(S+)sfroms(d{2,3}.d{2,3}.d{2,3}.d{2,3})sports(d+)',row['msg'])
                row['login_success']=True
                row['login_success_msg']={'username':msg_a.group(1),'user_ip':msg_a.group(2),'user_port':msg_a.group(3)}
            es.index(index='data_log02',doc_type='test02',body=row) #将数据写入到es中去
        else:
            break

    转自:https://www.cnblogs.com/wangkun122/articles/10936938.html

  • 相关阅读:
    第二周学习总结
    2019春总结作业
    第十二周作业
    第十一周作业
    第九周作业
    第八周作业
    第七周作业
    第六周作业
    第五周课程总结与报告
    Java第四周编程总结
  • 原文地址:https://www.cnblogs.com/tjp40922/p/13125182.html
Copyright © 2011-2022 走看看