zoukankan      html  css  js  c++  java
  • scala及spark配置

    1.scala在linux的配置

    上传scala-2.11.5.tgz 到opt目录下,解压缩: tar -zxvf scala-2.11.5.tgz -C /usr/local/
    
    1.cd /usr/local/scala-2.11.5
    2.vi /etc/profile
    3.最后一行编辑
    export SCALA_HOME=/usr/local/scala-2.11.5
    export PATH=$PATH:$SCALA_HOME/bin
    4.保存,执行source /etc/profile
    5.命令输入scala,则进入scala命令提示。:q退出
    

    2.spark配置

    2.1 spark的配置

    spark在hadoop上的配置
    1.上传spark-2.4.0-bin-hadoop2.6.tgz到/opt目录,并解压到/usr/local
    tar -zxf /opt/spark-2.4.0-bin-hadoop2.6.tgz -C /usr/local/
    
    2.进入/usr/local/spark-2.4.0-bin-hadoop2.6/conf
    复制slaves.template:cp slaves.template slaves
    修改slaves,先删除其中的localhost,然后添加:
    slave1
    slave2
    slave3
    
    3.修改spark-defaults.conf
    cp spark-defaults.conf.template spark-defaults.conf
    vi spark-defaults.conf
    添加:
    spark.master                     spark://master:7077
    spark.eventLog.enabled           true
    spark.eventLog.dir               hdfs://master:8020/spark-logs
    spark.history.fs.logDirectory     hdfs://master:8020/spark-logs
    
    4.修改spark-env.sh
    cp spark-env.sh.template spark-env.sh
    vi spark-env.sh
    添加:
    JAVA_HOME=/usr/java/jdk1.8.0_151
    HADOOP_CONF_DIR=/usr/local/hadoop-2.6.5/etc/hadoop
    SPARK_MASTER_IP=master
    SPARK_MASTER_PORT=7077
    SPARK_WORKER_MEMORY=512m
    SPARK_WORKER_CORES=1
    SPARK_EXECUTOR_MEMORY=512m
    SPARK_EXECUTOR_CORES=1
    SPARK_WORKER_INSTANCES=1
    
    5.启动Hadoop集群,在HDFS中新建目录:
    hdfs dfs -mkdir /spark-logs
    
    6.将Spark安装包分发到其他节点
     scp -r /usr/local/spark-2.4.0-bin-hadoop2.6/ slave1:/usr/local/
     scp -r /usr/local/spark-2.4.0-bin-hadoop2.6/ slave2:/usr/local/
     scp -r /usr/local/spark-2.4.0-bin-hadoop2.6/ slave3:/usr/local/
    
    7.在所有节点配置Spark环境变量
    vi /etc/profile
    在文件尾加入:
    export SPARK_HOME=/usr/local/spark-2.4.0-bin-hadoop2.6
    export PATH=$PATH:$SPARK_HOME/bin
    
    执行source /etc/profile使命令生效
    
    8.启动spark
    进入/usr/local/spark-2.4.0-bin-hadoop2.6/sbin
    执行
    ./start-all.sh
    9.查看客户端
    http://master:8080
    
    10.测试spark程序
    输入spark-shell,进入spark命令行
    传一个数据文件到hdfs做数据统计
    sc.textFile("/test/bb.txt").flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey(_+_) #对单词规约统计
    res0.collect()  #展示结果
    
    

    2.2 配置spark的java环境

    新建java工程--》新建包--》新建单例类Object
    (1)Java环境配置集群提交模式代码

    连接集群代码
    package spark
    import org.apache.spark.sql.SparkSession
    object WordCount {
      def main(args: Array[String]): Unit = {
        val spark=SparkSession.builder().appName("wordcount").getOrCreate() //本地模式
        val sc=spark.sparkContext
        val file=args(0)
        val splitter=args(1)
        val output=args(2)
        val count=sc.textFile(file)
          .flatMap(x=>x.split(splitter).map(x=>(x,1))).reduceByKey((x,y)=>x+y)
        count.coalesce(1,false).saveAsTextFile(output)
      }
    }
    
    

    (2)配置Spark开发依赖包

    配置Spark开发依赖包
    • 创建一个Scala工程,点击菜单栏中的“File”->“Project Structure”,选择“Libraries”,单击“+”按钮.

    • 选择“Java”选项->在弹出的界面选择Spark安装包(提前解压)下的“jars”文件夹(注意:事先删除该目录下的commons-compiler-3.0.9.jar)-->点击“OK”

    • 在IDEA中将程序打成jar包.选择“File”→“Project Structure”命令-->在弹出的对话框中选择“Artifacts”选项-->选择“+”下的“JAR”选项中的“Empty”

    • 在弹出的对话框中修改“Name”为自定义的JAR包的名字“word”,双击右侧栏工程下的“‘workspace’compile output”,(不选择scala,jars的jar包,选择output文件夹)它会转移到左侧,wordspace表示工程名

    • 选择菜单栏中的“Build”→“Build Artifacts”命令,在弹出的方框(右下图)中选择“word” →“build”

    • 生成Artifact后,在工程目录中会有一个/out目录,可以看到生成的JAR包,如图所示,在JAR包处单击右键,在弹出菜单中选择“Show in Explorer”命令,直接到达JAR包路径下

    • 上传jar包到/opt/目录,hdfs中准备好数据文件。

    • 集群提交命令spark-submit --master yarn --deploy-mode cluster --class spark.WordCount /opt/word.jar /sparkdata/words.txt " " /sparkdata/wd1

    3.spark SQL

    3.1 配置

    配置Spark SQL
    1.进入hive安装目录bin目录,修改hive文件
    vi hive
    将sparkAssemblyPath=`ls ${SPARK_HOME}/lib/spark-assembly-*.jar`
    修改为:
    sparkAssemblyPath=`ls ${SPARK_HOME}/jars/*.jar`
    
    2.拷贝hive-site.xml到/usr/local/spark-2.4.0-bin-hadoop2.6/conf
    cp /usr/local/apache-hive-1.2.1-bin/conf/hive-site.xml /usr/local/spark-2.4.0-bin-hadoop2.6/conf/
    scp /usr/local/apache-hive-1.2.1-bin/conf/hive-site.xml slave1:/usr/local/spark-2.4.0-bin-hadoop2.6/conf/
    scp /usr/local/apache-hive-1.2.1-bin/conf/hive-site.xml slave2:/usr/local/spark-2.4.0-bin-hadoop2.6/conf/
    scp /usr/local/apache-hive-1.2.1-bin/conf/hive-site.xml slave3:/usr/local/spark-2.4.0-bin-hadoop2.6/conf/
    
    3.拷贝MYSQL驱动到/usr/local/spark-2.4.0-bin-hadoop2.6/jars
    cp /usr/local/apache-hive-1.2.1-bin/lib/mysql-connector-java-5.1.32-bin.jar /usr/local/spark-2.4.0-bin-hadoop2.6/jars/
    scp /usr/local/spark-2.4.0-bin-hadoop2.6/jars/mysql-connector-java-5.1.32-bin.jar slave1:/usr/local/spark-2.4.0-bin-hadoop2.6/jars/
    scp /usr/local/spark-2.4.0-bin-hadoop2.6/jars/mysql-connector-java-5.1.32-bin.jar slave2:/usr/local/spark-2.4.0-bin-hadoop2.6/jars/
    scp /usr/local/spark-2.4.0-bin-hadoop2.6/jars/mysql-connector-java-5.1.32-bin.jar slave3:/usr/local/spark-2.4.0-bin-hadoop2.6/jars/
    
    4.在所有节点/usr/local/spark-2.4.0-bin-hadoop2.6/conf/spark-env.sh 文件中配置 MySQL 驱动
    SPARK_CLASSPATH=/usr/local/spark-2.4.0-bin-hadoop2.6/jars/mysql-connector-java-5.1.32-bin.jar
    
    
    5.启动 MySQL 服务
    service mysqld start
    
    6.启动 Hive 的 metastore 服务
    hive --service metastore &
    
    7.修改日志级别,在各节点:
    cp /usr/local/spark-2.4.0-bin-hadoop2.6/conf/log4j.properties.template /usr/local/spark-2.4.0-bin-hadoop2.6/conf/log4j.properties
    修改log4j.properties
    log4j.rootCategory=WARN, console
    
    8.启动spark集群
    9.访问spark-sql,输入spark-sql,后面就可以输入mysql语句。
    

    3.2 简单语法

    从虚拟机启动spark,务必先启动hadoop、mysql,hive、spark,再输入spark-shell,进入spark界面。云服务器能搭建成功还是需要云,否则每次开电脑,都这些操作,确实麻烦。

    • 创建DataFrame
    从hive表导入数据到DataFrame
    val df_hive=spark.read.table("taitan.people")   //前提hive表有这个数据,生成DataFrame格式数据
    val df_hive2=spark.sql("select * from taitan.people") //利用spark的sql语句查询,比hive的sql查询快,hive是基于mapreduce
    df_hive2.show()
    
    //从外部文件导入到DataFrame
    读取csv:
    val df_csv=spark.read.option("header","true").option("seq",";").csv("/user/root/sparksqldata/people.csv")
    //设置 表头进文件,分隔符
    
    
    • DataFrame数据导入导出
    1.导出dataframe数据到外部文件
    df_data.repartition(1).write.mode("overwrite").option("header","true").csv("/sparkdata/df_person")
    // repartition,设置分区1,数据整合一起,
    
    2.导出DataFrame数据到hive表
    df_data.write.mode("overwrite").saveAsTable("taitan.score") 
    
    
    • 创建DataSet
    1.建样例类。case class Goods(ID:String,Goods:String)
    
    2.创建Dataset
    val df_goods=spark.read.table("taitan.goods")
    val df_order=spark.read.option("header","true").option("sep",",").csv("/sparkdata/GoodsOrder.csv")
    val ds_goods=df_order.as[Goods] //从DataFrame转ds
    val rdd=sc.textFile("/sparkdata/person.txt").map(x=>x.split(",")).map(x=>People(x(0).toInt,x(1),x(2).toInt))
    //从外部文件创建DataSet,People为事先建好的样例类
    3.保存DataSet
     val shop_count=ds_goods.groupBy("Goods").agg(count(lit(1)) as "shopcount") //统计结果为DataFrame
    //保存DataFrame到hive表
    shop_count.write.mode("overwrite").saveAsTable("taitan.shopcount") 
    

    4.SparkMLib案例

    ALS电影推荐模型

    4.1电影推荐数据预处理

    object DataProcess {  //建Object类
      case class Rating(uid:Int,mid:Int,rating:Double)//要在main方法外面定义样例类
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("als model data").enableHiveSupport().getOrCreate()
    
        val sc = spark.sparkContext
        //获取外部传入的参数
        val input = args(0) //输入路径
        val splitter = args(1) //文本分隔符
        val trainTable =args(2) //训练数据输出表表名
        val testTable = args(3) // 测试数据表
    
    
        import spark.implicits._
        val data = sc.textFile(input).map(x=>x.split(splitter)).map(x=>Rating(x(0).trim.toInt,x(1).trim.toInt,x(2).trim.toDouble)).toDF()
    
        val Array(train,test) = data.randomSplit(Array(0.8,0.2))
        train.write.mode("overwrite").saveAsTable(trainTable)
        test.write.mode("overwrite").saveAsTable(testTable)
      }
    }
    

    4.2ALS模型构建

    object ALSModelCreate {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
        val sc = spark.sparkContext
        //获取参数
        val inputTable = args(0) //训练数据表,保存在hive中
        val rank = args(1).toInt //矩阵的秩
        val reg = args(2).toDouble //正则化参数
        val uidCol = args(3) //用户列
        val midCol = args(4)  //电影列
        val ratingCol = args(5) //评分列
        val modelPath = args(6) //模型保存的路径
    
        val data = spark.read.table(inputTable)
        //定义ALS模型
        val als = new ALS()
          .setMaxIter(5)
          .setRegParam(reg)
            .setRank(rank)
          .setUserCol(uidCol)
          .setItemCol(midCol)
          .setRatingCol(ratingCol)
        //训练模型
        val model = als.fit(data)
        //保存模型
        model.save(modelPath)
      }
    }
    

    4.3ALS预测与评估

    object ALSModelEvaluator {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
        val sc = spark.sparkContext
    
        val modelPath = args(0) //模型路径
        val testTable = args(1) //测试表
        val testResult = args(2) //测试结果保存路径
    
        val model =ALSModel.load(modelPath) //加载模型
        val testdata = spark.read.table(testTable) //读取测试数据
    
        val predictions = model.transform(testdata) //预测过程
        val evaluator = new RegressionEvaluator() //评估预测结果
          .setMetricName("rmse")
          .setLabelCol("rating")
          .setPredictionCol("prediction")
        val rmse = evaluator.evaluate(predictions)
    
        sc.parallelize(List(rmse)).repartition(1).saveAsTextFile(testResult) //将结果保存到HDFS
      }
    }
    
    

    4.4 ALS模型推荐

    object ALSModelPredict {
      case class users(uid:Int)
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    
        val sc = spark.sparkContext
        val modelPath = args(0) //模型路径
        val recommendTable=args(1) //推荐结果表存hive表
        val recommendNum=args(2).toInt //推荐个数
        val userList = args(3).split(",").toList //需要预测的用户组
    
        val model =ALSModel.load(modelPath) //加载模型
    
    
        import spark.implicits._
        val user_df = sc.parallelize(userList).map(x=>users(x.toInt)).toDF()
        val userSubsetRecs = model.recommendForUserSubset(user_df, recommendNum)
        userSubsetRecs.write.mode("overwrite").saveAsTable(recommendTable)
      }
    

    4.5 模型提交集群

    先打包成alsModel.jar,然后提交到/opt/目录下,执行集群提交命令
    spark-submit --.....--class 相应类名 每个类需要的路径参数

    出现问题

    (1). spark集群提交出现exitcode 13问题
    网上说是程序配置中是单机模式引起问题,但是检查代码val spark=SparkSession.builder().appName("wordcount").getOrCreate(),这个是正确的。后面改提交spark集群,提示类找不到问题,决定重建类。重建工程--》建包--》建Object class,然后重新配置编译包,即jar,再次提交集群,成功。

  • 相关阅读:
    三角函数都快忘光了
    Windows 10 LTSC 2019(1809) WSL 安装 CentOS 7
    随手写了个京东发票助手
    ASP 封装基本身份认证( HTTP Basic Authenticate)辅助类
    WebBrowser中打开新页面
    将QT窗口嵌入到WinForm窗口
    [摘录]如何按需前端显示指定的窗口
    实现TabControl 选项卡首个标签缩进的方法
    玩转时间操作
    Java 并发包中的高级同步工具
  • 原文地址:https://www.cnblogs.com/linli069/p/13689590.html
Copyright © 2011-2022 走看看