zoukankan      html  css  js  c++  java
  • SparkSQL 疫情Demo练习

    在家闲着没事干, 写个简单的疫情数据处理Demo, 顺便回顾下SparkSQL。

    模拟数据(以下数据皆为虚构, 如有雷同不胜荣幸)

    • 市民信息(civic_info.csv)

      id_no,name,sex,age,province,city,district,residence,home_domicile,working_company
      310228198706300137,李言,男,33,湖北,武汉,江岸区,湖北省武汉市江岸区XXX小区NNN室,上海市松江区XXX小区MMM室,XXX有限公司
      310228198808241049,朱艳,女,32,湖北,武汉,江汉区,湖北省武汉市江汉区XXX小区NNN室,上海市嘉定区XXX小区MMM室,YYY有限公司
      310228198907141175,肖人风,男,31,湖北,武汉,汉阳区,湖北省武汉市汉阳区XXX小区NNN室,上海市浦东新区XXX小区MMM室,ZZZ有限公司
      310228199009212154,黄军,男,30,湖北,武汉,青山区,湖北省武汉市青山区XXX小区NNN室,上海市黄浦区XXX小区MMM室,TTT有限公司
      310228199101304567,周子明,男,29,湖北,武汉,洪山区,湖北省武汉市洪山区XXX小区NNN室,上海市闵行区XXX小区MMM室,FFF有限公司
      310228199204213278,张燕,女,28,湖北,武汉,江夏区,湖北省武汉市江夏区XXX小区NNN室,上海市静安区XXX小区MMM室,SSS有限公司
      310228199305213306,江大仁,男,27,湖北,武汉,蔡甸区,湖北省武汉市蔡甸区XXX小区NNN室,上海市长宁区XXX小区MMM室,UUU有限公司
      310228199411010721,袁天罡,男,26,湖北,武汉,黄陂区,湖北省武汉市黄陂区XXX小区NNN室,上海市虹口区XXX小区MMM室,III有限公司
      310228199503220823,马鹏,男,25,湖北,武汉,硚口区,湖北省武汉市硚口区XXX小区NNN室,上海市徐汇区XXX小区MMM室,PPP有限公司
      310228199608120317,聂平,男,24,湖北,黄冈,黄州区,湖北省黄冈市黄州区XXX小区NNN室,湖北省武汉市东西湖区XXX小区MMM室,WWW有限公司
      310228199609170831,胡冰,女,24,湖北,孝感,孝南区,湖北省孝感市孝南区XXX小区NNN室,湖北省武汉市江夏区XXX小区MMM室,QQQ有限公司
      
    • 票务信息(简化为仅高铁, ticket_info.csv)

      ticket_no,train_no,carriage_no,seat_no,passenger_name,passenger_id,departure,destination,departure_time,arrival_time
      HB9567,SH6634,B,11,李言,310228198706300137,武汉,上海,2020-02-09|19:30,2020-02-10|10:30
      HA6749,SH6634,C,23,朱艳,310228198808241049,武汉,上海,2020-02-09|19:30,2020-02-10|10:30
      HC7746,SH6634,D,14,肖人风,310228198907141175,武汉,上海,2020-02-09|19:30,2020-02-10|10:30
      HD8279,SH6634,A,22,黄军,310228199009212154,武汉,上海,2020-02-09|19:30,2020-02-10|10:30
      HM3324,SH6634,C,12,周子明,310228199101304567,武汉,上海,2020-02-09|19:30,2020-02-10|10:30
      HB4597,SH6634,D,23,张燕,310228199204213278,武汉,上海,2020-02-09|19:30,2020-02-10|10:30
      HA2163,SH6634,E,07,江大仁,310228199305213306,武汉,上海,2020-02-09|19:30,2020-02-10|10:30
      HC5632,SH6634,A,03,袁天罡,310228199411010721,武汉,上海,2020-02-09|19:30,2020-02-10|10:30
      HB3306,SH6634,B,09,马鹏,310228199503220823,武汉,上海,2020-02-09|19:30,2020-02-10|10:30
      HA1601,SH6634,C,11,梁冬,310228199307290931,重庆,上海,2020-02-09|15:30,2020-02-10|10:30
      HA2703,SH6634,D,15,赵珂,310228199106151321,四川,上海,2020-02-09|12:30,2020-02-10|10:30
      HC7734,SH6634,F,13,戴拿,310228199212012371,拉萨,上海,2020-02-09|06:30,2020-02-10|10:30
      

    需要导入的Maven坐标

    <!-- 版本封装在properties属性中以解耦,我用的2.4.4 -->
    
    <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.12</artifactId>
          <version>${sparkVersion}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.12</artifactId>
          <version>${sparkVersion}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.12</artifactId>
          <version>${sparkVersion}</version>
          <scope>provided</scope>
        </dependency>
    

    先测试一下读取csv文件

    package com.ronnie
    
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    object ReadCSVTest {
      def main(args: Array[String]): Unit = {
        val reader: SparkSession = SparkSession.builder()
          .appName("CSV Reader")
          .master("local")
          .getOrCreate()
    
        val civic_info: DataFrame = reader.read.format("csv")
          .option("delimiter",",") // 分隔符,看你具体是啥, 有的可能是|
          .option("header", "true") // 是否有头部,会自动帮你处理
          .option("nullValue", "\N") // 空值替换成什么
          .option("inferSchema","true") // 启用推断模式
          .load("src/main/resources/civic_info.csv") // 其实应该存到hdfs或S3上, 从hdfs或S3上拿会比较好
    
        civic_info.show()
        civic_info.printSchema()
    
    
        val ticket_info: DataFrame = reader.read.format("csv")
          .option("delimiter",",")
          .option("header", "true")
          .option("nullValue", "\N")
          .option("inferSchema","true")
          .load("src/main/resources/ticket_info.csv")
    
        ticket_info.show()
        civic_info.printSchema()
      }
    }
    
    

    然后直接干业务

    package com.ronnie
    
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    object ReadCSVAsSQLTest {
      def main(args: Array[String]): Unit = {
        val reader: SparkSession = SparkSession.builder()
          .appName("CSV Reader")
          .master("local")
          .getOrCreate()
    
        val civic_info: DataFrame = reader.read.format("csv")
          .option("delimiter",",")
          .option("header", "true")
          .option("nullValue", "\N")
          .option("inferSchema","true")
          .load("src/main/resources/civic_info.csv")
    
        civic_info.createTempView("civic")
    
        val ticket_info: DataFrame = reader.read.format("csv")
          .option("delimiter",",")
          .option("header", "true")
          .option("nullValue", "\N")
          .option("inferSchema","true")
          .load("src/main/resources/ticket_info.csv")
    
        ticket_info.createTempView("ticket")
    
        println("湖北籍人员信息如下: ")
        reader.sql("select id_no, name from civic where province = '湖北'").show()
    
        println("来自武汉疫区人员如下: ")
        reader.sql("select id_no, name from civic where city = '武汉'").show()
    
        println("需要对员工进行隔离观察14天的公司: ")
        reader.sql("select distinct working_company from civic where province = '湖北'").show()
    
        println("有感染风险的车厢为: ")
        reader.sql("select distinct carriage_no from ticket where departure = '武汉'").show()
    
        println("需要执行隔离的人员: ")
        reader.sql("select passenger_name, passenger_id from ticket where carriage_no in (select distinct carriage_no from ticket where departure = '武汉')").show()
    
        // ps: 真正操作大数据时不可能全打印出来, 可以count一下查看到的条数来做判断。
      }
    }
    
    
  • 相关阅读:
    BZOJ 3295 【CQOI2011】 动态逆序对
    POJ 3714 Raid
    树状数组区间修改加区间查询
    codevs 2606 约数和问题
    UOJ #150 【NOIP2015】 运输计划
    分享知识-快乐自己:IDEA 导入(web)项目并部署到 Tomcat
    分享知识-快乐自己:配置(各种)环境变量
    分享知识-快乐自己:什么是MVC
    分享知识-快乐自己:SpringBoot 使用注解API的方式定义启动端口号
    分享知识-快乐自己:Oracle中定义及使用同义词
  • 原文地址:https://www.cnblogs.com/ronnieyuan/p/12249118.html
Copyright © 2011-2022 走看看