  • Flume to Kafka: collect data and store it in Redis (example: ip.txt)

    Flume tails an access log into the Kafka topic ip01; a Spark Streaming job aggregates the records by (province, city, access method) and writes both the counts and the consumed Kafka offsets to Redis, so a restarted job can resume where it left off.
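
    The parser below reads fields 6, 7, and 9 of each pipe-delimited record (province, city, and access method), so a line of ip.txt is assumed to look roughly like this (hypothetical values, only the field positions matter):

        f0|f1|f2|f3|f4|f5|Guangdong|Shenzhen|f8|4G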

    ip.scala

    package ip
    
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka010._
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object ip {
      // keep Spark's own logging quiet
      Logger.getLogger("org").setLevel(Level.WARN)
      def main(args: Array[String]): Unit = {
        // create the streaming context with a 3-second batch interval
        val conf = new SparkConf()
          .setAppName(this.getClass.getSimpleName)
          .setMaster("local[*]")
        val ssc = new StreamingContext(conf, Seconds(3))
    
        // topic to consume
        val topic = "ip01"
        val topics = Array(topic)
        // consumer group id (also used as the Redis hash key for offsets)
        val groupid = "IPoffsets"
        // Kafka connection parameters
        val params = Map(
          "bootstrap.servers" -> "hadoop01:9092,hadoop02:9092,hadoop03:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> groupid,
          // where to start when no stored offset exists for this group
          "auto.offset.reset" -> "earliest",
          // offsets are committed manually (to Redis), not auto-committed to Kafka
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )
        // create the Kafka direct stream, resuming from the offsets stored in Redis;
        // when none have been saved yet, the map is empty and auto.offset.reset applies
        val offsets = DataMyRedis.getOffset(groupid, topic)
        val stream: InputDStream[ConsumerRecord[String, String]] =
          KafkaUtils.createDirectStream(
            ssc,
            LocationStrategies.PreferConsistent,
            ConsumerStrategies.Subscribe[String, String](topics, params, offsets)
          )
    
        stream.foreachRDD(rdd => {
          // debug: print the raw records in this batch
          rdd.foreach(println(_))
          // grab the offset ranges of this batch before any transformation
          val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          // parse each record and count occurrences of (province, city, access method)
          val ip1: RDD[((String, String, String), Int)] = rdd.map(tp => {
            val splits = tp.value().split("[|]")
            val prive = splits(6)
            val city = splits(7)
            val fangshi = splits(9)
            ((prive, city, fangshi), 1)
          }).reduceByKey(_ + _)

          // store the aggregated data and the offsets together
          DataMyRedis.saveDataOffset(ip1, ranges, groupid)
    
    
        })
        ssc.start()
        ssc.awaitTermination()
    
      }
    }
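
    To build the job, the project needs Spark Streaming, the Kafka 0-10 direct-stream integration, and the Jedis client on the classpath. A minimal build.sbt sketch (the versions are assumptions; align them with your Spark and Kafka cluster):

    build.sbt

    scalaVersion := "2.11.12"

    // Spark Streaming core, Kafka 0-10 integration, and the Jedis Redis client
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-streaming"            % "2.4.8",
      "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.8",
      "redis.clients"    %  "jedis"                      % "2.9.0"
    )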

    DataMyRedis.scala

    package ip
    
    import java.util
    
    import day15.Jpoods
    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.kafka010.OffsetRange
    
    import scala.collection.mutable
    
    object DataMyRedis {
      // save the aggregated data and the batch offsets to Redis in one transaction
      def saveDataOffset(result: RDD[((String, String, String), Int)], ranges: Array[OffsetRange], groupid: String): Unit = {
        result.foreachPartition(partition => {
          // get a Jedis connection from the pool
          val jedis = Jpoods.getJedis()
          // open a Redis transaction so data and offsets are committed atomically
          val transaction = jedis.multi()
          try {
            partition.foreach(tp => {
              // accumulate the count under hash "IP1", field "province:city:method"
              transaction.hincrBy("IP1", tp._1._1 + ":" + tp._1._2 + ":" + tp._1._3, tp._2)
            })
            // record the offsets once per partition, after the data
            for (o <- ranges) {
              transaction.hset(groupid, o.topic + ":" + o.partition, o.untilOffset.toString)
            }
            transaction.exec()
          } catch {
            case e: Exception =>
              println("error writing to Redis, rolling back: " + e.getMessage)
              transaction.discard()
          } finally {
            // return the connection to the pool
            jedis.close()
          }
        })
      }
    
    
      // read the offsets stored in Redis for this consumer group
      def getOffset(groupid: String, topic: String): mutable.Map[TopicPartition, Long] = {
        val offset = mutable.Map[TopicPartition, Long]()
        // implicit Java-to-Scala collection conversions for hgetAll's result
        import scala.collection.JavaConversions._
        val jedis = Jpoods.getJedis()
        // each entry is "topic:partition" -> offset
        val map: util.Map[String, String] = jedis.hgetAll(groupid)
        for (o <- map.toList) {
          offset += new TopicPartition(o._1.split(":")(0), o._1.split(":")(1).toInt) -> o._2.toLong
        }
        jedis.close()
        offset
      }
    
    
    
    
    }
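
    With this layout, Redis holds two hashes: IP1 maps "province:city:method" to a running count, and the group id hash (IPoffsets) maps "topic:partition" to the last committed offset. A quick way to eyeball both after a few batches; this helper is hypothetical and not part of the original pipeline:

    CheckRedis.scala

    package ip

    import day15.Jpoods

    import scala.collection.JavaConversions._

    // dump the aggregated counts and the stored offsets for a manual check
    object CheckRedis {
      def main(args: Array[String]): Unit = {
        val jedis = Jpoods.getJedis()
        println("--- aggregated counts (hash IP1) ---")
        jedis.hgetAll("IP1").foreach(kv => println(kv._1 + " -> " + kv._2))
        println("--- stored offsets (hash IPoffsets) ---")
        jedis.hgetAll("IPoffsets").foreach(kv => println(kv._1 + " -> " + kv._2))
        jedis.close()
      }
    }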

    Jpoods.scala

    package day15
    
    import org.apache.commons.pool2.impl.GenericObjectPoolConfig
    import redis.clients.jedis.{Jedis, JedisPool}
    
    object Jpoods {
      // pool configuration
      private val conf = new GenericObjectPoolConfig()
      conf.setMaxIdle(2000)
      conf.setMaxTotal(5000)

      // create the connection pool; the config must be passed in or it is ignored
      private val jpoods = new JedisPool(conf, "192.168.186.150", 6379)

      // hand out a connection, switched to Redis database 6
      def getJedis(): Jedis = {
        val jedis = jpoods.getResource()
        jedis.select(6)
        jedis
      }
    
    
    
    
    }
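
    Callers own the connection they receive from the pool and must close it so it is returned rather than leaked; a minimal usage sketch:

        val jedis = day15.Jpoods.getJedis()
        try {
          println(jedis.ping()) // "PONG" if Redis at 192.168.186.150:6379 is reachable
        } finally {
          jedis.close() // returns the connection to the pool
        }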

    Flume agent configuration: flum-kafka.conf

    a1.sources = r1
    a1.channels = c1
    
    # define the source: tail the access log
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.positionFile = /usr/local/apache-flume-1.8.0-bin/taildir_position.json
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /root/myde/logs/access.log
    
    
    # define the channel: a Kafka channel writes events straight into the ip01 topic (no sink needed)
    a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.c1.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092
    a1.channels.c1.kafka.topic = ip01
    a1.channels.c1.parseAsFlumeEvent = false
    
    # wire the source and the channel together
    a1.sources.r1.channels = c1
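
    Once the agent is running, every new line appended to /root/myde/logs/access.log flows through the Kafka channel into the ip01 topic. To test the streaming job without Flume, records can also be pushed into the topic directly; the helper below is hypothetical, and the record layout is the assumption described at the top:

    TestProducer.scala

    package ip

    import java.util.Properties

    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    // push a few hand-made records into ip01 so the job can be tested end to end
    object TestProducer {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092")
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[String, String](props)
        // hypothetical pipe-delimited record; fields 6, 7 and 9 are what the job reads
        producer.send(new ProducerRecord[String, String]("ip01", "f0|f1|f2|f3|f4|f5|Guangdong|Shenzhen|f8|4G"))
        producer.close()
      }
    }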