zoukankan      html  css  js  c++  java
  • 实时数仓项目-01实时模块搭建

    学习资料收集于尚硅谷

    一、实时模块搭建

    1、创建模块gmall-realtime

        <properties>
            <spark.version>2.4.0</spark.version>
            <scala.version>2.11.8</scala.version>
            <kafka.version>1.0.0</kafka.version>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
        </properties>
    
       <dependencies>
           <dependency>
               <groupId>com.alibaba</groupId>
               <artifactId>fastjson</artifactId>
               <version>1.2.56</version>
           </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>${kafka.version}</version>
    
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
    
           <dependency>
               <groupId>redis.clients</groupId>
               <artifactId>jedis</artifactId>
               <version>2.9.0</version>
           </dependency>
    
    
           <dependency>
               <groupId>org.apache.phoenix</groupId>
               <artifactId>phoenix-spark</artifactId>
               <version>4.14.2-HBase-1.3</version>
           </dependency>
    
           <dependency>
               <groupId>org.apache.spark</groupId>
               <artifactId>spark-sql_2.11</artifactId>
               <version>${spark.version}</version>
           </dependency>
    
           <dependency>
               <groupId>io.searchbox</groupId>
               <artifactId>jest</artifactId>
               <version>5.3.3</version>
               <exclusions>
                   <exclusion>
                       <groupId>org.slf4j</groupId>
                       <artifactId>slf4j-api</artifactId>
                   </exclusion>
               </exclusions>
           </dependency>
    
           <dependency>
               <groupId>net.java.dev.jna</groupId>
               <artifactId>jna</artifactId>
               <version>4.5.2</version>
           </dependency>
    
           <dependency>
               <groupId>org.codehaus.janino</groupId>
               <artifactId>commons-compiler</artifactId>
               <version>2.7.8</version>
           </dependency>
    
       </dependencies>
    
        <build>
            <plugins>
                <!-- 该插件用于将Scala代码编译成class文件 -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.4.6</version>
                    <executions>
                        <execution>
                            <!-- 声明绑定到maven的compile阶段 -->
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
    
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <version>3.0.0</version>
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    
    

    2、创建配置文件config.properties

    # Kafka配置
    kafka.broker.list=cdh01:9092,cdh02:9092,cdh03:9092
    # Redis配置
    redis.host=cdh01
    redis.port=6379
    

    3、PropertiesUtil

    package com.learning.realtime.util
    
    import java.io.InputStreamReader
    import java.util.Properties
    
    object PropertiesUtil {

      /** Manual smoke test: prints the Kafka broker list configured in config.properties. */
      def main(args: Array[String]): Unit = {
        val properties: Properties = PropertiesUtil.load("config.properties")

        println(properties.getProperty("kafka.broker.list"))
      }

      /**
        * Loads a UTF-8 encoded properties file from the context class loader's classpath.
        *
        * @param propertieName classpath resource name, e.g. "config.properties"
        * @return the loaded properties
        * @throws IllegalArgumentException if the resource cannot be found on the classpath
        */
      def load(propertieName: String): Properties = {
        val stream = Thread.currentThread().getContextClassLoader.getResourceAsStream(propertieName)
        // getResourceAsStream returns null for a missing resource; fail with a clear message
        // instead of a NullPointerException from the InputStreamReader constructor.
        if (stream == null) {
          throw new IllegalArgumentException(s"Resource not found on classpath: $propertieName")
        }
        val reader = new InputStreamReader(stream, "UTF-8")
        try {
          val prop = new Properties()
          prop.load(reader)
          prop
        } finally {
          // Properties.load does not close its input.
          reader.close()
        }
      }

    }
    

    4、KafkaUtil

    package com.learning.realtime.util
    
    import java.util.Properties
    
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
    
    object KafkaUtil {
      private val properties: Properties = PropertiesUtil.load("config.properties")
      // Comma-separated broker list read from config.properties (kafka.broker.list).
      val broker_list = properties.getProperty("kafka.broker.list")

      // Default Kafka consumer configuration shared by every getKafkaStream overload.
      // This object only ever copies it and never modifies it. A group id passed to one call
      // therefore cannot leak into a later call that relies on the default group.
      var kafkaParam = collection.mutable.Map(
        "bootstrap.servers" -> broker_list, // brokers used to bootstrap the connection to the cluster
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        // consumer group this consumer belongs to
        "group.id" -> "gmall_consumer_group",
        // where to start when there is no committed offset, or the committed offset no longer
        // exists on the broker: "latest" resets to the newest offset
        "auto.offset.reset" -> "latest",
        // true: offsets are committed automatically in the background (data can be lost on failure);
        // false: offsets have to be managed manually
        "enable.auto.commit" -> (true: java.lang.Boolean)
      )

      // Immutable copy of kafkaParam with group.id replaced; the shared map is left untouched.
      private def paramsFor(groupId: String): collection.Map[String, Object] =
        kafkaParam.toMap.updated("group.id", groupId)

      // All streams use LocationStrategies.PreferConsistent (partitions spread across executors)
      // and ConsumerStrategies.Subscribe (subscribe to a list of topics).

      /** Direct stream on `topic` using the default consumer group from kafkaParam. */
      def getKafkaStream(topic: String, ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
        KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParam))
      }

      /** Direct stream on `topic` consumed as consumer group `groupId`. */
      def getKafkaStream(topic: String, ssc: StreamingContext, groupId: String): InputDStream[ConsumerRecord[String, String]] = {
        KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](Array(topic), paramsFor(groupId)))
      }

      /** Direct stream on `topic` as group `groupId`, starting from the given per-partition offsets. */
      def getKafkaStream(topic: String, ssc: StreamingContext, offsets: Map[TopicPartition, Long], groupId: String): InputDStream[ConsumerRecord[String, String]] = {
        KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](Array(topic), paramsFor(groupId), offsets))
      }
    }
    
    
    

    5、RedisUtil

    package com.learning.realtime.util
    
    import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
    
    object RedisUtil {

      // Process-wide connection pool, created on first use. It stays a public var for source
      // compatibility; this object reads and initialises it only through `pool`, which
      // synchronises on this object.
      var jedisPool: JedisPool = null

      /**
        * Borrows a connection from the shared pool, creating the pool on the first call.
        * Callers in this project close() the returned client after use.
        */
      def getJedisClient: Jedis = pool.getResource

      // Spark runs several tasks as threads in one JVM (e.g. local[*]), so two tasks can reach the
      // null check together; synchronising guarantees only one pool is ever created.
      private def pool: JedisPool = synchronized {
        if (jedisPool == null) {
          jedisPool = createPool()
        }
        jedisPool
      }

      // Builds the pool from redis.host / redis.port in config.properties.
      private def createPool(): JedisPool = {
        val config = PropertiesUtil.load("config.properties")
        val host = config.getProperty("redis.host")
        val port = config.getProperty("redis.port")
        require(host != null && port != null, "redis.host and redis.port must be set in config.properties")

        val jedisPoolConfig = new JedisPoolConfig()
        jedisPoolConfig.setMaxTotal(100)             // max connections
        jedisPoolConfig.setMaxIdle(20)               // max idle connections
        jedisPoolConfig.setMinIdle(20)               // min idle connections
        jedisPoolConfig.setBlockWhenExhausted(true)  // wait when the pool is exhausted
        jedisPoolConfig.setMaxWaitMillis(500)        // how long to wait when exhausted, in ms
        jedisPoolConfig.setTestOnBorrow(true)        // validate each connection when it is borrowed

        new JedisPool(jedisPoolConfig, host, port.trim.toInt)
      }
    }
    
    

    6、log4j.properties

    log4j.appender.atguigu.MyConsole=org.apache.log4j.ConsoleAppender
    log4j.appender.atguigu.MyConsole.target=System.out
    log4j.appender.atguigu.MyConsole.layout=org.apache.log4j.PatternLayout    
    log4j.appender.atguigu.MyConsole.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %10p (%c:%M) - %m%n
    
    log4j.rootLogger =error,atguigu.MyConsole
    
    

    7、业务类DauApp

    package com.learning.realtime.app
    
    import java.text.SimpleDateFormat
    import java.util.Date
    
    import com.alibaba.fastjson.{JSON, JSONObject}
    import com.learning.realtime.util.KafkaUtil
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object DauApp {

      /**
        * First-step smoke test of the realtime module.
        * Consumes startup logs from Kafka and tags each JSON record with its date (dt) and
        * hour (hr), derived from the record's "ts" field, then prints the records.
        */
      def main(args: Array[String]): Unit = {
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("dau_app")
        // 5-second micro-batches
        val ssc = new StreamingContext(sparkConf, Seconds(5))
        val groupId = "GMALL_DAU_CONSUMER"
        val topic = "GMALL_STARTUP"
        // Consume as the declared DAU consumer group rather than KafkaUtil's default group.
        val startupInputDstream: InputDStream[ConsumerRecord[String, String]] =
          KafkaUtil.getKafkaStream(topic, ssc, groupId)

        val startLogInfoDStream: DStream[JSONObject] = startupInputDstream.map { record =>
          val startupJSONObj: JSONObject = JSON.parseObject(record.value())
          // assumes "ts" is epoch milliseconds (it is passed to java.util.Date) — confirm with the producer
          val ts: Long = startupJSONObj.getLong("ts")
          // The single space in "yyyy-MM-dd HH" is the split point, so hr is a bare two-digit hour
          // (a ":HH" pattern would leave a leading ':' in hr, e.g. ":08").
          val dateHour: Array[String] = new SimpleDateFormat("yyyy-MM-dd HH").format(new Date(ts)).split(" ")
          startupJSONObj.put("dt", dateHour(0))
          startupJSONObj.put("hr", dateHour(1))
          startupJSONObj
        }
        // print up to 100 records per batch
        startLogInfoDStream.print(100)

        ssc.start()
        ssc.awaitTermination()

      }

    }
    
    

    8、启动

    cdh01启动

    /opt/module/nginx/sbin/nginx
    

    cdh01、cdh02、cdh03

    java -jar /opt/gmall/gmall-logger-0.0.1-SNAPSHOT.jar  >/dev/null 2>&1  &
    

    启动日志生成器 在cdh01

    java -jar /opt/applog/gmall2020-mock-log-2020-05-10.jar
    

    在idea中启动DauApp

    如果idea控制台能够打印出数据,初步搭建成功。

    二、Redis去重

    redis安装参考Redis归一化文档

    在cdh01启动redis

     /usr/local/redis/bin/redis-server /usr/local/redis/bin/redis.conf
    
        // Keep only the first startup log of each device (mid) per day, using a Redis set per day
        // ("dau:<dt>") as the record of which mids were already seen.
        val filterDstream = startLogInfoDStream.mapPartitions { itr =>
          val jedisClient = RedisUtil.getJedisClient
          val filterList: List[JSONObject] =
            try {
              // Materialise the partition so its size can be logged.
              val listJsonObjs: List[JSONObject] = itr.toList
              println("过滤前:" + listJsonObjs.size)
              listJsonObjs.filter { jsonObj =>
                val daukey = "dau:" + jsonObj.getString("dt")
                val mid = jsonObj.getJSONObject("common").getString("mid")
                // SADD returns 1 when mid was newly added (first visit today), 0 when already present.
                jedisClient.sadd(daukey, mid) == 1L
              }
            } finally {
              // Give the connection back even if parsing or a Redis call fails.
              jedisClient.close()
            }
          println("过滤后:" + filterList.size)
          filterList.iterator
        }
        filterDstream.print(1)
    

    三、数据保存到ES

    3.1创建索引模板

    PUT   _template/gmall_dau_info_template
    {
      "index_patterns": ["gmall_dau_info*"],                  
      "settings": {                                               
        "number_of_shards": 3
      },
      "aliases" : { 
        "{index}-query": {},
        "gmall_dau_info-query":{}
      },
       "mappings": {
         "_doc":{  
           "properties":{
             "mid":{
               "type":"keyword"
             },
             "uid":{
               "type":"keyword"
             },
             "ar":{
               "type":"keyword"
             },
             "ch":{
               "type":"keyword"
             },
             "vc":{
               "type":"keyword"
             },
              "dt":{
               "type":"keyword"
             },
              "hr":{
               "type":"keyword"
             },
              "mi":{
              "type":"keyword"
             },
    
             "ts":{
               "type":"date"
             }  
      
           }
         }
       }
    }
    
    

    3.2创建case class

    /**
      * One document written to the gmall_dau_info_* Elasticsearch indices.
      * Field names match the properties of the gmall_dau_info_template mapping.
      *
      * @param mid device id
      * @param uid user id
      * @param ar  area code
      * @param ch  channel
      * @param vc  app version
      * @param dt  event date, yyyy-MM-dd
      * @param hr  event hour, HH
      * @param mi  event minute (the streaming job currently always writes "00")
      * @param ts  event timestamp as read from the log's "ts" field
      */
    case class DauInfo(mid: String,
                       uid: String,
                       ar: String,
                       ch: String,
                       vc: String,
                       var dt: String,
                       var hr: String,
                       var mi: String,
                       ts: Long)
    
    

    3.3 添加依赖

    <dependency>
        <groupId>io.searchbox</groupId>
        <artifactId>jest</artifactId>
        <version>5.3.3</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    
    <dependency>
        <groupId>net.java.dev.jna</groupId>
        <artifactId>jna</artifactId>
        <version>4.5.2</version>
    </dependency>
    
    <dependency>
        <groupId>org.codehaus.janino</groupId>
        <artifactId>commons-compiler</artifactId>
        <version>2.7.8</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch -->
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>2.4.6</version>
    </dependency>
    
    

    3.4 添加批量保存的方法

      /**
        * Indexes every element of `sourceList` into `indexName` (type "_doc") with one bulk request.
        * A null or empty list is a no-op.
        *
        * @param sourceList documents to index; each is serialised by Jest
        * @param indexName  target index name
        */
      def bulkInsert(sourceList: List[Any], indexName: String): Unit = {
        if (sourceList != null && sourceList.nonEmpty) {
          val jest: JestClient = getClient
          try {
            val builder: Bulk.Builder = new Bulk.Builder
            for (source <- sourceList) {
              val index: Index = new Index.Builder(source).index(indexName).`type`("_doc").build()
              builder.addAction(index)
            }
            val result: BulkResult = jest.execute(builder.build())
            val items: util.List[BulkResult#BulkResultItem] = result.getItems
            println("保存到ES中的条数:" + items.size())
            // The item list also contains failed items; report them instead of silently
            // counting them as saved.
            if (!result.isSucceeded) {
              println("ES bulk insert reported " + result.getFailedItems.size() +
                " failed item(s): " + result.getErrorMessage)
            }
          } finally {
            // Release the client even when execute() throws.
            jest.close()
          }
        }
      }
    

    3.5主流程代码

        // Save each partition's deduplicated startup logs to Elasticsearch.
        filterDstream.foreachRDD { rdd =>
          rdd.foreachPartition { itr =>
            val dauinfoList: List[DauInfo] = itr.toList.map { jsonObj =>
              val commonJSONObj: JSONObject = jsonObj.getJSONObject("common")
              DauInfo(
                commonJSONObj.getString("mid"),
                commonJSONObj.getString("uid"),
                commonJSONObj.getString("ar"),
                commonJSONObj.getString("ch"),
                commonJSONObj.getString("vc"),
                jsonObj.getString("dt"),
                jsonObj.getString("hr"),
                "00",
                jsonObj.getLong("ts")
              )
            }

            // Route each record to the index of its own event date (dt), the same date used in the
            // Redis dedup key. Using the wall-clock date would put records that straddle midnight
            // into the next day's index.
            dauinfoList.groupBy(_.dt).foreach { case (dt, dayInfos) =>
              EsUtils.bulkInsert(dayInfos, "gmall_dau_info_" + dt)
            }
          }
        }
    

    3.6清空redis中的数据

    ./redis-cli
    
    flushall
    

    3.7查询数据

    GET gmall_dau_info-query/_search
    

    四、精确一次消费

    4.1 定义

    精确一次消费(Exactly-once) 是指消息一定会被处理且只会被处理一次。不多不少就一次处理。

    如果达不到精确一次消费,可能会达到另外两种情况:

    至少一次消费(at least once),主要是保证数据不会丢失,但有可能存在数据重复问题。

    最多一次消费(at most once),主要是保证数据不会重复,但有可能存在数据丢失问题。

    如果同时解决了数据丢失和数据重复的问题,那么就实现了精确一次消费的语义了。

    4.2 问题如何产生

    数据何时会丢失: 比如实时计算任务进行计算,到数据结果存盘之前,进程崩溃,假设在进程崩溃前kafka调整了偏移量,那么kafka就会认为数据已经被处理过,即使进程重启,kafka也会从新的偏移量开始,所以之前没有保存的数据就被丢失掉了。

    数据何时会重复: 如果数据计算结果已经存盘了,在kafka调整偏移量之前,进程崩溃,那么kafka会认为数据没有被消费,进程重启,会重新从旧的偏移量开始,那么数据就会被2次消费,又会被存盘,数据就被存了2遍,造成数据重复。

    4.3 如何解决

    策略一:利用关系型数据库的事务进行处理。

    出现丢失或者重复的问题,核心就是偏移量的提交与数据的保存,不是原子性的。如果能做成要么数据保存和偏移量都成功,要么两个失败。那么就不会出现丢失或者重复了。

    这样的话可以把保存数据和保存偏移量放到同一个事务里:两步都成功才提交;如果后一步失败,就回滚前一步,从而达成原子性。

    问题与限制

    但是这种方式有限制就是数据必须都要放在某一个关系型数据库中,无法使用其他功能强大的nosql数据库。如果保存的数据量较大一个数据库节点不够,多个节点的话,还要考虑分布式事务的问题。

    策略二:手动提交偏移量+幂等性处理

    咱们知道如果能够同时解决数据丢失和数据重复问题,就等于做到了精确一次消费。

    那咱们就各个击破。

    首先解决数据丢失问题,办法就是要等数据保存成功后再提交偏移量,所以就必须手工来控制偏移量的提交时机。

    但是如果数据保存了,没等偏移量提交进程挂了,数据会被重复消费。怎么办?那就要把数据的保存做成幂等性保存。即同一批数据反复保存多次,数据不会翻倍,保存一次和保存一百次的效果是一样的。如果能做到这个,就达到了幂等性保存,就不用担心数据会重复了。

    难点

    话虽如此,在实际开发中手动提交偏移量其实不难,难的是幂等性保存,有时并不一定能保证。所以有时只能优先保证数据不丢失,而数据重复难以避免,即只保证了至少一次消费的语义。

    五、偏移量管理

    Kafka 0.9 版本以后,consumer 的偏移量本身是保存在 Kafka 的 __consumer_offsets 主题中的。但是如果用这种方式管理偏移量,有一个限制:提交偏移量时数据流的元素结构不能发生转变,即提交偏移量的数据流必须是 InputDStream[ConsumerRecord[String, String]] 这种结构。而在实际计算中,数据难免发生转变(或聚合,或关联),一旦发生转变,就无法再利用以下语句提交偏移量:

    xxDstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)

    所以实际生产中通常会利用zookeeper,redis,mysql等工具对偏移量进行保存。

    5.1OffsetManager

    package com.learning.realtime.util
    
    import java.util
    
    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.TaskContext
    import org.apache.spark.streaming.kafka010.OffsetRange
    import redis.clients.jedis.Jedis
    
    import scala.collection.JavaConversions._
    
    object OffsetManager {

      // Redis hash holding partition -> next offset to consume, per consumer group and topic.
      private def offsetKey(groupId: String, topic: String): String =
        "offset:" + groupId + ":" + topic

      /**
        * Reads the offsets previously written by saveOffset for this group and topic.
        *
        * @param groupId consumer group id
        * @param topic   Kafka topic
        * @return partition -> offset to resume from, or null when nothing has been saved yet
        *         (callers test for null)
        */
      def getOffset(groupId: String, topic: String): Map[TopicPartition, Long] = {
        import scala.collection.JavaConverters._

        val jedisClient: Jedis = RedisUtil.getJedisClient
        val redisOffsetMap: util.Map[String, String] =
          try jedisClient.hgetAll(offsetKey(groupId, topic)) finally jedisClient.close()

        if (redisOffsetMap == null || redisOffsetMap.isEmpty) {
          null
        } else {
          redisOffsetMap.asScala.map { case (partition, offset) =>
            println("加载分区偏移量:" + partition + "\t start:->" + offset)
            (new TopicPartition(topic, partition.toInt), offset.toLong)
          }.toMap
        }
      }

      /**
        * Writes the end offset (untilOffset) of every range to Redis. These are the offsets
        * getOffset returns on the next start.
        *
        * @param groupId     consumer group id
        * @param topic       Kafka topic
        * @param offsetArray offset ranges of the batch that has just been processed
        */
      def saveOffset(groupId: String, topic: String, offsetArray: Array[OffsetRange]): Unit = {
        // HMSET with no fields is rejected by Redis, so an empty batch has nothing to write.
        if (offsetArray != null && offsetArray.nonEmpty) {
          val offsetMap = new util.HashMap[String, String]()
          for (offset <- offsetArray) {
            offsetMap.put(offset.partition.toString, offset.untilOffset.toString)
            // log the range being committed
            println("写入分区:" + offset.partition + "\t start:->" + offset.fromOffset + "\t end: " + offset.untilOffset)
          }
          val jedisClient: Jedis = RedisUtil.getJedisClient
          try jedisClient.hmset(offsetKey(groupId, topic), offsetMap) finally jedisClient.close()
        }
      }

    }
    
    

    5.2DauApp

    完整代码:

    package com.learning.realtime.app
    
    import java.text.SimpleDateFormat
    import java.util.Date
    
    import com.alibaba.fastjson.{JSON, JSONObject}
    import com.learning.realtime.DauInfo
    import com.learning.realtime.util.{EsUtils, KafkaUtil, OffsetManager, RedisUtil}
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.{SparkConf, TaskContext}
    import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    import scala.collection.mutable.ListBuffer
    
    object DauApp {

      /**
        * Daily-active-user job.
        * 1. Reads startup logs from Kafka, resuming from offsets saved in Redis.
        * 2. Tags each record with date/hour.
        * 3. Keeps the first log per device per day (Redis set dedup).
        * 4. Writes the survivors to Elasticsearch.
        * 5. Saves the batch's offsets back to Redis.
        */
      def main(args: Array[String]): Unit = {
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("dau_app")
        val ssc = new StreamingContext(sparkConf, Seconds(5))
        val groupId = "GMALL_DAU_CONSUMER"
        val topic = "GMALL_STARTUP"

        // Resume from the offsets stored in Redis when there are any; getOffset returns null when
        // nothing has been saved yet. Both branches consume as the same group.
        val startupOffsets: Map[TopicPartition, Long] = OffsetManager.getOffset(groupId, topic)
        val startupInputDstream: InputDStream[ConsumerRecord[String, String]] =
          if (startupOffsets != null && startupOffsets.nonEmpty) {
            KafkaUtil.getKafkaStream(topic, ssc, startupOffsets, groupId)
          } else {
            KafkaUtil.getKafkaStream(topic, ssc, groupId)
          }

        // Capture this batch's offset ranges. The transform function runs on the driver once per
        // batch, so assigning this driver-side var here is visible to foreachRDD below.
        var startupOffsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]
        val startupInputGetOffsetDstream: DStream[ConsumerRecord[String, String]] =
          startupInputDstream.transform { rdd =>
            startupOffsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
            rdd
          }

        val startLogInfoDStream: DStream[JSONObject] = startupInputGetOffsetDstream.map { record =>
          val startupJSONObj: JSONObject = JSON.parseObject(record.value())
          // assumes "ts" is epoch milliseconds (it is passed to java.util.Date) — confirm with the producer
          val ts: Long = startupJSONObj.getLong("ts")
          // The single space in "yyyy-MM-dd HH" is the split point, so hr is a bare two-digit hour.
          val dateHour: Array[String] = new SimpleDateFormat("yyyy-MM-dd HH").format(new Date(ts)).split(" ")
          startupJSONObj.put("dt", dateHour(0))
          startupJSONObj.put("hr", dateHour(1))
          startupJSONObj
        }

        // Keep only the first startup log of each mid per day ("dau:<dt>" Redis set).
        val filterDstream: DStream[JSONObject] = startLogInfoDStream.mapPartitions { itr =>
          val jedisClient = RedisUtil.getJedisClient
          val filterList: List[JSONObject] =
            try {
              val listJsonObjs: List[JSONObject] = itr.toList
              println("过滤前:" + listJsonObjs.size)
              listJsonObjs.filter { jsonObj =>
                val daukey = "dau:" + jsonObj.getString("dt")
                val mid = jsonObj.getJSONObject("common").getString("mid")
                // SADD returns 1 when mid was newly added, 0 when it was already present.
                jedisClient.sadd(daukey, mid) == 1L
              }
            } finally {
              jedisClient.close()
            }
          println("过滤后:" + filterList.size)
          filterList.iterator
        }

        filterDstream.foreachRDD { rdd =>
          rdd.foreachPartition { itr =>
            val dauinfoList: List[DauInfo] = itr.toList.map { jsonObj =>
              val commonJSONObj: JSONObject = jsonObj.getJSONObject("common")
              DauInfo(
                commonJSONObj.getString("mid"),
                commonJSONObj.getString("uid"),
                commonJSONObj.getString("ar"),
                commonJSONObj.getString("ch"),
                commonJSONObj.getString("vc"),
                jsonObj.getString("dt"),
                jsonObj.getString("hr"),
                "00",
                jsonObj.getLong("ts")
              )
            }
            // Index by the record's own date (the dedup key's date), not the wall-clock date.
            dauinfoList.groupBy(_.dt).foreach { case (dt, dayInfos) =>
              EsUtils.bulkInsert(dayInfos, "gmall_dau_info_" + dt)
            }
          }
          // Runs on the driver after every partition has been written: save offsets last.
          OffsetManager.saveOffset(groupId, topic, startupOffsetRanges)
        }

        ssc.start()
        ssc.awaitTermination()

      }

    }
    
    

    六、kibana搭建数据可视化

    6.1创建index patterns


    上图圈错了,是Index Patterns


    6.2 配置单图


    横坐标是渠道:

    6.3 配置仪表盘

    七、发布查询接口

    7.1创建springboot模块

    gmall-publisher

    7.2 导入依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.15.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.learning</groupId>
        <artifactId>gmall-publisher</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>gmall-publisher</name>
        <description>Demo project for Spring Boot</description>
    
        <properties>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>19.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.8.1</version>
            </dependency>
    
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.56</version>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    
            <dependency>
                <groupId>io.searchbox</groupId>
                <artifactId>jest</artifactId>
                <version>5.3.3</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-api</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <dependency>
                <groupId>net.java.dev.jna</groupId>
                <artifactId>jna</artifactId>
                <version>4.5.2</version>
            </dependency>
    
            <dependency>
                <groupId>org.codehaus.janino</groupId>
                <artifactId>commons-compiler</artifactId>
                <version>2.7.8</version>
            </dependency>
    
            <dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
                <version>2.4.6</version>
            </dependency>
    
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    
    

    7.3 修改配置文件

    #配置成master所在的节点
    spring.elasticsearch.jest.uris=http://cdh01:9200
    server.port=8070
    

    7.4 service层代码

    package com.learning.gmallpublisher.service;
    
    import java.util.Map;
    
    /**
     * Read-side queries that back the realtime dashboard endpoints.
     */
    public interface PubliserService {

        /**
         * Total daily-active-user count for one day.
         *
         * @param date day to query, formatted yyyy-MM-dd
         * @return number of active users on that day
         */
        Long getDauTotal(String date);

        /**
         * Daily-active-user counts broken down by hour for one day.
         *
         * @param date day to query, formatted yyyy-MM-dd
         * @return map of hour to count
         */
        Map getDauHour(String date);

    }
    
    
    package com.learning.gmallpublisher.service;
    
    import io.searchbox.client.JestClient;
    import io.searchbox.core.Search;
    import io.searchbox.core.SearchResult;
    import io.searchbox.core.search.aggregation.TermsAggregation;
    import org.elasticsearch.index.query.MatchAllQueryBuilder;
    import org.elasticsearch.search.aggregations.AggregationBuilders;
    import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder;
    import org.elasticsearch.search.builder.SearchSourceBuilder;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    @Service
    public class PubliserServiceImpl implements PubliserService{
        @Autowired
        private JestClient jestClient;

        /** Per-day query alias; the index template registers "{index}-query" for every gmall_dau_info* index. */
        private static String queryIndexName(String date) {
            return "gmall_dau_info_" + date + "-query";
        }

        /**
         * Counts all documents in the day's DAU index.
         *
         * @param date day, formatted yyyy-MM-dd
         * @return hit total, or 0 when ES reports no total
         * @throws RuntimeException when the ES request fails with an IOException
         */
        @Override
        public Long getDauTotal(String date) {
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.query(new MatchAllQueryBuilder());

            Search search = new Search.Builder(searchSourceBuilder.toString())
                    .addIndex(queryIndexName(date)).addType("_doc").build();
            try {
                SearchResult searchResult = jestClient.execute(search);
                Long total = searchResult.getTotal();
                return total != null ? total : 0L;
            } catch (IOException e) {
                e.printStackTrace();
                // keep the cause so the original failure is not lost
                throw new RuntimeException("查询ES异常", e);
            }
        }

        /**
         * Counts the day's DAU documents per value of the "hr" field.
         *
         * @param date day, formatted yyyy-MM-dd
         * @return hour -> count; empty when the query fails or returns no buckets
         */
        @Override
        public Map getDauHour(String date) {
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            // at most 24 hour buckets per day
            TermsBuilder termsBuilder = AggregationBuilders.terms("groupby_hr").field("hr").size(24);
            searchSourceBuilder.aggregation(termsBuilder);

            Search search = new Search.Builder(searchSourceBuilder.toString())
                    .addIndex(queryIndexName(date)).addType("_doc").build();
            Map aggsMap = new HashMap();
            try {
                SearchResult searchResult = jestClient.execute(search);
                // An error response (e.g. no index yet for that day, typical for "yesterday") carries
                // no aggregations; report it instead of trying to read them.
                if (!searchResult.isSucceeded()) {
                    System.out.println("ES hourly DAU query failed: " + searchResult.getErrorMessage());
                    return aggsMap;
                }
                TermsAggregation groupbyHr = searchResult.getAggregations().getTermsAggregation("groupby_hr");
                if (groupbyHr != null) {
                    for (TermsAggregation.Entry bucket : groupbyHr.getBuckets()) {
                        aggsMap.put(bucket.getKey(), bucket.getCount());
                    }
                }
            } catch (IOException e) {
                // Best effort: the dashboard still renders with an empty series.
                e.printStackTrace();
            }

            return aggsMap;
        }

    }
    
    

    7.5 controller层代码

    package com.learning.gmallpublisher.controller;
    
    import com.alibaba.fastjson.JSON;
    import com.learning.gmallpublisher.service.PubliserService;
    import org.apache.commons.lang3.time.DateUtils;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.*;
    
    @RestController
    public class PublisherController {
        @Autowired
        private PubliserService esService;

        /**
         * Summary cards for one day.
         *
         * @param dt day, formatted yyyy-MM-dd
         * @return JSON array of {id, name, value} objects
         */
        @GetMapping("realtime-total")
        public String realtimeTotal(@RequestParam("date") String dt) {
            List<Map> totalList = new ArrayList<>();

            HashMap dauMap = new HashMap<>();
            dauMap.put("id", "dau");
            dauMap.put("name", "新增日活");
            dauMap.put("value", esService.getDauTotal(dt));
            totalList.add(dauMap);

            // Each card needs its own id: the id is what a client passes back to realtime-hour.
            // The new-device count is not implemented yet; 233 is a placeholder value.
            HashMap newMidMap = new HashMap<>();
            newMidMap.put("id", "new_mid");
            newMidMap.put("name", "新增设备");
            newMidMap.put("value", 233);
            totalList.add(newMidMap);

            return JSON.toJSONString(totalList);
        }

        /**
         * Hourly breakdown of one card for a day and the day before.
         *
         * @param id card id; only "dau" is supported
         * @param dt day, formatted yyyy-MM-dd
         * @return JSON {"today": {...}, "yesterday": {...}}, or null for unsupported ids
         */
        @GetMapping("realtime-hour")
        public String realtimeHour(@RequestParam(value = "id", defaultValue = "-1") String id,
                                   @RequestParam("date") String dt) {
            if (!"dau".equals(id)) {
                return null;
            }
            Map<String, Map> hourMap = new HashMap<>();
            hourMap.put("today", esService.getDauHour(dt));
            hourMap.put("yesterday", esService.getDauHour(getYd(dt)));
            return JSON.toJSONString(hourMap);
        }

        // Returns the day before td; both formatted yyyy-MM-dd.
        private String getYd(String td) {
            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
            try {
                Date tdDate = dateFormat.parse(td);
                return dateFormat.format(DateUtils.addDays(tdDate, -1));
            } catch (ParseException e) {
                e.printStackTrace();
                throw new RuntimeException("日期格式转变失败");
            }
        }

    }
    
    

    7.6 接口验证

    http://localhost:8070/realtime-hour?id=dau&date=2020-08-30

    http://localhost:8070/realtime-total?date=2020-08-30

    八、canal安装

  • 相关阅读:
    转:python2.x 和 python3.x的区别
    迭代器
    C++学习笔记-预备知识
    phpstudy扩展mongoDB
    Linux gd库安装步骤说明
    Linux jpeglib库的安装
    github开源项目
    本地文件拖拽到虚拟机里,文件存储位置
    linux php 扩展安装
    CentOS6.10 Nginx无法解析php文件
  • 原文地址:https://www.cnblogs.com/tangliping/p/14083595.html
Copyright © 2011-2022 走看看