zoukankan      html  css  js  c++  java
  • Spark Streaming 读取 Kafka 中数据

    一、什么是 Spark Streaming

      1、SparkStreaming 是 Spark核心API 的扩展。可实现可伸缩、高吞吐、容错机制的实时流处理。

       如图,数据可从 Kafka、Flume、HDFS 等多种数据源获得,最后将数据推送到 HDFS、数据库 或者 Dashboards 上面。

            

        2、SparkStreaming 接收到实时的数据,然后按照时间段将实时数据分成多个批次,经过Spark处理引擎的数据处理,最后按照批次输出。

               

       3、SparkStreaming 提供了一个高抽象的离散流或者叫做 DStream,它相当于连续的数据流。

        外部数据不断的涌入,数据按照自定义的时间将数据进行切片,每个时间段内的数据是连续的,时间段与时间段之间的关系史相互独立的。这就是 离散流

          DStreamRDD 的序列化,DStream 可以看作一组 RDD 的集合。

              

       4、在DStream上执行的任何操作都转换为对基础RDD的操作。

    二、需要的maven包

    <!-- Spark Streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    
    <!-- config的统一配置 -->
    <dependency>
        <groupId>com.typesafe</groupId>
        <artifactId>config</artifactId>
        <version>1.3.3</version>
    </dependency>
    
    <!-- json格式转化 -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
        </dependency>

    三、项目参数的配置文件

      首先在 src/resources 路径下创建一个 application.conf 文件,上面 maven 文件中添加的 config 依赖默认会读取 src/resources 路径下的 application.conf 文件。

    applicaltion.conf

    kafka.topic = "topicName"
    kafka.group.id = "Your group"
    kafka.broker.list = "your Kafka's host:port"
    
    redis.host = "redisIP, no port"
    redis.db=1

       Redis 会有 16个库,这里面 redis.db=1,代表我会将数据存入到redis的1库中。

       然后我会创建一个 Object,通过 ConfigFactory 来获取 application 中的参数的值.

    object ParamsConf {
      private lazy val conf = ConfigFactory.load() 
    
      val topics = conf.getString("kafka.topic").split(",")
      val groupId = conf.getString("kafka.group.id")
      val brokers = conf.getString("kafka.broker.list")
      val redisHost = conf.getString("redis.host")
      val redisDB = conf.getInt("redis.db")
    
        
      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> brokers,
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> groupId,
        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> (false: java.lang.Boolean)
      )
    }

       这样项目就会很方便的通过这个工具类来获取到统一的参数配置了。

      

    四、向 Kafka 传入 Demo 数据

       通过 KafkaProducer 创建与 Kafka 之间的连接,KafkaProducer 在创建对象的时候需要传入生产者的配置参数。通过读 Source Code 都可以查到对应的参数配置

    val prop = new Properties()
    // 序列化用到的 key、value
    prop.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
    prop.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")
    // kafka的地址,在上面的配置里
    prop.put("bootstrap.servers", ParamsConf.brokers)
    prop.put("request.required.acks", "1")
    
    val topic = ParamsConf.topics(0)
    
    val producer = new KafkaProducer[String, String](prop)
    

      其中 request.required.acks 是 Kafka 的发送确认,有 -1、0、1 三个级别。-1 代表 producer 会获得所有同步replicas 都收到数据的确认,才会发下一条消息;0代表 producer 不等待确认消息,producer中有message就会发给broker;1代表 获得leader replica已经接收了数据的确认信息。

      之后利用循环编造数据,并将数据发送给 Kafka

    val random = new Random()
    val dateFormat = FastDateFormat.getInstance("yyyyMMddHHmmss")
    
    for(i <- 1 to 100) {
          val time = dateFormat.format(new Date())
          val userId = random.nextInt(1000).toString
          val courseid = random.nextInt(500).toString
          val fee = random.nextInt(400).toString
          val result = Array("0","1") // 0 表示未成功;1 表示成功
          val flag = result(random.nextInt(2)).toString
          var orderId = UUID.randomUUID().toString()
    
          val map = new util.HashMap[String, Object]()
          map.put("time",time)
          map.put("userId",userId)
          map.put("courseid",courseid)
          map.put("fee",fee)
          map.put("result",result)
          map.put("flag",flag)
          map.put("orderId",orderId)
    
          val json = new JSONObject(map)
    
      producer.send(new ProducerRecord[String, String](topic, i.toString, json.toString()))
        }
    

      

    五、 Spark Streaming 对接 Kafka

      初始化 SparkStreaming 程序,首先需要创建 StreamingContext。StreamingContext的创建方式:

    val conf = new SparkConf().setMaster("local[2]").setAppName("hunterV2")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    ...
    ... Do Something...
    ...
    ssc.start()
    ssc.awaitTermination()
    

      在逻辑代码之后要调用 ssc.start() 来开始实时处理数据。ssc.awaitTermination() 方法也是必需的, 来等待应用程序的终止,也可以用 ssc.stop() 来终止程序。或者就是让它持续不断的运行进行计算

      通过 KafkaUtils 以直连方式拉取数据,这种方式不会修改数据的偏移量,需要手动的更新

    val stream = KafkaUtils.createDirectStream(ssc,
                           LocationStrategies.PreferConsistent,
                           ConsumerStrategies.Subscribe[String, String](ParamsConf.topics, ParamsConf.kafkaParams))
    

      createDirectStream 有三个参数,查看 Source Code 

    def createDirectStream[K, V](
          ssc: StreamingContext,
          locationStrategy: LocationStrategy,
          consumerStrategy: ConsumerStrategy[K, V]
    )
    

      前两个参数较明确,第三个参数中则是要添加 kafka 的 topic 以及 Kafka 的信息,在 ParamsConf 中已经定义好了,这个也可以在官网文档中可以找到。(https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html)

      拉取数据之后,SparkStreaming 提供的 stream 变量是 DStream 类型数据。DStream 是一组 RDD 的序列,DStream 任何操作都转换为对基础 RDD 的操作。因此需要用到 DStream 的 action -- foreachRDD.

    stream.foreachRDD( rdd => {
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      ...
      ... Do SomeThing
      ...
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    })
    

      在逻辑代码的前后有两行 关于 offsetRanges 的语句,这是 kafka 自带的用作管理 offset 的语句。第一句是读取当前的偏移量的数据,逻辑执行成功之后,最后一句是将偏移量提交到 Kafka 上。(https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#kafka-itself)

        

  • 相关阅读:
    系统使用 aspose.cell , 使得ashx第一次访问会变很慢
    aspx页面生成html
    SQL Server 监控 使用sp_trace_create
    IE6 IE7: div中table宽度100%导致的宽度问题
    VSTO Word2003 添加菜单栏, 添加工具栏
    mysql主主同步的配置
    linux永久添加静态路由有两种方法
    centos 7 进入单用户系统并且更改所有系统文件
    rabbitmq-3.7.2编译安装全过程
    linux centos 升级 make 4.2
  • 原文地址:https://www.cnblogs.com/Lyh1997/p/12260949.html
Copyright © 2011-2022 走看看