zoukankan      html  css  js  c++  java
  • 大数据 Spark 连接外部资源

    Spark中使用外部连接获取配置信息

    Spark Streaming在启动的时候只能使用一个数据源的数据,但是我们的配置是随着业务进行改变的,所以需要动态的进行业务配置的获取。

    连接redis

    使用单例模式,在Driver上定义,在分区上遍历,JedisConnectionPool是在Master上定义的,广播到Worker上,同时JedisConnectionPool在每个work上始终只有一个实例存在,因为在方法中使用的是懒加载模式,只有在启动时才会初始化JedisConnectionPool,所以是在节点上完成的初始化,所以也不会出现序列化问题。

    object HandleAppData {
      val LOG = LoggerFactory.getLogger(this.getClass)
    
      val port = 1
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("HandleKafkaDataAndApp")
    
        val ssc = new StreamingContext(conf, Seconds(10))
        val kafkaParams = Map[String, String](
          "metadata.broker.list" -> "ifan1:9092", //kafka集群地址
          "serializer.class" -> "kafka.serializer.StringEncoder",
          "group.id" -> "test1", //消费者组名
          "auto.offset.reset" -> "largest", //latest自动重置偏移量为最新的偏移量
          "enable.auto.commit" -> "false") //如果是true,则这个消费者的偏移量会在后台自动提交
        val topics = Set("testspark")
    
        val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    
        stream.foreachRDD {
          rdd =>
            val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
            rdd.foreachPartition(handleHBase(_, offsetRanges))
        }
    
        ssc.start()
        ssc.awaitTermination()
      }
    
      def handleHBase(iterator: Iterator[(String, String)], offsetRanges: Array[OffsetRange]): Unit = {
        val jedis = JedisConnectionPool.getConnection()
        val pipe = jedis.pipelined()
        try {
          val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
          LOG.info(s"topic = ${o.topic} partiton = ${o.partition} offset = ${o.untilOffset} fromOffset = ${o.fromOffset}")
          // 将获取的数据放在 redis 中
          iterator.foreach(kv => {
            pipe.lpush(s"data:${o.topic}:${o.partition}", kv._2)
            LOG.info(s"${o.topic} => ${o.partition} => ${kv._2}")
          })
    
          // 保存 kafka offset
          jedis.set(s"partition:${o.topic}:${o.partition}", s"${o.untilOffset}")
        } finally {
          pipe.exec()
          pipe.close()
          jedis.close()
        }
      }
    
    }
    
    object JedisConnectionPool {
    
      val config = new JedisPoolConfig()
      // 最大连接数
      config.setMaxTotal(3)
      // 最大空闲连接数
      config.setMaxIdle(1)
      val pool = new JedisPool(config, "ifan1", 63790)
    
      def getConnection(): Jedis = {
        pool.getResource
      }
    }
    
    
  • 相关阅读:
    HDU2027 统计元音 一点点哈希思想
    湖南工业大学第一届ACM竞赛 数字游戏 字符串处理
    湖南工业大学第一届ACM竞赛 我素故我在 DFS
    HDU3293sort
    HDU2082 找单词 母函数
    HDU1018 Big Number 斯特林公式
    湖南工业大学第一届ACM竞赛 分糖果 位操作
    UVA 357 Let Me Count The Ways
    UVA 147 Dollars
    UVA 348 Optimal Array Multiplication Sequence
  • 原文地址:https://www.cnblogs.com/iFanLiwei/p/12839689.html
Copyright © 2011-2022 走看看