
    Integrating Spark with ElasticSearch

    I have been using ElasticSearch for document search and recently started looking into Spark ML, so let's begin with a simple example of Spark reading from and writing to ElasticSearch.

    Environment: IDEA 2016, JDK 8, Windows 10, with ElasticSearch 6.3.2 and spark-2.3.1-bin-hadoop2.7 installed. The program is packaged into a jar with mvn package and submitted to Spark with spark-submit.

    First, create an index in ElasticSearch for the demo. Since the data is text, the ik analyzer is used for tokenization. See: elasticsearch-ik.

    • Create the index: PUT /index_ik_test

    • Set the mapping and the corresponding analyzer. Here the content field is declared as ElasticSearch's text type and uses the ik_max_word analysis mode:

      POST index_ik_test/fulltext/_mapping
      {
        "properties": {
          "content": {
            "type": "text",
            "analyzer": "ik_max_word",
            "search_analyzer": "ik_max_word"
          }
        }
      }

    • Store a few documents in ElasticSearch:

      POST index_ik_test/fulltext/1
      {"content":"其中有两个人受伤了"}

    • The ik analyzer has two analysis modes: ik_max_word and ik_smart. The difference between them can be inspected as follows:

      GET index_ik_test/_analyze
      {
        "text": ["其中国家投资了500万"],
        "tokenizer": "ik_smart"
      }

      Tokenization result: 其中、国家、投资、了、500万

      GET index_ik_test/_analyze
      {
        "text": ["其中国家投资了500万"],
        "tokenizer": "ik_max_word"
      }

      Tokenization result: 其中、中国、国家、投资、了、500、万

    • Use GET index_ik_test/_mapping to view the index configuration:

      {
        "index_ik_test": {
          "mappings": {
            "fulltext": {
              "properties": {
                "content": {
                  "type": "text",
                  "analyzer": "ik_max_word"
                }
              }
            }
          }
        }
      }

    Good, now that ElasticSearch has data in it, let's look at how to read that data from ElasticSearch with Spark.

    Create a new Maven project in IDEA 2016. A Spring Boot project would also work, but here a plain Maven project is used.

    ElasticSearch officially provides elasticsearch-hadoop for Spark to access ElasticSearch. For details, see the official documentation: es for spark.

    The official elasticsearch-hadoop Maven dependency bundles ElasticSearch for Hadoop MR, ElasticSearch for Hadoop Hive, and ElasticSearch for Hadoop Spark. If only Spark is used, the Spark-only dependency can be added instead. For details, see [this link](https://www.elastic.co/guide/en/elasticsearch/hadoop/current/install.html):

    <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch-spark-20_2.10</artifactId>
      <version>6.3.2</version>
    </dependency>
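
    Note that the Scala suffix of the artifact (_2.10 vs _2.11) has to match the Scala version of your Spark build. Since spark-sql_2.11 is used below, the matching Spark-only dependency would be the 2.11 variant (a sketch; adjust to your actual Spark and Scala versions):

    <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch-spark-20_2.11</artifactId>
      <version>6.3.2</version>
    </dependency>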
    

    Creating the Spark runtime context requires the spark-sql_2.11 dependency. See the Spark official documentation, quick start:

    To build the program, we also write a Maven pom.xml file that lists Spark as a dependency. Note that Spark artifacts are tagged with a Scala version.

    The example in this post adds the following three Maven dependencies:

    <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch-hadoop</artifactId>
      <version>6.3.2</version>
    </dependency>
    <!-- Spark dependency -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.3.1</version>
    </dependency>
    
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>22.0</version>
    </dependency>
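
    Because the example code below uses Java 8 lambdas, the Maven compiler should target Java 8 as well. A minimal build section (a sketch; the plugin version is only illustrative):

    <build>
      <plugins>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.1</version>
          <configuration>
            <source>1.8</source>
            <target>1.8</target>
          </configuration>
        </plugin>
      </plugins>
    </build>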
    

    Now let's look directly at the example code.

    Writing data to ElasticSearch

    • Configure Spark's connection to ElasticSearch. See elasticsearch-hadoop-master; here we follow "Configure the connector to run in WAN mode":

      SparkConf sparkConf = new SparkConf().setAppName("writeEs").setMaster("local[*]")
              .set("es.index.auto.create", "true")
              .set("es.nodes", "ELASTIC_SEARCH_IP").set("es.port", "9200")
              .set("es.nodes.wan.only", "true");
      
    • Write the data to ElasticSearch (a quick verification query follows right after this list):

      JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
      JavaEsSpark.saveToEs(javaRDD, elasticIndex);
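
    If the write succeeds, the two maps end up as documents in the spark index (type docs). A quick way to verify, assuming Kibana Dev Tools or curl access to the cluster:

      GET spark/docs/_search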
      

    Querying data from ElasticSearch

    JavaRDD<Map<String, Object>> searchRdd = esRDD(jsc, "index_ik_test/fulltext", "?q=中国").values();
    for (Map<String, Object> item : searchRdd.collect()) {
        item.forEach((key, value) -> {
            System.out.println("search key:" + key + ", search value:" + value);
        });
    }
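
    For a sanity check outside Spark, the same URI search can be run directly against ElasticSearch (assuming Kibana Dev Tools or curl access):

      GET index_ik_test/_search?q=中国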
    

    The query condition is ?q=中国 (a URI search). The complete example code is as follows:

    import com.google.common.collect.ImmutableList;
    import com.google.common.collect.ImmutableMap;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.SparkSession;
    import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
    
    import java.util.Map;
    
    import static org.elasticsearch.spark.rdd.api.java.JavaEsSpark.esRDD;
    
    /**
     * Created by Administrator on 2018/8/28.
     */
    public class EsSparkTest {

        public void writeEs() {
            String elasticIndex = "spark/docs";
            // https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#spark-native
            SparkConf sparkConf = new SparkConf().setAppName("writeEs").setMaster("local[*]")
                    .set("es.index.auto.create", "true")
                    .set("es.nodes", "ELASTIC_SEARCH_IP").set("es.port", "9200")
                    .set("es.nodes.wan.only", "true");
            SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
            JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext()); // adapter
            Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);
            Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");
            JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
            JavaEsSpark.saveToEs(javaRDD, elasticIndex);
        }

        public void readEs() {
            SparkConf sparkConf = new SparkConf().setAppName("readEs").setMaster("local[*]")
                    .set("es.index.auto.create", "true")
                    .set("es.nodes", "ELASTIC_SEARCH_IP").set("es.port", "9200")
                    .set("es.nodes.wan.only", "true");
            SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
            JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext()); // adapter
            JavaRDD<Map<String, Object>> searchRdd = esRDD(jsc, "index_ik_test/fulltext", "?q=中国").values();
            for (Map<String, Object> item : searchRdd.collect()) {
                item.forEach((key, value) -> {
                    System.out.println("search key:" + key + ", search value:" + value);
                });
            }
            sparkSession.stop();
        }
    }
    

    DemoApplication.java, the main entry class:

    public class DemoApplication {
        public static void main(String[] args) {
            new EsSparkTest().readEs();
        }
    }
    

    In the IDEA menu bar, open View ---> Tool Windows ---> Maven Projects to show the Maven side panel, then double-click package to build the jar.
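
    Equivalently, the jar can be built from the command line (assuming Maven is installed and on the PATH):

      mvn clean package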

    Upload the built jar to the server where Spark is deployed ($ rz -bey esdemo-1.0-SNAPSHOT.jar), then submit it with the following command:

    ~/spark-2.3.1-bin-hadoop2.7/bin/spark-submit --class DemoApplication esdemo-1.0-SNAPSHOT.jar

    --class is the fully qualified class name. Running readEs() queries the index and prints the matching documents. If a ClassNotFoundException is thrown during execution, check whether the dependencies declared in pom.xml are present in the jars/ directory of the Spark installation (for example, upload the Guava jar and elasticsearch-hadoop-6.3.2.jar to jars/ beforehand).
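
    As an alternative to copying jars into Spark's jars/ directory, the dependencies can be passed to spark-submit with --jars (a sketch, assuming the jars sit in the working directory):

      ~/spark-2.3.1-bin-hadoop2.7/bin/spark-submit --class DemoApplication \
        --jars elasticsearch-hadoop-6.3.2.jar,guava-22.0.jar \
        esdemo-1.0-SNAPSHOT.jar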

    Because the content field uses the ik_max_word analysis mode, the tokenization of the text 其中国家投资了500万 includes the term 中国, which is why that document is returned by the query.

    Follow-up:

    When querying ElasticSearch data with Spark, the index user defines a date field as follows:

        "created": {
          "type": "date",
          "format": "yyyy-MM-dd HH:mm:ss"
        }
    

    As a result, when Spark executed the following query

    JavaRDD<Map<String, Object>> searchRdd = JavaEsSpark.esRDD(jsc, "user/profile", "?q=test").values();
    for (Map<String, Object> item : searchRdd.collect()) {
        item.forEach((key, value)->{
            System.out.println("search key:" + key + ", search value:" + value);
        });
    }
    

    it failed while deserializing and constructing the date object:

    Caused by: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot invoke method public org.joda.time.DateTime org.joda.time.format.DateTimeFormatter.parseDateTime(java.lang.String)
    at org.elasticsearch.hadoop.util.ReflectionUtils.invoke(ReflectionUtils.java:93)
    at org.elasticsearch.hadoop.util.DateUtils$JodaTime.parseDate(DateUtils.java:105)
    at org.elasticsearch.hadoop.util.DateUtils.parseDate(DateUtils.java:122)
    at org.elasticsearch.hadoop.serialization.builder.JdkValueReader.parseDate(JdkValueReader.java:424)
    at org.elasticsearch.hadoop.serialization.builder.JdkValueReader.date(JdkValueReader.java:412)
    at org.elasticsearch.hadoop.serialization.builder.JdkValueReader.readValue(JdkValueReader.java:88)
    at org.elasticsearch.hadoop.serialization.ScrollReader.parseValue(ScrollReader.java:789)
    at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:739)
    ... 31 more
    Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.elasticsearch.hadoop.util.ReflectionUtils.invoke(ReflectionUtils.java:91)
    ... 38 more
    Caused by: java.lang.IllegalArgumentException: Invalid format: "2018-10-08 19:00:41" is malformed at " 19:00:41"
    at org.joda.time.format.DateTimeFormatter.parseDateTime(DateTimeFormatter.java:945)
    ... 43 more

    This is presumably because the date format defined in my index is yyyy-MM-dd HH:mm:ss, while org.joda.time.format.DateTimeFormatter uses a different default format, but I could not find where to specify the format for parsing, so this turned out to be yet another pitfall.

    As the following test shows, Joda itself can parse a date in this format:

            String pattern = "yyyy-MM-dd HH:mm:ss";
            String aTime = "2018-10-08 19:00:41";
            DateTimeFormatter format = DateTimeFormat.forPattern(pattern);
            DateTime dateTime = format.parseDateTime(aTime);//no error
    

    The Joda version that Spark 2.3 depends on:

    ~/spark-2.3.1-bin-hadoop2.7/jars$ ls | grep joda
    joda-time-2.9.3.jar
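
    One workaround worth trying (an assumption on my part, not verified against this exact setup) is to stop the connector from building rich date objects at all by setting es.mapping.date.rich to false, so the field comes back as the raw string instead of being parsed with Joda:

    SparkConf sparkConf = new SparkConf().setAppName("readEs").setMaster("local[*]")
            .set("es.nodes", "ELASTIC_SEARCH_IP").set("es.port", "9200")
            .set("es.nodes.wan.only", "true")
            // es.mapping.date.rich=false: return date fields as plain strings/longs instead of date objects
            .set("es.mapping.date.rich", "false");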
