zoukankan      html  css  js  c++  java
  • StreamingContext.getOrCreate

    /**
     
      */
    object AppRealTime {
    
      def main(args: Array[String]): Unit = {
        if (args.length < 5) {
          println("please input args like: seconds checkpointdir kafkaBrokerList groupId topic")
          System.exit(1)
        }
        val logger = LoggerFactory.getLogger(AppRealTime.getClass)
    
        /**
          * 创建StreamContext
          *
          * @return
          */
        def createStreamingContext: StreamingContext = {
          val conf = new SparkConf
          //StreamingContext,里面包含SparkContext
          val ssc = new StreamingContext(conf, Seconds(args(0).trim.toInt))
          //设置checkpoint,保存运行数据
          ssc.checkpoint(args(1).trim)
    
          //kafka连接参数
          val kafkaParams = Map("metadata.broker.list" -> args(2).trim, "group.id" -> args(3).trim)
          //指定要读取的topics
          val topics = Set(args(4).trim)
    
    
          //创建directStream从kafka读取数据
          val data = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    
          //迭代处理数据
          data.foreachRDD(rdd => {
            rdd.foreachPartition(p => {
              val conf = HBaseConfiguration.create()
              //连接Connection
              val hConnection = ConnectionFactory.createConnection(conf)
              //获取table
              val click = hConnection.getTable(TableName.valueOf(Constants.HISTORY_CLICK))
              val statistic = hConnection.getTable(TableName.valueOf(Constants.RESULT_STATISTIC))
    
              try {
                while (p.hasNext) {
                  val tuple = p.next()
                  val logType = tuple._1
                  val logVal = tuple._2
                  println(logType+"	"+logVal)
                  logType match {
                    case "click" => {
                      val clickObj = new Click(logVal)
                      if (HBaseUtil.isExists(click, clickObj.getRowKey)) {
                        clickObj.doRepeat(statistic)
                      } else {
                        clickObj.doNoRepeat(click, statistic)
                      }
                    }
                    case _ => {
                      logger.info("msg:" + logVal)
                    }
    
                  }
    
                }
              } catch {
                case ex: Exception => {
                  logger.error("error :", ex)
                }
              } finally {
                click.close()
                statistic.close()
                hConnection.close()
              }
            })
          })
          ssc
        }
    
        val ssc = StreamingContext.getOrCreate(args(1).trim, createStreamingContext _)
    
        ssc.start()
        ssc.awaitTermination()
    
      }
    
    }

    经过粗略的实验(一个分区)发现,使用了这个方法之后可以实现不丢失数据

  • 相关阅读:
    ssi服务器端指令
    json格式的转换为json字符串函数
    接口测试基础和jmeter
    【JZOJ6274】梦境
    【JZOJ6275】小L的数列
    【luoguP4721】分治 FFT
    【luoguP3868】猜数字
    中国剩余定理与扩展中国剩余定理
    【JZOJ6277】矩阵游戏
    【JZOJ6271】锻造 (forging)
  • 原文地址:https://www.cnblogs.com/rocky-AGE-24/p/7463698.html
Copyright © 2011-2022 走看看