zoukankan      html  css  js  c++  java
  • 大数据实时项目(dws层)1.2

     

    第一章  双流合并

       除了事实表与维表进行合并形成宽表,还需要事实表与事实表进行合并形成更大的宽表。

     

    1  双流合并的问题

    由于两个流的数据是独立保存、独立消费的,同一业务的数据很可能分布在不同的批次中。而join算子只关联同一批次内的数据,如果只用简单的流join,就会丢失分布在不同批次中的数据。

    2  解决策略

     第一种:利用滑动窗口进行join,然后再进行去重。

      

     第二种:把数据存入缓存,join之后再去查询缓存中的数据,以弥补数据分布在不同批次的问题。

     

    3   代码实现

    case class


    /**
     * One order-detail line widened with the attributes of its parent order (status, amounts,
     * province, user age group / gender, first-order flag) and of its SKU (SPU, trademark, category).
     *
     * Every field is a mutable `var` with a neutral default, so a row can start empty and be filled
     * by [[mergeOrderInfo]] and [[mergeOrderDetail]]. `final_detail_amount` is set by neither merge;
     * it is assigned later by the step that apportions the order's paid total over its details.
     *
     * Being a case class, `equals`/`hashCode` cover every field, including the mutable ones, so do
     * not mutate an instance while it is used as a key in a hash-based collection.
     */
    case class OrderDetailWide(
        var order_detail_id: Long = 0L,
        var order_id: Long = 0L,
        var order_status: String = null,
        var create_time: String = null,
        var user_id: Long = 0L,
        var sku_id: Long = 0L,
        var sku_price: Double = 0D,
        var sku_num: Long = 0L,
        var sku_name: String = null,
        var benefit_reduce_amount: Double = 0D,
        var original_total_amount: Double = 0D,
        var feight_fee: Double = 0D,
        var final_total_amount: Double = 0D,
        var final_detail_amount: Double = 0D,

        var if_first_order: String = null,

        var province_name: String = null,
        var province_area_code: String = null,

        var user_age_group: String = null,
        var user_gender: String = null,

        // Date of the order, copied from OrderInfo.create_date.
        var dt: String = null,

        var spu_id: Long = 0L,
        var tm_id: Long = 0L,
        var category3_id: Long = 0L,
        var spu_name: String = null,
        var tm_name: String = null,
        var category3_name: String = null
    ) {

      /**
       * Builds a wide row from an order and one of its detail lines.
       * Either argument may be null; the fields it would supply then keep their defaults.
       */
      def this(orderInfo: OrderInfo, orderDetail: OrderDetail) = {
        this()
        mergeOrderInfo(orderInfo)
        mergeOrderDetail(orderDetail)
      }

      /** Copies the order-level fields of `orderInfo` into this row; does nothing when it is null. */
      def mergeOrderInfo(orderInfo: OrderInfo): Unit = {
        if (orderInfo != null) {
          this.order_id = orderInfo.id
          this.order_status = orderInfo.order_status
          this.create_time = orderInfo.create_time
          this.dt = orderInfo.create_date

          this.benefit_reduce_amount = orderInfo.benefit_reduce_amount
          this.original_total_amount = orderInfo.original_total_amount
          this.feight_fee = orderInfo.feight_fee
          this.final_total_amount = orderInfo.final_total_amount

          this.province_name = orderInfo.province_name
          this.province_area_code = orderInfo.province_area_code

          this.user_age_group = orderInfo.user_age_group
          this.user_gender = orderInfo.user_gender

          this.if_first_order = orderInfo.if_first_order

          this.user_id = orderInfo.user_id
        }
      }

      /** Copies the detail-level (SKU/SPU) fields of `orderDetail` into this row; does nothing when it is null. */
      def mergeOrderDetail(orderDetail: OrderDetail): Unit = {
        if (orderDetail != null) {
          this.order_detail_id = orderDetail.id
          this.sku_id = orderDetail.sku_id
          this.sku_name = orderDetail.sku_name
          this.sku_price = orderDetail.order_price
          this.sku_num = orderDetail.sku_num

          this.spu_id = orderDetail.spu_id
          this.tm_id = orderDetail.tm_id
          this.category3_id = orderDetail.category3_id
          this.spu_name = orderDetail.spu_name
          this.tm_name = orderDetail.tm_name
          this.category3_name = orderDetail.category3_name
        }
      }
    }

     

     

     

    实时计算代码

    /**
     * Entry point of the DWS order-detail-wide streaming job.
     *
     * Consumes the DW_ORDER_INFO and DW_ORDER_DETAIL Kafka topics in 5-second batches. It joins
     * orders with their details on order id over a 15-second window that slides every 5 seconds, so
     * an order and its details that land in adjacent batches can still meet. The duplicate joined
     * rows produced by overlapping windows are dropped by recording each order_detail_id in a
     * per-day Redis set. The surviving rows are printed for inspection.
     *
     * @param args command-line arguments (unused)
     */
    def main(args: Array[String]): Unit = {

      val sparkConf: SparkConf = new SparkConf().setAppName("dws_order_detail_wide_app").setMaster("local[*]")
      val ssc = new StreamingContext(sparkConf, Seconds(5))
      val orderInfoTopic = "DW_ORDER_INFO"
      val orderDetailTopic = "DW_ORDER_DETAIL"
      val groupId = "dws_order_detail_wide_consumer"

      // Start each topic from the offsets stored for this consumer group.
      val orderInfoOffsets: Map[TopicPartition, Long] = OffsetManager.getOffset(groupId, orderInfoTopic)
      val orderDetailOffsets: Map[TopicPartition, Long] = OffsetManager.getOffset(groupId, orderDetailTopic)

      val orderInputDstream: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getKafkaStream(orderInfoTopic, ssc, orderInfoOffsets, groupId)
      val orderDetailInputDstream: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getKafkaStream(orderDetailTopic, ssc, orderDetailOffsets, groupId)

      // Record each batch's Kafka offset ranges.
      // NOTE(review): these ranges are never written back (e.g. through OffsetManager) anywhere in
      // this method, so the stored offsets presumably never advance and a restart replays old data;
      // the Redis de-dup below hides the duplicates. TODO: persist them after each batch's output.
      var orderInfoOffsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]
      val orderInputNDstream: DStream[ConsumerRecord[String, String]] = orderInputDstream.transform { rdd =>
        orderInfoOffsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd
      }
      var orderDetailOffsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]
      val orderDetailInputNDstream: DStream[ConsumerRecord[String, String]] = orderDetailInputDstream.transform { rdd =>
        orderDetailOffsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd
      }

      // Parse the JSON payloads into case-class streams.
      val orderInfoDstream: DStream[OrderInfo] = orderInputNDstream.map { record =>
        val jsonString: String = record.value()
        val orderInfo: OrderInfo = JSON.parseObject(jsonString, classOf[OrderInfo])
        orderInfo
      }

      val orderDetailDstream: DStream[OrderDetail] = orderDetailInputNDstream.map(record => JSON.parseObject(record.value, classOf[OrderDetail]))

      // 15s window sliding every 5s: each record takes part in three consecutive joins.
      val orderInfoWinDstream: DStream[OrderInfo] = orderInfoDstream.window(Seconds(15), Seconds(5))
      val orderDetailWinDstream: DStream[OrderDetail] = orderDetailDstream.window(Seconds(15), Seconds(5))
      orderInfoWinDstream.cache()
      orderDetailWinDstream.cache()
      // orderInfoWinDstream.print(1000)
      //  orderDetailWinDstream.print(1000)

      // Join orders with their details on order id.
      val orderInfoWithKeyDstream: DStream[(Long, OrderInfo)] = orderInfoWinDstream.map(orderInfo => (orderInfo.id, orderInfo))

      val orderDetailWithKeyDstream: DStream[(Long, OrderDetail)] = orderDetailWinDstream.map(orderDetail => (orderDetail.order_id, orderDetail))

      val orderJoinDstream: DStream[(Long, (OrderInfo, OrderDetail))] = orderInfoWithKeyDstream.join(orderDetailWithKeyDstream)

      val orderDetailWideDstream: DStream[OrderDetailWide] = orderJoinDstream.map { case (id, (orderInfo, orderDetail)) => new OrderDetailWide(orderInfo, orderDetail) }

      // De-duplicate: keep a row only the first time its order_detail_id is added to the Redis set
      // "order_detail_wide:<dt>" (SADD returns 1 only for a new member).
      // NOTE(review): these sets are never expired here, so they grow for as long as the job runs.
      val orderDetailWideFilteredDstream: DStream[OrderDetailWide] = orderDetailWideDstream.transform { rdd =>

        println("前:" + rdd.count())
        val logInfoRdd: RDD[OrderDetailWide] = rdd.mapPartitions { orderDetailWideItr =>
          val jedis: Jedis = RedisUtil.getJedisClient
          // try/finally so the Redis connection is released even when a record fails.
          try {
            val orderDetailWideFilteredList = new ListBuffer[OrderDetailWide]

            val orderDetailWideList: List[OrderDetailWide] = orderDetailWideItr.toList

            println(orderDetailWideList.map(orderDetailWide => orderDetailWide.order_id).mkString(","))
            for (orderDetailWide <- orderDetailWideList) {
              val orderDetailWideKey = "order_detail_wide:" + orderDetailWide.dt
              val ifFirst: lang.Long = jedis.sadd(orderDetailWideKey, orderDetailWide.order_detail_id.toString)
              if (ifFirst == 1L) {
                orderDetailWideFilteredList += orderDetailWide
              }
            }
            orderDetailWideFilteredList.toIterator
          } finally {
            jedis.close()
          }
        }
        // Cached so the count below and the downstream output share one evaluation: re-running the
        // SADD pass would find every id already present and drop all rows.
        logInfoRdd.cache()
        println("后:" + logInfoRdd.count())
        logInfoRdd
      }

      orderDetailWideFilteredDstream.map(orderwide => (orderwide.order_id, orderwide.final_total_amount, orderwide.original_total_amount, orderwide.sku_price, orderwide.sku_num, orderwide.final_detail_amount)).print(1000)
      ssc.start()
      ssc.awaitTermination()

    }

     

     

     

    注意点:join时尽量不要出现shuffle

    如何解决:

     在join前保证两条流的分区是一一对应的关系:Kafka发送时使用相同的分区键(即join键order_id),并让两个topic的分区数保持一致。

     

     

     

    第二章 订单明细实付金额分摊

    1 需求

    主订单的应付金额【original_total_amount】一般是由所有订单明细的商品单价*数量【sku_price*sku_num】汇总而成。

     

    但是由于优惠、运费等都是以订单为单位进行计算的,所以减掉优惠、加上运费会得到一个最终实付金额【final_total_amount】。

        

    但问题在于如果是以商品进行交易额分析,也要把优惠、运费的效果分摊到购买的每个商品中。

     

    2 如何分摊呢?

    一般按订单明细中每种商品的消费金额占订单原始总金额的比重进行分摊。比如总价1000元的订单,

    由600元和400元的A、B两种商品组成,经过打折和加运费后实际付款金额变为810元,那么A的分摊实付金额为486元(600/1000×810),B的分摊实付金额为324元(400/1000×810)。

     

     

    3 麻烦的情况:

     

    由于明细的分摊是由占比而得,那就会进行除法,除法就有可能出现除不尽的情况。

     

    比如:原价90元,三种商品每件30元,没有优惠但有10元运费,总实付金额为100元。按占比分摊,每件各占三分之一,即三个33.33元,加起来只有99.99元,就会出现差一分钱的情况。

    而我们要求所有订单明细的实付分摊加总必须和订单的总实付相等。

    所以我们要的是100=33.33+33.33+33.34

     

     

     

    4  解决思路:

    核心思路:就是需要用两种算法来计算金额

     

    1) 算法一:

    如果 计算时该明细不是最后一笔  

      使用乘除法公式:实付分摊金额 / 实付总金额 = (数量*单价) / 原始总金额

           移项可得:实付分摊金额 = (数量*单价) * 实付总金额 / 原始总金额

    2) 算法二: 

    如果  计算时该明细是最后一笔

           使用减法公式:

       实付分摊金额= 实付总金额 - (其他明细已经计算好的【实付分摊金额】的合计)

    3) 判断是否是最后一笔

     判断公式: 如果 该条明细 (数量*单价)== 原始总金额 -(其他明细 【数量*单价】的合计)

    4)  整个计算中需要的两个合计值:

    - 其他明细已经计算好的【实付分摊金额】的合计

    - 订单中已经计算完的明细的【数量*单价】的合计

    如何保存这两个合计?保存在redis中。

    Redis存储结构:

    - type:hash
    - key:order_split_amount:[order_id]
    - field:split_amount_sum(已计算明细的【实付分摊金额】合计)、origin_amount_sum(已计算明细的【数量*单价】合计)
    - value:对应的合计值

     

     

     

     

    5、实现代码

    // Apportion each order's paid total (final_total_amount) over its detail lines and store the
    // share in final_detail_amount, so that the shares of one order always add up to its paid total.
    // Per-order running sums live in the Redis hash "order_split_amount:<order_id>" with fields
    // split_amount_sum (shares assigned so far) and origin_amount_sum (qty * price seen so far).
    // NOTE(review): the Redis read-modify-write below is not atomic. It is only correct when all
    // details of one order are processed sequentially by a single task. Reprocessing a detail (for
    // example on replay) adds it to the sums twice. The hashes are also never expired. Confirm all three.
    val orderWideWithSplitDstream: DStream[OrderDetailWide] = orderDetailWideDStream.mapPartitions { orderWideItr =>
      val jedis: Jedis = RedisUtil.getJedisClient
      // try/finally so the Redis connection is released even when a record fails.
      try {
        val orderWideList: List[OrderDetailWide] = orderWideItr.toList

        for (orderWide <- orderWideList) {
          val key = "order_split_amount:" + orderWide.order_id

          // 1 Load the sums accumulated by this order's previously processed details (0 if none yet).
          val orderSumMap: util.Map[String, String] = jedis.hgetAll(key)
          var splitAmountSum = 0D
          var originAmountSum = 0D
          if (orderSumMap != null && orderSumMap.size() > 0) {
            splitAmountSum = orderSumMap.get("split_amount_sum").toDouble
            originAmountSum = orderSumMap.get("origin_amount_sum").toDouble
          }

          // 2 Is this the last detail? (qty * price) == original total - (qty * price of the others).
          // This must use original_total_amount, not final_total_amount. Whenever a discount or freight
          // makes the two differ, comparing against the paid total never detects the last detail, and
          // the rounded shares then miss the paid total by a cent. Compare in whole cents because the
          // sums are Doubles.
          val detailOriginAmount: Double = orderWide.sku_num * orderWide.sku_price
          val restOriginAmount: Double = orderWide.original_total_amount - originAmountSum
          val isLastDetail: Boolean = Math.round(detailOriginAmount * 100D) == Math.round(restOriginAmount * 100D)

          // 3 Last detail: paid total minus the shares already assigned (absorbs the rounding error).
          //   Otherwise: proportional share = (qty * price) * paid total / original total.
          //   A zero original total would divide by zero, so it also takes the subtraction path.
          val rawShare: Double =
            if (isLastDetail || orderWide.original_total_amount == 0D) orderWide.final_total_amount - splitAmountSum
            else detailOriginAmount * orderWide.final_total_amount / orderWide.original_total_amount
          // Round both paths to cents; Double subtraction can leave noise beyond two decimals.
          orderWide.final_detail_amount = Math.round(rawShare * 100D) / 100D

          // 4 Save the updated running sums.
          splitAmountSum += orderWide.final_detail_amount
          originAmountSum += detailOriginAmount
          val newSums = new util.HashMap[String, String]()
          newSums.put("split_amount_sum", splitAmountSum.toString)
          newSums.put("origin_amount_sum", originAmountSum.toString)
          jedis.hmset(key, newSums)
        }
        orderWideList.toIterator
      } finally {
        jedis.close()
      }
    }

     

     

     

     

     

     

    第三章 保存到clickhouse

    1 clickhouse 安装及入门,参见《尚硅谷clickhouse》课件

    2 在clickhouse中建立表

    -- Wide order-detail table appended to by the streaming job: one row per order detail.
    -- ReplacingMergeTree(create_time): rows with the same sorting key (order_detail_id) are collapsed
    -- during background merges, keeping the row with the greatest create_time. Until a merge runs,
    -- duplicates can still be visible to queries (per ClickHouse docs).
    create table  order_wide (
        order_detail_id UInt64,
        order_id  UInt64,
        order_status String,
        create_time DateTime,
        user_id UInt64,
        sku_id  UInt64,
        sku_price Decimal64(2),
        sku_num  UInt64,
        sku_name  String,
        -- order-level amounts, repeated on every detail row of the same order
        benefit_reduce_amount Decimal64(2),
        original_total_amount Decimal64(2),
        feight_fee Decimal64(2),
        final_total_amount  Decimal64(2),
        -- this detail's apportioned share of final_total_amount; sum this column for paid amounts
        final_detail_amount Decimal64(2),
        if_first_order String,
        province_name String,
        province_area_code String,
        user_age_group String,
        user_gender String,
        dt Date,
        spu_id  UInt64,
        tm_id  UInt64,
        category3_id  UInt64,
        spu_name  String,
        tm_name  String,
        category3_name  String
    )engine =ReplacingMergeTree(create_time)
     -- one partition per order date
     partition by dt
       primary key (order_detail_id)
       order by (order_detail_id )

     

    3  在sparkstreaming中增加写入clickhouse部分

    3.1  pom.xml

    添加

    <!-- ClickHouse JDBC driver (ru.yandex.clickhouse.ClickHouseDriver), used by the Spark JDBC writer -->
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.1.55</version>
    </dependency>

     

     

    3.2  sparkstreaming  写入clickhouse

    // Spark SQL session, used only to turn each micro-batch RDD into a DataFrame for the JDBC writer.
    val sparkSession = SparkSession.builder()
      .appName("order_detail_wide_spark_app")
      .getOrCreate()

    import sparkSession.implicits._

    // Write the apportioned stream (orderWideWithSplitDstream), not the raw joined stream: only the
    // apportioned rows have final_detail_amount filled in; in the raw stream that column is always 0.
    // NOTE(review): producing that stream updates Redis running sums. If a second output operation
    // consumes it, cache it first so those updates are not applied twice. Confirm.
    orderWideWithSplitDstream.foreachRDD { rdd =>

      val df: DataFrame = rdd.toDF()
      df.write.mode(SaveMode.Append)
        .option("batchsize", "100") // rows per JDBC batch insert
        .option("isolationLevel", "NONE") // do not wrap the write in a JDBC transaction
        .option("numPartitions", "4") // at most 4 concurrent JDBC connections for the write
        .option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
        .jdbc("jdbc:clickhouse://hdp1:8123/test1", "order_wide", new Properties())

    }

     

     

     

     

     

     

    第四章 发布数据接口

    1  代码清单

    控制层

    PublisherController

    实现接口的web发布

    服务层

    ClickhouseService

    数据业务查询interface

    ClickhouseServiceImpl

    业务查询的实现类

    数据层

    OrderMapper

    数据层查询的interface

    OrderMapper.xml

    数据层查询的实现配置

     

     

    2 接口  

    2.1 访问路径

    总数

    http://publisher:8070/realtime-total?date=2019-02-01

    分时统计

    http://publisher:8070/realtime-hour?id=order_amount&date=2019-02-01

     

    2.2 要求数据格式

    总数

    [{"id":"dau","name":"新增日活","value":1200},

    {"id":"new_mid","name":"新增设备","value":233 },

    {"id":"order_amount","name":"新增交易额","value":1000.2 }]

    分时统计

    {"yesterday":{"11":383,"12":123,"17":88,"19":200 },

    "today":{"12":38,"13":1233,"17":123,"19":688 }}

     

    3 代码开发

    3.1 pom.xml

    <!-- Spring JDBC support: DataSource built from the spring.datasource.* properties -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>


    <!-- MyBatis integration for Spring Boot: mapper interfaces plus XML mapper files -->
    <dependency>
        <groupId>org.mybatis.spring.boot</groupId>
        <artifactId>mybatis-spring-boot-starter</artifactId>
        <version>1.3.4</version>
    </dependency>


    <!-- ClickHouse JDBC driver referenced by spring.datasource.driver-class-name -->
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.1.55</version>
    </dependency>

     

     

     

    3.2  OrderMapper

    import java.math.BigDecimal;
    import java.util.List;
    import java.util.Map;

    /** MyBatis mapper over the ClickHouse order_wide table; its SQL is defined in OrderMapper.xml. */
    public interface OrderMapper {

        /** 1) Total order amount of the given day. */
        public BigDecimal selectOrderAmountTotal(String date);


        /** 2) Order amount per hour of the given day; each map carries the "hr" and "am" columns. */
        public List<Map> selectOrderAmountHourMap(String date);


    }

     

     

    3.3  OrderMapper.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <!DOCTYPE mapper SYSTEM "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
    <!--
      SQL for OrderMapper, executed against the ClickHouse order_wide table.
      order_wide holds one row per order DETAIL, and final_total_amount is the whole order's paid
      total copied onto each of its detail rows. Summing that column would count a multi-item order
      once per item, so both queries sum final_detail_amount, each detail's share of the paid total.
      NOTE(review): order_wide is a ReplacingMergeTree, which removes duplicate rows only when parts
      merge. Add FINAL after the table name if exact totals are needed before a merge; confirm the cost.
    -->
    <mapper namespace="com.atguigu.gmall0105.publisher.mapper.OrderMapper">

        <!-- Total paid amount for one day. -->
        <select id="selectOrderAmountTotal" resultType="java.math.BigDecimal">
             select sum(final_detail_amount) sum_amount from order_wide where dt=#{date}
        </select>

        <!-- Paid amount per hour of create_time for one day; each result row is a map {hr, am}. -->
        <select id="selectOrderAmountHourMap" resultMap="orderAmountHour" >
           select toHour(create_time) hr ,sum(final_detail_amount) am from order_wide where dt=#{date} group by toHour(create_time)

        </select>
        <resultMap id="orderAmountHour" type="java.util.Map" autoMapping="true">
        </resultMap>

    </mapper>

     

     

    3.4 application.properties

    添加:

    # ClickHouse JDBC driver and the database the publisher queries
    spring.datasource.driver-class-name=ru.yandex.clickhouse.ClickHouseDriver
    spring.datasource.url=jdbc:clickhouse://hdp1:8123/test1

    # Location of the MyBatis XML mapper files on the classpath
    mybatis.mapperLocations=classpath:mapper/*.xml
    # Map underscore_column names to camelCase properties
    mybatis.configuration.map-underscore-to-camel-case=true

     

     

    3.5 增加扫描包路径

    @SpringBootApplication
    @MapperScan(basePackages = "com.atguigu.gmallXXXXXXX.publisher.mapper")
    public class Gmall2019PublisherApplication{

      public static void main(String[] args) {
         SpringApplication.run(Gmall2019PublisherApplication.class, args);
      }

    }

     

    3.6  ClickHouseService

    /** Returns the total order amount of the given day (a yyyy-MM-dd string such as "2019-02-01"). */
    public BigDecimal getOrderAmount(String date);

    /** Returns the order amount of the given day, keyed by hour. */
    public Map getOrderAmountHour(String date);

     

     

    3.7  ClickHouseServiceImpl

    @Service
    public class ClickHouseServiceImpl implements ClickHouseService {

        @Autowired
        OrderMapper orderMapper;

        @Override
        public BigDecimal getOrderAmount(String date) {
            return orderMapper.selectOrderAmountTotal(date);
        }

        @Override
        public Map getOrderAmountHour(String date) {
            List<Map> mapList = orderMapper.selectOrderAmountHourMap(date);
            Map orderAmountHourMap=new HashMap();
            for (Map map : mapList) {
                orderAmountHourMap.put(map.get("hr"), map.get("am"));
            }
            return orderAmountHourMap;
        }

    }

     

     

     

    3.8 PublisherController

    @RestController
    public class PublisherController {

        @Autowired
        EsService esService;

        @Autowired
        ClickHouseService clickHouseService;

        @RequestMapping(value = "realtime-total",method = RequestMethod.GET)
        public String realtimeTotal(@RequestParam("date") String dt){
            List<Map<String,Object>>  rsList=new ArrayList<>();

            Map<String,Object> dauMap = new HashMap();
            dauMap.put("id","dau");
            dauMap.put("name","新增日活");
            Long dauTotal=0L;
            try {
                dauTotal = esService.getDauTotal(dt);
            }catch ( Exception e){
                e.printStackTrace();
            }
            if(dauTotal!=null){
                dauMap.put("value",dauTotal);
            }else {
                dauMap.put("value",0L);
            }

            rsList.add(dauMap);

            Map<String,Object> newMidMap = new HashMap();
            newMidMap.put("id","new_mid");
            newMidMap.put("name","新增设备");
            newMidMap.put("value",233);
            rsList.add(newMidMap);


            Map<String,Object> orderAmountMap = new HashMap();
            orderAmountMap.put("id","order_amount");
            orderAmountMap.put("name","新增交易额");
            BigDecimal orderAmount = clickHouseService.getOrderAmount(dt).setScale(2, RoundingMode.HALF_UP);
            orderAmountMap.put("value",orderAmount);

            rsList.add(orderAmountMap);

            return  JSON.toJSONString(rsList);
        }

        @GetMapping("realtime-hour")
        public String realtimeHour(@RequestParam("id") String id ,@RequestParam("date") String dt){
            if(id.equals("dau")){
                Map dauHourMapTD = esService.getDauHour(dt);
                String yd = getYd(dt);
                Map dauHourMapYD = esService.getDauHour(yd);

                Map<String,Map<String,Long>> rsMap=new HashMap<>();
                rsMap.put("yesterday",dauHourMapYD);
                rsMap.put("today",dauHourMapTD);
                return  JSON.toJSONString(rsMap);
            }else if(id.equals("order_amount")){
                Map orderAmountHourMapTD = clickHouseService.getOrderAmountHour(dt);
                String yd = getYd(dt);
                Map orderAmountHourMapYD = clickHouseService.getOrderAmountHour(yd);

                Map<String,Map<String,BigDecimal>> rsMap=new HashMap<>();
                rsMap.put("yesterday",orderAmountHourMapYD);
                rsMap.put("today",orderAmountHourMapTD);
                return  JSON.toJSONString(rsMap);
            }else{
                return  null;
            }

        }

        private  String getYd(String today){
            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
            try {
                Date todayDate = dateFormat.parse(today);
                Date ydDate = DateUtils.addDays(todayDate, -1);
                return   dateFormat.format(ydDate);

            } catch (ParseException e) {
                e.printStackTrace();
                throw  new RuntimeException("日期格式不正确");
            }

        }



    }

     

     

     

     

  • 相关阅读:
    【修正】gooogleman嵌入式联盟部分图标,并增加gooogleman名片(20101205)
    【程序基础】==和=号的区别
    gooogleman嵌入式开发板联盟准备配发嵌入式视频教程
    【终极版】gooogleman嵌入式开发板联盟图标发布以及使用说明
    关于wince驱动和应用学习的N个问题
    【原创】深刻体会wince 驱动中Sleep函数的作用
    网站策划的经典语录
    repeater中用单选按钮
    一个专业制作网站者的自白
    (原创)文件压缩代码
  • 原文地址:https://www.cnblogs.com/shan13936/p/13949159.html
Copyright © 2011-2022 走看看