zoukankan      html  css  js  c++  java
  • pyspark读取elasticsearch

    代码:

    import json
    from pyspark.sql import SparkSession
    from pyspark import SparkConf
    
    
    def trans_form(data_tuple):
        """
        Convert one record read from Elasticsearch into a Python object.

        :param data_tuple: indexable record whose element at index 1 is JSON text
        :return: the object obtained by parsing that JSON text
        """
        return json.loads(data_tuple[1])
    
    
    def get_es_conf(es_hot, es_port, index, type_, query_dic):
        """
        Build the configuration dict used to read from Elasticsearch.

        :param es_hot: Elasticsearch node address, stored under "es.nodes"
        :param es_port: Elasticsearch port; the value must reach the connector
            as a string, so it is converted with ``str()`` (an int is accepted)
        :param index: index name
        :param type_: type name, combined with the index as "index/type"
        :param query_dic: query as a dict; any non-dict value (e.g. None)
            falls back to a match_all query
        :return: dict of "es.*" settings with the query serialized as JSON text
        """
        # Fall back to "match everything" when no usable query dict is supplied.
        if not isinstance(query_dic, dict):
            query_dic = {"query": {"match_all": {}}}

        return {
            "es.nodes": es_hot,
            # Must be a string; coerce so callers may pass 9200 or "9200".
            "es.port": str(es_port),
            "es.resource": '{}/{}'.format(index, type_),
            "es.out.json": "yes",
            "es.query": json.dumps(query_dic),
        }
    
    
    def read_data_from_es(sc, es_hot, es_port, index, type_, query_dic):
        """
        Create an RDD over an Elasticsearch index via ``sc.newAPIHadoopRDD``.

        :param sc: object exposing ``newAPIHadoopRDD`` (the script passes the
            Spark context)
        :param es_hot: forwarded to ``get_es_conf``
        :param es_port: forwarded to ``get_es_conf``
        :param index: forwarded to ``get_es_conf``
        :param type_: forwarded to ``get_es_conf``
        :param query_dic: forwarded to ``get_es_conf``
        :return: whatever ``sc.newAPIHadoopRDD`` returns
        """
        hadoop_args = {
            "inputFormatClass": "org.elasticsearch.hadoop.mr.EsInputFormat",
            "keyClass": "org.apache.hadoop.io.NullWritable",
            "valueClass": "org.elasticsearch.hadoop.mr.LinkedMapWritable",
            "conf": get_es_conf(es_hot, es_port, index, type_, query_dic),
        }
        return sc.newAPIHadoopRDD(**hadoop_args)
    
    
    if __name__ == '__main__':
        conf = SparkConf()
        # `builder` is accessed on the class itself (SparkSession() cannot be
        # called without arguments), and a SparkConf must be passed to
        # config() through the `conf=` keyword; positionally it would be
        # taken as an option key.
        spark = SparkSession.builder.config(conf=conf).appName('test').getOrCreate()
        # The session exposes its context as the lower-camel-case attribute.
        sc = spark.sparkContext

        es_host = '127.0.0.1'
        es_port = '9200'
        index = 'test'
        type_name = 'result'
        query = {"query": {"match_all": {}}}
        es_rdd = read_data_from_es(sc, es_host, es_port, index, type_name, query)

        # Each record read back is a tuple of (_id, data); convert the data and
        # then drop empty results to obtain the records we want.
        hdd = es_rdd.map(trans_form).filter(lambda x: x)
  • 相关阅读:
    10w行级别数据的Excel导入优化记录
    迷失的人
    spring切面表达式详解
    json-path(json查找与操作)
    springboot yml文件语法
    消息队列面试突击系列--消息经典问题解决
    消息队列面试突击系列--消息产品对比
    mysql系列--sql实现原理
    mysql系列--锁和MVCC
    mysql系列--索引
  • 原文地址:https://www.cnblogs.com/tjp40922/p/13332679.html
Copyright © 2011-2022 走看看