  • Integrating Spark with Elasticsearch

    Reading from and writing to Elasticsearch with pyspark depends on the elasticsearch-hadoop package, which needs to be downloaded first; to get a different version, simply edit the version number in the download URL.
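
    As an alternative to setting PYSPARK_SUBMIT_ARGS (as the script below does), the jar can also be attached through SparkConf. A minimal sketch, where the jar path is a placeholder to be replaced with the location of your downloaded elasticsearch-hadoop distribution:

    from pyspark import SparkConf, SparkContext

    # "spark.jars" ships the listed jars to the driver and executors;
    # the path below is a placeholder, not the actual download location.
    conf = (SparkConf()
            .setAppName("es_jar_demo")
            .setMaster("local[2]")
            .set("spark.jars", "/path/to/elasticsearch-spark-20_2.11-6.4.2.jar"))
    sc = SparkContext(conf=conf)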

    """
    write data to elastic search
    https://starsift.com/2018/01/18/integrating-pyspark-and-elasticsearch/
    """
    from __future__ import print_function
    import os
    import json
    
    from pyspark import SparkContext
    from pyspark import SparkConf
    
    
    os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
    os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
    # note: extra jars for the job can be added here
    os.environ['PYSPARK_SUBMIT_ARGS'] = (
        '--jars /home/buxizhizhoum/2-Learning/pyspark_tutorial/jars/elasticsearch-hadoop-6.4.2/dist/elasticsearch-spark-20_2.11-6.4.2.jar '
        '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.1 '
        'pyspark-shell')
    
    conf = SparkConf().setAppName("write_es").setMaster("local[2]")
    sc = SparkContext(conf=conf)
    
    # config refer: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html
    es_write_conf = {
        # specify the node that we are sending data to (this should be the master)
        "es.nodes": 'localhost',
        # specify the port in case it is not the default port
        "es.port": '9200',
        # specify a resource in the form 'index/doc-type'
        "es.resource": 'testindex/testdoc',
        # is the input JSON?
        "es.input.json": "yes",
        # is there a field in the mapping that should be used to specify the ES document ID
        "es.mapping.id": "doc_id"
    }
    
    
    if __name__ == "__main__":
        data = [
            {'1': '2', 'doc_id': 1},
            {'2': '4', 'doc_id': 2},
            {'3': '8', 'doc_id': 3},
            {'4': '16', 'doc_id': 4},
            {'5': '32', 'doc_id': 5},
            {'6': '64', 'doc_id': 6},
            {'7': '128', 'doc_id': 7},
            {'8': '256', 'doc_id': 8},
            {'9': '512', 'doc_id': 9},
            {'10': '1024', 'doc_id': 10}
        ]
        rdd = sc.parallelize(data)
        rdd = rdd.map(lambda x: (x["doc_id"], json.dumps(x)))
        rdd.saveAsNewAPIHadoopFile(
            path='-',
            outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
            keyClass="org.apache.hadoop.io.NullWritable",
            valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
            # critically, we must specify our `es_write_conf`
            conf=es_write_conf)
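
    To read the documents back, elasticsearch-hadoop also provides org.elasticsearch.hadoop.mr.EsInputFormat, used through sc.newAPIHadoopRDD. A minimal sketch, assuming the same jar setup and the testindex/testdoc resource written above:

    es_read_conf = {
        "es.nodes": "localhost",
        "es.port": "9200",
        # same index/doc-type that the script above wrote to
        "es.resource": "testindex/testdoc",
    }
    rdd = sc.newAPIHadoopRDD(
        inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_read_conf)
    # each element comes back as a (document_id, document_body) pair
    print(rdd.take(3))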

    More code: https://github.com/buxizhizhoum/pyspark_tutorial/tree/master/spark_elasticsearch

    Reference: https://starsift.com/2018/01/18/integrating-pyspark-and-elasticsearch/

  • Original post: https://www.cnblogs.com/buxizhizhoum/p/9796157.html