zoukankan      html  css  js  c++  java
  • spark基础(1)

    将相同国家进行分组,然后将count相加sum(count), 对sum(count)进行排序,输出top5

        val path="/Volumes/Data/BigData_code/data/flight-data/csv/2015-summary.csv"
        val data = spark.read.option("inferSchema", "true").option("header", "true").csv(path)
        //查询前5个count max 的国家
        data.groupBy("DEST_COUNTRY_NAME").sum("count")
          .withColumnRenamed("sum(count)", "destination_total")
          .sort(desc("destination_total"))
          .limit(5).show()
    

    代码的执行如图:

    查看用户在一天内进行采集所用费用最多的日期:
    下面是表格的格式:

        //添加一个列用于统计总费用,并查看用户话费最多的是哪个日期
        val selectData = staticData.selectExpr("CustomerId", "(UnitPrice * Quantity) as total_cost", "InvoiceDate")
        selectData.show()
        //进行分组:分组的标准:客户的ID 和 购买的时间(一天内为相同标准), 并对分组的内容进行统计
        val groupData = selectData.groupBy(
          col("CustomerId"), window(col("InvoiceDate"), "1 day")
        ).sum("total_cost")
        groupData.show(5)
    

    window函数:https://blog.csdn.net/weixin_38653290/article/details/83962789

    使用流处理实现相同功能

        //进行分组:分组的标准:客户的ID 和 购买的时间(一天内为相同标准), 并对分组的内容进行统计
        val streamData = spark.readStream.schema(staticSchema) //设置分区
          .option("maxFilesPerTrigger", 1) //设置一次读入的文件个数
          .format("csv")
          .option("header", "true")
          .load(path)
        //执行相同的逻辑操作
        val streamGroupData = streamData.selectExpr(
          "CustomerId", "(UnitPrice * Quantity) as total_cost", "InvoiceDate"
        ).groupBy(
          $"CustomerId", window($"InvoiceDate", "1 day")
        ).sum("total_cost")
    

    注意由于流处理和静态处理不一样,所以无法使用静态处理中的动作操作。流处理是将流处理的结果放入内存的一个表中。每一次处理完,不断的更新这个表即可

        //将结果存入内存中
        streamGroupData.writeStream.format("memory")    //表示存入内存中
          .queryName("streamGroupData")     //表示存入内存的表的名字
          .outputMode("complete")     //complete表示表中所有记录
          .start()
    

    然后查询

        //对流处理后的结果进行查询
        spark.sql(
          """
            |select *
            |from streamGroupData
            |order by 'sum(total_cost)' desc
            |""".stripMargin).show(5)
    
  • 相关阅读:
    MySQL 中 truncate 和 delete 的坑
    理解 Linux 中的 inodes
    Linux 下date命令的常见用法
    Linux 下du命令详解
    nc
    启动Tomcat时报错:错误: 代理抛出异常错误: java.rmi.server.ExportException: Port already in use: 1101; nested exception is: java.net.BindException: Address already in use: JVM_Bind
    ssm调用webservice接口并进行文件传输
    mybatis 中 执行INSERT操作后获取自增主键
    mysql时间差8小时
    生成验证码工具类
  • 原文地址:https://www.cnblogs.com/ALINGMAOMAO/p/14432200.html
Copyright © 2011-2022 走看看