zoukankan      html  css  js  c++  java
  • 转: spark:scala读取mysql的4种方法

    转自:http://blog.csdn.net/liuzongxi/article/details/51764104

    SparkScala读取MySQL的4种方法

    1. 引入mysql的驱动包到/usr/local/spark/spark-1.6.0-bin-hadoop2.6/lib/Hbase(目录根据配置而不同)

    mysql-connector-Java-5.1.7-bin.jar

    并在mysql创建表stock:

    CREATE TABLE `stock` (  
      `id` decimal(20,0) DEFAULT NULL,  
      `stock_code` varchar(20) DEFAULT NULL,  
      `stock_cname` varchar(50) DEFAULT NULL,  
      `stock_ename` varchar(50) DEFAULT NULL,  
      `mark` varchar(255) DEFAULT NULL  
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8; 
    

    2.scala程序

    package com.wonhigh.test  
      
    import java.sql.DriverManager  
    import org.apache.spark.SparkContext  
    import org.apache.spark.rdd.JdbcRDD  
    import org.apache.spark.SparkConf  
    import org.apache.spark.sql.SQLContext  
    import java.util.Properties  
      
    object SparkOnMysql {  
      
      def main(args: Array[String]) {  
          
        val sparkConf = new SparkConf().setMaster("spark://OPENFIRE-DEV:7080").setAppName("spark sql test");  
        val sc = new SparkContext(sparkConf);  
        val sqlContext = new SQLContext(sc);  
          
        //1. 不指定查询条件  
        //这个方式链接MySql的函数原型是:  
        //我们只需要提供Driver的url,需要查询的表名,以及连接表相关属性properties。下面是具体例子:  
        val url = "jdbc:mysql://192.168.0.101:3306/sas_vip?user=root&password=123456";  
        val prop = new Properties();  
        val df = sqlContext.read.jdbc(url, "stock", prop);  
        println("第一种方法输出:"+df.count());  
        println("1.------------->" + df.count());  
        println("1.------------->" + df.rdd.partitions.size);  
          
        //2.指定数据库字段的范围  
        //这种方式就是通过指定数据库中某个字段的范围,但是遗憾的是,这个字段必须是数字,来看看这个函数的函数原型:  
        /* def jdbc( 
        url: String, 
        table: String, 
        columnName: String, 
        lowerBound: Long, 
        upperBound: Long, 
        numPartitions: Int, 
        connectionProperties: Properties): DataFrame*/  
        //前两个字段的含义和方法一类似。columnName就是需要分区的字段,这个字段在数据库中的类型必须是数字;  
        //lowerBound就是分区的下界;upperBound就是分区的上界;numPartitions是分区的个数。同样,我们也来看看如何使用:  
        val lowerBound = 1;  
        val upperBound = 6;  
        val numPartitions = 2;  
        val url1 = "jdbc:mysql://192.168.0.101:3306/sas_vip?user=root&password=123456";  
        val prop1 = new Properties();  
        val df1 = sqlContext.read.jdbc(url1, "stock", "id", lowerBound, upperBound, numPartitions, prop1);  
        println("第二种方法输出:" + df1.rdd.partitions.size);  
        df1.collect().foreach(println)  
          
         /*这个方法可以将iteblog表的数据分布到RDD的几个分区中,分区的数量由numPartitions参数决定,在理想情况下,每个分区处理相同数量的数据,我们在使用的时候不建议将这个值设置的比较大,因为这可能导致数据库挂掉!但是根据前面介绍,这个函数的缺点就是只能使用整形数据字段作为分区关键字。 
    这个函数在极端情况下,也就是设置将numPartitions设置为1,其含义和第一种方式一致。*/  
          
        //3.根据任意字段进行分区  
        //基于前面两种方法的限制, Spark 还提供了根据任意字段进行分区的方法,函数原型如下:  
        /*def jdbc( 
        url: String, 
        table: String, 
        predicates: Array[String], 
        connectionProperties: Properties): DataFrame*/  
        //这个函数相比第一种方式多了predicates参数,我们可以通过这个参数设置分区的依据,来看看例子:  
        //这个函数相比第一种方式多了predicates参数,我们可以通过这个参数设置分区的依据,来看看例子:  
        val predicates = Array[String]("id <= 2", "id >= 4 and id <= 5 ")  
        val url2 = "jdbc:mysql://192.168.0.101:3306/sas_vip?user=root&password=123456"  
        val prop2 = new Properties()  
        val df2 = sqlContext.read.jdbc(url, "stock", predicates, prop2)  
        println("第三种方法输出:"+df2.rdd.partitions.size+","+predicates.length);  
        df2.collect().foreach(println)  
        //最后rdd的分区数量就等于predicates.length。  
         
          
        //4.通过load获取  
        //Spark还提供通过load的方式来读取数据。  
        val url3 = "jdbc:mysql://192.168.0.101:3306/sas_vip?user=root&password=123456"  
        val df3 = sqlContext.read.format("jdbc").option("url", url).option("dbtable", "stock").load()  
        println("第四种方法输出:"+df3.rdd.partitions.size);  
        df.collect().foreach(println)  
      
        sc.stop()  
      }  
    }  
    

    3. 提交作业

        

    spark-submit --class com.wonhigh.liuzx.SparkOnMysql --master spark://dev-app-209-211:7080 /usr/local/wonhigh/miu-tag-spark-0.0.1-SNAPSHOT.jar  
    
  • 相关阅读:
    SCRUM第一天
    第八周总结
    第7周总结
    团队项目nabcd
    人月神话1
    课堂练习之四则运算
    第六周总结
    Storm Grouping —— 流分组策略
    抓取网页内容生成Kindle电子书(转)
    浅析PageRank算法(转)
  • 原文地址:https://www.cnblogs.com/sunyaxue/p/6526597.html
Copyright © 2011-2022 走看看