zoukankan      html  css  js  c++  java
  • spark写入ES(动态模板)

    使用es-hadoop插件,主要使用elasticsearch-spark-20_2.11-6.2.x.jar

    官网:https://www.elastic.co/guide/en/elasticsearch/hadoop/current/reference.html

    关于ES详细的配置参数 大家可以看下面的这个类:

    org.elasticsearch.hadoop.cfg.ConfigurationOptions

    sparkstreaming写入ES:
          
     
            SparkConf conf = new SparkConf();
            conf.set("es.index.auto.create", "true");
            conf.set("es.nodes", "10.8.18.16,10.8.18.45,10.8.18.76");
            conf.set("es.port", "9200");
            JavaStreamingContext ssc= null;
            try {
                ssc= new JavaStreamingContext(conf, new Duration(5000L));
                JavaSparkContext jsc =ssc.sparkContext();                        
                String json1 = "{"reason" : "business","airport" : "sfo"}";  
                String json2 = "{"participants" : 5,"airport" : "otp"}";
    
                JavaRDD<String> stringRDD = jsc.parallelize(ImmutableList.of(json1, json2));
                Queue<JavaRDD<String>> microbatches = new LinkedList<JavaRDD<String>>();      
                microbatches.add(stringRDD);
                JavaDStream<String> stringDStream = ssc.queueStream(microbatches);
                
                //接口1:es的配置通过SparkConf配置
                //使用动态模板,用{}将动态生成的字段名括起来,注意是作用于index
                //而不是type
                //JavaEsSparkStreaming.saveJsonToEs(stringDStream, "spark-{airport}/doc");
                
                Map<String,String> map = new HashMap<String,String>();
                map.put("es.index.auto.create", "true");
                map.put("es.nodes", "ip1,ip2,ip3");
                map.put("es.resource.write", "spark-{airport}/doc");
                map.put("es.port", "9200");
                //接口2:es的配置通过HashMap配置,其中读取es是index的key为es.resource.read
                //写入的key为es.resource.write
                //JavaEsSparkStreaming.saveJsonToEs(stringDStream, map);
                //接口3:与接口2类似,只是该接口支持直接填写index参数
                JavaEsSparkStreaming.saveJsonToEs(stringDStream,"spark-{airport}/doc", map);
                ssc.start();
                ssc.awaitTermination();
            } catch (Throwable e) {
                // TODO 自动生成的 catch 块
                ssc.close();
                e.printStackTrace();
            }
    //使用动态模板,用{}将动态生成的字段名括起来,注意是作用于index
  • 相关阅读:
    496. 下一个更大元素 I『简单』
    492. 构造矩形『简单』
    443. 压缩字符串『简单』
    455. 分发饼干『简单』
    463. 岛屿的周长『简单』
    38. 外观数列『简单』
    28. 实现 strStr()『简单』
    441. 排列硬币『简单』
    628. 三个数的最大乘积『简单』
    575. 分糖果『简单』
  • 原文地址:https://www.cnblogs.com/lyy-blog/p/9728001.html
Copyright © 2011-2022 走看看