Spark Streaming: Reading Data from Kafka and Writing to Elasticsearch

    Introduction:
    The project already has several channels feeding data into Kafka. This post records how to read that data from Kafka with Spark Streaming and write it into Elasticsearch, giving a real-time (strictly speaking, near-real-time; the refresh interval is configurable) view of the data.

    Application scenario:
    When the business system does multidimensional analysis, the data comes from many different sources. Most historical data is generated by scheduled batch jobs, but for an analytics product the current day's (T+0) data is hard to get that way. My current solution for T+0 data is to read it from Kafka with Spark Streaming and write it to Elasticsearch; the business system then queries the historical data together with the T+0 data to present a real-time view.


    First, the versions involved:

    <java.version>1.8</java.version>
    <spark.version>1.6.2</spark.version>
    <scala.version>2.10.6</scala.version>
    <elasticsearch.version>5.2.0</elasticsearch.version>
    <kafka.version>1.0</kafka.version>

    Below is the structure of the project, scaffolded with Spring Boot:

    [Screenshot of the project structure omitted.]

    The reference material I learned from used Spark 1.6.2 with Kafka 0.8, but the Kafka cluster in my own project runs 1.0. I swapped the corresponding kafka_2.10-0.8.2.1.jar for kafka_2.10-0.10.0.0.jar, and then hit this exception:

    Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90)
        at scala.Option.map(Option.scala:145)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:87)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:86)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:86)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:85)
        at scala.util.Either$RightProjection.flatMap(Either.scala:523)
        at org.apache.spark.streaming.kafka.KafkaCluster.findLeaders(KafkaCluster.scala:85)
        at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:179)
        at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:161)
        at org.apache.spark.streaming.kafka.KafkaCluster.getEarliestLeaderOffsets(KafkaCluster.scala:155)
        at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:213)
        at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:211)
        at scala.util.Either$RightProjection.flatMap(Either.scala:523)
        at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
        at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
        at com.midea.magiccube.spark.LoanInfoStatistic.getActionDStream(LoanInfoStatistic.java:210)
        at com.midea.magiccube.spark.LoanInfoStatistic.main(LoanInfoStatistic.java:69)

    The key line is java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker. After some digging, my initial conclusion was that the Kafka client version was incompatible with this version of Spark (spark-streaming-kafka_2.10 for Spark 1.6.x is built against the Kafka 0.8 client API), so I rolled the version back, and the job ran through.
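
    If you would rather pin the client in Maven than swap jars by hand, a dependency along these lines should match what spark-streaming-kafka_2.10:1.6.2 expects; 0.8.2.1 is my understanding of the client it was built against, so treat the exact version as an assumption:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <!-- assumed: the Kafka client version Spark 1.6.x streaming was built against -->
            <version>0.8.2.1</version>
        </dependency>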

    The pom.xml looks like this:

        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>1.5.7.RELEASE</version>
            <relativePath />
        </parent>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.7</java.version>
            <spark.version>1.6.2</spark.version>
            <scala.version>2.10.6</scala.version>
            <elasticsearch.version>5.2.0</elasticsearch.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>com.google.code.gson</groupId>
                <artifactId>gson</artifactId>
            </dependency>
    
            <!-- Spark dependencies -->
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.10</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.10</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka_2.10</artifactId>
                <version>${spark.version}</version>
            </dependency>
    
            <!-- Spark-to-Elasticsearch support -->
            <dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch-spark-13_2.10</artifactId>
                <version>${elasticsearch.version}</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>

    Next comes the actual Spark Streaming read/write code.

    1. Configure the SparkConf object and initialize the Elasticsearch connection parameters.

      SparkConf sc = new SparkConf();
      sc.setAppName("Name").setMaster("local[2]"); // local mode with 2 threads, for development
      sc.set("es.nodes", IP);                      // Elasticsearch node address(es)
      sc.set("es.index.auto.create", "true");      // create the index automatically if missing
      sc.set("es.mapping.id", "id");               // use the entity's "id" field as the document _id
      sc.set("es.port", PORT);                     // Elasticsearch HTTP port, typically 9200
    2. Wrap the SparkConf in a JavaStreamingContext and set the batch (polling) interval to 5 seconds.

      JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(5));
      jssc.checkpoint("E:/checkpoint"); // directory where Spark stores checkpoint data
    3. Set up the Kafka connection properties and call KafkaUtils.createDirectStream() to read the stream, yielding a JavaPairDStream<String, String> object dStream, as sketched below.
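
    A minimal sketch of this step, assuming Spark 1.6's direct-stream API; the broker address and topic name below are placeholders:

      // Requires kafka.serializer.StringDecoder and org.apache.spark.streaming.kafka.KafkaUtils
      Map<String, String> kafkaParams = new HashMap<String, String>();
      kafkaParams.put("metadata.broker.list", "broker1:9092"); // placeholder broker list
      kafkaParams.put("auto.offset.reset", "smallest");        // start from the earliest offset

      Set<String> topics = new HashSet<String>(Arrays.asList("myTopic")); // placeholder topic

      // Key/value types and decoders are all String, matching the JSON payload
      JavaPairInputDStream<String, String> dStream = KafkaUtils.createDirectStream(
              jssc,
              String.class, String.class,
              StringDecoder.class, StringDecoder.class,
              kafkaParams,
              topics);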

    4. Parse the Kafka messages with dStream.mapToPair() and wrap them into a JavaPairDStream<String, YourEntity> object entityDStream, as sketched below.
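
    A minimal sketch of the parsing step, assuming the payload is JSON and reusing the Gson dependency already in the pom. LoanInfo is a hypothetical entity class with an id field; substitute your own:

      // Requires com.google.gson.Gson, scala.Tuple2, and org.apache.spark.api.java.function.PairFunction
      JavaPairDStream<String, LoanInfo> entityDStream = dStream.mapToPair(
              new PairFunction<Tuple2<String, String>, String, LoanInfo>() {
                  private static final long serialVersionUID = 1L;

                  @Override
                  public Tuple2<String, LoanInfo> call(Tuple2<String, String> record) {
                      // record._2() is the JSON message body from Kafka
                      LoanInfo entity = new Gson().fromJson(record._2(), LoanInfo.class);
                      return new Tuple2<String, LoanInfo>(entity.getId(), entity);
                  }
              });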

    5. Convert the data with entityDStream.transform() into a JavaDStream dataDStream that is convenient to write to ES, as sketched below.
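
    A sketch under the same assumptions; transform() drops the Kafka keys and keeps only the entity values:

      // Requires org.apache.spark.api.java.function.Function
      JavaDStream<LoanInfo> dataDStream = entityDStream.transform(
              new Function<JavaPairRDD<String, LoanInfo>, JavaRDD<LoanInfo>>() {
                  private static final long serialVersionUID = 1L;

                  @Override
                  public JavaRDD<LoanInfo> call(JavaPairRDD<String, LoanInfo> rdd) {
                      return rdd.values(); // keep only the entity objects for indexing
                  }
              });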

    6. Then write the data to ES with JavaEsSparkStreaming.saveToEs(dataDStream, "indexName"); see the note below.
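
    For example (the index and type names are placeholders; elasticsearch-hadoop addresses the target as an "index/type" resource string):

      // Requires org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming
      JavaEsSparkStreaming.saveToEs(dataDStream, "loan/info"); // placeholder "index/type"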

    7. Finally, start the JavaStreamingContext jssc, wait for it to terminate, and close it:

      jssc.start();            // start the streaming computation
      jssc.awaitTermination(); // block until the job is stopped or fails
      jssc.close();
Original post: https://www.cnblogs.com/hejianxin/p/9316722.html