import kafka.serializer.StringDecoder
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.slf4j.LoggerFactory

object AppRealTime {

  def main(args: Array[String]): Unit = {
    if (args.length < 5) {
      println("please input args like: seconds checkpointdir kafkaBrokerList groupId topic")
      System.exit(1)
    }

    val logger = LoggerFactory.getLogger(AppRealTime.getClass)

    /**
     * Create the StreamingContext (which wraps the SparkContext).
     */
    def createStreamingContext: StreamingContext = {
      val conf = new SparkConf
      val ssc = new StreamingContext(conf, Seconds(args(0).trim.toInt))
      // Set the checkpoint directory so streaming state survives restarts
      ssc.checkpoint(args(1).trim)
      // Kafka connection parameters
      val kafkaParams = Map("metadata.broker.list" -> args(2).trim, "group.id" -> args(3).trim)
      // Topics to read from
      val topics = Set(args(4).trim)
      // Create a direct stream that reads from Kafka
      val data = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
      // Process each batch
      data.foreachRDD(rdd => {
        rdd.foreachPartition(p => {
          val conf = HBaseConfiguration.create()
          // Open an HBase connection per partition
          val hConnection = ConnectionFactory.createConnection(conf)
          // Get the tables
          val click = hConnection.getTable(TableName.valueOf(Constants.HISTORY_CLICK))
          val statistic = hConnection.getTable(TableName.valueOf(Constants.RESULT_STATISTIC))
          try {
            while (p.hasNext) {
              val (logType, logVal) = p.next()
              println(logType + " " + logVal)
              logType match {
                case "click" =>
                  val clickObj = new Click(logVal)
                  if (HBaseUtil.isExists(click, clickObj.getRowKey)) {
                    // Row already exists: only update the statistics
                    clickObj.doRepeat(statistic)
                  } else {
                    // New row: write the click record and update the statistics
                    clickObj.doNoRepeat(click, statistic)
                  }
                case _ =>
                  logger.info("msg:" + logVal)
              }
            }
          } catch {
            case ex: Exception => logger.error("error :", ex)
          } finally {
            click.close()
            statistic.close()
            hConnection.close()
          }
        })
      })
      ssc
    }

    // Recover from the checkpoint if it exists, otherwise build a fresh context
    val ssc = StreamingContext.getOrCreate(args(1).trim, createStreamingContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
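The snippet does not show how the job is launched. A hypothetical spark-submit invocation matching the five expected arguments (the jar name, master, and all argument values below are made up for illustration):

spark-submit \
  --class AppRealTime \
  --master yarn \
  app-realtime.jar \
  5 hdfs:///user/spark/checkpoint broker1:9092,broker2:9092 click-group click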
A rough experiment (with a single partition) showed that no data is lost with this approach: on restart, StreamingContext.getOrCreate rebuilds the context from the checkpoint, so the direct stream resumes from the saved Kafka offsets and replays any unprocessed records, while the row-key existence check keeps the replay from writing duplicates.
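HBaseUtil.isExists is referenced above but not shown. A minimal sketch, assuming it simply wraps Table.exists on a string row key (the actual signature in the project may differ):

import org.apache.hadoop.hbase.client.{Get, Table}
import org.apache.hadoop.hbase.util.Bytes

object HBaseUtil {
  // Hypothetical helper: true if a row with this key is already present in the table.
  def isExists(table: Table, rowKey: String): Boolean =
    table.exists(new Get(Bytes.toBytes(rowKey)))
}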