zoukankan      html  css  js  c++  java
  • spark案例实战

    1.数据导入hive表

    导入opt目录下用户收视信息表.csv到hive的UserViewInfo表

    建表sql命令 **注意如果是分号隔开,不能直接输入,要用转义字符** ``` create table UserViewInfo( TVID bigint, count_date string, TVNum int, TVName string, TVBeginTime double, TVEndTime double) row format delimited fields terminated by ','; load data local inpath '/opt/TVProject/UserViewInfo.csv' overwrite into table UserViewInfo; ``` ## 导入opt目录下用户信息表.csv到hive的UserInfo表 **注意:日期类型数据导入有问题,所以日期这里都处理为string,后续用代码分离** ``` create table UserInfo( UserID bigint, TVBrand string, UserStatus string, ChangeTime bigint, PreMoney int, Package string, XiaoShouPin string, ZiFei string, BeginTime string, EffetiveTime string, InvalidTime string, TVID bigint) row format delimited fields terminated by ','; load data local inpath '/opt/TVProject/UserInfo.csv' overwrite into table UserInfo; ```

    2.统计UserViewInfo表中观看时间超过半个小时的用户

    创建工程,新建Object类

    scala代码,本程序学习如何自定义udf函数建新列
    package spark
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.SparkSession
    object UserViewInfo {
      //机顶盒设备号,统计日期,频道号,频道名,收看开始时间,收看结束时间
      //10001,20170831,1,中央1台-高清,42978.85,42978.85139
      //case class UserView{TVID:String;CountDate:Long;TVNum:Int;TVName:String;
       // TVBeginTime:Double;TVEndTime:Double}
      def main(args: Array[String]): Unit = {
       val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
       val df_ViewData=spark.read.table("tv.userviewinfo")
         def getTime(beginTimeStr:String,endTimeStr:String):Int={
          val bstr:Array[String]=beginTimeStr.split(":")
           val estr:Array[String]=endTimeStr.split(":")
           val time:Int=estr(0).toInt*60+estr(1).toInt-(bstr(0).toInt*60+bstr(1).toInt)
           return time
        }
        val getLabel_udf=udf((x:String,y:String)=>getTime(x,y))
        val result=df_ViewData.withColumn("WatchTime",getLabel_udf(col("TVBeginTime"),col("TVEndTime")))
          .filter("WatchTime>30")
        result.show(10)
        println(result.count())
        result.write.mode("overwrite").saveAsTable("tv.test1")
      }
    }
    

    调试程序碰到问题

    • 1.spark-submit一直出错,经过很久检查,发现是hive没有启动。(spark启动后一定要jps检查是否有RUNJAR进程)。同时java环境代码务必加入val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    • 2.虚拟机内存优秀,如果在xshell客户端开了spark-shell,相当于启动一个spark应用进程,一定要在master:8080页面kill掉,否则一直提示任务分配失败
    • 3.集群提交命令spark-submit --master spark://master:7077 --class spark.UserViewInfo /opt/word.jar
    • 4.csv文件导入hive表乱码问题。csv文件不是utf-8编码,可以用记事本打开,另存为时候格式改utf-8,文件再提交opt目录下,重新建hive表,load data数据,则中文显示正常。
  • 相关阅读:
    vim 末行模式简单练习
    末行模式
    vim 简单用法
    sed用法
    在原有的基础之上,启用NAT模型
    启用隔离模型
    一个前端的自我修养
    如何提升我的HTML&CSS技术,编写有结构的代码
    MVC缓存
    MVC分页
  • 原文地址:https://www.cnblogs.com/linli069/p/13750071.html
Copyright © 2011-2022 走看看