# Code:
import json


def trans_form(data_tuple):
    """Convert one record read from Elasticsearch into a Python object.

    :param data_tuple: ``(key, value)`` pair produced by the ES input format;
        ``value`` (index 1) is expected to be the document as JSON text,
        which is what the reader is configured to emit via ``es.output.json``.
    :return: the decoded JSON value (normally a ``dict``).
    """
    return json.loads(data_tuple[1])


def get_es_conf(es_hot, es_port, index, type_, query_dic):
    """Build the elasticsearch-hadoop configuration used for reading.

    :param es_hot: Elasticsearch node address, stored under ``es.nodes``.
    :param es_port: Elasticsearch port; coerced to ``str`` because the setting
        must be a string, so an ``int`` such as ``9200`` is accepted as well.
    :param index: name of the index to read.
    :param type_: name of the mapping type; combined with ``index`` as
        ``index/type`` for ``es.resource``.
    :param query_dic: query DSL as a ``dict``; any non-dict value falls back
        to a ``match_all`` query.
    :return: ``dict`` of connector settings.
    """
    if not isinstance(query_dic, dict):
        query_dic = {"query": {"match_all": {}}}
    return {
        "es.nodes": es_hot,
        "es.port": str(es_port),
        "es.resource": '{}/{}'.format(index, type_),
        # Ask the connector to hand documents back as JSON text so that
        # trans_form() can decode them with json.loads().
        "es.output.json": "true",
        "es.query": json.dumps(query_dic),
    }


def read_data_from_es(sc, es_hot, es_port, index, type_, query_dic):
    """Create an RDD over the documents of an Elasticsearch index.

    :param sc: the ``SparkContext`` used to create the RDD.
    :param es_hot: Elasticsearch node address.
    :param es_port: Elasticsearch port.
    :param index: name of the index to read.
    :param type_: name of the mapping type.
    :param query_dic: query DSL as a ``dict`` (see :func:`get_es_conf`).
    :return: the RDD returned by ``sc.newAPIHadoopRDD``.
    """
    es_read_conf = get_es_conf(es_hot, es_port, index, type_, query_dic)
    return sc.newAPIHadoopRDD(
        inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_read_conf,
    )


def main():
    """Read the ``test/result`` index from a local Elasticsearch node.

    :return: RDD of decoded documents with falsy (empty) values dropped.
    """
    # pyspark is only needed by the driver; importing it here keeps the
    # helper functions above importable without a Spark installation.
    from pyspark import SparkConf
    from pyspark.sql import SparkSession

    conf = SparkConf()
    # `builder` is a class attribute (SparkSession must not be instantiated
    # directly) and a SparkConf has to be passed via the `conf` keyword.
    spark = SparkSession.builder.config(conf=conf).appName('test').getOrCreate()
    sc = spark.sparkContext

    es_host = '127.0.0.1'
    es_port = '9200'
    index = 'test'
    type_name = 'result'
    query = {"query": {"match_all": {}}}

    es_rdd = read_data_from_es(sc, es_host, es_port, index, type_name, query)
    # Each record is an (_id, data) tuple; decode the data part and then
    # drop empty results to obtain the documents we want.
    doc_rdd = es_rdd.map(trans_form).filter(bool)
    return doc_rdd


if __name__ == '__main__':
    main()