zoukankan      html  css  js  c++  java
  • spark案例实战

    1.数据导入hive表

    导入opt目录下用户收视信息表.csv到hive的UserViewInfo表

    建表sql命令 **注意如果是分号隔开,不能直接输入,要用转义字符** ``` create table UserViewInfo( TVID bigint, count_date string, TVNum int, TVName string, TVBeginTime double, TVEndTime double) row format delimited fields terminated by ','; load data local inpath '/opt/TVProject/UserViewInfo.csv' overwrite into table UserViewInfo; ``` ## 导入opt目录下用户信息表.csv到hive的UserInfo表 **注意:日期类型数据导入有问题,所以日期这里都处理为string,后续用代码分离** ``` create table UserInfo( UserID bigint, TVBrand string, UserStatus string, ChangeTime bigint, PreMoney int, Package string, XiaoShouPin string, ZiFei string, BeginTime string, EffetiveTime string, InvalidTime string, TVID bigint) row format delimited fields terminated by ','; load data local inpath '/opt/TVProject/UserInfo.csv' overwrite into table UserInfo; ```

    2.统计UserViewInfo表中观看时间超过半个小时的用户

    创建工程,新建Object类

    scala代码,本程序学习如何自定义udf函数建新列
    package spark
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.SparkSession
    object UserViewInfo {
      //机顶盒设备号,统计日期,频道号,频道名,收看开始时间,收看结束时间
      //10001,20170831,1,中央1台-高清,42978.85,42978.85139
      //case class UserView{TVID:String;CountDate:Long;TVNum:Int;TVName:String;
       // TVBeginTime:Double;TVEndTime:Double}
      def main(args: Array[String]): Unit = {
       val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
       val df_ViewData=spark.read.table("tv.userviewinfo")
         def getTime(beginTimeStr:String,endTimeStr:String):Int={
          val bstr:Array[String]=beginTimeStr.split(":")
           val estr:Array[String]=endTimeStr.split(":")
           val time:Int=estr(0).toInt*60+estr(1).toInt-(bstr(0).toInt*60+bstr(1).toInt)
           return time
        }
        val getLabel_udf=udf((x:String,y:String)=>getTime(x,y))
        val result=df_ViewData.withColumn("WatchTime",getLabel_udf(col("TVBeginTime"),col("TVEndTime")))
          .filter("WatchTime>30")
        result.show(10)
        println(result.count())
        result.write.mode("overwrite").saveAsTable("tv.test1")
      }
    }
    

    调试程序碰到问题

    • 1.spark-submit一直出错,经过很久检查,发现是hive没有启动。(spark启动后一定要jps检查是否有RUNJAR进程)。同时java环境代码务必加入val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    • 2.虚拟机内存优秀,如果在xshell客户端开了spark-shell,相当于启动一个spark应用进程,一定要在master:8080页面kill掉,否则一直提示任务分配失败
    • 3.集群提交命令spark-submit --master spark://master:7077 --class spark.UserViewInfo /opt/word.jar
    • 4.csv文件导入hive表乱码问题。csv文件不是utf-8编码,可以用记事本打开,另存为时候格式改utf-8,文件再提交opt目录下,重新建hive表,load data数据,则中文显示正常。
  • 相关阅读:
    JSON入门之二:org.json的基本用法
    Eclipse下设置tomcat,修改Java代码不必重启tomcat
    maven仓库快速镜像
    ORA-06413连接未打开的错误的原因和解决方法
    .NET WIN7 64位 连接Oracle数据库
    5种方法解除开机密码
    精美的贴子挂件
    同学,同事,KTV聚会的小游戏
    QQ分组图案
    WIN7 XP 已达到计算机的连接数最大值,无法再同此远程计算机连接
  • 原文地址:https://www.cnblogs.com/linli069/p/13750071.html
Copyright © 2011-2022 走看看