  • Spark-Based Movie Recommendation System (Recommendation System ~ 2)

    Part 4 - Recommendation System - Data ETL

    • This module cleans the data and loads the cleaned data into Hive tables.

    Prerequisites:

    Spark + Hive

    vim $SPARK_HOME/conf/hive-site.xml
    <?xml version="1.0"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <configuration>
      <property>
        <name>hive.metastore.uris</name>
        <value>thrift://hadoop001:9083</value>
      </property>
    </configuration>
    
    • Start the Hive metastore server

    [root@hadoop001 conf]# nohup hive --service metastore &

    [root@hadoop001 conf]# netstat -tanp | grep 9083
    tcp 0 0 0.0.0.0:9083 0.0.0.0:* LISTEN 24787/java
    [root@hadoop001 conf]#
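
    The netstat check above can also be done from code, for example before a job starts. The following is a minimal sketch, not part of the original project: the object name PortCheck and the helper isPortOpen are hypothetical, and a successful TCP connection only shows that something is listening on the port, not that the metastore is healthy.

```scala
import java.net.{InetSocketAddress, Socket}
import scala.util.Try

object PortCheck {
  /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
  def isPortOpen(host: String, port: Int, timeoutMs: Int = 2000): Boolean =
    Try {
      val socket = new Socket()
      // Always close the socket, whether or not the connection succeeds
      try socket.connect(new InetSocketAddress(host, port), timeoutMs)
      finally socket.close()
    }.isSuccess

  def main(args: Array[String]): Unit = {
    // hadoop001:9083 is the metastore address configured in hive-site.xml
    val up = isPortOpen("hadoop001", 9083)
    println(s"metastore reachable: $up")
  }
}
```

    A short timeout keeps the check from hanging when the host is unreachable.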

    Test:
    [root@hadoop001 ~]# spark-shell --master local[2]

    scala> spark.sql("select * from liuge_db.dept").show;
    +------+-------+-----+                                                          
    |deptno|  dname|  loc|
    +------+-------+-----+
    |     1|  caiwu| 3lou|
    |     2|  renli| 4lou|
    |     3|  kaifa| 5lou|
    |     4|qiantai| 1lou|
    |     5|lingdao|4 lou|
    +------+-------+-----+
    

    ==> Spark SQL must be able to access the Hive metadata before we continue.

    However, we are running in standalone mode, so the Master and Worker have to be started:
    [root@hadoop001 sbin]# pwd
    /root/app/spark-2.4.3-bin-2.6.0-cdh5.7.0/sbin
    [root@hadoop001 sbin]# ./start-all.sh

    [root@hadoop001 sbin]# jps
    26023 Master
    26445 Worker

    Common Spark ports

    Port    Setting                   Purpose
    8080    spark.master.ui.port      Master WebUI
    8081    spark.worker.ui.port      Worker WebUI
    18080   spark.history.ui.port     History server WebUI
    7077    SPARK_MASTER_PORT         Master port
    6066    spark.master.rest.port    Master REST port
    4040    spark.ui.port             Driver WebUI
    

    At this point, open http://hadoop001:8080/ to see the Master WebUI.


    Start coding the project

    The project is built with IDEA + Scala + Maven.

    Step 1: After creating a new Scala project, adapt the pom using the following as a reference

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.csylh</groupId>
      <artifactId>movie-recommend</artifactId>
      <version>1.0</version>
      <inceptionYear>2008</inceptionYear>
      <properties>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.4.3</spark.version>
      </properties>
    
      <repositories>
        <repository>
          <id>scala-tools.org</id>
          <name>Scala-Tools Maven2 Repository</name>
          <url>http://scala-tools.org/repo-releases</url>
        </repository>
      </repositories>
    
      <dependencies>
    
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>2.6.0</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-hive_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-mllib_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>1.1.1</version>
        </dependency>
        <!--// 0.10.2.1-->
    
        <dependency>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-java</artifactId>
          <version>5.1.39</version>
        </dependency>
    
        <dependency>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
          <version>1.2.17</version>
        </dependency>
    
      </dependencies>
    
      <build>
        <!--<sourceDirectory>src/main/scala</sourceDirectory>-->
        <!--<testSourceDirectory>src/test/scala</testSourceDirectory>-->
        <plugins>
          <plugin>
            <!-- see http://davidb.github.com/scala-maven-plugin -->
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.1.3</version>
            <executions>
              <execution>
                <goals>
                  <goal>compile</goal>
                  <goal>testCompile</goal>
                </goals>
                <configuration>
                  <args>
                    <arg>-dependencyfile</arg>
                    <arg>${project.build.directory}/.scala_dependencies</arg>
                  </args>
                </configuration>
              </execution>
            </executions>
          </plugin>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.13</version>
            <configuration>
              <useFile>false</useFile>
              <disableXmlReport>true</disableXmlReport>
              <!-- If you have classpath issue like NoDefClassError,... -->
              <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
              <includes>
                <include>**/*Test.*</include>
                <include>**/*Suite.*</include>
              </includes>
            </configuration>
          </plugin>
        </plugins>
      </build>
    </project>
    
    

    Step 2: Create com.csylh.recommend.dataclearer.SourceDataETLApp

    package com.csylh.recommend.dataclearer

    import com.csylh.recommend.entity.{Links, Movies, Ratings, Tags}
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    /**
      * Description:
      *    Files under file:///root/data/ml/ml-latest on hadoop001
      *    ====>  Spark SQL ETL
      *    ===>  load data into the Hive data warehouse
      *
      * @Author: 留歌36
      * @Date: 2019-07-12 13:48
      */
    object SourceDataETLApp{
      def main(args: Array[String]): Unit = {
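        // Program against SparkSession
        val spark = SparkSession.builder()
    //          .master("local[2]")
          .enableHiveSupport() // Enable access to Hive; hive-site.xml and related files must be placed in Spark's conf directory
          .getOrCreate()

        val sc = spark.sparkContext

        // The number of RDD partitions should generally be an integer multiple of the CPU cores allocated to the application; with 4 cores and 8 GB, 8 is fine
        // Question 1: why an integer multiple of the number of CPU cores?
        // Question 2: data skew. Tasks that receive the large partitions take much longer, so consider whether skew can occur when preprocessing the data
        val minPartitions = 8

        // In production, be sure to set spark.sql.shuffle.partitions (default 200), i.e. the number of shuffle partitions
        val shuffleMinPartitions = "8"
        spark.sqlContext.setConf("spark.sql.shuffle.partitions",shuffleMinPartitions)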
        /**
          * 1
          */
        import spark.implicits._
        val links = sc.textFile("file:///root/data/ml/ml-latest/links.txt",minPartitions) // DRIVER
          .filter(!_.endsWith(",")) // EXECUTOR
          .map(_.split(",")) // EXECUTOR
          .map(x => Links(x(0).trim.toInt, x(1).trim.toInt, x(2).trim.toInt)) // EXECUTOR
          .toDF()
        println("===============links===================: " + links.count())
        links.show()

        // Write the data to HDFS
        links.write.mode(SaveMode.Overwrite).parquet("/tmp/links")

        // Load the data from HDFS into the Hive data warehouse
        spark.sql("drop table if exists links")
        spark.sql("create table if not exists links(movieId int,imdbId int,tmdbId int) stored as parquet")
        spark.sql("load data inpath '/tmp/links' overwrite into table links")
    
        /**
          * 2
          */
        val movies = sc.textFile("file:///root/data/ml/ml-latest/movies.txt",minPartitions)
          .filter(!_.endsWith(","))
          .map(_.split(","))
          .map(x => Movies(x(0).trim.toInt, x(1).trim, x(2).trim))
          .toDF()
        println("===============movies===================: " + movies.count())
        movies.show()

        // Write the data to HDFS
        movies.write.mode(SaveMode.Overwrite).parquet("/tmp/movies")

        // Load the data from HDFS into the Hive data warehouse
        spark.sql("drop table if exists movies")
        spark.sql("create table if not exists movies(movieId int,title String,genres String) stored as parquet")
        spark.sql("load data inpath '/tmp/movies' overwrite into table movies")
    
        /**
          * 3
          */
        val ratings = sc.textFile("file:///root/data/ml/ml-latest/ratings.txt",minPartitions)
          .filter(!_.endsWith(","))
          .map(_.split(","))
          .map(x => Ratings(x(0).trim.toInt, x(1).trim.toInt, x(2).trim.toDouble, x(3).trim.toInt))
          .toDF()
        println("===============ratings===================: " + ratings.count())

        ratings.show()

        // Write the data to HDFS
        ratings.write.mode(SaveMode.Overwrite).parquet("/tmp/ratings")

        // Load the data from HDFS into the Hive data warehouse
        spark.sql("drop table if exists ratings")
        spark.sql("create table if not exists ratings(userId int,movieId int,rating Double,timestamp int) stored as parquet")
        spark.sql("load data inpath '/tmp/ratings' overwrite into table ratings")
    
        /**
          * 4
          */
        val tags = sc.textFile("file:///root/data/ml/ml-latest/tags.txt",minPartitions)
          .filter(!_.endsWith(","))
          .map(x => rebuild(x))  // Note how this pitfall is handled: tags may themselves contain commas
          .map(_.split(","))
          .map(x => Tags(x(0).trim.toInt, x(1).trim.toInt, x(2).trim, x(3).trim.toInt))
          .toDF()

        tags.show()

        // Write the data to HDFS
        tags.write.mode(SaveMode.Overwrite).parquet("/tmp/tags")

        // Load the data from HDFS into the Hive data warehouse
        spark.sql("drop table if exists tags")
        spark.sql("create table if not exists tags(userId int,movieId int,tag String,timestamp int) stored as parquet")
        spark.sql("load data inpath '/tmp/tags' overwrite into table tags")
      }
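      /**
        * Handles lines that do not conform to the expected format:
        * commas and double quotes inside the tag field are removed,
        * so that the line splits into exactly four fields.
        * @param input raw line from tags.txt
        * @return line of the form userId,movieId,tag,timestamp
        */
      private def rebuild(input:String): String ={
        val a = input.split(",")

        val head = a.take(2).mkString(",")
        val tail = a.takeRight(1).mkString
        val tag = a.drop(2).dropRight(1).mkString.replaceAll("\"","")
        val output = head + "," + tag + "," + tail
        output
      }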
    }
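
    To see what rebuild does to a problematic line, the logic can be run on its own without Spark. This is a small sketch: the object name RebuildDemo and the sample line are made up for illustration, and the double quote is written as the escaped literal "\"".

```scala
object RebuildDemo {
  // Same steps as rebuild in SourceDataETLApp, made public for experimentation
  def rebuild(input: String): String = {
    val a = input.split(",")
    val head = a.take(2).mkString(",")   // userId,movieId
    val tail = a.takeRight(1).mkString   // timestamp
    // Middle pieces are joined without commas, then double quotes are dropped
    val tag = a.drop(2).dropRight(1).mkString.replaceAll("\"", "")
    head + "," + tag + "," + tail
  }

  def main(args: Array[String]): Unit = {
    // Made-up sample line whose tag contains a comma
    val raw = "15,4973,\"good, clean fun\",1527274092"
    println(raw.split(",").length)          // 5 pieces before the fix
    println(rebuild(raw))                   // 15,4973,good clean fun,1527274092
    println(rebuild(raw).split(",").length) // 4 pieces after the fix
  }
}
```

    Note that the comma inside the tag is removed rather than preserved, which is the trade-off this approach makes to keep the later split(",") simple.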
    

    Next come the case classes referenced by the main class above (one file per class); you can think of them as Java entity classes

    package com.csylh.recommend.entity
    
    /**
      * Description: data schema
      *
      * @Author: 留歌36
      * @Date: 2019-07-12 13:46
      */
    case class Links(movieId:Int,imdbId:Int,tmdbId:Int)
    
    package com.csylh.recommend.entity
    
    /**
      * Description: TODO
      *
      * @Author: 留歌36
      * @Date: 2019-07-12 14:09
      */
    case class Movies(movieId:Int,title:String,genres:String)
    
    
    package com.csylh.recommend.entity
    
    /**
      * Description: TODO
      *
      * @Author: 留歌36
      * @Date: 2019-07-12 14:10
      */
    case class Ratings(userId:Int,movieId:Int,rating:Double,timestamp:Int)
    
    
    package com.csylh.recommend.entity
    
    /**
      * Description: TODO
      *
      * @Author: 留歌36
      * @Date: 2019-07-12 14:11
      */
    case class Tags(userId:Int,movieId:Int,tag:String,timestamp:Int)
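
    The ETL job splits every line on "," and calls toInt directly, so a header row or a quoted title that contains a comma would either throw or shift the fields. If movies.txt keeps the original MovieLens quoting (an assumption, since the input file is not shown), a more defensive parser could look like the sketch below. MovieLineParser and parseMovie are hypothetical names, and the Movies case class is repeated locally only to keep the example self-contained.

```scala
import scala.util.Try

object MovieLineParser {
  // Local copy of the Movies case class so that the sketch is self-contained
  case class Movies(movieId: Int, title: String, genres: String)

  // Split on commas that are followed by an even number of double quotes,
  // i.e. commas that are not inside a quoted field
  private val commaOutsideQuotes = ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"

  // Trim the field and strip one pair of surrounding double quotes, if present
  private def unquote(field: String): String = {
    val t = field.trim
    if (t.length >= 2 && t.startsWith("\"") && t.endsWith("\"")) t.substring(1, t.length - 1)
    else t
  }

  /** Returns None for header rows and malformed lines instead of throwing. */
  def parseMovie(line: String): Option[Movies] =
    line.split(commaOutsideQuotes, -1).map(unquote) match {
      case Array(id, title, genres) => Try(Movies(id.toInt, title, genres)).toOption
      case _                        => None
    }

  def main(args: Array[String]): Unit = {
    // Illustrative lines in the MovieLens style
    println(parseMovie("11,\"American President, The (1995)\",Comedy|Drama|Romance"))
    println(parseMovie("movieId,title,genres"))
  }
}
```

    Returning Option means bad lines are dropped silently; in a real job it may be worth counting them so that data problems stay visible.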
    
    

    Step 3: Package the project and upload the jar to the server
    mvn clean package -Dmaven.test.skip=true

    [root@hadoop001 ml]# ll -h movie-recommend-1.0.jar 
    -rw-r--r--. 1 root root 156K 10月 20 13:56 movie-recommend-1.0.jar
    [root@hadoop001 ml]# 
    

    Step 4: Write a shell script that submits the jar above

    [root@hadoop001 ml]# vim etl.sh
    export HADOOP_CONF_DIR=/root/app/hadoop-2.6.0-cdh5.7.0/etc/hadoop

    $SPARK_HOME/bin/spark-submit \
    --class com.csylh.recommend.dataclearer.SourceDataETLApp \
    --master spark://hadoop001:7077 \
    --name SourceDataETLApp \
    --driver-memory 10g \
    --executor-memory 5g \
    /root/data/ml/movie-recommend-1.0.jar
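
    The script does not cap the number of cores, and in standalone mode an application by default takes all the cores the workers offer. That number interacts with minPartitions = 8 in the ETL code, which asks why the partition count should be a multiple of the core count. One way to reason about it is sketched below, under the simplifying assumptions that each task occupies one core and all tasks take about the same time. The object PartitionWaves and its helpers are hypothetical and are not Spark APIs.

```scala
object PartitionWaves {
  /** Number of scheduling rounds needed to run `partitions` tasks on `cores` cores. */
  def waves(partitions: Int, cores: Int): Int = {
    require(partitions >= 0 && cores > 0)
    (partitions + cores - 1) / cores // integer ceiling of partitions / cores
  }

  /** Cores left without a task during the last round. */
  def idleCoresInLastWave(partitions: Int, cores: Int): Int = {
    require(partitions >= 0 && cores > 0)
    (cores - partitions % cores) % cores
  }

  def main(args: Array[String]): Unit = {
    val cores = 4 // the 4-core machine mentioned in the ETL code comments
    for (p <- Seq(8, 9, 10)) {
      println(s"partitions=$p waves=${waves(p, cores)} idleInLastWave=${idleCoresInLastWave(p, cores)}")
    }
  }
}
```

    With 8 partitions on 4 cores there are two full rounds and no idle cores. With 9 partitions a third round is needed in which three cores sit idle. Skewed partitions break the equal-duration assumption, which is the concern raised as Question 2 in the code comments.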

    Step 5: Run sh etl.sh

    The job first writes the data to HDFS,
    then creates the Hive tables,
    and finally loads the data into the tables.
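
    The ETL code repeats the same drop / create / load statements for each of the four tables. As a sketch of how that repetition could be factored out, the helper below builds the three SQL strings for one table. HiveLoadSql and statements are hypothetical names, and the generated strings are meant to match the ones passed to spark.sql in the job.

```scala
object HiveLoadSql {
  /** Builds the drop / create / load statements used for one table. */
  def statements(table: String, columns: Seq[(String, String)], hdfsPath: String): Seq[String] = {
    // Render the column list as "name type,name type,..."
    val cols = columns.map { case (name, tpe) => s"$name $tpe" }.mkString(",")
    Seq(
      s"drop table if exists $table",
      s"create table if not exists $table($cols) stored as parquet",
      s"load data inpath '$hdfsPath' overwrite into table $table"
    )
  }

  def main(args: Array[String]): Unit = {
    val links = statements(
      "links",
      Seq("movieId" -> "int", "imdbId" -> "int", "tmdbId" -> "int"),
      "/tmp/links")
    links.foreach(println) // each string could be passed to spark.sql(...)
  }
}
```

    Keeping the builder free of Spark types makes it easy to check the generated SQL before running it against the metastore.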

    Before running sh etl.sh:

    [root@hadoop001 ml]# hadoop fs -ls /tmp
    19/10/20 19:26:58 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Found 2 items
    drwx------   - root supergroup          0 2019-04-01 16:27 /tmp/hadoop-yarn
    drwx-wx-wx   - root supergroup          0 2019-04-02 09:33 /tmp/hive
    
    [root@hadoop001 ml]# hadoop fs -ls /user/hive/warehouse
    19/10/20 19:27:03 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    [root@hadoop001 ml]#
    

    After running sh etl.sh:
    The script here runs Spark on standalone; later posts will run Spark on YARN. It makes little difference here, and either works.

    [root@hadoop001 ~]# hadoop fs -ls /tmp
    19/10/20 19:43:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Found 6 items
    drwx------   - root supergroup          0 2019-04-01 16:27 /tmp/hadoop-yarn
    drwx-wx-wx   - root supergroup          0 2019-04-02 09:33 /tmp/hive
    drwxr-xr-x   - root supergroup          0 2019-10-20 19:42 /tmp/links
    drwxr-xr-x   - root supergroup          0 2019-10-20 19:42 /tmp/movies
    drwxr-xr-x   - root supergroup          0 2019-10-20 19:43 /tmp/ratings
    drwxr-xr-x   - root supergroup          0 2019-10-20 19:43 /tmp/tags
    [root@hadoop001 ~]# hadoop fs -ls /user/hive/warehouse
    19/10/20 19:43:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Found 4 items
    drwxr-xr-x   - root supergroup          0 2019-10-20 19:42 /user/hive/warehouse/links
    drwxr-xr-x   - root supergroup          0 2019-10-20 19:42 /user/hive/warehouse/movies
    drwxr-xr-x   - root supergroup          0 2019-10-20 19:43 /user/hive/warehouse/ratings
    drwxr-xr-x   - root supergroup          0 2019-10-20 19:43 /user/hive/warehouse/tags
    [root@hadoop001 ~]# 
    

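    With that, the data has been ETL'd into our data warehouse. The next step is to process this base data further.

    If you have any questions, feel free to leave a comment.
    More articles in the Spark-based movie recommendation system series: https://blog.csdn.net/liuge36/column/info/29285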

  • Original article: https://www.cnblogs.com/liuge36/p/11713148.html