zoukankan      html  css  js  c++  java
  • 如何使用python将Spark数据写入ElasticSearch

    这里以将Apache的日志写入到ElasticSearch为例,来演示一下如何使用Python将Spark数据导入到ES中。

    实际工作中,由于数据与使用框架或技术的复杂性,数据的写入变得比较复杂,在这里我们简单演示一下。

    如果使用Scala或Java的话,Spark提供自带了支持写入ES的支持库,但Python不支持。所以首先你需要去这里下载依赖的ES官方开发的依赖包包。

    下载完成后,放在本地目录,以下面命令方式启动pyspark:

         pyspark --jars elasticsearch-hadoop-6.4.1.jar

    如果你想pyspark使用Python3,请设置环境变量:

     export PYSPARK_PYTHON=/usr/bin/python3

    理解如何写入ES的关键是要明白,ES是一个JSON格式的数据库,它有一个必须的要求。数据格式必须采用以下格式

     
    { "id: { the rest of your json}}

    往下会展示如何转换成这种格式。

    解析Apache日志文件

    我们将Apache的日志文件读入,构建Spark RDD。然后我们写一个parse()函数用正则表达式处理每条日志,提取我们需要的字

    rdd = sc.textFile("/home/ubuntu/walker/apache_logs")
    regex='^(S+) (S+) (S+) [([w:/]+s[+-]d{4})] "(S+)s?(S+)?s?(S+)?" (d{3}|-) (d+|-)s?"?([^"]*)"?s?"?([^"]*)?"?$'
     
    p=re.compile(regex)
    
    def parse(str):
        s=p.match(str)
        d = {}
        d['ip']=s.group(1)
        d['date']=s.group(4)
        d['operation']=s.group(5)
        d['uri']=s.group(6)
        return d  

    换句话说,我们刚开始从日志文件读入RDD的数据类似如下:

    ['83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"']

    然后我们使用map函数转换每条记录:

    rdd2 = rdd.map(parse)
    
    rdd2.take(1)
    
    [{'date': '17/May/2015:10:05:03 +0000', 'ip': '83.149.9.216', 'operation': 'GET', 'uri': '/presentations/logstash-monitorama-2013/images/kibana-search.png'}]

    现在看起来像JSON,但并不是JSON字符串,我们需要使用json.dumps将dict对象转换。

    我们同时增加一个doc_id字段作为整个JSON的ID。在配置ES中我们增加如下配置“es.mapping.id”: “doc_id”告诉ES我们将这个字段作为ID。

    这里我们使用SHA算法,将这个JSON字符串作为参数,得到一个唯一ID。
    计算结果类似如下,可以看到ID是一个很长的SHA数值。

    rdd3.take(1)
    
    [('a5b086b04e1cc45fb4a19e2a641bf99ea3a378599ef62ba12563b75c', '{"date": "17/May/2015:10:05:03 +0000", "ip": "83.149.9.216", "operation": "GET", "doc_id": "a5b086b04e1cc45fb4a19e2a641bf99ea3a378599ef62ba12563b75c", "uri": "/presentations/logstash-monitorama-2013/images/kibana-search.png"}')]
     

    现在我们需要制定ES配置,比较重要的两项是:

    • “es.resource” : ‘walker/apache’: "walker"是索引,apache是类型,两者一般合称索引
    • “es.mapping.id”: “doc_id”: 告诉ES那个字段作为整个文档的ID,也就是查询结果中的_id
      其他的配置自己去探索。

    然后我们使用saveAsNewAPIHadoopFile()将RDD写入到ES。这部分代码对于所有的ES都是一样的,比较固定,不需要理解每一个细节

    es_write_conf = {
            "es.nodes" : "localhost",
            "es.port" : "9200",
            "es.resource" : 'walker/apache',
            "es.input.json": "yes",
            "es.mapping.id": "doc_id"
        }
           
    rdd3.saveAsNewAPIHadoopFile(
            path='-',
         outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",       keyClass="org.apache.hadoop.io.NullWritable",
            valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
            conf=es_write_conf)
    
    rdd3 = rdd2.map(addID)
    
    def addId(data):
        j=json.dumps(data).encode('ascii', 'ignore')
        data['doc_id'] = hashlib.sha224(j).hexdigest()
        return (data['doc_id'], json.dumps(data))

    最后我们可以使用curl进行查询

    curl http://localhost:9200s/walker/apache/_search?pretty=true&?q=*
    
    
    {
            "_index" : "walker",
            "_type" : "apache",
            "_id" : "227e977849bfd5f8d1fca69b04f7a766560745c6cb3712c106d590c2",
            "_score" : 1.0,
            "_source" : {
              "date" : "17/May/2015:10:05:32 +0000",
              "ip" : "91.177.205.119",
              "operation" : "GET",
              "doc_id" : "227e977849bfd5f8d1fca69b04f7a766560745c6cb3712c106d590c2",
              "uri" : "/favicon.ico"
            }

    如下是所有代码:

    import json
    import hashlib
    import re
    
    def addId(data):
        j=json.dumps(data).encode('ascii', 'ignore')
        data['doc_id'] = hashlib.sha224(j).hexdigest()
        return (data['doc_id'], json.dumps(data))
    
    def parse(str):
        s=p.match(str)
        d = {}
        d['ip']=s.group(1)
        d['date']=s.group(4)
        d['operation']=s.group(5)
        d['uri']=s.group(6)
        return d    
    
    regex='^(S+) (S+) (S+) [([w:/]+s[+-]d{4})] "(S+)s?(S+)?s?(S+)?" (d{3}|-) (d+|-)s?"?([^"]*)"?s?"?([^"]*)?"?$'
    
    p=re.compile(regex)
    
    rdd = sc.textFile("/home/ubuntu/walker/apache_logs")
    
    rdd2 = rdd.map(parse)
    
    rdd3 = rdd2.map(addID)
    
    es_write_conf = {
            "es.nodes" : "localhost",
            "es.port" : "9200",
            "es.resource" : 'walker/apache',
            "es.input.json": "yes",
            "es.mapping.id": "doc_id"
        }
         
    rdd3.saveAsNewAPIHadoopFile(
            path='-',
         outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",       keyClass="org.apache.hadoop.io.NullWritable",
            valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
            conf=es_write_conf)
            

    也可以这么封装,其实原理是一样的

    import hashlib
    import json
    from pyspark import Sparkcontext
    
    def make_md5(line):
        md5_obj=hashlib.md5()
        md5_obj.encode(line)
        return md5_obj.hexdigest()
    
    def parse(line):
        dic={}
        l = line.split('	')
        doc_id=make_md5(line)
        dic['name']=l[1]
        dic['age'] =l[2]
        dic['doc_id']=doc_id
        return dic   #记得这边返回的是字典类型的,在写入es之前要记得dumps
    
    def saveData2es(pdd, es_host, port,index, index_type, key):
        """
        把saprk的运行结果写入es
        :param pdd: 一个rdd类型的数据
        :param es_host: 要写es的ip
        :param index: 要写入数据的索引
        :param index_type: 索引的类型
        :param key: 指定文档的id,就是要以文档的那个字段作为_id
        :return:
        """
        #实例es客户端记得单例模式
        if es.exist.index(index):
            es.index.create(index, 'spo')
        es_write_conf = {
            "es.nodes": es_host,
            "es.port": port,
            "es.resource": index/index_type,
            "es.input.json": "yes",
            "es.mapping.id": key
        }
    
        (pdd.map(lambda _dic: ('', json.dumps(_dic))))   #这百年是为把这个数据构造成元组格式,如果传进来的_dic是字典则需要jdumps,如果传进来之前就已经dumps,这便就不需要dumps了
        .saveAsNewAPIHadoopFile(
            path='-',
            outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", keyClass="org.apache.hadoop.io.NullWritable",
            valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
            conf=es_write_conf)
    
        )
    
    
    if __name__ == '__main__':
    
        #实例化sp对象
        sc=Sparkcontext()
        #文件中的呢内容一行一行用sc的读取出来
        json_text=sc.textFile('./1.txt')
        #进行转换
        json_data=json_text.map(lambda line:parse(line))
    
        saveData2es(json_data,'127.0.01','9200','index_test','index_type','doc_id')
    
        sc.stop()

     看到了把,面那个例子在写入es之前加了一个id,返回一个元组格式的,现在这个封装指定_id就会比较灵活了




    链接:https://www.jianshu.com/p/c7365b9bda0a

  • 相关阅读:
    网页设计~老生常谈~浏览器兼容2个主要问题的解决
    谈谈网页功能测试
    从PMP学习中浅谈公司行政工作
    肉肉谈对需求设计的想法到底是功能驱动界面?还是界面驱动功能?
    jndi和rmi学习
    mysql赋值变量:=的使用
    用Cookies和HashTable制作购物车
    nginx实现简单的反向代理
    .net Form认证扩展保存 Object 类型
    基于Docker搭建私有镜像仓库
  • 原文地址:https://www.cnblogs.com/tjp40922/p/12716870.html
Copyright © 2011-2022 走看看