zoukankan      html  css  js  c++  java
  • Spark 双流join代码示例

    基本思想

      与flink流的join原理不同的是,Spark双流join是对俩个流做满外连接 ,因为网络延迟等关系,不能保证每个窗口中的数据key都能匹配上,这样势必会出现三种情况:(some,some),(None,some),(Some,None),根据这三种情况,下面做一下详细解析:

    (some,some)—— 1号流和2号流中key能正常进行逻辑运算,但是考虑到2号流后续可能会有剩下的数据到来,所以需要将1号流中的key保存到redis,以等待接下来的数据

    (None,Some)—— 找不到1号流中对应key的数据,需要去redis中查找1号流的缓存,如果找不到,则缓存起来,等待1号流

     (Some,None)—— 找不到2号流中的数据,需要将key保存到redis,以等待接下来的数据,并且去reids中找2号流的缓存,如果有,则join,然后删除2号流的缓存

    代码示例

    def fullJoin(orderInfoStream: DStream[OrderInfo], orderDetailStream: DStream[OrderDetail]) = {
            val orderIdAndOrderInfo: DStream[(String, OrderInfo)] =
                orderInfoStream.map(info => (info.id, info))
            val orderIdAndOrderDetail: DStream[(String, OrderDetail)] =
                orderDetailStream.map(info => (info.order_id, info))
            
            orderIdAndOrderInfo
                .fullOuterJoin(orderIdAndOrderDetail)
                .mapPartitions((it: Iterator[(String, (Option[OrderInfo], Option[OrderDetail]))]) => {
                    // 获取redis客户端
                    val client: Jedis = RedisUtil.getClient
                    // 读写操作
                    val result: Iterator[SaleDetail] = it.flatMap {
                        // order_info有数据, order_detail有数据
                        case (orderId, (Some(orderInfo), Some(orderDetail))) =>
                            println("Some(orderInfo)   Some(orderDetail)")
                            // 1. 把order_info信息写入到缓存(因为order_detail信息有部分信息可能迟到)
                            cacheOrderInfo(orderInfo, client)
                            // 2. 把信息join到一起(其实就是放入一个样例类中)  (缺少用户信息, 后面再专门补充)
                            val saleDetail = SaleDetail().mergeOrderInfo(orderInfo).mergeOrderDetail(orderDetail)
                            // 3. 去order_detail的缓存找数据, 进行join
                            // 3.1 先获取这个order_id对应的所有的order_detail的key
                            import scala.collection.JavaConversions._
                            val keys: List[String] = client.keys("order_detail:" + orderInfo.id + ":*").toList // 转成scala集合
                            val saleDetails: List[SaleDetail] = keys.map(key => {
                                val orderDetail: OrderDetail = JSON.parseObject(client.get(key), classOf[OrderDetail])
                                // 删除对应的key, 如果不删, 有可能造成数据重复
                                client.del(key)
                                SaleDetail().mergeOrderInfo(orderInfo).mergeOrderDetail(orderDetail)
                            })
                            saleDetail :: saleDetails
                        case (orderId, (Some(orderInfo), None)) =>
                            println("Some(orderInfo), None")
                            // 1. 把order_info信息写入到缓存(因为order_detail信息有部分信息可能迟到)
                            cacheOrderInfo(orderInfo, client)
                            // 3. 去order_detail的缓存找数据, 进行join
                            // 3.1 先获取这个order_id对应的所有的order_detail的key
                            import scala.collection.JavaConversions._
                            val keys: List[String] = client.keys("order_detail:" + orderInfo.id + ":*").toList // 转成scala集合
                            val saleDetails: List[SaleDetail] = keys.map(key => {
                                val orderDetail: OrderDetail = JSON.parseObject(client.get(key), classOf[OrderDetail])
                                // 删除对应的key, 如果不删, 有可能造成数据重复
                                client.del(key)
                                SaleDetail().mergeOrderInfo(orderInfo).mergeOrderDetail(orderDetail)
                            })
                            saleDetails
                        case (orderId, (None, Some(orderDetail))) =>
                            println("None, Some(orderDetail)")
                            // 1. 去order_info的缓存中查找
                            val orderInfoJson = client.get("order_info:" + orderDetail.order_id)
                            if (orderInfoJson == null) {
                                // 3. 如果不存在, 则order_detail缓存
                                cacheOrderDetail(orderDetail, client)
                                Nil
                            } else {
                                // 2. 如果存在, 则join
                                val orderInfo = JSON.parseObject(orderInfoJson, classOf[OrderInfo])
                                SaleDetail().mergeOrderInfo(orderInfo).mergeOrderDetail(orderDetail) :: Nil
                            }
                    }
                    
                    // 关闭redis客户端
                    client.close()
                    
                    result
                })
            
        }
  • 相关阅读:
    .net操作oracle,一定要用管理员身份运行 visual studio 啊,切记切记,免得报奇怪的错误。
    基于 bootstrap 的 vue 分页组件
    前端UI框架《Angulr》入门
    EF 中 Code First 的数据迁移以及创建视图
    Oracle自动备份.bat 最新更新(支持Win10了)
    总结一下Android中主题(Theme)的正确玩法
    并不优雅
    思考:有三扇门,其中一扇门里有奖品,三选一,你选择其中一扇门之后,主持人先不揭晓答案,而是从另外两扇门中排除掉一个没有奖品的门,现在主持人问你,要不要换个门,请问你换还是不换?
    TensorFlow开发者证书 中文手册
    在C#下使用TensorFlow.NET训练自己的数据集
  • 原文地址:https://www.cnblogs.com/yangxusun9/p/13137592.html
Copyright © 2011-2022 走看看