zoukankan      html  css  js  c++  java
  • Spark:如何替换sc.parallelize(List(item1,item2)).collect().foreach(row=>{})为并行?

    代码场景:

    1)设定的几种数据场景,遍历所有场景:依次统计满足每种场景条件下的数据,并把统计结果存入hive;

    2)已有代码如下:

        case class IndoorOTTCalibrateBuildingVecotrLegend(oid: Int, minHeight: Int, maxHeight: Int, minGridIDCount: Int, maxGridIDCount: Int, heightType: Int) extends Serializable
    
        //  实例化建筑物区间段:按照栅格的个数(面积)、楼的高度(商场等场景)来划分场景
        val buildingHeightLegends = List(
          IndoorOTTCalibrateBuildingVecotrLegend(1, 1, 30, 1, 21, BuildingCalibrateHeightType.HeightType1.toString.toInt),
          IndoorOTTCalibrateBuildingVecotrLegend(2, 1, 30, 21, 45, BuildingCalibrateHeightType.HeightType2.toString.toInt),
          IndoorOTTCalibrateBuildingVecotrLegend(3, 1, 30, 45, 100, BuildingCalibrateHeightType.HeightType3.toString.toInt),
          IndoorOTTCalibrateBuildingVecotrLegend(4, 30, 50, 1, 21, BuildingCalibrateHeightType.HeightType4.toString.toInt),
          IndoorOTTCalibrateBuildingVecotrLegend(5, 30, 50, 21, 45, BuildingCalibrateHeightType.HeightType5.toString.toInt),
          IndoorOTTCalibrateBuildingVecotrLegend(6, 30, 50, 45, 100, BuildingCalibrateHeightType.HeightType6.toString.toInt),
          IndoorOTTCalibrateBuildingVecotrLegend(7, 50, 5000, 1, 100, BuildingCalibrateHeightType.HeightType7.toString.toInt)
        )
    
        spark.sparkContext.parallelize(buildingHeightLegends).collect().foreach(buildingHeightLegend => {
          generateSampleBySenceType(spark, p_city, p_hour_start, p_hour_end, p_fpb_day, p_day_sample, linkLossCalibrateParameter, buildingHeightLegend)
        })

    备注:

    在generateSampleBySenceType()函数内部包含有:

    spark.sql(s"""
    |xxx |where t10.heihgt>=${buildingHieghtLegend.MinHeight} and t10.height<${buildingHieghtLegend.MaxHeight} |and t10.gridcount<=${buildingHieghtLegend.MinGridIDCount} and t10.gridcount>${buildingHieghtLegend.MaxGridIDCount}
    |""".stripMargin)

    如果把代码修改:

        val buildingHeightLegends_df = spark.sqlContext.createDataFrame(buildingHeightLegends)
        buildingHeightLegends_df.createOrReplaceTempView("temp_buildingheightlegends")
        
        sql(s"""|select * from temp_buildingheightlegends""".stripMargin).repartition(buildingHeightLegends.length).foreachPartition(rows => {
          for (row <- rows) {
            val buildingHeightLegend = new IndoorOTTCalibrateBuildingVecotrLegend(
              row.getAs[Int]("oid"),
              row.getAs[Int]("minheight"),
              row.getAs[Int]("maxheight"),
              row.getAs[Int]("mingrididcount"),
              row.getAs[Int]("maxgrididcount"),
              row.getAs[Int]("heighttype"))
            generateSampleBySenceType(spark, p_city, p_hour_start, p_hour_end, p_fpb_day, p_day_sample, linkLossCalibrateParameter, buildingHeightLegend)
          }
        })

    则会提示:generateSampleBySenceType()内部sql代码位置抛出SparkSession为NULL的异常。

    修改方案:

    把buildingHeightLegends注册为临时表temp_buildingHeightLegends,去掉外层的foreach,之后在generateSampleBySenceType()内部把temp_buildingHeightLegends与其他结果集合进行cross join:

    测试代码如下:

    -- 场景表
    CREATE TABLE [dbo].[test_senceitems](
        [sencetype] [int] NULL,
        [minheight] [int] NULL,
        [maxheight] [int] NULL,
        [mingridcount] [int] NULL,
        [maxgridcount] [int] NULL
    )
    INSERT [dbo].[test_senceitems] ([sencetype], [minheight], [maxheight], [mingridcount], [maxgridcount]) VALUES (1, 1, 30, 1, 21)
    INSERT [dbo].[test_senceitems] ([sencetype], [minheight], [maxheight], [mingridcount], [maxgridcount]) VALUES (2, 1, 30, 21, 45)
    INSERT [dbo].[test_senceitems] ([sencetype], [minheight], [maxheight], [mingridcount], [maxgridcount]) VALUES (3, 1, 30, 45, 100)
    INSERT [dbo].[test_senceitems] ([sencetype], [minheight], [maxheight], [mingridcount], [maxgridcount]) VALUES (4, 30, 50, 1, 21)
    INSERT [dbo].[test_senceitems] ([sencetype], [minheight], [maxheight], [mingridcount], [maxgridcount]) VALUES (5, 30, 50, 21, 45)
    INSERT [dbo].[test_senceitems] ([sencetype], [minheight], [maxheight], [mingridcount], [maxgridcount]) VALUES (6, 30, 50, 45, 100)
    INSERT [dbo].[test_senceitems] ([sencetype], [minheight], [maxheight], [mingridcount], [maxgridcount]) VALUES (7, 50, 5000, 1, 100)
    
    -- 业务过滤统计表
    CREATE TABLE [dbo].[test_grid](
        [gridid] [nvarchar](50) NULL,
        [height] [int] NULL,
        [gridcount] [int] NULL
    ) 
    
    INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g1', 8, 23)
    INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g2', 3, 87)
    INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g3', 4, 34)
    INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g4', 30, 54)
    INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g5', 32, 32)
    INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g6', 32, 20)
    INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g7', 120, 34)
    INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g8', 89, 54)
    INSERT [dbo].[test_grid] ([gridid], [height], [gridcount]) VALUES (N'g9', 9, 16)

    替换generateSampleBySenceType()内部sql(s"""|""".stripMargin)代码类似如下:

    select t10.*,t11.* 
    from test_grid t10 
    cross join test_senceitems t11
    where t10.height>=t11.minheight and t10.height<t11.maxheight
    and t10.gridcount>=t11.mingridcount and t10.gridcount<t11.maxgridcount

  • 相关阅读:
    指针的引用
    引用的基础知识
    const的基础用法
    解决'fopen':this function or variable may be unsafe先关问题的方法
    C++类中不写成员函数易犯错模型
    PCB接地设计宝典:ADI资深专家总结的良好接地指导原则
    电路板级的电磁兼容性设计
    自然对流仿真设置
    Pspice 原理图以及仿真图 输出到 word
    信号完整性常见问题
  • 原文地址:https://www.cnblogs.com/yy3b2007com/p/8505152.html
Copyright © 2011-2022 走看看