zoukankan      html  css  js  c++  java
  • spark连接mysql数据库的几种方式

    一、spark连接mysql数据库的第一种方式:

    
    
    def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().master("local").appName("createdataframefrommysql")
    .config("spark.sql.shuffle.partitions", 1).getOrCreate()

    /**
    * 读取mysql的第一中方式
    *
    */

    val properties = new Properties()
    properties.setProperty("user","root")
    properties.setProperty("password","123")
    val person: DataFrame = spark.read.jdbc("jdbc:mysql://192.168.126.111:3306/spark","person",properties)

    person.show()
    spark.read.jdbc("jdbc:mysql://192.168.126.111:3306/spark","(select person.id,person.name,person.age,score.score from person,score where person.id = score.id) T",properties).show()


    二、第二种读取mysql数据的方式 

    val map: Map[String, String] = Map[String, String](
          elems = "url" -> "jdbc:mysql://192.168.126.111:3306/spark",
          "driver" -> "com.mysql.jdbc.Driver",
          "user" -> "root",
          "password" -> "123",
          "dbtable" -> "score"
    
        )
    
        val score: DataFrame = spark.read.format("jdbc").options(map).load
    
        score.show()

    三、第三种读取mysql 的方式

      val reader: DataFrameReader = spark.read.format("jdbc")
          .option("url", "jdbc:mysql://192.168.126.111:3306/spark")
          .option("driver", "com.mysql.jdbc.Driver")
          .option("user", "root")
          .option("password", "123")
          .option("dbtable", "score")
    
        val source2: DataFrame = reader.load()
    
        source2.show()

    四、将spark中的数据传输到mysql数据库

    //将以上两张表注册为临时表,进行关联查询
        person.createOrReplaceTempView("person")
        score2.createOrReplaceTempView("score")
    
        val result = spark.sql("select person.id,person.name,person.age,score.score from person,score  where person.id=score.id ")
    
        result.show()
    
        //将查询出的结果保存到mysql表之中
    
        result.write.mode(SaveMode.Append).jdbc("jdbc:mysql://192.168.126.111:3306/spark","result",properties)

    一个重要的参数: 

    参数:  spark.sql.shuffle.partitions指定sql执行时,解析成sparkjob的分区数。

    spark-sql将hive中的数据加载成为Dataframe

    通过配置让spark找到hive所在的位置:

    1、启动hive hive --service metastore &

    2、将mynode3节点hive的配置文件发送到spark的配置节点  scp  ./hive-site-xml  mynode4:/software/spark-2.3.1/conf/

    3、修改hive-site-xml中的配置参数   将其与的配置都删了,只保留

    <configuration>
     <property>
      <name>hive.metastore.uris</name>
      <value>thrift://mynode1:9083</value>
     </property>
    </configuration>
    配置这个文件的作用:让spark可以找到hive中的元数据 ,找到元数据也就找到了hive

    ------------------------------------------------------------------------------------------------------------------------

    用spark-sql查询hive中的数据:

    1、启动hadoop   2、启动hive  3、启动spark   /software/spark-2.3.1/sbin/   ./start-all.sh  

    ./spark-shell --master spark://mynode1:7077,mynode2:7077   --通过这个命令启动spark的服务 
    spark.sql("show databases").show()


    使用MR和spaek sql 测试对同一批数据的查询速度 

    spark代码在本地运行的时候,没有SparkSession.master()属性的设置,运行一定会报错 

    spark-sql读取hive中的数据:

     val spark = SparkSession.builder().appName("CreateDataFrameFromHive").enableHiveSupport().getOrCreate()
        spark.sql("use spark")
        spark.sql("drop table if exists student_infos")
        spark.sql("create table if not exists student_infos (name string,age int) row format delimited fields terminated by '	'")
        spark.sql("load data local inpath '/root/test/student_infos' into table student_infos")
        spark.sql("drop table if exists student_scores")
        spark.sql("create table if not exists student_scores (name string,age int) row format delimited fields terminated by '	'")
        spark.sql("load data local inpath'/root/test/student_scores' into table student_scores")
        val frame: DataFrame = spark.table("student_infos")
    
        frame.show()
    
        val df: DataFrame = spark.sql("select si.name,si.age,ss.score from student_infos si,student_scores ss where si.name = ss.name")
        df.show()
    
        spark.sql("drop table if exists good_student_infos")
    
        /**
          * 将结果保存到hive 表之中
          *
          */
        df.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos")

     用maven将spark读取hive中的数据进行打包,首先 clear 之前项目中的tarfget文件就会消失,之后package 对数据进行打包 

    将打包完成的jar包上传到linux从服务器,

    用一下命令读取hive中的数据,并在hive中完成创建表或者删除一张表 

    在spark bin 目录下

    ./spark-submit --master spark://mynode1:7077,mynode2:7077 --calss 报名.类名  jar在linux服务器中所在的位置

    UDF : 用户自定义函数 

    使用UDF是一对一的关系,读取一条数据处理得到一条数据

    注册UDF: spark.udf.register("udf name ",function)

    使用UDF:  sparkSession.sql("select xx,udf Name from tableName ....")

    实例代码:

     val spark: SparkSession = SparkSession.builder().master("local").appName("test").getOrCreate()
    
        val nameList: List[String] = List[String]("zhangsan", "lisi", "wangwu", "zhouyu", "lili")
    
        import spark.implicits._
    
        val nameDF: DataFrame = nameList.toDF("name")
        nameDF.createOrReplaceTempView("students")
        nameDF.show()
    
        /**
          * 注册并自定义函数
          *
          */
        spark.udf.register("STRLEN",(name:String)=>
          {name.length}
        )
    
       spark.sql("select name,STRLEN(name) as length from  students order by length desc").show(100)

    UDAF: 用户自定义聚合函数 

           主要是引用一个继承了UserDefinedAggregateFunction 类的类 

           继承这个类需要实现八个方法 ,以及每个方法所实现的作用 

           initialize :  1、在Map端每个RDD分区内,按照group by 的字段分组,每个分组都有初始化的值
                             2、在reduce 端给每个group by 的分组做初始值

           update  : 每个组,有新的值进来的时候,进行分组对应的聚合值的计算 

          merge : 在reduce阶段,有新的数据进来的时候,对该数据进行聚合

          bufferSchema: 聚合操作的时候,所处理数据的类型

         dataType : //最终函数返回值得数据类型 

         deterministic: 多次运行相同的输入总是有相同的输出

          evaluate最后返回一个最终的聚合值要和dataType的类型一致   

        UDAF:用户自定义聚合函数 的主要作用就是可以实现自己的聚合操作的具体内容的控制,具体实现需要按照业务的不同需求,去重新定义继承类的八个方法

        开窗函数 : 

       

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("over").enableHiveSupport().getOrCreate()
        spark.sql("use spark")
        spark.sql("create table if not exists sales (riqi string,leibie string,jine Int) " + "row format delimited fields terminated by '	'")
        spark.sql("load data local inpath '/root/test/sales' into table sales")
    
        /**
          * rank 在每个组内从1开始
          *   5 A 200   --- 1
          *   3 A 100   ---2
          *   4 A 80   ---3
          *   7 A 60   ---4
          *
          *   1 B 100   ---1
          *   8 B 90  ---2
          *   6 B 80  ---3
          *   1 B 70  ---4
          */
        val result = spark.sql(
          "select"
            +" riqi,leibie,jine "
            + "from ("
            + "select "
            +"riqi,leibie,jine,row_number() over (partition by leibie order by jine desc) rank "
            + "from sales) t "
            + "where t.rank<=3")
        result.write.mode(SaveMode.Append).saveAsTable("salesResult")
        result.show(100)

     

    用java语言实现读取mysql中的数据:

    SparkConf conf =  new SparkConf();
            conf.setMaster("local").setAppName("mysql");
            conf.set("spark.sql.shuffle.partitions","1");
            JavaSparkContext sc = new JavaSparkContext(conf);
            SQLContext sqlContext = new SQLContext(sc);
            Map<String,String> map = new HashMap<String,String>();
            map.put("driver","com.mysql.jdbc.Driver");
            map.put("url","jdbc:mysql://192.168.126.111:3306/spark");
            map.put("user","root");
            map.put("password","123");
            map.put("dbtable","person");
    
            Dataset<Row> df = sqlContext.read().options(map).format("jdbc").load();
            //df.show();
            df.registerTempTable("person1");
    
            /**
             * 第二種连接JDBC的方式
             *
             */
    
            DataFrameReader read = sqlContext.read();
            read.option("driver","com.mysql.jdbc.Driver");
            read.option("url","jdbc:mysql://192.168.126.111:3306/spark");
            read.option("user","root");
            read.option("password","123");
            read.option("dbtable","score");
            Dataset<Row> jdbc = read.format("jdbc").load();
            jdbc.registerTempTable("score1");
            Dataset<Row> result = sqlContext.sql("select person1.id,person1.name,person1.age,score1.score from person1 join  score1 on  person1.name = score1.name ");
            result.show();
            Properties prop = new Properties();
            prop.put("user","root");
            prop.put("password","123");
    
            result.write().mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.126.111:3306/spark","result1",prop);
            //jdbc.show();
            sc.stop();

    用Java言语实现读取hive 中的数据 :

    SparkConf conf = new SparkConf();
            conf.setAppName("hive");
            JavaSparkContext sc = new JavaSparkContext(conf);
            SQLContext hc = new SQLContext(sc);
            //创建表并加载数据
            hc.sql("use spark");
            hc.sql("create table student_info(name string,age int) row format delimited fields terminated by ','");
            hc.sql("load data local inpath '/root/data/student_infos' into table student_info");
            
            hc.sql("create table student_scores(name string,score int) row format delimited fields terminated by ','");
            hc.sql("load data local inpath '/root/data/student_scores' into table student_score");
            //得到表连接结果 
            Dataset<Row> sql = hc.sql("select t1.name,t1.age,t2.score from student_info t1 join student_score t2 on t1.name = t2.name");
            //将结果写回到hive 
            sql.write().mode(SaveMode.Overwrite).saveAsTable("student_result");

     

      

  • 相关阅读:
    浏览器 显示flash问题
    类型参数的约束
    C# FUNC 应用
    c#抽奖系统
    3D基础数学小结
    google应用之字体引用
    MYSQL启动参数
    chrome中你不知道的快捷方式
    SQL Server 2008在添加用户时弹出15195错误
    Hibernate Maven Missing artifact javax.transaction:jta:jar:1.0.1B
  • 原文地址:https://www.cnblogs.com/wcgstudy/p/10984550.html
Copyright © 2011-2022 走看看