  • Spark Streaming: reading from Kafka and writing to Hive tables

    The Spark Streaming job:

    package hive
     
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
     
    /**
      * Spark consumes data from multiple Kafka topics and writes it to different Hive tables.
      */
    object SparkToHive {
      def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.WARN)
        Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.WARN)
        // Use the HDFS URI directly: wrapping it in java.io.File and calling
        // getAbsolutePath would resolve it against the local working directory
        // and mangle the URI.
        val warehouseLocation = "hdfs://cluster/hive/warehouse"
        @transient
        val spark = SparkSession
          .builder()
          .appName("Spark SQL To Hive")
          .config("spark.sql.warehouse.dir", warehouseLocation)
          // The serializer must be configured before the SparkContext starts;
          // setting it via spark.conf.set() after getOrCreate() has no effect.
          .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .enableHiveSupport()
          .getOrCreate()
     
        @transient
        val sc = spark.sparkContext
        val scc = new StreamingContext(sc, Seconds(1)) // 1-second micro-batches
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "10.200.10.24:6667,10.200.10.26:6667,10.200.10.29:6667",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "test_jason",
          "auto.offset.reset" -> "latest", // or "earliest" to start from the beginning
          // Offsets are auto-committed to Kafka; records may be lost or re-read
          // if the job fails between a commit and the Hive write.
          "enable.auto.commit" -> (true: java.lang.Boolean)
        )
     
        val topics = Array("test", "test1", "test2")

        val stream: InputDStream[ConsumerRecord[String, String]] =
          KafkaUtils.createDirectStream[String, String](
            scc,
            LocationStrategies.PreferConsistent,
            ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
          )
     
        stream.foreachRDD(rdd => {
          if (!rdd.isEmpty()) {
            // Cache the batch, since it is traversed once per target table.
            val cache_rdd = rdd.map(_.value()).cache()
            // Records destined for table a
            val a = cache_rdd.filter(_.contains("hello"))
            // Records destined for table b
            val b = cache_rdd.filter(_.contains("jason"))
            // Printing is a placeholder; the original post defers the actual Hive
            // write to another article. See the sketch after this listing.
            a.foreach(println)
            b.foreach(println)
            // Release the cached batch so executor memory does not grow over time.
            cache_rdd.unpersist()
          }
        })
        scc.start()
        scc.awaitTermination()
      }
    }
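
    The listing only prints the filtered records; the original post defers the
    actual Hive write to another article. Below is a minimal sketch of what that
    step could look like. It assumes the target tables already exist in Hive with
    a single string column; the table names (test.a_table, test.b_table), the
    column name "line", and the helper writeToHive are illustrative, not from the
    original post.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{SaveMode, SparkSession}

    // Hedged sketch: append one micro-batch (an RDD[String]) to an existing Hive table.
    // Assumes a single STRING column; adapt the parsing/schema to the real data.
    def writeToHive(spark: SparkSession, lines: RDD[String], table: String): Unit = {
      import spark.implicits._            // enables rdd.toDF
      lines.toDF("line")                  // RDD[String] -> DataFrame with one column "line"
        .write
        .mode(SaveMode.Append)            // append each batch; insertInto requires an existing table
        .insertInto(table)                // resolves columns by position against the Hive table
    }

    Inside foreachRDD, after the filters, the two writes would then be:

    writeToHive(spark, a, "test.a_table")
    writeToHive(spark, b, "test.b_table")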
  • Original post: https://www.cnblogs.com/30go/p/11676670.html