zoukankan      html  css  js  c++  java
  • spark连接mysql数据库的几种方式

    一、spark连接mysql数据库的第一种方式:

    
    
    def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().master("local").appName("createdataframefrommysql")
    .config("spark.sql.shuffle.partitions", 1).getOrCreate()

    /**
    * 读取mysql的第一中方式
    *
    */

    val properties = new Properties()
    properties.setProperty("user","root")
    properties.setProperty("password","123")
    val person: DataFrame = spark.read.jdbc("jdbc:mysql://192.168.126.111:3306/spark","person",properties)

    person.show()
    spark.read.jdbc("jdbc:mysql://192.168.126.111:3306/spark","(select person.id,person.name,person.age,score.score from person,score where person.id = score.id) T",properties).show()


    二、第二种读取mysql数据的方式 

    val map: Map[String, String] = Map[String, String](
          elems = "url" -> "jdbc:mysql://192.168.126.111:3306/spark",
          "driver" -> "com.mysql.jdbc.Driver",
          "user" -> "root",
          "password" -> "123",
          "dbtable" -> "score"
    
        )
    
        val score: DataFrame = spark.read.format("jdbc").options(map).load
    
        score.show()

    三、第三种读取mysql 的方式

      val reader: DataFrameReader = spark.read.format("jdbc")
          .option("url", "jdbc:mysql://192.168.126.111:3306/spark")
          .option("driver", "com.mysql.jdbc.Driver")
          .option("user", "root")
          .option("password", "123")
          .option("dbtable", "score")
    
        val source2: DataFrame = reader.load()
    
        source2.show()

    四、将spark中的数据传输到mysql数据库

    //将以上两张表注册为临时表,进行关联查询
        person.createOrReplaceTempView("person")
        score2.createOrReplaceTempView("score")
    
        val result = spark.sql("select person.id,person.name,person.age,score.score from person,score  where person.id=score.id ")
    
        result.show()
    
        //将查询出的结果保存到mysql表之中
    
        result.write.mode(SaveMode.Append).jdbc("jdbc:mysql://192.168.126.111:3306/spark","result",properties)

    一个重要的参数: 

    参数:  spark.sql.shuffle.partitions指定sql执行时,解析成sparkjob的分区数。

    spark-sql将hive中的数据加载成为Dataframe

    通过配置让spark找到hive所在的位置:

    1、启动hive hive --service metastore &

    2、将mynode3节点hive的配置文件发送到spark的配置节点  scp  ./hive-site-xml  mynode4:/software/spark-2.3.1/conf/

    3、修改hive-site-xml中的配置参数   将其与的配置都删了,只保留

    <configuration>
     <property>
      <name>hive.metastore.uris</name>
      <value>thrift://mynode1:9083</value>
     </property>
    </configuration>
    配置这个文件的作用:让spark可以找到hive中的元数据 ,找到元数据也就找到了hive

    ------------------------------------------------------------------------------------------------------------------------

    用spark-sql查询hive中的数据:

    1、启动hadoop   2、启动hive  3、启动spark   /software/spark-2.3.1/sbin/   ./start-all.sh  

    ./spark-shell --master spark://mynode1:7077,mynode2:7077   --通过这个命令启动spark的服务 
    spark.sql("show databases").show()


    使用MR和spaek sql 测试对同一批数据的查询速度 

    spark代码在本地运行的时候,没有SparkSession.master()属性的设置,运行一定会报错 

    spark-sql读取hive中的数据:

     val spark = SparkSession.builder().appName("CreateDataFrameFromHive").enableHiveSupport().getOrCreate()
        spark.sql("use spark")
        spark.sql("drop table if exists student_infos")
        spark.sql("create table if not exists student_infos (name string,age int) row format delimited fields terminated by '	'")
        spark.sql("load data local inpath '/root/test/student_infos' into table student_infos")
        spark.sql("drop table if exists student_scores")
        spark.sql("create table if not exists student_scores (name string,age int) row format delimited fields terminated by '	'")
        spark.sql("load data local inpath'/root/test/student_scores' into table student_scores")
        val frame: DataFrame = spark.table("student_infos")
    
        frame.show()
    
        val df: DataFrame = spark.sql("select si.name,si.age,ss.score from student_infos si,student_scores ss where si.name = ss.name")
        df.show()
    
        spark.sql("drop table if exists good_student_infos")
    
        /**
          * 将结果保存到hive 表之中
          *
          */
        df.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos")

     用maven将spark读取hive中的数据进行打包,首先 clear 之前项目中的tarfget文件就会消失,之后package 对数据进行打包 

    将打包完成的jar包上传到linux从服务器,

    用一下命令读取hive中的数据,并在hive中完成创建表或者删除一张表 

    在spark bin 目录下

    ./spark-submit --master spark://mynode1:7077,mynode2:7077 --calss 报名.类名  jar在linux服务器中所在的位置

    UDF : 用户自定义函数 

    使用UDF是一对一的关系,读取一条数据处理得到一条数据

    注册UDF: spark.udf.register("udf name ",function)

    使用UDF:  sparkSession.sql("select xx,udf Name from tableName ....")

    实例代码:

     val spark: SparkSession = SparkSession.builder().master("local").appName("test").getOrCreate()
    
        val nameList: List[String] = List[String]("zhangsan", "lisi", "wangwu", "zhouyu", "lili")
    
        import spark.implicits._
    
        val nameDF: DataFrame = nameList.toDF("name")
        nameDF.createOrReplaceTempView("students")
        nameDF.show()
    
        /**
          * 注册并自定义函数
          *
          */
        spark.udf.register("STRLEN",(name:String)=>
          {name.length}
        )
    
       spark.sql("select name,STRLEN(name) as length from  students order by length desc").show(100)

    UDAF: 用户自定义聚合函数 

           主要是引用一个继承了UserDefinedAggregateFunction 类的类 

           继承这个类需要实现八个方法 ,以及每个方法所实现的作用 

           initialize :  1、在Map端每个RDD分区内,按照group by 的字段分组,每个分组都有初始化的值
                             2、在reduce 端给每个group by 的分组做初始值

           update  : 每个组,有新的值进来的时候,进行分组对应的聚合值的计算 

          merge : 在reduce阶段,有新的数据进来的时候,对该数据进行聚合

          bufferSchema: 聚合操作的时候,所处理数据的类型

         dataType : //最终函数返回值得数据类型 

         deterministic: 多次运行相同的输入总是有相同的输出

          evaluate最后返回一个最终的聚合值要和dataType的类型一致   

        UDAF:用户自定义聚合函数 的主要作用就是可以实现自己的聚合操作的具体内容的控制,具体实现需要按照业务的不同需求,去重新定义继承类的八个方法

        开窗函数 : 

       

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("over").enableHiveSupport().getOrCreate()
        spark.sql("use spark")
        spark.sql("create table if not exists sales (riqi string,leibie string,jine Int) " + "row format delimited fields terminated by '	'")
        spark.sql("load data local inpath '/root/test/sales' into table sales")
    
        /**
          * rank 在每个组内从1开始
          *   5 A 200   --- 1
          *   3 A 100   ---2
          *   4 A 80   ---3
          *   7 A 60   ---4
          *
          *   1 B 100   ---1
          *   8 B 90  ---2
          *   6 B 80  ---3
          *   1 B 70  ---4
          */
        val result = spark.sql(
          "select"
            +" riqi,leibie,jine "
            + "from ("
            + "select "
            +"riqi,leibie,jine,row_number() over (partition by leibie order by jine desc) rank "
            + "from sales) t "
            + "where t.rank<=3")
        result.write.mode(SaveMode.Append).saveAsTable("salesResult")
        result.show(100)

     

    用java语言实现读取mysql中的数据:

    SparkConf conf =  new SparkConf();
            conf.setMaster("local").setAppName("mysql");
            conf.set("spark.sql.shuffle.partitions","1");
            JavaSparkContext sc = new JavaSparkContext(conf);
            SQLContext sqlContext = new SQLContext(sc);
            Map<String,String> map = new HashMap<String,String>();
            map.put("driver","com.mysql.jdbc.Driver");
            map.put("url","jdbc:mysql://192.168.126.111:3306/spark");
            map.put("user","root");
            map.put("password","123");
            map.put("dbtable","person");
    
            Dataset<Row> df = sqlContext.read().options(map).format("jdbc").load();
            //df.show();
            df.registerTempTable("person1");
    
            /**
             * 第二種连接JDBC的方式
             *
             */
    
            DataFrameReader read = sqlContext.read();
            read.option("driver","com.mysql.jdbc.Driver");
            read.option("url","jdbc:mysql://192.168.126.111:3306/spark");
            read.option("user","root");
            read.option("password","123");
            read.option("dbtable","score");
            Dataset<Row> jdbc = read.format("jdbc").load();
            jdbc.registerTempTable("score1");
            Dataset<Row> result = sqlContext.sql("select person1.id,person1.name,person1.age,score1.score from person1 join  score1 on  person1.name = score1.name ");
            result.show();
            Properties prop = new Properties();
            prop.put("user","root");
            prop.put("password","123");
    
            result.write().mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.126.111:3306/spark","result1",prop);
            //jdbc.show();
            sc.stop();

    用Java言语实现读取hive 中的数据 :

    SparkConf conf = new SparkConf();
            conf.setAppName("hive");
            JavaSparkContext sc = new JavaSparkContext(conf);
            SQLContext hc = new SQLContext(sc);
            //创建表并加载数据
            hc.sql("use spark");
            hc.sql("create table student_info(name string,age int) row format delimited fields terminated by ','");
            hc.sql("load data local inpath '/root/data/student_infos' into table student_info");
            
            hc.sql("create table student_scores(name string,score int) row format delimited fields terminated by ','");
            hc.sql("load data local inpath '/root/data/student_scores' into table student_score");
            //得到表连接结果 
            Dataset<Row> sql = hc.sql("select t1.name,t1.age,t2.score from student_info t1 join student_score t2 on t1.name = t2.name");
            //将结果写回到hive 
            sql.write().mode(SaveMode.Overwrite).saveAsTable("student_result");

     

      

  • 相关阅读:
    CodeForces 510C Fox And Names (拓扑排序)
    Codeforces 1153D Serval and Rooted Tree (简单树形DP)
    HDU 6437 Problem L.Videos (最大费用)【费用流】
    Luogu P3381 (模板题) 最小费用最大流
    Codeforces 741B Arpa's weak amphitheater and Mehrdad's valuable Hoses (并查集+分组背包)
    Codeforces 1144F Graph Without Long Directed Paths (DFS染色+构造)
    HDU 2204 Eddy's 爱好 (容斥原理)
    Codeforces 939E Maximize! (三分 || 尺取)
    Codeforces 938D. Buy a Ticket (最短路+建图)
    CodeForces 959E Mahmoud and Ehab and the xor-MST (MST+找规律)
  • 原文地址:https://www.cnblogs.com/wcgstudy/p/10984550.html
Copyright © 2011-2022 走看看