  • Real-Time Analysis System: Ad-Hoc Analysis Requirements

    Chapter 1 Requirements Analysis

    1.1 Ad-Hoc Query Scenario

      The data warehouse stores a large amount of detail-level data, but queries against a Hadoop-based warehouse have to go through MapReduce, so interactive response is very poor. To make it easy for analysts to explore the data, the platform needs to provide a feature that supports flexible, ad-hoc analysis driven by conditions such as keywords and option filters.

    1.2 Detailed Requirements

    Input parameters

      Date: the date of the data to query

      Keyword: search on words contained in the product (sku) name

    Return results

      Pie charts

        Gender ratio: male / female

        Age ratio: under 20, 20-30, 30 and above

      Purchase behavior detail

        Includes user id, gender, age, user level, purchase time, product price, order status, and so on.

        Pageable.

    Chapter 2 Architecture Analysis

    2.1 T+1 Mode

    2.1.1 Implementation Steps

      1) Use sqoop or a similar tool to batch-extract data from the business database;

      2) Use data warehouse jobs to build a wide table (user purchase behavior) in the dws layer;

      3) Develop a Spark batch job that loads the dws wide table into ES (see the sketch after this list);

      4) Publish an API that reads from ES and connect it to the visualization module.
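
      The Spark batch export in step 3 is not spelled out in this section. Below is a minimal Scala sketch of what such a job could look like, assuming Spark SQL with Hive support and the elasticsearch-spark connector; the dws table name, ES host, and index naming here are assumptions, not the project's actual code:

    import org.apache.spark.sql.SparkSession
    import org.elasticsearch.spark.sql._

    object DwsToEsExport {
      def main(args: Array[String]): Unit = {
        val dt = args(0)                                 // partition to export, e.g. "2021-07-07"

        val spark = SparkSession.builder()
          .appName("DwsToEsExport")
          .enableHiveSupport()                           // read the dws wide table from Hive
          .config("es.nodes", "hadoop102")               // assumed ES host
          .config("es.port", "9200")
          .getOrCreate()

        // hypothetical dws wide table holding the user purchase behavior
        val wideTable = spark.sql(s"select * from dws.dws_user_purchase_wide where dt='$dt'")

        // one index per day, matching the gmall_sale_detail* naming used later in this project
        wideTable.saveToEs(s"gmall_sale_detail$dt/_doc")

        spark.stop()
      }
    }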

    2.1.2 Characteristics

      Pros: the dws wide table already built by the offline jobs can be exported directly into ES for fast interactive analysis.

      Cons: because the results of offline processing are loaded into ES afterwards, timeliness is no better than that of the offline data.

    2.2 T+0 Mode

    2.2.1 Implementation Steps

      1) Use canal to capture real-time inserts and changes on the relevant tables and push them to Kafka;

      2) In Spark Streaming, transform, filter, and join the streams into the wide-table structure;

      3) Save the result into ES;

      4) Publish an API that reads from ES and connect it to the visualization module.

    2.2.2 Characteristics

      Pros: data is produced in real time, so timeliness is very high.

      Cons: because Kafka delivers raw data, Spark Streaming has to do the processing itself, which is more cumbersome than the batch approach; join operations in particular must cope with network delay between the two streams (see the sketch below).
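
      To make the join problem concrete: Spark Streaming's fullOuterJoin on two keyed streams yields (key, (Option[left], Option[right])), and within one micro-batch either side can be missing because its record is still in flight. A plain-Scala sketch (not project code) of the three cases that Chapter 4 has to handle:

    object FullOuterJoinCases {
      // stand-ins for the real OrderInfo / OrderDetail beans
      case class Info(orderId: String)
      case class Detail(orderId: String, skuId: String)

      // the three cases each (key, (Option, Option)) element of a full outer join can fall into
      def handle(orderId: String, pair: (Option[Info], Option[Detail])): String = pair match {
        case (Some(_), Some(_)) => s"$orderId: both sides arrived in this batch -> emit the joined record"
        case (Some(_), None)    => s"$orderId: the detail is late -> cache the info (e.g. in Redis) and wait"
        case (None, Some(_))    => s"$orderId: the info is late -> read it from the cache, or cache the detail"
        case (None, None)       => s"$orderId: never produced by a full outer join"
      }

      def main(args: Array[String]): Unit =
        println(handle("1001", (Some(Info("1001")), None)))
    }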

    Chapter 3 Real-Time Data Collection

    3.1 Add the Tables to Track in the Canal Module

      1) Add two constants to the gmall_common module

    public static final String GMALL_ORDER_DETAIL = "GMALL_ORDER_DETAIL";     //gmall_order_detail
    public static final String GMALL_USER_INFO = "GMALL_USER_INFO";           //gmall_user_info

      2) Modify the MyClient class in the gmall_canalclient module

    package com.yuange.canal;
    
    import com.alibaba.fastjson.JSONObject;
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.CanalEntry;
    import com.alibaba.otter.canal.protocol.Message;
    import com.google.protobuf.ByteString;
    import com.google.protobuf.InvalidProtocolBufferException;
    import com.yuange.constants.Constants;
    
    import java.net.InetSocketAddress;
    import java.util.List;
    import java.util.Random;
    
    /**
     * @author: 袁哥
     * @date: 2021/7/7 18:10
     * Steps:
     *    ① Create a client: CanalConnector (SimpleCanalConnector: single-node canal server, ClusterCanalConnector: HA canal cluster)
     *    ② Connect the client to the canal server
     *    ③ Subscribe the client to the binlog events of the canal server
     *    ④ Parse the binlog events
     *    ⑤ Write them to Kafka
     *
     * Structure of the consumed data:
     *      Message:  one pulled batch; it may contain the write changes produced by several SQL statements
     *          List<Entry> entries : each Entry represents the write changes produced by one SQL statement
     *          id : -1 means no data was pulled
     *      Entry:
     *          CanalEntry.Header header_ :  header describing this SQL statement
     *              private Object tableName_: the table the SQL statement operated on
     *              EntryType: the type of the Entry
     *                  transaction begin: begin
     *                  transaction commit: commit
     *                  write operations that change actual rows: rowdata
     *                          update, delete, insert
     *          ByteString storeValue_:   the data itself
     *              serialized; convert it with the RowChange utility class to obtain a RowChange object
     */
    public class MyClient {
    
        public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {
            /**
             * Create a canal client
             * public static CanalConnector newSingleConnector(
             *              SocketAddress address,  //hostname and port of the canal server
             *              String destination,     //see the canal.destinations property in /opt/module/canal/conf/canal.properties
             *              String username,        //NOT the canal.instance.dbUsername from instance.properties
             *              String password         //see the AdminGuide (available since canal 1.1.4): https://github.com/alibaba/canal/wiki/AdminGuide
             * ) {...}
             * */
            CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop104", 11111),
                    "example", "", "");
    
            //Connect the client to the canal server
            canalConnector.connect();
    
            //Subscribe the client to binlog events on the canal server (all tables in the gmall_realtime database)
            canalConnector.subscribe("gmall_realtime.*");
    
            //Keep pulling data; Message[id=-1,entries=[],raw=false,rawEntries=[]] means this batch returned no data
            while (true){
                Message message = canalConnector.get(100);
                //Check whether any data was pulled; if not, wait a moment before pulling again
                if (message.getId() == -1){
                    System.out.println("No data for now, waiting a moment");
                    Thread.sleep(5000);
                    continue;
                }
    
                // Data handling logic
                List<CanalEntry.Entry> entries = message.getEntries();
                for (CanalEntry.Entry entry : entries) {
                    //Only process entries of type ROWDATA
                    if (entry.getEntryType().equals(CanalEntry.EntryType.ROWDATA)){
                    ByteString storeValue = entry.getStoreValue();          //the data
                    String tableName = entry.getHeader().getTableName();    //the table name
                        handleStoreValue(storeValue,tableName);
                    }
                }
            }
        }
    
        private static void handleStoreValue(ByteString storeValue, String tableName) throws InvalidProtocolBufferException {
            //Convert storeValue into a RowChange
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
    
            /**
             * One RowChange can represent several rows of data.
             * order_info: write operations that may occur; for the GMV statistic we care about total_amount
             *          insert :  counted --> carries the values after the insert
             *          update :  not counted --> only order_status is allowed to change
             *          delete :  not counted --> data must not be deleted
             * Check whether this batch of write changes was produced by an insert statement
             * */
    
            // Collect inserts on order_info
            if ( "order_info".equals(tableName) && rowChange.getEventType().equals(CanalEntry.EventType.INSERT)) {
                writeDataToKafka(Constants.GMALL_ORDER_INFO,rowChange);
            // Collect inserts on order_detail
            }else if ("order_detail".equals(tableName) && rowChange.getEventType().equals(CanalEntry.EventType.INSERT)) {
                writeDataToKafka(Constants.GMALL_ORDER_DETAIL,rowChange);
            // Collect inserts and updates on user_info so the cached user data stays current
            }else if((rowChange.getEventType().equals(CanalEntry.EventType.INSERT)
                    ||rowChange.getEventType().equals(CanalEntry.EventType.UPDATE))&&"user_info".equals(tableName)){
                writeDataToKafka(Constants.GMALL_USER_INFO,rowChange);
            }
        }
    
        public static void writeDataToKafka(String topic,CanalEntry.RowChange rowChange){
            //Get the list of rows
            List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
    
            for (CanalEntry.RowData rowData : rowDatasList) {
                JSONObject jsonObject = new JSONObject();
    
                //Get every column of each row after the insert
                List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
    
                for (CanalEntry.Column column : afterColumnsList) {
                    jsonObject.put(column.getName(),column.getValue());
                }
    
                // Simulate network jitter/delay: pick a random number of seconds (the sleep below is left commented out)
                int i = new Random().nextInt(15);

                /*try {
                    Thread.sleep(i * 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }*/
    
                //Send the data to Kafka
                MyProducer.sendDataToKafka(topic,jsonObject.toJSONString());
            }
        }
    }
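
      The MyProducer helper used in writeDataToKafka above comes from an earlier chapter of this project and is not repeated here; the actual class lives in the Java gmall_canalclient module. A minimal Scala sketch of equivalent producer logic, with the broker list and settings as assumptions:

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.apache.kafka.common.serialization.StringSerializer

    object MyProducerSketch {
      private val props = new Properties()
      props.put("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092") // assumed brokers
      props.put("key.serializer", classOf[StringSerializer].getName)
      props.put("value.serializer", classOf[StringSerializer].getName)

      private val producer = new KafkaProducer[String, String](props)

      // send one JSON record to the given topic
      def sendDataToKafka(topic: String, data: String): Unit =
        producer.send(new ProducerRecord[String, String](topic, data))
    }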

    Chapter 4 Real-Time Data Processing

    4.1 Data Processing Flow

    4.2 Dual-Stream Join (the hard part)

    4.2.1 Program Flow Diagram

    4.2.2 Case Classes

      1) Create the OrderDetail case class in the gmall_realtime module

    package com.yuange.realtime.beans
    
    /**
     * @author: 袁哥
     * @date: 2021/7/11 21:34
     *   Fields of the case class:
     *        if you need every field from Kafka, declare all of them;
     *        you may also add extra fields of your own, and if you do not need every Kafka field you can declare only the ones you need
     */
    case class OrderDetail(id:String,
                           order_id: String,
                           sku_name: String,
                           sku_id: String,
                           order_price: String,
                           img_url: String,
                           sku_num: String)

      2) Create the SaleDetail case class in the gmall_realtime module

    package com.yuange.realtime.beans
    
    import java.text.SimpleDateFormat
    /**
     * @author: 袁哥
     * @date: 2021/7/11 21:36
     */
    case class SaleDetail(var order_detail_id: String = null,
                          var order_id: String = null,
                          var order_status: String = null,
                          var create_time: String = null,
                          var user_id: String = null,
                          var sku_id: String = null,
                          var user_gender: String = null,
                          var user_age: Int = 0,
                          var user_level: String = null,
                          var sku_price: Double = 0D,
                          var sku_name: String = null,
                          var dt: String = null) {

      def this(orderInfo: OrderInfo, orderDetail: OrderDetail) {
        this()
        mergeOrderInfo(orderInfo)
        mergeOrderDetail(orderDetail)
      }

      def mergeOrderInfo(orderInfo: OrderInfo): Unit = {
        if (orderInfo != null) {
          this.order_id = orderInfo.id
          this.order_status = orderInfo.order_status
          this.create_time = orderInfo.create_time
          this.dt = orderInfo.create_date
          this.user_id = orderInfo.user_id
        }
      }

      def mergeOrderDetail(orderDetail: OrderDetail): Unit = {
        if (orderDetail != null) {
          this.order_detail_id = orderDetail.id
          this.sku_id = orderDetail.sku_id
          this.sku_name = orderDetail.sku_name
          this.sku_price = orderDetail.order_price.toDouble
        }
      }

      def mergeUserInfo(userInfo: UserInfo): Unit = {
        if (userInfo != null) {
          this.user_id = userInfo.id
          val formattor = new SimpleDateFormat("yyyy-MM-dd")
          val date: java.util.Date = formattor.parse(userInfo.birthday)
          val curTs: Long = System.currentTimeMillis()
          val betweenMs = curTs - date.getTime
          val age = betweenMs / 1000L / 60L / 60L / 24L / 365L
          this.user_age = age.toInt
          this.user_gender = userInfo.gender
          this.user_level = userInfo.user_level
        }
      }
    }

      3) Create the UserInfo case class in the gmall_realtime module

    case class UserInfo(id:String,
                        login_name:String,
                        user_level:String,
                        birthday:String,
                        gender:String)
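
      OrderInfo, referenced by SaleDetail above and by OrderDetailApp below, was defined in an earlier chapter of this project and is not repeated here. The following is only a sketch consistent with how it is used in this section (the mutable fields are the ones OrderDetailApp reassigns; any other fields of the real class are omitted):

    case class OrderInfo(id: String,
                         user_id: String,
                         order_status: String,
                         total_amount: Double,
                         var consignee_tel: String,
                         create_time: String,
                         var create_date: String,
                         var create_hour: String)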

    4.2.3 App

      1) Create OrderDetailApp in the gmall_realtime module

    package com.yuange.realtime.app
    
    import java.time.{LocalDate, LocalDateTime}
    import java.time.format.DateTimeFormatter
    import java.util
    
    import com.alibaba.fastjson.JSON
    import com.google.gson.Gson
    import com.yuange.constants.Constants
    import com.yuange.realtime.beans.{OrderDetail, OrderInfo, SaleDetail, UserInfo}
    import com.yuange.realtime.utils.{MyEsUtil, MyKafkaUtil, RedisUtil}
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    import redis.clients.jedis.Jedis
    
    import scala.collection.mutable.ListBuffer
    
    /**
     * @author: 袁哥
     * @date: 2021/7/11 21:59
     */
    object OrderDetailApp extends  BaseApp {
      override var appName: String = "OrderDetailApp"
      override var duration: Int = 10
    
      def main(args: Array[String]): Unit = {
    
        run{
          val ds1: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getKafkaStream(Constants.GMALL_ORDER_INFO, streamingContext)
          val ds2: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getKafkaStream(Constants.GMALL_ORDER_DETAIL, streamingContext)
    
          //Wrap each stream as a K-V DStream
          val ds3: DStream[(String, OrderInfo)] = ds1.map(record => {
            val orderInfo: OrderInfo = JSON.parseObject(record.value(), classOf[OrderInfo])
            // Fill in create_date and create_hour from create_time, e.g. "create_time":"2021-07-07 01:39:33"
            val formatter1: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
            val formatter2: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
    
            val localDateTime: LocalDateTime = LocalDateTime.parse(orderInfo.create_time, formatter1)
            orderInfo.create_date = localDateTime.format(formatter2)
            orderInfo.create_hour = localDateTime.getHour.toString
    
            // Mask sensitive order fields; here the consignee phone number is masked as a demo
            orderInfo.consignee_tel = orderInfo.consignee_tel.replaceAll("(\\w{3})\\w*(\\w{4})", "$1****$2")
            (orderInfo.id, orderInfo)
          })
    
          // ds3.print()
          val ds4: DStream[(String, OrderDetail)] = ds2.map(record => {
            val detail: OrderDetail = JSON.parseObject(record.value(), classOf[OrderDetail])
            (detail.order_id, detail)
          })
    
          // ds4.print()
          val ds5: DStream[(String, (Option[OrderInfo], Option[OrderDetail]))] = ds3.fullOuterJoin(ds4)
          ds5.print()
    
          val ds6: DStream[SaleDetail] = ds5.mapPartitions(partition => {
            //Holds the assembled sale detail records
            val saleDetails: ListBuffer[SaleDetail] = ListBuffer[SaleDetail]()
    
            //Get a Redis connection
            val jedis: Jedis = RedisUtil.getJedisClient()
            val gson = new Gson
            partition.foreach {
              case (order_id, (orderInfoOption, orderDetailOption)) => {
                if (orderInfoOption != None) {
                  val orderInfo: OrderInfo = orderInfoOption.get
                  // The orderDetail that joined within the current batch
                  if (orderDetailOption != None) {
                    val orderDetail: OrderDetail = orderDetailOption.get
                    val detail = new SaleDetail(orderInfo, orderDetail)
                    saleDetails.append(detail)
                  }
    
                  //Write order_info to Redis; keep it for max expected system delay (assume 5 min) * 2
                  //  set + expire = setex
                  jedis.setex("order_info:" + order_id, 5 * 2 * 60, gson.toJson(orderInfo))
    
                  // Check Redis for any order_detail that arrived early
                  val earlyOrderDetatils: util.Set[String] = jedis.smembers("order_detail:" + order_id)
                  earlyOrderDetatils.forEach(
                    str => {
                      val orderDetail: OrderDetail = JSON.parseObject(str, classOf[OrderDetail])
                      val detail = new SaleDetail(orderInfo, orderDetail)
                      saleDetails.append(detail)
                    }
                  )
    
                } else {
                  //This branch handles the orderDetail records that could not be paired in the current batch
                  val orderDetail: OrderDetail = orderDetailOption.get
    
                  // Check Redis for an order_info that arrived early
                  val orderInfoStr: String = jedis.get("order_info:" + orderDetail.order_id)
                  if (orderInfoStr != null) {
                    val detail = new SaleDetail(JSON.parseObject(orderInfoStr, classOf[OrderInfo]), orderDetail)
                    saleDetails.append(detail)
                  } else {
                    //The order_detail arrived early and no matching order_info is cached yet, so cache this order_detail in Redis
                    jedis.sadd("order_detail:" + orderDetail.order_id, gson.toJson(orderDetail))
                    jedis.expire("order_detail:" + orderDetail.order_id, 5 * 2 * 60)
                  }
                }
              }
            }
    
            jedis.close()
            saleDetails.iterator
          })
    
          // Look up the user's other information by user_id
          val ds7: DStream[SaleDetail] = ds6.mapPartitions(partition => {
            val jedis: Jedis = RedisUtil.getJedisClient()
            val saleDetailsWithUserInfo: Iterator[SaleDetail] = partition.map(saleDetail => {
              val userInfoStr: String = jedis.get("user_id:" + saleDetail.user_id)
              val userInfo: UserInfo = JSON.parseObject(userInfoStr, classOf[UserInfo])
              saleDetail.mergeUserInfo(userInfo)
              saleDetail
            })
            jedis.close()
    
            saleDetailsWithUserInfo
          })
    
          //Write to ES: convert the DStream into docList: List[(String, Any)]
          val ds8: DStream[(String, SaleDetail)] = ds7.map(saleDetail => ((saleDetail.order_detail_id, saleDetail)))
          ds8.foreachRDD(rdd => {
            rdd.foreachPartition(partition => {
              MyEsUtil.insertBulk("gmall_sale_detail" + LocalDate.now() , partition.toList)
            })
          })
        }
      }
    }
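
      BaseApp, which both OrderDetailApp and UserInfoApp extend, was written in an earlier chapter of this project and is not repeated here. A minimal sketch consistent with how it is used above (appName, duration, a shared streamingContext, and a run method wrapping the app body); the master setting and other details are assumptions:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    abstract class BaseApp {
      // filled in by each concrete app
      var appName: String
      var duration: Int

      // created in run() and used by the subclasses to build their streams
      var streamingContext: StreamingContext = _

      // build the context, run the app-specific code, then start the streams and block
      def run(op: => Unit): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName(appName)
        streamingContext = new StreamingContext(conf, Seconds(duration))

        op

        streamingContext.start()
        streamingContext.awaitTermination()
      }
    }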

      2) Create UserInfoApp in the gmall_realtime module

    package com.yuange.realtime.app
    
    import com.alibaba.fastjson.JSON
    import com.yuange.constants.Constants
    import com.yuange.realtime.utils.{MyKafkaUtil, RedisUtil}
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.spark.streaming.dstream.InputDStream
    
    /**
     * @author: 袁哥
     * @date: 2021/7/13 20:11
     */
    object UserInfoApp extends BaseApp {
      override var appName: String = "UserInfoApp"
      override var duration: Int = 10
    
      def main(args: Array[String]): Unit = {
        run{
          val ds: InputDStream[ConsumerRecord[String,String]] = MyKafkaUtil.getKafkaStream(Constants.GMALL_USER_INFO,streamingContext)
          ds.foreachRDD(rdd => {
            rdd.foreachPartition(partition => {
              //Get a Redis connection
              val jedis = RedisUtil.getJedisClient()
              partition.foreach(record => {
                val key: String = JSON.parseObject(record.value()).getString("id")
                jedis.set("user_id:" + key, record.value())
              })
              jedis.close()
            })
          })
        }
      }
    }
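
      MyKafkaUtil and RedisUtil, imported by both apps, come from earlier chapters of this project and are not shown in this section. Minimal sketches consistent with how they are called above; the broker list, consumer settings, Redis host, and pool sizes are assumptions:

    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

    object MyKafkaUtil {
      private val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",   // assumed brokers
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "gmall_realtime",
        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> (true: java.lang.Boolean)
      )

      // one direct stream per topic
      def getKafkaStream(topic: String, ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] =
        KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](Seq(topic), kafkaParams)
        )
    }

      And a Jedis-pool based RedisUtil:

    import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

    object RedisUtil {
      private lazy val pool: JedisPool = {
        val config = new JedisPoolConfig()
        config.setMaxTotal(100)
        config.setMaxIdle(20)
        new JedisPool(config, "hadoop102", 6379)   // assumed Redis host and port
      }

      // callers close() the returned Jedis, which hands it back to the pool
      def getJedisClient(): Jedis = pool.getResource
    }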

      3) Add a null check on items in MyEsUtil in the gmall_realtime module

    package com.yuange.realtime.utils
    
    import java.util.Objects
    import java.util
    
    import io.searchbox.client.config.HttpClientConfig
    import io.searchbox.client.{JestClient, JestClientFactory}
    import io.searchbox.core.{Bulk, BulkResult, Index}
    import collection.JavaConverters._
    
    /**
     * @author: 袁哥
     * @date: 2021/7/9 21:16
     */
    object MyEsUtil {
    
      private val ES_HOST = "http://hadoop102"
      private val ES_HTTP_PORT = 9200
      private var factory: JestClientFactory = null
    
      /**
       * Get a client
       *
       * @return jestclient
       */
      def getClient: JestClient = {
        if (factory == null) build()
        factory.getObject
      }
    
      /**
       * Close the client
       */
      def close(client: JestClient): Unit = {
        if (!Objects.isNull(client)) try
          client.shutdownClient()
        catch {
          case e: Exception =>
            e.printStackTrace()
        }
      }
    
      /**
       * Establish the connection
       */
      private def build(): Unit = {
        factory = new JestClientFactory
        factory.setHttpClientConfig(new HttpClientConfig.Builder(ES_HOST + ":" + ES_HTTP_PORT).multiThreaded(true)
          .maxTotalConnection(20) //maximum total connections
          .connTimeout(10000).readTimeout(10000).build)
      }
    
      /*
          Bulk-insert data into ES.

          The data to write must first be assembled as docList: List[(String, Any)]
            (String, Any):   K: the document id
                             V: the document
       */
      def insertBulk(indexName: String, docList: List[(String, Any)]): Unit = {
        if (docList.size > 0) {
          val jest: JestClient = getClient
    
          val bulkBuilder = new Bulk.Builder().defaultIndex(indexName).defaultType("_doc")
    
          for ((id, doc) <- docList) {
            val indexBuilder = new Index.Builder(doc)
            if (id != null) {
              indexBuilder.id(id)
            }
            val index: Index = indexBuilder.build()
            bulkBuilder.addAction(index)
          }
    
          val bulk: Bulk = bulkBuilder.build()
          var items: util.List[BulkResult#BulkResultItem] = null
    
          try {
            items = jest.execute(bulk).getItems
          } catch {
            case ex: Exception => println(ex.toString)
          } finally {
            //Always close the connection
            close(jest)
            if (items != null){
              println("保存" + items.size() + "条数据")
              /*
                items is a Java collection;
                 <- can only iterate Scala collections,
                 so convert items from a Java collection to a Scala one with javaCollection.asScala
                 (the reverse, Scala to Java, is scalaCollection.asJava)
             */
              for (item <- items.asScala) {
                if (item.error != null && item.error.nonEmpty) {
                  println(item.error)
                  println(item.errorReason)
                }
              }
            }
          }
        }
      }
    }

    4.3 Creating the ES Index Template

    PUT _template/gmall_sale_detail_template
    {
       "index_patterns": ["gmall_sale_detail*"],                  
        "settings": {                                               
          "number_of_shards": 3
        },
        "aliases" : { 
          "{index}-query": {},
          "gmall_sale_detail-query":{}
        },
        "mappings" : {
          "_doc" : {
            "properties" : {
              "order_detail_id" : {
                "type" :   "keyword"
              },
              "order_id" : {
                "type" : "keyword" 
              },
              "create_time" : {
                "type" : "date" ,
                "format" : "yyyy-MM-dd HH:mm:ss"
              },
              "dt" : {
                "type" : "date"
              },
              "order_status" : {
                    "type" : "keyword" 
              },
              "sku_id" : {
                    "type" : "keyword"
              },
              "sku_name" : {
                "type" : "text",
                "analyzer": "ik_max_word"
              },
              "sku_price" : {
                "type" : "float"
              },
              "user_age" : {
                "type" : "long"
              },
              "user_gender" : {
                "type" : "keyword" 
              },
              "user_id" : {
                "type" : "keyword" 
              },
              "user_level" : {
                "type" : "keyword",
                "index" : false 
              }
            }
          }
        }
      }

    4.4 Testing

      1) Start Zookeeper

    zookeeper.sh start

      2) Start Kafka

    kafka.sh start

      3) Start canal

    /opt/module/canal/bin/startup.sh

      4) Run the main method of MyClient in the gmall_canalclient module to move data from MySQL to Kafka

      5) Start the ES cluster

    elasticsearch.sh start

      6) Start Redis

    redis-server /opt/module/redis/redis.conf

      7) Use the stored procedure to generate mock data

    #CALL `init_data`(date to generate data for, number of orders to generate, number of users to generate, whether to overwrite)
    call `init_data`('2021-07-07',6,3,true)

      8) Run the main method of UserInfoApp in the gmall_realtime module to load the user data from Kafka into Redis

      9) Run the main method of OrderDetailApp in the gmall_realtime module to join the streams and store the result in ES

      10) Check the data in Redis

      11) Check the data in ES

    Chapter 5 Ad-Hoc Query API Development

    5.1 Request Path and Parameters

    http://localhost:8070/sale_detail?date=2020-08-21&startpage=1&size=5&keyword=小米手机

    5.2 Return Value

    {"total":62,"stat":[{"options":[{"name":"20岁以下","value":0.0},{"name":"20岁到30岁","value":25.8},{"name":"30岁及30岁以上","value":74.2}],"title":"用户年龄占比"},{"options":[{"name":"男","value":38.7},{"name":"女","value":61.3}],"title":"用户性别占比"}],"detail":[{"user_id":"9","sku_id":"8","user_gender":"M","user_age":49.0,"user_level":"1","sku_price":8900.0,"sku_name":"Apple iPhone XS Max (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待","sku_tm_id":"86","sku_category1_id":"2","sku_category2_id":"13","sku_category3_id":"61","sku_category1_name":"手机","sku_category2_name":"手机通讯","sku_category3_name":"手机","spu_id":"1","sku_num":6.0,"order_count":2.0,"order_amount":53400.0,"dt":"2019-02-14","es_metadata_id":"wPdM7GgBQMmfy2BJr4YT"},{"user_id":"5","sku_id":"8","user_gender":"F","user_age":36.0,"user_level":"4","sku_price":8900.0,"sku_name":"Apple iPhone XS Max (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待","sku_tm_id":"86","sku_category1_id":"2","sku_category2_id":"13","sku_category3_id":"61","sku_category1_name":"手机","sku_category2_name":"手机通讯","sku_category3_name":"手机","spu_id":"1","sku_num":5.0,"order_count":1.0,"order_amount":44500.0,"dt":"2019-02-14","es_metadata_id":"wvdM7GgBQMmfy2BJr4YT"},{"user_id":"19","sku_id":"8","user_gender":"F","user_age":43.0,"user_level":"5","sku_price":8900.0,"sku_name":"Apple iPhone XS Max (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待","sku_tm_id":"86","sku_category1_id":"2","sku_category2_id":"13","sku_category3_id":"61","sku_category1_name":"手机","sku_category2_name":"手机通讯","sku_category3_name":"手机","spu_id":"1","sku_num":7.0,"order_count":2.0,"order_amount":62300.0,"dt":"2019-02-14","es_metadata_id":"xvdM7GgBQMmfy2BJr4YU"},{"user_id":"15","sku_id":"8","user_gender":"M","user_age":66.0,"user_level":"4","sku_price":8900.0,"sku_name":"Apple iPhone XS Max (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待","sku_tm_id":"86","sku_category1_id":"2","sku_category2_id":"13","sku_category3_id":"61","sku_category1_name":"手机","sku_category2_name":"手机通讯","sku_category3_name":"手机","spu_id":"1","sku_num":3.0,"order_count":1.0,"order_amount":26700.0,"dt":"2019-02-14","es_metadata_id":"xvdM7GgBQMmfy2BJr4YU"}]}

    5.3 Writing the DSL Query

    GET gmall_sale_detail-query/_search
    {
      "query": {
        "bool": {
          "filter": {
            "term": {
              "dt": "2021-07-07"
            }
          }, 
          "must": [
            {"match":{
              "sku_name": {
                "query": "小米手机",
                "operator": "and"
              }
             } 
            }
          ]
        }
      },
      "aggs": {
        "groupby_age": {
          "terms": {
            "field": "user_age"
          }
        },
        "groupby_gender": {
          "terms": {
            "field": "user_gender"
          }
        }
      },
      "from": 0,
      "size": 2
    }

    5.4 Code Development

    5.4.1 Code Inventory

    bean

      Stat: one pie chart

      Option: one slice (option) of a pie chart

    Controller layer

      PublisherController: add a getSaleDetail method that calls the service layer and assembles the return value according to the web API and its parameters

    Service layer

      PublisherService: add a getSaleDetail method

      PublisherServiceImpl: implement getSaleDetail by querying ElasticSearch with the DSL above

    5.4.2 Modify pom.xml in the gmall_publisher module

    <!-- ES dependencies -->
    <dependency>
        <groupId>io.searchbox</groupId>
        <artifactId>jest</artifactId>
        <version>5.3.3</version>
    </dependency>
    
    <dependency>
        <groupId>net.java.dev.jna</groupId>
        <artifactId>jna</artifactId>
        <version>4.5.2</version>
    </dependency>
    
    <dependency>
        <groupId>org.codehaus.janino</groupId>
        <artifactId>commons-compiler</artifactId>
        <version>2.7.8</version>
    </dependency>
    
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
    </dependency>

    5.4.3 Configure application.properties in the gmall_publisher module

    #es
    spring.elasticsearch.jest.uris=http://hadoop102:9200

    5.4.4 Bean

      1) Create Option

    package com.yuange.gmall.gmall_publisher.beans;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    /**
     * @author: 袁哥
     * @date: 2021/7/15 10:45
     */
    @NoArgsConstructor
    @AllArgsConstructor
    @Data
    public class Option {
        String name;
        Double value;
    }

      2) Create Stat

    package com.yuange.gmall.gmall_publisher.beans;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    import java.util.List;
    
    /**
     * @author: 袁哥
     * @date: 2021/7/15 10:46
     */
    @AllArgsConstructor
    @NoArgsConstructor
    @Data
    public class Stat {
        String title;
        List<Option> options;
    }

      3) Create SaleDetail

    package com.yuange.gmall.gmall_publisher.beans;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    /**
     * @author: 袁哥
     * @date: 2021/7/15 10:51
     */
    @NoArgsConstructor
    @AllArgsConstructor
    @Data
    public class SaleDetail {
        private String order_detail_id;
        private String order_id;
        private String order_status;
        private String create_time;
        private String user_id;
        private String sku_id;
        private String user_gender;
        private Integer user_age;
        private String user_level;
        private Double sku_price;
        private String sku_name;
        private String dt;
        // extra field, used to carry the ES document id
        private String  es_metadata_id;
    }

      4) Notes on the lombok annotations:

    @Data: automatically generates getters and setters
    @AllArgsConstructor: automatically generates a constructor taking all fields
    @NoArgsConstructor: adds a no-argument constructor

      5) pom.xml needs the lombok dependency (already added earlier in this project; add it if yours does not have it)

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>

    5.4.5 ESDao

    package com.yuange.gmall.gmall_publisher.dao;
    
    import com.alibaba.fastjson.JSONObject;
    
    import java.io.IOException;
    
    /**
     * @author: 袁哥
     * @date: 2021/7/15 10:53
     */
    public interface ESDao {
        JSONObject getESData(String date, Integer startpage, Integer size, String keyword) throws IOException;
    }

    5.4.6 ESDaoImpl

    package com.yuange.gmall.gmall_publisher.dao;
    
    import com.alibaba.fastjson.JSONObject;
    import com.yuange.gmall.gmall_publisher.beans.Option;
    import com.yuange.gmall.gmall_publisher.beans.SaleDetail;
    import com.yuange.gmall.gmall_publisher.beans.Stat;
    import io.searchbox.client.JestClient;
    import io.searchbox.core.Search;
    import io.searchbox.core.SearchResult;
    import io.searchbox.core.search.aggregation.MetricAggregation;
    import io.searchbox.core.search.aggregation.TermsAggregation;
    import org.elasticsearch.index.query.MatchQueryBuilder;
    import org.elasticsearch.search.aggregations.AggregationBuilders;
    import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder;
    import org.elasticsearch.search.builder.SearchSourceBuilder;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Repository;
    
    import java.io.IOException;
    import java.text.DecimalFormat;
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * @author: 袁哥
     * @date: 2021/7/15 10:56
     */
    @Repository
    public class ESDaoImpl implements  ESDao {
    
        @Autowired //inject a JestClient from the Spring container
        private JestClient jestClient;
    
        /*
                Query data from ES.
                date :   determines the index to query:
                                gmall_sale_detail + date
                keyword: the full-text search keyword
                startpage:
                        used to compute from :  (N - 1) * size
                        with 10 records per page:
                        startpage: 1    from: 0   size: 10
                        startpage: 2    from: 10  size: 10
                        startpage: N    from: (N - 1) * size
                        startpage: 3    from: 20,  size: 10
                size:   the number of records to return per page
                Query used:
                    GET /gmall_sale_detail-query/_search
                    {
                      "query": {
                        "match": {
                          "sku_name": "手机"
                        }
                      },
                      "from": 0,
                      "size": 20,
                      "aggs": {
                        "genderCount": {
                          "terms": {
                            "field": "user_gender",
                            "size": 10
                          }
                        },
                        "ageCount": {
                          "terms": {
                            "field": "user_age",
                            "size": 150
                          }
                        }
                      }
                    }
    
         */
        public SearchResult getDataFromES(String date, Integer startpage, Integer size, String keyword) throws IOException {
            String indexName ="gmall_sale_detail" + date;
            int from = (startpage - 1 ) * size;
    
            // genderCount:{}
            TermsBuilder aggs1 = AggregationBuilders.terms("genderCount").field("user_gender").size(10);
    
            // "ageCount":{}
            TermsBuilder aggs2 = AggregationBuilders.terms("ageCount").field("user_age").size(150);
    
            // "query": {}
            MatchQueryBuilder matchQueryBuilder = new MatchQueryBuilder("sku_name", keyword);

            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
                    .query(matchQueryBuilder)
                    .from(from)
                    .size(size)
                    .aggregation(aggs1)
                    .aggregation(aggs2);
    
            //Build the query string and execute the Search action
            Search search = new Search.Builder(searchSourceBuilder.toString()).addType("_doc").addIndex(indexName).build();
            SearchResult searchResult = jestClient.execute(search);
            return searchResult;
        }
    
    
        // Package the hits returned by ES into the detail list
        public List<SaleDetail> getDetailData(SearchResult searchResult){
    
            ArrayList<SaleDetail> saleDetails = new ArrayList<>();
    
            List<SearchResult.Hit<SaleDetail, Void>> hits = searchResult.getHits(SaleDetail.class);
    
            for (SearchResult.Hit<SaleDetail, Void> hit : hits) {
                SaleDetail saleDetail = hit.source;
                saleDetail.setEs_metadata_id(hit.id);
                saleDetails.add(saleDetail);
            }
            return saleDetails;
        }
    
        //  Package the age aggregation returned by ES into the age Stat
        public Stat getAgeStat(SearchResult searchResult){
    
            Stat stat = new Stat();
    
            MetricAggregation aggregations = searchResult.getAggregations();
    
            TermsAggregation ageCount = aggregations.getTermsAggregation("ageCount");
    
            List<TermsAggregation.Entry> buckets = ageCount.getBuckets();
    
            int agelt20=0;
            int agege30=0;
            int age20to30=0;
    
            double sumCount=0;
    
            for (TermsAggregation.Entry bucket : buckets) {
                if (Integer.parseInt(bucket.getKey()) < 20 ){
                    agelt20 += bucket.getCount();
                }else if(Integer.parseInt(bucket.getKey()) >= 30){
                    agege30 += bucket.getCount();
                }else{
                    age20to30+=bucket.getCount();
                }
            }
            sumCount = age20to30 + agege30 + agelt20;
    
            DecimalFormat format = new DecimalFormat("###.00");
    
            List<Option> ageoptions =new ArrayList<>();
    
            double perlt20 = agelt20 / sumCount * 100;
            double per20to30 = age20to30 / sumCount * 100;
    
            ageoptions.add(new Option("20岁以下",Double.parseDouble(format.format(perlt20  ))));
            ageoptions.add(new Option("20岁到30岁",Double.parseDouble(format.format( per20to30))));
            ageoptions.add(new Option("30岁及30岁以上",Double.parseDouble(format.format(100 - perlt20 - per20to30  ))));
    
            stat.setOptions(ageoptions);
            stat.setTitle("用户年龄占比");
            return stat;
        }
    
        public Stat getGenderStat(SearchResult searchResult){
            Stat stat = new Stat();
            MetricAggregation aggregations = searchResult.getAggregations();
    
            TermsAggregation genderCount = aggregations.getTermsAggregation("genderCount");

            List<TermsAggregation.Entry> buckets = genderCount.getBuckets();
    
            int maleCount=0;
            int femaleCount=0;
    
            double sumCount=0;
    
            for (TermsAggregation.Entry bucket : buckets) {
                if (bucket.getKey().equals("F") ){
                    femaleCount += bucket.getCount();
                }else{
                    maleCount += bucket.getCount();
                }
            }
            sumCount = maleCount + femaleCount;
    
            DecimalFormat format = new DecimalFormat("###.00");
    
            List<Option> ageoptions =new ArrayList<>();
    
            ageoptions.add(new Option("男",Double.parseDouble(format.format(maleCount / sumCount * 100  ))));
            ageoptions.add(new Option("女",Double.parseDouble(format.format( (1 - maleCount / sumCount ) * 100  ))));
    
            stat.setOptions(ageoptions);
            stat.setTitle("用户性别占比");
            return stat;
        }
    
        // Assemble the overall response: total count, stat list (age + gender), and detail list
        @Override
        public JSONObject getESData(String date, Integer startpage, Integer size, String keyword) throws IOException {
    
            SearchResult searchResult = getDataFromES(date, startpage, size, keyword);
    
            List<SaleDetail> detailData = getDetailData(searchResult);
    
            Stat ageStat = getAgeStat(searchResult);
    
            Stat genderStat = getGenderStat(searchResult);
    
            JSONObject jsonObject = new JSONObject();
    
            jsonObject.put("total",searchResult.getTotal());
    
            ArrayList<Stat> stats = new ArrayList<>();
            stats.add(ageStat);
            stats.add(genderStat);
            jsonObject.put("stat",stats);
            jsonObject.put("detail",detailData);
            return jsonObject;
        }
    }

    5.4.7 PublisherService

    JSONObject getESData(String date, Integer startpage, Integer size, String keyword) throws IOException;

    5.4.8 PublisherServiceImpl

    @Autowired
    private ESDao esDao;
    
    @Override
    public JSONObject getESData(String date, Integer startpage, Integer size, String keyword) throws IOException {
       return esDao.getESData(date,startpage,size,keyword);
    }

    5.4.9 PublisherController

    /*
            http://localhost:8070/sale_detail?date=2020-08-21&startpage=1&size=5&keyword=小米手机
         */
        @RequestMapping(value = "/sale_detail")
        public JSONObject handle3(String date,Integer startpage,Integer size,String keyword) throws IOException {
            return publisherService.getESData(date,startpage,size,keyword);
        }

    5.4.10 index.html

    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>Title</title>
    </head>
    <body>
    
    <a href="/realtime-total?date=2021-07-06">统计每日日活和新增设备</a>
    
    <br/>
    <a href="/realtime-hours?id=dau&date=2021-07-06">统计昨天和今天的分时DAU数据</a>
    
    <br/>
    <a href="/realtime-hours?id=order_amount&date=2021-07-07">统计昨天和今天的分时GMV数据</a>
    
    <br/>
    <a href="/sale_detail?date=2021-07-15&startpage=1&size=10&keyword=手机">请求购物明细明细</a>
    
    </body>
    </html>

    5.4.11 Start the gmall_publisher module

    5.4.12 Testing:

      1) First check in ES whether there is data for 2021-07-15; if not, the request below will return a 500 error

      2) Visit: http://localhost:8070/

      3) The complete real-time project code has been uploaded to GitHub: https://github.com/LzMingYueShanPao/gmall_sparkstream.git

    5.5 Integrating with the Visualization Module
