Use the es-hadoop connector; the main artifact is elasticsearch-spark-20_2.11-6.2.x.jar.
Official documentation: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/reference.html
For the full set of ES configuration parameters, see the following class:
org.elasticsearch.hadoop.cfg.ConfigurationOptions
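For example, rather than hard-coding the string keys, the constants defined in that class can be referenced directly. A minimal sketch (the constant names below, such as ES_NODES and ES_PORT, are assumed to match the 6.x ConfigurationOptions source):

import org.apache.spark.SparkConf;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;

SparkConf conf = new SparkConf()
        // ES_NODES, ES_PORT and ES_INDEX_AUTO_CREATE hold the string keys
        // "es.nodes", "es.port" and "es.index.auto.create" respectively
        .set(ConfigurationOptions.ES_NODES, "10.8.18.16,10.8.18.45,10.8.18.76")
        .set(ConfigurationOptions.ES_PORT, "9200")
        .set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "true");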
Writing to ES from Spark Streaming:
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming;

import com.google.common.collect.ImmutableList;

SparkConf conf = new SparkConf();
conf.set("es.index.auto.create", "true");
conf.set("es.nodes", "10.8.18.16,10.8.18.45,10.8.18.76");
conf.set("es.port", "9200");

JavaStreamingContext ssc = null;
try {
    ssc = new JavaStreamingContext(conf, new Duration(5000L));
    JavaSparkContext jsc = ssc.sparkContext();

    // JSON documents; the inner quotes must be escaped in Java string literals
    String json1 = "{\"reason\" : \"business\",\"airport\" : \"sfo\"}";
    String json2 = "{\"participants\" : 5,\"airport\" : \"otp\"}";

    JavaRDD<String> stringRDD = jsc.parallelize(ImmutableList.of(json1, json2));
    Queue<JavaRDD<String>> microbatches = new LinkedList<JavaRDD<String>>();
    microbatches.add(stringRDD);
    JavaDStream<String> stringDStream = ssc.queueStream(microbatches);

    // Variant 1: the ES settings are taken from SparkConf.
    // Dynamic index naming: wrap the generated field name in {};
    // note that the pattern applies to the index, not the type.
    //JavaEsSparkStreaming.saveJsonToEs(stringDStream, "spark-{airport}/doc");

    Map<String, String> map = new HashMap<String, String>();
    map.put("es.index.auto.create", "true");
    map.put("es.nodes", "ip1,ip2,ip3");
    map.put("es.resource.write", "spark-{airport}/doc");
    map.put("es.port", "9200");

    // Variant 2: the ES settings are passed in a Map; the key for the index
    // to read from is es.resource.read, the key for writing is es.resource.write.
    //JavaEsSparkStreaming.saveJsonToEs(stringDStream, map);

    // Variant 3: like variant 2, but the index can also be given directly as a parameter.
    JavaEsSparkStreaming.saveJsonToEs(stringDStream, "spark-{airport}/doc", map);

    ssc.start();
    ssc.awaitTermination();
} catch (Throwable e) {
    e.printStackTrace();
    if (ssc != null) {
        ssc.close();
    }
}
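For the read direction mentioned above (es.resource.read), a counterpart sketch using JavaEsSpark to pull documents back into an RDD; the "spark-sfo/doc" resource name is a hypothetical index/type written by the job above:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

SparkConf conf = new SparkConf()
        .set("es.nodes", "10.8.18.16,10.8.18.45,10.8.18.76")
        .set("es.port", "9200");
JavaSparkContext jsc = new JavaSparkContext(conf);

// esJsonRDD returns (document id, raw JSON source) pairs;
// "spark-sfo/doc" is a hypothetical index/type used for illustration
JavaPairRDD<String, String> docs = JavaEsSpark.esJsonRDD(jsc, "spark-sfo/doc");
for (String json : docs.values().collect()) {
    System.out.println(json);
}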